"multiping": "multiping",
"otel_export_telemetry_domains": "otel_export_telemetry_domains",
"pick_first_ignore_empty_updates": "pick_first_ignore_empty_updates",
+ "pick_first_ready_to_connecting": "pick_first_ready_to_connecting",
"pipelined_read_secure_endpoint": "event_engine_client,event_engine_listener,event_engine_secure_endpoint,pipelined_read_secure_endpoint",
"pollset_alternative": "event_engine_client,event_engine_listener,pollset_alternative",
"prioritize_finished_requests": "prioritize_finished_requests",
"subchannel_wrapper_cleanup_on_orphan",
],
"cpp_lb_end2end_test": [
+ "pick_first_ready_to_connecting",
"rr_wrr_connect_from_random_index",
"transport_state_watcher",
],
"tcp_rcv_lowat",
],
"lb_unit_test": [
+ "pick_first_ready_to_connecting",
"rr_wrr_connect_from_random_index",
],
"minimal_stack_test": [
"subchannel_wrapper_cleanup_on_orphan",
],
"cpp_lb_end2end_test": [
+ "pick_first_ready_to_connecting",
"rr_wrr_connect_from_random_index",
"transport_state_watcher",
],
"tcp_rcv_lowat",
],
"lb_unit_test": [
+ "pick_first_ready_to_connecting",
"rr_wrr_connect_from_random_index",
],
"minimal_stack_test": [
"subchannel_wrapper_cleanup_on_orphan",
],
"cpp_lb_end2end_test": [
+ "pick_first_ready_to_connecting",
"rr_wrr_connect_from_random_index",
"transport_state_watcher",
],
"tcp_rcv_lowat",
],
"lb_unit_test": [
+ "pick_first_ready_to_connecting",
"rr_wrr_connect_from_random_index",
],
"minimal_stack_test": [
const char* const description_pick_first_ignore_empty_updates =
"Ignore empty resolutions in pick_first";
const char* const additional_constraints_pick_first_ignore_empty_updates = "{}";
+const char* const description_pick_first_ready_to_connecting =
+ "When the subchannel goes from READY to CONNECTING or TRANSIENT_FAILURE, "
+ "pick_first goes to CONNECTING and starts a new Happy Eyeballs pass.";
+const char* const additional_constraints_pick_first_ready_to_connecting = "{}";
const char* const description_pipelined_read_secure_endpoint =
"Enable pipelined reads for EventEngine secure endpoints";
const char* const additional_constraints_pipelined_read_secure_endpoint = "{}";
description_pick_first_ignore_empty_updates,
additional_constraints_pick_first_ignore_empty_updates, nullptr, 0, false,
true},
+ {"pick_first_ready_to_connecting",
+ description_pick_first_ready_to_connecting,
+ additional_constraints_pick_first_ready_to_connecting, nullptr, 0, false,
+ true},
{"pipelined_read_secure_endpoint",
description_pipelined_read_secure_endpoint,
additional_constraints_pipelined_read_secure_endpoint,
const char* const description_pick_first_ignore_empty_updates =
"Ignore empty resolutions in pick_first";
const char* const additional_constraints_pick_first_ignore_empty_updates = "{}";
+const char* const description_pick_first_ready_to_connecting =
+ "When the subchannel goes from READY to CONNECTING or TRANSIENT_FAILURE, "
+ "pick_first goes to CONNECTING and starts a new Happy Eyeballs pass.";
+const char* const additional_constraints_pick_first_ready_to_connecting = "{}";
const char* const description_pipelined_read_secure_endpoint =
"Enable pipelined reads for EventEngine secure endpoints";
const char* const additional_constraints_pipelined_read_secure_endpoint = "{}";
description_pick_first_ignore_empty_updates,
additional_constraints_pick_first_ignore_empty_updates, nullptr, 0, false,
true},
+ {"pick_first_ready_to_connecting",
+ description_pick_first_ready_to_connecting,
+ additional_constraints_pick_first_ready_to_connecting, nullptr, 0, false,
+ true},
{"pipelined_read_secure_endpoint",
description_pipelined_read_secure_endpoint,
additional_constraints_pipelined_read_secure_endpoint,
const char* const description_pick_first_ignore_empty_updates =
"Ignore empty resolutions in pick_first";
const char* const additional_constraints_pick_first_ignore_empty_updates = "{}";
+const char* const description_pick_first_ready_to_connecting =
+ "When the subchannel goes from READY to CONNECTING or TRANSIENT_FAILURE, "
+ "pick_first goes to CONNECTING and starts a new Happy Eyeballs pass.";
+const char* const additional_constraints_pick_first_ready_to_connecting = "{}";
const char* const description_pipelined_read_secure_endpoint =
"Enable pipelined reads for EventEngine secure endpoints";
const char* const additional_constraints_pipelined_read_secure_endpoint = "{}";
description_pick_first_ignore_empty_updates,
additional_constraints_pick_first_ignore_empty_updates, nullptr, 0, false,
true},
+ {"pick_first_ready_to_connecting",
+ description_pick_first_ready_to_connecting,
+ additional_constraints_pick_first_ready_to_connecting, nullptr, 0, false,
+ true},
{"pipelined_read_secure_endpoint",
description_pipelined_read_secure_endpoint,
additional_constraints_pipelined_read_secure_endpoint,
inline bool IsMultipingEnabled() { return false; }
inline bool IsOtelExportTelemetryDomainsEnabled() { return false; }
inline bool IsPickFirstIgnoreEmptyUpdatesEnabled() { return false; }
+inline bool IsPickFirstReadyToConnectingEnabled() { return false; }
inline bool IsPipelinedReadSecureEndpointEnabled() { return false; }
inline bool IsPollsetAlternativeEnabled() { return false; }
inline bool IsPrioritizeFinishedRequestsEnabled() { return false; }
inline bool IsMultipingEnabled() { return false; }
inline bool IsOtelExportTelemetryDomainsEnabled() { return false; }
inline bool IsPickFirstIgnoreEmptyUpdatesEnabled() { return false; }
+inline bool IsPickFirstReadyToConnectingEnabled() { return false; }
inline bool IsPipelinedReadSecureEndpointEnabled() { return false; }
inline bool IsPollsetAlternativeEnabled() { return false; }
inline bool IsPrioritizeFinishedRequestsEnabled() { return false; }
inline bool IsMultipingEnabled() { return false; }
inline bool IsOtelExportTelemetryDomainsEnabled() { return false; }
inline bool IsPickFirstIgnoreEmptyUpdatesEnabled() { return false; }
+inline bool IsPickFirstReadyToConnectingEnabled() { return false; }
inline bool IsPipelinedReadSecureEndpointEnabled() { return false; }
inline bool IsPollsetAlternativeEnabled() { return false; }
inline bool IsPrioritizeFinishedRequestsEnabled() { return false; }
kExperimentIdMultiping,
kExperimentIdOtelExportTelemetryDomains,
kExperimentIdPickFirstIgnoreEmptyUpdates,
+ kExperimentIdPickFirstReadyToConnecting,
kExperimentIdPipelinedReadSecureEndpoint,
kExperimentIdPollsetAlternative,
kExperimentIdPrioritizeFinishedRequests,
inline bool IsPickFirstIgnoreEmptyUpdatesEnabled() {
return IsExperimentEnabled<kExperimentIdPickFirstIgnoreEmptyUpdates>();
}
+#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_READY_TO_CONNECTING
+inline bool IsPickFirstReadyToConnectingEnabled() {
+ return IsExperimentEnabled<kExperimentIdPickFirstReadyToConnecting>();
+}
#define GRPC_EXPERIMENT_IS_INCLUDED_PIPELINED_READ_SECURE_ENDPOINT
inline bool IsPipelinedReadSecureEndpointEnabled() {
return IsExperimentEnabled<kExperimentIdPipelinedReadSecureEndpoint>();
description: Ignore empty resolutions in pick_first
expiry: 2026/02/02
owner: ctiller@google.com
+- name: pick_first_ready_to_connecting
+ description:
+ When the subchannel goes from READY to CONNECTING or TRANSIENT_FAILURE,
+ pick_first goes to CONNECTING and starts a new Happy Eyeballs pass.
+ expiry: 2026/02/01
+ owner: roth@google.com
+ test_tags: ["lb_unit_test", "cpp_lb_end2end_test"]
- name: pipelined_read_secure_endpoint
description: Enable pipelined reads for EventEngine secure endpoints
expiry: 2026/03/15
// Latest update args.
UpdateArgs latest_update_args_;
// The list of subchannels that we're currently trying to connect to.
- // Will generally be null when selected_ is set, except when we get a
- // resolver update and need to check initial connectivity states for
- // the new list to decide whether we keep using the existing
- // connection or go IDLE.
+ // Will generally be null when selected_ is set, except for two cases:
+ // - When we get a resolver update and need to check initial connectivity
+ // states for the new list to decide whether we keep using the existing
+ // connection or go IDLE.
+ // - When the selected subchannel transitions from READY to CONNECTING
+ // or TRANSIENT_FAILURE (instead of IDLE), in which case we create a
+ // new subchannel list and start connecting with a Happy Eyeballs pass.
OrphanablePtr<SubchannelList> subchannel_list_;
// Selected subchannel. Will generally be null when subchannel_list_
- // is non-null, with the exception mentioned above.
+ // is non-null, with the exceptions mentioned above.
OrphanablePtr<SubchannelList::SubchannelData::SubchannelState> selected_;
// Health watcher for the selected subchannel.
SubchannelInterface::ConnectivityStateWatcherInterface* health_watcher_ =
UnsetSelectedSubchannel();
// Drop the current subchannel list, if any.
subchannel_list_.reset();
- // Request a re-resolution.
- // TODO(qianchengz): We may want to request re-resolution in
- // ExitIdleLocked() instead.
- channel_control_helper()->RequestReresolution();
+ if (!IsPickFirstReadyToConnectingEnabled()) {
+ // Request a re-resolution.
+ // TODO(roth): We may want to request re-resolution in
+ // ExitIdleLocked() instead.
+ channel_control_helper()->RequestReresolution();
+ }
// Enter idle.
UpdateState(GRPC_CHANNEL_IDLE, absl::OkStatus(),
MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
stats_plugins.AddCounter(kMetricDisconnections, 1,
{pick_first_->channel_control_helper()->GetTarget()},
{});
- // Report IDLE.
- pick_first_->GoIdle();
+ if (IsPickFirstReadyToConnectingEnabled()) {
+ // TODO(roth): We may want to request re-resolution in
+ // ExitIdleLocked() instead, at least if we go IDLE below.
+ pick_first_->channel_control_helper()->RequestReresolution();
+ }
+ // If the subchannel went to CONNECTING or TRANSIENT_FAILURE, we go
+ // back to CONNECTING and start a new Happy Eyeballs pass.
+ // Otherwise, go IDLE.
+ if (IsPickFirstReadyToConnectingEnabled() &&
+ (new_state == GRPC_CHANNEL_CONNECTING ||
+ new_state == GRPC_CHANNEL_TRANSIENT_FAILURE)) {
+ pick_first_->UpdateState(GRPC_CHANNEL_CONNECTING, absl::OkStatus(),
+ MakeRefCounted<QueuePicker>(nullptr));
+ pick_first_->AttemptToConnectUsingLatestUpdateArgsLocked();
+ // Unset the selected subchannel, so that when we see the initial
+ // connectivity state notifications for the subchannels in the new
+ // subchannel list, we don't think it was caused by a resolver
+ // update and go IDLE if none of the subchannels report READY.
+ //
+ // Note that we do this *after* creating the new subchannel list,
+ // which will have taken a new ref to the originally selected
+ // subchannel. This ensures that we don't destroy and recreate the
+ // subchannel, thus preserving the backoff state inside the subchannel.
+ pick_first_->UnsetSelectedSubchannel();
+ } else {
+ pick_first_->GoIdle();
+ }
}
//
class WatcherWrapper : public AsyncConnectivityStateWatcherInterface {
public:
WatcherWrapper(
- std::shared_ptr<WorkSerializer> work_serializer,
+ SubchannelState* state,
std::unique_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>
watcher)
- : AsyncConnectivityStateWatcherInterface(
- std::move(work_serializer)),
+ : AsyncConnectivityStateWatcherInterface(state->work_serializer()),
+ state_(state),
watcher_(std::move(watcher)) {}
WatcherWrapper(
- std::shared_ptr<WorkSerializer> work_serializer,
+ SubchannelState* state,
std::shared_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>
watcher)
- : AsyncConnectivityStateWatcherInterface(
- std::move(work_serializer)),
+ : AsyncConnectivityStateWatcherInterface(state->work_serializer()),
+ state_(state),
watcher_(std::move(watcher)) {}
void OnConnectivityStateChange(grpc_connectivity_state new_state,
const absl::Status& status) override {
- LOG(INFO) << "notifying watcher: state="
- << ConnectivityStateName(new_state) << " status=" << status;
+ LOG(INFO) << "notifying watcher for " << state_->address_
+ << ": state=" << ConnectivityStateName(new_state)
+ << " status=" << status;
watcher_->OnConnectivityStateChange(new_state, status);
}
private:
+ SubchannelState* state_;
std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
watcher_;
};
SubchannelInterface::ConnectivityStateWatcherInterface>
watcher) override {
auto* watcher_ptr = watcher.get();
- auto watcher_wrapper = MakeOrphanable<WatcherWrapper>(
- state_->work_serializer(), std::move(watcher));
+ auto watcher_wrapper =
+ MakeOrphanable<WatcherWrapper>(state_, std::move(watcher));
watcher_map_[watcher_ptr] = watcher_wrapper.get();
state_->state_tracker_.AddWatcher(GRPC_CHANNEL_SHUTDOWN,
std::move(watcher_wrapper));
auto connectivity_watcher = health_watcher_->TakeWatcher();
auto* connectivity_watcher_ptr = connectivity_watcher.get();
auto watcher_wrapper = MakeOrphanable<WatcherWrapper>(
- state_->work_serializer(), std::move(connectivity_watcher));
+ state_, std::move(connectivity_watcher));
health_watcher_wrapper_ = watcher_wrapper.get();
state_->state_tracker_.AddWatcher(GRPC_CHANNEL_SHUTDOWN,
std::move(watcher_wrapper));
<< location.file() << ":" << location.line();
break;
case GRPC_CHANNEL_READY:
- ASSERT_EQ(to_state, GRPC_CHANNEL_IDLE)
+ ASSERT_THAT(to_state, ::testing::AnyOf(
+ GRPC_CHANNEL_IDLE, GRPC_CHANNEL_CONNECTING,
+ GRPC_CHANNEL_TRANSIENT_FAILURE))
<< ConnectivityStateName(from_state) << "=>"
<< ConnectivityStateName(to_state) << "\n"
<< location.file() << ":" << location.line();
}
}
+TEST_F(PickFirstTest, GoesConnectingWhenSelectedSubchannelGoesConnecting) {
+ if (!IsPickFirstReadyToConnectingEnabled()) {
+ GTEST_SKIP() << "requires pick_first_ready_to_connecting experiment";
+ }
+ // Send an update containing two addresses.
+ constexpr std::array<absl::string_view, 2> kAddresses = {
+ "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
+ absl::Status status = ApplyUpdate(
+ BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
+ EXPECT_TRUE(status.ok()) << status;
+ // LB policy should have created a subchannel for both addresses.
+ auto* subchannel = FindSubchannel(kAddresses[0]);
+ ASSERT_NE(subchannel, nullptr);
+ auto* subchannel2 = FindSubchannel(kAddresses[1]);
+ ASSERT_NE(subchannel2, nullptr);
+ // When the LB policy receives the first subchannel's initial connectivity
+ // state notification (IDLE), it will request a connection.
+ EXPECT_TRUE(subchannel->ConnectionRequested());
+ // This causes the subchannel to start to connect, so it reports CONNECTING.
+ subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
+ // LB policy should have reported CONNECTING state.
+ ExpectConnectingUpdate();
+ // The second subchannel should not be connecting.
+ EXPECT_FALSE(subchannel2->ConnectionRequested());
+ // First subchannel fails to connect.
+ subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
+ absl::UnavailableError("failed"));
+ // LB policy asks the second subchannel to connect.
+ EXPECT_TRUE(subchannel2->ConnectionRequested());
+ // Second subchannel reports CONNECTING.
+ subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
+ // When the second subchannel becomes connected, it reports READY.
+ subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
+ // The LB policy will report CONNECTING some number of times (doesn't
+ // matter how many) and then report READY.
+ auto picker = WaitForConnected();
+ ASSERT_NE(picker, nullptr);
+ // Picker should return the second subchannel's address repeatedly.
+ for (size_t i = 0; i < 3; ++i) {
+ EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[1]);
+ }
+ // First subchannel finishes backoff.
+ subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
+ // The selected subchannel goes from READY to CONNECTING.
+ subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
+ // We should see a re-resolution request.
+ ExpectReresolutionRequest();
+ // LB policy reports CONNECTING with a queueing picker.
+ ExpectConnectingUpdate();
+ // LB policy asks the first subchannel to connect.
+ EXPECT_TRUE(subchannel->ConnectionRequested());
+ // First subchannel reports CONNECTING.
+ subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
+ // Second subchannel gets connected while the first is still trying.
+ subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
+ // First subchannel then also reports READY.
+ subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
+ // LB policy reports READY.
+ picker = WaitForConnected();
+ ASSERT_NE(picker, nullptr);
+ // Picker should return the second subchannel's address repeatedly.
+ for (size_t i = 0; i < 3; ++i) {
+ EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[1]);
+ }
+}
+
+TEST_F(PickFirstTest,
+ GoesConnectingWhenSelectedSubchannelGoesTransientFailure) {
+ if (!IsPickFirstReadyToConnectingEnabled()) {
+ GTEST_SKIP() << "requires pick_first_ready_to_connecting experiment";
+ }
+ // Send an update containing two addresses.
+ constexpr std::array<absl::string_view, 2> kAddresses = {
+ "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
+ absl::Status status = ApplyUpdate(
+ BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
+ EXPECT_TRUE(status.ok()) << status;
+ // LB policy should have created a subchannel for both addresses.
+ auto* subchannel = FindSubchannel(kAddresses[0]);
+ ASSERT_NE(subchannel, nullptr);
+ auto* subchannel2 = FindSubchannel(kAddresses[1]);
+ ASSERT_NE(subchannel2, nullptr);
+ // When the LB policy receives the first subchannel's initial connectivity
+ // state notification (IDLE), it will request a connection.
+ EXPECT_TRUE(subchannel->ConnectionRequested());
+ // This causes the subchannel to start to connect, so it reports CONNECTING.
+ subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
+ // LB policy should have reported CONNECTING state.
+ ExpectConnectingUpdate();
+ // The second subchannel should not be connecting.
+ EXPECT_FALSE(subchannel2->ConnectionRequested());
+ // First subchannel fails to connect.
+ subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
+ absl::UnavailableError("failed"));
+ // LB policy asks the second subchannel to connect.
+ EXPECT_TRUE(subchannel2->ConnectionRequested());
+ // Second subchannel reports CONNECTING.
+ subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
+ // When the second subchannel becomes connected, it reports READY.
+ subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
+ // The LB policy will report CONNECTING some number of times (doesn't
+ // matter how many) and then report READY.
+ auto picker = WaitForConnected();
+ ASSERT_NE(picker, nullptr);
+ // Picker should return the second subchannel's address repeatedly.
+ for (size_t i = 0; i < 3; ++i) {
+ EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[1]);
+ }
+ // First subchannel finishes backoff.
+ subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
+ // The selected subchannel goes from READY to TRANSIENT_FAILURE.
+ subchannel2->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
+ absl::UnavailableError("failed"));
+ // We should see a re-resolution request.
+ ExpectReresolutionRequest();
+ // LB policy reports CONNECTING with a queueing picker.
+ ExpectConnectingUpdate();
+ // LB policy asks the first subchannel to connect.
+ EXPECT_TRUE(subchannel->ConnectionRequested());
+ // First subchannel reports CONNECTING.
+ subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
+ // Second subchannel finishes backoff while the first is still trying.
+ subchannel2->SetConnectivityState(GRPC_CHANNEL_IDLE);
+ // First subchannel fails to connect.
+ subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
+ absl::UnavailableError("failed"));
+ // LB policy asks the second subchannel to connect.
+ EXPECT_TRUE(subchannel2->ConnectionRequested());
+ // Second subchannel reports CONNECTING.
+ subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
+ // Second subchannel succeeds in connecting.
+ subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
+ // LB policy reports READY.
+ picker = WaitForConnected();
+ ASSERT_NE(picker, nullptr);
+ // Picker should return the second subchannel's address repeatedly.
+ for (size_t i = 0; i < 3; ++i) {
+ EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[1]);
+ }
+}
+
TEST_F(PickFirstTest, AddressUpdateRemovedSelectedAddress) {
// Send an update containing two addresses.
constexpr std::array<absl::string_view, 2> kAddresses = {
lb_policy());
EXPECT_TRUE(status.ok()) << status;
// We should see a re-resolution request.
- ExpectReresolutionRequest();
+ if (!IsPickFirstReadyToConnectingEnabled()) ExpectReresolutionRequest();
// LB policy reports IDLE with a queueing picker.
ExpectStateAndQueuingPicker(GRPC_CHANNEL_IDLE);
// By checking the picker, we told the LB policy to trigger a new