]> git.feebdaed.xyz Git - 0xmirror/grpc-go.git/commitdiff
xds/resolver: Optimize Interceptor Chain Construction (#8641)
authorEaswar Swaminathan <easwars@google.com>
Tue, 11 Nov 2025 17:34:57 +0000 (09:34 -0800)
committerGitHub <noreply@github.com>
Tue, 11 Nov 2025 17:34:57 +0000 (09:34 -0800)
#### Existing behavior:
- At routing time, when an RPC matches a route and a cluster is
selected, the interceptor chain for that specific RPC is built.
- This chain is built on a per-RPC basis.
- A subsequent RPC that matches the exact same route and cluster will
trigger the entire chain reconstruction again, even if no configuration
has changed.

#### New  behavior:
- The interceptor chain is now pre-built for every route and every
pickable cluster associated with that route.
- The chains are constructed once when the config selector is built.

#### Other changes:
- Existing unit tests have been converted to be more e2e style tests.
- This lays the necessary groundwork for upcoming changes to the filter
API, specifically to support filter state retention

RELEASE NOTES: NONE

internal/xds/resolver/serviceconfig.go
internal/xds/resolver/xds_http_filters_test.go [new file with mode: 0644]
internal/xds/resolver/xds_resolver.go
internal/xds/resolver/xds_resolver_test.go

index f2ceabe7a8dba2ee15ad49ad5a24ca6307a399d7..3754b14dd3e4fbb1738ed10a6efcde322d01e16b 100644 (file)
@@ -97,17 +97,14 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo) []byte {
 }
 
 type virtualHost struct {
-       // map from filter name to its config
-       httpFilterConfigOverride map[string]httpfilter.FilterConfig
        // retry policy present in virtual host
        retryConfig *xdsresource.RetryConfig
 }
 
 // routeCluster holds information about a cluster as referenced by a route.
 type routeCluster struct {
-       name string
-       // map from filter name to its config
-       httpFilterConfigOverride map[string]httpfilter.FilterConfig
+       name        string                      // Name of the cluster.
+       interceptor iresolver.ClientInterceptor // HTTP filters to run for RPCs matching this route.
 }
 
 type route struct {
@@ -115,10 +112,8 @@ type route struct {
        actionType        xdsresource.RouteActionType   // holds route action type
        clusters          wrr.WRR                       // holds *routeCluster entries
        maxStreamDuration time.Duration
-       // map from filter name to its config
-       httpFilterConfigOverride map[string]httpfilter.FilterConfig
-       retryConfig              *xdsresource.RetryConfig
-       hashPolicies             []*xdsresource.HashPolicy
+       retryConfig       *xdsresource.RetryConfig
+       hashPolicies      []*xdsresource.HashPolicy
 }
 
 func (r route) String() string {
@@ -200,11 +195,6 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
        ref := &cs.clusters[cluster.name].refCount
        atomic.AddInt32(ref, 1)
 
-       interceptor, err := cs.newInterceptor(rt, cluster)
-       if err != nil {
-               return nil, annotateErrorWithNodeID(err, cs.xdsNodeID)
-       }
-
        lbCtx := clustermanager.SetPickedCluster(rpcInfo.Context, cluster.name)
        lbCtx = iringhash.SetXDSRequestHash(lbCtx, cs.generateHash(rpcInfo, rt.hashPolicies))
 
@@ -220,7 +210,7 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
                                cs.sendNewServiceConfig()
                        }
                },
-               Interceptor: interceptor,
+               Interceptor: cluster.interceptor,
        }
 
        if rt.maxStreamDuration != 0 {
@@ -310,35 +300,6 @@ func (cs *configSelector) generateHash(rpcInfo iresolver.RPCInfo, hashPolicies [
        return rand.Uint64()
 }
 
-func (cs *configSelector) newInterceptor(rt *route, cluster *routeCluster) (iresolver.ClientInterceptor, error) {
-       if len(cs.httpFilterConfig) == 0 {
-               return nil, nil
-       }
-       interceptors := make([]iresolver.ClientInterceptor, 0, len(cs.httpFilterConfig))
-       for _, filter := range cs.httpFilterConfig {
-               override := cluster.httpFilterConfigOverride[filter.Name] // cluster is highest priority
-               if override == nil {
-                       override = rt.httpFilterConfigOverride[filter.Name] // route is second priority
-               }
-               if override == nil {
-                       override = cs.virtualHost.httpFilterConfigOverride[filter.Name] // VH is third & lowest priority
-               }
-               ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder)
-               if !ok {
-                       // Should not happen if it passed xdsClient validation.
-                       return nil, fmt.Errorf("filter does not support use in client")
-               }
-               i, err := ib.BuildClientInterceptor(filter.Config, override)
-               if err != nil {
-                       return nil, fmt.Errorf("error constructing filter: %v", err)
-               }
-               if i != nil {
-                       interceptors = append(interceptors, i)
-               }
-       }
-       return &interceptorList{interceptors: interceptors}, nil
-}
-
 // stop decrements refs of all clusters referenced by this config selector.
 func (cs *configSelector) stop() {
        // The resolver's old configSelector may be nil.  Handle that here.
@@ -363,6 +324,38 @@ func (cs *configSelector) stop() {
        }
 }
 
+// newInterceptor builds a chain of client interceptors for the given filters
+// and override configuration. The cluster override has the highest priority,
+// followed by the route override, and finally the virtual host override.
+func newInterceptor(filters []xdsresource.HTTPFilter, clusterOverride, routeOverride, virtualHostOverride map[string]httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) {
+       if len(filters) == 0 {
+               return nil, nil
+       }
+       interceptors := make([]iresolver.ClientInterceptor, 0, len(filters))
+       for _, filter := range filters {
+               override := clusterOverride[filter.Name]
+               if override == nil {
+                       override = routeOverride[filter.Name]
+               }
+               if override == nil {
+                       override = virtualHostOverride[filter.Name]
+               }
+               ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder)
+               if !ok {
+                       // Should not happen if it passed xdsClient validation.
+                       return nil, fmt.Errorf("filter %q does not support use in client", filter.Name)
+               }
+               i, err := ib.BuildClientInterceptor(filter.Config, override)
+               if err != nil {
+                       return nil, fmt.Errorf("error constructing filter: %v", err)
+               }
+               if i != nil {
+                       interceptors = append(interceptors, i)
+               }
+       }
+       return &interceptorList{interceptors: interceptors}, nil
+}
+
 type interceptorList struct {
        interceptors []iresolver.ClientInterceptor
 }
