]> git.feebdaed.xyz Git - 0xmirror/grpc.git/commitdiff
[PH2][Refactor]
authorTanvi Jagtap <tjagtap@google.com>
Fri, 5 Dec 2025 18:04:10 +0000 (10:04 -0800)
committerCopybara-Service <copybara-worker@google.com>
Fri, 5 Dec 2025 18:07:09 +0000 (10:07 -0800)
The Pausing and Restarting of the ReadLoop happens in a separate class.
We could generalize and re-use this mechanism elsewhere, but that is a task for later.

PiperOrigin-RevId: 840773537

src/core/BUILD
src/core/ext/transport/chttp2/transport/http2_client_transport.cc
src/core/ext/transport/chttp2/transport/http2_client_transport.h
src/core/ext/transport/chttp2/transport/http2_transport.h
test/core/transport/chttp2/BUILD
test/core/transport/chttp2/http2_transport_test.cc

index c347be1a728222e0c4c9e19fd04faab0609a5595..d3b387aab2ac5e76ce4cb63cd5bc36666cb86440 100644 (file)
@@ -8643,10 +8643,12 @@ grpc_cc_library(
     ],
     deps = [
         "1999",
+        "activity",
         "arena",
         "call_spine",
         "channel_args",
         "chttp2_flow_control",
+        "context",
         "grpc_check",
         "grpc_promise_endpoint",
         "header_assembler",
index 618d49588629fd3ad3d8bf6347d7fa6cbe6332e3..e151f9320e908d048ffe4171b2d49db9d0693524 100644 (file)
@@ -469,7 +469,7 @@ Http2Status Http2ClientTransport::ProcessHttp2SettingsFrame(
     SpawnGuardedTransportParty("SettingsAck", TriggerWriteCycle());
     if (GPR_UNLIKELY(!settings_->IsFirstPeerSettingsApplied())) {
       // Apply the first settings before we read any other frames.
-      should_stall_read_loop_ = true;
+      reader_state_.SetPauseReadLoop();
     }
   } else {
     if (settings_->OnSettingsAckReceived()) {
@@ -844,11 +844,7 @@ auto Http2ClientTransport::ReadAndProcessOneFrame() {
             }));
       },
       [self = RefAsSubclass<Http2ClientTransport>()]() -> Poll<absl::Status> {
-        if (self->should_stall_read_loop_) {
-          self->read_loop_waker_ = GetContext<Activity>()->MakeNonOwningWaker();
-          return Pending{};
-        }
-        return absl::OkStatus();
+        return self->reader_state_.MaybePauseReadLoop();
       }));
 }
 
@@ -932,7 +928,7 @@ void Http2ClientTransport::ActOnFlowControlAction(
     // exhaust the flow control window. This prevents us from sending window
     // updates to the peer, causing the peer to block unnecessarily while
     // waiting for flow control tokens.
-    should_stall_read_loop_ = true;
+    reader_state_.SetPauseReadLoop();
     SpawnGuardedTransportParty("SendControlFrames", TriggerWriteCycle());
   }
 }
