"context"
"encoding/json"
"errors"
- "fmt"
"strings"
- "sync"
"testing"
"time"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/grpctest"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils"
xdsinternal "google.golang.org/grpc/internal/xds"
- "google.golang.org/grpc/internal/xds/bootstrap"
- "google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/testutils/fakeclient"
"google.golang.org/grpc/internal/xds/xdsclient"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
-
- v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
- "github.com/google/go-cmp/cmp"
- "github.com/google/go-cmp/cmp/cmpopts"
)
const (
testClusterName = "test-cluster"
testServiceName = "test-eds-service"
-
- testNamedMetricsKey1 = "test-named1"
- testNamedMetricsKey2 = "test-named2"
)
var (
testBackendEndpoints = []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}}}
- toleranceCmpOpt = cmp.Options{cmpopts.EquateApprox(0, 1e-5), cmp.AllowUnexported(loadData{}, localityData{}, requestData{}, serverLoadData{})}
)
type s struct {
grpctest.RunSubTests(t, s{})
}
-// testLoadReporter records load data pertaining to a single cluster.
-//
-// It implements loadReporter interface for the picker. Tests can use it to
-// override the loadStore in the picker to verify load reporting.
-type testLoadReporter struct {
- cluster, service string
-
- mu sync.Mutex
- drops map[string]uint64
- localityRPCCount map[clients.Locality]*rpcCountData
-}
-
-// CallStarted records a call started for the clients.Locality.
-func (lr *testLoadReporter) CallStarted(locality clients.Locality) {
- lr.mu.Lock()
- defer lr.mu.Unlock()
- if _, ok := lr.localityRPCCount[locality]; !ok {
- lr.localityRPCCount[locality] = &rpcCountData{}
- }
- lr.localityRPCCount[locality].inProgress++
- lr.localityRPCCount[locality].issued++
-}
-
-// CallFinished records a call finished for the clients.Locality.
-func (lr *testLoadReporter) CallFinished(locality clients.Locality, err error) {
- lr.mu.Lock()
- defer lr.mu.Unlock()
- if lr.localityRPCCount == nil {
- return
- }
- lrc := lr.localityRPCCount[locality]
- lrc.inProgress--
- if err == nil {
- lrc.succeeded++
- } else {
- lrc.errored++
- }
-}
-
-// CallServerLoad records a server load for the clients.Locality.
-func (lr *testLoadReporter) CallServerLoad(locality clients.Locality, name string, val float64) {
- lr.mu.Lock()
- defer lr.mu.Unlock()
- if lr.localityRPCCount == nil {
- return
- }
- lrc, ok := lr.localityRPCCount[locality]
- if !ok {
- return
- }
- if lrc.serverLoads == nil {
- lrc.serverLoads = make(map[string]*rpcLoadData)
- }
- if _, ok := lrc.serverLoads[name]; !ok {
- lrc.serverLoads[name] = &rpcLoadData{}
- }
- rld := lrc.serverLoads[name]
- rld.add(val)
-}
-
-// CallDropped records a call dropped for the category.
-func (lr *testLoadReporter) CallDropped(category string) {
- lr.mu.Lock()
- defer lr.mu.Unlock()
- lr.drops[category]++
-}
-
-// stats returns and resets all loads reported for a cluster and service,
-// except inProgress rpc counts.
-//
-// It returns nil if the store doesn't contain any (new) data.
-func (lr *testLoadReporter) stats() *loadData {
- lr.mu.Lock()
- defer lr.mu.Unlock()
-
- sd := newLoadData(lr.cluster, lr.service)
- for category, val := range lr.drops {
- if val == 0 {
- continue
- }
- if category != "" {
- // Skip drops without category. They are counted in total_drops, but
- // not in per category. One example is drops by circuit breaking.
- sd.drops[category] = val
- }
- sd.totalDrops += val
- lr.drops[category] = 0 // clear drops for next report
- }
- for locality, countData := range lr.localityRPCCount {
- if countData.succeeded == 0 && countData.errored == 0 && countData.inProgress == 0 && countData.issued == 0 {
- continue
- }
-
- ld := localityData{
- requestStats: requestData{
- succeeded: countData.succeeded,
- errored: countData.errored,
- inProgress: countData.inProgress,
- issued: countData.issued,
- },
- loadStats: make(map[string]serverLoadData),
- }
- // clear localityRPCCount for next report
- countData.succeeded = 0
- countData.errored = 0
- countData.inProgress = 0
- countData.issued = 0
- for key, rld := range countData.serverLoads {
- s, c := rld.loadAndClear() // get and clear serverLoads for next report
- if c == 0 {
- continue
- }
- ld.loadStats[key] = serverLoadData{sum: s, count: c}
- }
- sd.localityStats[locality] = ld
- }
- if sd.totalDrops == 0 && len(sd.drops) == 0 && len(sd.localityStats) == 0 {
- return nil
- }
- return sd
-}
-
-// loadData contains all load data reported to the LoadStore since the most recent
-// call to stats().
-type loadData struct {
- // cluster is the name of the cluster this data is for.
- cluster string
- // service is the name of the EDS service this data is for.
- service string
- // totalDrops is the total number of dropped requests.
- totalDrops uint64
- // drops is the number of dropped requests per category.
- drops map[string]uint64
- // localityStats contains load reports per locality.
- localityStats map[clients.Locality]localityData
-}
-
-// localityData contains load data for a single locality.
-type localityData struct {
- // requestStats contains counts of requests made to the locality.
- requestStats requestData
- // loadStats contains server load data for requests made to the locality,
- // indexed by the load type.
- loadStats map[string]serverLoadData
-}
-
-// requestData contains request counts.
-type requestData struct {
- // succeeded is the number of succeeded requests.
- succeeded uint64
- // errored is the number of requests which ran into errors.
- errored uint64
- // inProgress is the number of requests in flight.
- inProgress uint64
- // issued is the total number requests that were sent.
- issued uint64
-}
-
-// serverLoadData contains server load data.
-type serverLoadData struct {
- // count is the number of load reports.
- count uint64
- // sum is the total value of all load reports.
- sum float64
-}
-
-func newLoadData(cluster, service string) *loadData {
- return &loadData{
- cluster: cluster,
- service: service,
- drops: make(map[string]uint64),
- localityStats: make(map[clients.Locality]localityData),
- }
-}
-
-type rpcCountData struct {
- succeeded uint64
- errored uint64
- inProgress uint64
- issued uint64
- serverLoads map[string]*rpcLoadData
-}
-
-type rpcLoadData struct {
- sum float64
- count uint64
-}
-
-func (rld *rpcLoadData) add(v float64) {
- rld.sum += v
- rld.count++
-}
-
-func (rld *rpcLoadData) loadAndClear() (s float64, c uint64) {
- s, rld.sum = rld.sum, 0
- c, rld.count = rld.count, 0
- return s, c
-}
-
func init() {
NewRandomWRR = testutils.NewTestWRR
}
}
}
-// TestReResolution verifies that when a SubConn turns transient failure,
-// re-resolution is triggered.
-func (s) TestReResolution(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
-
- defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
- xdsC := fakeclient.NewClient()
-
- builder := balancer.Get(Name)
- cc := testutils.NewBalancerClientConn(t)
- b := builder.Build(cc, balancer.BuildOptions{})
- defer b.Close()
-
- if err := b.UpdateClientConnState(balancer.ClientConnState{
- ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC),
- BalancerConfig: &LBConfig{
- Cluster: testClusterName,
- EDSServiceName: testServiceName,
- ChildPolicy: &internalserviceconfig.BalancerConfig{
- Name: roundrobin.Name,
- },
- },
- }); err != nil {
- t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
- }
-
- sc1 := <-cc.NewSubConnCh
- sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
- // This should get the connecting picker.
- if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
- t.Fatal(err.Error())
- }
-
- sc1.UpdateState(balancer.SubConnState{
- ConnectivityState: connectivity.TransientFailure,
- ConnectionError: errors.New("test error"),
- })
- // This should get the transient failure picker.
- if err := cc.WaitForErrPicker(ctx); err != nil {
- t.Fatal(err.Error())
- }
-
- // The transient failure should trigger a re-resolution.
- select {
- case <-cc.ResolveNowCh:
- case <-time.After(defaultTestTimeout):
- t.Fatalf("timeout waiting for ResolveNow()")
- }
-
- sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
- // Test pick with one backend.
- if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil {
- t.Fatal(err.Error())
- }
-
- sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
- // This should get the transient failure picker.
- if err := cc.WaitForErrPicker(ctx); err != nil {
- t.Fatal(err.Error())
- }
-
- // The transient failure should trigger a re-resolution.
- select {
- case <-cc.ResolveNowCh:
- case <-time.After(defaultTestTimeout):
- t.Fatalf("timeout waiting for ResolveNow()")
- }
-}
-
-func (s) TestLoadReporting(t *testing.T) {
- var testLocality = clients.Locality{
- Region: "test-region",
- Zone: "test-zone",
- SubZone: "test-sub-zone",
- }
-
- xdsC := fakeclient.NewClient()
-
- builder := balancer.Get(Name)
- cc := testutils.NewBalancerClientConn(t)
- b := builder.Build(cc, balancer.BuildOptions{})
- defer b.Close()
-
- endpoints := make([]resolver.Endpoint, len(testBackendEndpoints))
- for i, e := range testBackendEndpoints {
- endpoints[i] = xdsinternal.SetLocalityIDInEndpoint(e, testLocality)
- for j, a := range e.Addresses {
- endpoints[i].Addresses[j] = xdsinternal.SetLocalityID(a, testLocality)
- }
- }
- testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{
- URI: "trafficdirector.googleapis.com:443",
- ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}},
- })
- if err != nil {
- t.Fatalf("Failed to create LRS server config for testing: %v", err)
- }
- if err := b.UpdateClientConnState(balancer.ClientConnState{
- ResolverState: xdsclient.SetClient(resolver.State{Endpoints: endpoints}, xdsC),
- BalancerConfig: &LBConfig{
- Cluster: testClusterName,
- EDSServiceName: testServiceName,
- LoadReportingServer: testLRSServerConfig,
- // Locality: testLocality,
- ChildPolicy: &internalserviceconfig.BalancerConfig{
- Name: roundrobin.Name,
- },
- },
- }); err != nil {
- t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
- }
-
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
-
- got, err := xdsC.WaitForReportLoad(ctx)
- if err != nil {
- t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
- }
- if got.Server != testLRSServerConfig {
- t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
- }
-
- sc1 := <-cc.NewSubConnCh
- sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
- // This should get the connecting picker.
- if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
- t.Fatal(err.Error())
- }
-
- scs := balancer.SubConnState{ConnectivityState: connectivity.Ready}
- sca := internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
- sca(&scs, endpoints[0].Addresses[0])
- sc1.UpdateState(scs)
- // Test pick with one backend.
- testClusterLoadReporter := &testLoadReporter{cluster: testClusterName, service: testServiceName, drops: make(map[string]uint64), localityRPCCount: make(map[clients.Locality]*rpcCountData)}
- const successCount = 5
- const errorCount = 5
- if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
- // Override the loadStore in the picker with testClusterLoadReporter.
- picker := p.(*picker)
- originalLoadStore := picker.loadStore
- picker.loadStore = testClusterLoadReporter
- defer func() { picker.loadStore = originalLoadStore }()
- for i := 0; i < successCount; i++ {
- gotSCSt, err := p.Pick(balancer.PickInfo{})
- if gotSCSt.SubConn != sc1 {
- return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
- }
- lr := &v3orcapb.OrcaLoadReport{
- NamedMetrics: map[string]float64{testNamedMetricsKey1: 3.14, testNamedMetricsKey2: 2.718},
- }
- gotSCSt.Done(balancer.DoneInfo{ServerLoad: lr})
- }
- for i := 0; i < errorCount; i++ {
- gotSCSt, err := p.Pick(balancer.PickInfo{})
- if gotSCSt.SubConn != sc1 {
- return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
- }
- gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")})
- }
- return nil
- }); err != nil {
- t.Fatal(err.Error())
- }
-
- // Dump load data from the store and compare with expected counts.
- sd := testClusterLoadReporter.stats()
- if sd == nil {
- t.Fatalf("loads for cluster %v not found in store", testClusterName)
- }
- if sd.cluster != testClusterName || sd.service != testServiceName {
- t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.cluster, sd.service, testClusterName, testServiceName)
- }
- localityData, ok := sd.localityStats[testLocality]
- if !ok {
- t.Fatalf("loads for %v not found in store", testLocality)
- }
- reqStats := localityData.requestStats
- if reqStats.succeeded != successCount {
- t.Errorf("got succeeded %v, want %v", reqStats.succeeded, successCount)
- }
- if reqStats.errored != errorCount {
- t.Errorf("got errord %v, want %v", reqStats.errored, errorCount)
- }
- if reqStats.inProgress != 0 {
- t.Errorf("got inProgress %v, want %v", reqStats.inProgress, 0)
- }
- wantLoadStats := map[string]serverLoadData{
- testNamedMetricsKey1: {count: 5, sum: 15.7}, // aggregation of 5 * 3.14 = 15.7
- testNamedMetricsKey2: {count: 5, sum: 13.59}, // aggregation of 5 * 2.718 = 13.59
- }
- if diff := cmp.Diff(wantLoadStats, localityData.loadStats, toleranceCmpOpt); diff != "" {
- t.Errorf("localityData.LoadStats returned unexpected diff (-want +got):\n%s", diff)
- }
- b.Close()
- if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
- t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
- }
-}
-
-// TestUpdateLRSServer covers the cases
-// - the init config specifies "" as the LRS server
-// - config modifies LRS server to a different string
-// - config sets LRS server to nil to stop load reporting
-func (s) TestUpdateLRSServer(t *testing.T) {
- var testLocality = clients.Locality{
- Region: "test-region",
- Zone: "test-zone",
- SubZone: "test-sub-zone",
- }
-
- xdsC := fakeclient.NewClient()
-
- builder := balancer.Get(Name)
- cc := testutils.NewBalancerClientConn(t)
- b := builder.Build(cc, balancer.BuildOptions{})
- defer b.Close()
-
- endpoints := make([]resolver.Endpoint, len(testBackendEndpoints))
- for i, e := range testBackendEndpoints {
- endpoints[i] = xdsinternal.SetLocalityIDInEndpoint(e, testLocality)
- }
- testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{
- URI: "trafficdirector.googleapis.com:443",
- ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}},
- })
- if err != nil {
- t.Fatalf("Failed to create LRS server config for testing: %v", err)
- }
- if err := b.UpdateClientConnState(balancer.ClientConnState{
- ResolverState: xdsclient.SetClient(resolver.State{Endpoints: endpoints}, xdsC),
- BalancerConfig: &LBConfig{
- Cluster: testClusterName,
- EDSServiceName: testServiceName,
- LoadReportingServer: testLRSServerConfig,
- ChildPolicy: &internalserviceconfig.BalancerConfig{
- Name: roundrobin.Name,
- },
- },
- }); err != nil {
- t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
- }
-
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
-
- got, err := xdsC.WaitForReportLoad(ctx)
- if err != nil {
- t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
- }
- if got.Server != testLRSServerConfig {
- t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
- }
-
- testLRSServerConfig2, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{
- URI: "trafficdirector-another.googleapis.com:443",
- ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}},
- })
- if err != nil {
- t.Fatalf("Failed to create LRS server config for testing: %v", err)
- }
-
- // Update LRS server to a different name.
- if err := b.UpdateClientConnState(balancer.ClientConnState{
- ResolverState: xdsclient.SetClient(resolver.State{Endpoints: endpoints}, xdsC),
- BalancerConfig: &LBConfig{
- Cluster: testClusterName,
- EDSServiceName: testServiceName,
- LoadReportingServer: testLRSServerConfig2,
- ChildPolicy: &internalserviceconfig.BalancerConfig{
- Name: roundrobin.Name,
- },
- },
- }); err != nil {
- t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
- }
- if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
- t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
- }
- got2, err2 := xdsC.WaitForReportLoad(ctx)
- if err2 != nil {
- t.Fatalf("xdsClient.ReportLoad failed with error: %v", err2)
- }
- if got2.Server != testLRSServerConfig2 {
- t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got2.Server, testLRSServerConfig2)
- }
-
- // Update LRS server to nil, to disable LRS.
- if err := b.UpdateClientConnState(balancer.ClientConnState{
- ResolverState: xdsclient.SetClient(resolver.State{Endpoints: endpoints}, xdsC),
- BalancerConfig: &LBConfig{
- Cluster: testClusterName,
- EDSServiceName: testServiceName,
- ChildPolicy: &internalserviceconfig.BalancerConfig{
- Name: roundrobin.Name,
- },
- },
- }); err != nil {
- t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
- }
- if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
- t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
- }
-
- shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultShortTestTimeout)
- defer shortCancel()
- if s, err := xdsC.WaitForReportLoad(shortCtx); err != context.DeadlineExceeded {
- t.Fatalf("unexpected load report to server: %q", s)
- }
-}
-
// Test verifies that child policies are updated on receipt of a
// configuration update.
func (s) TestChildPolicyUpdatedOnConfigUpdate(t *testing.T) {
"google.golang.org/grpc/internal/testutils/xds/fakeserver"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/durationpb"
t.Fatalf("Server did not receive load due to error: %v", err)
}
}
+
+// TestReResolutionAfterTransientFailure verifies that re-resolution is
+// triggered when a SubConn enters TRANSIENT_FAILURE.
+func (s) TestReResolutionAfterTransientFailure(t *testing.T) {
+ // Create an xDS management server.
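+ // AllowResourceSubset relaxes the management server's snapshot consistency
+ // check, so the update below, which configures no endpoint resources, is
+ // not rejected as incomplete.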
+ mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
+ defer mgmtServer.Stop()
+
+ // Create bootstrap configuration pointing to the above management server.
+ nodeID := uuid.New().String()
+ bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
+ testutils.CreateBootstrapFileForTesting(t, bc)
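+ // CreateBootstrapFileForTesting writes the config to a temporary file and
+ // points the bootstrap environment variable at it for this test's lifetime.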
+
+ // Create an xDS resolver with the above bootstrap configuration.
+ if internal.NewXDSResolverWithConfigForTesting == nil {
+ t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil")
+ }
+ resolverBuilder, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bc)
+ if err != nil {
+ t.Fatalf("Failed to create xDS resolver for testing: %v", err)
+ }
+
+ // Create a restartable listener to simulate server being down.
+ l, err := testutils.LocalTCPListener()
+ if err != nil {
+ t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
+ }
+ lis := testutils.NewRestartableListener(l)
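+ // Stopping and later restarting this listener keeps the backend's address
+ // stable while forcing the client connection to fail and then recover.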
+ server := stubserver.StartTestService(t, &stubserver.StubServer{
+ Listener: lis,
+ EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
+ })
+ defer server.Stop()
+ host, port := hostAndPortFromAddress(t, server.Address)
+
+ const (
+ listenerName = "test-listener"
+ routeName = "test-route"
+ clusterName = "test-aggregate-cluster"
+ dnsCluster = "logical-dns-cluster"
+ )
+
+ // Configure xDS resources.
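+ // An aggregate cluster with a single LOGICAL_DNS child is used so that
+ // re-resolution requests can be observed on the manual DNS resolver
+ // installed below.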
+ ldnsCluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+ Type: e2e.ClusterTypeLogicalDNS,
+ ClusterName: dnsCluster,
+ DNSHostName: host,
+ DNSPort: port,
+ })
+ cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+ ClusterName: clusterName,
+ Type: e2e.ClusterTypeAggregate,
+ ChildNames: []string{dnsCluster},
+ })
+ updateOpts := e2e.UpdateOptions{
+ NodeID: nodeID,
+ Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerName, routeName)},
+ Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, listenerName, clusterName)},
+ Clusters: []*v3clusterpb.Cluster{cluster, ldnsCluster},
+ Endpoints: nil,
+ }
+
+ // Replace the registered DNS resolver with a manual resolver to capture
+ // ResolveNow calls.
+ resolveNowCh := make(chan struct{}, 1)
+ dnsR := manual.NewBuilderWithScheme("dns")
+ dnsResolverBuilder := resolver.Get("dns")
+ resolver.Register(dnsR)
+ defer resolver.Register(dnsResolverBuilder)
+ dnsR.ResolveNowCallback = func(resolver.ResolveNowOptions) {
+ select { // Non-blocking send: ResolveNow may fire more than once, and a second close() would panic.
+ case resolveNowCh <- struct{}{}:
+ default:
+ }
+ }
+ // The manual resolver has not been built yet, so seed it via InitialState
+ // (UpdateState panics if called before Build).
+ dnsR.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: fmt.Sprintf("%s:%d", host, port)}}})
+
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+ if err := mgmtServer.Update(ctx, updateOpts); err != nil {
+ t.Fatalf("Failed to update xDS resources: %v", err)
+ }
+
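+ // grpc.WithResolvers registers the test-only xDS resolver builder with
+ // this channel only, so the xds:/// target uses the bootstrap config above.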
+ conn, err := grpc.NewClient(fmt.Sprintf("xds:///%s", listenerName), grpc.WithResolvers(resolverBuilder), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ t.Fatalf("Failed to create client: %v", err)
+ }
+ defer conn.Close()
+
+ client := testgrpc.NewTestServiceClient(conn)
+
+ // Verify initial RPC routes correctly to backend.
+ if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
+ t.Fatalf("client.EmptyCall() failed: %v", err)
+ }
+
+ // Stopping the server listener will close the transport on the client,
+ // which will lead to the channel eventually moving to IDLE.
+ lis.Stop()
+ testutils.AwaitState(ctx, t, conn, connectivity.Idle)
+
+ // An RPC at this point is expected to fail with TRANSIENT_FAILURE.
+ if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
+ t.Fatalf("EmptyCall RPC succeeded when the channel is in TRANSIENT_FAILURE, got %v want %v", err, codes.Unavailable)
+ }
+
+ // Expect resolver's ResolveNow to be called due to TF state.
+ select {
+ case <-resolveNowCh:
+ case <-ctx.Done():
+ t.Fatalf("Timed out waiting for ResolveNow call after TransientFailure")
+ }
+
+ // Restart the listener. The channel is expected to reconnect on its own
+ // and come out of TRANSIENT_FAILURE, even without an RPC attempt.
+ lis.Restart()
+ testutils.AwaitState(ctx, t, conn, connectivity.Ready)
+
+ // An RPC at this point is expected to succeed.
+ if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
+ t.Fatalf("client.EmptyCall() failed: %v", err)
+ }
+}
+
+// TestUpdateLRSServerToNil verifies that updating the cluster's LRS server
+// config from 'Self' to nil correctly closes the LRS stream and ensures no
+// more LRS reports are sent.
+func (s) TestUpdateLRSServerToNil(t *testing.T) {
+ // Create an xDS management server that serves ADS and LRS requests.
+ mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{SupportLoadReportingService: true})
+ defer mgmtServer.Stop()
+
+ // Create bootstrap configuration pointing to the above management server.
+ nodeID := uuid.New().String()
+ bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
+ testutils.CreateBootstrapFileForTesting(t, bc)
+
+ // Create an xDS resolver with the above bootstrap configuration.
+ if internal.NewXDSResolverWithConfigForTesting == nil {
+ t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil")
+ }
+ resolverBuilder, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bc)
+ if err != nil {
+ t.Fatalf("Failed to create xDS resolver for testing: %v", err)
+ }
+
+ // Start a server backend exposing the test service.
+ server := stubserver.StartTestService(t, nil)
+ defer server.Stop()
+
+ // Configure the xDS management server with default resources.
+ const serviceName = "my-test-xds-service"
+ resources := e2e.DefaultClientResources(e2e.ResourceParams{
+ DialTarget: serviceName,
+ NodeID: nodeID,
+ Host: "localhost",
+ Port: testutils.ParsePort(t, server.Address),
+ SecLevel: e2e.SecurityLevelNone,
+ })
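+ // Setting the cluster's LRS server to 'Self' instructs the client to send
+ // load reports to the same management server it receives xDS config from.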
+ resources.Clusters[0].LrsServer = &v3corepb.ConfigSource{
+ ConfigSourceSpecifier: &v3corepb.ConfigSource_Self{
+ Self: &v3corepb.SelfConfigSource{},
+ },
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+ if err := mgmtServer.Update(ctx, resources); err != nil {
+ t.Fatal(err)
+ }
+
+ // Create a ClientConn and make a successful RPC.
+ cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolverBuilder))
+ if err != nil {
+ t.Fatalf("Failed to dial local test server: %v", err)
+ }
+ defer cc.Close()
+
+ client := testgrpc.NewTestServiceClient(cc)
+ if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
+ t.Fatalf("client.EmptyCall() failed: %v", err)
+ }
+
+ // Ensure that an LRS stream is created.
+ if _, err := mgmtServer.LRSServer.LRSStreamOpenChan.Receive(ctx); err != nil {
+ t.Fatalf("Error waiting for initial LRS stream open: %v", err)
+ }
+ if _, err := mgmtServer.LRSServer.LRSRequestChan.Receive(ctx); err != nil {
+ t.Fatalf("Error waiting for initial LRS report: %v", err)
+ }
+
+ // Update the cluster's LRS server config to nil to disable load reporting.
+ resources.Clusters[0].LrsServer = nil
+ if err := mgmtServer.Update(ctx, resources); err != nil {
+ t.Fatal(err)
+ }
+
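+ // The updated cluster resource propagates through the xDS client to the
+ // cluster_impl LB policy, which should stop its load reporting.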
+ // Ensure that the old LRS stream is closed.
+ if _, err := mgmtServer.LRSServer.LRSStreamCloseChan.Receive(ctx); err != nil {
+ t.Fatalf("Error waiting for initial LRS stream close : %v", err)
+ }
+
+ // Also ensure that no new LRS stream is created and no further load
+ // reports are sent.
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if _, err := mgmtServer.LRSServer.LRSRequestChan.Receive(sCtx); err != context.DeadlineExceeded {
+ t.Fatalf("Expected no LRS reports after disable, got %v want %v", err, context.DeadlineExceeded)
+ }
+}