]> git.feebdaed.xyz Git - 0xmirror/grpc-go.git/commitdiff
xds/cdsbalancer: increase buffer size of requested resource channel in test (#8467)
authoreshitachandwani <59800922+eshitachandwani@users.noreply.github.com>
Mon, 18 Aug 2025 05:15:30 +0000 (10:45 +0530)
committerGitHub <noreply@github.com>
Mon, 18 Aug 2025 05:15:30 +0000 (10:45 +0530)
RELEASE NOTES: N/A

Fixes: https://github.com/grpc/grpc-go/issues/8462
The main issue was that the requests were getting dropped since we use a
[non-blocking
send](https://github.com/grpc/grpc-go/blob/a5e7cd6d4c2c31b1e6649789c2ddc9a82ad6b5fa/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go#L222C5-L227C6)
for resources in test along with buffer size of just
[one](https://github.com/grpc/grpc-go/blob/a5e7cd6d4c2c31b1e6649789c2ddc9a82ad6b5fa/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go#L210)
which was resulting in resource request updates being dropped if the
receiver is not executing at the exact moment.
Fix:
Changed the `setupManagementServer` to take `listener` and `OnStreamReq`
function as a parameter and in the `TestWatcher` added a blocking send
whenever a cluster resource is requested.

xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go
xds/internal/balancer/cdsbalancer/cdsbalancer_test.go

index f1dead24d5ce9e290e745c5a1ead3999483cb2dd..4db998b4c54396c4db00967f0b350f2eaa31bf53 100644 (file)
@@ -35,9 +35,11 @@ import (
        "google.golang.org/grpc/status"
        "google.golang.org/grpc/xds/internal"
        "google.golang.org/grpc/xds/internal/balancer/clusterresolver"
+       "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
 
        v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
        v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
+       v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
        testgrpc "google.golang.org/grpc/interop/grpc_testing"
        testpb "google.golang.org/grpc/interop/grpc_testing"
 )
@@ -131,7 +133,7 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) {
        for _, test := range tests {
                t.Run(test.name, func(t *testing.T) {
                        lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
-                       mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
+                       mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
 
                        // Push the first cluster resource through the management server and
                        // verify the configuration pushed to the child policy.
@@ -174,7 +176,7 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) {
 // contains the expected discovery mechanisms.
 func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) {
        lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
-       mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
+       mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
 
        // Configure the management server with the aggregate cluster resource
        // pointing to two child clusters, one EDS and one LogicalDNS. Include the
@@ -281,7 +283,7 @@ func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) {
 // policy contains a single discovery mechanism.
 func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) {
        lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
-       mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
+       mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
 
        // Configure the management server with the aggregate cluster resource
        // pointing to two child clusters.
@@ -356,7 +358,7 @@ func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) {
 // discovery mechanisms.
 func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T) {
        lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
-       mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
+       mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
 
        // Start off with the requested cluster being a leaf EDS cluster.
        resources := e2e.UpdateOptions{
@@ -450,7 +452,7 @@ func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T
 // longer exceed maximum depth, but be at the maximum allowed depth, and
 // verifies that an RPC can be made successfully.
 func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) {
-       mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
+       mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil)
 
        resources := e2e.UpdateOptions{
                NodeID: nodeID,
@@ -538,7 +540,7 @@ func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) {
 // pushed only after all child clusters are resolved.
 func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) {
        lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
-       mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
+       mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
 
        // Configure the management server with an aggregate cluster resource having
        // a diamond dependency pattern, (A->[B,C]; B->D; C->D). Includes resources
@@ -605,7 +607,7 @@ func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) {
 // pushed only after all child clusters are resolved.
 func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) {
        lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
-       mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
+       mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
 
        // Configure the management server with an aggregate cluster resource that
        // has duplicates in the graph, (A->[B, C]; B->[C, D]). Include resources
@@ -683,7 +685,7 @@ func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) {
 // child policy and that an RPC can be successfully made.
 func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) {
        lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
-       mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
+       mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil)
 
        const (
                clusterNameA = clusterName // cluster name in cds LB policy config
@@ -768,7 +770,7 @@ func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) {
 // that the aggregate cluster graph has no leaf clusters.
 func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) {
        lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
-       mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
+       mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil)
 
        const (
                clusterNameA = clusterName // cluster name in cds LB policy config
@@ -816,7 +818,7 @@ func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) {
 // child policy and RPCs should get routed to that leaf cluster.
 func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) {
        lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
-       mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
+       mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil)
 
        // Start a test service backend.
        server := stubserver.StartTestService(t, nil)
@@ -872,10 +874,21 @@ func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) {
 // removed from the tree no longer has a watcher and the new cluster added has a
 // new watcher.
 func (s) TestWatchers(t *testing.T) {
-       mgmtServer, nodeID, _, _, _, cdsResourceRequestedCh, _ := setupWithManagementServer(t)
-
        ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
        defer cancel()
+       cdsResourceRequestedCh := make(chan []string, 1)
+       onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
+               if req.GetTypeUrl() == version.V3ClusterURL {
+                       if len(req.GetResourceNames()) > 0 {
+                               select {
+                               case cdsResourceRequestedCh <- req.GetResourceNames():
+                               case <-ctx.Done():
+                               }
+                       }
+               }
+               return nil
+       }
+       mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, onStreamReq)
 
        const (
                clusterA = clusterName
index a152b78d7df5f235dda1b3fe2fcc35816cc65ddd..ac232462805602a14929768e26339caa59667244 100644 (file)
@@ -183,7 +183,8 @@ func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer {
 }
 
 // Performs the following setup required for tests:
-//   - Spins up an xDS management server
+//   - Spins up an xDS management server and and the provided onStreamRequest
+//     function is set to be called for every incoming request on the ADS stream.
 //   - Creates an xDS client talking to this management server
 //   - Creates a manual resolver that configures the cds LB policy as the
 //     top-level policy, and pushes an initial configuration to it
@@ -195,39 +196,11 @@ func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer {
 //   - the grpc channel to the test backend service
 //   - the manual resolver configured on the channel
 //   - the xDS client used the grpc channel
-//   - a channel on which requested cluster resource names are sent
-//   - a channel used to signal that previously requested cluster resources are
-//     no longer requested
-func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) {
-       return setupWithManagementServerAndListener(t, nil)
-}
-
-// Same as setupWithManagementServer, but also allows the caller to specify
-// a listener to be used by the management server.
-func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) {
+func setupWithManagementServer(t *testing.T, lis net.Listener, onStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) {
        t.Helper()
-
-       cdsResourceRequestedCh := make(chan []string, 1)
-       cdsResourceCanceledCh := make(chan struct{}, 1)
        mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
-               Listener: lis,
-               OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
-                       if req.GetTypeUrl() == version.V3ClusterURL {
-                               switch len(req.GetResourceNames()) {
-                               case 0:
-                                       select {
-                                       case cdsResourceCanceledCh <- struct{}{}:
-                                       default:
-                                       }
-                               default:
-                                       select {
-                                       case cdsResourceRequestedCh <- req.GetResourceNames():
-                                       default:
-                                       }
-                               }
-                       }
-                       return nil
-               },
+               Listener:        lis,
+               OnStreamRequest: onStreamRequest,
                // Required for aggregate clusters as all resources cannot be requested
                // at once.
                AllowResourceSubset: true,
@@ -268,7 +241,7 @@ func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e.
        cc.Connect()
        t.Cleanup(func() { cc.Close() })
 
-       return mgmtServer, nodeID, cc, r, xdsC, cdsResourceRequestedCh, cdsResourceCanceledCh
+       return mgmtServer, nodeID, cc, r, xdsC
 }
 
 // Helper function to compare the load balancing configuration received on the
@@ -321,11 +294,23 @@ func verifyRPCError(gotErr error, wantCode codes.Code, wantErr, wantNodeID strin
 // configuration changes, it stops requesting the old cluster resource and
 // starts requesting the new one.
 func (s) TestConfigurationUpdate_Success(t *testing.T) {
-       _, _, _, r, xdsClient, cdsResourceRequestedCh, _ := setupWithManagementServer(t)
-
-       // Verify that the specified cluster resource is requested.
        ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
        defer cancel()
+       cdsResourceRequestedCh := make(chan []string, 1)
+       onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
+               if req.GetTypeUrl() == version.V3ClusterURL {
+                       if len(req.GetResourceNames()) > 0 {
+                               select {
+                               case cdsResourceRequestedCh <- req.GetResourceNames():
+                               case <-ctx.Done():
+                               }
+                       }
+               }
+               return nil
+       }
+       _, _, _, r, xdsClient := setupWithManagementServer(t, nil, onStreamReq)
+
+       // Verify that the specified cluster resource is requested.
        wantNames := []string{clusterName}
        if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
                t.Fatal(err)
@@ -616,7 +601,7 @@ func (s) TestClusterUpdate_Success(t *testing.T) {
        for _, test := range tests {
                t.Run(test.name, func(t *testing.T) {
                        lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
-                       mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
+                       mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
 
                        ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
                        defer cancel()
@@ -640,7 +625,7 @@ func (s) TestClusterUpdate_Success(t *testing.T) {
 // balancing configuration pushed to the child is as expected.
 func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) {
        lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
-       mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
+       mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
 
        clusterResource := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
                ClusterName: clusterName,
@@ -689,15 +674,21 @@ func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) {
 //     continue using the previous good update.
 func (s) TestClusterUpdate_Failure(t *testing.T) {
        _, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t)
-       mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t)
-
-       // Verify that the specified cluster resource is requested.
        ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
        defer cancel()
-       wantNames := []string{clusterName}
-       if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
-               t.Fatal(err)
+       cdsResourceCanceledCh := make(chan struct{}, 1)
+       onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
+               if req.GetTypeUrl() == version.V3ClusterURL {
+                       if len(req.GetResourceNames()) == 0 {
+                               select {
+                               case cdsResourceCanceledCh <- struct{}{}:
+                               case <-ctx.Done():
+                               }
+                       }
+               }
+               return nil
        }
+       mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, onStreamReq)
 
        // Configure the management server to return a cluster resource that
        // contains a config_source_specifier for the `lrs_server` field which is not
@@ -806,12 +797,31 @@ func (s) TestClusterUpdate_Failure(t *testing.T) {
 func (s) TestResolverError(t *testing.T) {
        _, resolverErrCh, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t)
        lis := testutils.NewListenerWrapper(t, nil)
-       mgmtServer, nodeID, cc, r, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServerAndListener(t, lis)
+       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+       defer cancel()
+       cdsResourceCanceledCh := make(chan struct{}, 1)
+       cdsResourceRequestedCh := make(chan []string, 1)
+       onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
+               if req.GetTypeUrl() == version.V3ClusterURL {
+                       switch len(req.GetResourceNames()) {
+                       case 0:
+                               select {
+                               case cdsResourceCanceledCh <- struct{}{}:
+                               case <-ctx.Done():
+                               }
+                       default:
+                               select {
+                               case cdsResourceRequestedCh <- req.GetResourceNames():
+                               case <-ctx.Done():
+                               }
+                       }
+               }
+               return nil
+       }
+       mgmtServer, nodeID, cc, r, _ := setupWithManagementServer(t, lis, onStreamReq)
 
        // Grab the wrapped connection from the listener wrapper. This will be used
        // to verify the connection is closed.
-       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
-       defer cancel()
        val, err := lis.NewConnCh.Receive(ctx)
        if err != nil {
                t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
@@ -949,15 +959,21 @@ func (s) TestResolverError(t *testing.T) {
 //   - when the cluster resource is re-sent by the management server, RPCs
 //     should start succeeding.
 func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) {
-       mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t)
-
-       // Verify that the specified cluster resource is requested.
        ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
        defer cancel()
-       wantNames := []string{clusterName}
-       if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
-               t.Fatal(err)
+       cdsResourceCanceledCh := make(chan struct{}, 1)
+       onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
+               if req.GetTypeUrl() == version.V3ClusterURL {
+                       if len(req.GetResourceNames()) == 0 {
+                               select {
+                               case cdsResourceCanceledCh <- struct{}{}:
+                               case <-ctx.Done():
+                               }
+                       }
+               }
+               return nil
        }
+       mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, onStreamReq)
 
        // Start a test service backend.
        server := stubserver.StartTestService(t, nil)
@@ -1028,7 +1044,7 @@ func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) {
 func (s) TestClose(t *testing.T) {
        cdsBalancerCh := registerWrappedCDSPolicy(t)
        _, _, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t)
-       mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
+       mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil)
 
        // Start a test service backend.
        server := stubserver.StartTestService(t, nil)
@@ -1075,7 +1091,7 @@ func (s) TestClose(t *testing.T) {
 func (s) TestExitIdle(t *testing.T) {
        cdsBalancerCh := registerWrappedCDSPolicy(t)
        _, _, exitIdleCh, _ := registerWrappedClusterResolverPolicy(t)
-       mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
+       mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil)
 
        // Start a test service backend.
        server := stubserver.StartTestService(t, nil)