@@ -1000,10 +996,7 @@ void Http2ClientTransport::NotifyControlFramesWriteDone() {
   // Notify Control modules that we have sent the frames.
   // All notifications are expected to be synchronous.
   GRPC_HTTP2_CLIENT_DLOG << "Http2ClientTransport NotifyControlFramesWriteDone";
-  if (should_stall_read_loop_) {
-    should_stall_read_loop_ = false;
-    read_loop_waker_.Wakeup();
-  }
+  reader_state_.ResumeReadLoopIfPaused();
   ping_manager_->NotifyPingSent();
   goaway_manager_.NotifyGoawaySent();
   MaybeSpawnWaitForSettingsTimeout();
@@ -1366,8 +1359,7 @@ Http2ClientTransport::Http2ClientTransport(
           "PH2_Client",
           channel_args.GetBool(GRPC_ARG_HTTP2_BDP_PROBE).value_or(true),
           &memory_owner_),
-      ztrace_collector_(std::make_shared<PromiseHttp2ZTraceCollector>()),
-      should_stall_read_loop_(false) {
+      ztrace_collector_(std::make_shared<PromiseHttp2ZTraceCollector>()) {
   GRPC_HTTP2_CLIENT_DLOG << "Http2ClientTransport Constructor Begin";
   // Initialize the general party and write party.
   auto general_party_arena = SimpleArenaAllocator(0)->MakeArena();
index b110f9432bdb11aae138ba38d7272c6682213eac..7d9f61e648745468bf39db5ba348f534c8ccca8e 100644 (file)
@@ -406,11 +406,6 @@ class Http2ClientTransport final : public ClientTransport,
   // Runs on the call party.
   std::optional<RefCountedPtr<Stream>> MakeStream(CallHandler call_handler);
 
-  struct CloseStreamArgs {
-    bool close_reads;
-    bool close_writes;
-  };
-
   // This function MUST be idempotent.
   void CloseStream(RefCountedPtr<Stream> stream, CloseStreamArgs args,
                    DebugLocation whence = {});
@@ -583,10 +578,7 @@ class Http2ClientTransport final : public ClientTransport,
         valid_ping_ack_received && ping_manager_->ImportantPingRequested(),
         [self = RefAsSubclass<Http2ClientTransport>()] {
           return Map(self->TriggerWriteCycle(), [](const absl::Status status) {
-            return (status.ok())
-                       ? Http2Status::Ok()
-                       : Http2Status::AbslConnectionError(
-                             status.code(), std::string(status.message()));
+            return ToHttpOkOrConnError(status);
           });
         },
         [] { return Immediate(Http2Status::Ok()); });
@@ -745,11 +737,7 @@ class Http2ClientTransport final : public ClientTransport,
   // TODO(tjagtap) [PH2][P2][BDP] Remove this when the BDP code is done.
   Waker periodic_updates_waker_;
 
-  // TODO(tjagtap) [PH2][P2][Settings] Set this to true when we receive settings
-  // that appear "Urgent". Example - initial window size 0 is urgent because it
-  // indicates extreme memory pressure on the server.
-  bool should_stall_read_loop_;
-  Waker read_loop_waker_;
+  Http2ReadContext reader_state_;
   Http2Status ParseAndDiscardHeaders(SliceBuffer&& buffer, bool is_end_headers,
                                      RefCountedPtr<Stream> stream,
                                      Http2Status&& original_status,
index 0e06fb2f1d06786faf36b871b59d2633ac441c08..8f331007091a4b6bd33666f4f1a6176bc70f69bd 100644 (file)
@@ -20,6 +20,7 @@
 #define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HTTP2_TRANSPORT_H
 
 #include <cstdint>
+#include <string>
 
 #include "src/core/channelz/channelz.h"
 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
 #include "src/core/ext/transport/chttp2/transport/http2_settings_promises.h"
 #include "src/core/ext/transport/chttp2/transport/http2_status.h"
 #include "src/core/ext/transport/chttp2/transport/stream.h"
+#include "src/core/lib/promise/activity.h"
+#include "src/core/lib/promise/context.h"
+#include "src/core/lib/promise/poll.h"
 #include "src/core/util/ref_counted_ptr.h"
 #include "absl/log/log.h"
+#include "absl/status/status.h"
 
 namespace grpc_core {
 namespace http2 {
@@ -56,6 +61,48 @@ constexpr uint32_t kMaxWriteSize = /*10 MB*/ 10u * 1024u * 1024u;
 
 constexpr uint32_t kGoawaySendTimeoutSeconds = 5u;
 
+struct CloseStreamArgs {
+  bool close_reads;
+  bool close_writes;
+};
+
+class Http2ReadContext {
+ public:
+  // Signals that the read loop should pause. If it's already paused, this is a
+  // no-op.
+  void SetPauseReadLoop() {
+    // TODO(tjagtap) [PH2][P2][Settings] Plumb with when we receive urgent
+    // settings. Example - initial window size 0 is urgent because it indicates
+    // extreme memory pressure on the server.
+    should_pause_read_loop_ = true;
+  }
+
+  // If SetPauseReadLoop() was called, this returns Pending and
+  // registers a waker that will be woken by WakeReadLoop().
+  // If SetPauseReadLoop() was not called, this returns OkStatus.
+  // This should be polled by the read loop to yield control when requested.
+  Poll<absl::Status> MaybePauseReadLoop() {
+    if (should_pause_read_loop_) {
+      read_loop_waker_ = GetContext<Activity>()->MakeNonOwningWaker();
+      return Pending{};
+    }
+    return absl::OkStatus();
+  }
+
+  // If SetPauseReadLoop() was called, resumes it by
+  // waking up the ReadLoop. If not paused, this is a no-op.
+  void ResumeReadLoopIfPaused() {
+    if (should_pause_read_loop_) {
+      should_pause_read_loop_ = false;
+      read_loop_waker_.Wakeup();
+    }
+  }
+
+ private:
+  bool should_pause_read_loop_ = false;
+  Waker read_loop_waker_;
+};
+
 ///////////////////////////////////////////////////////////////////////////////
 // Settings helpers
 
index 809c0c30403491e3e0565c374b9610fc14f266e5..cb9f5bb1d6841265ca29bc1cb55d86e7f6a5801b 100644 (file)
@@ -1003,10 +1003,14 @@ grpc_cc_test(
         "//:chttp2_frame",
         "//:config",
         "//:event_engine_base_hdrs",
+        "//:exec_ctx",
         "//:gpr",
         "//:grpc",
+        "//:grpc_base",
         "//:orphanable",
+        "//:promise",
         "//:ref_counted_ptr",
+        "//src/core:1999",
         "//src/core:call_spine",
         "//src/core:channel_args",
         "//src/core:chttp2_flow_control",
@@ -1019,14 +1023,19 @@ grpc_cc_test(
         "//src/core:http2_status",
         "//src/core:http2_transport",
         "//src/core:internal_channel_arg_names",
+        "//src/core:loop",
         "//src/core:message",
         "//src/core:metadata",
         "//src/core:notification",
         "//src/core:ph2_stream",
+        "//src/core:poll",
+        "//src/core:ref_counted",
+        "//src/core:sleep",
         "//src/core:slice_buffer",
         "//src/core:time",
         "//src/core:transport_common",
         "//src/core:try_join",
+        "//src/core:try_seq",
         "//test/core/promise:poll_matcher",
         "//test/core/test_util:grpc_test_util",
         "//test/core/test_util:grpc_test_util_base",
index 1523d65ac721dd8d00aef3c39549d6d659c713c7..155f899e087af31b575bb6a352bfef397d980d1f 100644 (file)
@@ -24,6 +24,7 @@
 
 #include <cstdint>
 #include <memory>
+#include <string>
 #include <utility>
 #include <vector>
 
 #include "src/core/ext/transport/chttp2/transport/transport_common.h"
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/event_engine/default_event_engine.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/promise/loop.h"
+#include "src/core/lib/promise/party.h"
+#include "src/core/lib/promise/poll.h"
+#include "src/core/lib/promise/promise.h"
+#include "src/core/lib/promise/sleep.h"
 #include "src/core/lib/promise/try_join.h"
+#include "src/core/lib/promise/try_seq.h"
 #include "src/core/lib/slice/slice_buffer.h"
 #include "src/core/util/grpc_check.h"
 #include "src/core/util/notification.h"
 #include "src/core/util/orphanable.h"
+#include "src/core/util/ref_counted.h"
 #include "src/core/util/ref_counted_ptr.h"
 #include "src/core/util/time.h"
 #include "test/core/promise/poll_matcher.h"
@@ -56,6 +65,7 @@
 #include "gtest/gtest.h"
 #include "absl/log/log.h"
 #include "absl/status/status.h"
+#include "absl/strings/str_cat.h"
 #include "absl/strings/string_view.h"
 #include "absl/types/span.h"
 
@@ -590,8 +600,8 @@ TEST_F(TestsNeedingStreamObjects,
   Http2WindowUpdateFrame frame;
   frame.increment = 1000;
 
-  // If stream_id != 0 and stream is not null, stream flow control window should
-  // increase.
+  // If stream_id != 0 and stream is not null, stream flow control window
+  // should increase.
   frame.stream_id = 1;
   ProcessIncomingWindowUpdateFrameFlowControl(frame, transport_flow_control_,
                                               stream);
@@ -633,6 +643,114 @@ TEST_F(TestsNeedingStreamObjects,
   EXPECT_EQ(stream->flow_control.remote_window_delta(), 1000 + 10000);
 }
 
+///////////////////////////////////////////////////////////////////////////////
+// Http2ReadContext tests
+
+class Http2ReadContextTest : public ::testing::Test {
+ protected:
+  RefCountedPtr<Party> MakeParty() {
+    auto arena = SimpleArenaAllocator()->MakeArena();
+    arena->SetContext<grpc_event_engine::experimental::EventEngine>(
+        event_engine_.get());
+    return Party::Make(std::move(arena));
+  }
+
+ private:
+  std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_ =
+      grpc_event_engine::experimental::GetDefaultEventEngine();
+};
+
+TEST_F(Http2ReadContextTest, WakeWithoutPause) {
+  // Test that calling ResumeReadLoopIfPaused before MaybePauseReadLoop has
+  // no effect and does not crash.
+  Http2ReadContext read_context;
+  read_context.ResumeReadLoopIfPaused();
+  read_context.ResumeReadLoopIfPaused();
+  read_context.ResumeReadLoopIfPaused();
+  read_context.MaybePauseReadLoop();
+  read_context.SetPauseReadLoop();
+}
+
+class SimulatedTransport : public RefCounted<SimulatedTransport> {
+ public:
+  auto SimulatedReadAndProcessOneFrame() {
+    return [self = this->Ref()]() -> Poll<absl::Status> {
+      ++(self->i);
+      if (self->i % 2 == 0) {
+        // Doing this alternate times to make sure that SetPauseReadLoop is
+        // idempotent
+        absl::StrAppend(&self->execution_order, "Pause ");
+        return self->context.MaybePauseReadLoop();
+      }
+      absl::StrAppend(&self->execution_order, ". ");
+      return absl::OkStatus();
+    };
+  }
+  auto SimulatedReadLoop() {
+    return AssertResultType<absl::Status>(Loop([self = this->Ref()]() {
+      return TrySeq(self->SimulatedReadAndProcessOneFrame(),
+                    [self]() -> LoopCtl<absl::Status> {
+                      if (self->i < 10) {
+                        absl::StrAppend(&self->execution_order, "SetPause ");
+                        self->context.SetPauseReadLoop();
+                        return Continue();
+                      }
+                      absl::StrAppend(&self->execution_order, "EndRead ");
+                      self->did_end_read = true;
+                      return absl::OkStatus();
+                    });
+    }));
+  }
+  auto SimulatedOneWrite() {
+    return [self = this->Ref()]() -> Poll<absl::Status> {
+      absl::StrAppend(&self->execution_order, "Wake ");
+      self->context.ResumeReadLoopIfPaused();
+      return absl::OkStatus();
+    };
+  }
+
+  auto SimulatedWriteLoop() {
+    return AssertResultType<absl::Status>(Loop([self = this->Ref()]() {
+      return TrySeq(Sleep(Duration::Milliseconds(100)),
+                    self->SimulatedOneWrite(),
+                    [self]() -> LoopCtl<absl::Status> {
+                      if (self->did_end_read) {
+                        absl::StrAppend(&self->execution_order, "EndWrite ");
+                        return absl::OkStatus();
+                      }
+                      absl::StrAppend(&self->execution_order, "_ ");
+                      return Continue();
+                    });
+    }));
+  }
+
+  std::string execution_order;
+
+ private:
+  Http2ReadContext context;
+  int i = 0;
+  bool did_end_read = false;
+};
+
+TEST_F(Http2ReadContextTest, PauseAndWake) {
+  RefCountedPtr<SimulatedTransport> transport =
+      MakeRefCounted<SimulatedTransport>();
+  ExecCtx ctx;
+  RefCountedPtr<Party> party = MakeParty();
+  Notification n1;
+  Notification n2;
+  party->Spawn("Read", transport->SimulatedReadLoop(),
+               [&n1](absl::Status status) { n1.Notify(); });
+  party->Spawn("Write", transport->SimulatedWriteLoop(),
+               [&n2](absl::Status status) { n2.Notify(); });
+  n1.WaitForNotification();
+  n2.WaitForNotification();
+  EXPECT_STREQ(transport->execution_order.c_str(),
+               ". SetPause Pause Wake _ . SetPause Pause Wake _ . "
+               "SetPause Pause Wake _ . SetPause Pause Wake _ . "
+               "SetPause Pause Wake _ . EndRead Wake EndWrite ");
+}
+
 }  // namespace testing
 
 }  // namespace http2