]> git.feebdaed.xyz Git - 0xmirror/grpc-go.git/commitdiff
internal/xds: move the LDS and RDS watchers to dependency manager (#8651)
authoreshitachandwani <59800922+eshitachandwani@users.noreply.github.com>
Sun, 16 Nov 2025 09:24:58 +0000 (14:54 +0530)
committerGitHub <noreply@github.com>
Sun, 16 Nov 2025 09:24:58 +0000 (14:54 +0530)
This PR moves the LDS and RDS watchers to dependency manager without
changing the current functionality or behaviour. This is a part of
implementation of gRFC
[A74](https://github.com/grpc/proposal/blob/master/A74-xds-config-tears.md).

RELEASE NOTES: None

---------

Co-authored-by: Easwar Swaminathan <easwars@google.com>
internal/grpctest/tlogger.go
internal/xds/xdsclient/xdsresource/xdsconfig.go [new file with mode: 0644]
internal/xds/xdsdepmgr/watch_service.go [new file with mode: 0644]
internal/xds/xdsdepmgr/xds_dependency_manager.go [new file with mode: 0644]
internal/xds/xdsdepmgr/xds_dependency_manager_test.go [new file with mode: 0644]

index 492381758762bb1ceef186ce78278c1daf87152d..746aa9b089e227340515d57eb8b14fdc7b6a0065 100644 (file)
@@ -66,10 +66,10 @@ type tLogger struct {
        v           int
        initialized bool
 
-       mu     sync.Mutex // guards t, start, and errors
-       t      *testing.T
-       start  time.Time
-       errors map[*regexp.Regexp]int
+       mu    sync.Mutex
+       t     *testing.T
+       start time.Time
+       logs  map[logType]map[*regexp.Regexp]int
 }
 
 func init() {
@@ -87,7 +87,11 @@ func init() {
                }
        }
        // Initialize tLogr with the determined verbosity level.
-       tLogr = &tLogger{errors: make(map[*regexp.Regexp]int), v: vLevel}
+       logsMap := map[logType]map[*regexp.Regexp]int{
+               errorLog:   {},
+               warningLog: {},
+       }
+       tLogr = &tLogger{logs: logsMap, v: vLevel}
 }
 
 // getCallingPrefix returns the <file:line> at the given depth from the stack.
@@ -115,11 +119,14 @@ func (tl *tLogger) log(ltype logType, depth int, format string, args ...any) {
                switch ltype {
                case errorLog:
                        // fmt.Sprintln is used rather than fmt.Sprint because tl.Log uses fmt.Sprintln behavior.
-                       if tl.expected(fmt.Sprintln(args...)) {
+                       if tl.expected(fmt.Sprintln(args...), errorLog) {
                                tl.t.Log(args...)
                        } else {
                                tl.t.Error(args...)
                        }
+               case warningLog:
+                       tl.expected(fmt.Sprintln(args...), warningLog)
+                       tl.t.Log(args...)
                case fatalLog:
                        panic(fmt.Sprint(args...))
                default:
@@ -130,11 +137,14 @@ func (tl *tLogger) log(ltype logType, depth int, format string, args ...any) {
                format = "%v " + format + "%s"
                switch ltype {
                case errorLog:
-                       if tl.expected(fmt.Sprintf(format, args...)) {
+                       if tl.expected(fmt.Sprintf(format, args...), errorLog) {
                                tl.t.Logf(format, args...)
                        } else {
                                tl.t.Errorf(format, args...)
                        }
+               case warningLog:
+                       tl.expected(fmt.Sprintln(args...), warningLog)
+                       tl.t.Log(args...)
                case fatalLog:
                        panic(fmt.Sprintf(format, args...))
                default:
@@ -154,7 +164,8 @@ func (tl *tLogger) update(t *testing.T) {
        }
        tl.t = t
        tl.start = time.Now()
-       tl.errors = map[*regexp.Regexp]int{}
+       tl.logs[errorLog] = map[*regexp.Regexp]int{}
+       tl.logs[warningLog] = map[*regexp.Regexp]int{}
 }
 
 // ExpectError declares an error to be expected. For the next test, the first
@@ -163,11 +174,20 @@ func (tl *tLogger) update(t *testing.T) {
 // Update(). Note that if an expected error is not encountered, this will cause
 // the test to fail.
 func ExpectError(expr string) {
-       ExpectErrorN(expr, 1)
+       expectLogsN(expr, 1, errorLog)
 }
 
 // ExpectErrorN declares an error to be expected n times.
 func ExpectErrorN(expr string, n int) {
+       expectLogsN(expr, n, errorLog)
+}
+
+// ExpectWarning declares a warning to be expected.
+func ExpectWarning(expr string) {
+       expectLogsN(expr, 1, warningLog)
+}
+
+func expectLogsN(expr string, n int, logType logType) {
        tLogr.mu.Lock()
        defer tLogr.mu.Unlock()
        re, err := regexp.Compile(expr)
@@ -175,28 +195,35 @@ func ExpectErrorN(expr string, n int) {
                tLogr.t.Error(err)
                return
        }
-       tLogr.errors[re] += n
+       tLogr.logs[logType][re] += n
 }
 
 // endTest checks if expected errors were not encountered.
 func (tl *tLogger) endTest(t *testing.T) {
        tl.mu.Lock()
        defer tl.mu.Unlock()
-       for re, count := range tl.errors {
+       for re, count := range tl.logs[errorLog] {
                if count > 0 {
                        t.Errorf("Expected error '%v' not encountered", re.String())
                }
        }
-       tl.errors = map[*regexp.Regexp]int{}
+       for re, count := range tl.logs[warningLog] {
+               if count > 0 {
+                       t.Errorf("Expected warning '%v' not encountered", re.String())
+               }
+       }
+       tl.logs[errorLog] = map[*regexp.Regexp]int{}
+       tl.logs[warningLog] = map[*regexp.Regexp]int{}
 }
 
-// expected determines if the error string is protected or not.
-func (tl *tLogger) expected(s string) bool {
-       for re, count := range tl.errors {
+// expected determines if the log string of the particular type is protected or
+// not.
+func (tl *tLogger) expected(s string, logType logType) bool {
+       for re, count := range tl.logs[logType] {
                if re.FindStringIndex(s) != nil {
-                       tl.errors[re]--
+                       tl.logs[logType][re]--
                        if count <= 1 {
-                               delete(tl.errors, re)
+                               delete(tl.logs[logType], re)
                        }
                        return true
                }
diff --git a/internal/xds/xdsclient/xdsresource/xdsconfig.go b/internal/xds/xdsclient/xdsresource/xdsconfig.go
new file mode 100644 (file)
index 0000000..5c13714
--- /dev/null
@@ -0,0 +1,114 @@
+/*
+ *
+ * Copyright 2025 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package xdsresource
+
+import "google.golang.org/grpc/resolver"
+
+// XDSConfig holds the complete gRPC client-side xDS configuration containing
+// all necessary resources.
+type XDSConfig struct {
+       // Listener holds the listener configuration. It is guaranteed to be
+       // non-nil.
+       Listener *ListenerUpdate
+
+       // RouteConfig holds the route configuration. It will be populated even if
+       // the route configuration was inlined into the Listener resource. It is
+       // guaranteed to be non-nil.
+       RouteConfig *RouteConfigUpdate
+
+       // VirtualHost is selected from the route configuration whose domain field
+       // offers the best match against the provided dataplane authority. It is
+       // guaranteed to be non-nil.
+       VirtualHost *VirtualHost
+
+       // Clusters is a map from cluster name to its configuration.
+       Clusters map[string]*ClusterResult
+}
+
+// ClusterResult contains a cluster's configuration when a valid resource is
+// received from the management server. It contains an error when:
+//   - an invalid resource is received from the management server and
+//     a valid resource was not already present or
+//   - the cluster resource does not exist on the management server
+type ClusterResult struct {
+       Config ClusterConfig
+       Err    error
+}
+
+// ClusterConfig contains configuration for a single cluster.
+type ClusterConfig struct {
+       // Cluster configuration for the cluster. This field is always set to a
+       // non-nil value.
+       Cluster *ClusterUpdate
+       // EndpointConfig contains endpoint configuration for a leaf cluster. This
+       // field is only set for EDS and LOGICAL_DNS clusters.
+       EndpointConfig *EndpointConfig
+       // AggregateConfig contains configuration for an aggregate cluster. This
+       // field is only set for AGGREGATE clusters.
+       AggregateConfig *AggregateConfig
+}
+
+// AggregateConfig holds the configuration for an aggregate cluster.
+type AggregateConfig struct {
+       // LeafClusters contains a prioritized list of names of the leaf clusters
+       // for the cluster.
+       LeafClusters []string
+}
+
+// EndpointConfig contains configuration corresponding to the endpoints in a
+// cluster. Only one of EDSUpdate or DNSEndpoints will be populated based on the
+// cluster type.
+type EndpointConfig struct {
+       // Endpoint configurartion for the EDS clusters.
+       EDSUpdate *EndpointsUpdate
+       // Endpoint configuration for the LOGICAL_DNS clusters.
+       DNSEndpoints *DNSUpdate
+       // ResolutionNote stores error encountered while obtaining endpoints data
+       // for the cluster. It will contain a nil value when a valid endpoint data is
+       // received. It contains an error when:
+       //   - an invalid resource is received from the management server or
+       //   - the endpoint resource does not exist on the management server
+       ResolutionNote error
+}
+
+// DNSUpdate represents the result of a DNS resolution, containing a list of
+// discovered endpoints.
+type DNSUpdate struct {
+       // Endpoints is the complete list of endpoints returned by the DNS resolver.
+       Endpoints []resolver.Endpoint
+}
+
+// xdsConfigkey is the type used as the key to store XDSConfig in the Attributes
+// field of resolver.State.
+type xdsConfigkey struct{}
+
+// SetXDSConfig returns a copy of state in which the Attributes field is updated
+// with the XDSConfig.
+func SetXDSConfig(state resolver.State, config *XDSConfig) resolver.State {
+       state.Attributes = state.Attributes.WithValue(xdsConfigkey{}, config)
+       return state
+}
+
+// XDSConfigFromResolverState returns XDSConfig stored as an attribute in the
+// resolver state.
+func XDSConfigFromResolverState(state resolver.State) *XDSConfig {
+       if v := state.Attributes.Value(xdsConfigkey{}); v != nil {
+               return v.(*XDSConfig)
+       }
+       return nil
+}
diff --git a/internal/xds/xdsdepmgr/watch_service.go b/internal/xds/xdsdepmgr/watch_service.go
new file mode 100644 (file)
index 0000000..73f1b70
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright 2025 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package xdsdepmgr
+
+import "google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
+
+type listenerWatcher struct {
+       resourceName string
+       cancel       func()
+       depMgr       *DependencyManager
+}
+
+func newListenerWatcher(resourceName string, depMgr *DependencyManager) *listenerWatcher {
+       lw := &listenerWatcher{resourceName: resourceName, depMgr: depMgr}
+       lw.cancel = xdsresource.WatchListener(depMgr.xdsClient, resourceName, lw)
+       return lw
+}
+
+func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) {
+       l.depMgr.onListenerResourceUpdate(update, onDone)
+}
+
+func (l *listenerWatcher) ResourceError(err error, onDone func()) {
+       l.depMgr.onListenerResourceError(err, onDone)
+}
+
+func (l *listenerWatcher) AmbientError(err error, onDone func()) {
+       l.depMgr.onListenerResourceAmbientError(err, onDone)
+}
+
+func (l *listenerWatcher) stop() {
+       l.cancel()
+       if l.depMgr.logger.V(2) {
+               l.depMgr.logger.Infof("Canceling watch on Listener resource %q", l.resourceName)
+       }
+}
+
+type routeConfigWatcher struct {
+       resourceName string
+       cancel       func()
+       depMgr       *DependencyManager
+}
+
+func newRouteConfigWatcher(resourceName string, depMgr *DependencyManager) *routeConfigWatcher {
+       rw := &routeConfigWatcher{resourceName: resourceName, depMgr: depMgr}
+       rw.cancel = xdsresource.WatchRouteConfig(depMgr.xdsClient, resourceName, rw)
+       return rw
+}
+
+func (r *routeConfigWatcher) ResourceChanged(u *xdsresource.RouteConfigUpdate, onDone func()) {
+       r.depMgr.onRouteConfigResourceUpdate(r.resourceName, u, onDone)
+}
+
+func (r *routeConfigWatcher) ResourceError(err error, onDone func()) {
+       r.depMgr.onRouteConfigResourceError(r.resourceName, err, onDone)
+}
+
+func (r *routeConfigWatcher) AmbientError(err error, onDone func()) {
+       r.depMgr.onRouteConfigResourceAmbientError(r.resourceName, err, onDone)
+}
+
+func (r *routeConfigWatcher) stop() {
+       r.cancel()
+       if r.depMgr.logger.V(2) {
+               r.depMgr.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName)
+       }
+}
diff --git a/internal/xds/xdsdepmgr/xds_dependency_manager.go b/internal/xds/xdsdepmgr/xds_dependency_manager.go
new file mode 100644 (file)
index 0000000..fae5ca9
--- /dev/null
@@ -0,0 +1,279 @@
+/*
+ * Copyright 2025 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package xdsdepmgr provides the implementation of the xDS dependency manager
+// that manages all the xDS watches and resources as described in gRFC A74.
+package xdsdepmgr
+
+import (
+       "fmt"
+       "sync"
+
+       "google.golang.org/grpc/grpclog"
+       internalgrpclog "google.golang.org/grpc/internal/grpclog"
+       "google.golang.org/grpc/internal/xds/xdsclient"
+       "google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
+)
+
+const prefix = "[xdsdepmgr %p] "
+
+var logger = grpclog.Component("xds")
+
+func prefixLogger(p *DependencyManager) *internalgrpclog.PrefixLogger {
+       return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
+}
+
+// ConfigWatcher is the interface for consumers of aggregated xDS configuration
+// from the DependencyManager. The only consumer of this configuration is
+// currently the xDS resolver.
+type ConfigWatcher interface {
+       // Update is invoked when a new, validated xDS configuration is available.
+       //
+       // Implementations must treat the received config as read-only and should
+       // not modify it.
+       Update(*xdsresource.XDSConfig)
+
+       // Error is invoked when an error is received from the listener or route
+       // resource watcher. This includes cases where:
+       //  - The listener or route resource watcher reports a resource error.
+       //  - The received listener resource is a socket listener, not an API
+       //    listener. TODO(i/8114): Implement this check.
+       //  - The received route configuration does not contain a virtual host
+       //    matching the channel's default authority.
+       Error(error)
+}
+
+// DependencyManager registers watches on the xDS client for all required xDS
+// resources, resolves dependencies between them, and returns a complete
+// configuration to the xDS resolver.
+type DependencyManager struct {
+       // The following fields are initialized at creation time and are read-only
+       // after that.
+       logger             *internalgrpclog.PrefixLogger
+       watcher            ConfigWatcher
+       xdsClient          xdsclient.XDSClient
+       ldsResourceName    string
+       dataplaneAuthority string
+       nodeID             string
+
+       // All the fields below are protected by mu.
+       mu      sync.Mutex
+       stopped bool
+
+       listenerWatcher       *listenerWatcher
+       currentListenerUpdate *xdsresource.ListenerUpdate
+       routeConfigWatcher    *routeConfigWatcher
+       rdsResourceName       string
+       currentRouteConfig    *xdsresource.RouteConfigUpdate
+       currentVirtualHost    *xdsresource.VirtualHost
+}
+
+// New creates a new DependencyManager.
+//
+//   - listenerName is the name of the Listener resource to request from the
+//     management server.
+//   - dataplaneAuthority is used to select the best matching virtual host from
+//     the route configuration received from the management server.
+//   - xdsClient is the xDS client to use to register resource watches.
+//   - watcher is the ConfigWatcher interface that will receive the aggregated
+//     XDSConfig updates and errors.
+func New(listenerName, dataplaneAuthority string, xdsClient xdsclient.XDSClient, watcher ConfigWatcher) *DependencyManager {
+       dm := &DependencyManager{
+               ldsResourceName:    listenerName,
+               dataplaneAuthority: dataplaneAuthority,
+               xdsClient:          xdsClient,
+               watcher:            watcher,
+               nodeID:             xdsClient.BootstrapConfig().Node().GetId(),
+       }
+       dm.logger = prefixLogger(dm)
+
+       // Start the listener watch. Listener watch will start the other resource
+       // watches as needed.
+       dm.listenerWatcher = newListenerWatcher(listenerName, dm)
+       return dm
+}
+
+// Close cancels all registered resource watches.
+func (m *DependencyManager) Close() {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       if m.stopped {
+               return
+       }
+
+       m.stopped = true
+       if m.listenerWatcher != nil {
+               m.listenerWatcher.stop()
+       }
+       if m.routeConfigWatcher != nil {
+               m.routeConfigWatcher.stop()
+       }
+}
+
+// annotateErrorWithNodeID annotates the given error with the provided xDS node
+// ID.
+func (m *DependencyManager) annotateErrorWithNodeID(err error) error {
+       return fmt.Errorf("[xDS node id: %v]: %v", m.nodeID, err)
+}
+
+// maybeSendUpdateLocked checks that all the resources have been received and sends
+// the current aggregated xDS configuration to the watcher if all the updates
+// are available.
+func (m *DependencyManager) maybeSendUpdateLocked() {
+       m.watcher.Update(&xdsresource.XDSConfig{
+               Listener:    m.currentListenerUpdate,
+               RouteConfig: m.currentRouteConfig,
+               VirtualHost: m.currentVirtualHost,
+       })
+}
+
+func (m *DependencyManager) applyRouteConfigUpdateLocked(update *xdsresource.RouteConfigUpdate) {
+       matchVH := xdsresource.FindBestMatchingVirtualHost(m.dataplaneAuthority, update.VirtualHosts)
+       if matchVH == nil {
+               m.watcher.Error(m.annotateErrorWithNodeID(fmt.Errorf("could not find VirtualHost for %q", m.dataplaneAuthority)))
+               return
+       }
+       m.currentRouteConfig = update
+       m.currentVirtualHost = matchVH
+       m.maybeSendUpdateLocked()
+}
+
+func (m *DependencyManager) onListenerResourceUpdate(update *xdsresource.ListenerUpdate, onDone func()) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       defer onDone()
+       if m.stopped {
+               return
+       }
+
+       if m.logger.V(2) {
+               m.logger.Infof("Received update for Listener resource %q: %+v", m.ldsResourceName, update)
+       }
+
+       m.currentListenerUpdate = update
+
+       if update.InlineRouteConfig != nil {
+               // If there was a previous route config watcher because of a non-inline
+               // route configuration, cancel it.
+               m.rdsResourceName = ""
+               if m.routeConfigWatcher != nil {
+                       m.routeConfigWatcher.stop()
+                       m.routeConfigWatcher = nil
+               }
+               m.applyRouteConfigUpdateLocked(update.InlineRouteConfig)
+               return
+       }
+
+       // We get here only if there was no inline route configuration. If the route
+       // config name has not changed, send an update with existing route
+       // configuration and the newly received listener configuration.
+       if m.rdsResourceName == update.RouteConfigName {
+               m.maybeSendUpdateLocked()
+               return
+       }
+
+       // If the route config name has changed, cancel the old watcher and start a
+       // new one. At this point, since the new route config name has not yet been
+       // resolved, no update is sent to the channel, and therefore the old route
+       // configuration (if received) is used until the new one is received.
+       m.rdsResourceName = update.RouteConfigName
+       if m.routeConfigWatcher != nil {
+               m.routeConfigWatcher.stop()
+               m.currentVirtualHost = nil
+       }
+       m.routeConfigWatcher = newRouteConfigWatcher(m.rdsResourceName, m)
+}
+
+func (m *DependencyManager) onListenerResourceError(err error, onDone func()) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       defer onDone()
+       if m.stopped {
+               return
+       }
+
+       m.logger.Warningf("Received resource error for Listener resource %q: %v", m.ldsResourceName, m.annotateErrorWithNodeID(err))
+
+       if m.routeConfigWatcher != nil {
+               m.routeConfigWatcher.stop()
+       }
+       m.rdsResourceName = ""
+       m.currentVirtualHost = nil
+       m.routeConfigWatcher = nil
+       m.watcher.Error(fmt.Errorf("listener resource error: %v", m.annotateErrorWithNodeID(err)))
+}
+
+// onListenerResourceAmbientError handles ambient errors received from the
+// listener resource watcher. Since ambient errors do not impact the current
+// state of the resource, no change is made to the current configuration and the
+// errors are only logged for visibility.
+func (m *DependencyManager) onListenerResourceAmbientError(err error, onDone func()) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       defer onDone()
+       if m.stopped {
+               return
+       }
+
+       m.logger.Warningf("Listener resource ambient error: %v", m.annotateErrorWithNodeID(err))
+}
+
+func (m *DependencyManager) onRouteConfigResourceUpdate(resourceName string, update *xdsresource.RouteConfigUpdate, onDone func()) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       defer onDone()
+       if m.stopped || m.rdsResourceName != resourceName {
+               return
+       }
+
+       if m.logger.V(2) {
+               m.logger.Infof("Received update for RouteConfiguration resource %q: %+v", resourceName, update)
+       }
+       m.applyRouteConfigUpdateLocked(update)
+}
+
+func (m *DependencyManager) onRouteConfigResourceError(resourceName string, err error, onDone func()) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       defer onDone()
+       if m.stopped || m.rdsResourceName != resourceName {
+               return
+       }
+       m.logger.Warningf("Received resource error for RouteConfiguration resource %q: %v", resourceName, m.annotateErrorWithNodeID(err))
+       m.watcher.Error(fmt.Errorf("route resource error: %v", m.annotateErrorWithNodeID(err)))
+}
+
+// onRouteResourceAmbientError handles ambient errors received from the route
+// resource watcher. Since ambient errors do not impact the current state of the
+// resource, no change is made to the current configuration and the errors are
+// only logged for visibility.
+func (m *DependencyManager) onRouteConfigResourceAmbientError(resourceName string, err error, onDone func()) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       defer onDone()
+       if m.stopped || m.rdsResourceName != resourceName {
+               return
+       }
+
+       m.logger.Warningf("Route resource ambient error %q: %v", resourceName, m.annotateErrorWithNodeID(err))
+}
diff --git a/internal/xds/xdsdepmgr/xds_dependency_manager_test.go b/internal/xds/xdsdepmgr/xds_dependency_manager_test.go
new file mode 100644 (file)
index 0000000..79a0933
--- /dev/null
@@ -0,0 +1,804 @@
+/*
+ *
+ * Copyright 2025 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package xdsdepmgr_test
+
+import (
+       "context"
+       "fmt"
+       "strings"
+       "testing"
+       "time"
+
+       "github.com/envoyproxy/go-control-plane/pkg/wellknown"
+       "github.com/google/go-cmp/cmp"
+       "github.com/google/go-cmp/cmp/cmpopts"
+       "github.com/google/uuid"
+       "google.golang.org/grpc/internal/grpctest"
+       "google.golang.org/grpc/internal/testutils"
+       "google.golang.org/grpc/internal/testutils/xds/e2e"
+       "google.golang.org/grpc/internal/xds/bootstrap"
+       "google.golang.org/grpc/internal/xds/xdsclient"
+       "google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
+       "google.golang.org/grpc/internal/xds/xdsdepmgr"
+
+       v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
+       v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
+       v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
+       v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
+
+       _ "google.golang.org/grpc/internal/xds/httpfilter/router" // Register the router filter
+)
+
+type s struct {
+       grpctest.Tester
+}
+
+func Test(t *testing.T) {
+       grpctest.RunSubTests(t, s{})
+}
+
+const (
+       defaultTestTimeout      = 10 * time.Second
+       defaultTestShortTimeout = 100 * time.Microsecond
+
+       defaultTestServiceName     = "service-name"
+       defaultTestRouteConfigName = "route-config-name"
+       defaultTestClusterName     = "cluster-name"
+)
+
+func newStringP(s string) *string {
+       return &s
+}
+
+// testWatcher is an implementation of the ConfigWatcher interface that sends
+// the updates and errors received from the dependency manager to respective
+// channels, for the tests to verify.
+type testWatcher struct {
+       updateCh chan *xdsresource.XDSConfig
+       errorCh  chan error
+}
+
+// Update sends the received XDSConfig update to the update channel.
+func (w *testWatcher) Update(cfg *xdsresource.XDSConfig) {
+       w.updateCh <- cfg
+}
+
+// Error sends the received error to the error channel.
+func (w *testWatcher) Error(err error) {
+       w.errorCh <- err
+}
+
+func verifyError(ctx context.Context, errCh chan error, wantErr, wantNodeID string) error {
+       select {
+       case gotErr := <-errCh:
+               if gotErr == nil {
+                       return fmt.Errorf("got nil error from resolver, want error %q", wantErr)
+               }
+               if !strings.Contains(gotErr.Error(), wantErr) {
+                       return fmt.Errorf("got error from resolver %q, want %q", gotErr, wantErr)
+               }
+               if !strings.Contains(gotErr.Error(), wantNodeID) {
+                       return fmt.Errorf("got error from resolver %q, want nodeID %q", gotErr, wantNodeID)
+               }
+       case <-ctx.Done():
+               return fmt.Errorf("timeout waiting for error from dependency manager")
+       }
+       return nil
+}
+
+func verifyXDSConfig(ctx context.Context, xdsCh chan *xdsresource.XDSConfig, errCh chan error, want *xdsresource.XDSConfig) error {
+       select {
+       case <-ctx.Done():
+               return fmt.Errorf("timeout waiting for update from dependency manager")
+       case update := <-xdsCh:
+               cmpOpts := []cmp.Option{
+                       cmpopts.EquateEmpty(),
+                       cmpopts.IgnoreFields(xdsresource.HTTPFilter{}, "Filter", "Config"),
+                       cmpopts.IgnoreFields(xdsresource.ListenerUpdate{}, "Raw"),
+                       cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"),
+               }
+               if diff := cmp.Diff(update, want, cmpOpts...); diff != "" {
+                       return fmt.Errorf("received unexpected update from dependency manager. Diff (-got +want):\n%v", diff)
+               }
+       case err := <-errCh:
+               return fmt.Errorf("received unexpected error from dependency manager: %v", err)
+       }
+       return nil
+}
+
+func createXDSClient(t *testing.T, bootstrapContents []byte) xdsclient.XDSClient {
+       t.Helper()
+
+       config, err := bootstrap.NewConfigFromContents(bootstrapContents)
+       if err != nil {
+               t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
+       }
+
+       pool := xdsclient.NewPool(config)
+       if err != nil {
+               t.Fatalf("Failed to create an xDS client pool: %v", err)
+       }
+       c, cancel, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{Name: t.Name()})
+       if err != nil {
+               t.Fatalf("Failed to create an xDS client: %v", err)
+       }
+       t.Cleanup(cancel)
+       return c
+}
+
+// Spins up an xDS management server and sets up the xDS bootstrap configuration.
+//
+// Returns the following:
+//   - A reference to the xDS management server
+//   - Contents of the bootstrap configuration pointing to xDS management
+//     server
+func setupManagementServerForTest(t *testing.T, nodeID string) (*e2e.ManagementServer, []byte) {
+       t.Helper()
+
+       mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
+       t.Cleanup(mgmtServer.Stop)
+
+       bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
+       return mgmtServer, bootstrapContents
+}
+
+// Tests the happy case where the dependency manager receives all the required
+// resources and verifies that Update is called with with the correct XDSConfig.
+func (s) TestHappyCase(t *testing.T) {
+       nodeID := uuid.New().String()
+       mgmtServer, bc := setupManagementServerForTest(t, nodeID)
+       xdsClient := createXDSClient(t, bc)
+
+       watcher := &testWatcher{
+               updateCh: make(chan *xdsresource.XDSConfig),
+               errorCh:  make(chan error),
+       }
+
+       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+       defer cancel()
+
+       listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+       route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+       resources := e2e.UpdateOptions{
+               NodeID:         nodeID,
+               Listeners:      []*v3listenerpb.Listener{listener},
+               Routes:         []*v3routepb.RouteConfiguration{route},
+               SkipValidation: true,
+       }
+       if err := mgmtServer.Update(ctx, resources); err != nil {
+               t.Fatal(err)
+       }
+
+       dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+       defer dm.Close()
+       wantXdsConfig := &xdsresource.XDSConfig{
+               Listener: &xdsresource.ListenerUpdate{
+                       RouteConfigName: defaultTestRouteConfigName,
+                       HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
+               },
+               RouteConfig: &xdsresource.RouteConfigUpdate{
+                       VirtualHosts: []*xdsresource.VirtualHost{
+                               {
+                                       Domains: []string{defaultTestServiceName},
+                                       Routes: []*xdsresource.Route{{
+                                               Prefix:           newStringP("/"),
+                                               WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+                                               ActionType:       xdsresource.RouteActionRoute,
+                                       }},
+                               },
+                       },
+               },
+               VirtualHost: &xdsresource.VirtualHost{
+                       Domains: []string{defaultTestServiceName},
+                       Routes: []*xdsresource.Route{{
+                               Prefix:           newStringP("/"),
+                               WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+                               ActionType:       xdsresource.RouteActionRoute},
+                       },
+               },
+       }
+       if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+               t.Fatal(err)
+       }
+}
+
+// Tests the case where the listener contains an inline route configuration and
+// verifies that Update is called with the correct XDSConfig.
+func (s) TestInlineRouteConfig(t *testing.T) {
+       nodeID := uuid.New().String()
+       mgmtServer, bc := setupManagementServerForTest(t, nodeID)
+       xdsClient := createXDSClient(t, bc)
+
+       watcher := &testWatcher{
+               updateCh: make(chan *xdsresource.XDSConfig),
+               errorCh:  make(chan error),
+       }
+       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+       defer cancel()
+
+       hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
+               RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
+                       RouteConfig: e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName),
+               },
+               HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})}, // router fields are unused by grpc
+       })
+       listener := &v3listenerpb.Listener{
+               Name:        defaultTestServiceName,
+               ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
+               FilterChains: []*v3listenerpb.FilterChain{{
+                       Name: "filter-chain-name",
+                       Filters: []*v3listenerpb.Filter{{
+                               Name:       wellknown.HTTPConnectionManager,
+                               ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
+                       }},
+               }},
+       }
+       resources := e2e.UpdateOptions{
+               NodeID:         nodeID,
+               Listeners:      []*v3listenerpb.Listener{listener},
+               SkipValidation: true,
+       }
+       if err := mgmtServer.Update(ctx, resources); err != nil {
+               t.Fatal(err)
+       }
+       dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+       defer dm.Close()
+
+       wantConfig := &xdsresource.XDSConfig{
+               Listener: &xdsresource.ListenerUpdate{
+                       InlineRouteConfig: &xdsresource.RouteConfigUpdate{
+                               VirtualHosts: []*xdsresource.VirtualHost{
+                                       {
+                                               Domains: []string{defaultTestServiceName},
+                                               Routes: []*xdsresource.Route{{
+                                                       Prefix:           newStringP("/"),
+                                                       WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+                                                       ActionType:       xdsresource.RouteActionRoute},
+                                               },
+                                       },
+                               },
+                       },
+                       HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
+               },
+               RouteConfig: &xdsresource.RouteConfigUpdate{
+                       VirtualHosts: []*xdsresource.VirtualHost{
+                               {
+                                       Domains: []string{defaultTestServiceName},
+                                       Routes: []*xdsresource.Route{{
+                                               Prefix:           newStringP("/"),
+                                               WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+                                               ActionType:       xdsresource.RouteActionRoute},
+                                       },
+                               },
+                       },
+               },
+               VirtualHost: &xdsresource.VirtualHost{
+                       Domains: []string{defaultTestServiceName},
+                       Routes: []*xdsresource.Route{{
+                               Prefix:           newStringP("/"),
+                               WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+                               ActionType:       xdsresource.RouteActionRoute},
+                       },
+               },
+       }
+       if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantConfig); err != nil {
+               t.Fatal(err)
+       }
+}
+
+// Tests the case where dependency manager only receives listener resource but
+// does not receive route config resource. Verfies that Update is not called
+// since we do not have all resources.
+func (s) TestIncompleteResources(t *testing.T) {
+       nodeID := uuid.New().String()
+       mgmtServer, bc := setupManagementServerForTest(t, nodeID)
+       xdsClient := createXDSClient(t, bc)
+
+       watcher := &testWatcher{
+               updateCh: make(chan *xdsresource.XDSConfig),
+               errorCh:  make(chan error),
+       }
+       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+       defer cancel()
+
+       listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+       if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
+               NodeID:         nodeID,
+               Listeners:      []*v3listenerpb.Listener{listener},
+               SkipValidation: true,
+       }); err != nil {
+               t.Fatal(err)
+       }
+       dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+       defer dm.Close()
+
+       sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+       defer sCancel()
+       select {
+       case <-sCtx.Done():
+       case update := <-watcher.updateCh:
+               t.Fatalf("Received unexpected update from dependency manager: %+v", update)
+       case err := <-watcher.errorCh:
+               t.Fatalf("Received unexpected error from dependency manager: %v", err)
+       }
+}
+
+// Tests the case where dependency manager receives a listener resource error by
+// sending the correct update first and then removing the listener resource. It
+// verifies that Error is called with the correct error.
+func (s) TestListenerResourceError(t *testing.T) {
+       nodeID := uuid.New().String()
+       mgmtServer, bc := setupManagementServerForTest(t, nodeID)
+       xdsClient := createXDSClient(t, bc)
+
+       watcher := &testWatcher{
+               updateCh: make(chan *xdsresource.XDSConfig),
+               errorCh:  make(chan error),
+       }
+       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+       defer cancel()
+
+       // Send a correct update first
+       listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+       listener.FilterChains = nil
+       route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+       resources := e2e.UpdateOptions{
+               NodeID:         nodeID,
+               Listeners:      []*v3listenerpb.Listener{listener},
+               Routes:         []*v3routepb.RouteConfiguration{route},
+               SkipValidation: true,
+       }
+       if err := mgmtServer.Update(ctx, resources); err != nil {
+               t.Fatal(err)
+       }
+
+       dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+       defer dm.Close()
+
+       wantXdsConfig := &xdsresource.XDSConfig{
+               Listener: &xdsresource.ListenerUpdate{
+                       RouteConfigName: defaultTestRouteConfigName,
+                       HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
+               },
+               RouteConfig: &xdsresource.RouteConfigUpdate{
+                       VirtualHosts: []*xdsresource.VirtualHost{
+                               {
+                                       Domains: []string{defaultTestServiceName},
+                                       Routes: []*xdsresource.Route{{
+                                               Prefix:           newStringP("/"),
+                                               WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+                                               ActionType:       xdsresource.RouteActionRoute,
+                                       }},
+                               },
+                       },
+               },
+               VirtualHost: &xdsresource.VirtualHost{
+                       Domains: []string{defaultTestServiceName},
+                       Routes: []*xdsresource.Route{{
+                               Prefix:           newStringP("/"),
+                               WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+                               ActionType:       xdsresource.RouteActionRoute,
+                       }},
+               },
+       }
+       if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+               t.Fatal(err)
+       }
+
+       // Remove listener resource so that we get listener resource error.
+       resources.Listeners = nil
+       if err := mgmtServer.Update(ctx, resources); err != nil {
+               t.Fatal(err)
+       }
+
+       if err := verifyError(ctx, watcher.errorCh, fmt.Sprintf("xds: resource %q of type %q has been removed", defaultTestServiceName, "ListenerResource"), nodeID); err != nil {
+               t.Fatal(err)
+       }
+}
+
+// Tests the case where dependency manager receives a route config resource
+// error by sending a route resource that is NACKed by the XDSClient. It
+// verifies that Error is called with correct error.
+func (s) TestRouteResourceError(t *testing.T) {
+       nodeID := uuid.New().String()
+       mgmtServer, bc := setupManagementServerForTest(t, nodeID)
+       xdsClient := createXDSClient(t, bc)
+
+       errorCh := make(chan error, 1)
+       watcher := &testWatcher{
+               updateCh: make(chan *xdsresource.XDSConfig),
+               errorCh:  errorCh,
+       }
+       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+       defer cancel()
+
+       listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+       route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+       // Remove the Match to make sure the route resource is NACKed by XDSClient
+       // sending a route resource error to dependency manager.
+       route.VirtualHosts[0].Routes[0].Match = nil
+       resources := e2e.UpdateOptions{
+               NodeID:         nodeID,
+               Listeners:      []*v3listenerpb.Listener{listener},
+               Routes:         []*v3routepb.RouteConfiguration{route},
+               SkipValidation: true,
+       }
+       if err := mgmtServer.Update(ctx, resources); err != nil {
+               t.Fatal(err)
+       }
+
+       dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+       defer dm.Close()
+
+       if err := verifyError(ctx, watcher.errorCh, "route resource error", nodeID); err != nil {
+               t.Fatal(err)
+       }
+}
+
+// Tests the case where route config updates receives does not have any virtual
+// host. Verifies that Error is called with correct error.
+func (s) TestNoVirtualHost(t *testing.T) {
+       nodeID := uuid.New().String()
+       mgmtServer, bc := setupManagementServerForTest(t, nodeID)
+       xdsClient := createXDSClient(t, bc)
+
+       watcher := &testWatcher{
+               updateCh: make(chan *xdsresource.XDSConfig),
+               errorCh:  make(chan error),
+       }
+
+       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+       defer cancel()
+
+       listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+       route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+       route.VirtualHosts = nil
+       resources := e2e.UpdateOptions{
+               NodeID:         nodeID,
+               Listeners:      []*v3listenerpb.Listener{listener},
+               Routes:         []*v3routepb.RouteConfiguration{route},
+               SkipValidation: true,
+       }
+       if err := mgmtServer.Update(ctx, resources); err != nil {
+               t.Fatal(err)
+       }
+       dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+       defer dm.Close()
+
+       if err := verifyError(ctx, watcher.errorCh, "could not find VirtualHost", nodeID); err != nil {
+               t.Fatal(err)
+       }
+}
+
+// Tests the case where we get an ambient error and verify that we correctly log
+// a warning for it. To make sure we get an ambient error, we send a correct
+// update first, then send an invalid one and then send the valid resource
+// again. We send the valid resource again so that we can be sure the ambient
+// error reaches the dependency manager since there is no other way to wait for
+// it.
+func (s) TestAmbientError(t *testing.T) {
+       // Expect a warning log for the ambient error.
+       grpctest.ExpectWarning("Listener resource ambient error")
+
+       nodeID := uuid.New().String()
+       mgmtServer, bc := setupManagementServerForTest(t, nodeID)
+       xdsClient := createXDSClient(t, bc)
+
+       watcher := &testWatcher{
+               updateCh: make(chan *xdsresource.XDSConfig),
+               errorCh:  make(chan error),
+       }
+       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+       defer cancel()
+
+       // Configure a valid listener and route.
+       listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+       route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+       resources := e2e.UpdateOptions{
+               NodeID:         nodeID,
+               Listeners:      []*v3listenerpb.Listener{listener},
+               Routes:         []*v3routepb.RouteConfiguration{route},
+               SkipValidation: true,
+       }
+       if err := mgmtServer.Update(ctx, resources); err != nil {
+               t.Fatal(err)
+       }
+
+       dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+       defer dm.Close()
+
+       // Wait for the initial valid update.
+       wantXdsConfig := &xdsresource.XDSConfig{
+               Listener: &xdsresource.ListenerUpdate{
+                       RouteConfigName: defaultTestRouteConfigName,
+                       HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
+               },
+               RouteConfig: &xdsresource.RouteConfigUpdate{
+                       VirtualHosts: []*xdsresource.VirtualHost{
+                               {
+                                       Domains: []string{defaultTestServiceName},
+                                       Routes: []*xdsresource.Route{{
+                                               Prefix:           newStringP("/"),
+                                               WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+                                               ActionType:       xdsresource.RouteActionRoute,
+                                       }},
+                               },
+                       },
+               },
+               VirtualHost: &xdsresource.VirtualHost{
+                       Domains: []string{defaultTestServiceName},
+                       Routes: []*xdsresource.Route{{
+                               Prefix:           newStringP("/"),
+                               WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+                               ActionType:       xdsresource.RouteActionRoute,
+                       }},
+               },
+       }
+       if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+               t.Fatal(err)
+       }
+
+       // Configure a listener resource that is expected to be NACKed because it
+       // does not contain the `RouteSpecifier` field in the HTTPConnectionManager.
+       // Since a valid one is already cached, this should result in an ambient
+       // error.
+       hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
+               HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})},
+       })
+       lis := &v3listenerpb.Listener{
+               Name:        defaultTestServiceName,
+               ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
+               FilterChains: []*v3listenerpb.FilterChain{{
+                       Name: "filter-chain-name",
+                       Filters: []*v3listenerpb.Filter{{
+                               Name:       wellknown.HTTPConnectionManager,
+                               ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
+                       }},
+               }},
+       }
+       resources.Listeners = []*v3listenerpb.Listener{lis}
+       if err := mgmtServer.Update(ctx, resources); err != nil {
+               t.Fatal(err)
+       }
+
+       // We expect no call to Error or Update on our watcher. We just wait for
+       // a short duration to ensure that.
+       sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+       defer sCancel()
+       select {
+       case err := <-watcher.errorCh:
+               t.Fatalf("Unexpected call to Error %v", err)
+       case update := <-watcher.updateCh:
+               t.Fatalf("Unexpected call to Update %+v", update)
+       case <-sCtx.Done():
+       }
+
+       // Send valid resources again.
+       listener = e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+       route = e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+       resources = e2e.UpdateOptions{
+               NodeID:         nodeID,
+               Listeners:      []*v3listenerpb.Listener{listener},
+               Routes:         []*v3routepb.RouteConfiguration{route},
+               SkipValidation: true,
+       }
+       if err := mgmtServer.Update(ctx, resources); err != nil {
+               t.Fatal(err)
+       }
+       if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+               t.Fatal(err)
+       }
+}
+
+// Tests the case where the cluster name changes in the route resource update
+// and verify that each time Update is called with correct cluster name.
+func (s) TestRouteResourceUpdate(t *testing.T) {
+       nodeID := uuid.New().String()
+       mgmtServer, bc := setupManagementServerForTest(t, nodeID)
+       xdsClient := createXDSClient(t, bc)
+
+       watcher := &testWatcher{
+               updateCh: make(chan *xdsresource.XDSConfig),
+               errorCh:  make(chan error),
+       }
+       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+       defer cancel()
+
+       // Initial resources with defaultTestClusterName
+       listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+       route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+       resources := e2e.UpdateOptions{
+               NodeID:         nodeID,
+               Listeners:      []*v3listenerpb.Listener{listener},
+               Routes:         []*v3routepb.RouteConfiguration{route},
+               SkipValidation: true,
+       }
+       if err := mgmtServer.Update(ctx, resources); err != nil {
+               t.Fatal(err)
+       }
+
+       dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+       defer dm.Close()
+
+       // Wait for the first update.
+       wantXdsConfig := &xdsresource.XDSConfig{
+               Listener: &xdsresource.ListenerUpdate{
+                       RouteConfigName: defaultTestRouteConfigName,
+                       HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
+               },
+               RouteConfig: &xdsresource.RouteConfigUpdate{
+                       VirtualHosts: []*xdsresource.VirtualHost{
+                               {
+                                       Domains: []string{defaultTestServiceName},
+                                       Routes: []*xdsresource.Route{{
+                                               Prefix:           newStringP("/"),
+                                               WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+                                               ActionType:       xdsresource.RouteActionRoute,
+                                       }},
+                               },
+                       },
+               },
+               VirtualHost: &xdsresource.VirtualHost{
+                       Domains: []string{defaultTestServiceName},
+                       Routes: []*xdsresource.Route{{
+                               Prefix:           newStringP("/"),
+                               WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+                               ActionType:       xdsresource.RouteActionRoute,
+                       }},
+               },
+       }
+       if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+               t.Fatal(err)
+       }
+
+       // Update route to point to a new cluster.
+       newClusterName := "new-cluster-name"
+       route2 := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, newClusterName)
+       resources.Routes = []*v3routepb.RouteConfiguration{route2}
+       if err := mgmtServer.Update(ctx, resources); err != nil {
+               t.Fatal(err)
+       }
+
+       // Wait for the second update and verify it has the new cluster.
+       wantXdsConfig.RouteConfig.VirtualHosts[0].Routes[0].WeightedClusters[0].Name = newClusterName
+       wantXdsConfig.VirtualHost.Routes[0].WeightedClusters[0].Name = newClusterName
+       if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+               t.Fatal(err)
+       }
+}
+
+// Tests the case where the route resource is first sent from the management
+// server and the changed to be inline with the listener and then again changed
+// to be received from the management server. It verifies that each time Update
+// called with the correct XDSConfig.
+func (s) TestRouteResourceChangeToInline(t *testing.T) {
+       nodeID := uuid.New().String()
+       mgmtServer, bc := setupManagementServerForTest(t, nodeID)
+       defer mgmtServer.Stop()
+       xdsClient := createXDSClient(t, bc)
+
+       watcher := &testWatcher{
+               updateCh: make(chan *xdsresource.XDSConfig),
+               errorCh:  make(chan error),
+       }
+       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+       defer cancel()
+
+       // Initial resources with defaultTestClusterName
+       listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+       route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+       resources := e2e.UpdateOptions{
+               NodeID:         nodeID,
+               Listeners:      []*v3listenerpb.Listener{listener},
+               Routes:         []*v3routepb.RouteConfiguration{route},
+               SkipValidation: true,
+       }
+       if err := mgmtServer.Update(ctx, resources); err != nil {
+               t.Fatal(err)
+       }
+
+       dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+       defer dm.Close()
+
+       // Wait for the first update.
+       wantXdsConfig := &xdsresource.XDSConfig{
+               Listener: &xdsresource.ListenerUpdate{
+                       RouteConfigName: defaultTestRouteConfigName,
+                       HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
+               },
+               RouteConfig: &xdsresource.RouteConfigUpdate{
+                       VirtualHosts: []*xdsresource.VirtualHost{
+                               {
+                                       Domains: []string{defaultTestServiceName},
+                                       Routes: []*xdsresource.Route{{
+                                               Prefix:           newStringP("/"),
+                                               WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+                                               ActionType:       xdsresource.RouteActionRoute,
+                                       }},
+                               },
+                       },
+               },
+               VirtualHost: &xdsresource.VirtualHost{
+                       Domains: []string{defaultTestServiceName},
+                       Routes: []*xdsresource.Route{{
+                               Prefix:           newStringP("/"),
+                               WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
+                               ActionType:       xdsresource.RouteActionRoute,
+                       }},
+               },
+       }
+       if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+               t.Fatal(err)
+       }
+
+       // Update route to point to a new cluster.
+       newClusterName := "new-cluster-name"
+       hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
+               RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
+                       RouteConfig: e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, newClusterName),
+               },
+               HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})}, // router fields are unused by grpc
+       })
+       resources.Listeners[0].ApiListener.ApiListener = hcm
+       resources.Routes = nil
+       if err := mgmtServer.Update(ctx, resources); err != nil {
+               t.Fatal(err)
+       }
+
+       // Wait for the second update and verify it has the new cluster.
+       wantXdsConfig.Listener.InlineRouteConfig = &xdsresource.RouteConfigUpdate{
+               VirtualHosts: []*xdsresource.VirtualHost{
+                       {
+                               Domains: []string{defaultTestServiceName},
+                               Routes: []*xdsresource.Route{{Prefix: newStringP("/"),
+                                       WeightedClusters: []xdsresource.WeightedCluster{{Name: newClusterName, Weight: 100}},
+                                       ActionType:       xdsresource.RouteActionRoute,
+                               }},
+                       },
+               },
+       }
+       wantXdsConfig.Listener.RouteConfigName = ""
+       wantXdsConfig.RouteConfig.VirtualHosts[0].Routes[0].WeightedClusters[0].Name = newClusterName
+       wantXdsConfig.VirtualHost.Routes[0].WeightedClusters[0].Name = newClusterName
+       if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+               t.Fatal(err)
+       }
+
+       // Change the route resource back to non-inline.
+       listener = e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
+       route = e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+       resources = e2e.UpdateOptions{
+               NodeID:         nodeID,
+               Listeners:      []*v3listenerpb.Listener{listener},
+               Routes:         []*v3routepb.RouteConfiguration{route},
+               SkipValidation: true,
+       }
+       if err := mgmtServer.Update(ctx, resources); err != nil {
+               t.Fatal(err)
+       }
+
+       // Wait for the third update and verify it has the original cluster.
+       wantXdsConfig.Listener.InlineRouteConfig = nil
+       wantXdsConfig.Listener.RouteConfigName = defaultTestRouteConfigName
+       wantXdsConfig.RouteConfig.VirtualHosts[0].Routes[0].WeightedClusters[0].Name = defaultTestClusterName
+       wantXdsConfig.VirtualHost.Routes[0].WeightedClusters[0].Name = defaultTestClusterName
+       if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+               t.Fatal(err)
+       }
+}