]> git.feebdaed.xyz Git - 0xmirror/grpc-go.git/commitdiff
stats/otel: a79 scaffolding to register an async gauge metric and api to record it...
authorMadhav Bissa <48023579+mbissa@users.noreply.github.com>
Thu, 18 Dec 2025 06:38:39 +0000 (12:08 +0530)
committerGitHub <noreply@github.com>
Thu, 18 Dec 2025 06:38:39 +0000 (12:08 +0530)
A79: This change introduces the API surface required to support
asynchronous metrics (e.g., OpenTelemetry Observable Gauges) in gRPC-Go.

This change updates the internal MetricsRecorder interface to support
registering asynchronous metric reporters. This is the second of three
PRs. It establishes the contracts and wiring without adding the
OpenTelemetry implementation logic. This functionality is required to
support OpenTelemetry Observable Gauges, which allow components like RLS
and xDS to report stateful metrics (e.g., current active requests) via
callbacks.

RELEASE NOTES:
* stats/otel: MetricsRecorder interface updated to include a new method
RegisterAsyncReporter that registers a AsyncMetricReporter.
* stats/otel: AsyncMetricReporter is added which is an interface for
types that record metrics asynchronously.

experimental/stats/metricregistry_test.go
experimental/stats/metrics.go
internal/stats/metrics_recorder_list.go
internal/stats/metrics_recorder_list_test.go
internal/testutils/stats/test_metrics_recorder.go
stats/opentelemetry/opentelemetry.go

index 8bf50abbe162587d3a304ca90a5463cdf46ab5fb..d377d9ada306613f32bc5dbf2480205d95a1aeff 100644 (file)
@@ -309,7 +309,15 @@ func (r *fakeMetricsRecorder) RecordInt64UpDownCount(handle *Int64UpDownCountHan
        r.intValues[handle.Descriptor()] += incr
 }
 
-func (r *fakeMetricsRecorder) RecordInt64AsyncGauge(handle *Int64AsyncGaugeHandle, incr int64, labels ...string) {
+func (r *fakeMetricsRecorder) RecordInt64AsyncGauge(handle *Int64AsyncGaugeHandle, val int64, labels ...string) {
        verifyLabels(r.t, handle.Descriptor().Labels, handle.Descriptor().OptionalLabels, labels)
-       r.intValues[handle.Descriptor()] += incr
+       // Async gauges in OTel are "Observer" instruments; they report
+       // the current state of the world every cycle, they do not accumulate deltas.
+       r.intValues[handle.Descriptor()] = val
+}
+
+// RegisterAsyncReporter is noop implementation, this might be changed at a
+// later stage.
+func (r *fakeMetricsRecorder) RegisterAsyncReporter(AsyncMetricReporter, ...AsyncMetric) func() {
+       return func() {}
 }
index d7d404cbe438077921a3c7853d2d51e6b56a2eb3..1d2dc0167a6199d130bb351e0751a1a097e915a0 100644 (file)
@@ -41,6 +41,34 @@ type MetricsRecorder interface {
        // RecordInt64UpDownCounter records the measurement alongside labels on the int
        // count associated with the provided handle.
        RecordInt64UpDownCount(handle *Int64UpDownCountHandle, incr int64, labels ...string)
+       // RegisterAsyncReporter registers a reporter to produce metric values for
+       // only the listed descriptors. The returned function must be called when
+       // the metrics are no longer needed, which will remove the reporter. The
+       // returned method needs to be idempotent and concurrent safe.
+       RegisterAsyncReporter(reporter AsyncMetricReporter, descriptors ...AsyncMetric) func()
+}
+
+// AsyncMetricReporter is an interface for types that record metrics asynchronously
+// for the set of descriptors they are registered with. The AsyncMetricsRecorder
+// parameter is used to record values for these metrics.
+//
+// Implementations must make unique recordings across all registered
+// AsyncMetricReporters. Meaning, they should not report values for a metric with
+// the same attributes as another AsyncMetricReporter will report.
+//
+// Implementations must be concurrent-safe.
+type AsyncMetricReporter interface {
+       // Report records metric values using the provided recorder.
+       Report(AsyncMetricsRecorder) error
+}
+
+// AsyncMetricReporterFunc is an adapter to allow the use of ordinary functions as
+// AsyncMetricReporters.
+type AsyncMetricReporterFunc func(AsyncMetricsRecorder) error
+
+// Report calls f(r).
+func (f AsyncMetricReporterFunc) Report(r AsyncMetricsRecorder) error {
+       return f(r)
 }
 
 // AsyncMetricsRecorder records on asynchronous metrics derived from metric registry.
index d5f7e4d62dd1b1e3528ac53e15f36c55651e6339..4a9fc0127f7533c7068a92c1782ed35a7bc588dc 100644 (file)
@@ -113,3 +113,53 @@ func (l *MetricsRecorderList) RecordInt64Gauge(handle *estats.Int64GaugeHandle,
                metricRecorder.RecordInt64Gauge(handle, incr, labels...)
        }
 }
