}()
// This creates the name resolver, load balancer, etc.
- if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
- return nil, err
+ if err := cc.exitIdleMode(); err != nil {
+ return nil, fmt.Errorf("failed to exit idle mode: %w", err)
}
+ cc.idlenessMgr.UnsafeSetNotIdle()
// Return now for non-blocking dials.
if !cc.dopts.block {
Severity: channelz.CtInfo,
}
}
- channelz.AddTraceEvent(logger, cc.channelz, 0, ted)
+ channelz.AddTraceEvent(logger, cc.channelz, 1, ted)
}
type idler ClientConn
(*ClientConn)(i).enterIdleMode()
}
-func (i *idler) ExitIdleMode() error {
- return (*ClientConn)(i).exitIdleMode()
+func (i *idler) ExitIdleMode() {
+ // Ignore the error returned from this method, because from the perspective
+ // of the caller (idleness manager), the channel would have always moved out
+ // of IDLE by the time this method returns.
+ (*ClientConn)(i).exitIdleMode()
}
// exitIdleMode moves the channel out of idle mode by recreating the name
// resolver and load balancer. This should never be called directly; use
// cc.idlenessMgr.ExitIdleMode instead.
-func (cc *ClientConn) exitIdleMode() (err error) {
+func (cc *ClientConn) exitIdleMode() error {
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
}
cc.mu.Unlock()
+ // Set state to CONNECTING before building the name resolver
+ // so the channel does not remain in IDLE.
+ cc.csMgr.updateState(connectivity.Connecting)
+
// This needs to be called without cc.mu because this builds a new resolver
// which might update state or report error inline, which would then need to
// acquire cc.mu.
if err := cc.resolverWrapper.start(); err != nil {
- return err
+ // If resolver creation fails, treat it like an error reported by the
+ // resolver before any valid udpates. Set channel's state to
+ // TransientFailure, and set an erroring picker with the resolver build
+ // error, which will returned as part of any subsequent RPCs.
+ logger.Warningf("Failed to start resolver: %v", err)
+ cc.csMgr.updateState(connectivity.TransientFailure)
+ cc.mu.Lock()
+ cc.updateResolverStateAndUnlock(resolver.State{}, err)
+ return fmt.Errorf("failed to start resolver: %w", err)
}
cc.addTraceEvent("exiting idle mode")
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func (cc *ClientConn) Connect() {
- if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
- cc.addTraceEvent(err.Error())
- return
- }
+ cc.idlenessMgr.ExitIdleMode()
+
// If the ClientConn was not in idle mode, we need to call ExitIdle on the
// LB policy so that connections can be created.
cc.mu.Lock()
internal.EnterIdleModeForTesting = func(cc *ClientConn) {
cc.idlenessMgr.EnterIdleModeForTesting()
}
- internal.ExitIdleModeForTesting = func(cc *ClientConn) error {
- return cc.idlenessMgr.ExitIdleMode()
+ internal.ExitIdleModeForTesting = func(cc *ClientConn) {
+ cc.idlenessMgr.ExitIdleMode()
}
}
}
func (s) TestParsedTarget_Failure_WithoutCustomDialer_WithNewClient(t *testing.T) {
- targets := []string{
- "",
- "unix://a/b/c",
- "unix://authority",
- "unix-abstract://authority/a/b/c",
- "unix-abstract://authority",
+ tests := []struct {
+ target string
+ wantErrSubstr string
+ }{
+
+ {target: "", wantErrSubstr: "invalid target address"},
+ {target: "unix://a/b/c", wantErrSubstr: "invalid (non-empty) authority"},
+ {target: "unix://authority", wantErrSubstr: "invalid (non-empty) authority"},
+ {target: "unix-abstract://authority/a/b/c", wantErrSubstr: "invalid (non-empty) authority"},
+ {target: "unix-abstract://authority", wantErrSubstr: "invalid (non-empty) authority"},
}
- for _, target := range targets {
- t.Run(target, func(t *testing.T) {
+ for _, test := range tests {
+ t.Run(test.target, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- cc, err := NewClient(target, WithTransportCredentials(insecure.NewCredentials()))
+ cc, err := NewClient(test.target, WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
- t.Fatalf("NewClient(%q) failed: %v", target, err)
+ t.Fatalf("NewClient(%q) failed: %v", test, err)
}
defer cc.Close()
- const wantErrSubstr = "failed to exit idle mode"
if _, err := cc.NewStream(ctx, &StreamDesc{}, "/my.service.v1.MyService/UnaryCall"); err == nil {
- t.Fatalf("NewStream() succeeded with target = %q, cc.parsedTarget = %+v, expected to fail", target, cc.parsedTarget)
- } else if !strings.Contains(err.Error(), wantErrSubstr) {
- t.Fatalf("NewStream() with target = %q returned unexpected error: got %v, want substring %q", target, err, wantErrSubstr)
+ t.Fatalf("NewStream() succeeded with target = %q, cc.parsedTarget = %+v, expected to fail", test, cc.parsedTarget)
+ } else if !strings.Contains(err.Error(), test.wantErrSubstr) {
+ t.Fatalf("NewStream() with target = %q returned unexpected error: got %v, want substring %q", test, err, test.wantErrSubstr)
}
})
}
import (
"context"
+ "fmt"
"net"
"strings"
"testing"
type stringerVal struct{ s string }
func (s stringerVal) String() string { return s.s }
+
+const errResolverBuilderScheme = "test-resolver-build-failure"
+
+// errResolverBuilder is a resolver builder that returns an error from its Build
+// method.
+type errResolverBuilder struct {
+ err error
+}
+
+func (b *errResolverBuilder) Build(resolver.Target, resolver.ClientConn, resolver.BuildOptions) (resolver.Resolver, error) {
+ return nil, b.err
+}
+
+func (b *errResolverBuilder) Scheme() string {
+ return errResolverBuilderScheme
+}
+
+// Tests that Dial returns an error if the resolver builder returns an error
+// from its Build method.
+func (s) TestDial_ResolverBuilder_Error(t *testing.T) {
+ resolverErr := fmt.Errorf("resolver builder error")
+ dopts := []DialOption{
+ WithTransportCredentials(insecure.NewCredentials()),
+ WithResolvers(&errResolverBuilder{err: resolverErr}),
+ }
+ _, err := Dial(errResolverBuilderScheme+":///test.server", dopts...)
+ if err == nil {
+ t.Fatalf("Dial() succeeded when it should have failed")
+ }
+ if !strings.Contains(err.Error(), resolverErr.Error()) {
+ t.Fatalf("Dial() failed with error %v, want %v", err, resolverErr)
+ }
+}
package idle
import (
- "fmt"
"math"
"sync"
"sync/atomic"
return time.AfterFunc(d, f)
}
-// Enforcer is the functionality provided by grpc.ClientConn to enter
-// and exit from idle mode.
-type Enforcer interface {
- ExitIdleMode() error
+// ClientConn is the functionality provided by grpc.ClientConn to enter and exit
+// from idle mode.
+type ClientConn interface {
+ ExitIdleMode()
EnterIdleMode()
}
-// Manager implements idleness detection and calls the configured Enforcer to
-// enter/exit idle mode when appropriate. Must be created by NewManager.
+// Manager implements idleness detection and calls the ClientConn to enter/exit
+// idle mode when appropriate. Must be created by NewManager.
type Manager struct {
// State accessed atomically.
lastCallEndTime int64 // Unix timestamp in nanos; time when the most recent RPC completed.
// Can be accessed without atomics or mutex since these are set at creation
// time and read-only after that.
- enforcer Enforcer // Functionality provided by grpc.ClientConn.
- timeout time.Duration
+ cc ClientConn // Functionality provided by grpc.ClientConn.
+ timeout time.Duration
// idleMu is used to guarantee mutual exclusion in two scenarios:
// - Opposing intentions:
// NewManager creates a new idleness manager implementation for the
// given idle timeout. It begins in idle mode.
-func NewManager(enforcer Enforcer, timeout time.Duration) *Manager {
+func NewManager(cc ClientConn, timeout time.Duration) *Manager {
return &Manager{
- enforcer: enforcer,
+ cc: cc,
timeout: timeout,
actuallyIdle: true,
activeCallsCount: -math.MaxInt32,
// Now that we've checked that there has been no activity, attempt to enter
// idle mode, which is very likely to succeed.
- if m.tryEnterIdleMode() {
+ if m.tryEnterIdleMode(true) {
// Successfully entered idle mode. No timer needed until we exit idle.
return
}
// that, it performs a last minute check to ensure that no new RPC has come in,
// making the channel active.
//
+// checkActivity controls if a check for RPC activity, since the last time the
+// idle_timeout fired, is made.
+
// Return value indicates whether or not the channel moved to idle mode.
//
// Holds idleMu which ensures mutual exclusion with exitIdleMode.
-func (m *Manager) tryEnterIdleMode() bool {
+func (m *Manager) tryEnterIdleMode(checkActivity bool) bool {
// Setting the activeCallsCount to -math.MaxInt32 indicates to OnCallBegin()
// that the channel is either in idle mode or is trying to get there.
if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) {
atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
return false
}
- if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
+ if checkActivity && atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
// A very short RPC could have come in (and also finished) after we
// checked for calls count and activity in handleIdleTimeout(), but
// before the CAS operation. So, we need to check for activity again.
// No new RPCs have come in since we set the active calls count value to
// -math.MaxInt32. And since we have the lock, it is safe to enter idle mode
// unconditionally now.
- m.enforcer.EnterIdleMode()
+ m.cc.EnterIdleMode()
m.actuallyIdle = true
return true
}
// EnterIdleModeForTesting instructs the channel to enter idle mode.
func (m *Manager) EnterIdleModeForTesting() {
- m.tryEnterIdleMode()
+ m.tryEnterIdleMode(false)
}
// OnCallBegin is invoked at the start of every RPC.
-func (m *Manager) OnCallBegin() error {
+func (m *Manager) OnCallBegin() {
if m.isClosed() {
- return nil
+ return
}
if atomic.AddInt32(&m.activeCallsCount, 1) > 0 {
// Channel is not idle now. Set the activity bit and allow the call.
atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
- return nil
+ return
}
// Channel is either in idle mode or is in the process of moving to idle
// mode. Attempt to exit idle mode to allow this RPC.
- if err := m.ExitIdleMode(); err != nil {
- // Undo the increment to calls count, and return an error causing the
- // RPC to fail.
- atomic.AddInt32(&m.activeCallsCount, -1)
- return err
- }
-
+ m.ExitIdleMode()
atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
- return nil
}
-// ExitIdleMode instructs m to call the enforcer's ExitIdleMode and update m's
+// ExitIdleMode instructs m to call the ClientConn's ExitIdleMode and update its
// internal state.
-func (m *Manager) ExitIdleMode() error {
+func (m *Manager) ExitIdleMode() {
// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
m.idleMu.Lock()
defer m.idleMu.Unlock()
// m.ExitIdleMode.
//
// In any case, there is nothing to do here.
- return nil
+ return
}
- if err := m.enforcer.ExitIdleMode(); err != nil {
- return fmt.Errorf("failed to exit idle mode: %w", err)
- }
+ m.cc.ExitIdleMode()
// Undo the idle entry process. This also respects any new RPC attempts.
atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
// Start a new timer to fire after the configured idle timeout.
m.resetIdleTimerLocked(m.timeout)
- return nil
+}
+
+// UnsafeSetNotIdle instructs the Manager to update its internal state to
+// reflect the reality that the channel is no longer in IDLE mode.
+//
+// N.B. This method is intended only for internal use by the gRPC client
+// when it exits IDLE mode **manually** from `Dial`. The callsite must ensure:
+// - The channel was **actually in IDLE mode** immediately prior to the call.
+// - There is **no concurrent activity** that could cause the channel to exit
+// IDLE mode *naturally* at the same time.
+func (m *Manager) UnsafeSetNotIdle() {
+ m.idleMu.Lock()
+ defer m.idleMu.Unlock()
+
+ atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
+ m.actuallyIdle = false
+ m.resetIdleTimerLocked(m.timeout)
}
// OnCallEnd is invoked at the end of every RPC.
"fmt"
"io"
"strings"
- "sync"
"testing"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
- "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpctest"
// Verify that the ClientConn moves back to READY.
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
}
-
-// runFunc runs f repeatedly until the context expires.
-func runFunc(ctx context.Context, f func()) {
- for {
- select {
- case <-ctx.Done():
- return
- case <-time.After(10 * time.Millisecond):
- f()
- }
- }
-}
-
-// Tests the scenario where there are concurrent calls to exit and enter idle
-// mode on the ClientConn. Verifies that there is no race under this scenario.
-func (s) TestChannelIdleness_RaceBetweenEnterAndExitIdleMode(t *testing.T) {
- // Start a test backend and set the bootstrap state of the resolver to
- // include this address. This will ensure that when the resolver is
- // restarted when exiting idle, it will push the same address to grpc again.
- r := manual.NewBuilderWithScheme("whatever")
- backend := stubserver.StartTestService(t, nil)
- defer backend.Stop()
- r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
-
- // Create a ClientConn with a long idle_timeout. We will explicitly trigger
- // entering and exiting IDLE mode from the test.
- dopts := []grpc.DialOption{
- grpc.WithTransportCredentials(insecure.NewCredentials()),
- grpc.WithResolvers(r),
- grpc.WithIdleTimeout(30 * time.Minute),
- grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"pick_first":{}}]}`),
- }
- cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
- if err != nil {
- t.Fatalf("grpc.NewClient() failed: %v", err)
- }
- defer cc.Close()
-
- enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
- enterIdleFunc := func() { enterIdle(cc) }
- exitIdle := internal.ExitIdleModeForTesting.(func(*grpc.ClientConn) error)
- exitIdleFunc := func() {
- if err := exitIdle(cc); err != nil {
- t.Errorf("Failed to exit idle mode: %v", err)
- }
- }
- // Spawn goroutines that call methods on the ClientConn to enter and exit
- // idle mode concurrently for one second.
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
- defer cancel()
- var wg sync.WaitGroup
- wg.Add(4)
- go func() {
- runFunc(ctx, enterIdleFunc)
- wg.Done()
- }()
- go func() {
- runFunc(ctx, enterIdleFunc)
- wg.Done()
- }()
- go func() {
- runFunc(ctx, exitIdleFunc)
- wg.Done()
- }()
- go func() {
- runFunc(ctx, exitIdleFunc)
- wg.Done()
- }()
- wg.Wait()
-}
enterIdleCh chan struct{}
}
-func (ti *testEnforcer) ExitIdleMode() error {
+func (ti *testEnforcer) ExitIdleMode() {
ti.exitIdleCh <- struct{}{}
- return nil
-
}
func (ti *testEnforcer) EnterIdleMode() {
for i := 0; i < 100; i++ {
// A call to OnCallBegin and OnCallEnd simulates an RPC.
go func() {
- if err := mgr.OnCallBegin(); err != nil {
- t.Errorf("OnCallBegin() failed: %v", err)
- }
+ mgr.OnCallBegin()
mgr.OnCallEnd()
}()
}
// ExitIdleMode sets the internal state to stateExitedIdle. We should only ever
// exit idle when we are currently in idle.
-func (ri *racyEnforcer) ExitIdleMode() error {
+func (ri *racyEnforcer) ExitIdleMode() {
// Set only on the initial ExitIdleMode
if ri.started == false {
if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInitial), int32(stateInitial)) {
- return fmt.Errorf("idleness enforcer's first ExitIdleMode after EnterIdleMode")
+ ri.t.Errorf("idleness enforcer's first ExitIdleMode after EnterIdleMode")
+ return
}
ri.started = true
- return nil
+ return
}
if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateEnteredIdle), int32(stateExitedIdle)) {
- return fmt.Errorf("idleness enforcer asked to exit idle when it did not enter idle earlier")
+ ri.t.Errorf("idleness enforcer asked to exit idle when it did not enter idle earlier")
+ return
}
- return nil
}
// EnterIdleMode attempts to set the internal state to stateEnteredIdle. We should only ever enter idle before RPCs start.
// Wait for the configured idle timeout and simulate an RPC to
// race with the idle timeout timer callback.
<-time.After(defaultTestIdleTimeout / 50)
- if err := mgr.OnCallBegin(); err != nil {
- t.Errorf("OnCallBegin() failed: %v", err)
- }
+ mgr.OnCallBegin()
atomic.StoreInt32((*int32)(&idlenessState), int32(stateActiveRPCs))
mgr.OnCallEnd()
}()
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
+ "google.golang.org/grpc/status"
+
+ testgrpc "google.golang.org/grpc/interop/grpc_testing"
+ testpb "google.golang.org/grpc/interop/grpc_testing"
)
// TestResolverBalancerInteraction tests:
}
}
+// Tests the case where the resolver reports an error to the channel before
+// reporting an update. Verifies that the channel eventually moves to
+// TransientFailure and subsequent RPCs returns the error reported by the
+// resolver to the user.
+func (s) TestResolverReportError(t *testing.T) {
+ const resolverErr = "test resolver error"
+ r := manual.NewBuilderWithScheme("whatever")
+ r.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) {
+ cc.ReportError(errors.New(resolverErr))
+ }
+
+ cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
+ if err != nil {
+ t.Fatalf("Error creating client: %v", err)
+ }
+ defer cc.Close()
+ cc.Connect()
+
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+ testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
+
+ client := testgrpc.NewTestServiceClient(cc)
+ for range 5 {
+ _, err = client.EmptyCall(ctx, &testpb.Empty{})
+ if code := status.Code(err); code != codes.Unavailable {
+ t.Fatalf("EmptyCall() = %v, want %v", err, codes.Unavailable)
+ }
+ if err == nil || !strings.Contains(err.Error(), resolverErr) {
+ t.Fatalf("EmptyCall() = %q, want %q", err, resolverErr)
+ }
+ }
+}
+
// TestEnterIdleDuringResolverUpdateState tests a scenario that used to deadlock
// while calling UpdateState at the same time as the resolver being closed while
// the channel enters idle mode.
errCh := make(chan error)
ccr.serializer.TrySchedule(func(ctx context.Context) {
if ctx.Err() != nil {
+ errCh <- ctx.Err()
return
}
opts := resolver.BuildOptions{
var emptyMethodConfig = serviceconfig.MethodConfig{}
+// endOfClientStream performs cleanup actions required for both successful and
+// failed streams. This includes incrementing channelz stats and invoking all
+// registered OnFinish call options.
+func endOfClientStream(cc *ClientConn, err error, opts ...CallOption) {
+ if channelz.IsOn() {
+ if err != nil {
+ cc.incrCallsFailed()
+ } else {
+ cc.incrCallsSucceeded()
+ }
+ }
+
+ for _, o := range opts {
+ if o, ok := o.(OnFinishCallOption); ok {
+ o.OnFinish(err)
+ }
+ }
+}
+
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
+ if channelz.IsOn() {
+ cc.incrCallsStarted()
+ }
+ defer func() {
+ if err != nil {
+ // Ensure cleanup when stream creation fails.
+ endOfClientStream(cc, err, opts...)
+ }
+ }()
+
// Start tracking the RPC for idleness purposes. This is where a stream is
// created for both streaming and unary RPCs, and hence is a good place to
// track active RPC count.
- if err := cc.idlenessMgr.OnCallBegin(); err != nil {
- return nil, err
- }
+ cc.idlenessMgr.OnCallBegin()
+
// Add a calloption, to decrement the active call count, that gets executed
// when the RPC completes.
opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)
}
}
}
- if channelz.IsOn() {
- cc.incrCallsStarted()
- defer func() {
- if err != nil {
- cc.incrCallsFailed()
- }
- }()
- }
// Provide an opportunity for the first RPC to see the first service config
// provided by the resolver.
nameResolutionDelayed, err := cc.waitForResolvedAddrs(ctx)
return
}
cs.finished = true
- for _, onFinish := range cs.callInfo.onFinish {
- onFinish(err)
- }
cs.commitAttemptLocked()
if cs.attempt != nil {
cs.attempt.finish(err)
if err == nil {
cs.retryThrottler.successfulRPC()
}
- if channelz.IsOn() {
- if err != nil {
- cs.cc.incrCallsFailed()
- } else {
- cs.cc.incrCallsSucceeded()
- }
- }
+ endOfClientStream(cs.cc, err, cs.opts...)
cs.cancel()
}
"fmt"
"io"
"net"
+ "strings"
"sync"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
+ "google.golang.org/grpc/status"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
// Test verifies that a channel starts off in IDLE and transitions to CONNECTING
// when Connect() is called, and stays there when there are no resolver updates.
func (s) TestStateTransitions_WithConnect_NoResolverUpdate(t *testing.T) {
- t.Skip("The channel remains in IDLE until the LB policy updates the state to CONNECTING. This is a bug and the channel should transition to CONNECTING as soon as Connect() is called. See issue #7686.")
-
backend := stubserver.StartTestService(t, nil)
defer backend.Stop()
// Test verifies that a channel starts off in IDLE and transitions to CONNECTING
// when Connect() is called, and stays there when there are no resolver updates.
func (s) TestStateTransitions_WithRPC_NoResolverUpdate(t *testing.T) {
- t.Skip("The channel remains in IDLE until the LB policy updates the state to CONNECTING. This is a bug and the channel should transition to CONNECTING as soon as an RPC call is made. See issue #7686.")
-
backend := stubserver.StartTestService(t, nil)
defer backend.Stop()
// Make an RPC call to transition the channel to CONNECTING.
go func() {
- _, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{})
- if err == nil {
+ if _, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{}); err == nil {
t.Errorf("Expected RPC to fail, but it succeeded")
}
}()
defer shortCancel()
testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Connecting)
}
+
+const testResolverBuildFailureScheme = "test-resolver-build-failure"
+
+// testResolverBuilder is a resolver builder that fails the first time its
+// Build method is called, and succeeds thereafter.
+type testResolverBuilder struct {
+ logger interface {
+ Logf(format string, args ...any)
+ }
+ buildCalled bool
+ manualR *manual.Resolver
+}
+
+func (b *testResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
+ b.logger.Logf("testResolverBuilder: Build called with target: %v", target)
+ if !b.buildCalled {
+ b.buildCalled = true
+ b.logger.Logf("testResolverBuilder: returning build failure")
+ return nil, fmt.Errorf("simulated resolver build failure")
+ }
+ return b.manualR.Build(target, cc, opts)
+}
+
+func (b *testResolverBuilder) Scheme() string {
+ return testResolverBuildFailureScheme
+}
+
+// Tests for state transitions when the resolver initially fails to build.
+func (s) TestStateTransitions_ResolverBuildFailure(t *testing.T) {
+ tests := []struct {
+ name string
+ exitIdleWithRPC bool
+ }{
+ {
+ name: "exitIdleByConnecting",
+ exitIdleWithRPC: false,
+ },
+ {
+ name: "exitIdleByRPC",
+ exitIdleWithRPC: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ mr := manual.NewBuilderWithScheme("whatever" + tt.name)
+ backend := stubserver.StartTestService(t, nil)
+ defer backend.Stop()
+ mr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
+
+ dopts := []grpc.DialOption{
+ grpc.WithTransportCredentials(insecure.NewCredentials()),
+ grpc.WithResolvers(&testResolverBuilder{logger: t, manualR: mr}),
+ }
+
+ cc, err := grpc.NewClient(testResolverBuildFailureScheme+":///", dopts...)
+ if err != nil {
+ t.Fatalf("Failed to create new client: %v", err)
+ }
+ defer cc.Close()
+
+ // Ensure that the client is in IDLE before connecting.
+ if state := cc.GetState(); state != connectivity.Idle {
+ t.Fatalf("Expected initial state to be IDLE, got %v", state)
+ }
+
+ // Subscribe to state updates.
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+ stateCh := make(chan connectivity.State, 1)
+ s := &funcConnectivityStateSubscriber{
+ onMsg: func(s connectivity.State) {
+ select {
+ case stateCh <- s:
+ case <-ctx.Done():
+ }
+ },
+ }
+ internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, s)
+
+ if tt.exitIdleWithRPC {
+ // The first attempt to kick the channel is expected to return
+ // the resolver build error to the RPC.
+ const wantErr = "simulated resolver build failure"
+ for range 2 {
+ _, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{})
+ if code := status.Code(err); code != codes.Unavailable {
+ t.Fatalf("EmptyCall RPC failed with code %v, want %v", err, codes.Unavailable)
+ }
+ if err == nil || !strings.Contains(err.Error(), wantErr) {
+ t.Fatalf("EmptyCall RPC failed with error: %q, want %q", err, wantErr)
+ }
+ }
+ } else {
+ cc.Connect()
+ }
+
+ wantStates := []connectivity.State{
+ connectivity.Connecting, // When channel exits IDLE for the first time.
+ connectivity.TransientFailure, // Resolver build failure.
+ connectivity.Idle, // After idle timeout.
+ connectivity.Connecting, // When channel exits IDLE again.
+ connectivity.Ready, // Successful resolver build and connection to backend.
+ }
+ for _, wantState := range wantStates {
+ waitForState(ctx, t, stateCh, wantState)
+ switch wantState {
+ case connectivity.TransientFailure:
+ internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))(cc)
+ case connectivity.Idle:
+ if tt.exitIdleWithRPC {
+ if _, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{}); err != nil {
+ t.Fatalf("EmptyCall RPC failed: %v", err)
+ }
+ } else {
+ cc.Connect()
+ }
+ }
+ }
+ })
+ }
+}
+
+// Tests for state transitions when the resolver reports no addresses.
+func (s) TestStateTransitions_WithRPC_ResolverUpdateContainsNoAddresses(t *testing.T) {
+ mr := manual.NewBuilderWithScheme("e2e-test")
+ mr.InitialState(resolver.State{})
+ defer mr.Close()
+
+ cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ t.Fatalf("Failed to create new client: %v", err)
+ }
+ defer cc.Close()
+
+ if state := cc.GetState(); state != connectivity.Idle {
+ t.Fatalf("Expected initial state to be IDLE, got %v", state)
+ }
+
+ // Subscribe to state updates.
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+ stateCh := make(chan connectivity.State, 1)
+ s := &funcConnectivityStateSubscriber{
+ onMsg: func(s connectivity.State) {
+ select {
+ case stateCh <- s:
+ case <-ctx.Done():
+ }
+ },
+ }
+ internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, s)
+
+ // Make an RPC call to transition the channel to CONNECTING.
+ const wantErr = "name resolver error: produced zero addresses"
+ for range 2 {
+ _, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{})
+ if code := status.Code(err); code != codes.Unavailable {
+ t.Errorf("EmptyCall RPC failed with code %v, want %v", err, codes.Unavailable)
+ }
+ if err == nil || !strings.Contains(err.Error(), wantErr) {
+ t.Errorf("EmptyCall RPC failed with error: %q, want %q", err, wantErr)
+ }
+ }
+
+ wantStates := []connectivity.State{
+ connectivity.Connecting, // When channel exits IDLE for the first time.
+ connectivity.TransientFailure, // No endpoints from the resolver
+ connectivity.Idle, // After idle timeout.
+ }
+ for _, wantState := range wantStates {
+ waitForState(ctx, t, stateCh, wantState)
+ if wantState == connectivity.TransientFailure {
+ internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))(cc)
+ }
+ }
+}