"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
+ "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
+ v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
- mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
+ mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
// Push the first cluster resource through the management server and
// verify the configuration pushed to the child policy.
// contains the expected discovery mechanisms.
func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) {
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
- mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
+ mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
// Configure the management server with the aggregate cluster resource
// pointing to two child clusters, one EDS and one LogicalDNS. Include the
// policy contains a single discovery mechanism.
func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) {
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
- mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
+ mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
// Configure the management server with the aggregate cluster resource
// pointing to two child clusters.
// discovery mechanisms.
func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T) {
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
- mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
+ mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
// Start off with the requested cluster being a leaf EDS cluster.
resources := e2e.UpdateOptions{
// longer exceed maximum depth, but be at the maximum allowed depth, and
// verifies that an RPC can be made successfully.
func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) {
- mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
+ mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil)
resources := e2e.UpdateOptions{
NodeID: nodeID,
// pushed only after all child clusters are resolved.
func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) {
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
- mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
+ mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
// Configure the management server with an aggregate cluster resource having
// a diamond dependency pattern, (A->[B,C]; B->D; C->D). Includes resources
// pushed only after all child clusters are resolved.
func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) {
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
- mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
+ mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
// Configure the management server with an aggregate cluster resource that
// has duplicates in the graph, (A->[B, C]; B->[C, D]). Include resources
// child policy and that an RPC can be successfully made.
func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) {
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
- mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
+ mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil)
const (
clusterNameA = clusterName // cluster name in cds LB policy config
// that the aggregate cluster graph has no leaf clusters.
func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) {
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
- mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
+ mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil)
const (
clusterNameA = clusterName // cluster name in cds LB policy config
// child policy and RPCs should get routed to that leaf cluster.
func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) {
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
- mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
+ mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil)
// Start a test service backend.
server := stubserver.StartTestService(t, nil)
// removed from the tree no longer has a watcher and the new cluster added has a
// new watcher.
func (s) TestWatchers(t *testing.T) {
- mgmtServer, nodeID, _, _, _, cdsResourceRequestedCh, _ := setupWithManagementServer(t)
-
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
+ cdsResourceRequestedCh := make(chan []string, 1)
+ onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
+ if req.GetTypeUrl() == version.V3ClusterURL {
+ if len(req.GetResourceNames()) > 0 {
+ select {
+ case cdsResourceRequestedCh <- req.GetResourceNames():
+ case <-ctx.Done():
+ }
+ }
+ }
+ return nil
+ }
+ mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, onStreamReq)
const (
clusterA = clusterName
}
// Performs the following setup required for tests:
-// - Spins up an xDS management server
+// - Spins up an xDS management server and and the provided onStreamRequest
+// function is set to be called for every incoming request on the ADS stream.
// - Creates an xDS client talking to this management server
// - Creates a manual resolver that configures the cds LB policy as the
// top-level policy, and pushes an initial configuration to it
// - the grpc channel to the test backend service
// - the manual resolver configured on the channel
// - the xDS client used the grpc channel
-// - a channel on which requested cluster resource names are sent
-// - a channel used to signal that previously requested cluster resources are
-// no longer requested
-func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) {
- return setupWithManagementServerAndListener(t, nil)
-}
-
-// Same as setupWithManagementServer, but also allows the caller to specify
-// a listener to be used by the management server.
-func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) {
+func setupWithManagementServer(t *testing.T, lis net.Listener, onStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) {
t.Helper()
-
- cdsResourceRequestedCh := make(chan []string, 1)
- cdsResourceCanceledCh := make(chan struct{}, 1)
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
- Listener: lis,
- OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
- if req.GetTypeUrl() == version.V3ClusterURL {
- switch len(req.GetResourceNames()) {
- case 0:
- select {
- case cdsResourceCanceledCh <- struct{}{}:
- default:
- }
- default:
- select {
- case cdsResourceRequestedCh <- req.GetResourceNames():
- default:
- }
- }
- }
- return nil
- },
+ Listener: lis,
+ OnStreamRequest: onStreamRequest,
// Required for aggregate clusters as all resources cannot be requested
// at once.
AllowResourceSubset: true,
cc.Connect()
t.Cleanup(func() { cc.Close() })
- return mgmtServer, nodeID, cc, r, xdsC, cdsResourceRequestedCh, cdsResourceCanceledCh
+ return mgmtServer, nodeID, cc, r, xdsC
}
// Helper function to compare the load balancing configuration received on the
// configuration changes, it stops requesting the old cluster resource and
// starts requesting the new one.
func (s) TestConfigurationUpdate_Success(t *testing.T) {
- _, _, _, r, xdsClient, cdsResourceRequestedCh, _ := setupWithManagementServer(t)
-
- // Verify that the specified cluster resource is requested.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
+ cdsResourceRequestedCh := make(chan []string, 1)
+ onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
+ if req.GetTypeUrl() == version.V3ClusterURL {
+ if len(req.GetResourceNames()) > 0 {
+ select {
+ case cdsResourceRequestedCh <- req.GetResourceNames():
+ case <-ctx.Done():
+ }
+ }
+ }
+ return nil
+ }
+ _, _, _, r, xdsClient := setupWithManagementServer(t, nil, onStreamReq)
+
+ // Verify that the specified cluster resource is requested.
wantNames := []string{clusterName}
if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
t.Fatal(err)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
- mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
+ mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// balancing configuration pushed to the child is as expected.
func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) {
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
- mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
+ mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
clusterResource := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
ClusterName: clusterName,
// continue using the previous good update.
func (s) TestClusterUpdate_Failure(t *testing.T) {
_, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t)
- mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t)
-
- // Verify that the specified cluster resource is requested.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- wantNames := []string{clusterName}
- if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
- t.Fatal(err)
+ cdsResourceCanceledCh := make(chan struct{}, 1)
+ onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
+ if req.GetTypeUrl() == version.V3ClusterURL {
+ if len(req.GetResourceNames()) == 0 {
+ select {
+ case cdsResourceCanceledCh <- struct{}{}:
+ case <-ctx.Done():
+ }
+ }
+ }
+ return nil
}
+ mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, onStreamReq)
// Configure the management server to return a cluster resource that
// contains a config_source_specifier for the `lrs_server` field which is not
func (s) TestResolverError(t *testing.T) {
_, resolverErrCh, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t)
lis := testutils.NewListenerWrapper(t, nil)
- mgmtServer, nodeID, cc, r, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServerAndListener(t, lis)
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+ cdsResourceCanceledCh := make(chan struct{}, 1)
+ cdsResourceRequestedCh := make(chan []string, 1)
+ onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
+ if req.GetTypeUrl() == version.V3ClusterURL {
+ switch len(req.GetResourceNames()) {
+ case 0:
+ select {
+ case cdsResourceCanceledCh <- struct{}{}:
+ case <-ctx.Done():
+ }
+ default:
+ select {
+ case cdsResourceRequestedCh <- req.GetResourceNames():
+ case <-ctx.Done():
+ }
+ }
+ }
+ return nil
+ }
+ mgmtServer, nodeID, cc, r, _ := setupWithManagementServer(t, lis, onStreamReq)
// Grab the wrapped connection from the listener wrapper. This will be used
// to verify the connection is closed.
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
val, err := lis.NewConnCh.Receive(ctx)
if err != nil {
t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
// - when the cluster resource is re-sent by the management server, RPCs
// should start succeeding.
func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) {
- mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t)
-
- // Verify that the specified cluster resource is requested.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- wantNames := []string{clusterName}
- if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
- t.Fatal(err)
+ cdsResourceCanceledCh := make(chan struct{}, 1)
+ onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
+ if req.GetTypeUrl() == version.V3ClusterURL {
+ if len(req.GetResourceNames()) == 0 {
+ select {
+ case cdsResourceCanceledCh <- struct{}{}:
+ case <-ctx.Done():
+ }
+ }
+ }
+ return nil
}
+ mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, onStreamReq)
// Start a test service backend.
server := stubserver.StartTestService(t, nil)
func (s) TestClose(t *testing.T) {
cdsBalancerCh := registerWrappedCDSPolicy(t)
_, _, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t)
- mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
+ mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil)
// Start a test service backend.
server := stubserver.StartTestService(t, nil)
func (s) TestExitIdle(t *testing.T) {
cdsBalancerCh := registerWrappedCDSPolicy(t)
_, _, exitIdleCh, _ := registerWrappedClusterResolverPolicy(t)
- mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
+ mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil)
// Start a test service backend.
server := stubserver.StartTestService(t, nil)