]> git.feebdaed.xyz Git - 0xmirror/grpc-go.git/commitdiff
xds/clusterimpl: Convert existing unit tests to e2e style (2/N) (#8576)
authorPranjali-2501 <87357388+Pranjali-2501@users.noreply.github.com>
Fri, 3 Oct 2025 20:00:14 +0000 (01:30 +0530)
committerGitHub <noreply@github.com>
Fri, 3 Oct 2025 20:00:14 +0000 (01:30 +0530)
internal/xds/balancer/clusterimpl/balancer_test.go
internal/xds/balancer/clusterimpl/tests/balancer_test.go

index b8b22124b526b74cc3a8991a5ab57e266fab515a..facf23fead630b1b07f700bdeee89bc666886a70 100644 (file)
@@ -22,9 +22,7 @@ import (
        "context"
        "encoding/json"
        "errors"
-       "fmt"
        "strings"
-       "sync"
        "testing"
        "time"
 
@@ -32,22 +30,15 @@ import (
        "google.golang.org/grpc/balancer/base"
        "google.golang.org/grpc/balancer/roundrobin"
        "google.golang.org/grpc/connectivity"
-       "google.golang.org/grpc/internal"
        "google.golang.org/grpc/internal/balancer/stub"
        "google.golang.org/grpc/internal/grpctest"
        internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
        "google.golang.org/grpc/internal/testutils"
        xdsinternal "google.golang.org/grpc/internal/xds"
-       "google.golang.org/grpc/internal/xds/bootstrap"
-       "google.golang.org/grpc/internal/xds/clients"
        "google.golang.org/grpc/internal/xds/testutils/fakeclient"
        "google.golang.org/grpc/internal/xds/xdsclient"
        "google.golang.org/grpc/resolver"
        "google.golang.org/grpc/serviceconfig"
-
-       v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
-       "github.com/google/go-cmp/cmp"
-       "github.com/google/go-cmp/cmp/cmpopts"
 )
 
 const (
@@ -56,14 +47,10 @@ const (
 
        testClusterName = "test-cluster"
        testServiceName = "test-eds-service"
-
-       testNamedMetricsKey1 = "test-named1"
-       testNamedMetricsKey2 = "test-named2"
 )
 
 var (
        testBackendEndpoints = []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}}}
-       toleranceCmpOpt      = cmp.Options{cmpopts.EquateApprox(0, 1e-5), cmp.AllowUnexported(loadData{}, localityData{}, requestData{}, serverLoadData{})}
 )
 
 type s struct {
@@ -74,205 +61,6 @@ func Test(t *testing.T) {
        grpctest.RunSubTests(t, s{})
 }
 
-// testLoadReporter records load data pertaining to a single cluster.
-//
-// It implements loadReporter interface for the picker. Tests can use it to
-// override the loadStore in the picker to verify load reporting.
-type testLoadReporter struct {
-       cluster, service string
-
-       mu               sync.Mutex
-       drops            map[string]uint64
-       localityRPCCount map[clients.Locality]*rpcCountData
-}
-
-// CallStarted records a call started for the clients.Locality.
-func (lr *testLoadReporter) CallStarted(locality clients.Locality) {
-       lr.mu.Lock()
-       defer lr.mu.Unlock()
-       if _, ok := lr.localityRPCCount[locality]; !ok {
-               lr.localityRPCCount[locality] = &rpcCountData{}
-       }
-       lr.localityRPCCount[locality].inProgress++
-       lr.localityRPCCount[locality].issued++
-}
-
-// CallFinished records a call finished for the clients.Locality.
-func (lr *testLoadReporter) CallFinished(locality clients.Locality, err error) {
-       lr.mu.Lock()
-       defer lr.mu.Unlock()
-       if lr.localityRPCCount == nil {
-               return
-       }
-       lrc := lr.localityRPCCount[locality]
-       lrc.inProgress--
-       if err == nil {
-               lrc.succeeded++
-       } else {
-               lrc.errored++
-       }
-}
-
-// CallServerLoad records a server load for the clients.Locality.
-func (lr *testLoadReporter) CallServerLoad(locality clients.Locality, name string, val float64) {
-       lr.mu.Lock()
-       defer lr.mu.Unlock()
-       if lr.localityRPCCount == nil {
-               return
-       }
-       lrc, ok := lr.localityRPCCount[locality]
-       if !ok {
-               return
-       }
-       if lrc.serverLoads == nil {
-               lrc.serverLoads = make(map[string]*rpcLoadData)
-       }
-       if _, ok := lrc.serverLoads[name]; !ok {
-               lrc.serverLoads[name] = &rpcLoadData{}
-       }
-       rld := lrc.serverLoads[name]
-       rld.add(val)
-}
-
-// CallDropped records a call dropped for the category.
-func (lr *testLoadReporter) CallDropped(category string) {
-       lr.mu.Lock()
-       defer lr.mu.Unlock()
-       lr.drops[category]++
-}
-
-// stats returns and resets all loads reported for a cluster and service,
-// except inProgress rpc counts.
-//
-// It returns nil if the store doesn't contain any (new) data.
-func (lr *testLoadReporter) stats() *loadData {
-       lr.mu.Lock()
-       defer lr.mu.Unlock()
-
-       sd := newLoadData(lr.cluster, lr.service)
-       for category, val := range lr.drops {
-               if val == 0 {
-                       continue
-               }
-               if category != "" {
-                       // Skip drops without category. They are counted in total_drops, but
-                       // not in per category. One example is drops by circuit breaking.
-                       sd.drops[category] = val
-               }
-               sd.totalDrops += val
-               lr.drops[category] = 0 // clear drops for next report
-       }
-       for locality, countData := range lr.localityRPCCount {
-               if countData.succeeded == 0 && countData.errored == 0 && countData.inProgress == 0 && countData.issued == 0 {
-                       continue
-               }
-
-               ld := localityData{
-                       requestStats: requestData{
-                               succeeded:  countData.succeeded,
-                               errored:    countData.errored,
-                               inProgress: countData.inProgress,
-                               issued:     countData.issued,
-                       },
-                       loadStats: make(map[string]serverLoadData),
-               }
-               // clear localityRPCCount for next report
-               countData.succeeded = 0
-               countData.errored = 0
-               countData.inProgress = 0
-               countData.issued = 0
-               for key, rld := range countData.serverLoads {
-                       s, c := rld.loadAndClear() // get and clear serverLoads for next report
-                       if c == 0 {
-                               continue
-                       }
-                       ld.loadStats[key] = serverLoadData{sum: s, count: c}
-               }
-               sd.localityStats[locality] = ld
-       }
-       if sd.totalDrops == 0 && len(sd.drops) == 0 && len(sd.localityStats) == 0 {
-               return nil
-       }
-       return sd
-}
-
-// loadData contains all load data reported to the LoadStore since the most recent
-// call to stats().
-type loadData struct {
-       // cluster is the name of the cluster this data is for.
-       cluster string
-       // service is the name of the EDS service this data is for.
-       service string
-       // totalDrops is the total number of dropped requests.
-       totalDrops uint64
-       // drops is the number of dropped requests per category.
-       drops map[string]uint64
-       // localityStats contains load reports per locality.
-       localityStats map[clients.Locality]localityData
-}
-
-// localityData contains load data for a single locality.
-type localityData struct {
-       // requestStats contains counts of requests made to the locality.
-       requestStats requestData
-       // loadStats contains server load data for requests made to the locality,
-       // indexed by the load type.
-       loadStats map[string]serverLoadData
-}
-
-// requestData contains request counts.
-type requestData struct {
-       // succeeded is the number of succeeded requests.
-       succeeded uint64
-       // errored is the number of requests which ran into errors.
-       errored uint64
-       // inProgress is the number of requests in flight.
-       inProgress uint64
-       // issued is the total number requests that were sent.
-       issued uint64
-}
-
-// serverLoadData contains server load data.
-type serverLoadData struct {
-       // count is the number of load reports.
-       count uint64
-       // sum is the total value of all load reports.
-       sum float64
-}
-
-func newLoadData(cluster, service string) *loadData {
-       return &loadData{
-               cluster:       cluster,
-               service:       service,
-               drops:         make(map[string]uint64),
-               localityStats: make(map[clients.Locality]localityData),
-       }
-}
-
-type rpcCountData struct {
-       succeeded   uint64
-       errored     uint64
-       inProgress  uint64
-       issued      uint64
-       serverLoads map[string]*rpcLoadData
-}
-
-type rpcLoadData struct {
-       sum   float64
-       count uint64
-}
-
-func (rld *rpcLoadData) add(v float64) {
-       rld.sum += v
-       rld.count++
-}
-
-func (rld *rpcLoadData) loadAndClear() (s float64, c uint64) {
-       s, rld.sum = rld.sum, 0
-       c, rld.count = rld.count, 0
-       return s, c
-}
-
 func init() {
        NewRandomWRR = testutils.NewTestWRR
 }
@@ -424,319 +212,6 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) {
        }
 }
 
