git.feebdaed.xyz Git - 0xmirror/grpc-go.git/commitdiff
balancer/randomsubsetting: Implementation of the random_subsetting LB policy (#8650)
author Marek Szews <szews@google.com>
Fri, 12 Dec 2025 20:50:14 +0000 (20:50 +0000)
committer GitHub <noreply@github.com>
Fri, 12 Dec 2025 20:50:14 +0000 (12:50 -0800)
Implements [gRFC
A68](https://github.com/grpc/proposal/blob/master/A68-random-subsetting.md).

Note that this PR only implements the LB policy and does not implement
the xDS integration specified here:
https://github.com/grpc/proposal/blob/master/A68-random-subsetting.md#xds-integration

RELEASE NOTES:
- balancer/randomsubsetting: Implementation of the `random_subsetting`
LB policy

---------

Signed-off-by: marek-szews <szews@google.com>
Co-authored-by: Easwar Swaminathan <easwars@google.com>
balancer/randomsubsetting/randomsubsetting.go [new file with mode: 0644]
balancer/randomsubsetting/randomsubsetting_test.go [new file with mode: 0644]

diff --git a/balancer/randomsubsetting/randomsubsetting.go b/balancer/randomsubsetting/randomsubsetting.go
new file mode 100644 (file)
index 0000000..d8e875b
--- /dev/null
@@ -0,0 +1,192 @@
+/*
+ *
+ * Copyright 2025 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package randomsubsetting implements the random_subsetting LB policy specified
+// here: https://github.com/grpc/proposal/blob/master/A68-random-subsetting.md
+//
+// To install the LB policy, import this package as:
+//
+//     import _ "google.golang.org/grpc/balancer/randomsubsetting"
+//
+// # Experimental
+//
+// Notice: This package is EXPERIMENTAL and may be changed or removed in a
+// later release.
+package randomsubsetting
+
+import (
+       "cmp"
+       "encoding/json"
+       "fmt"
+       "math/rand/v2"
+       "slices"
+
+       xxhash "github.com/cespare/xxhash/v2"
+       "google.golang.org/grpc/balancer"
+       "google.golang.org/grpc/grpclog"
+       "google.golang.org/grpc/internal/balancer/gracefulswitch"
+       internalgrpclog "google.golang.org/grpc/internal/grpclog"
+       iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
+       "google.golang.org/grpc/resolver"
+       "google.golang.org/grpc/serviceconfig"
+)
+
+// Name is the name of the random subsetting load balancer. It is the value
+// returned by the builder's Name method and the component name used for
+// this package's logger.
+const Name = "random_subsetting_experimental"
+
+var (
+       // logger is the package-level logging component; per-balancer prefix
+       // loggers are derived from it.
+       logger     = grpclog.Component(Name)
+       // randUint64 produces the per-balancer hash seed. It is a variable
+       // rather than a direct call to rand.Uint64 — presumably so tests can
+       // substitute a deterministic source; no override is visible here.
+       randUint64 = rand.Uint64
+)
+
+func prefixLogger(p *subsettingBalancer) *internalgrpclog.PrefixLogger {
+       return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[random-subsetting-lb %p] ", p))
+}
+
+// init registers the random subsetting balancer builder with the balancer
+// package as a side effect of importing this package.
+func init() {
+       balancer.Register(bb{})
+}
+
+// bb is the stateless builder for the random subsetting balancer. Its
+// methods build balancer instances, parse the policy's JSON configuration
+// and report the policy name.
+type bb struct{}
+
+func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
+       b := &subsettingBalancer{
+               Balancer:   gracefulswitch.NewBalancer(cc, bOpts),
+               hashSeed:   randUint64(),
+               hashDigest: xxhash.New(),
+       }
+       b.logger = prefixLogger(b)
+       b.logger.Infof("Created")
+       return b
+}
+
+// lbConfig is the parsed configuration of the random subsetting LB policy.
+type lbConfig struct {
+       // Embedded so that *lbConfig can be returned from ParseConfig as a
+       // serviceconfig.LoadBalancingConfig; excluded from JSON.
+       serviceconfig.LoadBalancingConfig `json:"-"`
+
+       // SubsetSize is the number of endpoints handed to the child policy.
+       // ParseConfig rejects a value of 0.
+       SubsetSize  uint32                         `json:"subsetSize,omitempty"`
+       // ChildPolicy is the configuration of the policy that balances across
+       // the selected subset. ParseConfig rejects a missing child policy.
+       ChildPolicy *iserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
+}
+
+func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+       lbCfg := &lbConfig{}
+
+       // Ensure that the specified child policy is registered and validates its
+       // config, if present.
+       if err := json.Unmarshal(s, lbCfg); err != nil {
+               return nil, fmt.Errorf("randomsubsetting: json.Unmarshal failed for configuration: %s with error: %v", string(s), err)
+       }
+       if lbCfg.SubsetSize == 0 {
+               return nil, fmt.Errorf("randomsubsetting: SubsetSize must be greater than 0")
+       }
+       if lbCfg.ChildPolicy == nil {
+               return nil, fmt.Errorf("randomsubsetting: ChildPolicy must be specified")
+       }
+
+       return lbCfg, nil
+}
+
+// Name returns the name of the random subsetting LB policy, i.e. the
+// package-level Name constant.
+func (bb) Name() string {
+       return Name
+}
+
+// subsettingBalancer is the random subsetting LB policy. It narrows the
+// endpoint list from the resolver down to a subset and forwards that subset
+// to the child policy hosted by the embedded gracefulswitch balancer.
+type subsettingBalancer struct {
+       // Embedded gracefulswitch balancer; all balancer.Balancer methods not
+       // overridden by subsettingBalancer are served by it.
+       *gracefulswitch.Balancer
+
+       // logger prefixes messages with this balancer's address.
+       logger     *internalgrpclog.PrefixLogger
+       // cfg is the most recently accepted configuration; it is set in
+       // UpdateClientConnState and read by calculateSubset.
+       cfg        *lbConfig
+       // hashSeed is chosen once when the balancer is built and seeds every
+       // endpoint hash.
+       hashSeed   uint64
+       // hashDigest is a reusable xxhash digest that is reset with hashSeed
+       // before each endpoint is hashed.
+       hashDigest *xxhash.Digest
+}
+
+func (b *subsettingBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
+       lbCfg, ok := s.BalancerConfig.(*lbConfig)
+       if !ok {
+               b.logger.Warningf("Received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig)
+               return balancer.ErrBadResolverState
+       }
+
+       // Build config for the gracefulswitch balancer. It is safe to ignore
+       // JSON marshaling errors here, since the config was already validated
+       // as part of ParseConfig().
+       cfg := []map[string]any{{lbCfg.ChildPolicy.Name: lbCfg.ChildPolicy.Config}}
+       cfgJSON, _ := json.Marshal(cfg)
+       parsedCfg, err := gracefulswitch.ParseConfig(cfgJSON)
+       if err != nil {
+               return fmt.Errorf("randomsubsetting: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err)
+       }
+       b.cfg = lbCfg
+       endpoints := resolver.State{
+               Endpoints:     b.calculateSubset(s.ResolverState.Endpoints),
+               ServiceConfig: s.ResolverState.ServiceConfig,
+               Attributes:    s.ResolverState.Attributes,
+       }
+
+       return b.Balancer.UpdateClientConnState(balancer.ClientConnState{
+               ResolverState:  endpoints,
+               BalancerConfig: parsedCfg,
+       })
+}
+
+// calculateSubset implements the subsetting algorithm, as described in A68:
+// https://github.com/grpc/proposal/blob/master/A68-random-subsetting.md#subsetting-algorithm
+//
+// When the input holds no more than b.cfg.SubsetSize endpoints, it is
+// returned unchanged. Otherwise every endpoint is hashed with the balancer's
+// seed, the endpoints are ordered by hash, and the first SubsetSize of them
+// are returned in a newly allocated slice.
+func (b *subsettingBalancer) calculateSubset(endpoints []resolver.Endpoint) []resolver.Endpoint {
+       // A helper struct to hold an endpoint and its hash.
+       type endpointWithHash struct {
+               hash uint64
+               ep   resolver.Endpoint
+       }
+
+       // Compare in uint64 space. On platforms where int is 32 bits wide,
+       // int(SubsetSize) turns negative for values above math.MaxInt32, which
+       // would make the size check fall through to the allocation below.
+       if uint64(len(endpoints)) <= uint64(b.cfg.SubsetSize) {
+               return endpoints
+       }
+       // SubsetSize is strictly smaller than len(endpoints) at this point, so
+       // the conversion to int cannot overflow.
+       subsetSize := int(b.cfg.SubsetSize)
+
+       hashedEndpoints := make([]endpointWithHash, len(endpoints))
+       for i, endpoint := range endpoints {
+               // For every endpoint in the list, compute a hash with previously
+               // generated seed - A68.
+               //
+               // The xxhash package's Sum64() function does not allow setting a seed.
+               // This means that we need to reset the digest with the seed for every
+               // endpoint. Without this, an endpoint will not retain the same hash
+               // across resolver updates.
+               b.hashDigest.ResetWithSeed(b.hashSeed)
+               // Note that we only hash the first address of the endpoint, as per
+               // A68. An endpoint without any address is hashed as an empty key
+               // rather than panicking on Addresses[0].
+               if len(endpoint.Addresses) > 0 {
+                       b.hashDigest.WriteString(endpoint.Addresses[0].String())
+               }
+               hashedEndpoints[i] = endpointWithHash{
+                       hash: b.hashDigest.Sum64(),
+                       ep:   endpoint,
+               }
+       }
+
+       slices.SortFunc(hashedEndpoints, func(a, b endpointWithHash) int {
+               // Note: This uses the standard library cmp package, not the
+               // github.com/google/go-cmp/cmp package. The latter is intended for
+               // testing purposes only.
+               return cmp.Compare(a.hash, b.hash)
+       })
+
+       // Convert back to resolver.Endpoints
+       endpointSubset := make([]resolver.Endpoint, subsetSize)
+       for i, endpoint := range hashedEndpoints[:subsetSize] {
+               endpointSubset[i] = endpoint.ep
+       }
+
+       return endpointSubset
+}
diff --git a/balancer/randomsubsetting/randomsubsetting_test.go b/balancer/randomsubsetting/randomsubsetting_test.go
new file mode 100644 (file)
index 0000000..a23f1ab
--- /dev/null
@@ -0,0 +1,261 @@
+/*
+ *
+ * Copyright 2025 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package randomsubsetting
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "strings"
+       "testing"
+       "time"
+
+       xxhash "github.com/cespare/xxhash/v2"
+       "github.com/google/go-cmp/cmp"
+       "google.golang.org/grpc/balancer"
+       "google.golang.org/grpc/internal/balancer/stub"
+       "google.golang.org/grpc/internal/grpctest"
+       iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
+       "google.golang.org/grpc/internal/testutils"
+       "google.golang.org/grpc/resolver"
+       "google.golang.org/grpc/serviceconfig"
+
+       _ "google.golang.org/grpc/balancer/roundrobin" // For round_robin LB policy in tests
+)
+
+// defaultTestTimeout bounds how long a test waits on asynchronous events
+// before failing.
+var defaultTestTimeout = 5 * time.Second
+
+// s is the receiver type for the tests in this file; it embeds
+// grpctest.Tester and is handed to grpctest.RunSubTests by Test.
+type s struct {
+       grpctest.Tester
+}
+
+// Test is the single entry point seen by "go test"; it passes s{} to
+// grpctest.RunSubTests, which presumably runs each Test* method of s as a
+// subtest (see the grpctest package to confirm).
+func Test(t *testing.T) {
+       grpctest.RunSubTests(t, s{})
+}
+
+// TestParseConfig feeds valid and invalid JSON configurations to the
+// builder's ParseConfig method and verifies the returned config, or that the
+// returned error contains the expected substring.
+func (s) TestParseConfig(t *testing.T) {
+       type testCase struct {
+               name    string
+               input   string
+               wantCfg serviceconfig.LoadBalancingConfig
+               wantErr string
+       }
+       cases := []testCase{
+               {
+                       name:    "invalid_json",
+                       input:   "{{invalidjson{{",
+                       wantErr: "json.Unmarshal failed for configuration",
+               },
+               {
+                       name:    "empty_config",
+                       input:   `{}`,
+                       wantErr: "SubsetSize must be greater than 0",
+               },
+               {
+                       name:    "subset_size_zero",
+                       input:   `{ "subsetSize": 0 }`,
+                       wantErr: "SubsetSize must be greater than 0",
+               },
+               {
+                       name:    "child_policy_missing",
+                       input:   `{ "subsetSize": 1 }`,
+                       wantErr: "ChildPolicy must be specified",
+               },
+               {
+                       name:    "child_policy_not_registered",
+                       input:   `{ "subsetSize": 1 , "childPolicy": [{"unregistered_lb": {}}] }`,
+                       wantErr: "no supported policies found",
+               },
+               {
+                       name:  "success",
+                       input: `{ "subsetSize": 3, "childPolicy": [{"round_robin": {}}]}`,
+                       wantCfg: &lbConfig{
+                               SubsetSize:  3,
+                               ChildPolicy: &iserviceconfig.BalancerConfig{Name: "round_robin"},
+                       },
+               },
+       }
+
+       builder := bb{}
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       cfg, err := builder.ParseConfig(json.RawMessage(tc.input))
+
+                       // Errors are matched by substring. This couples the test to
+                       // the error strings of internalserviceconfig.BalancerConfig,
+                       // but it is the only way to tell apart the distinct failure
+                       // buckets of the parser.
+                       switch {
+                       case (err != nil) != (tc.wantErr != ""),
+                               err != nil && !strings.Contains(err.Error(), tc.wantErr):
+                               t.Fatalf("ParseConfig(%v) = %v, wantErr %v", tc.input, err, tc.wantErr)
+                       case tc.wantErr != "":
+                               // The expected error was returned; there is no config
+                               // to compare.
+                               return
+                       }
+
+                       if diff := cmp.Diff(tc.wantCfg, cfg); diff != "" {
+                               t.Fatalf("ParseConfig(%s) got unexpected output, diff (-want +got):\n%s", tc.input, diff)
+                       }
+               })
+       }
+}
+
+func makeEndpoints(n int) []resolver.Endpoint {
+       endpoints := make([]resolver.Endpoint, n)
+       for i := 0; i < n; i++ {
+               endpoints[i] = resolver.Endpoint{
+                       Addresses: []resolver.Address{{Addr: fmt.Sprintf("endpoint-%d", i)}},
+               }
+       }
+       return endpoints
+}
+
+// TestCalculateSubset_Simple covers the cases in which calculateSubset must
+// hand back its input untouched: an empty endpoint list, and a subset size
+// that is greater than or equal to the number of endpoints.
+func (s) TestCalculateSubset_Simple(t *testing.T) {
+       type testCase struct {
+               name       string
+               endpoints  []resolver.Endpoint
+               subsetSize uint32
+               want       []resolver.Endpoint
+       }
+       cases := []testCase{
+               {
+                       name:       "NoEndpoints",
+                       endpoints:  []resolver.Endpoint{},
+                       subsetSize: 3,
+                       want:       []resolver.Endpoint{},
+               },
+               {
+                       name:       "SubsetSizeLargerThanNumberOfEndpoints",
+                       endpoints:  makeEndpoints(5),
+                       subsetSize: 10,
+                       want:       makeEndpoints(5),
+               },
+               {
+                       name:       "SubsetSizeEqualToNumberOfEndpoints",
+                       endpoints:  makeEndpoints(5),
+                       subsetSize: 5,
+                       want:       makeEndpoints(5),
+               },
+       }
+
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       // Only the fields read by calculateSubset are populated.
+                       lb := &subsettingBalancer{
+                               cfg:        &lbConfig{SubsetSize: tc.subsetSize},
+                               hashSeed:   0,
+                               hashDigest: xxhash.New(),
+                       }
+                       diff := cmp.Diff(tc.want, lb.calculateSubset(tc.endpoints))
+                       if diff != "" {
+                               t.Errorf("calculateSubset() returned diff (-want +got):\n%s", diff)
+                       }
+               })
+       }
+}
+
+// TestCalculateSubset_EndpointsRetainHashValues verifies that repeated calls
+// to calculateSubset with the same seed and the same endpoints always yield
+// the same subset, in the same order.
+func (s) TestCalculateSubset_EndpointsRetainHashValues(t *testing.T) {
+       const subsetSize = 5
+       all := makeEndpoints(10)
+
+       // With a fixed seed the subset is fully determined by the hashes, so
+       // the expected addresses can be listed up front.
+       wantAddrs := []string{"endpoint-6", "endpoint-0", "endpoint-1", "endpoint-7", "endpoint-3"}
+       var want []resolver.Endpoint
+       for _, addr := range wantAddrs {
+               want = append(want, resolver.Endpoint{
+                       Addresses: []resolver.Address{{Addr: addr}},
+               })
+       }
+
+       lb := &subsettingBalancer{
+               cfg:        &lbConfig{SubsetSize: subsetSize},
+               hashSeed:   0,
+               hashDigest: xxhash.New(),
+       }
+       for attempt := 0; attempt < 10; attempt++ {
+               if diff := cmp.Diff(want, lb.calculateSubset(all)); diff != "" {
+                       t.Fatalf("calculateSubset() returned diff (-want +got):\n%s", diff)
+               }
+       }
+}
+
+// TestSubsettingBalancer_DeterministicSubset builds a random_subsetting
+// balancer with a stub child policy, pushes the same resolver state and
+// configuration twice, and verifies that the child receives a subset of the
+// configured size and that the subset is identical on both updates.
+func (s) TestSubsettingBalancer_DeterministicSubset(t *testing.T) {
+       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+       defer cancel()
+
+       // The stub child policy forwards every update it receives to this
+       // channel, giving up once the test context expires.
+       childUpdates := make(chan balancer.ClientConnState, 1)
+       stub.Register("stub-child-balancer", stub.BalancerFuncs{
+               UpdateClientConnState: func(_ *stub.BalancerData, ccs balancer.ClientConnState) error {
+                       select {
+                       case childUpdates <- ccs:
+                       case <-ctx.Done():
+                       }
+                       return nil
+               },
+       })
+
+       // Build the balancer under test.
+       cc := testutils.NewBalancerClientConn(t)
+       lb := balancer.Get(Name).Build(cc, balancer.BuildOptions{})
+       defer lb.Close()
+
+       // Ten endpoints and a subset size of five, with the stub as the child
+       // policy.
+       update := balancer.ClientConnState{
+               ResolverState: resolver.State{Endpoints: makeEndpoints(10)},
+               BalancerConfig: &lbConfig{
+                       SubsetSize:  5,
+                       ChildPolicy: &iserviceconfig.BalancerConfig{Name: "stub-child-balancer"},
+               },
+       }
+
+       // First update: the child must receive exactly five endpoints.
+       if err := lb.UpdateClientConnState(update); err != nil {
+               t.Fatalf("UpdateClientConnState failed: %v", err)
+       }
+
+       var firstSubset []resolver.Endpoint
+       select {
+       case <-ctx.Done():
+               t.Fatal("Timed out waiting for child policy to receive an update")
+       case got := <-childUpdates:
+               if n := len(got.ResolverState.Endpoints); n != 5 {
+                       t.Fatalf("Child policy received %d endpoints, want 5", n)
+               }
+               // Remember the subset for comparison with the second update.
+               firstSubset = got.ResolverState.Endpoints
+       }
+
+       // Second update with the very same state and configuration.
+       if err := lb.UpdateClientConnState(update); err != nil {
+               t.Fatalf("Second UpdateClientConnState failed: %v", err)
+       }
+
+       // The child must see the same subset as before.
+       select {
+       case <-ctx.Done():
+               t.Fatal("Timed out waiting for second child policy update")
+       case got := <-childUpdates:
+               if diff := cmp.Diff(firstSubset, got.ResolverState.Endpoints); diff != "" {
+                       t.Fatalf("Child policy received a different subset of endpoints on second update, diff (-want +got):\n%s", diff)
+               }
+       }
+}