]> git.feebdaed.xyz Git - 0xmirror/grpc-go.git/commitdiff
xdsclient: improve fallback test involving three servers (#8604)
authorEaswar Swaminathan <easwars@google.com>
Thu, 25 Sep 2025 17:15:27 +0000 (10:15 -0700)
committerGitHub <noreply@github.com>
Thu, 25 Sep 2025 17:15:27 +0000 (10:15 -0700)
The existing fallback test that involves three servers is flaky. The
flake happens because some of the resources have the same name on
different servers. The listener resource is expected to have the same
name across the different management servers, but we generally expect
the other resources to have different names.

See the following from the gRFC:
- In
https://github.com/grpc/proposal/blob/master/A71-xds-fallback.md#reservations-about-using-the-fallback-server-data,
we have the following:
```
We have no guarantee that a combination of resources from different xDS servers form a valid cohesive
configuration, so we cannot make this determination on a per-resource basis. We need any given gRPC
channel or server listener to only use the resources from a single server.
```
- In
https://github.com/grpc/proposal/blob/master/A71-xds-fallback.md#config-tears,
we have the following:
```
Config tears happen when the client winds up using some combination of resources from the primary and
fallback servers at the same time, even though that combination of resources was never validated to work
together. In theory, this can cause correctness issues where we might send traffic to the wrong location or
the wrong way, or it can cause RPCs to fail. Note that this can happen only when the primary and fallback
server use the same resource names.
```

This PR ensures that all the different management servers have different
resource names for all resources except the listener. I also ran the
test on forge 100K times with no failures.

This PR also improves a couple of logs that I found useful when
debugging the failures.

RELEASE NOTES: none

internal/xds/clients/xdsclient/authority.go
internal/xds/xdsclient/tests/fallback/fallback_test.go

index 49a4480cf15498ffa372fb8881e76c3eeb287515..e4425fa7b3caa6eba6f12d9ebd58babeff026f01 100644 (file)
@@ -570,10 +570,6 @@ func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *ServerConfig)
                return true
        }
 
-       if a.logger.V(2) {
-               a.logger.Infof("Received update from non-active server %q", serverConfig)
-       }
-
        // If the resource update is not from the current active server, it means
        // that we have received an update either from:
        // - a server that has a higher priority than the current active server and
@@ -586,8 +582,14 @@ func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *ServerConfig)
        serverIdx := a.serverIndexForConfig(serverConfig)
        activeServerIdx := a.serverIndexForConfig(a.activeXDSChannel.serverConfig)
        if activeServerIdx < serverIdx {
+               if a.logger.V(2) {
+                       a.logger.Infof("Ignoring update from lower priority server [%d] %q", serverIdx, serverConfig)
+               }
                return false
        }
+       if a.logger.V(2) {
+               a.logger.Infof("Received update from higher priority server [%d] %q", serverIdx, serverConfig)
+       }
 
        // At this point, we are guaranteed that we have received a response from a
        // higher priority server compared to the current active server. So, we
@@ -622,7 +624,7 @@ func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *ServerConfig)
                // Release the reference to the channel.
                if cfg.cleanup != nil {
                        if a.logger.V(2) {
-                               a.logger.Infof("Closing lower priority server %q", cfg.serverConfig)
+                               a.logger.Infof("Closing lower priority server [%d] %q", i, cfg.serverConfig)
                        }
                        cfg.cleanup()
                        cfg.cleanup = nil
index b45ee7b036a18c980a5d8614cd23f08896e0653a..5c96ea2e73348481c5c3c14af41ca51ae3a09ba4 100644 (file)
@@ -742,14 +742,13 @@ func (s) TestFallback_OnStartup_RPCSuccess(t *testing.T) {
        }
 }
 
-// TestXDSFallback_ThreeServerPromotion verifies that when the primary
-// management server is unavailable, the system attempts to connect to the
-// first fallback server, and if that is also down, to the second fallback
-// server. It also ensures that the system switches back to the first fallback
-// server once it becomes available again, and eventually returns to the
-// primary server when it comes back online, closing connections to the
-// fallback servers accordingly.
-func (s) TestXDSFallback_ThreeServerPromotion(t *testing.T) {
+// Test verifies that when the primary management server is unavailable, the
+// system attempts to connect to the first fallback server, and if that is also
+// down, to the second fallback server. It also ensures that the system switches
+// back to the first fallback server once it becomes available again, and
+// eventually returns to the primary server when it comes back online, closing
+// connections to the fallback servers accordingly.
+func (s) TestFallback_ThreeServerPromotion(t *testing.T) {
        ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTestTimeout)
        defer cancel()
 
@@ -770,34 +769,49 @@ func (s) TestXDSFallback_ThreeServerPromotion(t *testing.T) {
 
        // Start three test service backends.
        backend1 := stubserver.StartTestService(t, nil)
+       backend1Port := testutils.ParsePort(t, backend1.Address)
        defer backend1.Stop()
        backend2 := stubserver.StartTestService(t, nil)
+       backend2Port := testutils.ParsePort(t, backend2.Address)
        defer backend2.Stop()
        backend3 := stubserver.StartTestService(t, nil)
+       backend3Port := testutils.ParsePort(t, backend3.Address)
        defer backend3.Stop()
 
        nodeID := uuid.New().String()
-       const serviceName = "my-service-fallback-xds"
+       const (
+               serviceName              = "my-service-fallback-xds"
+               primaryRouteConfigName   = "primary-route-" + serviceName
+               secondaryRouteConfigName = "secondary-route-" + serviceName
+               tertiaryRouteConfigName  = "tertiary-route-" + serviceName
+               primaryClusterName       = "primary-cluster-" + serviceName
+               secondaryClusterName     = "secondary-cluster-" + serviceName
+               tertiaryClusterName      = "tertiary-cluster-" + serviceName
+               primaryEndpointsName     = "primary-endpoints-" + serviceName
+               secondaryEndpointsName   = "secondary-endpoints-" + serviceName
+               tertiaryEndpointsName    = "tertiary-endpoints-" + serviceName
+       )
 
        // Configure partial resources on the primary and secondary
        // management servers.
        primaryManagementServer.Update(ctx, e2e.UpdateOptions{
                NodeID:    nodeID,
-               Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, "route-p")},
+               Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, primaryRouteConfigName)},
        })
        secondaryManagementServer.Update(ctx, e2e.UpdateOptions{
                NodeID:    nodeID,
-               Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, "route-s1")},
+               Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, secondaryRouteConfigName)},
        })
 
        // Configure full resources on tertiary management server.
