From 7902ebe20bba8124bdb5edf2218cd072f2ad34b3 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 14 Oct 2025 10:18:24 -0700 Subject: [PATCH] xdsclient: move listener resource type implementation to match external xdsclient API (#8640) Addresses https://github.com/grpc/grpc-go/issues/8381 This PR *only* changes the implementation of the listener resource type to adhere to the external xdsclient API. The other resource type implementations will be handled in subsequent PRs, and once all resource type implementations have switched to the external xdsclient API, we can get rid of some of existing APIs. RELEASE NOTES: none --- internal/xds/clients/xdsclient/channel.go | 5 +- .../xds/clients/xdsclient/helpers_test.go | 13 +- .../xds/clients/xdsclient/resource_type.go | 25 +++- .../clients/xdsclient/test/helpers_test.go | 8 +- internal/xds/resolver/watch_service.go | 4 +- internal/xds/resolver/xds_resolver.go | 4 +- internal/xds/server/listener_wrapper.go | 8 +- internal/xds/xds.go | 3 - internal/xds/xdsclient/client.go | 9 +- internal/xds/xdsclient/clientimpl_test.go | 8 +- internal/xds/xdsclient/clientimpl_watchers.go | 5 + internal/xds/xdsclient/metrics_test.go | 2 +- internal/xds/xdsclient/resource_types.go | 2 +- .../tests/ads_stream_ack_nack_test.go | 4 +- .../tests/ads_stream_restart_test.go | 2 +- .../tests/federation_watchers_test.go | 2 +- .../xds/xdsclient/tests/lds_watchers_test.go | 44 +++--- .../xdsclient/tests/resource_update_test.go | 6 +- .../xdsresource/listener_resource_type.go | 131 +++++++----------- .../xdsclient/xdsresource/resource_type.go | 27 ++-- xds/csds/csds_e2e_test.go | 4 +- 21 files changed, 150 insertions(+), 166 deletions(-) diff --git a/internal/xds/clients/xdsclient/channel.go b/internal/xds/clients/xdsclient/channel.go index e36e3622..7c40f1da 100644 --- a/internal/xds/clients/xdsclient/channel.go +++ b/internal/xds/clients/xdsclient/channel.go @@ -253,10 +253,7 @@ func decodeResponse(opts *DecodeOptions, rType *ResourceType, resp response) (ma perResourceErrors := make(map[string]error) // Tracks resource validation errors, where we have a resource name. ret := make(map[string]dataAndErrTuple) // Return result, a map from resource name to either resource data or error. for _, r := range resp.resources { - result, err := rType.Decoder.Decode(AnyProto{ - TypeURL: r.GetTypeUrl(), - Value: r.GetValue(), - }, *opts) + result, err := rType.Decoder.Decode(NewAnyProto(r), *opts) // Name field of the result is left unpopulated only when resource // deserialization fails. diff --git a/internal/xds/clients/xdsclient/helpers_test.go b/internal/xds/clients/xdsclient/helpers_test.go index 6f5f8597..b10dbcf3 100644 --- a/internal/xds/clients/xdsclient/helpers_test.go +++ b/internal/xds/clients/xdsclient/helpers_test.go @@ -45,9 +45,8 @@ func Test(t *testing.T) { } const ( - defaultTestWatchExpiryTimeout = 100 * time.Millisecond - defaultTestTimeout = 5 * time.Second - defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen. + defaultTestTimeout = 5 * time.Second + defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen. // listenerResourceTypeName represents the transport agnostic name for the // listener resource. listenerResourceTypeName = "ListenerResource" @@ -162,12 +161,8 @@ type listenerDecoder struct{} // Decode deserializes and validates an xDS resource serialized inside the // provided `Any` proto, as received from the xDS management server. -func (listenerDecoder) Decode(resource AnyProto, _ DecodeOptions) (*DecodeResult, error) { - rProto := &anypb.Any{ - TypeUrl: resource.TypeURL, - Value: resource.Value, - } - name, listener, err := unmarshalListenerResource(rProto) +func (listenerDecoder) Decode(resource *AnyProto, _ DecodeOptions) (*DecodeResult, error) { + name, listener, err := unmarshalListenerResource(resource.ToAny()) switch { case name == "": // Name is unset only when protobuf deserialization fails. diff --git a/internal/xds/clients/xdsclient/resource_type.go b/internal/xds/clients/xdsclient/resource_type.go index ae8e21df..ea16c21d 100644 --- a/internal/xds/clients/xdsclient/resource_type.go +++ b/internal/xds/clients/xdsclient/resource_type.go @@ -18,6 +18,8 @@ package xdsclient +import "google.golang.org/protobuf/types/known/anypb" + // ResourceType wraps all resource-type specific functionality. Each supported // resource type needs to provide an implementation of the Decoder. type ResourceType struct { @@ -59,13 +61,30 @@ type Decoder interface { // // If unmarshalling or validation fails, it returns a non-nil error. // Otherwise, returns a fully populated DecodeResult. - Decode(resource AnyProto, options DecodeOptions) (*DecodeResult, error) + Decode(resource *AnyProto, options DecodeOptions) (*DecodeResult, error) } // AnyProto contains the type URL and serialized proto data of an xDS resource. type AnyProto struct { - TypeURL string - Value []byte + typeURL string + value []byte +} + +// NewAnyProto creates an AnyProto from an anypb.Any. Must be called with a +// non-nil argument. +func NewAnyProto(a *anypb.Any) *AnyProto { + return &AnyProto{ + typeURL: a.TypeUrl, + value: a.Value, + } +} + +// ToAny converts an AnyProto to an anypb.Any. Never returns nil. +func (a *AnyProto) ToAny() *anypb.Any { + return &anypb.Any{ + TypeUrl: a.typeURL, + Value: a.value, + } } // DecodeOptions wraps the options required by ResourceType implementations for diff --git a/internal/xds/clients/xdsclient/test/helpers_test.go b/internal/xds/clients/xdsclient/test/helpers_test.go index ae7d5cea..1d6ac9ca 100644 --- a/internal/xds/clients/xdsclient/test/helpers_test.go +++ b/internal/xds/clients/xdsclient/test/helpers_test.go @@ -173,12 +173,8 @@ type listenerDecoder struct{} // Decode deserializes and validates an xDS resource serialized inside the // provided `Any` proto, as received from the xDS management server. -func (listenerDecoder) Decode(resource xdsclient.AnyProto, _ xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) { - rProto := &anypb.Any{ - TypeUrl: resource.TypeURL, - Value: resource.Value, - } - name, listener, err := unmarshalListenerResource(rProto) +func (listenerDecoder) Decode(resource *xdsclient.AnyProto, _ xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) { + name, listener, err := unmarshalListenerResource(resource.ToAny()) switch { case name == "": // Name is unset only when protobuf deserialization fails. diff --git a/internal/xds/resolver/watch_service.go b/internal/xds/resolver/watch_service.go index 44b885c4..43ff23b7 100644 --- a/internal/xds/resolver/watch_service.go +++ b/internal/xds/resolver/watch_service.go @@ -36,8 +36,8 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch return lw } -func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) { - handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() } +func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) { + handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update); onDone() } l.parent.serializer.ScheduleOr(handleUpdate, onDone) } diff --git a/internal/xds/resolver/xds_resolver.go b/internal/xds/resolver/xds_resolver.go index 8bb960a6..9180cd07 100644 --- a/internal/xds/resolver/xds_resolver.go +++ b/internal/xds/resolver/xds_resolver.go @@ -237,7 +237,7 @@ type xdsResolver struct { ldsResourceName string listenerWatcher *listenerWatcher listenerUpdateRecvd bool - currentListener xdsresource.ListenerUpdate + currentListener *xdsresource.ListenerUpdate rdsResourceName string routeConfigWatcher *routeConfigWatcher @@ -510,7 +510,7 @@ func (r *xdsResolver) onResourceError(err error) { } // Only executed in the context of a serializer callback. -func (r *xdsResolver) onListenerResourceUpdate(update xdsresource.ListenerUpdate) { +func (r *xdsResolver) onListenerResourceUpdate(update *xdsresource.ListenerUpdate) { if r.logger.V(2) { r.logger.Infof("Received update for Listener resource %q: %v", r.ldsResourceName, pretty.ToJSON(update)) } diff --git a/internal/xds/server/listener_wrapper.go b/internal/xds/server/listener_wrapper.go index 1f7da611..49cfdb63 100644 --- a/internal/xds/server/listener_wrapper.go +++ b/internal/xds/server/listener_wrapper.go @@ -33,6 +33,7 @@ import ( internalgrpclog "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/xds/bootstrap" + "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" ) @@ -59,6 +60,7 @@ type ServingModeCallback func(addr net.Addr, mode connectivity.ServingMode, err // the listenerWrapper. type XDSClient interface { WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func()) + WatchResourceV2(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func()) BootstrapConfig() *bootstrap.Config } @@ -386,17 +388,17 @@ type ldsWatcher struct { name string } -func (lw *ldsWatcher) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) { +func (lw *ldsWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) { defer onDone() if lw.parent.closed.HasFired() { lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update) return } if lw.logger.V(2) { - lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource) + lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update) } l := lw.parent - ilc := update.Resource.InboundListenerCfg + ilc := update.InboundListenerCfg // Make sure that the socket address on the received Listener resource // matches the address of the net.Listener passed to us by the user. This // check is done here instead of at the XDSClient layer because of the diff --git a/internal/xds/xds.go b/internal/xds/xds.go index b9a4ec90..4d5a85ef 100644 --- a/internal/xds/xds.go +++ b/internal/xds/xds.go @@ -93,9 +93,6 @@ func SetLocalityIDInEndpoint(endpoint resolver.Endpoint, l clients.Locality) res return endpoint } -// ResourceTypeMapForTesting maps TypeUrl to corresponding ResourceType. -var ResourceTypeMapForTesting map[string]any - // UnknownCSMLabels are TelemetryLabels emitted from CDS if CSM Telemetry Label // data is not present in the CDS Resource. var UnknownCSMLabels = map[string]string{ diff --git a/internal/xds/xdsclient/client.go b/internal/xds/xdsclient/client.go index 51427316..1d803402 100644 --- a/internal/xds/xdsclient/client.go +++ b/internal/xds/xdsclient/client.go @@ -23,10 +23,12 @@ package xdsclient import ( "context" - v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/internal/xds/clients/lrsclient" + "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" + + v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" ) // XDSClient is a full fledged gRPC client which queries a set of discovery APIs @@ -49,6 +51,11 @@ type XDSClient interface { // the watcher is canceled. Callers need to handle this case. WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func()) + // WatchResourceV2 matches the API of the external xdsclient interface. + // Once all users of xdsclient have been moved to this watch API, we can + // remove the WatchResource API above, and rename this to WatchResource. + WatchResourceV2(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func()) + ReportLoad(*bootstrap.ServerConfig) (*lrsclient.LoadStore, func(context.Context)) BootstrapConfig() *bootstrap.Config diff --git a/internal/xds/xdsclient/clientimpl_test.go b/internal/xds/xdsclient/clientimpl_test.go index d2fc8f7f..87a35b8a 100644 --- a/internal/xds/xdsclient/clientimpl_test.go +++ b/internal/xds/xdsclient/clientimpl_test.go @@ -81,7 +81,7 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) { Node: clients.Node{ID: node.GetId(), Cluster: node.GetCluster(), Metadata: node.Metadata, Locality: clients.Locality{Region: node.Locality.Region, Zone: node.Locality.Zone, SubZone: node.Locality.SubZone}, UserAgentName: node.UserAgentName, UserAgentVersion: node.GetUserAgentVersion()}, Authorities: map[string]xdsclient.Authority{}, ResourceTypes: map[string]xdsclient.ResourceType{ - version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(c)}, + version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c)}, version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()}, version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gServerCfgMap)}, version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()}, @@ -119,7 +119,7 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) { Node: clients.Node{ID: node.GetId(), Cluster: node.GetCluster(), Metadata: node.Metadata, UserAgentName: node.UserAgentName, UserAgentVersion: node.GetUserAgentVersion()}, Authorities: map[string]xdsclient.Authority{"auth1": {XDSServers: []xdsclient.ServerConfig{expTopLevelS}}, "auth2": {XDSServers: []xdsclient.ServerConfig{expAuth2S}}}, ResourceTypes: map[string]xdsclient.ResourceType{ - version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(c)}, + version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c)}, version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()}, version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gSCfgMap)}, version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()}, @@ -151,7 +151,7 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) { Node: clients.Node{ID: node.GetId(), Cluster: node.GetCluster(), Metadata: node.Metadata, UserAgentName: node.UserAgentName, UserAgentVersion: node.GetUserAgentVersion()}, Authorities: map[string]xdsclient.Authority{}, ResourceTypes: map[string]xdsclient.ResourceType{ - version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(c)}, + version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c)}, version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()}, version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gServerCfgMap)}, version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()}, @@ -183,7 +183,7 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) { Node: clients.Node{ID: node.GetId(), Cluster: node.GetCluster(), Metadata: node.Metadata, UserAgentName: node.UserAgentName, UserAgentVersion: node.GetUserAgentVersion()}, Authorities: map[string]xdsclient.Authority{}, ResourceTypes: map[string]xdsclient.ResourceType{ - version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(c)}, + version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c)}, version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()}, version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gServerCfgMap)}, version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()}, diff --git a/internal/xds/xdsclient/clientimpl_watchers.go b/internal/xds/xdsclient/clientimpl_watchers.go index 398de1ed..b90d3d36 100644 --- a/internal/xds/xdsclient/clientimpl_watchers.go +++ b/internal/xds/xdsclient/clientimpl_watchers.go @@ -18,6 +18,7 @@ package xdsclient import ( + "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" ) @@ -29,3 +30,7 @@ import ( func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func()) { return c.XDSClient.WatchResource(rType.TypeURL(), resourceName, xdsresource.GenericResourceWatcher(watcher)) } + +func (c *clientImpl) WatchResourceV2(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func()) { + return c.XDSClient.WatchResource(typeURL, resourceName, watcher) +} diff --git a/internal/xds/xdsclient/metrics_test.go b/internal/xds/xdsclient/metrics_test.go index 26a38aea..9b3541d1 100644 --- a/internal/xds/xdsclient/metrics_test.go +++ b/internal/xds/xdsclient/metrics_test.go @@ -40,7 +40,7 @@ import ( type noopListenerWatcher struct{} -func (noopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) { +func (noopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerUpdate, onDone func()) { onDone() } diff --git a/internal/xds/xdsclient/resource_types.go b/internal/xds/xdsclient/resource_types.go index 88451ab8..a044eedf 100644 --- a/internal/xds/xdsclient/resource_types.go +++ b/internal/xds/xdsclient/resource_types.go @@ -30,7 +30,7 @@ func supportedResourceTypes(config *bootstrap.Config, gServerCfgMap map[xdsclien TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, - Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(config), + Decoder: xdsresource.NewListenerResourceTypeDecoder(config), }, version.V3RouteConfigURL: { TypeURL: version.V3RouteConfigURL, diff --git a/internal/xds/xdsclient/tests/ads_stream_ack_nack_test.go b/internal/xds/xdsclient/tests/ads_stream_ack_nack_test.go index 29ddae55..962b7c28 100644 --- a/internal/xds/xdsclient/tests/ads_stream_ack_nack_test.go +++ b/internal/xds/xdsclient/tests/ads_stream_ack_nack_test.go @@ -154,7 +154,7 @@ func (s) TestADS_ACK_NACK_Simple(t *testing.T) { // Verify the update received by the watcher. wantUpdate := listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: routeConfigName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -459,7 +459,7 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) { // Verify the update received by the watcher. wantUpdate := listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: routeConfigName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, diff --git a/internal/xds/xdsclient/tests/ads_stream_restart_test.go b/internal/xds/xdsclient/tests/ads_stream_restart_test.go index 81f3a3c4..3a4c61ac 100644 --- a/internal/xds/xdsclient/tests/ads_stream_restart_test.go +++ b/internal/xds/xdsclient/tests/ads_stream_restart_test.go @@ -160,7 +160,7 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) { // Verify the update received by the watcher. wantListenerUpdate := listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: routeConfigName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, diff --git a/internal/xds/xdsclient/tests/federation_watchers_test.go b/internal/xds/xdsclient/tests/federation_watchers_test.go index 2e844d50..436235bf 100644 --- a/internal/xds/xdsclient/tests/federation_watchers_test.go +++ b/internal/xds/xdsclient/tests/federation_watchers_test.go @@ -122,7 +122,7 @@ func (s) TestFederation_ListenerResourceContextParamOrder(t *testing.T) { } wantUpdate := listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: "rds-resource", HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, diff --git a/internal/xds/xdsclient/tests/lds_watchers_test.go b/internal/xds/xdsclient/tests/lds_watchers_test.go index 87714863..16d613d6 100644 --- a/internal/xds/xdsclient/tests/lds_watchers_test.go +++ b/internal/xds/xdsclient/tests/lds_watchers_test.go @@ -48,7 +48,7 @@ import ( type noopListenerWatcher struct{} -func (noopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) { +func (noopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerUpdate, onDone func()) { onDone() } func (noopListenerWatcher) ResourceError(_ error, onDone func()) { @@ -59,7 +59,7 @@ func (noopListenerWatcher) AmbientError(_ error, onDone func()) { } type listenerUpdateErrTuple struct { - update xdsresource.ListenerUpdate + update *xdsresource.ListenerUpdate err error } @@ -71,8 +71,8 @@ func newListenerWatcher() *listenerWatcher { return &listenerWatcher{updateCh: testutils.NewChannel()} } -func (lw *listenerWatcher) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) { - lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) +func (lw *listenerWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) { + lw.updateCh.Send(listenerUpdateErrTuple{update: update}) onDone() } @@ -100,8 +100,8 @@ func newListenerWatcherMultiple(size int) *listenerWatcherMultiple { return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(size)} } -func (lw *listenerWatcherMultiple) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) { - lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) +func (lw *listenerWatcherMultiple) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) { + lw.updateCh.Send(listenerUpdateErrTuple{update: update}) onDone() } @@ -218,7 +218,7 @@ func (s) TestLDSWatch(t *testing.T) { updatedWatchedResource: e2e.DefaultClientListener(ldsName, "new-rds-resource"), notWatchedResource: e2e.DefaultClientListener("unsubscribed-lds-resource", rdsName), wantUpdate: listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -231,7 +231,7 @@ func (s) TestLDSWatch(t *testing.T) { updatedWatchedResource: e2e.DefaultClientListener(ldsNameNewStyle, "new-rds-resource"), notWatchedResource: e2e.DefaultClientListener("unsubscribed-lds-resource", rdsNameNewStyle), wantUpdate: listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: rdsNameNewStyle, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -360,13 +360,13 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) { watchedResource: e2e.DefaultClientListener(ldsName, rdsName), updatedWatchedResource: e2e.DefaultClientListener(ldsName, "new-rds-resource"), wantUpdateV1: listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, }, wantUpdateV2: listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: "new-rds-resource", HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -378,13 +378,13 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) { watchedResource: e2e.DefaultClientListener(ldsNameNewStyle, rdsNameNewStyle), updatedWatchedResource: e2e.DefaultClientListener(ldsNameNewStyle, "new-rds-resource"), wantUpdateV1: listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: rdsNameNewStyle, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, }, wantUpdateV2: listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: "new-rds-resource", HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -572,7 +572,7 @@ func (s) TestLDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { // resources returned differ only in the resource name. Therefore the // expected update is the same for all the watchers. wantUpdate := listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -653,7 +653,7 @@ func (s) TestLDSWatch_ResourceCaching(t *testing.T) { // Verify the contents of the received update. wantUpdate := listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -774,7 +774,7 @@ func (s) TestLDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { // Verify the contents of the received update. wantUpdate := listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -871,7 +871,7 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) { // resources returned differ only in the resource name. Therefore the // expected update is the same for both watchers. wantUpdate := listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -918,7 +918,7 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) { t.Fatal(err) } wantUpdate = listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: "new-rds-resource", HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -978,7 +978,7 @@ func (s) TestLDSWatch_NewWatcherForRemovedResource(t *testing.T) { // Verify the contents of the received update for existing watch. wantUpdate := listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -1116,7 +1116,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { // Verify the contents of the received update. wantUpdate := listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -1235,7 +1235,7 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) { // Verify that the watcher watching the good resource receives a good // update. wantUpdate := listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -1318,7 +1318,7 @@ func (s) TestLDSWatch_PartialResponse(t *testing.T) { // Verify the contents of the received update for first watcher. wantUpdate1 := listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -1348,7 +1348,7 @@ func (s) TestLDSWatch_PartialResponse(t *testing.T) { // Verify the contents of the received update for the second watcher. wantUpdate2 := listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ + update: &xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, diff --git a/internal/xds/xdsclient/tests/resource_update_test.go b/internal/xds/xdsclient/tests/resource_update_test.go index 298b152b..2a7f146f 100644 --- a/internal/xds/xdsclient/tests/resource_update_test.go +++ b/internal/xds/xdsclient/tests/resource_update_test.go @@ -146,7 +146,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { desc string resourceName string managementServerResponse *v3discoverypb.DiscoveryResponse - wantUpdate xdsresource.ListenerUpdate + wantUpdate *xdsresource.ListenerUpdate wantErr string wantGenericXDSConfig []*v3statuspb.ClientConfig_GenericXdsConfig }{ @@ -236,7 +236,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { VersionInfo: "1", Resources: []*anypb.Any{testutils.MarshalAny(t, resource1)}, }, - wantUpdate: xdsresource.ListenerUpdate{ + wantUpdate: &xdsresource.ListenerUpdate{ RouteConfigName: "route-configuration-name", HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -258,7 +258,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { VersionInfo: "1", Resources: []*anypb.Any{testutils.MarshalAny(t, resource1), testutils.MarshalAny(t, resource2)}, }, - wantUpdate: xdsresource.ListenerUpdate{ + wantUpdate: &xdsresource.ListenerUpdate{ RouteConfigName: "route-configuration-name", HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, diff --git a/internal/xds/xdsclient/xdsresource/listener_resource_type.go b/internal/xds/xdsclient/xdsresource/listener_resource_type.go index 100a06f9..a1e1c672 100644 --- a/internal/xds/xdsclient/xdsresource/listener_resource_type.go +++ b/internal/xds/xdsclient/xdsresource/listener_resource_type.go @@ -18,42 +18,49 @@ package xdsresource import ( + "bytes" "fmt" - "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/internal/xds/bootstrap" - xdsclient "google.golang.org/grpc/internal/xds/clients/xdsclient" + "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/anypb" ) -const ( - // ListenerResourceTypeName represents the transport agnostic name for the - // listener resource. - ListenerResourceTypeName = "ListenerResource" -) +// ListenerResourceTypeName is a human friendly name for the listener resource. +const ListenerResourceTypeName = "ListenerResource" + +// listenerResourceDecoder is an implementation of the xdsclient.Decoder +// interface for listener resources. +type listenerResourceDecoder struct { + bootstrapConfig *bootstrap.Config +} -var ( - // Compile time interface checks. - _ Type = listenerResourceType{} - - // Singleton instantiation of the resource type implementation. - listenerType = listenerResourceType{ - resourceTypeState: resourceTypeState{ - typeURL: version.V3ListenerURL, - typeName: ListenerResourceTypeName, - allResourcesRequiredInSotW: true, - }, +func (d *listenerResourceDecoder) Decode(resource *xdsclient.AnyProto, _ xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) { + name, listener, err := unmarshalListenerResource(resource.ToAny()) + if name == "" { + // Name is unset only when protobuf deserialization fails. + return nil, err + } + if err != nil { + // Protobuf deserialization succeeded, but resource validation failed. + return &xdsclient.DecodeResult{ + Name: name, + Resource: &ListenerResourceData{Resource: ListenerUpdate{}}, + }, err + } + + // Perform extra validation here. + if err := listenerValidator(d.bootstrapConfig, listener); err != nil { + return &xdsclient.DecodeResult{ + Name: name, + Resource: &ListenerResourceData{Resource: ListenerUpdate{}}, + }, err } -) -// listenerResourceType provides the resource-type specific functionality for a -// Listener resource. -// -// Implements the Type interface. -type listenerResourceType struct { - resourceTypeState + return &xdsclient.DecodeResult{ + Name: name, + Resource: &ListenerResourceData{Resource: listener}, + }, nil } func securityConfigValidator(bc *bootstrap.Config, sc *SecurityConfig) error { @@ -85,58 +92,23 @@ func listenerValidator(bc *bootstrap.Config, lis ListenerUpdate) error { }) } -// Decode deserializes and validates an xDS resource serialized inside the -// provided `Any` proto, as received from the xDS management server. -func (listenerResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) { - name, listener, err := unmarshalListenerResource(resource) - switch { - case name == "": - // Name is unset only when protobuf deserialization fails. - return nil, err - case err != nil: - // Protobuf deserialization succeeded, but resource validation failed. - return &DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: ListenerUpdate{}}}, err - } - - // Perform extra validation here. - if err := listenerValidator(opts.BootstrapConfig, listener); err != nil { - return &DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: ListenerUpdate{}}}, err - } - - return &DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: listener}}, nil -} - -// ListenerResourceData wraps the configuration of a Listener resource as -// received from the management server. -// -// Implements the ResourceData interface. +// ListenerResourceData is an implementation of the xdsclient.ResourceData +// interface for listener resources. type ListenerResourceData struct { - ResourceData - - // TODO: We have always stored update structs by value. See if this can be - // switched to a pointer? Resource ListenerUpdate } -// RawEqual returns true if other is equal to l. -func (l *ListenerResourceData) RawEqual(other ResourceData) bool { - if l == nil && other == nil { - return true - } - if (l == nil) != (other == nil) { +// Equal returns true if other is equal to l. +func (l *ListenerResourceData) Equal(other xdsclient.ResourceData) bool { + if other == nil { return false } - return proto.Equal(l.Resource.Raw, other.Raw()) -} - -// ToJSON returns a JSON string representation of the resource data. -func (l *ListenerResourceData) ToJSON() string { - return pretty.ToJSON(l.Resource) + return bytes.Equal(l.Bytes(), other.Bytes()) } -// Raw returns the underlying raw protobuf form of the listener resource. -func (l *ListenerResourceData) Raw() *anypb.Any { - return l.Resource.Raw +// Bytes returns the protobuf serialized bytes of the listener resource proto. +func (l *ListenerResourceData) Bytes() []byte { + return l.Resource.Raw.GetValue() } // ListenerWatcher wraps the callbacks to be invoked for different @@ -144,7 +116,7 @@ func (l *ListenerResourceData) Raw() *anypb.Any { // contains an exhaustive list of what method is invoked under what conditions. type ListenerWatcher interface { // ResourceChanged indicates a new version of the resource is available. - ResourceChanged(resource *ListenerResourceData, done func()) + ResourceChanged(resource *ListenerUpdate, done func()) // ResourceError indicates an error occurred while trying to fetch or // decode the associated resource. The previous version of the resource @@ -163,9 +135,9 @@ type delegatingListenerWatcher struct { watcher ListenerWatcher } -func (d *delegatingListenerWatcher) ResourceChanged(data ResourceData, onDone func()) { +func (d *delegatingListenerWatcher) ResourceChanged(data xdsclient.ResourceData, onDone func()) { l := data.(*ListenerResourceData) - d.watcher.ResourceChanged(l, onDone) + d.watcher.ResourceChanged(&l.Resource, onDone) } func (d *delegatingListenerWatcher) ResourceError(err error, onDone func()) { d.watcher.ResourceError(err, onDone) @@ -177,13 +149,12 @@ func (d *delegatingListenerWatcher) AmbientError(err error, onDone func()) { // WatchListener uses xDS to discover the configuration associated with the // provided listener resource name. -func WatchListener(p Producer, name string, w ListenerWatcher) (cancel func()) { - delegator := &delegatingListenerWatcher{watcher: w} - return p.WatchResource(listenerType, name, delegator) +func WatchListener(p ProducerV2, name string, w ListenerWatcher) (cancel func()) { + return p.WatchResourceV2(version.V3ListenerURL, name, &delegatingListenerWatcher{watcher: w}) } -// NewGenericListenerResourceTypeDecoder returns a xdsclient.Decoder that wraps +// NewListenerResourceTypeDecoder returns a xdsclient.Decoder that wraps // the xdsresource.listenerType. -func NewGenericListenerResourceTypeDecoder(bc *bootstrap.Config) xdsclient.Decoder { - return &GenericResourceTypeDecoder{ResourceType: listenerType, BootstrapConfig: bc} +func NewListenerResourceTypeDecoder(bc *bootstrap.Config) xdsclient.Decoder { + return &listenerResourceDecoder{bootstrapConfig: bc} } diff --git a/internal/xds/xdsclient/xdsresource/resource_type.go b/internal/xds/xdsclient/xdsresource/resource_type.go index 2c591312..aaa30cec 100644 --- a/internal/xds/xdsclient/xdsresource/resource_type.go +++ b/internal/xds/xdsclient/xdsresource/resource_type.go @@ -27,21 +27,11 @@ package xdsresource import ( "fmt" - xdsinternal "google.golang.org/grpc/internal/xds" "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/internal/xds/clients/xdsclient" - "google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version" "google.golang.org/protobuf/types/known/anypb" ) -func init() { - xdsinternal.ResourceTypeMapForTesting = make(map[string]any) - xdsinternal.ResourceTypeMapForTesting[version.V3ListenerURL] = listenerType - xdsinternal.ResourceTypeMapForTesting[version.V3RouteConfigURL] = routeConfigType - xdsinternal.ResourceTypeMapForTesting[version.V3ClusterURL] = clusterType - xdsinternal.ResourceTypeMapForTesting[version.V3EndpointsURL] = endpointsType -} - // Producer contains a single method to discover resource configuration from a // remote management server using xDS APIs. // @@ -55,6 +45,15 @@ type Producer interface { WatchResource(rType Type, resourceName string, watcher ResourceWatcher) (cancel func()) } +// ProducerV2 is like Producer, but uses the external xdsclient API. +// +// Once all resource type implementations have been migrated to use the external +// xdsclient API, this interface will be renamed to Producer and the existing +// Producer interface will be deleted. +type ProducerV2 interface { + WatchResourceV2(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func()) +} + // ResourceWatcher is notified of the resource updates and errors that are // received by the xDS client from the management server. // @@ -184,17 +183,13 @@ type GenericResourceTypeDecoder struct { // Decode deserialize and validate resource bytes of an xDS resource received // from the xDS management server. -func (gd *GenericResourceTypeDecoder) Decode(resource xdsclient.AnyProto, gOpts xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) { - rProto := &anypb.Any{ - TypeUrl: resource.TypeURL, - Value: resource.Value, - } +func (gd *GenericResourceTypeDecoder) Decode(resource *xdsclient.AnyProto, gOpts xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) { opts := &DecodeOptions{BootstrapConfig: gd.BootstrapConfig} if gOpts.ServerConfig != nil { opts.ServerConfig = gd.ServerConfigMap[*gOpts.ServerConfig] } - result, err := gd.ResourceType.Decode(opts, rProto) + result, err := gd.ResourceType.Decode(opts, resource.ToAny()) if result == nil { return nil, err } diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go index d09deeb2..25b643e1 100644 --- a/xds/csds/csds_e2e_test.go +++ b/xds/csds/csds_e2e_test.go @@ -71,7 +71,7 @@ func Test(t *testing.T) { type nopListenerWatcher struct{} -func (nopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) { +func (nopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerUpdate, onDone func()) { onDone() } func (nopListenerWatcher) ResourceError(_ error, onDone func()) { @@ -138,7 +138,7 @@ func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWa } } -func (w *blockingListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) { +func (w *blockingListenerWatcher) ResourceChanged(_ *xdsresource.ListenerUpdate, onDone func()) { writeOnDone(w.testCtxDone, w.onDoneCh, onDone) } func (w *blockingListenerWatcher) ResourceError(_ error, onDone func()) { -- 2.43.0