+
+// RegisterAsyncReporter forwards the registration to all underlying metrics
+// recorders.
+//
+// It returns a cleanup function that, when called, invokes the cleanup function
+// returned by each underlying recorder, ensuring the reporter is unregistered
+// from all of them.
+func (l *MetricsRecorderList) RegisterAsyncReporter(reporter estats.AsyncMetricReporter, metrics ...estats.AsyncMetric) func() {
+       descriptorsMap := make(map[*estats.MetricDescriptor]bool, len(metrics))
+       for _, m := range metrics {
+               descriptorsMap[m.Descriptor()] = true
+       }
+       unregisterFns := make([]func(), 0, len(l.metricsRecorders))
+       for _, mr := range l.metricsRecorders {
+               // Wrap the AsyncMetricsRecorder to intercept calls to RecordInt64Gauge
+               // and validate the labels.
+               wrappedCallback := func(recorder estats.AsyncMetricsRecorder) error {
+                       wrappedRecorder := &asyncRecorderWrapper{
+                               delegate:    recorder,
+                               descriptors: descriptorsMap,
+                       }
+                       return reporter.Report(wrappedRecorder)
+               }
+               unregisterFns = append(unregisterFns, mr.RegisterAsyncReporter(estats.AsyncMetricReporterFunc(wrappedCallback), metrics...))
+       }
+       return func() {
+               for _, unregister := range unregisterFns {
+                       unregister()
+               }
+       }
+}
+
+type asyncRecorderWrapper struct {
+       delegate    estats.AsyncMetricsRecorder
+       descriptors map[*estats.MetricDescriptor]bool
+}
+
+// RecordIntAsync64Gauge records the measurement alongside labels on the int
+// gauge associated with the provided handle.
+func (w *asyncRecorderWrapper) RecordInt64AsyncGauge(handle *estats.Int64AsyncGaugeHandle, value int64, labels ...string) {
+       // Ensure only metrics for descriptors passed during callback registeration
+       // are emitted.
+       d := handle.Descriptor()
+       if _, ok := w.descriptors[d]; !ok {
+               return
+       }
+       // Validate labels and delegate.
+       verifyLabels(d, labels...)
+       w.delegate.RecordInt64AsyncGauge(handle, value, labels...)
+}
index 38f9472f926a8a097b7092b9f1d37bf819ee2c8f..f94f85713377f9918af15be23a92c2f787b1bb81 100644 (file)
@@ -41,6 +41,7 @@ import (
        "google.golang.org/grpc/resolver"
        "google.golang.org/grpc/resolver/manual"
        "google.golang.org/grpc/serviceconfig"
+       gstats "google.golang.org/grpc/stats"
 )
 
 var defaultTestTimeout = 5 * time.Second
