],
deps = [
"1999",
+ "activity",
"arena",
"call_spine",
"channel_args",
"chttp2_flow_control",
+ "context",
"grpc_check",
"grpc_promise_endpoint",
"header_assembler",
SpawnGuardedTransportParty("SettingsAck", TriggerWriteCycle());
if (GPR_UNLIKELY(!settings_->IsFirstPeerSettingsApplied())) {
// Apply the first settings before we read any other frames.
- should_stall_read_loop_ = true;
+ reader_state_.SetPauseReadLoop();
}
} else {
if (settings_->OnSettingsAckReceived()) {
}));
},
[self = RefAsSubclass<Http2ClientTransport>()]() -> Poll<absl::Status> {
- if (self->should_stall_read_loop_) {
- self->read_loop_waker_ = GetContext<Activity>()->MakeNonOwningWaker();
- return Pending{};
- }
- return absl::OkStatus();
+ return self->reader_state_.MaybePauseReadLoop();
}));
}
// exhaust the flow control window. This prevents us from sending window
// updates to the peer, causing the peer to block unnecessarily while
// waiting for flow control tokens.
- should_stall_read_loop_ = true;
+ reader_state_.SetPauseReadLoop();
SpawnGuardedTransportParty("SendControlFrames", TriggerWriteCycle());
}
}
// Notify Control modules that we have sent the frames.
// All notifications are expected to be synchronous.
GRPC_HTTP2_CLIENT_DLOG << "Http2ClientTransport NotifyControlFramesWriteDone";
- if (should_stall_read_loop_) {
- should_stall_read_loop_ = false;
- read_loop_waker_.Wakeup();
- }
+ reader_state_.ResumeReadLoopIfPaused();
ping_manager_->NotifyPingSent();
goaway_manager_.NotifyGoawaySent();
MaybeSpawnWaitForSettingsTimeout();
"PH2_Client",
channel_args.GetBool(GRPC_ARG_HTTP2_BDP_PROBE).value_or(true),
&memory_owner_),
- ztrace_collector_(std::make_shared<PromiseHttp2ZTraceCollector>()),
- should_stall_read_loop_(false) {
+ ztrace_collector_(std::make_shared<PromiseHttp2ZTraceCollector>()) {
GRPC_HTTP2_CLIENT_DLOG << "Http2ClientTransport Constructor Begin";
// Initialize the general party and write party.
auto general_party_arena = SimpleArenaAllocator(0)->MakeArena();
// Runs on the call party.
std::optional<RefCountedPtr<Stream>> MakeStream(CallHandler call_handler);
- struct CloseStreamArgs {
- bool close_reads;
- bool close_writes;
- };
-
// This function MUST be idempotent.
void CloseStream(RefCountedPtr<Stream> stream, CloseStreamArgs args,
DebugLocation whence = {});
valid_ping_ack_received && ping_manager_->ImportantPingRequested(),
[self = RefAsSubclass<Http2ClientTransport>()] {
return Map(self->TriggerWriteCycle(), [](const absl::Status status) {
- return (status.ok())
- ? Http2Status::Ok()
- : Http2Status::AbslConnectionError(
- status.code(), std::string(status.message()));
+ return ToHttpOkOrConnError(status);
});
},
[] { return Immediate(Http2Status::Ok()); });
// TODO(tjagtap) [PH2][P2][BDP] Remove this when the BDP code is done.
Waker periodic_updates_waker_;
- // TODO(tjagtap) [PH2][P2][Settings] Set this to true when we receive settings
- // that appear "Urgent". Example - initial window size 0 is urgent because it
- // indicates extreme memory pressure on the server.
- bool should_stall_read_loop_;
- Waker read_loop_waker_;
+ Http2ReadContext reader_state_;
Http2Status ParseAndDiscardHeaders(SliceBuffer&& buffer, bool is_end_headers,
RefCountedPtr<Stream> stream,
Http2Status&& original_status,
#define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HTTP2_TRANSPORT_H
#include <cstdint>
+#include <string>
#include "src/core/channelz/channelz.h"
#include "src/core/ext/transport/chttp2/transport/flow_control.h"
#include "src/core/ext/transport/chttp2/transport/http2_settings_promises.h"
#include "src/core/ext/transport/chttp2/transport/http2_status.h"
#include "src/core/ext/transport/chttp2/transport/stream.h"
+#include "src/core/lib/promise/activity.h"
+#include "src/core/lib/promise/context.h"
+#include "src/core/lib/promise/poll.h"
#include "src/core/util/ref_counted_ptr.h"
#include "absl/log/log.h"
+#include "absl/status/status.h"
namespace grpc_core {
namespace http2 {
constexpr uint32_t kGoawaySendTimeoutSeconds = 5u;
+struct CloseStreamArgs {
+ bool close_reads;
+ bool close_writes;
+};
+
+class Http2ReadContext {
+ public:
+ // Signals that the read loop should pause. If it's already paused, this is a
+ // no-op.
+ void SetPauseReadLoop() {
+ // TODO(tjagtap) [PH2][P2][Settings] Plumb with when we receive urgent
+ // settings. Example - initial window size 0 is urgent because it indicates
+ // extreme memory pressure on the server.
+ should_pause_read_loop_ = true;
+ }
+
+ // If SetPauseReadLoop() was called, this returns Pending and
+ // registers a waker that will be woken by WakeReadLoop().
+ // If SetPauseReadLoop() was not called, this returns OkStatus.
+ // This should be polled by the read loop to yield control when requested.
+ Poll<absl::Status> MaybePauseReadLoop() {
+ if (should_pause_read_loop_) {
+ read_loop_waker_ = GetContext<Activity>()->MakeNonOwningWaker();
+ return Pending{};
+ }
+ return absl::OkStatus();
+ }
+
+ // If SetPauseReadLoop() was called, resumes it by
+ // waking up the ReadLoop. If not paused, this is a no-op.
+ void ResumeReadLoopIfPaused() {
+ if (should_pause_read_loop_) {
+ should_pause_read_loop_ = false;
+ read_loop_waker_.Wakeup();
+ }
+ }
+
+ private:
+ bool should_pause_read_loop_ = false;
+ Waker read_loop_waker_;
+};
+
///////////////////////////////////////////////////////////////////////////////
// Settings helpers
"//:chttp2_frame",
"//:config",
"//:event_engine_base_hdrs",
+ "//:exec_ctx",
"//:gpr",
"//:grpc",
+ "//:grpc_base",
"//:orphanable",
+ "//:promise",
"//:ref_counted_ptr",
+ "//src/core:1999",
"//src/core:call_spine",
"//src/core:channel_args",
"//src/core:chttp2_flow_control",
"//src/core:http2_status",
"//src/core:http2_transport",
"//src/core:internal_channel_arg_names",
+ "//src/core:loop",
"//src/core:message",
"//src/core:metadata",
"//src/core:notification",
"//src/core:ph2_stream",
+ "//src/core:poll",
+ "//src/core:ref_counted",
+ "//src/core:sleep",
"//src/core:slice_buffer",
"//src/core:time",
"//src/core:transport_common",
"//src/core:try_join",
+ "//src/core:try_seq",
"//test/core/promise:poll_matcher",
"//test/core/test_util:grpc_test_util",
"//test/core/test_util:grpc_test_util_base",
#include <cstdint>
#include <memory>
+#include <string>
#include <utility>
#include <vector>
#include "src/core/ext/transport/chttp2/transport/transport_common.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/event_engine/default_event_engine.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/promise/loop.h"
+#include "src/core/lib/promise/party.h"
+#include "src/core/lib/promise/poll.h"
+#include "src/core/lib/promise/promise.h"
+#include "src/core/lib/promise/sleep.h"
#include "src/core/lib/promise/try_join.h"
+#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/util/grpc_check.h"
#include "src/core/util/notification.h"
#include "src/core/util/orphanable.h"
+#include "src/core/util/ref_counted.h"
#include "src/core/util/ref_counted_ptr.h"
#include "src/core/util/time.h"
#include "test/core/promise/poll_matcher.h"
#include "gtest/gtest.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
+#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
Http2WindowUpdateFrame frame;
frame.increment = 1000;
- // If stream_id != 0 and stream is not null, stream flow control window should
- // increase.
+ // If stream_id != 0 and stream is not null, stream flow control window
+ // should increase.
frame.stream_id = 1;
ProcessIncomingWindowUpdateFrameFlowControl(frame, transport_flow_control_,
stream);
EXPECT_EQ(stream->flow_control.remote_window_delta(), 1000 + 10000);
}
+///////////////////////////////////////////////////////////////////////////////
+// Http2ReadContext tests
+
+class Http2ReadContextTest : public ::testing::Test {
+ protected:
+ RefCountedPtr<Party> MakeParty() {
+ auto arena = SimpleArenaAllocator()->MakeArena();
+ arena->SetContext<grpc_event_engine::experimental::EventEngine>(
+ event_engine_.get());
+ return Party::Make(std::move(arena));
+ }
+
+ private:
+ std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_ =
+ grpc_event_engine::experimental::GetDefaultEventEngine();
+};
+
+TEST_F(Http2ReadContextTest, WakeWithoutPause) {
+ // Test that calling ResumeReadLoopIfPaused before MaybePauseReadLoop has
+ // no effect and does not crash.
+ Http2ReadContext read_context;
+ read_context.ResumeReadLoopIfPaused();
+ read_context.ResumeReadLoopIfPaused();
+ read_context.ResumeReadLoopIfPaused();
+ read_context.MaybePauseReadLoop();
+ read_context.SetPauseReadLoop();
+}
+
+class SimulatedTransport : public RefCounted<SimulatedTransport> {
+ public:
+ auto SimulatedReadAndProcessOneFrame() {
+ return [self = this->Ref()]() -> Poll<absl::Status> {
+ ++(self->i);
+ if (self->i % 2 == 0) {
+ // Doing this alternate times to make sure that SetPauseReadLoop is
+ // idempotent
+ absl::StrAppend(&self->execution_order, "Pause ");
+ return self->context.MaybePauseReadLoop();
+ }
+ absl::StrAppend(&self->execution_order, ". ");
+ return absl::OkStatus();
+ };
+ }
+ auto SimulatedReadLoop() {
+ return AssertResultType<absl::Status>(Loop([self = this->Ref()]() {
+ return TrySeq(self->SimulatedReadAndProcessOneFrame(),
+ [self]() -> LoopCtl<absl::Status> {
+ if (self->i < 10) {
+ absl::StrAppend(&self->execution_order, "SetPause ");
+ self->context.SetPauseReadLoop();
+ return Continue();
+ }
+ absl::StrAppend(&self->execution_order, "EndRead ");
+ self->did_end_read = true;
+ return absl::OkStatus();
+ });
+ }));
+ }
+ auto SimulatedOneWrite() {
+ return [self = this->Ref()]() -> Poll<absl::Status> {
+ absl::StrAppend(&self->execution_order, "Wake ");
+ self->context.ResumeReadLoopIfPaused();
+ return absl::OkStatus();
+ };
+ }
+
+ auto SimulatedWriteLoop() {
+ return AssertResultType<absl::Status>(Loop([self = this->Ref()]() {
+ return TrySeq(Sleep(Duration::Milliseconds(100)),
+ self->SimulatedOneWrite(),
+ [self]() -> LoopCtl<absl::Status> {
+ if (self->did_end_read) {
+ absl::StrAppend(&self->execution_order, "EndWrite ");
+ return absl::OkStatus();
+ }
+ absl::StrAppend(&self->execution_order, "_ ");
+ return Continue();
+ });
+ }));
+ }
+
+ std::string execution_order;
+
+ private:
+ Http2ReadContext context;
+ int i = 0;
+ bool did_end_read = false;
+};
+
+TEST_F(Http2ReadContextTest, PauseAndWake) {
+ RefCountedPtr<SimulatedTransport> transport =
+ MakeRefCounted<SimulatedTransport>();
+ ExecCtx ctx;
+ RefCountedPtr<Party> party = MakeParty();
+ Notification n1;
+ Notification n2;
+ party->Spawn("Read", transport->SimulatedReadLoop(),
+ [&n1](absl::Status status) { n1.Notify(); });
+ party->Spawn("Write", transport->SimulatedWriteLoop(),
+ [&n2](absl::Status status) { n2.Notify(); });
+ n1.WaitForNotification();
+ n2.WaitForNotification();
+ EXPECT_STREQ(transport->execution_order.c_str(),
+ ". SetPause Pause Wake _ . SetPause Pause Wake _ . "
+ "SetPause Pause Wake _ . SetPause Pause Wake _ . "
+ "SetPause Pause Wake _ . EndRead Wake EndWrite ");
+}
+
} // namespace testing
} // namespace http2