--- /dev/null
+/*
+ * Copyright 2025 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package xdsdepmgr provides the implementation of the xDS dependency manager
+// that manages all the xDS watches and resources as described in gRFC A74.
+package xdsdepmgr
+
+import (
+ "fmt"
+ "sync"
+
+ "google.golang.org/grpc/grpclog"
+ internalgrpclog "google.golang.org/grpc/internal/grpclog"
+ "google.golang.org/grpc/internal/xds/xdsclient"
+ "google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
+)
+
+const prefix = "[xdsdepmgr %p] "
+
+var logger = grpclog.Component("xds")
+
+func prefixLogger(p *DependencyManager) *internalgrpclog.PrefixLogger {
+ return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
+}
+
+// ConfigWatcher is the interface for consumers of aggregated xDS configuration
+// from the DependencyManager. The only consumer of this configuration is
+// currently the xDS resolver.
+type ConfigWatcher interface {
+ // Update is invoked when a new, validated xDS configuration is available.
+ //
+ // Implementations must treat the received config as read-only and should
+ // not modify it.
+ Update(*xdsresource.XDSConfig)
+
+ // Error is invoked when an error is received from the listener or route
+ // resource watcher. This includes cases where:
+ // - The listener or route resource watcher reports a resource error.
+ // - The received listener resource is a socket listener, not an API
+ // listener. TODO(i/8114): Implement this check.
+ // - The received route configuration does not contain a virtual host
+ // matching the channel's default authority.
+ Error(error)
+}
+
+// DependencyManager registers watches on the xDS client for all required xDS
+// resources, resolves dependencies between them, and returns a complete
+// configuration to the xDS resolver.
+type DependencyManager struct {
+ // The following fields are initialized at creation time and are read-only
+ // after that.
+ logger *internalgrpclog.PrefixLogger
+ watcher ConfigWatcher
+ xdsClient xdsclient.XDSClient
+ ldsResourceName string
+ dataplaneAuthority string
+ nodeID string
+
+ // All the fields below are protected by mu.
+ mu sync.Mutex
+ stopped bool
+
+ listenerWatcher *listenerWatcher
+ currentListenerUpdate *xdsresource.ListenerUpdate
+ routeConfigWatcher *routeConfigWatcher
+ rdsResourceName string
+ currentRouteConfig *xdsresource.RouteConfigUpdate
+ currentVirtualHost *xdsresource.VirtualHost
+}
+
+// New creates a new DependencyManager.
+//
+// - listenerName is the name of the Listener resource to request from the
+// management server.
+// - dataplaneAuthority is used to select the best matching virtual host from
+// the route configuration received from the management server.
+// - xdsClient is the xDS client to use to register resource watches.
+// - watcher is the ConfigWatcher interface that will receive the aggregated
+// XDSConfig updates and errors.
+func New(listenerName, dataplaneAuthority string, xdsClient xdsclient.XDSClient, watcher ConfigWatcher) *DependencyManager {
+ dm := &DependencyManager{
+ ldsResourceName: listenerName,
+ dataplaneAuthority: dataplaneAuthority,
+ xdsClient: xdsClient,
+ watcher: watcher,
+ nodeID: xdsClient.BootstrapConfig().Node().GetId(),
+ }
+ dm.logger = prefixLogger(dm)
+
+ // Start the listener watch. Listener watch will start the other resource
+ // watches as needed.
+ dm.listenerWatcher = newListenerWatcher(listenerName, dm)
+ return dm
+}
+
+// Close cancels all registered resource watches.
+func (m *DependencyManager) Close() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ if m.stopped {
+ return
+ }
+
+ m.stopped = true
+ if m.listenerWatcher != nil {
+ m.listenerWatcher.stop()
+ }
+ if m.routeConfigWatcher != nil {
+ m.routeConfigWatcher.stop()
+ }
+}
+
+// annotateErrorWithNodeID annotates the given error with the provided xDS node
+// ID.
+func (m *DependencyManager) annotateErrorWithNodeID(err error) error {
+ return fmt.Errorf("[xDS node id: %v]: %v", m.nodeID, err)
+}
+
+// maybeSendUpdateLocked checks that all the resources have been received and sends
+// the current aggregated xDS configuration to the watcher if all the updates
+// are available.
+func (m *DependencyManager) maybeSendUpdateLocked() {
+ m.watcher.Update(&xdsresource.XDSConfig{
+ Listener: m.currentListenerUpdate,
+ RouteConfig: m.currentRouteConfig,
+ VirtualHost: m.currentVirtualHost,
+ })
+}
+
+func (m *DependencyManager) applyRouteConfigUpdateLocked(update *xdsresource.RouteConfigUpdate) {
+ matchVH := xdsresource.FindBestMatchingVirtualHost(m.dataplaneAuthority, update.VirtualHosts)
+ if matchVH == nil {
+ m.watcher.Error(m.annotateErrorWithNodeID(fmt.Errorf("could not find VirtualHost for %q", m.dataplaneAuthority)))
+ return
+ }
+ m.currentRouteConfig = update
+ m.currentVirtualHost = matchVH
+ m.maybeSendUpdateLocked()
+}
+
+func (m *DependencyManager) onListenerResourceUpdate(update *xdsresource.ListenerUpdate, onDone func()) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ defer onDone()
+ if m.stopped {
+ return
+ }
+
+ if m.logger.V(2) {
+ m.logger.Infof("Received update for Listener resource %q: %+v", m.ldsResourceName, update)
+ }
+
+ m.currentListenerUpdate = update
+
+ if update.InlineRouteConfig != nil {
+ // If there was a previous route config watcher because of a non-inline
+ // route configuration, cancel it.
+ m.rdsResourceName = ""
+ if m.routeConfigWatcher != nil {
+ m.routeConfigWatcher.stop()
+ m.routeConfigWatcher = nil
+ }
+ m.applyRouteConfigUpdateLocked(update.InlineRouteConfig)
+ return
+ }
+
+ // We get here only if there was no inline route configuration. If the route
+ // config name has not changed, send an update with existing route
+ // configuration and the newly received listener configuration.
+ if m.rdsResourceName == update.RouteConfigName {
+ m.maybeSendUpdateLocked()
+ return
+ }
+
+ // If the route config name has changed, cancel the old watcher and start a
+ // new one. At this point, since the new route config name has not yet been
+ // resolved, no update is sent to the channel, and therefore the old route
+ // configuration (if received) is used until the new one is received.
+ m.rdsResourceName = update.RouteConfigName
+ if m.routeConfigWatcher != nil {
+ m.routeConfigWatcher.stop()
+ m.currentVirtualHost = nil
+ }
+ m.routeConfigWatcher = newRouteConfigWatcher(m.rdsResourceName, m)
+}
+
+func (m *DependencyManager) onListenerResourceError(err error, onDone func()) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ defer onDone()
+ if m.stopped {
+ return
+ }
+
+ m.logger.Warningf("Received resource error for Listener resource %q: %v", m.ldsResourceName, m.annotateErrorWithNodeID(err))
+
+ if m.routeConfigWatcher != nil {
+ m.routeConfigWatcher.stop()
+ }
+ m.rdsResourceName = ""
+ m.currentVirtualHost = nil
+ m.routeConfigWatcher = nil
+ m.watcher.Error(fmt.Errorf("listener resource error: %v", m.annotateErrorWithNodeID(err)))
+}
+
+// onListenerResourceAmbientError handles ambient errors received from the
+// listener resource watcher. Since ambient errors do not impact the current
+// state of the resource, no change is made to the current configuration and the
+// errors are only logged for visibility.
+func (m *DependencyManager) onListenerResourceAmbientError(err error, onDone func()) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ defer onDone()
+ if m.stopped {
+ return
+ }
+
+ m.logger.Warningf("Listener resource ambient error: %v", m.annotateErrorWithNodeID(err))
+}
+
+func (m *DependencyManager) onRouteConfigResourceUpdate(resourceName string, update *xdsresource.RouteConfigUpdate, onDone func()) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ defer onDone()
+ if m.stopped || m.rdsResourceName != resourceName {
+ return
+ }
+
+ if m.logger.V(2) {
+ m.logger.Infof("Received update for RouteConfiguration resource %q: %+v", resourceName, update)
+ }
+ m.applyRouteConfigUpdateLocked(update)
+}
+
+func (m *DependencyManager) onRouteConfigResourceError(resourceName string, err error, onDone func()) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ defer onDone()
+ if m.stopped || m.rdsResourceName != resourceName {
+ return
+ }
+ m.logger.Warningf("Received resource error for RouteConfiguration resource %q: %v", resourceName, m.annotateErrorWithNodeID(err))
+ m.watcher.Error(fmt.Errorf("route resource error: %v", m.annotateErrorWithNodeID(err)))
+}
+
+// onRouteResourceAmbientError handles ambient errors received from the route
+// resource watcher. Since ambient errors do not impact the current state of the
+// resource, no change is made to the current configuration and the errors are
+// only logged for visibility.
+func (m *DependencyManager) onRouteConfigResourceAmbientError(resourceName string, err error, onDone func()) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ defer onDone()
+ if m.stopped || m.rdsResourceName != resourceName {
+ return
+ }
+
+ m.logger.Warningf("Route resource ambient error %q: %v", resourceName, m.annotateErrorWithNodeID(err))
+}
--- /dev/null
+/*
+ *
+ * Copyright 2025 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package xdsdepmgr_test
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/envoyproxy/go-control-plane/pkg/wellknown"
+ "github.com/google/go-cmp/cmp"
+ "github.com/google/go-cmp/cmp/cmpopts"
+ "github.com/google/uuid"
+ "google.golang.org/grpc/internal/grpctest"
+ "google.golang.org/grpc/internal/testutils"
+ "google.golang.org/grpc/internal/testutils/xds/e2e"
+ "google.golang.org/grpc/internal/xds/bootstrap"
+ "google.golang.org/grpc/internal/xds/xdsclient"
+ "google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
+ "google.golang.org/grpc/internal/xds/xdsdepmgr"
+
+ v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
+ v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
+ v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
+ v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
+
+ _ "google.golang.org/grpc/internal/xds/httpfilter/router" // Register the router filter
+)
+
+type s struct {
+ grpctest.Tester
+}
+
+func Test(t *testing.T) {
+ grpctest.RunSubTests(t, s{})
+}
+
+const (
+ defaultTestTimeout = 10 * time.Second
+ defaultTestShortTimeout = 100 * time.Microsecond
+
+ defaultTestServiceName = "service-name"
+ defaultTestRouteConfigName = "route-config-name"
+ defaultTestClusterName = "cluster-name"
+)
+
+func newStringP(s string) *string {
+ return &s
+}
+
+// testWatcher is an implementation of the ConfigWatcher interface that sends
+// the updates and errors received from the dependency manager to respective
+// channels, for the tests to verify.
+type testWatcher struct {
+ updateCh chan *xdsresource.XDSConfig
+ errorCh chan error
+}
+
+// Update sends the received XDSConfig update to the update channel.
+func (w *testWatcher) Update(cfg *xdsresource.XDSConfig) {
+ w.updateCh <- cfg
+}
+
+// Error sends the received error to the error channel.
+func (w *testWatcher) Error(err error) {
+ w.errorCh <- err
+}
+
+func verifyError(ctx context.Context, errCh chan error, wantErr, wantNodeID string) error {
+ select {
+ case gotErr := <-errCh:
+ if gotErr == nil {
+ return fmt.Errorf("got nil error from resolver, want error %q", wantErr)
+ }
+ if !strings.Contains(gotErr.Error(), wantErr) {
+ return fmt.Errorf("got error from resolver %q, want %q", gotErr, wantErr)
+ }
+ if !strings.Contains(gotErr.Error(), wantNodeID) {
+ return fmt.Errorf("got error from resolver %q, want nodeID %q", gotErr, wantNodeID)
+ }
+ case <-ctx.Done():
+ return fmt.Errorf("timeout waiting for error from dependency manager")
+ }
+ return nil
+}
+
+func verifyXDSConfig(ctx context.Context, xdsCh chan *xdsresource.XDSConfig, errCh chan error, want *xdsresource.XDSConfig) error {
+ select {
+ case <-ctx.Done():
+ return fmt.Errorf("timeout waiting for update from dependency manager")
+ case update := <-xdsCh:
+ cmpOpts := []cmp.Option{
+ cmpopts.EquateEmpty(),
+ cmpopts.IgnoreFields(xdsresource.HTTPFilter{}, "Filter", "Config"),
+ cmpopts.IgnoreFields(xdsresource.ListenerUpdate{}, "Raw"),
+ cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"),
+ }
+ if diff := cmp.Diff(update, want, cmpOpts...); diff != "" {
+ return fmt.Errorf("received unexpected update from dependency manager. Diff (-got +want):\n%v", diff)
+ }
+ case err := <-errCh:
+ return fmt.Errorf("received unexpected error from dependency manager: %v", err)
+ }
+ return nil
+}
+
+func createXDSClient(t *testing.T, bootstrapContents []byte) xdsclient.XDSClient {
+ t.Helper()
+
+ config, err := bootstrap.NewConfigFromContents(bootstrapContents)
+ if err != nil {
+ t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
+ }
+
+ pool := xdsclient.NewPool(config)
+ if err != nil {
+ t.Fatalf("Failed to create an xDS client pool: %v", err)
+ }
+ c, cancel, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{Name: t.Name()})
+ if err != nil {
+ t.Fatalf("Failed to create an xDS client: %v", err)
+ }
+ t.Cleanup(cancel)
+ return c
+}
+
+// Spins up an xDS management server and sets up the xDS bootstrap configuration.
+//
+// Returns the following:
+// - A reference to the xDS management server
+// - Contents of the bootstrap configuration pointing to xDS management
+// server
+func setupManagementServerForTest(t *testing.T, nodeID string) (*e2e.ManagementServer, []byte) {
+ t.Helper()
+
+ mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
+ t.Cleanup(mgmtServer.Stop)
+
+ bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
+ return mgmtServer, bootstrapContents
+}
+
+// Tests the happy case where the dependency manager receives all the required
+// resources and verifies that Update is called with with the correct XDSConfig.
+func (s) TestHappyCase(t *testing.T) {
+ nodeID := uuid.New().String()
+ mgmtServer, bc := setupManagementServerForTest(t, nodeID)
+ xdsClient := createXDSClient(t, bc)
+
+ watcher := &testWatcher{
+ updateCh: make(chan *xdsresource.XDSConfig),
+ errorCh: make(chan error),
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+ route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+ resources := e2e.UpdateOptions{
+ NodeID: nodeID,
+ Listeners: []*v3listenerpb.Listener{listener},
+ Routes: []*v3routepb.RouteConfiguration{route},
+ SkipValidation: true,
+ }
+ if err := mgmtServer.Update(ctx, resources); err != nil {
+ t.Fatal(err)
+ }
+
+ dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+ defer dm.Close()
+ wantXdsConfig := &xdsresource.XDSConfig{
+ Listener: &xdsresource.ListenerUpdate{
+ RouteConfigName: defaultTestRouteConfigName,
+ HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
+ },
+ RouteConfig: &xdsresource.RouteConfigUpdate{
+ VirtualHosts: []*xdsresource.VirtualHost{
+ {
+ Domains: []string{defaultTestServiceName},
+ Routes: []*xdsresource.Route{{
+ Prefix: newStringP("/"),
+ WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+ ActionType: xdsresource.RouteActionRoute,
+ }},
+ },
+ },
+ },
+ VirtualHost: &xdsresource.VirtualHost{
+ Domains: []string{defaultTestServiceName},
+ Routes: []*xdsresource.Route{{
+ Prefix: newStringP("/"),
+ WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+ ActionType: xdsresource.RouteActionRoute},
+ },
+ },
+ }
+ if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Tests the case where the listener contains an inline route configuration and
+// verifies that Update is called with the correct XDSConfig.
+func (s) TestInlineRouteConfig(t *testing.T) {
+ nodeID := uuid.New().String()
+ mgmtServer, bc := setupManagementServerForTest(t, nodeID)
+ xdsClient := createXDSClient(t, bc)
+
+ watcher := &testWatcher{
+ updateCh: make(chan *xdsresource.XDSConfig),
+ errorCh: make(chan error),
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
+ RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
+ RouteConfig: e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName),
+ },
+ HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})}, // router fields are unused by grpc
+ })
+ listener := &v3listenerpb.Listener{
+ Name: defaultTestServiceName,
+ ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
+ FilterChains: []*v3listenerpb.FilterChain{{
+ Name: "filter-chain-name",
+ Filters: []*v3listenerpb.Filter{{
+ Name: wellknown.HTTPConnectionManager,
+ ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
+ }},
+ }},
+ }
+ resources := e2e.UpdateOptions{
+ NodeID: nodeID,
+ Listeners: []*v3listenerpb.Listener{listener},
+ SkipValidation: true,
+ }
+ if err := mgmtServer.Update(ctx, resources); err != nil {
+ t.Fatal(err)
+ }
+ dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+ defer dm.Close()
+
+ wantConfig := &xdsresource.XDSConfig{
+ Listener: &xdsresource.ListenerUpdate{
+ InlineRouteConfig: &xdsresource.RouteConfigUpdate{
+ VirtualHosts: []*xdsresource.VirtualHost{
+ {
+ Domains: []string{defaultTestServiceName},
+ Routes: []*xdsresource.Route{{
+ Prefix: newStringP("/"),
+ WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+ ActionType: xdsresource.RouteActionRoute},
+ },
+ },
+ },
+ },
+ HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
+ },
+ RouteConfig: &xdsresource.RouteConfigUpdate{
+ VirtualHosts: []*xdsresource.VirtualHost{
+ {
+ Domains: []string{defaultTestServiceName},
+ Routes: []*xdsresource.Route{{
+ Prefix: newStringP("/"),
+ WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+ ActionType: xdsresource.RouteActionRoute},
+ },
+ },
+ },
+ },
+ VirtualHost: &xdsresource.VirtualHost{
+ Domains: []string{defaultTestServiceName},
+ Routes: []*xdsresource.Route{{
+ Prefix: newStringP("/"),
+ WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+ ActionType: xdsresource.RouteActionRoute},
+ },
+ },
+ }
+ if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantConfig); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Tests the case where dependency manager only receives listener resource but
+// does not receive route config resource. Verfies that Update is not called
+// since we do not have all resources.
+func (s) TestIncompleteResources(t *testing.T) {
+ nodeID := uuid.New().String()
+ mgmtServer, bc := setupManagementServerForTest(t, nodeID)
+ xdsClient := createXDSClient(t, bc)
+
+ watcher := &testWatcher{
+ updateCh: make(chan *xdsresource.XDSConfig),
+ errorCh: make(chan error),
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+ if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
+ NodeID: nodeID,
+ Listeners: []*v3listenerpb.Listener{listener},
+ SkipValidation: true,
+ }); err != nil {
+ t.Fatal(err)
+ }
+ dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+ defer dm.Close()
+
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ select {
+ case <-sCtx.Done():
+ case update := <-watcher.updateCh:
+ t.Fatalf("Received unexpected update from dependency manager: %+v", update)
+ case err := <-watcher.errorCh:
+ t.Fatalf("Received unexpected error from dependency manager: %v", err)
+ }
+}
+
+// Tests the case where dependency manager receives a listener resource error by
+// sending the correct update first and then removing the listener resource. It
+// verifies that Error is called with the correct error.
+func (s) TestListenerResourceError(t *testing.T) {
+ nodeID := uuid.New().String()
+ mgmtServer, bc := setupManagementServerForTest(t, nodeID)
+ xdsClient := createXDSClient(t, bc)
+
+ watcher := &testWatcher{
+ updateCh: make(chan *xdsresource.XDSConfig),
+ errorCh: make(chan error),
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ // Send a correct update first
+ listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+ listener.FilterChains = nil
+ route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+ resources := e2e.UpdateOptions{
+ NodeID: nodeID,
+ Listeners: []*v3listenerpb.Listener{listener},
+ Routes: []*v3routepb.RouteConfiguration{route},
+ SkipValidation: true,
+ }
+ if err := mgmtServer.Update(ctx, resources); err != nil {
+ t.Fatal(err)
+ }
+
+ dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+ defer dm.Close()
+
+ wantXdsConfig := &xdsresource.XDSConfig{
+ Listener: &xdsresource.ListenerUpdate{
+ RouteConfigName: defaultTestRouteConfigName,
+ HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
+ },
+ RouteConfig: &xdsresource.RouteConfigUpdate{
+ VirtualHosts: []*xdsresource.VirtualHost{
+ {
+ Domains: []string{defaultTestServiceName},
+ Routes: []*xdsresource.Route{{
+ Prefix: newStringP("/"),
+ WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+ ActionType: xdsresource.RouteActionRoute,
+ }},
+ },
+ },
+ },
+ VirtualHost: &xdsresource.VirtualHost{
+ Domains: []string{defaultTestServiceName},
+ Routes: []*xdsresource.Route{{
+ Prefix: newStringP("/"),
+ WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+ ActionType: xdsresource.RouteActionRoute,
+ }},
+ },
+ }
+ if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+ t.Fatal(err)
+ }
+
+ // Remove listener resource so that we get listener resource error.
+ resources.Listeners = nil
+ if err := mgmtServer.Update(ctx, resources); err != nil {
+ t.Fatal(err)
+ }
+
+ if err := verifyError(ctx, watcher.errorCh, fmt.Sprintf("xds: resource %q of type %q has been removed", defaultTestServiceName, "ListenerResource"), nodeID); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Tests the case where dependency manager receives a route config resource
+// error by sending a route resource that is NACKed by the XDSClient. It
+// verifies that Error is called with correct error.
+func (s) TestRouteResourceError(t *testing.T) {
+ nodeID := uuid.New().String()
+ mgmtServer, bc := setupManagementServerForTest(t, nodeID)
+ xdsClient := createXDSClient(t, bc)
+
+ errorCh := make(chan error, 1)
+ watcher := &testWatcher{
+ updateCh: make(chan *xdsresource.XDSConfig),
+ errorCh: errorCh,
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+ route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+ // Remove the Match to make sure the route resource is NACKed by XDSClient
+ // sending a route resource error to dependency manager.
+ route.VirtualHosts[0].Routes[0].Match = nil
+ resources := e2e.UpdateOptions{
+ NodeID: nodeID,
+ Listeners: []*v3listenerpb.Listener{listener},
+ Routes: []*v3routepb.RouteConfiguration{route},
+ SkipValidation: true,
+ }
+ if err := mgmtServer.Update(ctx, resources); err != nil {
+ t.Fatal(err)
+ }
+
+ dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+ defer dm.Close()
+
+ if err := verifyError(ctx, watcher.errorCh, "route resource error", nodeID); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Tests the case where route config updates receives does not have any virtual
+// host. Verifies that Error is called with correct error.
+func (s) TestNoVirtualHost(t *testing.T) {
+ nodeID := uuid.New().String()
+ mgmtServer, bc := setupManagementServerForTest(t, nodeID)
+ xdsClient := createXDSClient(t, bc)
+
+ watcher := &testWatcher{
+ updateCh: make(chan *xdsresource.XDSConfig),
+ errorCh: make(chan error),
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+ route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+ route.VirtualHosts = nil
+ resources := e2e.UpdateOptions{
+ NodeID: nodeID,
+ Listeners: []*v3listenerpb.Listener{listener},
+ Routes: []*v3routepb.RouteConfiguration{route},
+ SkipValidation: true,
+ }
+ if err := mgmtServer.Update(ctx, resources); err != nil {
+ t.Fatal(err)
+ }
+ dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+ defer dm.Close()
+
+ if err := verifyError(ctx, watcher.errorCh, "could not find VirtualHost", nodeID); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Tests the case where we get an ambient error and verify that we correctly log
+// a warning for it. To make sure we get an ambient error, we send a correct
+// update first, then send an invalid one and then send the valid resource
+// again. We send the valid resource again so that we can be sure the ambient
+// error reaches the dependency manager since there is no other way to wait for
+// it.
+func (s) TestAmbientError(t *testing.T) {
+ // Expect a warning log for the ambient error.
+ grpctest.ExpectWarning("Listener resource ambient error")
+
+ nodeID := uuid.New().String()
+ mgmtServer, bc := setupManagementServerForTest(t, nodeID)
+ xdsClient := createXDSClient(t, bc)
+
+ watcher := &testWatcher{
+ updateCh: make(chan *xdsresource.XDSConfig),
+ errorCh: make(chan error),
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ // Configure a valid listener and route.
+ listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+ route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+ resources := e2e.UpdateOptions{
+ NodeID: nodeID,
+ Listeners: []*v3listenerpb.Listener{listener},
+ Routes: []*v3routepb.RouteConfiguration{route},
+ SkipValidation: true,
+ }
+ if err := mgmtServer.Update(ctx, resources); err != nil {
+ t.Fatal(err)
+ }
+
+ dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+ defer dm.Close()
+
+ // Wait for the initial valid update.
+ wantXdsConfig := &xdsresource.XDSConfig{
+ Listener: &xdsresource.ListenerUpdate{
+ RouteConfigName: defaultTestRouteConfigName,
+ HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
+ },
+ RouteConfig: &xdsresource.RouteConfigUpdate{
+ VirtualHosts: []*xdsresource.VirtualHost{
+ {
+ Domains: []string{defaultTestServiceName},
+ Routes: []*xdsresource.Route{{
+ Prefix: newStringP("/"),
+ WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+ ActionType: xdsresource.RouteActionRoute,
+ }},
+ },
+ },
+ },
+ VirtualHost: &xdsresource.VirtualHost{
+ Domains: []string{defaultTestServiceName},
+ Routes: []*xdsresource.Route{{
+ Prefix: newStringP("/"),
+ WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+ ActionType: xdsresource.RouteActionRoute,
+ }},
+ },
+ }
+ if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+ t.Fatal(err)
+ }
+
+ // Configure a listener resource that is expected to be NACKed because it
+ // does not contain the `RouteSpecifier` field in the HTTPConnectionManager.
+ // Since a valid one is already cached, this should result in an ambient
+ // error.
+ hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
+ HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})},
+ })
+ lis := &v3listenerpb.Listener{
+ Name: defaultTestServiceName,
+ ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
+ FilterChains: []*v3listenerpb.FilterChain{{
+ Name: "filter-chain-name",
+ Filters: []*v3listenerpb.Filter{{
+ Name: wellknown.HTTPConnectionManager,
+ ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
+ }},
+ }},
+ }
+ resources.Listeners = []*v3listenerpb.Listener{lis}
+ if err := mgmtServer.Update(ctx, resources); err != nil {
+ t.Fatal(err)
+ }
+
+ // We expect no call to Error or Update on our watcher. We just wait for
+ // a short duration to ensure that.
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ select {
+ case err := <-watcher.errorCh:
+ t.Fatalf("Unexpected call to Error %v", err)
+ case update := <-watcher.updateCh:
+ t.Fatalf("Unexpected call to Update %+v", update)
+ case <-sCtx.Done():
+ }
+
+ // Send valid resources again.
+ listener = e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+ route = e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+ resources = e2e.UpdateOptions{
+ NodeID: nodeID,
+ Listeners: []*v3listenerpb.Listener{listener},
+ Routes: []*v3routepb.RouteConfiguration{route},
+ SkipValidation: true,
+ }
+ if err := mgmtServer.Update(ctx, resources); err != nil {
+ t.Fatal(err)
+ }
+ if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Tests the case where the cluster name changes in the route resource update
+// and verify that each time Update is called with correct cluster name.
+func (s) TestRouteResourceUpdate(t *testing.T) {
+ nodeID := uuid.New().String()
+ mgmtServer, bc := setupManagementServerForTest(t, nodeID)
+ xdsClient := createXDSClient(t, bc)
+
+ watcher := &testWatcher{
+ updateCh: make(chan *xdsresource.XDSConfig),
+ errorCh: make(chan error),
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ // Initial resources with defaultTestClusterName
+ listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+ route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+ resources := e2e.UpdateOptions{
+ NodeID: nodeID,
+ Listeners: []*v3listenerpb.Listener{listener},
+ Routes: []*v3routepb.RouteConfiguration{route},
+ SkipValidation: true,
+ }
+ if err := mgmtServer.Update(ctx, resources); err != nil {
+ t.Fatal(err)
+ }
+
+ dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+ defer dm.Close()
+
+ // Wait for the first update.
+ wantXdsConfig := &xdsresource.XDSConfig{
+ Listener: &xdsresource.ListenerUpdate{
+ RouteConfigName: defaultTestRouteConfigName,
+ HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
+ },
+ RouteConfig: &xdsresource.RouteConfigUpdate{
+ VirtualHosts: []*xdsresource.VirtualHost{
+ {
+ Domains: []string{defaultTestServiceName},
+ Routes: []*xdsresource.Route{{
+ Prefix: newStringP("/"),
+ WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+ ActionType: xdsresource.RouteActionRoute,
+ }},
+ },
+ },
+ },
+ VirtualHost: &xdsresource.VirtualHost{
+ Domains: []string{defaultTestServiceName},
+ Routes: []*xdsresource.Route{{
+ Prefix: newStringP("/"),
+ WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+ ActionType: xdsresource.RouteActionRoute,
+ }},
+ },
+ }
+ if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+ t.Fatal(err)
+ }
+
+ // Update route to point to a new cluster.
+ newClusterName := "new-cluster-name"
+ route2 := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, newClusterName)
+ resources.Routes = []*v3routepb.RouteConfiguration{route2}
+ if err := mgmtServer.Update(ctx, resources); err != nil {
+ t.Fatal(err)
+ }
+
+ // Wait for the second update and verify it has the new cluster.
+ wantXdsConfig.RouteConfig.VirtualHosts[0].Routes[0].WeightedClusters[0].Name = newClusterName
+ wantXdsConfig.VirtualHost.Routes[0].WeightedClusters[0].Name = newClusterName
+ if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Tests the case where the route resource is first sent from the management
+// server and the changed to be inline with the listener and then again changed
+// to be received from the management server. It verifies that each time Update
+// called with the correct XDSConfig.
+func (s) TestRouteResourceChangeToInline(t *testing.T) {
+ nodeID := uuid.New().String()
+ mgmtServer, bc := setupManagementServerForTest(t, nodeID)
+ defer mgmtServer.Stop()
+ xdsClient := createXDSClient(t, bc)
+
+ watcher := &testWatcher{
+ updateCh: make(chan *xdsresource.XDSConfig),
+ errorCh: make(chan error),
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ // Initial resources with defaultTestClusterName
+ listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+ route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+ resources := e2e.UpdateOptions{
+ NodeID: nodeID,
+ Listeners: []*v3listenerpb.Listener{listener},
+ Routes: []*v3routepb.RouteConfiguration{route},
+ SkipValidation: true,
+ }
+ if err := mgmtServer.Update(ctx, resources); err != nil {
+ t.Fatal(err)
+ }
+
+ dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+ defer dm.Close()
+
+ // Wait for the first update.
+ wantXdsConfig := &xdsresource.XDSConfig{
+ Listener: &xdsresource.ListenerUpdate{
+ RouteConfigName: defaultTestRouteConfigName,
+ HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
+ },
+ RouteConfig: &xdsresource.RouteConfigUpdate{
+ VirtualHosts: []*xdsresource.VirtualHost{
+ {
+ Domains: []string{defaultTestServiceName},
+ Routes: []*xdsresource.Route{{
+ Prefix: newStringP("/"),
+ WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+ ActionType: xdsresource.RouteActionRoute,
+ }},
+ },
+ },
+ },
+ VirtualHost: &xdsresource.VirtualHost{
+ Domains: []string{defaultTestServiceName},
+ Routes: []*xdsresource.Route{{
+ Prefix: newStringP("/"),
+ WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+ ActionType: xdsresource.RouteActionRoute,
+ }},
+ },
+ }
+ if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+ t.Fatal(err)
+ }
+
+ // Update route to point to a new cluster.
+ newClusterName := "new-cluster-name"
+ hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
+ RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
+ RouteConfig: e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, newClusterName),
+ },
+ HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})}, // router fields are unused by grpc
+ })
+ resources.Listeners[0].ApiListener.ApiListener = hcm
+ resources.Routes = nil
+ if err := mgmtServer.Update(ctx, resources); err != nil {
+ t.Fatal(err)
+ }
+
+ // Wait for the second update and verify it has the new cluster.
+ wantXdsConfig.Listener.InlineRouteConfig = &xdsresource.RouteConfigUpdate{
+ VirtualHosts: []*xdsresource.VirtualHost{
+ {
+ Domains: []string{defaultTestServiceName},
+ Routes: []*xdsresource.Route{{Prefix: newStringP("/"),
+ WeightedClusters: []xdsresource.WeightedCluster{{Name: newClusterName, Weight: 100}},
+ ActionType: xdsresource.RouteActionRoute,
+ }},
+ },
+ },
+ }
+ wantXdsConfig.Listener.RouteConfigName = ""
+ wantXdsConfig.RouteConfig.VirtualHosts[0].Routes[0].WeightedClusters[0].Name = newClusterName
+ wantXdsConfig.VirtualHost.Routes[0].WeightedClusters[0].Name = newClusterName
+ if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+ t.Fatal(err)
+ }
+
+ // Change the route resource back to non-inline.
+ listener = e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+ route = e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+ resources = e2e.UpdateOptions{
+ NodeID: nodeID,
+ Listeners: []*v3listenerpb.Listener{listener},
+ Routes: []*v3routepb.RouteConfiguration{route},
+ SkipValidation: true,
+ }
+ if err := mgmtServer.Update(ctx, resources); err != nil {
+ t.Fatal(err)
+ }
+
+ // Wait for the third update and verify it has the original cluster.
+ wantXdsConfig.Listener.InlineRouteConfig = nil
+ wantXdsConfig.Listener.RouteConfigName = defaultTestRouteConfigName
+ wantXdsConfig.RouteConfig.VirtualHosts[0].Routes[0].WeightedClusters[0].Name = defaultTestClusterName
+ wantXdsConfig.VirtualHost.Routes[0].WeightedClusters[0].Name = defaultTestClusterName
+ if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+ t.Fatal(err)
+ }
+}