Description: "EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints with valid weight, which caused the WRR policy to fall back to RR behavior.",
Unit: "{update}",
Labels: []string{"grpc.target"},
- OptionalLabels: []string{"grpc.lb.locality"},
+ OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
Default: false,
})
Description: "EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable weight information (i.e., either the load report has not yet been received, or it is within the blackout period).",
Unit: "{endpoint}",
Labels: []string{"grpc.target"},
- OptionalLabels: []string{"grpc.lb.locality"},
+ OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
Default: false,
})
Description: "EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is older than the expiration period.",
Unit: "{endpoint}",
Labels: []string{"grpc.target"},
- OptionalLabels: []string{"grpc.lb.locality"},
+ OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
Default: false,
})
endpointWeightsMetric = estats.RegisterFloat64Histo(estats.MetricDescriptor{
Description: "EXPERIMENTAL. Weight of each endpoint, recorded on every scheduler update. Endpoints without usable weights will be recorded as weight 0.",
Unit: "{endpoint}",
Labels: []string{"grpc.target"},
- OptionalLabels: []string{"grpc.lb.locality"},
+ OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
Default: false,
})
)
metricsRecorder: b.metricsRecorder,
target: b.target,
locality: b.locality,
+ clusterName: b.clusterName,
}
for _, addr := range endpoint.Addresses {
b.addressWeights.Set(addr, ew)
mu sync.Mutex
cfg *lbConfig // active config
locality string
+ clusterName string
stopPicker *grpcsync.Event
addressWeights *resolver.AddressMapV2[*endpointWeight]
endpointToWeight *resolver.EndpointMap[*endpointWeight]
b.mu.Lock()
b.cfg = cfg
b.locality = weightedtarget.LocalityFromResolverState(ccs.ResolverState)
+ b.clusterName = backendServiceFromState(ccs.ResolverState)
b.updateEndpointsLocked(ccs.ResolverState.Endpoints)
b.mu.Unlock()
metricsRecorder: b.metricsRecorder,
locality: b.locality,
target: b.target,
+ clusterName: b.clusterName,
}
b.stopPicker = grpcsync.NewEvent()
// The following fields are immutable.
target string
locality string
+ clusterName string
metricsRecorder estats.MetricsRecorder
}
target string
metricsRecorder estats.MetricsRecorder
locality string
+ clusterName string
// The following fields are only accessed on calls into the LB policy, and
// do not need a mutex.
if recordMetrics {
defer func() {
- endpointWeightsMetric.Record(w.metricsRecorder, weight, w.target, w.locality)
+ endpointWeightsMetric.Record(w.metricsRecorder, weight, w.target, w.locality, w.clusterName)
}()
}
// The endpoint has not received a load report (i.e. just turned READY with
// no load report).
if w.lastUpdated.Equal(time.Time{}) {
- endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
+ endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality, w.clusterName)
return 0
}
// start getting data again in the future, and return 0.
if now.Sub(w.lastUpdated) >= weightExpirationPeriod {
if recordMetrics {
- endpointWeightStaleMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
+ endpointWeightStaleMetric.Record(w.metricsRecorder, 1, w.target, w.locality, w.clusterName)
}
w.nonEmptySince = time.Time{}
return 0
// If we don't have at least blackoutPeriod worth of data, return 0.
if blackoutPeriod != 0 && (w.nonEmptySince.Equal(time.Time{}) || now.Sub(w.nonEmptySince) < blackoutPeriod) {
if recordMetrics {
- endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
+ endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality, w.clusterName)
}
return 0
}
return w.weightVal
}
+
+// backendServiceKey is the attribute key type under which the backend
+// service (cluster) name is stashed in resolver.State; a private struct
+// type guarantees no collision with other packages' attributes.
+type backendServiceKey struct{}
+
+// SetBackendService stores the backendService on the resolver state so
+// that it can be used later as a label in wrr metrics.
+func SetBackendService(state resolver.State, backendService string) resolver.State {
+	state.Attributes = state.Attributes.WithValue(backendServiceKey{}, backendService)
+	return state
+}
+
+// backendServiceFromState retrieves the backend service name stored as an
+// attribute in the resolver state by SetBackendService, or "" if unset.
+func backendServiceFromState(state resolver.State) string {
+	v := state.Attributes.Value(backendServiceKey{})
+	name, _ := v.(string)
+	return name
+}
}
if n == 1 {
if recordMetrics {
- rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
+ rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality, p.clusterName)
}
return &rrScheduler{numSCs: 1, inc: p.inc}
}
if numZero >= n-1 {
if recordMetrics {
- rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
+ rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality, p.clusterName)
}
return &rrScheduler{numSCs: uint32(n), inc: p.inc}
}
"time"
"google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
// Addresses and sub-balancer config are sent to sub-balancer.
err = b.child.UpdateClientConnState(balancer.ClientConnState{
- ResolverState: s.ResolverState,
+ ResolverState: weightedroundrobin.SetBackendService(s.ResolverState, b.clusterName),
BalancerConfig: parsedCfg,
})
"csm.service_name": "service_name_val",
"csm.service_namespace_name": "service_namespace_val",
- "grpc.lb.locality": "grpc.lb.locality_val",
+ "grpc.lb.locality": "grpc.lb.locality_val",
+ "grpc.lb.backend_service": "grpc.lb.backend_service_val",
},
})
MetricsOptions: opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics(),
- OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name", "grpc.lb.locality"},
+ OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name", "grpc.lb.locality", "grpc.lb.backend_service"},
},
}, po), grpc.WithUnaryInterceptor(unaryInterceptorAttachXDSLabels)}
if err := ss.Start(nil, dopts...); err != nil {
serviceNameAttr := attribute.String("csm.service_name", "service_name_val")
serviceNamespaceAttr := attribute.String("csm.service_namespace_name", "service_namespace_val")
localityAttr := attribute.String("grpc.lb.locality", "grpc.lb.locality_val")
+ backendServiceAttr := attribute.String("grpc.lb.backend_service", "grpc.lb.backend_service_val")
meshIDAttr := attribute.String("csm.mesh_id", "unknown")
workloadCanonicalServiceAttr := attribute.String("csm.workload_canonical_service", "unknown")
remoteWorkloadTypeAttr := attribute.String("csm.remote_workload_type", "unknown")
serviceNameAttr,
serviceNamespaceAttr,
localityAttr,
+ backendServiceAttr,
meshIDAttr,
workloadCanonicalServiceAttr,
remoteWorkloadTypeAttr,
mo := opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics().Add("grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"),
- OptionalLabels: []string{"grpc.lb.locality"},
+ OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
}
target := fmt.Sprintf("xds:///%s", serviceName)
targetAttr := attribute.String("grpc.target", target)
localityAttr := attribute.String("grpc.lb.locality", `{region="region-1", zone="zone-1", sub_zone="subzone-1"}`)
+ backendServiceAttr := attribute.String("grpc.lb.backend_service", clusterName)
wantMetrics := []metricdata.Metrics{
{
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
- Attributes: attribute.NewSet(targetAttr, localityAttr),
+ Attributes: attribute.NewSet(targetAttr, localityAttr, backendServiceAttr),
Value: 1, // value ignored
},
},
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
- Attributes: attribute.NewSet(targetAttr, localityAttr),
+ Attributes: attribute.NewSet(targetAttr, localityAttr, backendServiceAttr),
Value: 1, // value ignored
},
},
Data: metricdata.Histogram[float64]{
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
- Attributes: attribute.NewSet(targetAttr, localityAttr),
+ Attributes: attribute.NewSet(targetAttr, localityAttr, backendServiceAttr),
},
},
Temporality: metricdata.CumulativeTemporality,
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
- Attributes: attribute.NewSet(targetAttr, localityAttr),
+ Attributes: attribute.NewSet(targetAttr, localityAttr, backendServiceAttr),
Value: 1, // value ignored
},
},