stats/otel: Add grpc.lb.backend_service label to wrr metrics (A89) (#8737)
author    Madhav Bissa <48023579+mbissa@users.noreply.github.com>
          Mon, 8 Dec 2025 06:49:14 +0000 (12:19 +0530)
committer GitHub <noreply@github.com>
          Mon, 8 Dec 2025 06:49:14 +0000 (12:19 +0530)
Addresses:
https://github.com/grpc/proposal/blob/master/A89-backend-service-metric-label.md

This PR makes the `backend_service` (cluster name) label, which is determined
in clusterimpl (since we are pre-A75), available to the WRR LB policy metrics.
The cluster name is propagated to WRR through a resolver state attribute.
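
For illustration, a minimal sketch of how a client could opt in to the new label
(modeled on the test setup below in this diff; the metric, option, and label names
come from this change, while the helper name, the bare MeterProvider, and the
insecure credentials are illustrative assumptions, not part of the PR):

    package main

    import (
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
        "google.golang.org/grpc/stats/opentelemetry"

        sdkmetric "go.opentelemetry.io/otel/sdk/metric"
    )

    // dialWithWRRMetrics (hypothetical helper) enables the off-by-default WRR
    // metrics and opts in to the grpc.lb.backend_service optional label.
    func dialWithWRRMetrics(target string) (*grpc.ClientConn, error) {
        // Any MeterProvider works; the e2e test uses one backed by a manual reader.
        provider := sdkmetric.NewMeterProvider()
        do := opentelemetry.DialOption(opentelemetry.Options{
            MetricsOptions: opentelemetry.MetricsOptions{
                MeterProvider: provider,
                // The WRR metrics are EXPERIMENTAL and default to off, so they
                // must be added explicitly on top of the default set.
                Metrics: opentelemetry.DefaultMetrics().Add(
                    "grpc.lb.wrr.rr_fallback",
                    "grpc.lb.wrr.endpoint_weight_not_yet_usable",
                    "grpc.lb.wrr.endpoint_weight_stale",
                    "grpc.lb.wrr.endpoint_weights",
                ),
                // Optional labels are opt-in; grpc.lb.backend_service carries the
                // cluster name that clusterimpl attaches to the resolver state.
                OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
            },
        })
        return grpc.NewClient(target, do, grpc.WithTransportCredentials(insecure.NewCredentials()))
    }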

RELEASE NOTES:
* stats/otel: add backend service label to wrr metrics as part of A89

balancer/weightedroundrobin/balancer.go
balancer/weightedroundrobin/scheduler.go
internal/xds/balancer/clusterimpl/clusterimpl.go
stats/opentelemetry/csm/observability_test.go
stats/opentelemetry/e2e_test.go

balancer/weightedroundrobin/balancer.go
index 0de02e5e508827d18345a70fbb7faf06794a9f90..7a89f63554e3418b93c76455c6d6da97553f543c 100644
@@ -62,7 +62,7 @@ var (
                Description:    "EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints with valid weight, which caused the WRR policy to fall back to RR behavior.",
                Unit:           "{update}",
                Labels:         []string{"grpc.target"},
-               OptionalLabels: []string{"grpc.lb.locality"},
+               OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
                Default:        false,
        })
 
@@ -71,7 +71,7 @@ var (
                Description:    "EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable weight information (i.e., either the load report has not yet been received, or it is within the blackout period).",
                Unit:           "{endpoint}",
                Labels:         []string{"grpc.target"},
-               OptionalLabels: []string{"grpc.lb.locality"},
+               OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
                Default:        false,
        })
 
@@ -80,7 +80,7 @@ var (
                Description:    "EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is older than the expiration period.",
                Unit:           "{endpoint}",
                Labels:         []string{"grpc.target"},
-               OptionalLabels: []string{"grpc.lb.locality"},
+               OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
                Default:        false,
        })
        endpointWeightsMetric = estats.RegisterFloat64Histo(estats.MetricDescriptor{
@@ -88,7 +88,7 @@ var (
                Description:    "EXPERIMENTAL. Weight of each endpoint, recorded on every scheduler update. Endpoints without usable weights will be recorded as weight 0.",
                Unit:           "{endpoint}",
                Labels:         []string{"grpc.target"},
-               OptionalLabels: []string{"grpc.lb.locality"},
+               OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
                Default:        false,
        })
 )
@@ -173,6 +173,7 @@ func (b *wrrBalancer) updateEndpointsLocked(endpoints []resolver.Endpoint) {
                                metricsRecorder: b.metricsRecorder,
                                target:          b.target,
                                locality:        b.locality,
+                               clusterName:     b.clusterName,
                        }
                        for _, addr := range endpoint.Addresses {
                                b.addressWeights.Set(addr, ew)
@@ -211,6 +212,7 @@ type wrrBalancer struct {
        mu               sync.Mutex
        cfg              *lbConfig // active config
        locality         string
+       clusterName      string
        stopPicker       *grpcsync.Event
        addressWeights   *resolver.AddressMapV2[*endpointWeight]
        endpointToWeight *resolver.EndpointMap[*endpointWeight]
@@ -231,6 +233,7 @@ func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
        b.mu.Lock()
        b.cfg = cfg
        b.locality = weightedtarget.LocalityFromResolverState(ccs.ResolverState)
+       b.clusterName = backendServiceFromState(ccs.ResolverState)
        b.updateEndpointsLocked(ccs.ResolverState.Endpoints)
        b.mu.Unlock()
 
@@ -288,6 +291,7 @@ func (b *wrrBalancer) UpdateState(state balancer.State) {
                metricsRecorder: b.metricsRecorder,
                locality:        b.locality,
                target:          b.target,
+               clusterName:     b.clusterName,
        }
 
        b.stopPicker = grpcsync.NewEvent()
@@ -420,6 +424,7 @@ type picker struct {
        // The following fields are immutable.
        target          string
        locality        string
+       clusterName     string
        metricsRecorder estats.MetricsRecorder
 }
 
@@ -499,6 +504,7 @@ type endpointWeight struct {
        target          string
        metricsRecorder estats.MetricsRecorder
        locality        string
+       clusterName     string
 
        // The following fields are only accessed on calls into the LB policy, and
        // do not need a mutex.
@@ -602,14 +608,14 @@ func (w *endpointWeight) weight(now time.Time, weightExpirationPeriod, blackoutP
 
        if recordMetrics {
                defer func() {
-                       endpointWeightsMetric.Record(w.metricsRecorder, weight, w.target, w.locality)
+                       endpointWeightsMetric.Record(w.metricsRecorder, weight, w.target, w.locality, w.clusterName)
                }()
        }
 
        // The endpoint has not received a load report (i.e. just turned READY with
        // no load report).
        if w.lastUpdated.Equal(time.Time{}) {
-               endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
+               endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality, w.clusterName)
                return 0
        }
 
@@ -618,7 +624,7 @@ func (w *endpointWeight) weight(now time.Time, weightExpirationPeriod, blackoutP
        // start getting data again in the future, and return 0.
        if now.Sub(w.lastUpdated) >= weightExpirationPeriod {
                if recordMetrics {
-                       endpointWeightStaleMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
+                       endpointWeightStaleMetric.Record(w.metricsRecorder, 1, w.target, w.locality, w.clusterName)
                }
                w.nonEmptySince = time.Time{}
                return 0
@@ -627,10 +633,27 @@ func (w *endpointWeight) weight(now time.Time, weightExpirationPeriod, blackoutP
        // If we don't have at least blackoutPeriod worth of data, return 0.
        if blackoutPeriod != 0 && (w.nonEmptySince.Equal(time.Time{}) || now.Sub(w.nonEmptySince) < blackoutPeriod) {
                if recordMetrics {
-                       endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
+                       endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality, w.clusterName)
                }
                return 0
        }
 
        return w.weightVal
 }
+
+type backendServiceKey struct{}
+
+// SetBackendService stores the backendService on the resolver state so
+// that it can be used later as a label in wrr metrics.
+func SetBackendService(state resolver.State, backendService string) resolver.State {
+       state.Attributes = state.Attributes.WithValue(backendServiceKey{}, backendService)
+       return state
+}
+
+// backendServiceFromState retrieves the cluster name stored as an attribute
+// in the resolver state.
+func backendServiceFromState(state resolver.State) string {
+       v := state.Attributes.Value(backendServiceKey{})
+       name, _ := v.(string)
+       return name
+}
balancer/weightedroundrobin/scheduler.go
index 7d3d6815eb7ad35af661ad1aa6d00d8255f3e24f..8ed8d92a781c6ebf5ca7db464c011ab54e331374 100644
@@ -39,7 +39,7 @@ func (p *picker) newScheduler(recordMetrics bool) scheduler {
        }
        if n == 1 {
                if recordMetrics {
-                       rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
+                       rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality, p.clusterName)
                }
                return &rrScheduler{numSCs: 1, inc: p.inc}
        }
@@ -58,7 +58,7 @@ func (p *picker) newScheduler(recordMetrics bool) scheduler {
 
        if numZero >= n-1 {
                if recordMetrics {
-                       rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
+                       rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality, p.clusterName)
                }
                return &rrScheduler{numSCs: uint32(n), inc: p.inc}
        }
internal/xds/balancer/clusterimpl/clusterimpl.go
index cdc7bf2eb57909670b66c7a443506c8227ea9775..b5dc77371548f45d77d02b2796b09996b0f1c932 100644
@@ -33,6 +33,7 @@ import (
        "time"
 
        "google.golang.org/grpc/balancer"
+       "google.golang.org/grpc/balancer/weightedroundrobin"
        "google.golang.org/grpc/connectivity"
        "google.golang.org/grpc/internal"
        "google.golang.org/grpc/internal/balancer/gracefulswitch"
@@ -297,7 +298,7 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
 
        // Addresses and sub-balancer config are sent to sub-balancer.
        err = b.child.UpdateClientConnState(balancer.ClientConnState{
-               ResolverState:  s.ResolverState,
+               ResolverState:  weightedroundrobin.SetBackendService(s.ResolverState, b.clusterName),
                BalancerConfig: parsedCfg,
        })
 
stats/opentelemetry/csm/observability_test.go
index 4057a1fd85a5bfcbc38d46a44cd60e1b148e4de3..09d450cbc15509afbf56349d2a720fff87514c26 100644
@@ -434,7 +434,8 @@ func unaryInterceptorAttachXDSLabels(ctx context.Context, method string, req, re
                        "csm.service_name":           "service_name_val",
                        "csm.service_namespace_name": "service_namespace_val",
 
-                       "grpc.lb.locality": "grpc.lb.locality_val",
+                       "grpc.lb.locality":        "grpc.lb.locality_val",
+                       "grpc.lb.backend_service": "grpc.lb.backend_service_val",
                },
        })
 
@@ -469,7 +470,7 @@ func (s) TestXDSLabels(t *testing.T) {
                MetricsOptions: opentelemetry.MetricsOptions{
                        MeterProvider:  provider,
                        Metrics:        opentelemetry.DefaultMetrics(),
-                       OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name", "grpc.lb.locality"},
+                       OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name", "grpc.lb.locality", "grpc.lb.backend_service"},
                },
        }, po), grpc.WithUnaryInterceptor(unaryInterceptorAttachXDSLabels)}
        if err := ss.Start(nil, dopts...); err != nil {
@@ -498,6 +499,7 @@ func (s) TestXDSLabels(t *testing.T) {
        serviceNameAttr := attribute.String("csm.service_name", "service_name_val")
        serviceNamespaceAttr := attribute.String("csm.service_namespace_name", "service_namespace_val")
        localityAttr := attribute.String("grpc.lb.locality", "grpc.lb.locality_val")
+       backendServiceAttr := attribute.String("grpc.lb.backend_service", "grpc.lb.backend_service_val")
        meshIDAttr := attribute.String("csm.mesh_id", "unknown")
        workloadCanonicalServiceAttr := attribute.String("csm.workload_canonical_service", "unknown")
        remoteWorkloadTypeAttr := attribute.String("csm.remote_workload_type", "unknown")
@@ -510,6 +512,7 @@ func (s) TestXDSLabels(t *testing.T) {
                serviceNameAttr,
                serviceNamespaceAttr,
                localityAttr,
+               backendServiceAttr,
                meshIDAttr,
                workloadCanonicalServiceAttr,
                remoteWorkloadTypeAttr,
stats/opentelemetry/e2e_test.go
index 2d575bfe06c53de81752c5e3976468f48ba28842..d9c30b7aa1042a2abbb20a8c7c27925b3c0b966b 100644
@@ -673,7 +673,7 @@ func (s) TestWRRMetrics(t *testing.T) {
        mo := opentelemetry.MetricsOptions{
                MeterProvider:  provider,
                Metrics:        opentelemetry.DefaultMetrics().Add("grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"),
-               OptionalLabels: []string{"grpc.lb.locality"},
+               OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
        }
 
        target := fmt.Sprintf("xds:///%s", serviceName)
@@ -699,6 +699,7 @@ func (s) TestWRRMetrics(t *testing.T) {
 
        targetAttr := attribute.String("grpc.target", target)
        localityAttr := attribute.String("grpc.lb.locality", `{region="region-1", zone="zone-1", sub_zone="subzone-1"}`)
+       backendServiceAttr := attribute.String("grpc.lb.backend_service", clusterName)
 
        wantMetrics := []metricdata.Metrics{
                {
@@ -708,7 +709,7 @@ func (s) TestWRRMetrics(t *testing.T) {
                        Data: metricdata.Sum[int64]{
                                DataPoints: []metricdata.DataPoint[int64]{
                                        {
-                                               Attributes: attribute.NewSet(targetAttr, localityAttr),
+                                               Attributes: attribute.NewSet(targetAttr, localityAttr, backendServiceAttr),
                                                Value:      1, // value ignored
                                        },
                                },
@@ -724,7 +725,7 @@ func (s) TestWRRMetrics(t *testing.T) {
                        Data: metricdata.Sum[int64]{
                                DataPoints: []metricdata.DataPoint[int64]{
                                        {
-                                               Attributes: attribute.NewSet(targetAttr, localityAttr),
+                                               Attributes: attribute.NewSet(targetAttr, localityAttr, backendServiceAttr),
                                                Value:      1, // value ignored
                                        },
                                },
@@ -739,7 +740,7 @@ func (s) TestWRRMetrics(t *testing.T) {
                        Data: metricdata.Histogram[float64]{
                                DataPoints: []metricdata.HistogramDataPoint[float64]{
                                        {
-                                               Attributes: attribute.NewSet(targetAttr, localityAttr),
+                                               Attributes: attribute.NewSet(targetAttr, localityAttr, backendServiceAttr),
                                        },
                                },
                                Temporality: metricdata.CumulativeTemporality,
@@ -761,7 +762,7 @@ func (s) TestWRRMetrics(t *testing.T) {
                Data: metricdata.Sum[int64]{
                        DataPoints: []metricdata.DataPoint[int64]{
                                {
-                                       Attributes: attribute.NewSet(targetAttr, localityAttr),
+                                       Attributes: attribute.NewSet(targetAttr, localityAttr, backendServiceAttr),
                                        Value:      1, // value ignored
                                },
                        },