-       tertiaryManagementServer.Update(ctx, e2e.DefaultClientResources(e2e.ResourceParams{
-               DialTarget: serviceName,
-               NodeID:     nodeID,
-               Host:       "localhost",
-               Port:       testutils.ParsePort(t, backend3.Address),
-               SecLevel:   e2e.SecurityLevelNone,
-       }))
+       updateOpts := e2e.UpdateOptions{
+               NodeID:    nodeID,
+               Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, tertiaryRouteConfigName)},
+               Routes:    []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(tertiaryRouteConfigName, serviceName, tertiaryClusterName)},
+               Clusters:  []*v3clusterpb.Cluster{e2e.DefaultCluster(tertiaryClusterName, tertiaryEndpointsName, e2e.SecurityLevelNone)},
+               Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(tertiaryEndpointsName, "localhost", []uint32{backend3Port})},
+       }
+       tertiaryManagementServer.Update(ctx, updateOpts)
 
        // Create bootstrap configuration for all three management servers.
        bootstrapContents, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{
@@ -874,15 +888,14 @@ func (s) TestXDSFallback_ThreeServerPromotion(t *testing.T) {
 
        // Secondary1 becomes available, RPCs go to backend2.
        secondaryLis.Restart()
-       secondaryManagementServer.Update(ctx, e2e.UpdateOptions{
+       updateOpts = e2e.UpdateOptions{
                NodeID:    nodeID,
-               Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, "route-s1")},
-               Routes:    []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig("route-s1", serviceName, "cluster-s1")},
-               Clusters:  []*v3clusterpb.Cluster{e2e.DefaultCluster("cluster-s1", "endpoints-s1", e2e.SecurityLevelNone)},
-               Endpoints: []*v3endpointpb.ClusterLoadAssignment{
-                       e2e.DefaultEndpoint("endpoints-s1", "localhost", []uint32{testutils.ParsePort(t, backend2.Address)}),
-               },
-       })
+               Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, secondaryRouteConfigName)},
+               Routes:    []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(secondaryRouteConfigName, serviceName, secondaryClusterName)},
+               Clusters:  []*v3clusterpb.Cluster{e2e.DefaultCluster(secondaryClusterName, secondaryEndpointsName, e2e.SecurityLevelNone)},
+               Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(secondaryEndpointsName, "localhost", []uint32{backend2Port})},
+       }
+       secondaryManagementServer.Update(ctx, updateOpts)
 
        secondaryConn, err := secondaryWrappedLis.NewConnCh.Receive(ctx)
        if err != nil {
@@ -897,13 +910,14 @@ func (s) TestXDSFallback_ThreeServerPromotion(t *testing.T) {
 
        // Primary becomes available, RPCs go to backend1.
        primaryLis.Restart()
-       primaryManagementServer.Update(ctx, e2e.DefaultClientResources(e2e.ResourceParams{
-               DialTarget: serviceName,
-               NodeID:     nodeID,
-               Host:       "localhost",
-               Port:       testutils.ParsePort(t, backend1.Address),
-               SecLevel:   e2e.SecurityLevelNone,
-       }))
+       updateOpts = e2e.UpdateOptions{
+               NodeID:    nodeID,
+               Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, primaryRouteConfigName)},
+               Routes:    []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(primaryRouteConfigName, serviceName, primaryClusterName)},
+               Clusters:  []*v3clusterpb.Cluster{e2e.DefaultCluster(primaryClusterName, primaryEndpointsName, e2e.SecurityLevelNone)},
+               Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(primaryEndpointsName, "localhost", []uint32{backend1Port})},
+       }
+       primaryManagementServer.Update(ctx, updateOpts)
 
        if _, err := primaryWrappedLis.NewConnCh.Receive(ctx); err != nil {
                t.Fatalf("Timeout when waiting for new connection to primary: %v", err)