From 4e9b800097b648b010c2203a84dd462cdc0d9959 Mon Sep 17 00:00:00 2001 From: eshitachandwani <59800922+eshitachandwani@users.noreply.github.com> Date: Thu, 2 Oct 2025 00:54:25 +0530 Subject: [PATCH] xds/cdsbalancer: change tests to use xds resolver (#8579) Change the tests in xds cds balancer to use xds resolver instead of manual resolver This change is being done as part of gRFC [A74 : xDS Config tears](https://github.com/grpc/proposal/blob/master/A74-xds-config-tears.md). This is to make sure the tests pass after the change too. RELEASE NOTES: None --- .../cdsbalancer/aggregate_cluster_test.go | 149 ++++---- .../cdsbalancer/cdsbalancer_security_test.go | 317 ++++++------------ .../balancer/cdsbalancer/cdsbalancer_test.go | 315 ++++++++++------- 3 files changed, 391 insertions(+), 390 deletions(-) diff --git a/internal/xds/balancer/cdsbalancer/aggregate_cluster_test.go b/internal/xds/balancer/cdsbalancer/aggregate_cluster_test.go index 581f511b..7e277f3e 100644 --- a/internal/xds/balancer/cdsbalancer/aggregate_cluster_test.go +++ b/internal/xds/balancer/cdsbalancer/aggregate_cluster_test.go @@ -39,9 +39,14 @@ import ( v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" + + _ "google.golang.org/grpc/internal/xds/httpfilter/router" // Register the router filter. 
+ _ "google.golang.org/grpc/internal/xds/resolver" // Register the xds resolver ) // makeAggregateClusterResource returns an aggregate cluster resource with the @@ -105,13 +110,13 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) { }, { name: "dns", - firstClusterResource: makeLogicalDNSClusterResource(clusterName, "dns_host", uint32(8080)), - secondClusterResource: makeLogicalDNSClusterResource(clusterName, "dns_host_new", uint32(8080)), + firstClusterResource: makeLogicalDNSClusterResource(clusterName, "dns_host", uint32(port)), + secondClusterResource: makeLogicalDNSClusterResource(clusterName, "dns_host_new", uint32(port)), wantFirstChildCfg: &clusterresolver.LBConfig{ DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{ Cluster: clusterName, Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS, - DNSHostname: "dns_host:8080", + DNSHostname: fmt.Sprintf("dns_host:%d", port), OutlierDetection: json.RawMessage(`{}`), TelemetryLabels: xdsinternal.UnknownCSMLabels, }}, @@ -121,7 +126,7 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) { DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{ Cluster: clusterName, Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS, - DNSHostname: "dns_host_new:8080", + DNSHostname: fmt.Sprintf("dns_host_new:%d", port), OutlierDetection: json.RawMessage(`{}`), TelemetryLabels: xdsinternal.UnknownCSMLabels, }}, @@ -133,13 +138,16 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) + mgmtServer, nodeID, _ := setupWithManagementServer(t, nil, nil) // Push the first cluster resource through the management server and // verify the configuration pushed to the child policy. 
resources := e2e.UpdateOptions{ NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterName)}, Clusters: []*v3clusterpb.Cluster{test.firstClusterResource}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{port})}, SkipValidation: true, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -154,6 +162,7 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) { // Push the second cluster resource through the management server and // verify the configuration pushed to the child policy. resources.Clusters[0] = test.secondClusterResource + resources.Endpoints[0] = e2e.DefaultEndpoint(serviceName+"-new", host, []uint32{port}) if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -176,19 +185,21 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) { // contains the expected discovery mechanisms. func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) + mgmtServer, nodeID, _ := setupWithManagementServer(t, nil, nil) // Configure the management server with the aggregate cluster resource // pointing to two child clusters, one EDS and one LogicalDNS. Include the // resource corresponding to the EDS cluster here, but don't include // resource corresponding to the LogicalDNS cluster yet. 
resources := e2e.UpdateOptions{ - NodeID: nodeID, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterName)}, Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}), e2e.DefaultCluster(edsClusterName, serviceName, e2e.SecurityLevelNone), }, - SkipValidation: true, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{port})}, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -237,14 +248,16 @@ func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) { const dnsClusterNameNew = dnsClusterName + "-new" const dnsHostNameNew = dnsHostName + "-new" resources = e2e.UpdateOptions{ - NodeID: nodeID, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterName)}, Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterNameNew}), e2e.DefaultCluster(edsClusterName, serviceName, e2e.SecurityLevelNone), makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), makeLogicalDNSClusterResource(dnsClusterNameNew, dnsHostNameNew, dnsPort), }, - SkipValidation: true, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{port})}, } if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) @@ -283,18 +296,20 @@ func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) { // policy contains a single discovery mechanism. 
func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) + mgmtServer, nodeID, _ := setupWithManagementServer(t, nil, nil) // Configure the management server with the aggregate cluster resource // pointing to two child clusters. resources := e2e.UpdateOptions{ - NodeID: nodeID, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterName)}, Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}), e2e.DefaultCluster(edsClusterName, serviceName, e2e.SecurityLevelNone), makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), }, - SkipValidation: true, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{port})}, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -326,12 +341,14 @@ func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) { } resources = e2e.UpdateOptions{ - NodeID: nodeID, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterName)}, Clusters: []*v3clusterpb.Cluster{ e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone), makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), }, - SkipValidation: true, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{port})}, } if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) @@ -358,13 +375,15 @@ func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) { // discovery mechanisms. 
func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) + mgmtServer, nodeID, _ := setupWithManagementServer(t, nil, nil) // Start off with the requested cluster being a leaf EDS cluster. resources := e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)}, - SkipValidation: true, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterName)}, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{port})}, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -388,13 +407,15 @@ func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T // Switch the requested cluster to be an aggregate cluster pointing to two // child clusters. 
resources = e2e.UpdateOptions{ - NodeID: nodeID, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterName)}, Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}), e2e.DefaultCluster(edsClusterName, serviceName, e2e.SecurityLevelNone), makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), }, - SkipValidation: true, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{port})}, } if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) @@ -424,9 +445,11 @@ func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T // Switch the cluster back to a leaf EDS cluster. resources = e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)}, - SkipValidation: true, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterName)}, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{port})}, } if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) @@ -452,10 +475,13 @@ func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T // longer exceed maximum depth, but be at the maximum allowed depth, and // verifies that an RPC can be made successfully. 
func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) { - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil) + mgmtServer, nodeID, cc := setupWithManagementServer(t, nil, nil) resources := e2e.UpdateOptions{ - NodeID: nodeID, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterName)}, + Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterName, []string{clusterName + "-1"}), makeAggregateClusterResource(clusterName+"-1", []string{clusterName + "-2"}), @@ -475,7 +501,7 @@ func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) { makeAggregateClusterResource(clusterName+"-15", []string{clusterName + "-16"}), e2e.DefaultCluster(clusterName+"-16", serviceName, e2e.SecurityLevelNone), }, - SkipValidation: true, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{port})}, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -502,7 +528,9 @@ func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) { // Update the aggregate cluster resource to no longer exceed max depth, and // be at the maximum depth allowed. 
resources = e2e.UpdateOptions{ - NodeID: nodeID, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterName)}, Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterName, []string{clusterName + "-1"}), makeAggregateClusterResource(clusterName+"-1", []string{clusterName + "-2"}), @@ -521,8 +549,7 @@ func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) { makeAggregateClusterResource(clusterName+"-14", []string{clusterName + "-15"}), e2e.DefaultCluster(clusterName+"-15", serviceName, e2e.SecurityLevelNone), }, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})}, - SkipValidation: true, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{testutils.ParsePort(t, server.Address)})}, } if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) @@ -540,7 +567,7 @@ func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) { // pushed only after all child clusters are resolved. func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) + mgmtServer, nodeID, _ := setupWithManagementServer(t, nil, nil) // Configure the management server with an aggregate cluster resource having // a diamond dependency pattern, (A->[B,C]; B->D; C->D). 
Includes resources @@ -554,13 +581,15 @@ func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) { clusterNameD = clusterName + "-D" ) resources := e2e.UpdateOptions{ - NodeID: nodeID, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterName)}, Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterNameA, []string{clusterNameB, clusterNameC}), makeAggregateClusterResource(clusterNameB, []string{clusterNameD}), e2e.DefaultCluster(clusterNameD, serviceName, e2e.SecurityLevelNone), }, - SkipValidation: true, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{port})}, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -607,7 +636,7 @@ func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) { // pushed only after all child clusters are resolved. func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) + mgmtServer, nodeID, _ := setupWithManagementServer(t, nil, nil) // Configure the management server with an aggregate cluster resource that // has duplicates in the graph, (A->[B, C]; B->[C, D]). 
Include resources @@ -621,13 +650,15 @@ func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) { clusterNameD = clusterName + "-D" ) resources := e2e.UpdateOptions{ - NodeID: nodeID, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterName)}, Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterNameA, []string{clusterNameB, clusterNameC}), makeAggregateClusterResource(clusterNameB, []string{clusterNameC, clusterNameD}), e2e.DefaultCluster(clusterNameD, serviceName, e2e.SecurityLevelNone), }, - SkipValidation: true, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{port})}, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -646,7 +677,8 @@ func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) { // Now configure the resource for cluster C in the management server, // thereby completing the cluster graph. This should result in configuration // being pushed down to the child policy. 
- resources.Clusters = append(resources.Clusters, e2e.DefaultCluster(clusterNameC, serviceName, e2e.SecurityLevelNone)) + resources.Clusters = append(resources.Clusters, e2e.DefaultCluster(clusterNameC, edsClusterName, e2e.SecurityLevelNone)) + resources.Endpoints = append(resources.Endpoints, e2e.DefaultEndpoint(edsClusterName, "eshita_eds", []uint32{1234})) if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -656,7 +688,7 @@ func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) { { Cluster: clusterNameC, Type: clusterresolver.DiscoveryMechanismTypeEDS, - EDSServiceName: serviceName, + EDSServiceName: edsClusterName, OutlierDetection: json.RawMessage(`{}`), TelemetryLabels: xdsinternal.UnknownCSMLabels, }, @@ -685,7 +717,7 @@ func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) { // child policy and that an RPC can be successfully made. func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil) + mgmtServer, nodeID, cc := setupWithManagementServer(t, nil, nil) const ( clusterNameA = clusterName // cluster name in cds LB policy config @@ -695,6 +727,8 @@ func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) { // child is itself. resources := e2e.UpdateOptions{ NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterName)}, Clusters: []*v3clusterpb.Cluster{makeAggregateClusterResource(clusterNameA, []string{clusterNameA})}, SkipValidation: true, } @@ -729,13 +763,14 @@ func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) { // Update the aggregate cluster to point to a leaf EDS cluster. 
resources = e2e.UpdateOptions{ - NodeID: nodeID, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterName)}, Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterNameA, []string{clusterNameB}), e2e.DefaultCluster(clusterNameB, serviceName, e2e.SecurityLevelNone), }, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})}, - SkipValidation: true, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{testutils.ParsePort(t, server.Address)})}, } if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) @@ -770,7 +805,7 @@ func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) { // that the aggregate cluster graph has no leaf clusters. func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil) + mgmtServer, nodeID, cc := setupWithManagementServer(t, nil, nil) const ( clusterNameA = clusterName // cluster name in cds LB policy config @@ -779,7 +814,9 @@ func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) { // Configure the management server with an aggregate cluster resource graph // that contains a cycle and no leaf clusters. 
resources := e2e.UpdateOptions{ - NodeID: nodeID, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterName)}, Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterNameA, []string{clusterNameB}), makeAggregateClusterResource(clusterNameB, []string{clusterNameA}), @@ -818,7 +855,7 @@ func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) { // child policy and RPCs should get routed to that leaf cluster. func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil) + mgmtServer, nodeID, cc := setupWithManagementServer(t, nil, nil) // Start a test service backend. server := stubserver.StartTestService(t, nil) @@ -832,13 +869,15 @@ func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) { // Configure the management server with an aggregate cluster resource graph // that contains a cycle, but also contains a leaf cluster. 
resources := e2e.UpdateOptions{ - NodeID: nodeID, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterName)}, Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterNameA, []string{clusterNameB}), makeAggregateClusterResource(clusterNameB, []string{clusterNameA, clusterNameC}), e2e.DefaultCluster(clusterNameC, serviceName, e2e.SecurityLevelNone), }, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{testutils.ParsePort(t, server.Address)})}, SkipValidation: true, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -888,7 +927,7 @@ func (s) TestWatchers(t *testing.T) { } return nil } - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, onStreamReq) + mgmtServer, nodeID, _ := setupWithManagementServer(t, nil, onStreamReq) const ( clusterA = clusterName @@ -899,7 +938,9 @@ func (s) TestWatchers(t *testing.T) { // Initial CDS resources: A -> B, C initialResources := e2e.UpdateOptions{ - NodeID: nodeID, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterA)}, Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterA, []string{clusterB, clusterC}), }, @@ -914,14 +955,10 @@ func (s) TestWatchers(t *testing.T) { } // Update the CDS resources to remove cluster C and add cluster D. 
- updatedResources := e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{ - makeAggregateClusterResource(clusterA, []string{clusterB, clusterD}), - }, - SkipValidation: true, + initialResources.Clusters = []*v3clusterpb.Cluster{ + makeAggregateClusterResource(clusterA, []string{clusterB, clusterD}), } - if err := mgmtServer.Update(ctx, updatedResources); err != nil { + if err := mgmtServer.Update(ctx, initialResources); err != nil { t.Fatalf("Update failed: %v", err) } wantNames = []string{clusterA, clusterB, clusterD} diff --git a/internal/xds/balancer/cdsbalancer/cdsbalancer_security_test.go b/internal/xds/balancer/cdsbalancer/cdsbalancer_security_test.go index 401a16dc..f26571fd 100644 --- a/internal/xds/balancer/cdsbalancer/cdsbalancer_security_test.go +++ b/internal/xds/balancer/cdsbalancer/cdsbalancer_security_test.go @@ -25,110 +25,41 @@ import ( "os" "strings" "testing" - "unsafe" - "github.com/google/go-cmp/cmp" "github.com/google/uuid" "google.golang.org/grpc" - "google.golang.org/grpc/attributes" - "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/tls/certprovider" "google.golang.org/grpc/credentials/xds" "google.golang.org/grpc/internal" - "google.golang.org/grpc/internal/balancer/stub" - xdscredsinternal "google.golang.org/grpc/internal/credentials/xds" "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/xds/bootstrap" - "google.golang.org/grpc/internal/xds/xdsclient" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" - "google.golang.org/grpc/resolver/manual" - "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/testdata" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3corepb 
"github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" _ "google.golang.org/grpc/credentials/tls/certprovider/pemfile" // Register the file watcher certificate provider plugin. + _ "google.golang.org/grpc/internal/xds/httpfilter/router" // Register the router filter. + _ "google.golang.org/grpc/internal/xds/resolver" // Register the xds resolver ) -// testCCWrapper wraps a balancer.ClientConn and intercepts NewSubConn and -// returns the xDS handshake info back to the test for inspection. -type testCCWrapper struct { - balancer.ClientConn - handshakeInfoCh chan *xdscredsinternal.HandshakeInfo -} - -// NewSubConn forwards the call to the underlying balancer.ClientConn, but -// before that, it validates the following: -// - there is only one address in the addrs slice -// - the single address contains xDS handshake information, which is then -// pushed onto the handshakeInfoCh channel -func (tcc *testCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { - if len(addrs) != 1 { - return nil, fmt.Errorf("NewSubConn got %d addresses, want 1", len(addrs)) - } - getHI := internal.GetXDSHandshakeInfoForTesting.(func(attr *attributes.Attributes) *unsafe.Pointer) - hi := getHI(addrs[0].Attributes) - if hi == nil { - return nil, fmt.Errorf("NewSubConn got address without xDS handshake info") - } - - sc, err := tcc.ClientConn.NewSubConn(addrs, opts) - select { - case tcc.handshakeInfoCh <- (*xdscredsinternal.HandshakeInfo)(*hi): - default: - } - return sc, err -} - -// Registers a wrapped 
cds LB policy for the duration of this test that retains -// all the functionality of the original cds LB policy, but overrides the -// NewSubConn method passed to the policy and makes the xDS handshake -// information passed to NewSubConn available to the test. -// -// Accepts as argument a channel onto which xDS handshake information passed to -// NewSubConn is written to. -func registerWrappedCDSPolicyWithNewSubConnOverride(t *testing.T, ch chan *xdscredsinternal.HandshakeInfo) { - cdsBuilder := balancer.Get(cdsName) - internal.BalancerUnregister(cdsBuilder.Name()) - var ccWrapper *testCCWrapper - stub.Register(cdsBuilder.Name(), stub.BalancerFuncs{ - Init: func(bd *stub.BalancerData) { - ccWrapper = &testCCWrapper{ - ClientConn: bd.ClientConn, - handshakeInfoCh: ch, - } - bd.ChildBalancer = cdsBuilder.Build(ccWrapper, bd.BuildOptions) - }, - ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { - return cdsBuilder.(balancer.ConfigParser).ParseConfig(lbCfg) - }, - UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { - return bd.ChildBalancer.UpdateClientConnState(ccs) - }, - Close: func(bd *stub.BalancerData) { - bd.ChildBalancer.Close() - }, - }) - t.Cleanup(func() { balancer.Register(cdsBuilder) }) -} - // Common setup for security tests: // - creates an xDS client with the specified bootstrap configuration -// - creates a manual resolver that specifies cds as the top-level LB policy -// - creates a channel that uses the passed in client creds and the manual -// resolver +// - creates a gRPC channel that uses the passed in client creds // - creates a test server that uses the passed in server creds // // Returns the following: @@ -137,32 +68,13 @@ func registerWrappedCDSPolicyWithNewSubConnOverride(t *testing.T, ch chan *xdscr func setupForSecurityTests(t *testing.T, bootstrapContents []byte, clientCreds, serverCreds credentials.TransportCredentials) (*grpc.ClientConn, string) { t.Helper() - 
config, err := bootstrap.NewConfigFromContents(bootstrapContents) - if err != nil { - t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err) + if internal.NewXDSResolverWithConfigForTesting == nil { + t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil") } - pool := xdsclient.NewPool(config) - xdsClient, xdsClose, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - }) + r, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bootstrapContents) if err != nil { - t.Fatalf("Failed to create xDS client: %v", err) - } - t.Cleanup(xdsClose) - - // Create a manual resolver that configures the CDS LB policy as the - // top-level LB policy on the channel. - r := manual.NewBuilderWithScheme("whatever") - jsonSC := fmt.Sprintf(`{ - "loadBalancingConfig":[{ - "cds_experimental":{ - "cluster": "%s" - } - }] - }`, clusterName) - scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) - state := xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient) - r.InitialState(state) + t.Fatalf("Failed to create xDS resolver for testing: %v", err) + } // Create a ClientConn with the specified transport credentials. cc, err := grpc.NewClient(r.Scheme()+":///test.service", grpc.WithTransportCredentials(clientCreds), grpc.WithResolvers(r)) @@ -259,14 +171,8 @@ func verifySecurityInformationFromPeer(t *testing.T, pr *peer.Peer, wantSecLevel // Tests the case where xDS credentials are not in use, but the cds LB policy // receives a Cluster update with security configuration. Verifies that the -// security configuration is not parsed by the cds LB policy by looking at the -// xDS handshake info passed to NewSubConn. +// connection between client and server is insecure. 
func (s) TestSecurityConfigWithoutXDSCreds(t *testing.T) { - // Register a wrapped cds LB policy for the duration of this test that writes - // the xDS handshake info passed to NewSubConn onto the given channel. - handshakeInfoCh := make(chan *xdscredsinternal.HandshakeInfo, 1) - registerWrappedCDSPolicyWithNewSubConnOverride(t, handshakeInfoCh) - // Spin up an xDS management server. mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) @@ -278,14 +184,15 @@ func (s) TestSecurityConfigWithoutXDSCreds(t *testing.T) { // insecure credentials. cc, serverAddress := setupForSecurityTests(t, bc, insecure.NewCredentials(), nil) - // Configure cluster and endpoints resources in the management server. The + // Configure default resources in the management server. The // cluster resource is configured to return security configuration. - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})}, - SkipValidation: true, - } + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: target, + NodeID: nodeID, + Host: host, + Port: testutils.ParsePort(t, serverAddress), + SecLevel: e2e.SecurityLevelMTLS, + }) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { @@ -293,34 +200,19 @@ func (s) TestSecurityConfigWithoutXDSCreds(t *testing.T) { } // Verify that a successful RPC can be made. 
+ peer := &peer.Peer{} client := testgrpc.NewTestServiceClient(cc) - if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(peer)); err != nil { t.Fatalf("EmptyCall() failed: %v", err) } - // Ensure that the xDS handshake info passed to NewSubConn is empty. - var gotHI *xdscredsinternal.HandshakeInfo - select { - case gotHI = <-handshakeInfoCh: - case <-ctx.Done(): - t.Fatal("Timeout when waiting to read handshake info passed to NewSubConn") - } - wantHI := xdscredsinternal.NewHandshakeInfo(nil, nil, nil, false) - if !cmp.Equal(gotHI, wantHI) { - t.Fatalf("NewSubConn got handshake info %+v, want %+v", gotHI, wantHI) - } + verifySecurityInformationFromPeer(t, peer, e2e.SecurityLevelNone) } // Tests the case where xDS credentials are in use, but the cds LB policy // receives a Cluster update without security configuration. Verifies that the -// xDS handshake info passed to NewSubConn specified the use of fallback -// credentials. +// connection between client and server is insecure. func (s) TestNoSecurityConfigWithXDSCreds(t *testing.T) { - // Register a wrapped cds LB policy for the duration of this test that writes - // the xDS handshake info passed to NewSubConn onto the given channel. - handshakeInfoCh := make(chan *xdscredsinternal.HandshakeInfo, 1) - registerWrappedCDSPolicyWithNewSubConnOverride(t, handshakeInfoCh) - // Spin up an xDS management server. mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) @@ -332,14 +224,14 @@ func (s) TestNoSecurityConfigWithXDSCreds(t *testing.T) { // insecure credentials. cc, serverAddress := setupForSecurityTests(t, bc, xdsClientCredsWithInsecureFallback(t), nil) - // Configure cluster and endpoints resources in the management server. The + // Configure default resources in the management server. The // cluster resource is not configured to return any security configuration. 
- resources := e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})}, - SkipValidation: true, - } + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: target, + NodeID: nodeID, + Host: host, + Port: testutils.ParsePort(t, serverAddress), + }) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { @@ -347,25 +239,12 @@ func (s) TestNoSecurityConfigWithXDSCreds(t *testing.T) { } // Verify that a successful RPC can be made. + peer := &peer.Peer{} client := testgrpc.NewTestServiceClient(cc) - if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(peer)); err != nil { t.Fatalf("EmptyCall() failed: %v", err) } - - // Ensure that the xDS handshake info passed to NewSubConn is empty. - var gotHI *xdscredsinternal.HandshakeInfo - select { - case gotHI = <-handshakeInfoCh: - case <-ctx.Done(): - t.Fatal("Timeout when waiting to read handshake info passed to NewSubConn") - } - wantHI := xdscredsinternal.NewHandshakeInfo(nil, nil, nil, false) - if !cmp.Equal(gotHI, wantHI) { - t.Fatalf("NewSubConn got handshake info %+v, want %+v", gotHI, wantHI) - } - if !gotHI.UseFallbackCreds() { - t.Fatal("NewSubConn got handshake info that does not specify the use of fallback creds") - } + verifySecurityInformationFromPeer(t, peer, e2e.SecurityLevelNone) } // Tests the case where the security config returned by the management server @@ -395,11 +274,13 @@ func (s) TestSecurityConfigNotFoundInBootstrap(t *testing.T) { // Configure a cluster resource that contains security configuration, in the // management server. 
- resources := e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)}, - SkipValidation: true, - } + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: target, + NodeID: nodeID, + Host: host, + Port: port, + SecLevel: e2e.SecurityLevelMTLS, + }) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { @@ -458,15 +339,17 @@ func (s) TestCertproviderStoreError(t *testing.T) { } // Create a grpc channel with xDS creds. - cc, _ := setupForSecurityTests(t, bootstrapContents, xdsClientCredsWithInsecureFallback(t), nil) + cc, serverAddress := setupForSecurityTests(t, bootstrapContents, xdsClientCredsWithInsecureFallback(t), nil) // Configure a cluster resource that contains security configuration, in the // management server. - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)}, - SkipValidation: true, - } + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: target, + NodeID: nodeID, + Host: host, + Port: testutils.ParsePort(t, serverAddress), + SecLevel: e2e.SecurityLevelMTLS, + }) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { @@ -493,14 +376,15 @@ func (s) TestGoodSecurityConfig(t *testing.T) { // credentials. cc, serverAddress := setupForSecurityTests(t, bc, xdsClientCredsWithInsecureFallback(t), tlsServerCreds(t)) - // Configure cluster and endpoints resources in the management server. The + // Configure default resources in the management server. The // cluster resource is configured to return security configuration. 
- resources := e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})}, - SkipValidation: true, - } + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: target, + NodeID: nodeID, + Host: host, + Port: testutils.ParsePort(t, serverAddress), + SecLevel: e2e.SecurityLevelMTLS, + }) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { @@ -535,8 +419,7 @@ func (s) TestSecurityConfigUpdate_BadToGood(t *testing.T) { // credentials. cc, serverAddress := setupForSecurityTests(t, bc, xdsClientCredsWithInsecureFallback(t), tlsServerCreds(t)) - // Configure cluster and endpoints resources in the management server. The - // cluster resource contains security configuration with a certificate + // The cluster resource contains security configuration with a certificate // provider instance that is missing in the bootstrap configuration. 
cluster := e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone) cluster.TransportSocket = &v3corepb.TransportSocket{ @@ -554,10 +437,11 @@ func (s) TestSecurityConfigUpdate_BadToGood(t *testing.T) { }, } resources := e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{cluster}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})}, - SkipValidation: true, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterName)}, + Clusters: []*v3clusterpb.Cluster{cluster}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{testutils.ParsePort(t, serverAddress)})}, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -572,8 +456,10 @@ func (s) TestSecurityConfigUpdate_BadToGood(t *testing.T) { // configuration. resources = e2e.UpdateOptions{ NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterName)}, Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{testutils.ParsePort(t, serverAddress)})}, SkipValidation: true, } if err := mgmtServer.Update(ctx, resources); err != nil { @@ -607,14 +493,15 @@ func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) { // credentials. 
cc, serverAddress := setupForSecurityTests(t, bc, xdsClientCredsWithInsecureFallback(t), tlsServerCreds(t)) - // Configure cluster and endpoints resources in the management server. The + // Configure default resources in the management server. The // cluster resource is configured to return security configuration. - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})}, - SkipValidation: true, - } + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: target, + NodeID: nodeID, + Host: host, + Port: testutils.ParsePort(t, serverAddress), + SecLevel: e2e.SecurityLevelMTLS, + }) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { @@ -636,12 +523,13 @@ func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) { // Update the resources in the management server to contain no security // configuration. This should result in the use of fallback credentials, // which is insecure in our case. - resources = e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, insecureServer.Address)})}, - SkipValidation: true, - } + resources = e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: target, + NodeID: nodeID, + Host: host, + Port: testutils.ParsePort(t, insecureServer.Address), + SecLevel: e2e.SecurityLevelNone, + }) if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -686,14 +574,15 @@ func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) { // credentials. 
cc, serverAddress := setupForSecurityTests(t, bc, xdsClientCredsWithInsecureFallback(t), tlsServerCreds(t)) - // Configure cluster and endpoints resources in the management server. The + // Configure default resources in the management server. The // cluster resource is configured to return security configuration. - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})}, - SkipValidation: true, - } + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: target, + NodeID: nodeID, + Host: host, + Port: testutils.ParsePort(t, serverAddress), + SecLevel: e2e.SecurityLevelMTLS, + }) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { @@ -708,11 +597,10 @@ func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) { } verifySecurityInformationFromPeer(t, peer, e2e.SecurityLevelMTLS) - // Configure cluster and endpoints resources in the management server. The - // cluster resource contains security configuration with a certificate + // The cluster resource contains security configuration with a certificate // provider instance that is missing in the bootstrap configuration. 
- cluster := e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone) - cluster.TransportSocket = &v3corepb.TransportSocket{ + resources.Clusters[0] = e2e.DefaultCluster(resources.Clusters[0].Name, resources.Endpoints[0].ClusterName, e2e.SecurityLevelNone) + resources.Clusters[0].TransportSocket = &v3corepb.TransportSocket{ Name: "envoy.transport_sockets.tls", ConfigType: &v3corepb.TransportSocket_TypedConfig{ TypedConfig: testutils.MarshalAny(t, &v3tlspb.UpstreamTlsContext{ @@ -726,12 +614,6 @@ func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) { }), }, } - resources = e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{cluster}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})}, - SkipValidation: true, - } if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -791,14 +673,15 @@ func (s) TestSystemRootCertsSecurityConfig(t *testing.T) { // credentials. cc, serverAddress := setupForSecurityTests(t, bc, xdsClientCredsWithInsecureFallback(t), tlsServerCreds(t)) - // Configure cluster and endpoints resources in the management server. The - // cluster resource is configured to return security configuration. - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelTLSWithSystemRootCerts)}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})}, - SkipValidation: true, - } + // Configure default resources in the management server. The cluster + // resource is configured to return security configuration. 
+ resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: target, + NodeID: nodeID, + Host: host, + Port: testutils.ParsePort(t, serverAddress), + SecLevel: e2e.SecurityLevelTLSWithSystemRootCerts, + }) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { diff --git a/internal/xds/balancer/cdsbalancer/cdsbalancer_test.go b/internal/xds/balancer/cdsbalancer/cdsbalancer_test.go index ca13655d..3bd1a49c 100644 --- a/internal/xds/balancer/cdsbalancer/cdsbalancer_test.go +++ b/internal/xds/balancer/cdsbalancer/cdsbalancer_test.go @@ -55,19 +55,27 @@ import ( v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" - _ "google.golang.org/grpc/balancer/ringhash" // Register the ring_hash LB policy + _ "google.golang.org/grpc/balancer/ringhash" // Register the ring_hash LB policy + _ "google.golang.org/grpc/internal/xds/httpfilter/router" // Register the router filter. + _ "google.golang.org/grpc/internal/xds/resolver" // Register the xds resolver ) const ( + target = "test.service" + routeName = "test_route" clusterName = "cluster1" edsClusterName = clusterName + "-eds" dnsClusterName = clusterName + "-dns" serviceName = "service1" dnsHostName = "dns_host" + host = "localhost" + port = 8080 dnsPort = uint32(8080) defaultTestTimeout = 5 * time.Second defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen. 
@@ -152,38 +160,8 @@ func registerWrappedClusterResolverPolicy(t *testing.T) (chan serviceconfig.Load return lbCfgCh, resolverErrCh, exitIdleCh, closeCh } -// Registers a wrapped cds LB policy for the duration of this test that retains -// all the functionality of the original cds LB policy, but makes the newly -// built policy available to the test to directly invoke any balancer methods. -// -// Returns a channel on which the newly built cds LB policy is written to. -func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer { - cdsBuilder := balancer.Get(cdsName) - internal.BalancerUnregister(cdsBuilder.Name()) - cdsBalancerCh := make(chan balancer.Balancer, 1) - stub.Register(cdsBuilder.Name(), stub.BalancerFuncs{ - Init: func(bd *stub.BalancerData) { - bal := cdsBuilder.Build(bd.ClientConn, bd.BuildOptions) - bd.ChildBalancer = bal - cdsBalancerCh <- bal - }, - ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { - return cdsBuilder.(balancer.ConfigParser).ParseConfig(lbCfg) - }, - UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { - return bd.ChildBalancer.UpdateClientConnState(ccs) - }, - Close: func(bd *stub.BalancerData) { - bd.ChildBalancer.Close() - }, - }) - t.Cleanup(func() { balancer.Register(cdsBuilder) }) - - return cdsBalancerCh -} - // Performs the following setup required for tests: -// - Spins up an xDS management server and and the provided onStreamRequest +// - Spins up an xDS management server and the provided onStreamRequest // function is set to be called for every incoming request on the ADS stream. 
// - Creates an xDS client talking to this management server
 // - Creates a manual resolver that configures the cds LB policy as the
@@ -196,7 +174,7 @@ func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer {
 //   - the grpc channel to the test backend service
 //   - the manual resolver configured on the channel
 //   - the xDS client used the grpc channel
-func setupWithManagementServer(t *testing.T, lis net.Listener, onStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) {
+func setupWithManagementServerAndManualResolver(t *testing.T, lis net.Listener, onStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) {
 	t.Helper()
 	mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
 		Listener: lis,
@@ -236,7 +214,7 @@ func setupWithManagementServer(t *testing.T, lis net.Listener, onStreamRequest f
 
 	cc, err := grpc.NewClient(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
 	if err != nil {
-		t.Fatalf("grpc.NewClient(%q) = %v", lis.Addr().String(), err)
+		t.Fatalf("grpc.NewClient(\"%s:///test.service\") = %v", r.Scheme(), err)
 	}
 	cc.Connect()
 	t.Cleanup(func() { cc.Close() })
@@ -244,6 +222,48 @@ func setupWithManagementServer(t *testing.T, lis net.Listener, onStreamRequest f
 	return mgmtServer, nodeID, cc, r, xdsC
 }
 
+// Performs the following setup required for tests:
+//   - Spins up an xDS management server and the provided onStreamRequest
+//     function is set to be called for every incoming request on the ADS stream.
+//   - Creates an xDS client talking to this management server
+//   - Creates a gRPC channel with grpc.NewClient that creates an xDS resolver.
+//
+// Returns the following:
+//   - the xDS management server
+//   - the nodeID expected by the management server
+//   - the grpc channel to the test backend service
+//     (the xDS client is created internally by the xDS resolver)
+func setupWithManagementServer(t *testing.T, lis net.Listener, onStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn) {
+	t.Helper()
+	mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
+		Listener:        lis,
+		OnStreamRequest: onStreamRequest,
+		// Required for aggregate clusters as all resources cannot be requested
+		// at once.
+		AllowResourceSubset: true,
+	})
+
+	// Create bootstrap configuration pointing to the above management server.
+	nodeID := uuid.New().String()
+	bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
+
+	if internal.NewXDSResolverWithConfigForTesting == nil {
+		t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil")
+	}
+	r, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bc)
+	if err != nil {
+		t.Fatalf("Failed to create xDS resolver for testing: %v", err)
+	}
+	cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", target), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
+	if err != nil {
+		t.Fatalf("grpc.NewClient(\"xds:///%s\") = %v", target, err)
+	}
+	cc.Connect()
+	t.Cleanup(func() { cc.Close() })
+
+	return mgmtServer, nodeID, cc
+}
+
 // Helper function to compare the load balancing configuration received on the
 // channel with the expected one. Both configs are marshalled to JSON and then
 // compared.
@@ -308,7 +328,7 @@ func (s) TestConfigurationUpdate_Success(t *testing.T) {
 		}
 		return nil
 	}
-	_, _, _, r, xdsClient := setupWithManagementServer(t, nil, onStreamReq)
+	_, _, _, r, xdsClient := setupWithManagementServerAndManualResolver(t, nil, onStreamReq)
 
 	// Verify that the specified cluster resource is requested.
wantNames := []string{clusterName} @@ -601,14 +621,16 @@ func (s) TestClusterUpdate_Success(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) + mgmtServer, nodeID, _ := setupWithManagementServer(t, nil, nil) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{test.clusterResource}, - SkipValidation: true, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, test.clusterResource.Name)}, + Clusters: []*v3clusterpb.Cluster{test.clusterResource}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{port})}, }); err != nil { t.Fatal(err) } @@ -625,7 +647,7 @@ func (s) TestClusterUpdate_Success(t *testing.T) { // balancing configuration pushed to the child is as expected. 
func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) + mgmtServer, nodeID, _ := setupWithManagementServer(t, nil, nil) clusterResource := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ ClusterName: clusterName, @@ -652,9 +674,11 @@ func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{clusterResource}, - SkipValidation: true, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(target, routeName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, target, clusterResource.Name)}, + Clusters: []*v3clusterpb.Cluster{clusterResource}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{port})}, }); err != nil { t.Fatal(err) } @@ -674,7 +698,7 @@ func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) { // continue using the previous good update. 
func (s) TestClusterUpdate_Failure(t *testing.T) { _, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) defer cancel() cdsResourceCanceledCh := make(chan struct{}, 1) onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error { @@ -688,18 +712,19 @@ func (s) TestClusterUpdate_Failure(t *testing.T) { } return nil } - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, onStreamReq) + mgmtServer, nodeID, cc := setupWithManagementServer(t, nil, onStreamReq) // Configure the management server to return a cluster resource that // contains a config_source_specifier for the `lrs_server` field which is not // set to `self`, and hence is expected to be NACKed by the client. - cluster := e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone) - cluster.LrsServer = &v3corepb.ConfigSource{ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{}} - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{cluster}, - SkipValidation: true, - } + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: target, + NodeID: nodeID, + Host: host, + Port: port, + }) + resources.Clusters[0].LrsServer = &v3corepb.ConfigSource{ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{}} + if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -728,13 +753,13 @@ func (s) TestClusterUpdate_Failure(t *testing.T) { server := stubserver.StartTestService(t, nil) t.Cleanup(server.Stop) - // Configure cluster and endpoints resources in the management server. 
- resources = e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})}, - SkipValidation: true, - } + // Configure correct cluster and endpoints resources in the management server. + resources = e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: target, + NodeID: nodeID, + Host: host, + Port: testutils.ParsePort(t, server.Address), + }) if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -745,12 +770,8 @@ func (s) TestClusterUpdate_Failure(t *testing.T) { } // Send the bad cluster resource again. - resources = e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{cluster}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})}, - SkipValidation: true, - } + resources.Clusters[0].LrsServer = &v3corepb.ConfigSource{ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{}} + if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -818,7 +839,7 @@ func (s) TestResolverError(t *testing.T) { } return nil } - mgmtServer, nodeID, cc, r, _ := setupWithManagementServer(t, lis, onStreamReq) + mgmtServer, nodeID, cc, r, _ := setupWithManagementServerAndManualResolver(t, lis, onStreamReq) // Grab the wrapped connection from the listener wrapper. This will be used // to verify the connection is closed. 
@@ -872,7 +893,7 @@ func (s) TestResolverError(t *testing.T) { resources := e2e.UpdateOptions{ NodeID: nodeID, Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, host, []uint32{testutils.ParsePort(t, server.Address)})}, SkipValidation: true, } if err := mgmtServer.Update(ctx, resources); err != nil { @@ -950,6 +971,83 @@ func (s) TestResolverError(t *testing.T) { } } +// Tests the scenario for resolver errors: when a resolver resource not found +// error is received when the LDS resource is removed with a previous good +// update from the management server, the cds LB policy is expected to push the +// error down the child policy and put the channel in TRANSIENT_FAILURE. It is +// also expected to cancel the CDS watch. +func (s) TestResourceNotFoundResolverError(t *testing.T) { + _, _, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cdsResourceCanceledCh := make(chan struct{}, 1) + onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + if req.GetTypeUrl() == version.V3ClusterURL { + if len(req.GetResourceNames()) == 0 { + select { + case cdsResourceCanceledCh <- struct{}{}: + case <-ctx.Done(): + } + } + } + return nil + } + mgmtServer, nodeID, cc := setupWithManagementServer(t, nil, onStreamReq) + + // Start a test service backend. + server := stubserver.StartTestService(t, nil) + t.Cleanup(server.Stop) + + // Configure default resources in the management server. 
+ resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: target, + NodeID: nodeID, + Host: host, + Port: testutils.ParsePort(t, server.Address), + }) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Verify that a successful RPC can be made. + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) + } + + // Push a resource-not-found type error by removing the LDS resource. + resources.Listeners = nil + resources.SkipValidation = true + mgmtServer.Update(ctx, resources) + + // Wait for the CDS resource to be not requested anymore. + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for CDS resource to be not requested") + case <-cdsResourceCanceledCh: + } + + // Verify that the resolver error is pushed to the child policy. + select { + case <-childPolicyCloseCh: + case <-ctx.Done(): + t.Fatal("Timeout when waiting for child policy to be closed") + } + + testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure) + + // Ensure that the resolver error is propagated to the RPC caller. + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for error from RPC") + default: + _, err := client.EmptyCall(ctx, &testpb.Empty{}) + if err := verifyRPCError(err, codes.Unavailable, "", nodeID); err != nil { + t.Fatal(err) + } + } +} + // Tests scenarios involving removal of a cluster resource from the management // server. // @@ -973,19 +1071,19 @@ func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) { } return nil } - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, onStreamReq) + mgmtServer, nodeID, cc := setupWithManagementServer(t, nil, onStreamReq) // Start a test service backend. server := stubserver.StartTestService(t, nil) t.Cleanup(server.Stop) - // Configure cluster and endpoints resources in the management server. 
- resources := e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})}, - SkipValidation: true, - } + // Configure default resources in the management server. + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: target, + NodeID: nodeID, + Host: host, + Port: testutils.ParsePort(t, server.Address), + }) if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -998,7 +1096,9 @@ func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) { // Remove the cluster resource from the management server, triggering a // resource-not-found error. + oldClusters := resources.Clusters resources.Clusters = nil + resources.SkipValidation = true if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -1016,19 +1116,14 @@ func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) { // Ensure RPC fails with Unavailable status code and the error message is // meaningful and contains the xDS node ID. - wantErr := fmt.Sprintf("resource %q of type %q has been removed", clusterName, "ClusterResource") + wantErr := fmt.Sprintf("resource %q of type %q has been removed", oldClusters[0].Name, "ClusterResource") _, err := client.EmptyCall(ctx, &testpb.Empty{}) if err := verifyRPCError(err, codes.Unavailable, wantErr, nodeID); err != nil { t.Fatal(err) } // Re-add the cluster resource to the management server. 
- resources = e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})}, - SkipValidation: true, - } + resources.Clusters = oldClusters if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -1041,22 +1136,21 @@ func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) { // Tests that closing the cds LB policy results in the the child policy being closed. -func (s) TestClose(t *testing.T) { - cdsBalancerCh := registerWrappedCDSPolicy(t) +func (s) TestClose(t *testing.T) { _, _, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil) + mgmtServer, nodeID, cc := setupWithManagementServer(t, nil, nil) // Start a test service backend. server := stubserver.StartTestService(t, nil) t.Cleanup(server.Stop) - // Configure cluster and endpoints resources in the management server. - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})}, - SkipValidation: true, - } + // Configure resources in the management server. + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: target, + NodeID: nodeID, + Host: host, + Port: testutils.ParsePort(t, server.Address), + }) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { @@ -1069,14 +1163,8 @@ func (s) TestClose(t *testing.T) { t.Fatalf("EmptyCall() failed: %v", err) } - // Retrieve the cds LB policy and close it. 
- var cdsBal balancer.Balancer - select { - case cdsBal = <-cdsBalancerCh: - case <-ctx.Done(): - t.Fatal("Timeout when waiting for cds LB policy to be created") - } - cdsBal.Close() + // Close the gRPC ClientConn, which will close the cds LB policy. + cc.Close() // Wait for the child policy to be closed. select { @@ -1089,21 +1177,20 @@ func (s) TestClose(t *testing.T) { // Tests that calling ExitIdle on the cds LB policy results in the call being // propagated to the child policy. func (s) TestExitIdle(t *testing.T) { - cdsBalancerCh := registerWrappedCDSPolicy(t) _, _, exitIdleCh, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil) + mgmtServer, nodeID, cc := setupWithManagementServer(t, nil, nil) // Start a test service backend. server := stubserver.StartTestService(t, nil) t.Cleanup(server.Stop) - // Configure cluster and endpoints resources in the management server. - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})}, - SkipValidation: true, - } + // Configure resources in the management server. + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: target, + NodeID: nodeID, + Host: host, + Port: testutils.ParsePort(t, server.Address), + }) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { @@ -1116,14 +1203,8 @@ func (s) TestExitIdle(t *testing.T) { t.Fatalf("EmptyCall() failed: %v", err) } - // Retrieve the cds LB policy and call ExitIdle() on it. 
- var cdsBal balancer.Balancer - select { - case cdsBal = <-cdsBalancerCh: - case <-ctx.Done(): - t.Fatal("Timeout when waiting for cds LB policy to be created") - } - cdsBal.ExitIdle() + // Connect to gRPC channel that will call ExitIdle on the cds LB policy. + cc.Connect() // Wait for ExitIdle to be called on the child policy. select { -- 2.43.0