@@ -114,7 +115,6 @@ func (recordingLoadBalancerBuilder) Build(cc balancer.ClientConn, bOpts balancer
        intHistoHandle.Record(cc.MetricsRecorder(), 3, "int histo label val", "int histo optional label val")
        floatHistoHandle.Record(cc.MetricsRecorder(), 4, "float histo label val", "float histo optional label val")
        intGaugeHandle.Record(cc.MetricsRecorder(), 5, "int gauge label val", "int gauge optional label val")
-
        return &recordingLoadBalancer{
                Balancer: balancer.Get(pickfirst.Name).Build(cc, bOpts),
        }
@@ -255,3 +255,82 @@ func (s) TestMetricRecorderListPanic(t *testing.T) {
 
        intCountHandle.Record(mrl, 1, "only one label")
 }
+
+// TestMetricsRecorderList_RegisterAsyncReporter verifies that the list implementation
+// correctly fans out registration calls to all underlying recorders and
+// aggregates the cleanup calls.
+func TestMetricsRecorderList_RegisterAsyncReporter(t *testing.T) {
+       spy1 := &spyMetricsRecorder{name: "spy1"}
+       spy2 := &spyMetricsRecorder{name: "spy2"}
+       spy3 := &spyMetricsRecorder{name: "spy3"}
+
+       list := istats.NewMetricsRecorderList([]gstats.Handler{spy1, spy2, spy3})
+
+       desc := &estats.MetricDescriptor{Name: "test_metric", Description: "test"}
+       mockMetric := &mockAsyncMetric{d: desc}
+
+       dummyReporter := estats.AsyncMetricReporterFunc(func(estats.AsyncMetricsRecorder) error {
+               return nil
+       })
+       cleanup := list.RegisterAsyncReporter(dummyReporter, mockMetric)
+
+       // Check that RegisterAsyncReporter was called exactly once on ALL spies
+       if spy1.registerCalledCount != 1 {
+               t.Errorf("spy1 register called %d times, want 1", spy1.registerCalledCount)
+       }
+       if spy2.registerCalledCount != 1 {
+               t.Errorf("spy2 register called %d times, want 1", spy2.registerCalledCount)
+       }
+       if spy3.registerCalledCount != 1 {
+               t.Errorf("spy3 register called %d times, want 1", spy3.registerCalledCount)
+       }
+
+       // Verify that cleanup has NOT been called yet
+       if spy1.cleanupCalledCount != 0 {
+               t.Error("spy1 cleanup called prematurely")
+       }
+
+       cleanup()
+
+       // Check that the cleanup function returned by the list actually triggers
+       // the cleanup logic on ALL underlying spies.
+       if spy1.cleanupCalledCount != 1 {
+               t.Errorf("spy1 cleanup called %d times, want 1", spy1.cleanupCalledCount)
+       }
+       if spy2.cleanupCalledCount != 1 {
+               t.Errorf("spy2 cleanup called %d times, want 1", spy2.cleanupCalledCount)
+       }
+       if spy3.cleanupCalledCount != 1 {
+               t.Errorf("spy3 cleanup called %d times, want 1", spy3.cleanupCalledCount)
+       }
+}
+
+// --- Helpers & Spies ---
+
+// mockAsyncMetric implements estats.AsyncMetric
+type mockAsyncMetric struct {
+       estats.AsyncMetric
+       d *estats.MetricDescriptor
+}
+
+func (m *mockAsyncMetric) Descriptor() *estats.MetricDescriptor {
+       return m.d
+}
+
+// spyMetricsRecorder implements estats.MetricsRecorder
+type spyMetricsRecorder struct {
+       stats.TestMetricsRecorder
+       name                string
+       registerCalledCount int
+       cleanupCalledCount  int
+}
+
+// RegisterAsyncReporter implements the interface and tracks calls.
+func (s *spyMetricsRecorder) RegisterAsyncReporter(estats.AsyncMetricReporter, ...estats.AsyncMetric) func() {
+       s.registerCalledCount++
+
+       // Return a cleanup function that tracks if it was called
+       return func() {
+               s.cleanupCalledCount++
+       }
+}
index be1a06117a2f77153679806b76e2de59b63b163b..40fc9b7b274a3aaa26275186331b4e5ed491b012 100644 (file)
@@ -276,6 +276,12 @@ func (r *TestMetricsRecorder) RecordInt64Gauge(handle *estats.Int64GaugeHandle,
        r.data[handle.Name] = float64(incr)
 }
 
+// RegisterAsyncReporter is noop implementation, async gauge test recorders should
+// provide their own implementation
+func (r *TestMetricsRecorder) RegisterAsyncReporter(estats.AsyncMetricReporter, ...estats.AsyncMetric) func() {
+       return func() {}
+}
+
 // To implement a stats.Handler, which allows it to be set as a dial option:
 
 // TagRPC is TestMetricsRecorder's implementation of TagRPC.
@@ -316,3 +322,8 @@ func (r *NoopMetricsRecorder) RecordInt64Gauge(*estats.Int64GaugeHandle, int64,
 // RecordInt64UpDownCount is a noop implementation of RecordInt64UpDownCount.
 func (r *NoopMetricsRecorder) RecordInt64UpDownCount(*estats.Int64UpDownCountHandle, int64, ...string) {
 }
+
+// RegisterAsyncReporter is a noop implementation of RegisterAsyncReporter.
+func (r *NoopMetricsRecorder) RegisterAsyncReporter(estats.AsyncMetricReporter, ...estats.AsyncMetric) func() {
+       return func() {}
+}
index 2a9cb5e57d7734985246aaf9e6b7bebdfabfd095..db7207527f52a2004ad4036d2a6c57e349702d4a 100644 (file)
@@ -453,6 +453,19 @@ func (rm *registryMetrics) RecordInt64Gauge(handle *estats.Int64GaugeHandle, inc
        }
 }
 
+// RegisterAsyncReporter will register a callback with the underlying OpenTelemetry
+// Meter for the provided descriptors.
+//
+// It will map the provided descriptors to their corresponding OTel Observable
+// instruments. If no instruments match the descriptors, registration is
+// skipped.
+//
+// The returned cleanup function unregisters the callback from the Meter.
+func (rm *registryMetrics) RegisterAsyncReporter(_ estats.AsyncMetricReporter, _ ...estats.AsyncMetric) func() {
+       // TODO(@mbissa) - add implementation
+       return func() {}
+}
+
 // Users of this component should use these bucket boundaries as part of their
 // SDK MeterProvider passed in. This component sends this as "advice" to the
 // API, which works, however this stability is not guaranteed, so for safety the