This is needed for gRFC A105 (https://github.com/grpc/proposal/pull/516). Specifically, see the "Interaction with xDS Circuit Breaking" section.
It's possible for an LB pick to be happening at the same time as the subchannel sees its underlying connection fail. In this case, the picker can return a subchannel, but when the channel tries to start a call on the subchannel, the call creation fails, because there is no underlying connection. In that case, the channel will queue the pick, on the assumption that the LB policy will soon notice that the subchannel has been disconnected and return a new picker, at which point the queued pick will be re-attempted with that new picker.
When the picker returns a complete pick, it can optionally return a `SubchannelCallTracker` object that allows it to see when the subchannel call starts and ends. In the current API, when the channel successfully creates a call on the subchannel, it will immediately call `Start()`, and then when the subchannel call later ends, it will call `Finish()`. However, when the race condition described above occurs, the `SubchannelCallTracker` object will be destroyed without `Start()` or `Finish()` ever having been called. This API allows us to handle call counter incrementing and decrementing for things like xDS circuit breaking: we check the counter in the picker to see that it's currently below the limit, we increment the counter in `Start()`, and decrement it in `Finish()`. If the subchannel call never starts, then the counter never gets incremented.
With the introduction of connection scaling functionality in the subchannel, this approach will no longer work, because the call may be queued inside the subchannel rather than being immediately started on a connection, and the channel can't tell whether that will happen. In other words, there is no longer any benefit to the `Start()` method, because it would no longer indicate that the call is actually being started on a connection. As a result, I am removing that method from the API.
For xDS circuit breaking in the xds_cluster_impl LB policy, we are now incrementing the call counter in the picker, and the `SubchannelCallTracker` object will decrement it when either `Finish()` is called or when the object is destroyed, whichever comes first.
For grpclb, the `Start()` method was used in an ugly hack to handle ownership of the client stats object between the grpclb policy and the client load reporting filter. The LB policy passes a pointer to this object down to the filter via client initial metadata; the metadata holds only a raw pointer and does not hold a ref. To handle ownership, the LB policy returned a `SubchannelCallTracker` that held a ref to the client stats object but released that ref when `Start()` was called, on the assumption that the client load reporting filter would subsequently take ownership. I've replaced this with a slightly cleaner approach whereby the call tracker always holds a ref to the client stats object, thus guaranteeing that the object still exists when the client load reporting filter sees it, and the filter takes its own ref when it runs. (An even cleaner approach would be to instead pass the client stats object to the filter via a call attribute, similar to how we pass the xDS cluster name from the ConfigSelector to the LB policy tree, but it doesn't seem worth putting that much effort into grpclb at this point.)
Closes #41099
COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/41099 from markdroth:xds_circuit_breaking_counter_change eaa06bbdf1688c31c0d1e3b3cabe6a7d015fc075
PiperOrigin-RevId: 842261731
}
lb_subchannel_call_tracker_ =
std::move(complete_pick->subchannel_call_tracker);
- if (lb_subchannel_call_tracker_ != nullptr) {
- lb_subchannel_call_tracker_->Start();
- }
// Handle metadata mutations.
MetadataMutationHandler::Apply(complete_pick->metadata_mutations,
send_initial_metadata());
"pick";
return Continue{};
}
- // If the LB policy returned a call tracker, inform it that the
- // call is starting and add it to context, so that we can notify
- // it when the call finishes.
+ // If the LB policy returned a call tracker, add it to context, so
+ // that we can notify it when the call finishes.
if (complete_pick->subchannel_call_tracker != nullptr) {
- complete_pick->subchannel_call_tracker->Start();
SetContext(complete_pick->subchannel_call_tracker.release());
}
// Apply metadata mutations, if any.
ClientMetadata& client_initial_metadata) {
GRPC_LATENT_SEE_SCOPE(
"ClientLoadReportingFilter::Call::OnClientInitialMetadata");
- // Handle client initial metadata.
- // Grab client stats object from metadata.
- auto client_stats_md =
+ // Grab client stats object from metadata. The metadata encodes only
+ // a raw pointer, but the LB policy will have returned a subchannel call
+ // tracker that is holding a ref to it, to ensure that it's alive here
+ // for us to take our own ref.
+ std::optional<GrpcLbClientStats*> client_stats_md =
client_initial_metadata.Take(GrpcLbClientStatsMetadata());
- if (client_stats_md.has_value()) {
- client_stats_.reset(*client_stats_md);
+ if (client_stats_md.has_value() && *client_stats_md != nullptr) {
+ client_stats_ = (*client_stats_md)->Ref();
}
}
PickResult Pick(PickArgs args) override;
private:
- // A subchannel call tracker that unrefs the GrpcLbClientStats object
- // in the case where the subchannel call is never actually started,
- // since the client load reporting filter will not be able to do it
- // in that case.
+ // A subchannel call tracker that holds a ref to the
+ // GrpcLbClientStats object, to ensure that it still exists when the
+ // client load reporting filter sees it and takes its own ref to it.
class SubchannelCallTracker final : public SubchannelCallTrackerInterface {
public:
SubchannelCallTracker(
: client_stats_(std::move(client_stats)),
original_call_tracker_(std::move(original_call_tracker)) {}
- void Start() override {
- if (original_call_tracker_ != nullptr) {
- original_call_tracker_->Start();
- }
- // If we're actually starting the subchannel call, then the
- // client load reporting filter will take ownership of the ref
- // passed down to it via metadata.
- client_stats_.release();
- }
-
void Finish(FinishArgs args) override {
if (original_call_tracker_ != nullptr) {
original_call_tracker_->Finish(args);
public:
virtual ~SubchannelCallTrackerInterface() = default;
- /// Called when a subchannel call is started after an LB pick.
- virtual void Start() = 0;
-
/// Called when a subchannel call is completed.
/// The metadata may be modified by the implementation. However, the
/// implementation does not take ownership, so any data that needs to be
/// used after returning must be copied.
+ ///
+ /// Note that when the picker returns a complete pick, it's possible
+ /// that the returned subchannel has already lost its connection, in
+ /// which case the channel will queue the pick. In that case,
+ /// the SubchannelCallTrackerInterface object will be destroyed
+ /// without ever calling Finish().
struct FinishArgs {
absl::string_view peer_address;
absl::Status status;
endpoint_state_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
}
- void Start() override {
- // This tracker does not care about started calls only finished calls.
- // Delegate if needed.
- if (original_subchannel_call_tracker_ != nullptr) {
- original_subchannel_call_tracker_->Start();
- }
- }
-
void Finish(FinishArgs args) override {
// Delegate if needed.
if (original_subchannel_call_tracker_ != nullptr) {
error_utilization_penalty_(error_utilization_penalty),
child_tracker_(std::move(child_tracker)) {}
- void Start() override;
-
void Finish(FinishArgs args) override;
private:
// WeightedRoundRobin::Picker::SubchannelCallTracker
//
-void WeightedRoundRobin::Picker::SubchannelCallTracker::Start() {
- if (child_tracker_ != nullptr) child_tracker_->Start();
-}
-
void WeightedRoundRobin::Picker::SubchannelCallTracker::Finish(
FinishArgs args) {
if (child_tracker_ != nullptr) child_tracker_->Finish(args);
call_counter_(std::move(call_counter)) {}
~SubchannelCallTracker() override {
- locality_stats_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
- call_counter_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
-#ifndef NDEBUG
- GRPC_DCHECK(!started_);
-#endif
- }
-
- void Start() override {
- // Increment number of calls in flight.
- call_counter_->Increment();
- // Record a call started.
- if (locality_stats_ != nullptr) {
- locality_stats_->AddCallStarted();
- }
- // Delegate if needed.
- if (original_subchannel_call_tracker_ != nullptr) {
- original_subchannel_call_tracker_->Start();
- }
-#ifndef NDEBUG
- started_ = true;
-#endif
+ MaybeFinish(/*succeeded=*/false, /*backend_metrics=*/nullptr);
}
void Finish(FinishArgs args) override {
- // Delegate if needed.
if (original_subchannel_call_tracker_ != nullptr) {
original_subchannel_call_tracker_->Finish(args);
}
+ MaybeFinish(/*succeeded=*/args.status.ok(),
+ args.backend_metric_accessor->GetBackendMetricData());
+ }
+
+ private:
+ void MaybeFinish(bool succeeded, const BackendMetricData* backend_metrics) {
// Record call completion for load reporting.
if (locality_stats_ != nullptr) {
- locality_stats_->AddCallFinished(
- args.backend_metric_accessor->GetBackendMetricData(),
- !args.status.ok());
+ locality_stats_->AddCallFinished(backend_metrics, /*fail=*/!succeeded);
+ locality_stats_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
}
// Decrement number of calls in flight.
- call_counter_->Decrement();
-#ifndef NDEBUG
- started_ = false;
-#endif
+ if (call_counter_ != nullptr) {
+ call_counter_->Decrement();
+ call_counter_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
+ }
}
- private:
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
original_subchannel_call_tracker_;
RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats_;
RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
-#ifndef NDEBUG
- bool started_ = false;
-#endif
};
//
kLocality,
subchannel_wrapper->locality());
}
+ // Increment number of calls in flight.
+ call_counter_->Increment();
// Handle load reporting.
RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats;
if (subchannel_wrapper->locality_stats() != nullptr) {
locality_stats = subchannel_wrapper->locality_stats()->Ref(
DEBUG_LOCATION, "SubchannelCallTracker");
+ locality_stats->AddCallStarted();
}
// Handle authority rewriting if needed.
if (!subchannel_wrapper->hostname().empty()) {
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
subchannel_call_tracker,
absl::string_view address, absl::Status status = absl::OkStatus()) {
- subchannel_call_tracker->Start();
FakeMetadata metadata({});
FakeBackendMetricAccessor backend_metric_accessor({});
LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
subchannel_call_tracker;
auto address = ExpectPickComplete(picker, {}, {}, &subchannel_call_tracker);
if (address.has_value()) {
- subchannel_call_tracker->Start();
FakeMetadata metadata({});
FakeBackendMetricAccessor backend_metric_accessor({});
LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
const auto& address = picks[i];
auto& subchannel_call_tracker = subchannel_call_trackers[i];
if (subchannel_call_tracker != nullptr) {
- subchannel_call_tracker->Start();
std::optional<BackendMetricData> backend_metric_data;
auto it = backend_metrics.find(address);
if (it != backend_metrics.end()) {
explicit SubchannelCallTracker(InterceptRecvTrailingMetadataCallback cb)
: cb_(std::move(cb)) {}
- void Start() override {}
-
void Finish(FinishArgs args) override {
TrailingMetadataArgsSeen args_seen;
args_seen.status = args.status;
explicit SubchannelCallTracker(LoadReportTracker* load_report_tracker)
: load_report_tracker_(load_report_tracker) {}
- void Start() override {}
-
void Finish(FinishArgs args) override {
load_report_tracker_->RecordPerRpcLoadReport(
args.backend_metric_accessor->GetBackendMetricData());