]> git.feebdaed.xyz Git - 0xmirror/grpc-go.git/commitdiff
client: Change connectivity state to CONNECTING when creating the name resolver ...
authorEaswar Swaminathan <easwars@google.com>
Tue, 9 Dec 2025 00:02:52 +0000 (16:02 -0800)
committerGitHub <noreply@github.com>
Tue, 9 Dec 2025 00:02:52 +0000 (16:02 -0800)
Fixes https://github.com/grpc/grpc-go/issues/7686

#### Current Behavior
- When client exits IDLE and creates the name resolver, it stays in IDLE
until the connectivity state is set by the LB policy.
- When exiting IDLE mode (because of `Connect` being called or because
of an RPC), if name resolver creation fails, we stay in IDLE.

#### New Behavior
- When the client exits IDLE and creates the name resolver, it moves to
CONNECTING. Moving forward, the connectivity state will be set by the LB
policy.
- When exiting IDLE mode (because of `Connect` being called or because
of an RPC), we have already moved to CONNECTING (because of the previous
bullet point). If name resolver creation fails, we will move to
TRANSIENT_FAILURE and start the idle timer and move back to IDLE when
the timer fires

#### Implementation details:
- The client channel now treats resolver build errors encountered during
exiting IDLE identically to resolver errors received prior to valid
updates.
- `Build` uses a new unsafe API on the idleness manager to mark the
channel as exited IDLE.
- The idleness Manager invokes the channel's `ExitIdleMode` (which now
does not return an error) and updates internal state to reflect that it
is no longer in IDLE.
- `OnFinish` call options are now invoked even if stream creation fails
during an RPC. This fulfills the guarantee for these options and ensures
the idleness Manager’s `activeCallsCount` remains accurate.

RELEASE NOTES:
- client: Change connectivity state to CONNECTING when creating the name
resolver (as part of exiting IDLE).
- client: Change connectivity state to TRANSIENT_FAILURE if name
resolver creation fails (as part of exiting IDLE).
- client: Change connectivity state to IDLE after idle timeout expires
even when current state is TRANSIENT_FAILURE.
- client: Fix a bug that resulted in `OnFinish` call option not being
invoked for RPCs where stream creation failed.

clientconn.go
clientconn_parsed_target_test.go
dial_test.go
internal/idle/idle.go
internal/idle/idle_e2e_test.go
internal/idle/idle_test.go
resolver_balancer_ext_test.go
resolver_wrapper.go
stream.go
test/clientconn_state_transition_test.go

index c0c2c9a76abfae9c42ffce1c724d8395392eb514..09c9f1b4a9339e08b0789945dbfc87edc4765ed1 100644 (file)
@@ -262,9 +262,10 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
        }()
 
        // This creates the name resolver, load balancer, etc.
-       if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
-               return nil, err
+       if err := cc.exitIdleMode(); err != nil {
+               return nil, fmt.Errorf("failed to exit idle mode: %w", err)
        }
+       cc.idlenessMgr.UnsafeSetNotIdle()
 
        // Return now for non-blocking dials.
        if !cc.dopts.block {
@@ -332,7 +333,7 @@ func (cc *ClientConn) addTraceEvent(msg string) {
                        Severity: channelz.CtInfo,
                }
        }
-       channelz.AddTraceEvent(logger, cc.channelz, 0, ted)
+       channelz.AddTraceEvent(logger, cc.channelz, 1, ted)
 }
 
 type idler ClientConn
@@ -341,14 +342,17 @@ func (i *idler) EnterIdleMode() {
        (*ClientConn)(i).enterIdleMode()
 }
 
