igrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/clients/internal/backoff"
- "google.golang.org/grpc/internal/xds/clients/internal/buffer"
"google.golang.org/grpc/internal/xds/clients/internal/pretty"
"google.golang.org/grpc/internal/xds/clients/xdsclient/internal/xdsresource"
perRPCVerbosityLevel = 9
)
+// request represents a queued request message to be sent on the ADS stream. It
+// contains the type of the resource and the list of resource names to be sent.
+type request struct {
+ typ ResourceType
+ resourceNames []string
+}
+
// response represents a response received on the ADS stream. It contains the
// type URL, version, and resources for the response.
type response struct {
type resourceTypeState struct {
version string // Last acked version. Should not be reset when the stream breaks.
nonce string // Last received nonce. Should be reset when the stream breaks.
- bufferedRequests chan struct{} // Channel to buffer requests when writing is blocked.
subscribedResources map[string]*xdsresource.ResourceWatchState // Map of subscribed resource names to their state.
- pendingWrite bool // True if there is a pending write for this resource type.
}
// adsStreamImpl provides the functionality associated with an ADS (Aggregated
// The following fields are initialized in the constructor and are not
// written to afterwards, and hence can be accessed without a mutex.
streamCh chan clients.Stream // New ADS streams are pushed here.
- requestCh *buffer.Unbounded // Subscriptions and unsubscriptions are pushed here.
runnerDoneCh chan struct{} // Notify completion of runner goroutine.
cancel context.CancelFunc // To cancel the context passed to the runner goroutine.
fc *adsFlowControl // Flow control for ADS stream.
+ notifySender chan struct{} // To notify the sending goroutine of a pending request.
// Guards access to the below fields (and to the contents of the map).
mu sync.Mutex
resourceTypeState map[ResourceType]*resourceTypeState // Map of resource types to their state.
firstRequest bool // False after the first request is sent out.
+ pendingRequests []request // Subscriptions and unsubscriptions are pushed here.
}
// adsStreamOpts contains the options for creating a new ADS Stream.
watchExpiryTimeout: opts.watchExpiryTimeout,
streamCh: make(chan clients.Stream, 1),
- requestCh: buffer.NewUnbounded(),
runnerDoneCh: make(chan struct{}),
fc: newADSFlowControl(),
+ notifySender: make(chan struct{}, 1),
resourceTypeState: make(map[ResourceType]*resourceTypeState),
}
func (s *adsStreamImpl) Stop() {
s.cancel()
s.fc.stop()
- s.requestCh.Close()
<-s.runnerDoneCh
s.logger.Infof("Shutdown ADS stream")
}
// subscribe subscribes to the given resource. It is assumed that multiple
// subscriptions for the same resource is deduped at the caller. A discovery
-// request is sent out on the underlying stream for the resource type when there
-// is sufficient flow control quota.
+// request is sent out on the underlying stream, for the resource type with the
+// newly subscribed resource.
func (s *adsStreamImpl) subscribe(typ ResourceType, name string) {
if s.logger.V(2) {
s.logger.Infof("Subscribing to resource %q of type %q", name, typ.TypeName)
}
s.mu.Lock()
- defer s.mu.Unlock()
-
state, ok := s.resourceTypeState[typ]
if !ok {
// An entry in the type state map is created as part of the first
// subscription request for this type.
- state = &resourceTypeState{
- subscribedResources: make(map[string]*xdsresource.ResourceWatchState),
- bufferedRequests: make(chan struct{}, 1),
- }
+ state = &resourceTypeState{subscribedResources: make(map[string]*xdsresource.ResourceWatchState)}
s.resourceTypeState[typ] = state
}
// Create state for the newly subscribed resource. The watch timer will
// be started when a request for this resource is actually sent out.
state.subscribedResources[name] = &xdsresource.ResourceWatchState{State: xdsresource.ResourceWatchStateStarted}
- state.pendingWrite = true
// Send a request for the resource type with updated subscriptions.
- s.requestCh.Put(typ)
+ s.pendingRequests = append(s.pendingRequests, request{typ: typ, resourceNames: resourceNames(state.subscribedResources)})
+ s.mu.Unlock()
+
+ select {
+ case s.notifySender <- struct{}{}:
+ default:
+ }
}
-// Unsubscribe cancels the subscription to the given resource. It is a no-op if
+// unsubscribe cancels the subscription to the given resource. It is a no-op if
// the given resource does not exist. The watch expiry timer associated with the
// resource is stopped if one is active. A discovery request is sent out on the
-// stream for the resource type when there is sufficient flow control quota.
-func (s *adsStreamImpl) Unsubscribe(typ ResourceType, name string) {
+// stream for the resource type with the updated set of resource names.
+func (s *adsStreamImpl) unsubscribe(typ ResourceType, name string) {
if s.logger.V(2) {
s.logger.Infof("Unsubscribing to resource %q of type %q", name, typ.TypeName)
}
s.mu.Lock()
- defer s.mu.Unlock()
-
state, ok := s.resourceTypeState[typ]
if !ok {
+ s.mu.Unlock()
return
}
-
rs, ok := state.subscribedResources[name]
if !ok {
+ s.mu.Unlock()
return
}
if rs.ExpiryTimer != nil {
rs.ExpiryTimer.Stop()
}
delete(state.subscribedResources, name)
- state.pendingWrite = true
// Send a request for the resource type with updated subscriptions.
- s.requestCh.Put(typ)
+ s.pendingRequests = append(s.pendingRequests, request{typ: typ, resourceNames: resourceNames(state.subscribedResources)})
+ s.mu.Unlock()
+
+ select {
+ case s.notifySender <- struct{}{}:
+ default:
+ }
}
// runner is a long-running goroutine that handles the lifecycle of the ADS
-// stream. It spwans another goroutine to handle writes of discovery request
+// stream. It spawns another goroutine to handle writes of discovery request
// messages on the stream. Whenever an existing stream fails, it performs
// exponential backoff (if no messages were received on that stream) before
// creating a new stream.
stream = nil
continue
}
- case req, ok := <-s.requestCh.Get():
- if !ok {
- return
+ case <-s.notifySender:
+ // If there's no stream yet, skip the request. This request will be resent
+ // when a new stream is created. If no stream is created, the watcher will
+ // time out (same as the server not sending a response back).
+ if stream == nil {
+ continue
}
- s.requestCh.Load()
- typ := req.(ResourceType)
- if err := s.sendNew(stream, typ); err != nil {
+ // Resetting the pendingRequests slice to nil works for both cases:
+ // - When we successfully send the requests out on the wire.
+ // - When sending fails. This can happen only when the stream fails,
+ // and in this case, we rely on `sendExisting` to send out
+ // requests for all subscriptions when the stream is recreated.
+ s.mu.Lock()
+ if err := s.sendNewLocked(stream, s.pendingRequests); err != nil {
stream = nil
- continue
}
+ s.pendingRequests = nil
+ s.mu.Unlock()
}
}
}
-// sendNew attempts to send a discovery request based on a new subscription or
-// unsubscription. If there is no flow control quota, the request is buffered
-// and will be sent later. This method also starts the watch expiry timer for
-// resources that were sent in the request for the first time, i.e. their watch
-// state is `watchStateStarted`.
-func (s *adsStreamImpl) sendNew(stream clients.Stream, typ ResourceType) error {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- // If there's no stream yet, skip the request. This request will be resent
- // when a new stream is created. If no stream is created, the watcher will
- // timeout (same as server not sending response back).
- if stream == nil {
- return nil
- }
-
- // If local processing of the most recently received response is not yet
- // complete, i.e. fc.pending == true, queue this write and return early.
- // This allows us to batch writes for requests which are generated as part
- // of local processing of a received response.
- state := s.resourceTypeState[typ]
- bufferRequest := func() {
- select {
- case state.bufferedRequests <- struct{}{}:
- default:
+// sendNewLocked sends one discovery request for each queued subscription or
+// unsubscription, in order, stopping at the first send error. It also starts
+// the watch expiry timer for resources that were sent in a request for the
+// first time, i.e. their watch state is `watchStateStarted`.
+//
+// Caller needs to hold s.mu.
+func (s *adsStreamImpl) sendNewLocked(stream clients.Stream, requests []request) error {
+ for _, req := range requests {
+ state := s.resourceTypeState[req.typ]
+ if err := s.sendMessageLocked(stream, req.resourceNames, req.typ.TypeURL, state.version, state.nonce, nil); err != nil {
+ return err
}
+ s.startWatchTimersLocked(req.typ, req.resourceNames)
}
- if s.fc.runIfPending(bufferRequest) {
- return nil
- }
-
- return s.sendMessageIfWritePendingLocked(stream, typ, state)
+ return nil
}
// sendExisting sends out discovery requests for existing resources when
s.mu.Lock()
defer s.mu.Unlock()
+ // Clear any queued requests. Previously subscribed to resources will be
+ // resent below.
+ s.pendingRequests = nil
+
for typ, state := range s.resourceTypeState {
// Reset only the nonces map when the stream restarts.
//
continue
}
- state.pendingWrite = true
- if err := s.sendMessageIfWritePendingLocked(stream, typ, state); err != nil {
+ names := resourceNames(state.subscribedResources)
+ if err := s.sendMessageLocked(stream, names, typ.TypeURL, state.version, state.nonce, nil); err != nil {
return err
}
+ s.startWatchTimersLocked(typ, names)
}
return nil
}
-// sendBuffered sends out discovery requests for resources that were buffered
-// when they were subscribed to, because local processing of the previously
-// received response was not yet complete.
-//
-// The stream argument is guaranteed to be non-nil.
-func (s *adsStreamImpl) sendBuffered(stream clients.Stream) error {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- for typ, state := range s.resourceTypeState {
- select {
- case <-state.bufferedRequests:
- if err := s.sendMessageIfWritePendingLocked(stream, typ, state); err != nil {
- return err
- }
- default:
- // No buffered request.
- continue
- }
- }
- return nil
-}
-
-// sendMessageIfWritePendingLocked attempts to sends a discovery request to the
-// server, if there is a pending write for the given resource type.
-//
-// If the request is successfully sent, the pending write field is cleared and
-// watch timers are started for the resources in the request.
-//
-// Caller needs to hold c.mu.
-func (s *adsStreamImpl) sendMessageIfWritePendingLocked(stream clients.Stream, typ ResourceType, state *resourceTypeState) error {
- if !state.pendingWrite {
- if s.logger.V(2) {
- s.logger.Infof("Skipping sending request for type %q, because all subscribed resources were already sent", typ.TypeURL)
- }
- return nil
- }
-
- names := resourceNames(state.subscribedResources)
- if err := s.sendMessageLocked(stream, names, typ.TypeURL, state.version, state.nonce, nil); err != nil {
- return err
- }
- state.pendingWrite = false
-
- // Drain the buffered requests channel because we just sent a request for this
- // resource type.
- select {
- case <-state.bufferedRequests:
- default:
- }
-
- s.startWatchTimersLocked(typ, names)
- return nil
-}
-
// sendMessageLocked sends a discovery request to the server, populating the
// different fields of the message with the given parameters. Returns a non-nil
// error if the request could not be sent.
// recv is responsible for receiving messages from the ADS stream.
//
// It performs the following actions:
-// - Waits for local flow control to be available before sending buffered
-// requests, if any.
-// - Receives a message from the ADS stream. If an error is encountered here,
-// it is handled by the onError method which propagates the error to all
-// watchers.
+// - Waits for local flow control to be available before it receives a message
+// from the ADS stream. If an error is encountered here, it is handled by
+// the onError method which propagates the error to all watchers.
// - Invokes the event handler's OnADSResponse method to process the message.
// - Sends an ACK or NACK to the server based on the response.
//
return msgReceived
}
- // Send out a request if anything was buffered while we were waiting for
- // local processing of the previous response to complete.
- s.sendBuffered(stream)
-
resources, url, version, nonce, err := s.recvMessage(stream)
if err != nil {
s.onError(err, msgReceived)
}
}
-func (fc *adsFlowControl) runIfPending(f func()) bool {
- fc.mu.Lock()
- defer fc.mu.Unlock()
-
- if fc.stopped {
- return false
- }
-
- // If there's a pending update, run the function while still holding the
- // lock. This ensures that the pending state does not change between the
- // check and the function call.
- if fc.pending {
- f()
- }
- return fc.pending
-}
-
// wait blocks until all the watchers have consumed the most recent update.
// Returns true if the flow control was stopped while waiting, false otherwise.
func (fc *adsFlowControl) wait() bool {