diff --git a/internal/xds/resolver/xds_http_filters_test.go b/internal/xds/resolver/xds_http_filters_test.go
new file mode 100644 (file)
index 0000000..9d2c5ac
--- /dev/null
@@ -0,0 +1,659 @@
+/*
+ *
+ * Copyright 2025 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package resolver_test
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "strings"
+       "testing"
+
+       "github.com/google/go-cmp/cmp"
+       "github.com/google/uuid"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/grpc/internal"
+       iresolver "google.golang.org/grpc/internal/resolver"
+       "google.golang.org/grpc/internal/stubserver"
+       "google.golang.org/grpc/internal/testutils"
+       "google.golang.org/grpc/internal/testutils/xds/e2e"
+       "google.golang.org/grpc/internal/xds/httpfilter"
+       rinternal "google.golang.org/grpc/internal/xds/resolver/internal"
+       "google.golang.org/grpc/metadata"
+       "google.golang.org/grpc/resolver"
+       "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/anypb"
+       "google.golang.org/protobuf/types/known/structpb"
+       "google.golang.org/protobuf/types/known/wrapperspb"
+
+       v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
+       v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
+       v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
+       v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
+       v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
+       v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
+       testgrpc "google.golang.org/grpc/interop/grpc_testing"
+       testpb "google.golang.org/grpc/interop/grpc_testing"
+
+       _ "google.golang.org/grpc/xds" // Register all required xDS components
+)
+
+const (
+       filterCfgPathFieldName  = "path"
+       filterCfgErrorFieldName = "new_stream_error"
+       filterCfgMetadataKey    = "test-filter-config"
+)
+
+// testFilterCfg is the internal representation of the filter config proto. It
+// is returned by filter's config parsing methods.
+type testFilterCfg struct {
+       httpfilter.FilterConfig
+       path         string
+       newStreamErr string
+}
+
+// filterConfigFromProto parses filter config specified as a v3.TypedStruct into
+// a testFilterCfg.
+func filterConfigFromProto(cfg proto.Message) (httpfilter.FilterConfig, error) {
+       ts, ok := cfg.(*v3xdsxdstypepb.TypedStruct)
+       if !ok {
+               return nil, fmt.Errorf("unsupported filter config type: %T, want %T", cfg, &v3xdsxdstypepb.TypedStruct{})
+       }
+
+       if ts.GetValue() == nil {
+               return testFilterCfg{}, nil
+       }
+       ret := testFilterCfg{}
+       if v := ts.GetValue().GetFields()[filterCfgPathFieldName]; v != nil {
+               ret.path = v.GetStringValue()
+       }
+       if v := ts.GetValue().GetFields()[filterCfgErrorFieldName]; v != nil {
+               ret.newStreamErr = v.GetStringValue()
+       }
+       return ret, nil
+}
+
+type logger interface {
+       Logf(format string, args ...any)
+}
+
+// testHTTPFilterWithRPCMetadata is a HTTP filter used for testing purposes.
+//
+// This filter is used to verify that the xDS resolver and filter stack
+// correctly propagate filter configuration (both base and override) to RPCs. It
+// does this by injecting the config paths from its base and override configs as
+// JSON-encoded metadata into outgoing RPCs.  The metadata can then be observed
+// by the backend, allowing tests to assert that the correct filter
+// configuration was applied for each RPC.
+type testHTTPFilterWithRPCMetadata struct {
+       logger        logger
+       typeURL       string
+       newStreamChan *testutils.Channel // If set, filter config is written to this field from NewStream()
+}
+
+func (fb *testHTTPFilterWithRPCMetadata) TypeURLs() []string { return []string{fb.typeURL} }
+
+func (*testHTTPFilterWithRPCMetadata) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) {
+       return filterConfigFromProto(cfg)
+}
+
+func (*testHTTPFilterWithRPCMetadata) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) {
+       return filterConfigFromProto(override)
+}
+
+func (*testHTTPFilterWithRPCMetadata) IsTerminal() bool { return false }
+
+// ClientInterceptorBuilder is an optional interface for filters to implement.
+// This compile time check ensures the test filter implements it.
+var _ httpfilter.ClientInterceptorBuilder = &testHTTPFilterWithRPCMetadata{}
+
+func (fb *testHTTPFilterWithRPCMetadata) BuildClientInterceptor(config, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) {
+       fb.logger.Logf("BuildClientInterceptor called with config: %+v, override: %+v", config, override)
+
+       if config == nil {
+               return nil, fmt.Errorf("unexpected missing config")
+       }
+
+       baseCfg := config.(testFilterCfg)
+       basePath := baseCfg.path
+       newStreamErr := baseCfg.newStreamErr
+
+       var overridePath string
+       if override != nil {
+               overrideCfg := override.(testFilterCfg)
+               overridePath = overrideCfg.path
+               if overrideCfg.newStreamErr != "" {
+                       newStreamErr = overrideCfg.newStreamErr
+               }
+       }
+
+       return &testFilterInterceptor{
+               logger: fb.logger,
+               cfg: overallFilterConfig{
+                       BasePath:     basePath,
+                       OverridePath: overridePath,
+                       Error:        newStreamErr,
+               },
+               newStreamChan: fb.newStreamChan,
+       }, nil
+}
+
+// overallFilterConfig is a JSON representation of the filter config.
+// It is sent as RPC metadata and written to a channel for test verification.
+type overallFilterConfig struct {
+       BasePath     string `json:"base_path,omitempty"`
+       OverridePath string `json:"override_path,omitempty"`
+       Error        string `json:"error,omitempty"`
+}
+
+// testFilterInterceptor is a client interceptor that injects RPC metadata
+// corresponding to its filter config.
+type testFilterInterceptor struct {
+       logger        logger
+       cfg           overallFilterConfig
+       newStreamChan *testutils.Channel // If set, filter config is written to this field from NewStream()
+}
+
+func (fi *testFilterInterceptor) NewStream(ctx context.Context, _ iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) {
+       // Write the config to the channel, if set. This allows tests to verify that
+       // the filter was invoked at RPC time. This is useful for tests where the
+       // RPC is expected to fail, and therefore the RPC metadata cannot be
+       // observed from the backend.
+       if fi.newStreamChan != nil {
+               fi.newStreamChan.Send(fi.cfg)
+       }
+
+       if fi.cfg.Error != "" {
+               return nil, status.Error(codes.Unavailable, fi.cfg.Error)
+       }
+
+       // Marshal the filter config to JSON and inject it as metadata.
+       bytes, err := json.Marshal(fi.cfg)
+       if err != nil {
+               return nil, fmt.Errorf("failed to marshal filter config: %w", err)
+       }
+       cfg := string(bytes)
+       fi.logger.Logf("Injecting filter config metadata: %v", cfg)
+
+       return newStream(metadata.AppendToOutgoingContext(ctx, filterCfgMetadataKey, fmt.Sprintf("%v", cfg)), done)
+}
+
+func newHTTPFilter(t *testing.T, name, typeURL, path, err string) *v3httppb.HttpFilter {
+       return &v3httppb.HttpFilter{
+               Name: name,
+               ConfigType: &v3httppb.HttpFilter_TypedConfig{
+                       TypedConfig: testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
+                               TypeUrl: typeURL,
+                               Value: &structpb.Struct{
+                                       Fields: map[string]*structpb.Value{
+                                               filterCfgPathFieldName:  {Kind: &structpb.Value_StringValue{StringValue: path}},
+                                               filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: err}},
+                                       },
+                               },
+                       }),
+               },
+       }
+}
+
+// newStubServer returns a stub server that sends any filter config metadata
+// received as part of incoming RPCs to the provided channel.
+func newStubServer(metadataCh chan []string) *stubserver.StubServer {
+       return &stubserver.StubServer{
+               EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
+                       md, ok := metadata.FromIncomingContext(ctx)
+                       if !ok {
+                               return nil, status.Error(codes.InvalidArgument, "missing metadata")
+                       }
+                       select {
+                       case metadataCh <- md.Get(filterCfgMetadataKey):
+                       case <-ctx.Done():
+                               return nil, ctx.Err()
+                       }
+                       return &testpb.Empty{}, nil
+               },
+               UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
+                       md, ok := metadata.FromIncomingContext(ctx)
+                       if !ok {
+                               return nil, status.Error(codes.InvalidArgument, "missing metadata")
+                       }
+                       select {
+                       case metadataCh <- md.Get(filterCfgMetadataKey):
+                       case <-ctx.Done():
+                               return nil, ctx.Err()
+                       }
+                       return &testpb.SimpleResponse{Payload: req.GetPayload()}, nil
+               },
+       }
+}
+
+// Tests HTTP filters with the xDS resolver. The test exercises various levels
+// of filter config overrides (base, virtual host-level, route-level and
+// cluster-level), and verifies that the correct config is applied for each RPC.
+func (s) TestXDSResolverHTTPFilters_AllOverrides(t *testing.T) {
+       // Override default WRR with a deterministic test version.
+       origNewWRR := rinternal.NewWRR
+       rinternal.NewWRR = testutils.NewTestWRR
+       defer func() { rinternal.NewWRR = origNewWRR }()
+
+       // Register a custom httpFilter builder for the test.
+       testFilterName := t.Name()
+       fb := &testHTTPFilterWithRPCMetadata{logger: t, typeURL: testFilterName}
+       httpfilter.Register(fb)
+       defer httpfilter.UnregisterForTesting(fb.typeURL)
+
+       // Spin up an xDS management server
+       mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
+       defer mgmtServer.Stop()
+
+       // Create an xDS resolver with bootstrap configuration pointing to the above
+       // management server.
+       nodeID := uuid.New().String()
+       bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
+       if internal.NewXDSResolverWithConfigForTesting == nil {
+               t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil")
+       }
+       resolverBuilder, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bootstrapContents)
+       if err != nil {
+               t.Fatalf("Failed to create xDS resolver for testing: %v", err)
+       }
+
+       // Start a couple of test backends.
+       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+       defer cancel()
+       const chBufSize = 4 // Expecting 4 metadata entries (2 RPCs, each with 2 filters).
+       metadataCh := make(chan []string, chBufSize)
+       backend1 := stubserver.StartTestService(t, newStubServer(metadataCh))
+       defer backend1.Stop()
+       backend2 := stubserver.StartTestService(t, newStubServer(metadataCh))
+       defer backend2.Stop()
+
+       // Configure resources on the management server.
+       //
+       // The route configuration contains two routes, matching two different RPCs.
+       // The route for the UnaryCall RPC does not contain any cluster-level or
+       // route-level per-filter config overrides. A virtual host-level per-filter
+       // config override exists and it should apply for RPCs matching this route.
+       //
+       // The route for the EmptyCall RPC contains a route-level per-filter config
+       // override that should apply for RPCs routed to cluster "A" since it does
+       // not have any cluster-level overrides. For RPCs matching cluster "B"
+       // though, a cluster-level per-filter config override should take
+       // precedence.
+       const testServiceName = "service-name"
+       const routeConfigName = "route-config"
+       listener := &v3listenerpb.Listener{
+               Name: testServiceName,
+               ApiListener: &v3listenerpb.ApiListener{
+                       ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
+                               RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
+                                       RouteConfig: &v3routepb.RouteConfiguration{
+                                               Name: routeConfigName,
+                                               VirtualHosts: []*v3routepb.VirtualHost{{
+                                                       Domains: []string{testServiceName},
+                                                       Routes: []*v3routepb.Route{
+                                                               {
+                                                                       Match: &v3routepb.RouteMatch{
+                                                                               PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/grpc.testing.TestService/UnaryCall"},
+                                                                       },
+                                                                       Action: &v3routepb.Route_Route{
+                                                                               Route: &v3routepb.RouteAction{
+                                                                                       ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
+                                                                                               WeightedClusters: &v3routepb.WeightedCluster{
+                                                                                                       Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
+                                                                                                               {Name: "A", Weight: wrapperspb.UInt32(1)},
+                                                                                                               {Name: "B", Weight: wrapperspb.UInt32(1)},
+                                                                                                       },
+                                                                                               },
+                                                                                       },
+                                                                               },
+                                                                       },
+                                                               },
+                                                               {
+                                                                       Match: &v3routepb.RouteMatch{
+                                                                               PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/grpc.testing.TestService/EmptyCall"},
+                                                                       },
+                                                                       Action: &v3routepb.Route_Route{
+                                                                               Route: &v3routepb.RouteAction{
+                                                                                       ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
+                                                                                               WeightedClusters: &v3routepb.WeightedCluster{
+                                                                                                       Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
+                                                                                                               {
+                                                                                                                       Name:   "A",
+                                                                                                                       Weight: wrapperspb.UInt32(1),
+                                                                                                               },
+                                                                                                               {
+                                                                                                                       Name:   "B",
+                                                                                                                       Weight: wrapperspb.UInt32(1),
+                                                                                                                       TypedPerFilterConfig: map[string]*anypb.Any{
+                                                                                                                               "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
+                                                                                                                                       TypeUrl: testFilterName,
+                                                                                                                                       Value: &structpb.Struct{
+                                                                                                                                               Fields: map[string]*structpb.Value{
+                                                                                                                                                       filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo4"}},
+                                                                                                                                               },
+                                                                                                                                       },
+                                                                                                                               }),
+                                                                                                                               "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
+                                                                                                                                       TypeUrl: testFilterName,
+                                                                                                                                       Value: &structpb.Struct{
+                                                                                                                                               Fields: map[string]*structpb.Value{
+                                                                                                                                                       filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar4"}},
+                                                                                                                                               },
+                                                                                                                                       },
+                                                                                                                               }),
+                                                                                                                       },
+                                                                                                               },
+                                                                                                       },
+                                                                                               },
+                                                                                       },
+                                                                               },
+                                                                       },
+                                                                       TypedPerFilterConfig: map[string]*anypb.Any{
+                                                                               "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
+                                                                                       TypeUrl: testFilterName,
+                                                                                       Value: &structpb.Struct{
+                                                                                               Fields: map[string]*structpb.Value{
+                                                                                                       filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo3"}},
+                                                                                               },
+                                                                                       },
+                                                                               }),
+                                                                               "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
+                                                                                       TypeUrl: testFilterName,
+                                                                                       Value: &structpb.Struct{
+                                                                                               Fields: map[string]*structpb.Value{
+                                                                                                       filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar3"}},
+                                                                                               },
+                                                                                       },
+                                                                               }),
+                                                                       },
+                                                               },
+                                                       },
+                                                       TypedPerFilterConfig: map[string]*anypb.Any{
+                                                               "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
+                                                                       TypeUrl: testFilterName,
+                                                                       Value: &structpb.Struct{
+                                                                               Fields: map[string]*structpb.Value{
+                                                                                       filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo2"}},
+                                                                               },
+                                                                       },
+                                                               }),
+                                                               "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
+                                                                       TypeUrl: testFilterName,
+                                                                       Value: &structpb.Struct{
+                                                                               Fields: map[string]*structpb.Value{
+                                                                                       filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar2"}},
+                                                                               },
+                                                                       },
+                                                               }),
+                                                       },
+                                               }},
+                                       },
+                               },
+                               HttpFilters: []*v3httppb.HttpFilter{
+                                       newHTTPFilter(t, "foo", testFilterName, "foo1", ""),
+                                       newHTTPFilter(t, "bar", testFilterName, "bar1", ""),
+                                       e2e.RouterHTTPFilter,
+                               },
+                       }),
+               },
+       }
+       resources := e2e.UpdateOptions{
+               NodeID:    nodeID,
+               Listeners: []*v3listenerpb.Listener{listener},
+               Clusters: []*v3clusterpb.Cluster{
+                       e2e.DefaultCluster("A", "endpoint_A", e2e.SecurityLevelNone),
+                       e2e.DefaultCluster("B", "endpoint_B", e2e.SecurityLevelNone),
+               },
+               Endpoints: []*v3endpointpb.ClusterLoadAssignment{
+                       e2e.DefaultEndpoint("endpoint_A", "localhost", []uint32{testutils.ParsePort(t, backend1.Address)}),
+                       e2e.DefaultEndpoint("endpoint_B", "localhost", []uint32{testutils.ParsePort(t, backend2.Address)}),
+               },
+               SkipValidation: true,
+       }
+       if err := mgmtServer.Update(ctx, resources); err != nil {
+               t.Fatal(err)
+       }
+
+       // Create a gRPC client using the xDS resolver.
+       cc, err := grpc.NewClient("xds:///"+testServiceName, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolverBuilder))
+       if err != nil {
+               t.Fatalf("Failed to create a gRPC client: %v", err)
+       }
+       defer cc.Close()
+
+       // Helper to make an RPC twice and collect filter configs from metadata. We
+       // make the RPC two times to ensure that we hit both clusters (because of
+       // the deterministic WRR). The returned filter configs are in the order in
+       // which the RPCs were made.
+       collectFilterConfigs := func(rpc func() error) []overallFilterConfig {
+               t.Helper()
+               var gotFilterCfgs []overallFilterConfig
+               for i := 0; i < 2; i++ {
+                       if err := rpc(); err != nil {
+                               t.Fatalf("Unexpected RPC error: %v", err)
+                       }
+                       select {
+                       case cfg := <-metadataCh:
+                               if len(cfg) != 2 {
+                                       t.Fatalf("Unexpected number of filter config metadata, got: %d, want: 2", len(cfg))
+                               }
+                               for _, c := range cfg {
+                                       var ofc overallFilterConfig
+                                       if err := json.Unmarshal([]byte(c), &ofc); err != nil {
+                                               t.Fatalf("Failed to unmarshal filter config JSON %q: %v", c, err)
+                                       }
+                                       gotFilterCfgs = append(gotFilterCfgs, ofc)
+                               }
+                       case <-ctx.Done():
+                               t.Fatalf("Timeout waiting for metadata from backend")
+                       }
+               }
+               return gotFilterCfgs
+       }
+
+       // Test base filter config (UnaryCall). Because of the deterministic WRR, we
+       // know the expected order of clusters for the two RPCs.
+       wantFilterCfgs := []overallFilterConfig{
+               {BasePath: "foo1", OverridePath: "foo2"}, // Routed to cluster A
+               {BasePath: "bar1", OverridePath: "bar2"}, // Routed to cluster A
+               {BasePath: "foo1", OverridePath: "foo2"}, // Routed to cluster B
+               {BasePath: "bar1", OverridePath: "bar2"}, // Routed to cluster B
+       }
+       client := testgrpc.NewTestServiceClient(cc)
+       gotFilterCfgs := collectFilterConfigs(func() error {
+               _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{})
+               return err
+       })
+       if diff := cmp.Diff(wantFilterCfgs, gotFilterCfgs); diff != "" {
+               t.Fatalf("Unexpected filter configs (-want +got):\n%s", diff)
+       }
+
+       // Test per-route and per-cluster overrides (EmptyCall).
+       wantFilterCfgs = []overallFilterConfig{
+               {BasePath: "foo1", OverridePath: "foo3"}, // Routed to cluster A
+               {BasePath: "bar1", OverridePath: "bar3"}, // Routed to cluster A
+               {BasePath: "foo1", OverridePath: "foo4"}, // Routed to cluster B
+               {BasePath: "bar1", OverridePath: "bar4"}, // Routed to cluster B
+       }
+       gotFilterCfgs = collectFilterConfigs(func() error {
+               _, err := client.EmptyCall(ctx, &testpb.Empty{})
+               return err
+       })
+       if diff := cmp.Diff(wantFilterCfgs, gotFilterCfgs); diff != "" {
+               t.Fatalf("Unexpected filter configs (-want +got):\n%s", diff)
+       }
+}
+
+// Tests that if a filter returns an error from its NewStream method, the RPC
+// fails with that error. It also verifies that subsequent filters in the chain
+// are not run.
+func (s) TestXDSResolverHTTPFilters_NewStreamError(t *testing.T) {
+       // Register a custom httpFilter builder for the test and use a channel to
+       // get notified when the interceptor is invoked.
+       testFilterName := t.Name()
+       fb := &testHTTPFilterWithRPCMetadata{
+               logger:        t,
+               typeURL:       testFilterName,
+               newStreamChan: testutils.NewChannelWithSize(3), // We have three filters.
+       }
+       httpfilter.Register(fb)
+       defer httpfilter.UnregisterForTesting(fb.typeURL)
+
+       // Spin up an xDS management server
+       mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
+       defer mgmtServer.Stop()
+
+       // Create an xDS resolver with bootstrap configuration pointing to the above
+       // management server.
+       nodeID := uuid.New().String()
+       bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
+       if internal.NewXDSResolverWithConfigForTesting == nil {
+               t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil")
+       }
+       resolverBuilder, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bootstrapContents)
+       if err != nil {
+               t.Fatalf("Failed to create xDS resolver for testing: %v", err)
+       }
+
+       // Start a test backend, but we expect the filter to fail the RPC before it
+       // ever gets to the backend. The test is designed to fail if the RPC
+       // *succeeds* (i.e., if the backend is reached). A large channel buffer is
+       // used to prevent blocking in the unexpected case where the filter fails to
+       // reject the RPC.
+       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+       defer cancel()
+       metadataCh := make(chan []string, 10)
+       backend := stubserver.StartTestService(t, newStubServer(metadataCh))
+       defer backend.Stop()
+
+       // Configure resources on the management server.
+       //
+       // The route configuration contains two routes, matching two different RPCs.
+       // The route for the UnaryCall RPC does not contain any cluster-level or
+       // route-level per-filter config overrides. A virtual host-level per-filter
+       // config override exists and it should apply for RPCs matching this route.
+       //
+       // The route for the EmptyCall RPC contains a route-level per-filter config
+       // override that should apply for RPCs routed to cluster "A" since it does
+       // not have any cluster-level overrides. For RPCs matching cluster "B"
+       // though, a cluster-level per-filter config override should take
+       // precedence.
+       const testServiceName = "service-name"
+       const routeConfigName = "route-config"
+       listener := &v3listenerpb.Listener{
+               Name: testServiceName,
+               ApiListener: &v3listenerpb.ApiListener{
+                       ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
+                               RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
+                                       RouteConfig: &v3routepb.RouteConfiguration{
+                                               Name: routeConfigName,
+                                               VirtualHosts: []*v3routepb.VirtualHost{{
+                                                       Domains: []string{testServiceName},
+                                                       Routes: []*v3routepb.Route{
+                                                               {
+                                                                       Match: &v3routepb.RouteMatch{
+                                                                               PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/grpc.testing.TestService/EmptyCall"},
+                                                                       },
+                                                                       Action: &v3routepb.Route_Route{
+                                                                               Route: &v3routepb.RouteAction{
+                                                                                       ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
+                                                                                               WeightedClusters: &v3routepb.WeightedCluster{
+                                                                                                       Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
+                                                                                                               {Name: "A", Weight: wrapperspb.UInt32(1)},
+                                                                                                       },
+                                                                                               },
+                                                                                       },
+                                                                               },
+                                                                       },
+                                                               },
+                                                       },
+                                               }},
+                                       },
+                               },
+                               HttpFilters: []*v3httppb.HttpFilter{
+                                       newHTTPFilter(t, "foo-good", testFilterName, "foo-good", ""),
+                                       newHTTPFilter(t, "foo-failing", testFilterName, "foo-failing", "filter interceptor error"),
+                                       newHTTPFilter(t, "bar-good", testFilterName, "bar-good", ""),
+                                       e2e.RouterHTTPFilter,
+                               },
+                       }),
+               },
+       }
+       resources := e2e.UpdateOptions{
+               NodeID:         nodeID,
+               Listeners:      []*v3listenerpb.Listener{listener},
+               Clusters:       []*v3clusterpb.Cluster{e2e.DefaultCluster("A", "endpoint_A", e2e.SecurityLevelNone)},
+               Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint("endpoint_A", "localhost", []uint32{testutils.ParsePort(t, backend.Address)})},
+               SkipValidation: true,
+       }
+       if err := mgmtServer.Update(ctx, resources); err != nil {
+               t.Fatal(err)
+       }
+
+       // Create a gRPC client using the xDS resolver.
+       cc, err := grpc.NewClient("xds:///"+testServiceName, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolverBuilder))
+       if err != nil {
+               t.Fatalf("Failed to create a gRPC client: %v", err)
+       }
+       defer cc.Close()
+
+       client := testgrpc.NewTestServiceClient(cc)
+       _, err = client.EmptyCall(ctx, &testpb.Empty{})
+       if err == nil {
+               t.Fatalf("EmptyCall() RPC succeeded when expected to fail")
+       }
+       if got, want := status.Code(err), codes.Unavailable; got != want {
+               t.Fatalf("EmptyCall() RPC error code, got: %v, want: %v", got, want)
+       }
+       if got, want := err.Error(), "filter interceptor error"; !strings.Contains(got, want) {
+               t.Fatalf("Unexpected RPC error, got: %v, want: %v", err, "rpc error: code = Unavailable desc = filter interceptor error")
+       }
+
+       // Verify that the first good filter was invoked
+       cfg, err := fb.newStreamChan.Receive(ctx)
+       if err != nil {
+               t.Fatal("Timeout waiting for first filter to be invoked")
+       }
+       ofc := cfg.(overallFilterConfig)
+       wantCfg := overallFilterConfig{BasePath: "foo-good"}
+       if diff := cmp.Diff(wantCfg, ofc); diff != "" {
+               t.Fatalf("Unexpected first filter config (-want +got):\n%s", diff)
+       }
+
+       // Verify that the failing filter was invoked too.
+       cfg, err = fb.newStreamChan.Receive(ctx)
+       if err != nil {
+               t.Fatal("Timeout waiting for second filter to be invoked")
+       }
+       ofc = cfg.(overallFilterConfig)
+       wantCfg = overallFilterConfig{BasePath: "foo-failing", Error: "filter interceptor error"}
+       if diff := cmp.Diff(wantCfg, ofc); diff != "" {
+               t.Fatalf("Unexpected second filter config (-want +got):\n%s", diff)
+       }
+
+       // Verify that the last good filter was not invoked.
+       sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+       defer sCancel()
+       if _, err = fb.newStreamChan.Receive(sCtx); err == nil {
+               t.Fatal("Last filter was invoked when expected not to be")
+       }
+}
index 4662e7b7396a0f89ff1725681629bbf1e61fe42d..c28892a6f28c931738a41e5111db17a446c05796 100644 (file)
@@ -328,7 +328,7 @@ func (r *xdsResolver) sendNewServiceConfig(cs stoppableConfigSelector) bool {
 // r.activeClusters for previously-unseen clusters.
 //
 // Only executed in the context of a serializer callback.
-func (r *xdsResolver) newConfigSelector() *configSelector {
+func (r *xdsResolver) newConfigSelector() (*configSelector, error) {
        cs := &configSelector{
                channelID: r.channelID,
                xdsNodeID: r.xdsClient.BootstrapConfig().Node().GetId(),
@@ -338,8 +338,7 @@ func (r *xdsResolver) newConfigSelector() *configSelector {
                        })
                },
                virtualHost: virtualHost{
-                       httpFilterConfigOverride: r.currentVirtualHost.HTTPFilterConfigOverride,
-                       retryConfig:              r.currentVirtualHost.RetryConfig,
+                       retryConfig: r.currentVirtualHost.RetryConfig,
                },
                routes:           make([]route, len(r.currentVirtualHost.Routes)),
                clusters:         make(map[string]*clusterInfo),
@@ -350,18 +349,20 @@ func (r *xdsResolver) newConfigSelector() *configSelector {
                clusters := rinternal.NewWRR.(func() wrr.WRR)()
                if rt.ClusterSpecifierPlugin != "" {
                        clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin
-                       clusters.Add(&routeCluster{
-                               name: clusterName,
-                       }, 1)
+                       clusters.Add(&routeCluster{name: clusterName}, 1)
                        ci := r.addOrGetActiveClusterInfo(clusterName)
                        ci.cfg = xdsChildConfig{ChildPolicy: balancerConfig(r.currentRouteConfig.ClusterSpecifierPlugins[rt.ClusterSpecifierPlugin])}
                        cs.clusters[clusterName] = ci
                } else {
                        for _, wc := range rt.WeightedClusters {
                                clusterName := clusterPrefix + wc.Name
+                               interceptor, err := newInterceptor(r.currentListener.HTTPFilters, wc.HTTPFilterConfigOverride, rt.HTTPFilterConfigOverride, r.currentVirtualHost.HTTPFilterConfigOverride)
+                               if err != nil {
+                                       return nil, err
+                               }
                                clusters.Add(&routeCluster{
-                                       name:                     clusterName,
-                                       httpFilterConfigOverride: wc.HTTPFilterConfigOverride,
+                                       name:        clusterName,
+                                       interceptor: interceptor,
                                }, int64(wc.Weight))
                                ci := r.addOrGetActiveClusterInfo(clusterName)
                                ci.cfg = xdsChildConfig{ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: wc.Name})}
@@ -378,7 +379,6 @@ func (r *xdsResolver) newConfigSelector() *configSelector {
                        cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration
                }
 
-               cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride
                cs.routes[i].retryConfig = rt.RetryConfig
                cs.routes[i].hashPolicies = rt.HashPolicies
        }
@@ -390,7 +390,7 @@ func (r *xdsResolver) newConfigSelector() *configSelector {
                atomic.AddInt32(&ci.refCount, 1)
        }
 
-       return cs
+       return cs, nil
 }
 
 // pruneActiveClusters deletes entries in r.activeClusters with zero
@@ -446,7 +446,12 @@ func (r *xdsResolver) onResolutionComplete() {
                return
        }
 
-       cs := r.newConfigSelector()
+       cs, err := r.newConfigSelector()
+       if err != nil {
+               // Send an erroring config selector in this case that fails RPCs.
+               r.onResourceError(fmt.Errorf("xds: failed to create config selector: %v", err))
+               return
+       }
        if !r.sendNewServiceConfig(cs) {
                // Channel didn't like the update we provided (unexpected); erase
                // this config selector and ignore this update, continuing with
index b28724d37383af77a143743045d878c70830a246..eaf2252fd65b8d82fa1a91c053b728cf80fe7ba5 100644 (file)
@@ -41,20 +41,15 @@ import (
        "google.golang.org/grpc/internal/testutils/xds/e2e"
        "google.golang.org/grpc/internal/xds/balancer/clustermanager"
        "google.golang.org/grpc/internal/xds/bootstrap"
-       "google.golang.org/grpc/internal/xds/httpfilter"
        rinternal "google.golang.org/grpc/internal/xds/resolver/internal"
        "google.golang.org/grpc/internal/xds/xdsclient"
        "google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version"
        "google.golang.org/grpc/metadata"
        "google.golang.org/grpc/resolver"
        "google.golang.org/grpc/serviceconfig"
-       "google.golang.org/protobuf/proto"
-       "google.golang.org/protobuf/types/known/anypb"
        "google.golang.org/protobuf/types/known/durationpb"
-       "google.golang.org/protobuf/types/known/structpb"
        "google.golang.org/protobuf/types/known/wrapperspb"
 
-       v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
        v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
        v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
        v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
@@ -1158,106 +1153,6 @@ func (s) TestResolverWRR(t *testing.T) {
        }
 }
 
-const filterCfgPathFieldName = "path"
-const filterCfgErrorFieldName = "new_stream_error"
-
-type filterCfg struct {
-       httpfilter.FilterConfig
-       path         string
-       newStreamErr error
-}
-
-type filterBuilder struct {
-       paths   []string
-       typeURL string
-}
-
-func (fb *filterBuilder) TypeURLs() []string { return []string{fb.typeURL} }
-
-func filterConfigFromProto(cfg proto.Message) (httpfilter.FilterConfig, error) {
-       ts, ok := cfg.(*v3xdsxdstypepb.TypedStruct)
-       if !ok {
-               return nil, fmt.Errorf("unsupported filter config type: %T, want %T", cfg, &v3xdsxdstypepb.TypedStruct{})
-       }
-
-       if ts.GetValue() == nil {
-               return filterCfg{}, nil
-       }
-       ret := filterCfg{}
-       if v := ts.GetValue().GetFields()[filterCfgPathFieldName]; v != nil {
-               ret.path = v.GetStringValue()
-       }
-       if v := ts.GetValue().GetFields()[filterCfgErrorFieldName]; v != nil {
-               if v.GetStringValue() == "" {
-                       ret.newStreamErr = nil
-               } else {
-                       ret.newStreamErr = fmt.Errorf("%s", v.GetStringValue())
-               }
-       }
-       return ret, nil
-}
-
-func (*filterBuilder) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) {
-       return filterConfigFromProto(cfg)
-}
-
-func (*filterBuilder) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) {
-       return filterConfigFromProto(override)
-}
-
-func (*filterBuilder) IsTerminal() bool { return false }
-
-var _ httpfilter.ClientInterceptorBuilder = &filterBuilder{}
-
-func (fb *filterBuilder) BuildClientInterceptor(config, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) {
-       if config == nil {
-               panic("unexpected missing config")
-       }
-
-       fi := &filterInterceptor{
-               parent: fb,
-               pathCh: make(chan string, 10),
-       }
-
-       fb.paths = append(fb.paths, "build:"+config.(filterCfg).path)
-       err := config.(filterCfg).newStreamErr
-       if override != nil {
-               fb.paths = append(fb.paths, "override:"+override.(filterCfg).path)
-               err = override.(filterCfg).newStreamErr
-       }
-
-       fi.cfgPath = config.(filterCfg).path
-       fi.err = err
-       return fi, nil
-}
-
-type filterInterceptor struct {
-       parent  *filterBuilder
-       pathCh  chan string
-       cfgPath string
-       err     error
-}
-
-func (fi *filterInterceptor) NewStream(ctx context.Context, _ iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) {
-       fi.parent.paths = append(fi.parent.paths, "newstream:"+fi.cfgPath)
-       if fi.err != nil {
-               return nil, fi.err
-       }
-       d := func() {
-               fi.parent.paths = append(fi.parent.paths, "done:"+fi.cfgPath)
-               done()
-       }
-       cs, err := newStream(ctx, d)
-       if err != nil {
-               return nil, err
-       }
-       return &clientStream{ClientStream: cs}, nil
-}
-
-type clientStream struct {
-       iresolver.ClientStream
-}
-
 func (s) TestConfigSelector_FailureCases(t *testing.T) {
        const methodName = "1"
 
@@ -1345,298 +1240,6 @@ func (s) TestConfigSelector_FailureCases(t *testing.T) {
        }
 }
 
-func newHTTPFilter(t *testing.T, name, typeURL, path, err string) *v3httppb.HttpFilter {
-       return &v3httppb.HttpFilter{
-               Name: name,
-               ConfigType: &v3httppb.HttpFilter_TypedConfig{
-                       TypedConfig: testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
-                               TypeUrl: typeURL,
-                               Value: &structpb.Struct{
-                                       Fields: map[string]*structpb.Value{
-                                               filterCfgPathFieldName:  {Kind: &structpb.Value_StringValue{StringValue: path}},
-                                               filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: err}},
-                                       },
-                               },
-                       }),
-               },
-       }
-}
-
-func (s) TestXDSResolverHTTPFilters(t *testing.T) {
-       const methodName1 = "1"
-       const methodName2 = "2"
-       testFilterName := t.Name()
-
-       testCases := []struct {
-               name          string
-               listener      *v3listenerpb.Listener
-               rpcRes        map[string][][]string
-               wantStreamErr string
-       }{
-               {
-                       name: "NewStream error - ensure earlier interceptor Done is still called",
-                       listener: &v3listenerpb.Listener{
-                               Name: defaultTestServiceName,
-                               ApiListener: &v3listenerpb.ApiListener{
-                                       ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
-                                               RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
-                                                       RouteConfig: &v3routepb.RouteConfiguration{
-                                                               Name: defaultTestRouteConfigName,
-                                                               VirtualHosts: []*v3routepb.VirtualHost{{
-                                                                       Domains: []string{defaultTestServiceName},
-                                                                       Routes: []*v3routepb.Route{{
-                                                                               Match: &v3routepb.RouteMatch{
-                                                                                       PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName1},
-                                                                               },
-                                                                               Action: &v3routepb.Route_Route{
-                                                                                       Route: &v3routepb.RouteAction{
-                                                                                               ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
-                                                                                                       WeightedClusters: &v3routepb.WeightedCluster{
-                                                                                                               Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
-                                                                                                                       {Name: "A", Weight: wrapperspb.UInt32(1)},
-                                                                                                                       {Name: "B", Weight: wrapperspb.UInt32(1)},
-                                                                                                               },
-                                                                                                       },
-                                                                                               },
-                                                                                       },
-                                                                               },
-                                                                       }},
-                                                               }},
-                                                       }},
-                                               HttpFilters: []*v3httppb.HttpFilter{
-                                                       newHTTPFilter(t, "foo", testFilterName, "foo1", ""),
-                                                       newHTTPFilter(t, "bar", testFilterName, "bar1", "bar newstream err"),
-                                                       e2e.RouterHTTPFilter,
-                                               },
-                                       }),
-                               },
-                       },
-                       rpcRes: map[string][][]string{
-                               methodName1: {
-                                       {"build:foo1", "build:bar1", "newstream:foo1", "newstream:bar1", "done:foo1"}, // err in bar1 NewStream()
-                               },
-                       },
-                       wantStreamErr: "bar newstream err",
-               },
-               {
-                       name: "all overrides",
-                       listener: &v3listenerpb.Listener{
-                               Name: defaultTestServiceName,
-                               ApiListener: &v3listenerpb.ApiListener{
-                                       ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
-                                               RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
-                                                       RouteConfig: &v3routepb.RouteConfiguration{
-                                                               Name: defaultTestRouteConfigName,
-                                                               VirtualHosts: []*v3routepb.VirtualHost{{
-                                                                       Domains: []string{defaultTestServiceName},
-                                                                       Routes: []*v3routepb.Route{
-                                                                               {
-                                                                                       Match: &v3routepb.RouteMatch{
-                                                                                               PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName1},
-                                                                                       },
-                                                                                       Action: &v3routepb.Route_Route{
-                                                                                               Route: &v3routepb.RouteAction{
-                                                                                                       ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
-                                                                                                               WeightedClusters: &v3routepb.WeightedCluster{
-                                                                                                                       Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
-                                                                                                                               {Name: "A", Weight: wrapperspb.UInt32(1)},
-                                                                                                                               {Name: "B", Weight: wrapperspb.UInt32(1)},
-                                                                                                                       },
-                                                                                                               },
-                                                                                                       },
-                                                                                               },
-                                                                                       },
-                                                                               },
-                                                                               {
-                                                                                       Match: &v3routepb.RouteMatch{
-                                                                                               PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName2},
-                                                                                       },
-                                                                                       Action: &v3routepb.Route_Route{
-                                                                                               Route: &v3routepb.RouteAction{
-                                                                                                       ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
-                                                                                                               WeightedClusters: &v3routepb.WeightedCluster{
-                                                                                                                       Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
-                                                                                                                               {Name: "A", Weight: wrapperspb.UInt32(1)},
-                                                                                                                               {
-                                                                                                                                       Name:   "B",
-                                                                                                                                       Weight: wrapperspb.UInt32(1),
-                                                                                                                                       TypedPerFilterConfig: map[string]*anypb.Any{
-                                                                                                                                               "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
-                                                                                                                                                       TypeUrl: testFilterName,
-                                                                                                                                                       Value: &structpb.Struct{
-                                                                                                                                                               Fields: map[string]*structpb.Value{
-                                                                                                                                                                       filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo4"}},
-                                                                                                                                                               },
-                                                                                                                                                       },
-                                                                                                                                               }),
-                                                                                                                                               "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
-                                                                                                                                                       TypeUrl: testFilterName,
-                                                                                                                                                       Value: &structpb.Struct{
-                                                                                                                                                               Fields: map[string]*structpb.Value{
-                                                                                                                                                                       filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar4"}},
-                                                                                                                                                               },
-                                                                                                                                                       },
-                                                                                                                                               }),
-                                                                                                                                       },
-                                                                                                                               },
-                                                                                                                       },
-                                                                                                               },
-                                                                                                       },
-                                                                                               },
-                                                                                       },
-                                                                                       TypedPerFilterConfig: map[string]*anypb.Any{
-                                                                                               "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
-                                                                                                       TypeUrl: testFilterName,
-                                                                                                       Value: &structpb.Struct{
-                                                                                                               Fields: map[string]*structpb.Value{
-                                                                                                                       filterCfgPathFieldName:  {Kind: &structpb.Value_StringValue{StringValue: "foo3"}},
-                                                                                                                       filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: ""}},
-                                                                                                               },
-                                                                                                       },
-                                                                                               }),
-                                                                                               "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
-                                                                                                       TypeUrl: testFilterName,
-                                                                                                       Value: &structpb.Struct{
-                                                                                                               Fields: map[string]*structpb.Value{
-                                                                                                                       filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar3"}},
-                                                                                                               },
-                                                                                                       },
-                                                                                               }),
-                                                                                       },
-                                                                               },
-                                                                       },
-                                                                       TypedPerFilterConfig: map[string]*anypb.Any{
-                                                                               "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
-                                                                                       TypeUrl: testFilterName,
-                                                                                       Value: &structpb.Struct{
-                                                                                               Fields: map[string]*structpb.Value{
-                                                                                                       filterCfgPathFieldName:  {Kind: &structpb.Value_StringValue{StringValue: "foo2"}},
-                                                                                                       filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: ""}},
-                                                                                               },
-                                                                                       },
-                                                                               }),
-                                                                               "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
-                                                                                       TypeUrl: testFilterName,
-                                                                                       Value: &structpb.Struct{
-                                                                                               Fields: map[string]*structpb.Value{
-                                                                                                       filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar2"}},
-                                                                                               },
-                                                                                       },
-                                                                               }),
-                                                                       },
-                                                               }},
-                                                       }},
-                                               HttpFilters: []*v3httppb.HttpFilter{
-                                                       newHTTPFilter(t, "foo", testFilterName, "foo1", "this is overridden to nil"),
-                                                       newHTTPFilter(t, "bar", testFilterName, "bar1", ""),
-                                                       e2e.RouterHTTPFilter,
-                                               },
-                                       }),
-                               },
-                       },
-                       rpcRes: map[string][][]string{
-                               methodName1: {
-                                       {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
-                                       {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
-                               },
-                               methodName2: {
-                                       {"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
-                                       {"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
-                                       {"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
-                                       {"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
-                               },
-                       },
-               },
-       }
-
-       for _, tc := range testCases {
-               t.Run(tc.name, func(t *testing.T) {
-                       origNewWRR := rinternal.NewWRR
-                       rinternal.NewWRR = testutils.NewTestWRR
-                       defer func() { rinternal.NewWRR = origNewWRR }()
-
-                       // Register a custom httpFilter builder for the test.
-                       fb := &filterBuilder{typeURL: testFilterName}
-                       httpfilter.Register(fb)
-
-                       // Spin up an xDS management server.
-                       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
-                       defer cancel()
-                       nodeID := uuid.New().String()
-                       mgmtServer, _, _, bc := setupManagementServerForTest(t, nodeID)
-
-                       // Build an xDS resolver.
-                       stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc)
-
-                       cluster := []*v3clusterpb.Cluster{
-                               e2e.DefaultCluster("A", "endpoint_A", e2e.SecurityLevelNone),
-                               e2e.DefaultCluster("B", "endpoint_B", e2e.SecurityLevelNone),
-                       }
-                       endpoints := []*v3endpointpb.ClusterLoadAssignment{
-                               e2e.DefaultEndpoint("endpoint_A", defaultTestHostname, defaultTestPort),
-                               e2e.DefaultEndpoint("endpoint_B", defaultTestHostname, defaultTestPort),
-                       }
-                       // Update the management server with a listener resource that
-                       // contains an inline route configuration.
-                       configureAllResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{tc.listener}, nil, cluster, endpoints)
-
-                       // Ensure that the resolver pushes a state update to the channel.
-                       cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
-
-                       for method, wants := range tc.rpcRes {
-                               // Order of wants is non-deterministic.
-                               remainingWant := make([][]string, len(wants))
-                               copy(remainingWant, wants)
-                               for n := range wants {
-                                       res, err := cs.SelectConfig(iresolver.RPCInfo{Method: method, Context: ctx})
-                                       if err != nil {
-                                               t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
-                                       }
-
-                                       var doneFunc func()
-                                       _, err = res.Interceptor.NewStream(ctx, iresolver.RPCInfo{}, func() {}, func(_ context.Context, done func()) (iresolver.ClientStream, error) {
-                                               doneFunc = done
-                                               return nil, nil
-                                       })
-                                       if tc.wantStreamErr != "" {
-                                               if err == nil || !strings.Contains(err.Error(), tc.wantStreamErr) {
-                                                       t.Errorf("NewStream(...) = _, %v; want _, Contains(%v)", err, tc.wantStreamErr)
-                                               }
-                                               if err == nil {
-                                                       res.OnCommitted()
-                                                       doneFunc()
-                                               }
-                                               continue
-                                       }
-                                       if err != nil {
-                                               t.Fatalf("unexpected error from Interceptor.NewStream: %v", err)
-
-                                       }
-                                       res.OnCommitted()
-                                       doneFunc()
-
-                                       gotPaths := fb.paths
-                                       fb.paths = []string{}
-
-                                       // Confirm the desired path is found in remainingWant, and remove it.
-                                       pass := false
-                                       for i := range remainingWant {
-                                               if cmp.Equal(gotPaths, remainingWant[i]) {
-                                                       remainingWant[i] = remainingWant[len(remainingWant)-1]
-                                                       remainingWant = remainingWant[:len(remainingWant)-1]
-                                                       pass = true
-                                                       break
-                                               }
-                                       }
-                                       if !pass {
-                                               t.Errorf("%q:%v - path:\n%v\nwant one of:\n%v", method, n, gotPaths, remainingWant)
-                                       }
-                               }
-                       }
-               })
-       }
-}
-
 func newDurationP(d time.Duration) *time.Duration {
        return &d
 }