-func (i *idler) ExitIdleMode() error {
-       return (*ClientConn)(i).exitIdleMode()
+func (i *idler) ExitIdleMode() {
+       // Ignore the error returned from this method, because from the perspective
+       // of the caller (idleness manager), the channel would have always moved out
+       // of IDLE by the time this method returns.
+       (*ClientConn)(i).exitIdleMode()
 }
 
 // exitIdleMode moves the channel out of idle mode by recreating the name
 // resolver and load balancer.  This should never be called directly; use
 // cc.idlenessMgr.ExitIdleMode instead.
-func (cc *ClientConn) exitIdleMode() (err error) {
+func (cc *ClientConn) exitIdleMode() error {
        cc.mu.Lock()
        if cc.conns == nil {
                cc.mu.Unlock()
@@ -356,11 +360,23 @@ func (cc *ClientConn) exitIdleMode() (err error) {
        }
        cc.mu.Unlock()
 
+       // Set state to CONNECTING before building the name resolver
+       // so the channel does not remain in IDLE.
+       cc.csMgr.updateState(connectivity.Connecting)
+
        // This needs to be called without cc.mu because this builds a new resolver
        // which might update state or report error inline, which would then need to
        // acquire cc.mu.
        if err := cc.resolverWrapper.start(); err != nil {
-               return err
+               // If resolver creation fails, treat it like an error reported by the
+               // resolver before any valid udpates. Set channel's state to
+               // TransientFailure, and set an erroring picker with the resolver build
+               // error, which will returned as part of any subsequent RPCs.
+               logger.Warningf("Failed to start resolver: %v", err)
+               cc.csMgr.updateState(connectivity.TransientFailure)
+               cc.mu.Lock()
+               cc.updateResolverStateAndUnlock(resolver.State{}, err)
+               return fmt.Errorf("failed to start resolver: %w", err)
        }
 
        cc.addTraceEvent("exiting idle mode")
@@ -681,10 +697,8 @@ func (cc *ClientConn) GetState() connectivity.State {
 // Notice: This API is EXPERIMENTAL and may be changed or removed in a later
 // release.
 func (cc *ClientConn) Connect() {
-       if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
-               cc.addTraceEvent(err.Error())
-               return
-       }
+       cc.idlenessMgr.ExitIdleMode()
+
        // If the ClientConn was not in idle mode, we need to call ExitIdle on the
        // LB policy so that connections can be created.
        cc.mu.Lock()
@@ -735,8 +749,8 @@ func init() {
        internal.EnterIdleModeForTesting = func(cc *ClientConn) {
                cc.idlenessMgr.EnterIdleModeForTesting()
        }
-       internal.ExitIdleModeForTesting = func(cc *ClientConn) error {
-               return cc.idlenessMgr.ExitIdleMode()
+       internal.ExitIdleModeForTesting = func(cc *ClientConn) {
+               cc.idlenessMgr.ExitIdleMode()
        }
 }
 
index 00e8e283f63ff017096b63a4cdfb03b8c7951b40..57ae3ad7020f87d2c7456163f676407f73480c76 100644 (file)
@@ -205,28 +205,31 @@ func (s) TestParsedTarget_Failure_WithoutCustomDialer(t *testing.T) {
 }
 
 func (s) TestParsedTarget_Failure_WithoutCustomDialer_WithNewClient(t *testing.T) {
-       targets := []string{
-               "",
-               "unix://a/b/c",
-               "unix://authority",
-               "unix-abstract://authority/a/b/c",
-               "unix-abstract://authority",
+       tests := []struct {
+               target        string
+               wantErrSubstr string
+       }{
+
+               {target: "", wantErrSubstr: "invalid target address"},
+               {target: "unix://a/b/c", wantErrSubstr: "invalid (non-empty) authority"},
+               {target: "unix://authority", wantErrSubstr: "invalid (non-empty) authority"},
+               {target: "unix-abstract://authority/a/b/c", wantErrSubstr: "invalid (non-empty) authority"},
+               {target: "unix-abstract://authority", wantErrSubstr: "invalid (non-empty) authority"},
        }
 
-       for _, target := range targets {
-               t.Run(target, func(t *testing.T) {
+       for _, test := range tests {
+               t.Run(test.target, func(t *testing.T) {
                        ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
                        defer cancel()
-                       cc, err := NewClient(target, WithTransportCredentials(insecure.NewCredentials()))
+                       cc, err := NewClient(test.target, WithTransportCredentials(insecure.NewCredentials()))
                        if err != nil {
-                               t.Fatalf("NewClient(%q) failed: %v", target, err)
+                               t.Fatalf("NewClient(%q) failed: %v", test, err)
                        }
                        defer cc.Close()
-                       const wantErrSubstr = "failed to exit idle mode"
                        if _, err := cc.NewStream(ctx, &StreamDesc{}, "/my.service.v1.MyService/UnaryCall"); err == nil {
-                               t.Fatalf("NewStream() succeeded with target = %q, cc.parsedTarget = %+v, expected to fail", target, cc.parsedTarget)
-                       } else if !strings.Contains(err.Error(), wantErrSubstr) {
-                               t.Fatalf("NewStream() with target = %q returned unexpected error: got %v, want substring %q", target, err, wantErrSubstr)
+                               t.Fatalf("NewStream() succeeded with target = %q, cc.parsedTarget = %+v, expected to fail", test, cc.parsedTarget)
+                       } else if !strings.Contains(err.Error(), test.wantErrSubstr) {
+                               t.Fatalf("NewStream() with target = %q returned unexpected error: got %v, want substring %q", test, err, test.wantErrSubstr)
                        }
                })
        }
index cb86f330d5cf42b42b9370ae38b285478cb61ad0..7b74aac79fa664e58b64247913f977578916ba67 100644 (file)
@@ -20,6 +20,7 @@ package grpc
 
 import (
        "context"
+       "fmt"
        "net"
        "strings"
        "testing"
@@ -312,3 +313,36 @@ func (s) TestResolverAddressesWithTypedNilAttribute(t *testing.T) {
 type stringerVal struct{ s string }
 
 func (s stringerVal) String() string { return s.s }
+
+const errResolverBuilderScheme = "test-resolver-build-failure"
+
+// errResolverBuilder is a resolver builder that returns an error from its Build
+// method.
+type errResolverBuilder struct {
+       err error
+}
+
+func (b *errResolverBuilder) Build(resolver.Target, resolver.ClientConn, resolver.BuildOptions) (resolver.Resolver, error) {
+       return nil, b.err
+}
+
+func (b *errResolverBuilder) Scheme() string {
+       return errResolverBuilderScheme
+}
+
+// Tests that Dial returns an error if the resolver builder returns an error
+// from its Build method.
+func (s) TestDial_ResolverBuilder_Error(t *testing.T) {
+       resolverErr := fmt.Errorf("resolver builder error")
+       dopts := []DialOption{
+               WithTransportCredentials(insecure.NewCredentials()),
+               WithResolvers(&errResolverBuilder{err: resolverErr}),
+       }
+       _, err := Dial(errResolverBuilderScheme+":///test.server", dopts...)
+       if err == nil {
+               t.Fatalf("Dial() succeeded when it should have failed")
+       }
+       if !strings.Contains(err.Error(), resolverErr.Error()) {
+               t.Fatalf("Dial() failed with error %v, want %v", err, resolverErr)
+       }
+}
index 2c13ee9dac753c996bd28d3f6ba7ff8313b450df..d3cd24f80bd75a573b0c6d641d3c157a243ecaaa 100644 (file)
@@ -21,7 +21,6 @@
 package idle
 
 import (
-       "fmt"
        "math"
        "sync"
        "sync/atomic"
@@ -33,15 +32,15 @@ var timeAfterFunc = func(d time.Duration, f func()) *time.Timer {
        return time.AfterFunc(d, f)
 }
 
-// Enforcer is the functionality provided by grpc.ClientConn to enter
-// and exit from idle mode.
-type Enforcer interface {
-       ExitIdleMode() error
+// ClientConn is the functionality provided by grpc.ClientConn to enter and exit
+// from idle mode.
+type ClientConn interface {
+       ExitIdleMode()
        EnterIdleMode()
 }
 
-// Manager implements idleness detection and calls the configured Enforcer to
-// enter/exit idle mode when appropriate.  Must be created by NewManager.
+// Manager implements idleness detection and calls the ClientConn to enter/exit
+// idle mode when appropriate. Must be created by NewManager.
 type Manager struct {
        // State accessed atomically.
        lastCallEndTime           int64 // Unix timestamp in nanos; time when the most recent RPC completed.
@@ -51,8 +50,8 @@ type Manager struct {
 
        // Can be accessed without atomics or mutex since these are set at creation
        // time and read-only after that.
-       enforcer Enforcer // Functionality provided by grpc.ClientConn.
-       timeout  time.Duration
+       cc      ClientConn // Functionality provided by grpc.ClientConn.
+       timeout time.Duration
 
        // idleMu is used to guarantee mutual exclusion in two scenarios:
        // - Opposing intentions:
@@ -72,9 +71,9 @@ type Manager struct {
 
 // NewManager creates a new idleness manager implementation for the
 // given idle timeout.  It begins in idle mode.
-func NewManager(enforcer Enforcer, timeout time.Duration) *Manager {
+func NewManager(cc ClientConn, timeout time.Duration) *Manager {
        return &Manager{
-               enforcer:         enforcer,
+               cc:               cc,
                timeout:          timeout,
                actuallyIdle:     true,
                activeCallsCount: -math.MaxInt32,
@@ -127,7 +126,7 @@ func (m *Manager) handleIdleTimeout() {
 
        // Now that we've checked that there has been no activity, attempt to enter
        // idle mode, which is very likely to succeed.
-       if m.tryEnterIdleMode() {
+       if m.tryEnterIdleMode(true) {
                // Successfully entered idle mode. No timer needed until we exit idle.
                return
        }
@@ -142,10 +141,13 @@ func (m *Manager) handleIdleTimeout() {
 // that, it performs a last minute check to ensure that no new RPC has come in,
 // making the channel active.
 //
+// checkActivity controls if a check for RPC activity, since the last time the
+// idle_timeout fired, is made.
+
 // Return value indicates whether or not the channel moved to idle mode.
 //
 // Holds idleMu which ensures mutual exclusion with exitIdleMode.
-func (m *Manager) tryEnterIdleMode() bool {
+func (m *Manager) tryEnterIdleMode(checkActivity bool) bool {
        // Setting the activeCallsCount to -math.MaxInt32 indicates to OnCallBegin()
        // that the channel is either in idle mode or is trying to get there.
        if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) {
@@ -166,7 +168,7 @@ func (m *Manager) tryEnterIdleMode() bool {
                atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
                return false
        }
-       if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
+       if checkActivity && atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
                // A very short RPC could have come in (and also finished) after we
                // checked for calls count and activity in handleIdleTimeout(), but
                // before the CAS operation. So, we need to check for activity again.
@@ -177,44 +179,37 @@ func (m *Manager) tryEnterIdleMode() bool {
        // No new RPCs have come in since we set the active calls count value to
        // -math.MaxInt32. And since we have the lock, it is safe to enter idle mode
        // unconditionally now.
-       m.enforcer.EnterIdleMode()
+       m.cc.EnterIdleMode()
        m.actuallyIdle = true
        return true
 }
 
 // EnterIdleModeForTesting instructs the channel to enter idle mode.
 func (m *Manager) EnterIdleModeForTesting() {
-       m.tryEnterIdleMode()
+       m.tryEnterIdleMode(false)
 }
 
 // OnCallBegin is invoked at the start of every RPC.
-func (m *Manager) OnCallBegin() error {
+func (m *Manager) OnCallBegin() {
        if m.isClosed() {
-               return nil
+               return
        }
 
        if atomic.AddInt32(&m.activeCallsCount, 1) > 0 {
                // Channel is not idle now. Set the activity bit and allow the call.
                atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
-               return nil
+               return
        }
 
        // Channel is either in idle mode or is in the process of moving to idle
        // mode. Attempt to exit idle mode to allow this RPC.
-       if err := m.ExitIdleMode(); err != nil {
-               // Undo the increment to calls count, and return an error causing the
-               // RPC to fail.
-               atomic.AddInt32(&m.activeCallsCount, -1)
-               return err
-       }
-
+       m.ExitIdleMode()
        atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
-       return nil
 }
 
-// ExitIdleMode instructs m to call the enforcer's ExitIdleMode and update m's
+// ExitIdleMode instructs m to call the ClientConn's ExitIdleMode and update its
 // internal state.
-func (m *Manager) ExitIdleMode() error {
+func (m *Manager) ExitIdleMode() {
        // Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
        m.idleMu.Lock()
        defer m.idleMu.Unlock()
@@ -231,12 +226,10 @@ func (m *Manager) ExitIdleMode() error {
                //   m.ExitIdleMode.
                //
                // In any case, there is nothing to do here.
-               return nil
+               return
        }
 
-       if err := m.enforcer.ExitIdleMode(); err != nil {
-               return fmt.Errorf("failed to exit idle mode: %w", err)
-       }
+       m.cc.ExitIdleMode()
 
        // Undo the idle entry process. This also respects any new RPC attempts.
        atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
@@ -244,7 +237,23 @@ func (m *Manager) ExitIdleMode() error {
 
        // Start a new timer to fire after the configured idle timeout.
        m.resetIdleTimerLocked(m.timeout)
-       return nil
+}
+
+// UnsafeSetNotIdle instructs the Manager to update its internal state to
+// reflect the reality that the channel is no longer in IDLE mode.
+//
+// N.B. This method is intended only for internal use by the gRPC client
+// when it exits IDLE mode **manually** from `Dial`. The callsite must ensure:
+//   - The channel was **actually in IDLE mode** immediately prior to the call.
+//   - There is **no concurrent activity** that could cause the channel to exit
+//     IDLE mode *naturally* at the same time.
+func (m *Manager) UnsafeSetNotIdle() {
+       m.idleMu.Lock()
+       defer m.idleMu.Unlock()
+
+       atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
+       m.actuallyIdle = false
+       m.resetIdleTimerLocked(m.timeout)
 }
 
 // OnCallEnd is invoked at the end of every RPC.
index 3b606a9dbedaa7b3b746342cecd4cc5bae3a8a62..27486c3987427c80e791521b373f78d2f74cd971 100644 (file)
@@ -23,7 +23,6 @@ import (
        "fmt"
        "io"
        "strings"
-       "sync"
        "testing"
        "time"
 
@@ -33,7 +32,6 @@ import (
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/connectivity"
        "google.golang.org/grpc/credentials/insecure"
-       "google.golang.org/grpc/internal"
        "google.golang.org/grpc/internal/balancer/stub"
        "google.golang.org/grpc/internal/channelz"
        "google.golang.org/grpc/internal/grpctest"
@@ -549,73 +547,3 @@ func (s) TestChannelIdleness_Connect(t *testing.T) {
        // Verify that the ClientConn moves back to READY.
        testutils.AwaitState(ctx, t, cc, connectivity.Ready)
 }
-
-// runFunc runs f repeatedly until the context expires.
-func runFunc(ctx context.Context, f func()) {
-       for {
-               select {
-               case <-ctx.Done():
-                       return
-               case <-time.After(10 * time.Millisecond):
-                       f()
-               }
-       }
-}
-
-// Tests the scenario where there are concurrent calls to exit and enter idle
-// mode on the ClientConn. Verifies that there is no race under this scenario.
-func (s) TestChannelIdleness_RaceBetweenEnterAndExitIdleMode(t *testing.T) {
-       // Start a test backend and set the bootstrap state of the resolver to
-       // include this address. This will ensure that when the resolver is
-       // restarted when exiting idle, it will push the same address to grpc again.
-       r := manual.NewBuilderWithScheme("whatever")
-       backend := stubserver.StartTestService(t, nil)
-       defer backend.Stop()
-       r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
-
-       // Create a ClientConn with a long idle_timeout. We will explicitly trigger
-       // entering and exiting IDLE mode from the test.
-       dopts := []grpc.DialOption{
-               grpc.WithTransportCredentials(insecure.NewCredentials()),
-               grpc.WithResolvers(r),
-               grpc.WithIdleTimeout(30 * time.Minute),
-               grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"pick_first":{}}]}`),
-       }
-       cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
-       if err != nil {
-               t.Fatalf("grpc.NewClient() failed: %v", err)
-       }
-       defer cc.Close()
-
-       enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
-       enterIdleFunc := func() { enterIdle(cc) }
-       exitIdle := internal.ExitIdleModeForTesting.(func(*grpc.ClientConn) error)
-       exitIdleFunc := func() {
-               if err := exitIdle(cc); err != nil {
-                       t.Errorf("Failed to exit idle mode: %v", err)
-               }
-       }
-       // Spawn goroutines that call methods on the ClientConn to enter and exit
-       // idle mode concurrently for one second.
-       ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
-       defer cancel()
-       var wg sync.WaitGroup
-       wg.Add(4)
-       go func() {
-               runFunc(ctx, enterIdleFunc)
-               wg.Done()
-       }()
-       go func() {
-               runFunc(ctx, enterIdleFunc)
-               wg.Done()
-       }()
-       go func() {
-               runFunc(ctx, exitIdleFunc)
-               wg.Done()
-       }()
-       go func() {
-               runFunc(ctx, exitIdleFunc)
-               wg.Done()
-       }()
-       wg.Wait()
-}
index c2645bb95c051c3d8b9141c50d8224f0e0774c78..acc95ba6a0bc8a5465ea480184b30c8a1cbe93b1 100644 (file)
@@ -48,10 +48,8 @@ type testEnforcer struct {
        enterIdleCh chan struct{}
 }
 
-func (ti *testEnforcer) ExitIdleMode() error {
+func (ti *testEnforcer) ExitIdleMode() {
        ti.exitIdleCh <- struct{}{}
-       return nil
-
 }
 
 func (ti *testEnforcer) EnterIdleMode() {
@@ -273,9 +271,7 @@ func (s) TestManager_Enabled_ExitIdleOnRPC(t *testing.T) {
        for i := 0; i < 100; i++ {
                // A call to OnCallBegin and OnCallEnd simulates an RPC.
                go func() {
-                       if err := mgr.OnCallBegin(); err != nil {
-                               t.Errorf("OnCallBegin() failed: %v", err)
-                       }
+                       mgr.OnCallBegin()
                        mgr.OnCallEnd()
                }()
        }
@@ -316,19 +312,20 @@ type racyEnforcer struct {
 
 // ExitIdleMode sets the internal state to stateExitedIdle. We should only ever
 // exit idle when we are currently in idle.
-func (ri *racyEnforcer) ExitIdleMode() error {
+func (ri *racyEnforcer) ExitIdleMode() {
        // Set only on the initial ExitIdleMode
        if ri.started == false {
                if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInitial), int32(stateInitial)) {
-                       return fmt.Errorf("idleness enforcer's first ExitIdleMode after EnterIdleMode")
+                       ri.t.Errorf("idleness enforcer's first ExitIdleMode after EnterIdleMode")
+                       return
                }
                ri.started = true
-               return nil
+               return
        }
        if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateEnteredIdle), int32(stateExitedIdle)) {
-               return fmt.Errorf("idleness enforcer asked to exit idle when it did not enter idle earlier")
+               ri.t.Errorf("idleness enforcer asked to exit idle when it did not enter idle earlier")
+               return
        }
-       return nil
 }
 
 // EnterIdleMode attempts to set the internal state to stateEnteredIdle. We should only ever enter idle before RPCs start.
@@ -370,9 +367,7 @@ func (s) TestManager_IdleTimeoutRacesWithOnCallBegin(t *testing.T) {
                                        // Wait for the configured idle timeout and simulate an RPC to
                                        // race with the idle timeout timer callback.
                                        <-time.After(defaultTestIdleTimeout / 50)
-                                       if err := mgr.OnCallBegin(); err != nil {
-                                               t.Errorf("OnCallBegin() failed: %v", err)
-                                       }
+                                       mgr.OnCallBegin()
                                        atomic.StoreInt32((*int32)(&idlenessState), int32(stateActiveRPCs))
                                        mgr.OnCallEnd()
                                }()
index 98fdbc6e1044edbc3e136f18fd790bf056b85ed8..7b6cf0f91fc8a62d6c2814622a9b12baa9147c68 100644 (file)
@@ -29,13 +29,19 @@ import (
 
        "google.golang.org/grpc"
        "google.golang.org/grpc/balancer"
+       "google.golang.org/grpc/codes"
        "google.golang.org/grpc/connectivity"
        "google.golang.org/grpc/credentials/insecure"
        "google.golang.org/grpc/internal"
        "google.golang.org/grpc/internal/balancer/stub"
        "google.golang.org/grpc/internal/channelz"
+       "google.golang.org/grpc/internal/testutils"
        "google.golang.org/grpc/resolver"
        "google.golang.org/grpc/resolver/manual"
+       "google.golang.org/grpc/status"
+
+       testgrpc "google.golang.org/grpc/interop/grpc_testing"
+       testpb "google.golang.org/grpc/interop/grpc_testing"
 )
 
 // TestResolverBalancerInteraction tests:
@@ -127,6 +133,40 @@ func (s) TestResolverBuildFailure(t *testing.T) {
        }
 }
 
+// Tests the case where the resolver reports an error to the channel before
+// reporting an update. Verifies that the channel eventually moves to
+// TransientFailure and subsequent RPCs returns the error reported by the
+// resolver to the user.
+func (s) TestResolverReportError(t *testing.T) {
+       const resolverErr = "test resolver error"
+       r := manual.NewBuilderWithScheme("whatever")
+       r.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) {
+               cc.ReportError(errors.New(resolverErr))
+       }
+
+       cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
+       if err != nil {
+               t.Fatalf("Error creating client: %v", err)
+       }
+       defer cc.Close()
+       cc.Connect()
+
+       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+       defer cancel()
+       testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
+
+       client := testgrpc.NewTestServiceClient(cc)
+       for range 5 {
+               _, err = client.EmptyCall(ctx, &testpb.Empty{})
+               if code := status.Code(err); code != codes.Unavailable {
+                       t.Fatalf("EmptyCall() = %v, want %v", err, codes.Unavailable)
+               }
+               if err == nil || !strings.Contains(err.Error(), resolverErr) {
+                       t.Fatalf("EmptyCall() = %q, want %q", err, resolverErr)
+               }
+       }
+}
+
 // TestEnterIdleDuringResolverUpdateState tests a scenario that used to deadlock
 // while calling UpdateState at the same time as the resolver being closed while
 // the channel enters idle mode.
index 80e16a327cd3ba91640dff1c0e8e771e4a512649..6e613764372b1e6f3ef6b80ec13aaffa9953cf01 100644 (file)
@@ -69,6 +69,7 @@ func (ccr *ccResolverWrapper) start() error {
        errCh := make(chan error)
        ccr.serializer.TrySchedule(func(ctx context.Context) {
                if ctx.Err() != nil {
+                       errCh <- ctx.Err()
                        return
                }
                opts := resolver.BuildOptions{
index ca87ff9776ef68314337327627ac13ee74567055..995c2821bc0c95d7cdf63fcbf68cb3964d87ac6f 100644 (file)
--- a/stream.go
+++ b/stream.go
@@ -179,13 +179,41 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
 
 var emptyMethodConfig = serviceconfig.MethodConfig{}
 
+// endOfClientStream performs cleanup actions required for both successful and
+// failed streams. This includes incrementing channelz stats and invoking all
+// registered OnFinish call options.
+func endOfClientStream(cc *ClientConn, err error, opts ...CallOption) {
+       if channelz.IsOn() {
+               if err != nil {
+                       cc.incrCallsFailed()
+               } else {
+                       cc.incrCallsSucceeded()
+               }
+       }
+
+       for _, o := range opts {
+               if o, ok := o.(OnFinishCallOption); ok {
+                       o.OnFinish(err)
+               }
+       }
+}
+
 func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
+       if channelz.IsOn() {
+               cc.incrCallsStarted()
+       }
+       defer func() {
+               if err != nil {
+                       // Ensure cleanup when stream creation fails.
+                       endOfClientStream(cc, err, opts...)
+               }
+       }()
+
        // Start tracking the RPC for idleness purposes. This is where a stream is
        // created for both streaming and unary RPCs, and hence is a good place to
        // track active RPC count.
-       if err := cc.idlenessMgr.OnCallBegin(); err != nil {
-               return nil, err
-       }
+       cc.idlenessMgr.OnCallBegin()
+
        // Add a calloption, to decrement the active call count, that gets executed
        // when the RPC completes.
        opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)
@@ -204,14 +232,6 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
                        }
                }
        }
-       if channelz.IsOn() {
-               cc.incrCallsStarted()
-               defer func() {
-                       if err != nil {
-                               cc.incrCallsFailed()
-                       }
-               }()
-       }
        // Provide an opportunity for the first RPC to see the first service config
        // provided by the resolver.
        nameResolutionDelayed, err := cc.waitForResolvedAddrs(ctx)
@@ -1042,9 +1062,6 @@ func (cs *clientStream) finish(err error) {
                return
        }
        cs.finished = true
-       for _, onFinish := range cs.callInfo.onFinish {
-               onFinish(err)
-       }
        cs.commitAttemptLocked()
        if cs.attempt != nil {
                cs.attempt.finish(err)
@@ -1084,13 +1101,7 @@ func (cs *clientStream) finish(err error) {
        if err == nil {
                cs.retryThrottler.successfulRPC()
        }
-       if channelz.IsOn() {
-               if err != nil {
-                       cs.cc.incrCallsFailed()
-               } else {
-                       cs.cc.incrCallsSucceeded()
-               }
-       }
+       endOfClientStream(cs.cc, err, cs.opts...)
        cs.cancel()
 }
 
index de4e5b3d6894ce68e4cfd3dde9a49a566de2a4c4..3d365bd8b37861c9150711d70a37550fe481c83b 100644 (file)
@@ -23,6 +23,7 @@ import (
        "fmt"
        "io"
        "net"
+       "strings"
        "sync"
        "testing"
        "time"
@@ -31,6 +32,7 @@ import (
        "google.golang.org/grpc"
        "google.golang.org/grpc/backoff"
        "google.golang.org/grpc/balancer"
+       "google.golang.org/grpc/codes"
        "google.golang.org/grpc/connectivity"
        "google.golang.org/grpc/credentials/insecure"
        "google.golang.org/grpc/internal"
@@ -40,6 +42,7 @@ import (
        "google.golang.org/grpc/internal/testutils"
        "google.golang.org/grpc/resolver"
        "google.golang.org/grpc/resolver/manual"
+       "google.golang.org/grpc/status"
 
        testgrpc "google.golang.org/grpc/interop/grpc_testing"
        testpb "google.golang.org/grpc/interop/grpc_testing"
@@ -583,8 +586,6 @@ func (s) TestConnectivityStateSubscriber(t *testing.T) {
 // Test verifies that a channel starts off in IDLE and transitions to CONNECTING
 // when Connect() is called, and stays there when there are no resolver updates.
 func (s) TestStateTransitions_WithConnect_NoResolverUpdate(t *testing.T) {
-       t.Skip("The channel remains in IDLE until the LB policy updates the state to CONNECTING. This is a bug and the channel should transition to CONNECTING as soon as Connect() is called. See issue #7686.")
-
        backend := stubserver.StartTestService(t, nil)
        defer backend.Stop()
 
@@ -618,8 +619,6 @@ func (s) TestStateTransitions_WithConnect_NoResolverUpdate(t *testing.T) {
 // Test verifies that a channel starts off in IDLE and transitions to CONNECTING
 // when Connect() is called, and stays there when there are no resolver updates.
 func (s) TestStateTransitions_WithRPC_NoResolverUpdate(t *testing.T) {
-       t.Skip("The channel remains in IDLE until the LB policy updates the state to CONNECTING. This is a bug and the channel should transition to CONNECTING as soon as an RPC call is made. See issue #7686.")
-
        backend := stubserver.StartTestService(t, nil)
        defer backend.Stop()
 
@@ -641,8 +640,7 @@ func (s) TestStateTransitions_WithRPC_NoResolverUpdate(t *testing.T) {
 
        // Make an RPC call to transition the channel to CONNECTING.
        go func() {
-               _, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{})
-               if err == nil {
+               if _, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{}); err == nil {
                        t.Errorf("Expected RPC to fail, but it succeeded")
                }
        }()
@@ -656,3 +654,179 @@ func (s) TestStateTransitions_WithRPC_NoResolverUpdate(t *testing.T) {
        defer shortCancel()
        testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Connecting)
 }
+
+const testResolverBuildFailureScheme = "test-resolver-build-failure"
+
+// testResolverBuilder is a resolver builder that fails the first time its
+// Build method is called, and succeeds thereafter.
+type testResolverBuilder struct {
+       logger interface {
+               Logf(format string, args ...any)
+       }
+       buildCalled bool
+       manualR     *manual.Resolver
+}
+
+func (b *testResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
+       b.logger.Logf("testResolverBuilder: Build called with target: %v", target)
+       if !b.buildCalled {
+               b.buildCalled = true
+               b.logger.Logf("testResolverBuilder: returning build failure")
+               return nil, fmt.Errorf("simulated resolver build failure")
+       }
+       return b.manualR.Build(target, cc, opts)
+}
+
+func (b *testResolverBuilder) Scheme() string {
+       return testResolverBuildFailureScheme
+}
+
+// Tests for state transitions when the resolver initially fails to build.
+func (s) TestStateTransitions_ResolverBuildFailure(t *testing.T) {
+       tests := []struct {
+               name            string
+               exitIdleWithRPC bool
+       }{
+               {
+                       name:            "exitIdleByConnecting",
+                       exitIdleWithRPC: false,
+               },
+               {
+                       name:            "exitIdleByRPC",
+                       exitIdleWithRPC: true,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       mr := manual.NewBuilderWithScheme("whatever" + tt.name)
+                       backend := stubserver.StartTestService(t, nil)
+                       defer backend.Stop()
+                       mr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
+
+                       dopts := []grpc.DialOption{
+                               grpc.WithTransportCredentials(insecure.NewCredentials()),
+                               grpc.WithResolvers(&testResolverBuilder{logger: t, manualR: mr}),
+                       }
+
+                       cc, err := grpc.NewClient(testResolverBuildFailureScheme+":///", dopts...)
+                       if err != nil {
+                               t.Fatalf("Failed to create new client: %v", err)
+                       }
+                       defer cc.Close()
+
+                       // Ensure that the client is in IDLE before connecting.
+                       if state := cc.GetState(); state != connectivity.Idle {
+                               t.Fatalf("Expected initial state to be IDLE, got %v", state)
+                       }
+
+                       // Subscribe to state updates.
+                       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+                       defer cancel()
+                       stateCh := make(chan connectivity.State, 1)
+                       s := &funcConnectivityStateSubscriber{
+                               onMsg: func(s connectivity.State) {
+                                       select {
+                                       case stateCh <- s:
+                                       case <-ctx.Done():
+                                       }
+                               },
+                       }
+                       internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, s)
+
+                       if tt.exitIdleWithRPC {
+                               // The first attempt to kick the channel is expected to return
+                               // the resolver build error to the RPC.
+                               const wantErr = "simulated resolver build failure"
+                               for range 2 {
+                                       _, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{})
+                                       if code := status.Code(err); code != codes.Unavailable {
+                                               t.Fatalf("EmptyCall RPC failed with code %v, want %v", err, codes.Unavailable)
+                                       }
+                                       if err == nil || !strings.Contains(err.Error(), wantErr) {
+                                               t.Fatalf("EmptyCall RPC failed with error: %q, want %q", err, wantErr)
+                                       }
+                               }
+                       } else {
+                               cc.Connect()
+                       }
+
+                       wantStates := []connectivity.State{
+                               connectivity.Connecting,       // When channel exits IDLE for the first time.
+                               connectivity.TransientFailure, // Resolver build failure.
+                               connectivity.Idle,             // After idle timeout.
+                               connectivity.Connecting,       // When channel exits IDLE again.
+                               connectivity.Ready,            // Successful resolver build and connection to backend.
+                       }
+                       for _, wantState := range wantStates {
+                               waitForState(ctx, t, stateCh, wantState)
+                               switch wantState {
+                               case connectivity.TransientFailure:
+                                       internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))(cc)
+                               case connectivity.Idle:
+                                       if tt.exitIdleWithRPC {
+                                               if _, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{}); err != nil {
+                                                       t.Fatalf("EmptyCall RPC failed: %v", err)
+                                               }
+                                       } else {
+                                               cc.Connect()
+                                       }
+                               }
+                       }
+               })
+       }
+}
+
+// Tests for state transitions when the resolver reports no addresses.
+func (s) TestStateTransitions_WithRPC_ResolverUpdateContainsNoAddresses(t *testing.T) {
+       mr := manual.NewBuilderWithScheme("e2e-test")
+       mr.InitialState(resolver.State{})
+       defer mr.Close()
+
+       cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
+       if err != nil {
+               t.Fatalf("Failed to create new client: %v", err)
+       }
+       defer cc.Close()
+
+       if state := cc.GetState(); state != connectivity.Idle {
+               t.Fatalf("Expected initial state to be IDLE, got %v", state)
+       }
+
+       // Subscribe to state updates.
+       ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+       defer cancel()
+       stateCh := make(chan connectivity.State, 1)
+       s := &funcConnectivityStateSubscriber{
+               onMsg: func(s connectivity.State) {
+                       select {
+                       case stateCh <- s:
+                       case <-ctx.Done():
+                       }
+               },
+       }
+       internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, s)
+
+       // Make an RPC call to transition the channel to CONNECTING.
+       const wantErr = "name resolver error: produced zero addresses"
+       for range 2 {
+               _, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{})
+               if code := status.Code(err); code != codes.Unavailable {
+                       t.Errorf("EmptyCall RPC failed with code %v, want %v", err, codes.Unavailable)
+               }
+               if err == nil || !strings.Contains(err.Error(), wantErr) {
+                       t.Errorf("EmptyCall RPC failed with error: %q, want %q", err, wantErr)
+               }
+       }
+
+       wantStates := []connectivity.State{
+               connectivity.Connecting,       // When channel exits IDLE for the first time.
+               connectivity.TransientFailure, // No endpoints from the resolver
+               connectivity.Idle,             // After idle timeout.
+       }
+       for _, wantState := range wantStates {
+               waitForState(ctx, t, stateCh, wantState)
+               if wantState == connectivity.TransientFailure {
+                       internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))(cc)
+               }
+       }
+}