]> git.feebdaed.xyz Git - 0xmirror/grpc-go.git/commitdiff
xdsclient: stop batching writes on the ADS stream (#8627)
authorEaswar Swaminathan <easwars@google.com>
Tue, 11 Nov 2025 17:40:12 +0000 (09:40 -0800)
committerGitHub <noreply@github.com>
Tue, 11 Nov 2025 17:40:12 +0000 (09:40 -0800)
Fixes https://github.com/grpc/grpc-go/issues/8125

#### The original race in the xDS client:
- Resource watch is cancelled by the user of the xdsClient (e.g.
xdsResolver)
- xdsClient removes the resource from its cache and queues an
unsubscribe request to the ADS stream.
- A watch for the same resource is registered immediately, and the
xdsClient instructs the ADS stream to subscribe (as it's not in cache).
- The ADS stream sends a redundant request (same resources, version,
nonce) which the management server ignores.
- The new resource watch sees a "resource-not-found" error once the
watch timer fires.

#### The original fix:
Delay the resource's removal from the cache until the unsubscribe
request was transmitted over the wire, a change implemented in
https://github.com/grpc/grpc-go/pull/8369. However, this solution
introduced new complications:
- The resource's removal from the xdsClient's cache became an
asynchronous operation, occurring while the unsubscribe request was
being sent.
- This asynchronous behavior meant the state maintained within the ADS
stream could still diverge from the cache's state.
- A critical section was absent between the ADS stream's message
transmission logic and the xdsClient's cache access, which is performed
during subscription/unsubscription by its users.

#### The root cause of the previous seen races can be put down two
things:
- Batching of writes for subscribe and unsubscribe calls
- After batching, it may appear that nothing has changed in the list of
subscribed resources, even though a resource was removed and added
again, and therefore the management server would not send any response.
It is important that the management server see the exact sequence of
subscribe and unsubscribe calls.
- State maintained in the ADS stream going out of sync with the state
maintained in the resource cache

#### How does this PR address the above issue?
This PR simplifies the implementation of the ADS stream by removing two
pieces of functionality
- Stop batching of writes on the ADS stream
- If the user registers multiple watches, e.g. resource `A`, `B`, and
`C`, the stream would now send three requests: `[A]`, `[A B]`, `[A B
C]`.
- Queue the exact request to be sent out based on the current state
- As part of handling a subscribe/unsubscribe request, the ADS stream
implementation will queue the exact request to be sent out. When
asynchronously sending the request out, it will not use the current
state, but instead just write the queued request on the wire.
- Don't buffer writes when waiting for flow control
- Flow control is already blocking reads from the stream. Blocking
writes as well during this period might provide some additional flow
control, but not much, and removing this logic simplifies the stream
implementation quite a bit.

RELEASE NOTES:
- xdsclient: fix a race in the xdsClient that could lead to
resource-not-found errors

internal/xds/clients/internal/buffer/unbounded.go
internal/xds/clients/xdsclient/ads_stream.go
internal/xds/clients/xdsclient/channel.go

index 3e6e99d0e8e01df6b6a192e467d3b25b0f421300..52b8dab892bcccfef9a863380ffa7211a9f788d8 100644 (file)
@@ -83,6 +83,7 @@ func (b *Unbounded) Load() {
                default:
                }
        } else if b.closing && !b.closed {
+               b.closed = true
                close(b.c)
        }
 }
