import (
"context"
+ "encoding/json"
"errors"
"fmt"
"strings"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
+ "google.golang.org/grpc/balancer"
+ pfbalancer "google.golang.org/grpc/balancer/pickfirst"
pfinternal "google.golang.org/grpc/balancer/pickfirst/internal"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
+ "google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
}
}
+// Tests the PF LB policy with shuffling enabled. It explicitly unsets the
+// Endpoints field in the resolver update to test the shuffling of the
+// Addresses.
+func (s) TestPickFirst_ShuffleAddressListNoEndpoints(t *testing.T) {
+ // Install a shuffler that always reverses two entries.
+ origShuf := pfinternal.RandShuffle
+ defer func() { pfinternal.RandShuffle = origShuf }()
+ pfinternal.RandShuffle = func(n int, f func(int, int)) {
+ if n != 2 {
+ t.Errorf("Shuffle called with n=%v; want 2", n)
+ return
+ }
+ f(0, 1) // reverse the two addresses
+ }
+
+ pfBuilder := balancer.Get(pfbalancer.Name)
+ shuffleConfig, err := pfBuilder.(balancer.ConfigParser).ParseConfig(json.RawMessage(`{ "shuffleAddressList": true }`))
+ if err != nil {
+ t.Fatal(err)
+ }
+ noShuffleConfig, err := pfBuilder.(balancer.ConfigParser).ParseConfig(json.RawMessage(`{ "shuffleAddressList": false }`))
+ if err != nil {
+ t.Fatal(err)
+ }
+ var activeCfg serviceconfig.LoadBalancingConfig
+
+ bf := stub.BalancerFuncs{
+ Init: func(bd *stub.BalancerData) {
+ bd.ChildBalancer = pfBuilder.Build(bd.ClientConn, bd.BuildOptions)
+ },
+ Close: func(bd *stub.BalancerData) {
+ bd.ChildBalancer.Close()
+ },
+ UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
+ ccs.BalancerConfig = activeCfg
+ ccs.ResolverState.Endpoints = nil
+ return bd.ChildBalancer.UpdateClientConnState(ccs)
+ },
+ }
+
+ stub.Register(t.Name(), bf)
+ svcCfg := fmt.Sprintf(`{ "loadBalancingConfig": [{%q: {}}] }`, t.Name())
+ // Set up our backends.
+ cc, r, backends := setupPickFirst(t, 2, grpc.WithDefaultServiceConfig(svcCfg))
+ addrs := stubBackendsToResolverAddrs(backends)
+
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ // Push an update with both addresses and shuffling disabled. We should
+ // connect to backend 0.
+ activeCfg = noShuffleConfig
+ resolverState := resolver.State{Addresses: addrs}
+ r.UpdateState(resolverState)
+ if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
+ t.Fatal(err)
+ }
+
+ // Send a config with shuffling enabled. This will reverse the addresses,
+ // but the channel should still be connected to backend 0.
+ activeCfg = shuffleConfig
+ r.UpdateState(resolverState)
+ if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
+ t.Fatal(err)
+ }
+
+ // Send a resolver update with no addresses. This should push the channel
+ // into TransientFailure.
+ r.UpdateState(resolver.State{})
+ testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
+
+ // Send the same config as last time with shuffling enabled. Since we are
+ // not connected to backend 0, we should connect to backend 1.
+ r.UpdateState(resolverState)
+ if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
+ t.Fatal(err)
+ }
+}
+
// Test config parsing with the env var turned on and off for various scenarios.
func (s) TestPickFirst_ParseConfig_Success(t *testing.T) {
// Install a shuffler that always reverses two entries.