]> git.feebdaed.xyz Git - 0xmirror/grpc-go.git/commitdiff
xdsclient: move listener resource type implementation to match external xdsclient...
authorEaswar Swaminathan <easwars@google.com>
Tue, 14 Oct 2025 17:18:24 +0000 (10:18 -0700)
committerGitHub <noreply@github.com>
Tue, 14 Oct 2025 17:18:24 +0000 (10:18 -0700)
Addresses https://github.com/grpc/grpc-go/issues/8381

This PR *only* changes the implementation of the listener resource type
to adhere to the external xdsclient API. The other resource type
implementations will be handled in subsequent PRs, and once all resource
type implementations have switched to the external xdsclient API, we can
get rid of some of existing APIs.

RELEASE NOTES: none

21 files changed:
internal/xds/clients/xdsclient/channel.go
internal/xds/clients/xdsclient/helpers_test.go
internal/xds/clients/xdsclient/resource_type.go
internal/xds/clients/xdsclient/test/helpers_test.go
internal/xds/resolver/watch_service.go
internal/xds/resolver/xds_resolver.go
internal/xds/server/listener_wrapper.go
internal/xds/xds.go
internal/xds/xdsclient/client.go
internal/xds/xdsclient/clientimpl_test.go
internal/xds/xdsclient/clientimpl_watchers.go
internal/xds/xdsclient/metrics_test.go
internal/xds/xdsclient/resource_types.go
internal/xds/xdsclient/tests/ads_stream_ack_nack_test.go
internal/xds/xdsclient/tests/ads_stream_restart_test.go
internal/xds/xdsclient/tests/federation_watchers_test.go
internal/xds/xdsclient/tests/lds_watchers_test.go
internal/xds/xdsclient/tests/resource_update_test.go
internal/xds/xdsclient/xdsresource/listener_resource_type.go
internal/xds/xdsclient/xdsresource/resource_type.go
xds/csds/csds_e2e_test.go