-// TestReResolution verifies that when a SubConn turns transient failure,
-// re-resolution is triggered.
-func (s) TestReResolution(t *testing.T) {
-       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
-       defer cancel()
-
-       defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
-       xdsC := fakeclient.NewClient()
-
-       builder := balancer.Get(Name)
-       cc := testutils.NewBalancerClientConn(t)
-       b := builder.Build(cc, balancer.BuildOptions{})
-       defer b.Close()
-
-       if err := b.UpdateClientConnState(balancer.ClientConnState{
-               ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC),
-               BalancerConfig: &LBConfig{
-                       Cluster:        testClusterName,
-                       EDSServiceName: testServiceName,
-                       ChildPolicy: &internalserviceconfig.BalancerConfig{
-                               Name: roundrobin.Name,
-                       },
-               },
-       }); err != nil {
-               t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
-       }
-
-       sc1 := <-cc.NewSubConnCh
-       sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
-       // This should get the connecting picker.
-       if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
-               t.Fatal(err.Error())
-       }
-
-       sc1.UpdateState(balancer.SubConnState{
-               ConnectivityState: connectivity.TransientFailure,
-               ConnectionError:   errors.New("test error"),
-       })
-       // This should get the transient failure picker.
-       if err := cc.WaitForErrPicker(ctx); err != nil {
-               t.Fatal(err.Error())
-       }
-
-       // The transient failure should trigger a re-resolution.
-       select {
-       case <-cc.ResolveNowCh:
-       case <-time.After(defaultTestTimeout):
-               t.Fatalf("timeout waiting for ResolveNow()")
-       }
-
-       sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
-       // Test pick with one backend.
-       if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil {
-               t.Fatal(err.Error())
-       }
-
-       sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
-       // This should get the transient failure picker.
-       if err := cc.WaitForErrPicker(ctx); err != nil {
-               t.Fatal(err.Error())
-       }
-
-       // The transient failure should trigger a re-resolution.
-       select {
-       case <-cc.ResolveNowCh:
-       case <-time.After(defaultTestTimeout):
-               t.Fatalf("timeout waiting for ResolveNow()")
-       }
-}
-
-func (s) TestLoadReporting(t *testing.T) {
-       var testLocality = clients.Locality{
-               Region:  "test-region",
-               Zone:    "test-zone",
-               SubZone: "test-sub-zone",
-       }
-
-       xdsC := fakeclient.NewClient()
-
-       builder := balancer.Get(Name)
-       cc := testutils.NewBalancerClientConn(t)
-       b := builder.Build(cc, balancer.BuildOptions{})
-       defer b.Close()
-
-       endpoints := make([]resolver.Endpoint, len(testBackendEndpoints))
-       for i, e := range testBackendEndpoints {
-               endpoints[i] = xdsinternal.SetLocalityIDInEndpoint(e, testLocality)
-               for j, a := range e.Addresses {
-                       endpoints[i].Addresses[j] = xdsinternal.SetLocalityID(a, testLocality)
-               }
-       }
-       testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{
-               URI:          "trafficdirector.googleapis.com:443",
-               ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}},
-       })
-       if err != nil {
-               t.Fatalf("Failed to create LRS server config for testing: %v", err)
-       }
-       if err := b.UpdateClientConnState(balancer.ClientConnState{
-               ResolverState: xdsclient.SetClient(resolver.State{Endpoints: endpoints}, xdsC),
-               BalancerConfig: &LBConfig{
-                       Cluster:             testClusterName,
-                       EDSServiceName:      testServiceName,
-                       LoadReportingServer: testLRSServerConfig,
-                       // Locality:                testLocality,
-                       ChildPolicy: &internalserviceconfig.BalancerConfig{
-                               Name: roundrobin.Name,
-                       },
-               },
-       }); err != nil {
-               t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
-       }
-
-       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
-       defer cancel()
-
-       got, err := xdsC.WaitForReportLoad(ctx)
-       if err != nil {
-               t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
-       }
-       if got.Server != testLRSServerConfig {
-               t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
-       }
-
-       sc1 := <-cc.NewSubConnCh
-       sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
-       // This should get the connecting picker.
-       if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
-               t.Fatal(err.Error())
-       }
-
-       scs := balancer.SubConnState{ConnectivityState: connectivity.Ready}
-       sca := internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
-       sca(&scs, endpoints[0].Addresses[0])
-       sc1.UpdateState(scs)
-       // Test pick with one backend.
-       testClusterLoadReporter := &testLoadReporter{cluster: testClusterName, service: testServiceName, drops: make(map[string]uint64), localityRPCCount: make(map[clients.Locality]*rpcCountData)}
-       const successCount = 5
-       const errorCount = 5
-       if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
-               // Override the loadStore in the picker with testClusterLoadReporter.
-               picker := p.(*picker)
-               originalLoadStore := picker.loadStore
-               picker.loadStore = testClusterLoadReporter
-               defer func() { picker.loadStore = originalLoadStore }()
-               for i := 0; i < successCount; i++ {
-                       gotSCSt, err := p.Pick(balancer.PickInfo{})
-                       if gotSCSt.SubConn != sc1 {
-                               return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
-                       }
-                       lr := &v3orcapb.OrcaLoadReport{
-                               NamedMetrics: map[string]float64{testNamedMetricsKey1: 3.14, testNamedMetricsKey2: 2.718},
-                       }
-                       gotSCSt.Done(balancer.DoneInfo{ServerLoad: lr})
-               }
-               for i := 0; i < errorCount; i++ {
-                       gotSCSt, err := p.Pick(balancer.PickInfo{})
-                       if gotSCSt.SubConn != sc1 {
-                               return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
-                       }
-                       gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")})
-               }
-               return nil
-       }); err != nil {
-               t.Fatal(err.Error())
-       }
-
-       // Dump load data from the store and compare with expected counts.
-       sd := testClusterLoadReporter.stats()
-       if sd == nil {
-               t.Fatalf("loads for cluster %v not found in store", testClusterName)
-       }
-       if sd.cluster != testClusterName || sd.service != testServiceName {
-               t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.cluster, sd.service, testClusterName, testServiceName)
-       }
-       localityData, ok := sd.localityStats[testLocality]
-       if !ok {
-               t.Fatalf("loads for %v not found in store", testLocality)
-       }
-       reqStats := localityData.requestStats
-       if reqStats.succeeded != successCount {
-               t.Errorf("got succeeded %v, want %v", reqStats.succeeded, successCount)
-       }
-       if reqStats.errored != errorCount {
-               t.Errorf("got errord %v, want %v", reqStats.errored, errorCount)
-       }
-       if reqStats.inProgress != 0 {
-               t.Errorf("got inProgress %v, want %v", reqStats.inProgress, 0)
-       }
-       wantLoadStats := map[string]serverLoadData{
-               testNamedMetricsKey1: {count: 5, sum: 15.7},  // aggregation of 5 * 3.14 = 15.7
-               testNamedMetricsKey2: {count: 5, sum: 13.59}, // aggregation of 5 * 2.718 = 13.59
-       }
-       if diff := cmp.Diff(wantLoadStats, localityData.loadStats, toleranceCmpOpt); diff != "" {
-               t.Errorf("localityData.LoadStats returned unexpected diff (-want +got):\n%s", diff)
-       }
-       b.Close()
-       if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
-               t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
-       }
-}
-
-// TestUpdateLRSServer covers the cases
-// - the init config specifies "" as the LRS server
-// - config modifies LRS server to a different string
-// - config sets LRS server to nil to stop load reporting
-func (s) TestUpdateLRSServer(t *testing.T) {
-       var testLocality = clients.Locality{
-               Region:  "test-region",
-               Zone:    "test-zone",
-               SubZone: "test-sub-zone",
-       }
-
-       xdsC := fakeclient.NewClient()
-
-       builder := balancer.Get(Name)
-       cc := testutils.NewBalancerClientConn(t)
-       b := builder.Build(cc, balancer.BuildOptions{})
-       defer b.Close()
-
-       endpoints := make([]resolver.Endpoint, len(testBackendEndpoints))
-       for i, e := range testBackendEndpoints {
-               endpoints[i] = xdsinternal.SetLocalityIDInEndpoint(e, testLocality)
-       }
-       testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{
-               URI:          "trafficdirector.googleapis.com:443",
-               ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}},
-       })
-       if err != nil {
-               t.Fatalf("Failed to create LRS server config for testing: %v", err)
-       }
-       if err := b.UpdateClientConnState(balancer.ClientConnState{
-               ResolverState: xdsclient.SetClient(resolver.State{Endpoints: endpoints}, xdsC),
-               BalancerConfig: &LBConfig{
-                       Cluster:             testClusterName,
-                       EDSServiceName:      testServiceName,
-                       LoadReportingServer: testLRSServerConfig,
-                       ChildPolicy: &internalserviceconfig.BalancerConfig{
-                               Name: roundrobin.Name,
-                       },
-               },
-       }); err != nil {
-               t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
-       }
-
-       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
-       defer cancel()
-
-       got, err := xdsC.WaitForReportLoad(ctx)
-       if err != nil {
-               t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
-       }
-       if got.Server != testLRSServerConfig {
-               t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
-       }
-
-       testLRSServerConfig2, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{
-               URI:          "trafficdirector-another.googleapis.com:443",
-               ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}},
-       })
-       if err != nil {
-               t.Fatalf("Failed to create LRS server config for testing: %v", err)
-       }
-
-       // Update LRS server to a different name.
-       if err := b.UpdateClientConnState(balancer.ClientConnState{
-               ResolverState: xdsclient.SetClient(resolver.State{Endpoints: endpoints}, xdsC),
-               BalancerConfig: &LBConfig{
-                       Cluster:             testClusterName,
-                       EDSServiceName:      testServiceName,
-                       LoadReportingServer: testLRSServerConfig2,
-                       ChildPolicy: &internalserviceconfig.BalancerConfig{
-                               Name: roundrobin.Name,
-                       },
-               },
-       }); err != nil {
-               t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
-       }
-       if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
-               t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
-       }
-       got2, err2 := xdsC.WaitForReportLoad(ctx)
-       if err2 != nil {
-               t.Fatalf("xdsClient.ReportLoad failed with error: %v", err2)
-       }
-       if got2.Server != testLRSServerConfig2 {
-               t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got2.Server, testLRSServerConfig2)
-       }
-
-       // Update LRS server to nil, to disable LRS.
-       if err := b.UpdateClientConnState(balancer.ClientConnState{
-               ResolverState: xdsclient.SetClient(resolver.State{Endpoints: endpoints}, xdsC),
-               BalancerConfig: &LBConfig{
-                       Cluster:        testClusterName,
-                       EDSServiceName: testServiceName,
-                       ChildPolicy: &internalserviceconfig.BalancerConfig{
-                               Name: roundrobin.Name,
-                       },
-               },
-       }); err != nil {
-               t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
-       }
-       if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
-               t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
-       }
-
-       shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultShortTestTimeout)
-       defer shortCancel()
-       if s, err := xdsC.WaitForReportLoad(shortCtx); err != context.DeadlineExceeded {
-               t.Fatalf("unexpected load report to server: %q", s)
-       }
-}
-
 // Test verifies that child policies was updated on receipt of
 // configuration update.
 func (s) TestChildPolicyUpdatedOnConfigUpdate(t *testing.T) {
index 2aecce0f90ff58958d1ce96a05c4b9b877cf0385..30608dd64a75cfd862015c171a3eb5da3b9fc9b6 100644 (file)
@@ -42,6 +42,7 @@ import (
        "google.golang.org/grpc/internal/testutils/xds/fakeserver"
        "google.golang.org/grpc/peer"
        "google.golang.org/grpc/resolver"
+       "google.golang.org/grpc/resolver/manual"
        "google.golang.org/grpc/status"
        "google.golang.org/protobuf/testing/protocmp"
        "google.golang.org/protobuf/types/known/durationpb"
@@ -907,3 +908,208 @@ func (s) TestLRSLogicalDNS(t *testing.T) {
                t.Fatalf("Server did not receive load due to error: %v", err)
        }
 }
+
+// TestReResolution verifies that when a SubConn turns transient failure,
+// re-resolution is triggered.
+func (s) TestReResolutionAfterTransientFailure(t *testing.T) {
+       // Create an xDS management server.
+       mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
+       defer mgmtServer.Stop()
+
+       // Create bootstrap configuration pointing to the above management server.
+       nodeID := uuid.New().String()
+       bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
+       testutils.CreateBootstrapFileForTesting(t, bc)
+
+       // Create an xDS resolver with the above bootstrap configuration.
+       if internal.NewXDSResolverWithConfigForTesting == nil {
+               t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil")
+       }
+       resolverBuilder, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bc)
+       if err != nil {
+               t.Fatalf("Failed to create xDS resolver for testing: %v", err)
+       }
+
+       // Create a restartable listener to simulate server being down.
+       l, err := testutils.LocalTCPListener()
+       if err != nil {
+               t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
+       }
+       lis := testutils.NewRestartableListener(l)
+       server := stubserver.StartTestService(t, &stubserver.StubServer{
+               Listener:   lis,
+               EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
+       })
+       defer server.Stop()
+       host, port := hostAndPortFromAddress(t, server.Address)
+
+       const (
+               listenerName = "test-listener"
+               routeName    = "test-route"
+               clusterName  = "test-aggregate-cluster"
+               dnsCluster   = "logical-dns-cluster"
+       )
+
+       // Configure xDS resources.
+       ldnsCluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+               Type:        e2e.ClusterTypeLogicalDNS,
+               ClusterName: dnsCluster,
+               DNSHostName: host,
+               DNSPort:     port,
+       })
+       cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+               ClusterName: clusterName,
+               Type:        e2e.ClusterTypeAggregate,
+               ChildNames:  []string{dnsCluster},
+       })
+       updateOpts := e2e.UpdateOptions{
+               NodeID:    nodeID,
+               Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerName, routeName)},
+               Routes:    []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeName, listenerName, clusterName)},
+               Clusters:  []*v3clusterpb.Cluster{cluster, ldnsCluster},
+               Endpoints: nil,
+       }
+
+       // Replace DNS resolver with a wrapped resolver to capture ResolveNow calls.
+       resolveNowCh := make(chan struct{}, 1)
+       dnsR := manual.NewBuilderWithScheme("dns")
+       dnsResolverBuilder := resolver.Get("dns")
+       resolver.Register(dnsR)
+       defer resolver.Register(dnsResolverBuilder)
+       dnsR.ResolveNowCallback = func(resolver.ResolveNowOptions) {
+               close(resolveNowCh)
+       }
+       dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: fmt.Sprintf("%s:%d", host, port)}}})
+
+       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+       defer cancel()
+       if err := mgmtServer.Update(ctx, updateOpts); err != nil {
+               t.Fatalf("Failed to update xDS resources: %v", err)
+       }
+
+       conn, err := grpc.NewClient(fmt.Sprintf("xds:///%s", listenerName), grpc.WithResolvers(resolverBuilder), grpc.WithTransportCredentials(insecure.NewCredentials()))
+       if err != nil {
+               t.Fatalf("Failed to create client: %v", err)
+       }
+       defer conn.Close()
+
+       client := testgrpc.NewTestServiceClient(conn)
+
+       // Verify initial RPC routes correctly to backend.
+       if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
+               t.Fatalf("client.EmptyCall() failed: %v", err)
+       }
+
+       // Stopping the server listener will close the transport on the client,
+       // which will lead to the channel eventually moving to IDLE.
+       lis.Stop()
+       testutils.AwaitState(ctx, t, conn, connectivity.Idle)
+
+       // An RPC at this point is expected to fail with TRANSIENT_FAILURE.
+       if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
+               t.Fatalf("EmptyCall RPC succeeded when the channel is in TRANSIENT_FAILURE, got %v want %v", err, codes.Unavailable)
+       }
+
+       // Expect resolver's ResolveNow to be called due to TF state.
+       select {
+       case <-resolveNowCh:
+       case <-ctx.Done():
+               t.Fatalf("Timed out waiting for ResolveNow call after TransientFailure")
+       }
+
+       // Restart the listener and expected to reconnect on its own and come out
+       // of TRANSIENT_FAILURE, even without an RPC attempt.
+       lis.Restart()
+       testutils.AwaitState(ctx, t, conn, connectivity.Ready)
+
+       // An RPC at this point is expected to succeed.
+       if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
+               t.Fatalf("client.EmptyCall() failed: %v", err)
+       }
+}
+
+// TestUpdateLRSServerToNil verifies that updating the cluster's LRS server
+// config from 'Self' to nil correctly closes the LRS stream and ensures no
+// more LRS reports are sent.
+func (s) TestUpdateLRSServerToNil(t *testing.T) {
+       // Create an xDS management server that serves ADS and LRS requests.
+       mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{SupportLoadReportingService: true})
+       defer mgmtServer.Stop()
+
+       // Create bootstrap configuration pointing to the above management server.
+       nodeID := uuid.New().String()
+       bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
+       testutils.CreateBootstrapFileForTesting(t, bc)
+
+       // Create an xDS resolver with the above bootstrap configuration.
+       if internal.NewXDSResolverWithConfigForTesting == nil {
+               t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil")
+       }
+       resolverBuilder, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bc)
+       if err != nil {
+               t.Fatalf("Failed to create xDS resolver for testing: %v", err)
+       }
+
+       // Start a server backend exposing the test service.
+       server := stubserver.StartTestService(t, nil)
+       defer server.Stop()
+
+       // Configure the xDS management server with default resources.
+       const serviceName = "my-test-xds-service"
+       resources := e2e.DefaultClientResources(e2e.ResourceParams{
+               DialTarget: serviceName,
+               NodeID:     nodeID,
+               Host:       "localhost",
+               Port:       testutils.ParsePort(t, server.Address),
+               SecLevel:   e2e.SecurityLevelNone,
+       })
+       resources.Clusters[0].LrsServer = &v3corepb.ConfigSource{
+               ConfigSourceSpecifier: &v3corepb.ConfigSource_Self{
+                       Self: &v3corepb.SelfConfigSource{},
+               },
+       }
+
+       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+       defer cancel()
+       if err := mgmtServer.Update(ctx, resources); err != nil {
+               t.Fatal(err)
+       }
+
+       // Create a ClientConn and make a successful RPC.
+       cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolverBuilder))
+       if err != nil {
+               t.Fatalf("Failed to dial local test server: %v", err)
+       }
+       defer cc.Close()
+
+       client := testgrpc.NewTestServiceClient(cc)
+       if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
+               t.Fatalf("client.EmptyCall() failed: %v", err)
+       }
+
+       // Ensure that an LRS stream is created.
+       if _, err := mgmtServer.LRSServer.LRSStreamOpenChan.Receive(ctx); err != nil {
+               t.Fatalf("Error waiting for initial LRS stream open: %v", err)
+       }
+       if _, err := mgmtServer.LRSServer.LRSRequestChan.Receive(ctx); err != nil {
+               t.Fatalf("Error waiting for initial LRS report: %v", err)
+       }
+
+       // Update LRS Server to nil
+       resources.Clusters[0].LrsServer = nil
+       if err := mgmtServer.Update(ctx, resources); err != nil {
+               t.Fatal(err)
+       }
+
+       // Ensure that the old LRS stream is closed.
+       if _, err := mgmtServer.LRSServer.LRSStreamCloseChan.Receive(ctx); err != nil {
+               t.Fatalf("Error waiting for initial LRS stream close : %v", err)
+       }
+
+       // Also ensure that a new LRS stream is not created.
+       sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+       defer sCancel()
+       if _, err := mgmtServer.LRSServer.LRSRequestChan.Receive(sCtx); err != context.DeadlineExceeded {
+               t.Fatalf("Expected no LRS reports after disable, got %v want %v", err, context.DeadlineExceeded)
+       }
+}