index 3ad62ac16db87d5bab7252d3f91852b42ee5c83d..24e66b8347168d9fd17ba86699dfb4331b8de681 100644 (file)
@@ -28,7 +28,6 @@ import (
        igrpclog "google.golang.org/grpc/internal/grpclog"
        "google.golang.org/grpc/internal/xds/clients"
        "google.golang.org/grpc/internal/xds/clients/internal/backoff"
-       "google.golang.org/grpc/internal/xds/clients/internal/buffer"
        "google.golang.org/grpc/internal/xds/clients/internal/pretty"
        "google.golang.org/grpc/internal/xds/clients/xdsclient/internal/xdsresource"
 
@@ -48,6 +47,13 @@ const (
        perRPCVerbosityLevel = 9
 )
 
+// request represents a queued request message to be sent on the ADS stream. It
+// contains the type of the resource and the list of resource names to be sent.
+type request struct {
+       typ           ResourceType
+       resourceNames []string
+}
+
 // response represents a response received on the ADS stream. It contains the
 // type URL, version, and resources for the response.
 type response struct {
@@ -76,9 +82,7 @@ type adsStreamEventHandler interface {
 type resourceTypeState struct {
        version             string                                     // Last acked version. Should not be reset when the stream breaks.
        nonce               string                                     // Last received nonce. Should be reset when the stream breaks.
-       bufferedRequests    chan struct{}                              // Channel to buffer requests when writing is blocked.
        subscribedResources map[string]*xdsresource.ResourceWatchState // Map of subscribed resource names to their state.
-       pendingWrite        bool                                       // True if there is a pending write for this resource type.
 }
 
 // adsStreamImpl provides the functionality associated with an ADS (Aggregated
@@ -99,15 +103,16 @@ type adsStreamImpl struct {
        // The following fields are initialized in the constructor and are not
        // written to afterwards, and hence can be accessed without a mutex.
        streamCh     chan clients.Stream // New ADS streams are pushed here.
-       requestCh    *buffer.Unbounded   // Subscriptions and unsubscriptions are pushed here.
        runnerDoneCh chan struct{}       // Notify completion of runner goroutine.
        cancel       context.CancelFunc  // To cancel the context passed to the runner goroutine.
        fc           *adsFlowControl     // Flow control for ADS stream.
+       notifySender chan struct{}       // To notify the sending goroutine of a pending request.
 
        // Guards access to the below fields (and to the contents of the map).
        mu                sync.Mutex
        resourceTypeState map[ResourceType]*resourceTypeState // Map of resource types to their state.
        firstRequest      bool                                // False after the first request is sent out.
+       pendingRequests   []request                           // Subscriptions and unsubscriptions are pushed here.
 }
 
 // adsStreamOpts contains the options for creating a new ADS Stream.
@@ -132,9 +137,9 @@ func newADSStreamImpl(opts adsStreamOpts) *adsStreamImpl {
                watchExpiryTimeout: opts.watchExpiryTimeout,
 
                streamCh:          make(chan clients.Stream, 1),
-               requestCh:         buffer.NewUnbounded(),
                runnerDoneCh:      make(chan struct{}),
                fc:                newADSFlowControl(),
+               notifySender:      make(chan struct{}, 1),
                resourceTypeState: make(map[ResourceType]*resourceTypeState),
        }
 
@@ -151,76 +156,79 @@ func newADSStreamImpl(opts adsStreamOpts) *adsStreamImpl {
 func (s *adsStreamImpl) Stop() {
        s.cancel()
        s.fc.stop()
-       s.requestCh.Close()
        <-s.runnerDoneCh
        s.logger.Infof("Shutdown ADS stream")
 }
 
 // subscribe subscribes to the given resource. It is assumed that multiple
 // subscriptions for the same resource is deduped at the caller. A discovery
-// request is sent out on the underlying stream for the resource type when there
-// is sufficient flow control quota.
+// request is sent out on the underlying stream, for the resource type with the
+// newly subscribed resource.
 func (s *adsStreamImpl) subscribe(typ ResourceType, name string) {
        if s.logger.V(2) {
                s.logger.Infof("Subscribing to resource %q of type %q", name, typ.TypeName)
        }
 
        s.mu.Lock()
-       defer s.mu.Unlock()
-
        state, ok := s.resourceTypeState[typ]
        if !ok {
                // An entry in the type state map is created as part of the first
                // subscription request for this type.
-               state = &resourceTypeState{
-                       subscribedResources: make(map[string]*xdsresource.ResourceWatchState),
-                       bufferedRequests:    make(chan struct{}, 1),
-               }
+               state = &resourceTypeState{subscribedResources: make(map[string]*xdsresource.ResourceWatchState)}
                s.resourceTypeState[typ] = state
        }
 
        // Create state for the newly subscribed resource. The watch timer will
        // be started when a request for this resource is actually sent out.
        state.subscribedResources[name] = &xdsresource.ResourceWatchState{State: xdsresource.ResourceWatchStateStarted}
-       state.pendingWrite = true
 
        // Send a request for the resource type with updated subscriptions.
-       s.requestCh.Put(typ)
+       s.pendingRequests = append(s.pendingRequests, request{typ: typ, resourceNames: resourceNames(state.subscribedResources)})
+       s.mu.Unlock()
+
+       select {
+       case s.notifySender <- struct{}{}:
+       default:
+       }
 }
 
-// Unsubscribe cancels the subscription to the given resource. It is a no-op if
+// unsubscribe cancels the subscription to the given resource. It is a no-op if
 // the given resource does not exist. The watch expiry timer associated with the
 // resource is stopped if one is active. A discovery request is sent out on the
-// stream for the resource type when there is sufficient flow control quota.
-func (s *adsStreamImpl) Unsubscribe(typ ResourceType, name string) {
+// stream for the resource type with the updated set of resource names.
+func (s *adsStreamImpl) unsubscribe(typ ResourceType, name string) {
        if s.logger.V(2) {
                s.logger.Infof("Unsubscribing to resource %q of type %q", name, typ.TypeName)
        }
 
        s.mu.Lock()
-       defer s.mu.Unlock()
-
        state, ok := s.resourceTypeState[typ]
        if !ok {
+               s.mu.Unlock()
                return
        }
-
        rs, ok := state.subscribedResources[name]
        if !ok {
+               s.mu.Unlock()
                return
        }
        if rs.ExpiryTimer != nil {
                rs.ExpiryTimer.Stop()
        }
        delete(state.subscribedResources, name)
-       state.pendingWrite = true
 
        // Send a request for the resource type with updated subscriptions.
-       s.requestCh.Put(typ)
+       s.pendingRequests = append(s.pendingRequests, request{typ: typ, resourceNames: resourceNames(state.subscribedResources)})
+       s.mu.Unlock()
+
+       select {
+       case s.notifySender <- struct{}{}:
+       default:
+       }
 }
 
 // runner is a long-running goroutine that handles the lifecycle of the ADS
-// stream. It spwans another goroutine to handle writes of discovery request
+// stream. It spawns another goroutine to handle writes of discovery request
 // messages on the stream. Whenever an existing stream fails, it performs
 // exponential backoff (if no messages were received on that stream) before
 // creating a new stream.
@@ -280,53 +288,44 @@ func (s *adsStreamImpl) send(ctx context.Context) {
                                stream = nil
                                continue
                        }
-               case req, ok := <-s.requestCh.Get():
-                       if !ok {
-                               return
+               case <-s.notifySender:
+                       // If there's no stream yet, skip the request. This request will be resent
+                       // when a new stream is created. If no stream is created, the watcher will
+                       // timeout (same as server not sending response back).
+                       if stream == nil {
+                               continue
                        }
-                       s.requestCh.Load()
 
-                       typ := req.(ResourceType)
-                       if err := s.sendNew(stream, typ); err != nil {
+                       // Resetting the pendingRequests slice to nil works for both cases:
+                       // - When we successfully sends the requests out on the wire.
+                       // - When sending fails. This can happen only when the stream fails,
+                       //   and in this case, we rely on the `sendExisting` to send out
+                       //   requests for all subscriptions when the stream is recreated.
+                       s.mu.Lock()
+                       if err := s.sendNewLocked(stream, s.pendingRequests); err != nil {
                                stream = nil
-                               continue
                        }
+                       s.pendingRequests = nil
+                       s.mu.Unlock()
                }
        }
 }
 
-// sendNew attempts to send a discovery request based on a new subscription or
-// unsubscription. If there is no flow control quota, the request is buffered
-// and will be sent later. This method also starts the watch expiry timer for
-// resources that were sent in the request for the first time, i.e. their watch
-// state is `watchStateStarted`.
-func (s *adsStreamImpl) sendNew(stream clients.Stream, typ ResourceType) error {
-       s.mu.Lock()
-       defer s.mu.Unlock()
-
-       // If there's no stream yet, skip the request. This request will be resent
-       // when a new stream is created. If no stream is created, the watcher will
-       // timeout (same as server not sending response back).
-       if stream == nil {
-               return nil
-       }
-
-       // If local processing of the most recently received response is not yet
-       // complete, i.e. fc.pending == true, queue this write and return early.
-       // This allows us to batch writes for requests which are generated as part
-       // of local processing of a received response.
-       state := s.resourceTypeState[typ]
-       bufferRequest := func() {
-               select {
-               case state.bufferedRequests <- struct{}{}:
-               default:
+// sendNewLocked attempts to send a discovery request based on a new subscription or
+// unsubscription. This method also starts the watch expiry timer for resources
+// that were sent in the request for the first time, i.e. their watch state is
+// `watchStateStarted`.
+//
+// Caller needs to hold c.mu.
+func (s *adsStreamImpl) sendNewLocked(stream clients.Stream, requests []request) error {
+       for _, req := range requests {
+               state := s.resourceTypeState[req.typ]
+               if err := s.sendMessageLocked(stream, req.resourceNames, req.typ.TypeURL, state.version, state.nonce, nil); err != nil {
+                       return err
                }
+               s.startWatchTimersLocked(req.typ, req.resourceNames)
        }
-       if s.fc.runIfPending(bufferRequest) {
-               return nil
-       }
-
-       return s.sendMessageIfWritePendingLocked(stream, typ, state)
+       return nil
 }
 
 // sendExisting sends out discovery requests for existing resources when
@@ -337,6 +336,10 @@ func (s *adsStreamImpl) sendExisting(stream clients.Stream) error {
        s.mu.Lock()
        defer s.mu.Unlock()
 
+       // Clear any queued requests. Previously subscribed to resources will be
+       // resent below.
+       s.pendingRequests = nil
+
        for typ, state := range s.resourceTypeState {
                // Reset only the nonces map when the stream restarts.
                //
@@ -355,69 +358,15 @@ func (s *adsStreamImpl) sendExisting(stream clients.Stream) error {
                        continue
                }
 
-               state.pendingWrite = true
-               if err := s.sendMessageIfWritePendingLocked(stream, typ, state); err != nil {
+               names := resourceNames(state.subscribedResources)
+               if err := s.sendMessageLocked(stream, names, typ.TypeURL, state.version, state.nonce, nil); err != nil {
                        return err
                }
+               s.startWatchTimersLocked(typ, names)
        }
        return nil
 }
 
-// sendBuffered sends out discovery requests for resources that were buffered
-// when they were subscribed to, because local processing of the previously
-// received response was not yet complete.
-//
-// The stream argument is guaranteed to be non-nil.
-func (s *adsStreamImpl) sendBuffered(stream clients.Stream) error {
-       s.mu.Lock()
-       defer s.mu.Unlock()
-
-       for typ, state := range s.resourceTypeState {
-               select {
-               case <-state.bufferedRequests:
-                       if err := s.sendMessageIfWritePendingLocked(stream, typ, state); err != nil {
-                               return err
-                       }
-               default:
-                       // No buffered request.
-                       continue
-               }
-       }
-       return nil
-}
-
-// sendMessageIfWritePendingLocked attempts to sends a discovery request to the
-// server, if there is a pending write for the given resource type.
-//
-// If the request is successfully sent, the pending write field is cleared and
-// watch timers are started for the resources in the request.
-//
-// Caller needs to hold c.mu.
-func (s *adsStreamImpl) sendMessageIfWritePendingLocked(stream clients.Stream, typ ResourceType, state *resourceTypeState) error {
-       if !state.pendingWrite {
-               if s.logger.V(2) {
-                       s.logger.Infof("Skipping sending request for type %q, because all subscribed resources were already sent", typ.TypeURL)
-               }
-               return nil
-       }
-
-       names := resourceNames(state.subscribedResources)
-       if err := s.sendMessageLocked(stream, names, typ.TypeURL, state.version, state.nonce, nil); err != nil {
-               return err
-       }
-       state.pendingWrite = false
-
-       // Drain the buffered requests channel because we just sent a request for this
-       // resource type.
-       select {
-       case <-state.bufferedRequests:
-       default:
-       }
-
-       s.startWatchTimersLocked(typ, names)
-       return nil
-}
-
 // sendMessageLocked sends a discovery request to the server, populating the
 // different fields of the message with the given parameters. Returns a non-nil
 // error if the request could not be sent.
@@ -467,11 +416,9 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string,
 // recv is responsible for receiving messages from the ADS stream.
 //
 // It performs the following actions:
-//   - Waits for local flow control to be available before sending buffered
-//     requests, if any.
-//   - Receives a message from the ADS stream. If an error is encountered here,
-//     it is handled by the onError method which propagates the error to all
-//     watchers.
+//   - Waits for local flow control to be available before it receives a message
+//     from the ADS stream. If an error is encountered here, it is handled by
+//     the onError method which propagates the error to all watchers.
 //   - Invokes the event handler's OnADSResponse method to process the message.
 //   - Sends an ACK or NACK to the server based on the response.
 //
@@ -488,10 +435,6 @@ func (s *adsStreamImpl) recv(stream clients.Stream) bool {
                        return msgReceived
                }
 
-               // Send out a request if anything was buffered while we were waiting for
-               // local processing of the previous response to complete.
-               s.sendBuffered(stream)
-
                resources, url, version, nonce, err := s.recvMessage(stream)
                if err != nil {
                        s.onError(err, msgReceived)
@@ -760,23 +703,6 @@ func (fc *adsFlowControl) setPending(pending bool) {
        }
 }
 
-func (fc *adsFlowControl) runIfPending(f func()) bool {
-       fc.mu.Lock()
-       defer fc.mu.Unlock()
-
-       if fc.stopped {
-               return false
-       }
-
-       // If there's a pending update, run the function while still holding the
-       // lock. This ensures that the pending state does not change between the
-       // check and the function call.
-       if fc.pending {
-               f()
-       }
-       return fc.pending
-}
-
 // wait blocks until all the watchers have consumed the most recent update.
 // Returns true if the flow control was stopped while waiting, false otherwise.
 func (fc *adsFlowControl) wait() bool {
index 7c40f1dab8f84138d80009d2c05387d4c95139af..9da5eb361865112daa44d28da8d4aaaa5aa5aee9 100644 (file)
@@ -163,7 +163,7 @@ func (xc *xdsChannel) unsubscribe(typ ResourceType, name string) {
                }
                return
        }
-       xc.ads.Unsubscribe(typ, name)
+       xc.ads.unsubscribe(typ, name)
 }
 
 // The following onADSXxx() methods implement the StreamEventHandler interface