From 9a1d8d8bca8c7f88b7b10f51163c7241b087dfe4 Mon Sep 17 00:00:00 2001 From: Arjan Singh Bal <46515553+arjan-bal@users.noreply.github.com> Date: Fri, 3 Oct 2025 09:35:19 +0530 Subject: [PATCH] pickfirstleaf: fix bug in address de-duplication (#8611) Due to a bug in the new pickfirst balancer, it wasn't de-duplicating addresses in the resolver update. The only user visible impact of this seems to be less frequent picker updates after the first pass in happy eyeballs and incorrect interleaving of IPv4/IPv6 addresses during the first happy eyeballs pass. RELEASE NOTES: * balancer/pickfirst: Fix a bug where duplicate addresses were not being ignored as intended. --- .../pickfirst/pickfirstleaf/pickfirstleaf.go | 3 +- .../pickfirstleaf/pickfirstleaf_ext_test.go | 1 + .../pickfirstleaf/pickfirstleaf_test.go | 46 +++++++++++++++++++ 3 files changed, 49 insertions(+), 1 deletion(-) diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go index 9ffdd28a..780cdc4f 100644 --- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go +++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go @@ -381,13 +381,14 @@ func (b *pickfirstBalancer) closeSubConnsLocked() { // deDupAddresses ensures that each address appears only once in the slice. func deDupAddresses(addrs []resolver.Address) []resolver.Address { - seenAddrs := resolver.NewAddressMapV2[*scData]() + seenAddrs := resolver.NewAddressMapV2[bool]() retAddrs := []resolver.Address{} for _, addr := range addrs { if _, ok := seenAddrs.Get(addr); ok { continue } + seenAddrs.Set(addr, true) retAddrs = append(retAddrs, addr) } return retAddrs diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go index 6b2fabe2..1c4c96d0 100644 --- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go +++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go @@ -1152,6 +1152,7 @@ func (s) TestPickFirstLeaf_InterleavingIPv6Preffered(t *testing.T) { ResolverState: resolver.State{ Endpoints: []resolver.Endpoint{ {Addresses: []resolver.Address{{Addr: "[0001:0001:0001:0001:0001:0001:0001:0001]:8080"}}}, + {Addresses: []resolver.Address{{Addr: "[0001:0001:0001:0001:0001:0001:0001:0001]:8080"}}}, // duplicate, should be ignored. {Addresses: []resolver.Address{{Addr: "1.1.1.1:1111"}}}, {Addresses: []resolver.Address{{Addr: "2.2.2.2:2"}}}, {Addresses: []resolver.Address{{Addr: "3.3.3.3:3"}}}, diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_test.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_test.go index 71984a23..5a73dccd 100644 --- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_test.go +++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_test.go @@ -201,7 +201,9 @@ func (s) TestPickFirstLeaf_TFPickerUpdate(t *testing.T) { ResolverState: resolver.State{ Endpoints: []resolver.Endpoint{ {Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}}, + {Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}}, // duplicate, should be ignored. {Addresses: []resolver.Address{{Addr: "2.2.2.2:2"}}}, + {Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}}, // duplicate, should be ignored. }, }, } @@ -213,14 +215,35 @@ func (s) TestPickFirstLeaf_TFPickerUpdate(t *testing.T) { // once. tfErr := fmt.Errorf("test err: connection refused") sc1 := <-cc.NewSubConnCh + select { + case <-sc1.ConnectCh: + case <-ctx.Done(): + t.Fatal("Context timed out waiting for Connect() to be called on sc1.") + } sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure, ConnectionError: tfErr}) + // Move the subconn back to IDLE, it should not be re-connected until the + // first pass is complete. + shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer shortCancel() + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle}) + select { + case <-sc1.ConnectCh: + t.Fatal("Connect() unexpectedly called on sc1.") + case <-shortCtx.Done(): + } + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { t.Fatalf("cc.WaitForPickerWithErr(%v) returned error: %v", balancer.ErrNoSubConnAvailable, err) } sc2 := <-cc.NewSubConnCh + select { + case <-sc2.ConnectCh: + case <-ctx.Done(): + t.Fatal("Context timed out waiting for Connect() to be called on sc2.") + } sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure, ConnectionError: tfErr}) @@ -230,6 +253,29 @@ func (s) TestPickFirstLeaf_TFPickerUpdate(t *testing.T) { // Subsequent TRANSIENT_FAILUREs should be reported only after seeing "# of SubConns" // TRANSIENT_FAILUREs. + + // Both the subconns should be connected in parallel. + select { + case <-sc1.ConnectCh: + case <-ctx.Done(): + t.Fatal("Context timed out waiting for Connect() to be called on sc1.") + } + + shortCtx, shortCancel = context.WithTimeout(ctx, defaultTestShortTimeout) + defer shortCancel() + select { + case <-sc2.ConnectCh: + t.Fatal("Connect() called on sc2 before it completed backing-off.") + case <-shortCtx.Done(): + } + + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle}) + select { + case <-sc2.ConnectCh: + case <-ctx.Done(): + t.Fatal("Context timed out waiting for Connect() to be called on sc2.") + } + newTfErr := fmt.Errorf("test err: unreachable") sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure, ConnectionError: newTfErr}) select { -- 2.43.0