index e36e362201d6b04101be553d5f08563a4b8fe89c..7c40f1dab8f84138d80009d2c05387d4c95139af 100644 (file)
@@ -253,10 +253,7 @@ func decodeResponse(opts *DecodeOptions, rType *ResourceType, resp response) (ma
        perResourceErrors := make(map[string]error) // Tracks resource validation errors, where we have a resource name.
        ret := make(map[string]dataAndErrTuple)     // Return result, a map from resource name to either resource data or error.
        for _, r := range resp.resources {
-               result, err := rType.Decoder.Decode(AnyProto{
-                       TypeURL: r.GetTypeUrl(),
-                       Value:   r.GetValue(),
-               }, *opts)
+               result, err := rType.Decoder.Decode(NewAnyProto(r), *opts)
 
                // Name field of the result is left unpopulated only when resource
                // deserialization fails.
index 6f5f8597881de8483204a42c03f72beef0f4d85a..b10dbcf354f6147a6f5d108663d776433a568779 100644 (file)
@@ -45,9 +45,8 @@ func Test(t *testing.T) {
 }
 
 const (
-       defaultTestWatchExpiryTimeout = 100 * time.Millisecond
-       defaultTestTimeout            = 5 * time.Second
-       defaultTestShortTimeout       = 10 * time.Millisecond // For events expected to *not* happen.
+       defaultTestTimeout      = 5 * time.Second
+       defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
        // listenerResourceTypeName represents the transport agnostic name for the
        // listener resource.
        listenerResourceTypeName = "ListenerResource"
@@ -162,12 +161,8 @@ type listenerDecoder struct{}
 
 // Decode deserializes and validates an xDS resource serialized inside the
 // provided `Any` proto, as received from the xDS management server.
-func (listenerDecoder) Decode(resource AnyProto, _ DecodeOptions) (*DecodeResult, error) {
-       rProto := &anypb.Any{
-               TypeUrl: resource.TypeURL,
-               Value:   resource.Value,
-       }
-       name, listener, err := unmarshalListenerResource(rProto)
+func (listenerDecoder) Decode(resource *AnyProto, _ DecodeOptions) (*DecodeResult, error) {
+       name, listener, err := unmarshalListenerResource(resource.ToAny())
        switch {
        case name == "":
                // Name is unset only when protobuf deserialization fails.
index ae8e21dfef30c53419295b1ed51042739cd6820c..ea16c21d34a3addba8cc586f89b8a520138d6045 100644 (file)
@@ -18,6 +18,8 @@
 
 package xdsclient
 
+import "google.golang.org/protobuf/types/known/anypb"
+
 // ResourceType wraps all resource-type specific functionality. Each supported
 // resource type needs to provide an implementation of the Decoder.
 type ResourceType struct {
@@ -59,13 +61,30 @@ type Decoder interface {
        //
        // If unmarshalling or validation fails, it returns a non-nil error.
        // Otherwise, returns a fully populated DecodeResult.
-       Decode(resource AnyProto, options DecodeOptions) (*DecodeResult, error)
+       Decode(resource *AnyProto, options DecodeOptions) (*DecodeResult, error)
 }
 
 // AnyProto contains the type URL and serialized proto data of an xDS resource.
 type AnyProto struct {
-       TypeURL string
-       Value   []byte
+       typeURL string
+       value   []byte
+}
+
+// NewAnyProto creates an AnyProto from an anypb.Any. Must be called with a
+// non-nil argument.
+func NewAnyProto(a *anypb.Any) *AnyProto {
+       return &AnyProto{
+               typeURL: a.TypeUrl,
+               value:   a.Value,
+       }
+}
+
+// ToAny converts an AnyProto to an anypb.Any. Never returns nil.
+func (a *AnyProto) ToAny() *anypb.Any {
+       return &anypb.Any{
+               TypeUrl: a.typeURL,
+               Value:   a.value,
+       }
 }
 
 // DecodeOptions wraps the options required by ResourceType implementations for
index ae7d5cea69453c51bc1114a4deb5f284e4db9208..1d6ac9ca7f4ad2cec96b461de86f0c58b5b646dc 100644 (file)
@@ -173,12 +173,8 @@ type listenerDecoder struct{}
 
 // Decode deserializes and validates an xDS resource serialized inside the
 // provided `Any` proto, as received from the xDS management server.
-func (listenerDecoder) Decode(resource xdsclient.AnyProto, _ xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) {
-       rProto := &anypb.Any{
-               TypeUrl: resource.TypeURL,
-               Value:   resource.Value,
-       }
-       name, listener, err := unmarshalListenerResource(rProto)
+func (listenerDecoder) Decode(resource *xdsclient.AnyProto, _ xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) {
+       name, listener, err := unmarshalListenerResource(resource.ToAny())
        switch {
        case name == "":
                // Name is unset only when protobuf deserialization fails.
index 44b885c4405523103a03f0765703887f2bbbe2f4..43ff23b7ca4c16b40f198a156d33efb0bcf41f35 100644 (file)
@@ -36,8 +36,8 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch
        return lw
 }
 
-func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) {
-       handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() }
+func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) {
+       handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update); onDone() }
        l.parent.serializer.ScheduleOr(handleUpdate, onDone)
 }
 
index 8bb960a6567f151fbc5bacc0e257fbbf828b79ef..9180cd0768992eccb2fcffa443a57dda974cd127 100644 (file)
@@ -237,7 +237,7 @@ type xdsResolver struct {
        ldsResourceName     string
        listenerWatcher     *listenerWatcher
        listenerUpdateRecvd bool
-       currentListener     xdsresource.ListenerUpdate
+       currentListener     *xdsresource.ListenerUpdate
 
        rdsResourceName        string
        routeConfigWatcher     *routeConfigWatcher
@@ -510,7 +510,7 @@ func (r *xdsResolver) onResourceError(err error) {
 }
 
 // Only executed in the context of a serializer callback.
-func (r *xdsResolver) onListenerResourceUpdate(update xdsresource.ListenerUpdate) {
+func (r *xdsResolver) onListenerResourceUpdate(update *xdsresource.ListenerUpdate) {
        if r.logger.V(2) {
                r.logger.Infof("Received update for Listener resource %q: %v", r.ldsResourceName, pretty.ToJSON(update))
        }
index 1f7da61175e6cc9e4840ad47a03cf86b5e25193c..49cfdb635a2010eb46805d07347e6a424b62d4f5 100644 (file)
@@ -33,6 +33,7 @@ import (
        internalgrpclog "google.golang.org/grpc/internal/grpclog"
        "google.golang.org/grpc/internal/grpcsync"
        "google.golang.org/grpc/internal/xds/bootstrap"
+       "google.golang.org/grpc/internal/xds/clients/xdsclient"
        "google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
 )
 
@@ -59,6 +60,7 @@ type ServingModeCallback func(addr net.Addr, mode connectivity.ServingMode, err
 // the listenerWrapper.
 type XDSClient interface {
        WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func())
+       WatchResourceV2(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func())
        BootstrapConfig() *bootstrap.Config
 }
 
@@ -386,17 +388,17 @@ type ldsWatcher struct {
        name   string
 }
 
-func (lw *ldsWatcher) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) {
+func (lw *ldsWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) {
        defer onDone()
        if lw.parent.closed.HasFired() {
                lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update)
                return
        }
        if lw.logger.V(2) {
-               lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource)
+               lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update)
        }
        l := lw.parent
-       ilc := update.Resource.InboundListenerCfg
+       ilc := update.InboundListenerCfg
        // Make sure that the socket address on the received Listener resource
        // matches the address of the net.Listener passed to us by the user. This
        // check is done here instead of at the XDSClient layer because of the
index b9a4ec90a5ed8ee4bc9b59491de3cbb0162d5c3a..4d5a85ef35a80a8efc079f44fa6e18f315b88e1c 100644 (file)
@@ -93,9 +93,6 @@ func SetLocalityIDInEndpoint(endpoint resolver.Endpoint, l clients.Locality) res
        return endpoint
 }
 
-// ResourceTypeMapForTesting maps TypeUrl to corresponding ResourceType.
-var ResourceTypeMapForTesting map[string]any
-
 // UnknownCSMLabels are TelemetryLabels emitted from CDS if CSM Telemetry Label
 // data is not present in the CDS Resource.
 var UnknownCSMLabels = map[string]string{
index 514273164402aefca756f03020cd6b9f7dc2e794..1d803402c4d5ab66039dcdd87ce7d3984b6a5bb9 100644 (file)
@@ -23,10 +23,12 @@ package xdsclient
 import (
        "context"
 
-       v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
        "google.golang.org/grpc/internal/xds/bootstrap"
        "google.golang.org/grpc/internal/xds/clients/lrsclient"
+       "google.golang.org/grpc/internal/xds/clients/xdsclient"
        "google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
+
+       v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
 )
 
 // XDSClient is a full fledged gRPC client which queries a set of discovery APIs
@@ -49,6 +51,11 @@ type XDSClient interface {
        // the watcher is canceled. Callers need to handle this case.
        WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func())
 
+       // WatchResourceV2 matches the API of the external xdsclient interface.
+       // Once all users of xdsclient have been moved to this watch API, we can
+       // remove the WatchResource API above, and rename this to WatchResource.
+       WatchResourceV2(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func())
+
        ReportLoad(*bootstrap.ServerConfig) (*lrsclient.LoadStore, func(context.Context))
 
        BootstrapConfig() *bootstrap.Config
index d2fc8f7f933258cfade84b0804c072a9ee60a814..87a35b8ab079d81bf0e83353a86eb3b2356a017d 100644 (file)
@@ -81,7 +81,7 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) {
                                        Node:        clients.Node{ID: node.GetId(), Cluster: node.GetCluster(), Metadata: node.Metadata, Locality: clients.Locality{Region: node.Locality.Region, Zone: node.Locality.Zone, SubZone: node.Locality.SubZone}, UserAgentName: node.UserAgentName, UserAgentVersion: node.GetUserAgentVersion()},
                                        Authorities: map[string]xdsclient.Authority{},
                                        ResourceTypes: map[string]xdsclient.ResourceType{
-                                               version.V3ListenerURL:    {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(c)},
+                                               version.V3ListenerURL:    {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c)},
                                                version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()},
                                                version.V3ClusterURL:     {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gServerCfgMap)},
                                                version.V3EndpointsURL:   {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()},
@@ -119,7 +119,7 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) {
                                        Node:        clients.Node{ID: node.GetId(), Cluster: node.GetCluster(), Metadata: node.Metadata, UserAgentName: node.UserAgentName, UserAgentVersion: node.GetUserAgentVersion()},
                                        Authorities: map[string]xdsclient.Authority{"auth1": {XDSServers: []xdsclient.ServerConfig{expTopLevelS}}, "auth2": {XDSServers: []xdsclient.ServerConfig{expAuth2S}}},
                                        ResourceTypes: map[string]xdsclient.ResourceType{
-                                               version.V3ListenerURL:    {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(c)},
+                                               version.V3ListenerURL:    {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c)},
                                                version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()},
                                                version.V3ClusterURL:     {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gSCfgMap)},
                                                version.V3EndpointsURL:   {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()},
@@ -151,7 +151,7 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) {
                                        Node:        clients.Node{ID: node.GetId(), Cluster: node.GetCluster(), Metadata: node.Metadata, UserAgentName: node.UserAgentName, UserAgentVersion: node.GetUserAgentVersion()},
                                        Authorities: map[string]xdsclient.Authority{},
                                        ResourceTypes: map[string]xdsclient.ResourceType{
-                                               version.V3ListenerURL:    {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(c)},
+                                               version.V3ListenerURL:    {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c)},
                                                version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()},
                                                version.V3ClusterURL:     {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gServerCfgMap)},
                                                version.V3EndpointsURL:   {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()},
@@ -183,7 +183,7 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) {
                                        Node:        clients.Node{ID: node.GetId(), Cluster: node.GetCluster(), Metadata: node.Metadata, UserAgentName: node.UserAgentName, UserAgentVersion: node.GetUserAgentVersion()},
                                        Authorities: map[string]xdsclient.Authority{},
                                        ResourceTypes: map[string]xdsclient.ResourceType{
-                                               version.V3ListenerURL:    {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(c)},
+                                               version.V3ListenerURL:    {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c)},
                                                version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()},
                                                version.V3ClusterURL:     {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gServerCfgMap)},
                                                version.V3EndpointsURL:   {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()},
index 398de1ed73b6645f91d7a91d34be835dc7ae82a5..b90d3d36c669aaa3c7b80298cee3013764539993 100644 (file)
@@ -18,6 +18,7 @@
 package xdsclient
 
 import (
+       "google.golang.org/grpc/internal/xds/clients/xdsclient"
        "google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
 )
 
@@ -29,3 +30,7 @@ import (
 func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func()) {
        return c.XDSClient.WatchResource(rType.TypeURL(), resourceName, xdsresource.GenericResourceWatcher(watcher))
 }
+
+func (c *clientImpl) WatchResourceV2(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func()) {
+       return c.XDSClient.WatchResource(typeURL, resourceName, watcher)
+}
index 26a38aea381e5c6ba463bbf6f039902544812a63..9b3541d110a56184aa0ba87ea35e933fccff64c9 100644 (file)
@@ -40,7 +40,7 @@ import (
 
 type noopListenerWatcher struct{}
 
-func (noopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) {
+func (noopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerUpdate, onDone func()) {
        onDone()
 }
 
index 88451ab8257e3350489d936e8fa195178d75c2e0..a044eedf829415a29f88ca9afa288438235472c4 100644 (file)
@@ -30,7 +30,7 @@ func supportedResourceTypes(config *bootstrap.Config, gServerCfgMap map[xdsclien
                        TypeURL:                    version.V3ListenerURL,
                        TypeName:                   xdsresource.ListenerResourceTypeName,
                        AllResourcesRequiredInSotW: true,
-                       Decoder:                    xdsresource.NewGenericListenerResourceTypeDecoder(config),
+                       Decoder:                    xdsresource.NewListenerResourceTypeDecoder(config),
                },
                version.V3RouteConfigURL: {
                        TypeURL:                    version.V3RouteConfigURL,
index 29ddae55aa47d084f0b20581441e6aca88b4cf4e..962b7c28a841daa38be97b47319c8768aa0a2ede 100644 (file)
@@ -154,7 +154,7 @@ func (s) TestADS_ACK_NACK_Simple(t *testing.T) {
 
        // Verify the update received by the watcher.
        wantUpdate := listenerUpdateErrTuple{
-               update: xdsresource.ListenerUpdate{
+               update: &xdsresource.ListenerUpdate{
                        RouteConfigName: routeConfigName,
                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                },
@@ -459,7 +459,7 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) {
 
        // Verify the update received by the watcher.
        wantUpdate := listenerUpdateErrTuple{
-               update: xdsresource.ListenerUpdate{
+               update: &xdsresource.ListenerUpdate{
                        RouteConfigName: routeConfigName,
                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                },
index 81f3a3c4535bec2a8da07d5a2c3d891ea655b408..3a4c61acabeab77606c5176342e957ceba44494d 100644 (file)
@@ -160,7 +160,7 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) {
 
        // Verify the update received by the watcher.
        wantListenerUpdate := listenerUpdateErrTuple{
-               update: xdsresource.ListenerUpdate{
+               update: &xdsresource.ListenerUpdate{
                        RouteConfigName: routeConfigName,
                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                },
index 2e844d5046ba21719ca1b7881964cbcb70d72aa6..436235bfdbfd94ca3659aff27ca3e2ca4f586cec 100644 (file)
@@ -122,7 +122,7 @@ func (s) TestFederation_ListenerResourceContextParamOrder(t *testing.T) {
        }
 
        wantUpdate := listenerUpdateErrTuple{
-               update: xdsresource.ListenerUpdate{
+               update: &xdsresource.ListenerUpdate{
                        RouteConfigName: "rds-resource",
                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                },
index 877148639b2c718db2c21ce145a752cbf295d6b1..16d613d62a22c399a6a30d8fc2a9f94d63a5b456 100644 (file)
@@ -48,7 +48,7 @@ import (
 
 type noopListenerWatcher struct{}
 
-func (noopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) {
+func (noopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerUpdate, onDone func()) {
        onDone()
 }
 func (noopListenerWatcher) ResourceError(_ error, onDone func()) {
@@ -59,7 +59,7 @@ func (noopListenerWatcher) AmbientError(_ error, onDone func()) {
 }
 
 type listenerUpdateErrTuple struct {
-       update xdsresource.ListenerUpdate
+       update *xdsresource.ListenerUpdate
        err    error
 }
 
@@ -71,8 +71,8 @@ func newListenerWatcher() *listenerWatcher {
        return &listenerWatcher{updateCh: testutils.NewChannel()}
 }
 
-func (lw *listenerWatcher) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) {
-       lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource})
+func (lw *listenerWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) {
+       lw.updateCh.Send(listenerUpdateErrTuple{update: update})
        onDone()
 }
 
@@ -100,8 +100,8 @@ func newListenerWatcherMultiple(size int) *listenerWatcherMultiple {
        return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(size)}
 }
 
-func (lw *listenerWatcherMultiple) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) {
-       lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource})
+func (lw *listenerWatcherMultiple) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) {
+       lw.updateCh.Send(listenerUpdateErrTuple{update: update})
        onDone()
 }
 
@@ -218,7 +218,7 @@ func (s) TestLDSWatch(t *testing.T) {
                        updatedWatchedResource: e2e.DefaultClientListener(ldsName, "new-rds-resource"),
                        notWatchedResource:     e2e.DefaultClientListener("unsubscribed-lds-resource", rdsName),
                        wantUpdate: listenerUpdateErrTuple{
-                               update: xdsresource.ListenerUpdate{
+                               update: &xdsresource.ListenerUpdate{
                                        RouteConfigName: rdsName,
                                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                                },
@@ -231,7 +231,7 @@ func (s) TestLDSWatch(t *testing.T) {
                        updatedWatchedResource: e2e.DefaultClientListener(ldsNameNewStyle, "new-rds-resource"),
                        notWatchedResource:     e2e.DefaultClientListener("unsubscribed-lds-resource", rdsNameNewStyle),
                        wantUpdate: listenerUpdateErrTuple{
-                               update: xdsresource.ListenerUpdate{
+                               update: &xdsresource.ListenerUpdate{
                                        RouteConfigName: rdsNameNewStyle,
                                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                                },
@@ -360,13 +360,13 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
                        watchedResource:        e2e.DefaultClientListener(ldsName, rdsName),
                        updatedWatchedResource: e2e.DefaultClientListener(ldsName, "new-rds-resource"),
                        wantUpdateV1: listenerUpdateErrTuple{
-                               update: xdsresource.ListenerUpdate{
+                               update: &xdsresource.ListenerUpdate{
                                        RouteConfigName: rdsName,
                                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                                },
                        },
                        wantUpdateV2: listenerUpdateErrTuple{
-                               update: xdsresource.ListenerUpdate{
+                               update: &xdsresource.ListenerUpdate{
                                        RouteConfigName: "new-rds-resource",
                                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                                },
@@ -378,13 +378,13 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
                        watchedResource:        e2e.DefaultClientListener(ldsNameNewStyle, rdsNameNewStyle),
                        updatedWatchedResource: e2e.DefaultClientListener(ldsNameNewStyle, "new-rds-resource"),
                        wantUpdateV1: listenerUpdateErrTuple{
-                               update: xdsresource.ListenerUpdate{
+                               update: &xdsresource.ListenerUpdate{
                                        RouteConfigName: rdsNameNewStyle,
                                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                                },
                        },
                        wantUpdateV2: listenerUpdateErrTuple{
-                               update: xdsresource.ListenerUpdate{
+                               update: &xdsresource.ListenerUpdate{
                                        RouteConfigName: "new-rds-resource",
                                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                                },
@@ -572,7 +572,7 @@ func (s) TestLDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
        // resources returned differ only in the resource name. Therefore the
        // expected update is the same for all the watchers.
        wantUpdate := listenerUpdateErrTuple{
-               update: xdsresource.ListenerUpdate{
+               update: &xdsresource.ListenerUpdate{
                        RouteConfigName: rdsName,
                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                },
@@ -653,7 +653,7 @@ func (s) TestLDSWatch_ResourceCaching(t *testing.T) {
 
        // Verify the contents of the received update.
        wantUpdate := listenerUpdateErrTuple{
-               update: xdsresource.ListenerUpdate{
+               update: &xdsresource.ListenerUpdate{
                        RouteConfigName: rdsName,
                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                },
@@ -774,7 +774,7 @@ func (s) TestLDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
 
        // Verify the contents of the received update.
        wantUpdate := listenerUpdateErrTuple{
-               update: xdsresource.ListenerUpdate{
+               update: &xdsresource.ListenerUpdate{
                        RouteConfigName: rdsName,
                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                },
@@ -871,7 +871,7 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) {
        // resources returned differ only in the resource name. Therefore the
        // expected update is the same for both watchers.
        wantUpdate := listenerUpdateErrTuple{
-               update: xdsresource.ListenerUpdate{
+               update: &xdsresource.ListenerUpdate{
                        RouteConfigName: rdsName,
                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                },
@@ -918,7 +918,7 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) {
                t.Fatal(err)
        }
        wantUpdate = listenerUpdateErrTuple{
-               update: xdsresource.ListenerUpdate{
+               update: &xdsresource.ListenerUpdate{
                        RouteConfigName: "new-rds-resource",
                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                },
@@ -978,7 +978,7 @@ func (s) TestLDSWatch_NewWatcherForRemovedResource(t *testing.T) {
 
        // Verify the contents of the received update for existing watch.
        wantUpdate := listenerUpdateErrTuple{
-               update: xdsresource.ListenerUpdate{
+               update: &xdsresource.ListenerUpdate{
                        RouteConfigName: rdsName,
                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                },
@@ -1116,7 +1116,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
 
        // Verify the contents of the received update.
        wantUpdate := listenerUpdateErrTuple{
-               update: xdsresource.ListenerUpdate{
+               update: &xdsresource.ListenerUpdate{
                        RouteConfigName: rdsName,
                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                },
@@ -1235,7 +1235,7 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) {
        // Verify that the watcher watching the good resource receives a good
        // update.
        wantUpdate := listenerUpdateErrTuple{
-               update: xdsresource.ListenerUpdate{
+               update: &xdsresource.ListenerUpdate{
                        RouteConfigName: rdsName,
                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                },
@@ -1318,7 +1318,7 @@ func (s) TestLDSWatch_PartialResponse(t *testing.T) {
 
        // Verify the contents of the received update for first watcher.
        wantUpdate1 := listenerUpdateErrTuple{
-               update: xdsresource.ListenerUpdate{
+               update: &xdsresource.ListenerUpdate{
                        RouteConfigName: rdsName,
                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                },
@@ -1348,7 +1348,7 @@ func (s) TestLDSWatch_PartialResponse(t *testing.T) {
 
        // Verify the contents of the received update for the second watcher.
        wantUpdate2 := listenerUpdateErrTuple{
-               update: xdsresource.ListenerUpdate{
+               update: &xdsresource.ListenerUpdate{
                        RouteConfigName: rdsName,
                        HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                },
index 298b152b7713269bf53db64a5f6266c4e3efcad6..2a7f146f983d2425ae809f626ed1acd6110dc046 100644 (file)
@@ -146,7 +146,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) {
                desc                     string
                resourceName             string
                managementServerResponse *v3discoverypb.DiscoveryResponse
-               wantUpdate               xdsresource.ListenerUpdate
+               wantUpdate               *xdsresource.ListenerUpdate
                wantErr                  string
                wantGenericXDSConfig     []*v3statuspb.ClientConfig_GenericXdsConfig
        }{
@@ -236,7 +236,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) {
                                VersionInfo: "1",
                                Resources:   []*anypb.Any{testutils.MarshalAny(t, resource1)},
                        },
-                       wantUpdate: xdsresource.ListenerUpdate{
+                       wantUpdate: &xdsresource.ListenerUpdate{
                                RouteConfigName: "route-configuration-name",
                                HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                        },
@@ -258,7 +258,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) {
                                VersionInfo: "1",
                                Resources:   []*anypb.Any{testutils.MarshalAny(t, resource1), testutils.MarshalAny(t, resource2)},
                        },
-                       wantUpdate: xdsresource.ListenerUpdate{
+                       wantUpdate: &xdsresource.ListenerUpdate{
                                RouteConfigName: "route-configuration-name",
                                HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
                        },
index 100a06f97b67231bebbc25313a8eb818080c56f3..a1e1c672c88f06521cad03f1f2df20848f086c8c 100644 (file)
 package xdsresource
 
 import (
+       "bytes"
        "fmt"
 
-       "google.golang.org/grpc/internal/pretty"
        "google.golang.org/grpc/internal/xds/bootstrap"
-       xdsclient "google.golang.org/grpc/internal/xds/clients/xdsclient"
+       "google.golang.org/grpc/internal/xds/clients/xdsclient"
        "google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version"
-       "google.golang.org/protobuf/proto"
-       "google.golang.org/protobuf/types/known/anypb"
 )
 
-const (
-       // ListenerResourceTypeName represents the transport agnostic name for the
-       // listener resource.
-       ListenerResourceTypeName = "ListenerResource"
-)
+// ListenerResourceTypeName is a human friendly name for the listener resource.
+const ListenerResourceTypeName = "ListenerResource"
+
+// listenerResourceDecoder is an implementation of the xdsclient.Decoder
+// interface for listener resources.
+type listenerResourceDecoder struct {
+       bootstrapConfig *bootstrap.Config
+}
 
-var (
-       // Compile time interface checks.
-       _ Type = listenerResourceType{}
-
-       // Singleton instantiation of the resource type implementation.
-       listenerType = listenerResourceType{
-               resourceTypeState: resourceTypeState{
-                       typeURL:                    version.V3ListenerURL,
-                       typeName:                   ListenerResourceTypeName,
-                       allResourcesRequiredInSotW: true,
-               },
+func (d *listenerResourceDecoder) Decode(resource *xdsclient.AnyProto, _ xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) {
+       name, listener, err := unmarshalListenerResource(resource.ToAny())
+       if name == "" {
+               // Name is unset only when protobuf deserialization fails.
+               return nil, err
+       }
+       if err != nil {
+               // Protobuf deserialization succeeded, but resource validation failed.
+               return &xdsclient.DecodeResult{
+                       Name:     name,
+                       Resource: &ListenerResourceData{Resource: ListenerUpdate{}},
+               }, err
+       }
+
+       // Perform extra validation here.
+       if err := listenerValidator(d.bootstrapConfig, listener); err != nil {
+               return &xdsclient.DecodeResult{
+                       Name:     name,
+                       Resource: &ListenerResourceData{Resource: ListenerUpdate{}},
+               }, err
        }
-)
 
-// listenerResourceType provides the resource-type specific functionality for a
-// Listener resource.
-//
-// Implements the Type interface.
-type listenerResourceType struct {
-       resourceTypeState
+       return &xdsclient.DecodeResult{
+               Name:     name,
+               Resource: &ListenerResourceData{Resource: listener},
+       }, nil
 }
 
 func securityConfigValidator(bc *bootstrap.Config, sc *SecurityConfig) error {
@@ -85,58 +92,23 @@ func listenerValidator(bc *bootstrap.Config, lis ListenerUpdate) error {
        })
 }
 
-// Decode deserializes and validates an xDS resource serialized inside the
-// provided `Any` proto, as received from the xDS management server.
-func (listenerResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) {
-       name, listener, err := unmarshalListenerResource(resource)
-       switch {
-       case name == "":
-               // Name is unset only when protobuf deserialization fails.
-               return nil, err
-       case err != nil:
-               // Protobuf deserialization succeeded, but resource validation failed.
-               return &DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: ListenerUpdate{}}}, err
-       }
-
-       // Perform extra validation here.
-       if err := listenerValidator(opts.BootstrapConfig, listener); err != nil {
-               return &DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: ListenerUpdate{}}}, err
-       }
-
-       return &DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: listener}}, nil
-}
-
-// ListenerResourceData wraps the configuration of a Listener resource as
-// received from the management server.
-//
-// Implements the ResourceData interface.
+// ListenerResourceData is an implementation of the xdsclient.ResourceData
+// interface for listener resources.
 type ListenerResourceData struct {
-       ResourceData
-
-       // TODO: We have always stored update structs by value. See if this can be
-       // switched to a pointer?
        Resource ListenerUpdate
 }
 
-// RawEqual returns true if other is equal to l.
-func (l *ListenerResourceData) RawEqual(other ResourceData) bool {
-       if l == nil && other == nil {
-               return true
-       }
-       if (l == nil) != (other == nil) {
+// Equal returns true if other is equal to l.
+func (l *ListenerResourceData) Equal(other xdsclient.ResourceData) bool {
+       if other == nil {
                return false
        }
-       return proto.Equal(l.Resource.Raw, other.Raw())
-}
-
-// ToJSON returns a JSON string representation of the resource data.
-func (l *ListenerResourceData) ToJSON() string {
-       return pretty.ToJSON(l.Resource)
+       return bytes.Equal(l.Bytes(), other.Bytes())
 }
 
-// Raw returns the underlying raw protobuf form of the listener resource.
-func (l *ListenerResourceData) Raw() *anypb.Any {
-       return l.Resource.Raw
+// Bytes returns the protobuf serialized bytes of the listener resource proto.
+func (l *ListenerResourceData) Bytes() []byte {
+       return l.Resource.Raw.GetValue()
 }
 
 // ListenerWatcher wraps the callbacks to be invoked for different
@@ -144,7 +116,7 @@ func (l *ListenerResourceData) Raw() *anypb.Any {
 // contains an exhaustive list of what method is invoked under what conditions.
 type ListenerWatcher interface {
        // ResourceChanged indicates a new version of the resource is available.
-       ResourceChanged(resource *ListenerResourceData, done func())
+       ResourceChanged(resource *ListenerUpdate, done func())
 
        // ResourceError indicates an error occurred while trying to fetch or
        // decode the associated resource. The previous version of the resource
@@ -163,9 +135,9 @@ type delegatingListenerWatcher struct {
        watcher ListenerWatcher
 }
 
-func (d *delegatingListenerWatcher) ResourceChanged(data ResourceData, onDone func()) {
+func (d *delegatingListenerWatcher) ResourceChanged(data xdsclient.ResourceData, onDone func()) {
        l := data.(*ListenerResourceData)
-       d.watcher.ResourceChanged(l, onDone)
+       d.watcher.ResourceChanged(&l.Resource, onDone)
 }
 func (d *delegatingListenerWatcher) ResourceError(err error, onDone func()) {
        d.watcher.ResourceError(err, onDone)
@@ -177,13 +149,12 @@ func (d *delegatingListenerWatcher) AmbientError(err error, onDone func()) {
 
 // WatchListener uses xDS to discover the configuration associated with the
 // provided listener resource name.
-func WatchListener(p Producer, name string, w ListenerWatcher) (cancel func()) {
-       delegator := &delegatingListenerWatcher{watcher: w}
-       return p.WatchResource(listenerType, name, delegator)
+func WatchListener(p ProducerV2, name string, w ListenerWatcher) (cancel func()) {
+       return p.WatchResourceV2(version.V3ListenerURL, name, &delegatingListenerWatcher{watcher: w})
 }
 
-// NewGenericListenerResourceTypeDecoder returns a xdsclient.Decoder that wraps
+// NewListenerResourceTypeDecoder returns a xdsclient.Decoder that wraps
 // the xdsresource.listenerType.
-func NewGenericListenerResourceTypeDecoder(bc *bootstrap.Config) xdsclient.Decoder {
-       return &GenericResourceTypeDecoder{ResourceType: listenerType, BootstrapConfig: bc}
+func NewListenerResourceTypeDecoder(bc *bootstrap.Config) xdsclient.Decoder {
+       return &listenerResourceDecoder{bootstrapConfig: bc}
 }
index 2c591312f1be79a9a2cdf6b6d7b2f4d4af26bc6d..aaa30cecfb118f549743a5b996af8928dcc074fd 100644 (file)
@@ -27,21 +27,11 @@ package xdsresource
 import (
        "fmt"
 
-       xdsinternal "google.golang.org/grpc/internal/xds"
        "google.golang.org/grpc/internal/xds/bootstrap"
        "google.golang.org/grpc/internal/xds/clients/xdsclient"
-       "google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version"
        "google.golang.org/protobuf/types/known/anypb"
 )
 
-func init() {
-       xdsinternal.ResourceTypeMapForTesting = make(map[string]any)
-       xdsinternal.ResourceTypeMapForTesting[version.V3ListenerURL] = listenerType
-       xdsinternal.ResourceTypeMapForTesting[version.V3RouteConfigURL] = routeConfigType
-       xdsinternal.ResourceTypeMapForTesting[version.V3ClusterURL] = clusterType
-       xdsinternal.ResourceTypeMapForTesting[version.V3EndpointsURL] = endpointsType
-}
-
 // Producer contains a single method to discover resource configuration from a
 // remote management server using xDS APIs.
 //
@@ -55,6 +45,15 @@ type Producer interface {
        WatchResource(rType Type, resourceName string, watcher ResourceWatcher) (cancel func())
 }
 
+// ProducerV2 is like Producer, but uses the external xdsclient API.
+//
+// Once all resource type implementations have been migrated to use the external
+// xdsclient API, this interface will be renamed to Producer and the existing
+// Producer interface will be deleted.
+type ProducerV2 interface {
+       WatchResourceV2(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func())
+}
+
 // ResourceWatcher is notified of the resource updates and errors that are
 // received by the xDS client from the management server.
 //
@@ -184,17 +183,13 @@ type GenericResourceTypeDecoder struct {
 
 // Decode deserialize and validate resource bytes of an xDS resource received
 // from the xDS management server.
-func (gd *GenericResourceTypeDecoder) Decode(resource xdsclient.AnyProto, gOpts xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) {
-       rProto := &anypb.Any{
-               TypeUrl: resource.TypeURL,
-               Value:   resource.Value,
-       }
+func (gd *GenericResourceTypeDecoder) Decode(resource *xdsclient.AnyProto, gOpts xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) {
        opts := &DecodeOptions{BootstrapConfig: gd.BootstrapConfig}
        if gOpts.ServerConfig != nil {
                opts.ServerConfig = gd.ServerConfigMap[*gOpts.ServerConfig]
        }
 
-       result, err := gd.ResourceType.Decode(opts, rProto)
+       result, err := gd.ResourceType.Decode(opts, resource.ToAny())
        if result == nil {
                return nil, err
        }
index d09deeb2682d7dd94ac82a7a170857206e8a728b..25b643e1516bd18213130db52c5df5e26f23e55e 100644 (file)
@@ -71,7 +71,7 @@ func Test(t *testing.T) {
 
 type nopListenerWatcher struct{}
 
-func (nopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) {
+func (nopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerUpdate, onDone func()) {
        onDone()
 }
 func (nopListenerWatcher) ResourceError(_ error, onDone func()) {
@@ -138,7 +138,7 @@ func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWa
        }
 }
 
-func (w *blockingListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) {
+func (w *blockingListenerWatcher) ResourceChanged(_ *xdsresource.ListenerUpdate, onDone func()) {
        writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
 }
 func (w *blockingListenerWatcher) ResourceError(_ error, onDone func()) {