]> git.feebdaed.xyz Git - 0xmirror/grpc.git/commitdiff
[PH2][Trivial] Tidy up client code - Part 3 master
authorTanvi Jagtap <tjagtap@google.com>
Wed, 24 Dec 2025 16:50:56 +0000 (08:50 -0800)
committerCopybara-Service <copybara-worker@google.com>
Wed, 24 Dec 2025 16:53:06 +0000 (08:53 -0800)
Moving code from header to cc file.
Trying to manage the class Http2ClientTransport which has become 500+ lines and hard to work with. Need to reorder as well (in next PR)

PiperOrigin-RevId: 848575512

src/core/ext/transport/chttp2/transport/http2_client_transport.cc
src/core/ext/transport/chttp2/transport/http2_client_transport.h

index f81e0972fed9295e5cdef3848d23ee3c063f834b..c5ff21e2d97eca8c78c30efaf370cd1e17a9c2ff 100644 (file)
@@ -113,8 +113,8 @@ using StreamWritabilityUpdate =
 // and it is functions. The code will be written iteratively.
 // Do not use or edit any of these functions unless you are
 // familiar with the PH2 project (Moving chttp2 to promises.)
-// TODO(tjagtap) : [PH2][P3] : Delete this comment when http2
-// rollout begins
+// TODO(tjagtap) : [PH2][P5] : Update the experimental status of the code when
+// http2 rollout is completed.
 
 template <typename Factory>
 void Http2ClientTransport::SpawnInfallible(RefCountedPtr<Party> party,
@@ -1528,6 +1528,32 @@ void Http2ClientTransport::ReadChannelArgs(const ChannelArgs& channel_args,
   }
 }
 
+absl::Status Http2ClientTransport::HandleError(
+    const std::optional<uint32_t> stream_id, Http2Status status,
+    DebugLocation whence) {
+  auto error_type = status.GetType();
+  GRPC_DCHECK(error_type != Http2Status::Http2ErrorType::kOk);
+
+  if (error_type == Http2Status::Http2ErrorType::kStreamError) {
+    GRPC_HTTP2_CLIENT_ERROR_DLOG << "Stream Error: " << status.DebugString();
+    GRPC_DCHECK(stream_id.has_value());
+    // Passing a cancelled server metadata handle to propagate the error
+    // to the upper layers.
+    BeginCloseStream(
+        LookupStream(stream_id.value()),
+        Http2ErrorCodeToFrameErrorCode(status.GetStreamErrorCode()),
+        CancelledServerMetadataFromStatus(status.GetAbslStreamError()), whence);
+    return absl::OkStatus();
+  } else if (error_type == Http2Status::Http2ErrorType::kConnectionError) {
+    GRPC_HTTP2_CLIENT_ERROR_DLOG << "Connection Error: "
+                                 << status.DebugString();
+    absl::Status absl_status = status.GetAbslConnectionError();
+    MaybeSpawnCloseTransport(std::move(status), whence);
+    return absl_status;
+  }
+  GPR_UNREACHABLE_CODE(return absl::InternalError("Invalid error type"));
+}
+
 // This function MUST be idempotent. This function MUST be called from the
 // transport party.
 void Http2ClientTransport::CloseStream(RefCountedPtr<Stream> stream,
@@ -1896,6 +1922,63 @@ void Http2ClientTransport::AddData(channelz::DataSink sink) {
 ///////////////////////////////////////////////////////////////////////////////
 // Stream Related Operations
 
+absl::StatusOr<uint32_t> Http2ClientTransport::NextStreamId() {
+  if (next_stream_id_ > GetMaxAllowedStreamId()) {
+    // TODO(tjagtap) : [PH2][P3] : Handle case if transport runs out of stream
+    // ids
+    // RFC9113 : Stream identifiers cannot be reused. Long-lived connections
+    // can result in an endpoint exhausting the available range of stream
+    // identifiers. A client that is unable to establish a new stream
+    // identifier can establish a new connection for new streams. A server
+    // that is unable to establish a new stream identifier can send a GOAWAY
+    // frame so that the client is forced to open a new connection for new
+    // streams.
+    return absl::ResourceExhaustedError("No more stream ids available");
+  }
+  // TODO(akshitpatel) : [PH2][P3] : There is a channel arg to delay
+  // starting new streams instead of failing them. This needs to be
+  // implemented.
+  {
+    MutexLock lock(&transport_mutex_);
+    if (GetActiveStreamCountLocked() >=
+        settings_->peer().max_concurrent_streams()) {
+      return absl::ResourceExhaustedError("Reached max concurrent streams");
+    }
+  }
+
+  // RFC9113 : Streams initiated by a client MUST use odd-numbered stream
+  // identifiers.
+  uint32_t new_stream_id = std::exchange(next_stream_id_, next_stream_id_ + 2);
+  if (GPR_UNLIKELY(next_stream_id_ > GetMaxAllowedStreamId())) {
+    ReportDisconnection(
+        absl::ResourceExhaustedError("Transport Stream IDs exhausted"),
+        {},  // TODO(tjagtap) : [PH2][P2] : Report better disconnect info.
+        "no_more_stream_ids");
+  }
+  return new_stream_id;
+}
+
+absl::Status Http2ClientTransport::MaybeAddStreamToWritableStreamList(
+    const RefCountedPtr<Stream> stream,
+    const StreamDataQueue<ClientMetadataHandle>::StreamWritabilityUpdate
+        result) {
+  if (result.became_writable) {
+    GRPC_HTTP2_CLIENT_DLOG
+        << "Http2ClientTransport MaybeAddStreamToWritableStreamList Stream id: "
+        << stream->GetStreamId() << " became writable";
+    absl::Status status =
+        writable_stream_list_.Enqueue(stream, result.priority);
+    if (!status.ok()) {
+      return HandleError(
+          std::nullopt,
+          Http2Status::Http2ConnectionError(
+              Http2ErrorCode::kRefusedStream,
+              "Failed to enqueue stream to writable stream list"));
+    }
+  }
+  return absl::OkStatus();
+}
+
 RefCountedPtr<Stream> Http2ClientTransport::LookupStream(uint32_t stream_id) {
   MutexLock lock(&transport_mutex_);
   auto it = stream_list_.find(stream_id);
@@ -2107,6 +2190,22 @@ void Http2ClientTransport::StartCall(CallHandler call_handler) {
   GRPC_HTTP2_CLIENT_DLOG << "Http2ClientTransport StartCall End";
 }
 
+///////////////////////////////////////////////////////////////////////////////
+// Http2ClientTransport - Test Only Functions
+
+int64_t Http2ClientTransport::TestOnlyTransportFlowControlWindow() {
+  return flow_control_.remote_window();
+}
+
+int64_t Http2ClientTransport::TestOnlyGetStreamFlowControlWindow(
+    const uint32_t stream_id) {
+  RefCountedPtr<Stream> stream = LookupStream(stream_id);
+  if (stream == nullptr) {
+    return -1;
+  }
+  return stream->flow_control.remote_window_delta();
+}
+
 ///////////////////////////////////////////////////////////////////////////////
 // Class PingSystemInterfaceImpl
 
index a1588fa54ba72761f22ce9f9aa7463c61e68ee38..32db043f80123311a2e31f5f3aae4c5da766d105 100644 (file)
@@ -104,8 +104,8 @@ namespace http2 {
 // Experimental : The code will be written iteratively.
 // Do not use or edit any of these functions unless you are
 // familiar with the PH2 project (Moving chttp2 to promises.)
-// TODO(tjagtap) : [PH2][P3] : Update the experimental status of the code before
-// http2 rollout begins.
+// TODO(tjagtap) : [PH2][P5] : Update the experimental status of the code when
+// http2 rollout is completed.
 class Http2ClientTransport final : public ClientTransport,
                                    public channelz::DataSource {
   // TODO(akshitpatel) [PH2][P1] : Functions that need a mutex to be held should
@@ -128,8 +128,7 @@ class Http2ClientTransport final : public ClientTransport,
   ServerTransport* server_transport() override { return nullptr; }
   absl::string_view GetTransportName() const override { return "http2"; }
 
-  // TODO(tjagtap) : [PH2][EXT] : These can be removed when event engine rollout
-  // is complete.
+  // TODO(tjagtap) : [PH2][EXT] : Removed after event engine rollout
   void SetPollset(grpc_stream*, grpc_pollset*) override {}
   void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
 
@@ -175,18 +174,8 @@ class Http2ClientTransport final : public ClientTransport,
   void TestOnlySpawnPromise(absl::string_view name, Factory&& factory) {
     general_party_->Spawn(name, std::forward<Factory>(factory), [](Empty) {});
   }
-
-  int64_t TestOnlyTransportFlowControlWindow() {
-    return flow_control_.remote_window();
-  }
-
-  int64_t TestOnlyGetStreamFlowControlWindow(const uint32_t stream_id) {
-    RefCountedPtr<Stream> stream = LookupStream(stream_id);
-    if (stream == nullptr) {
-      return -1;
-    }
-    return stream->flow_control.remote_window_delta();
-  }
+  int64_t TestOnlyTransportFlowControlWindow();
+  int64_t TestOnlyGetStreamFlowControlWindow(const uint32_t stream_id);
 
   bool AreTransportFlowControlTokensAvailable() {
     return flow_control_.remote_window() > 0;
@@ -299,42 +288,7 @@ class Http2ClientTransport final : public ClientTransport,
 
   // Returns the next stream id. If the next stream id is not available, it
   // returns std::nullopt. MUST be called from the transport party.
-  absl::StatusOr<uint32_t> NextStreamId() {
-    if (next_stream_id_ > GetMaxAllowedStreamId()) {
-      // TODO(tjagtap) : [PH2][P3] : Handle case if transport runs out of stream
-      // ids
-      // RFC9113 : Stream identifiers cannot be reused. Long-lived connections
-      // can result in an endpoint exhausting the available range of stream
-      // identifiers. A client that is unable to establish a new stream
-      // identifier can establish a new connection for new streams. A server
-      // that is unable to establish a new stream identifier can send a GOAWAY
-      // frame so that the client is forced to open a new connection for new
-      // streams.
-      return absl::ResourceExhaustedError("No more stream ids available");
-    }
-    // TODO(akshitpatel) : [PH2][P3] : There is a channel arg to delay
-    // starting new streams instead of failing them. This needs to be
-    // implemented.
-    {
-      MutexLock lock(&transport_mutex_);
-      if (GetActiveStreamCountLocked() >=
-          settings_->peer().max_concurrent_streams()) {
-        return absl::ResourceExhaustedError("Reached max concurrent streams");
-      }
-    }
-
-    // RFC9113 : Streams initiated by a client MUST use odd-numbered stream
-    // identifiers.
-    uint32_t new_stream_id =
-        std::exchange(next_stream_id_, next_stream_id_ + 2);
-    if (GPR_UNLIKELY(next_stream_id_ > GetMaxAllowedStreamId())) {
-      ReportDisconnection(
-          absl::ResourceExhaustedError("Transport Stream IDs exhausted"),
-          {},  // TODO(tjagtap) : [PH2][P2] : Report better disconnect info.
-          "no_more_stream_ids");
-    }
-    return new_stream_id;
-  }
+  absl::StatusOr<uint32_t> NextStreamId();
 
   // Returns the next stream id without incrementing it. MUST be called from the
   // transport party.
@@ -422,30 +376,7 @@ class Http2ClientTransport final : public ClientTransport,
   // If the error is a connection error, it closes the transport and returns the
   // corresponding (failed) absl status.
   absl::Status HandleError(const std::optional<uint32_t> stream_id,
-                           Http2Status status, DebugLocation whence = {}) {
-    auto error_type = status.GetType();
-    GRPC_DCHECK(error_type != Http2Status::Http2ErrorType::kOk);
-
-    if (error_type == Http2Status::Http2ErrorType::kStreamError) {
-      GRPC_HTTP2_CLIENT_ERROR_DLOG << "Stream Error: " << status.DebugString();
-      GRPC_DCHECK(stream_id.has_value());
-      // Passing a cancelled server metadata handle to propagate the error
-      // to the upper layers.
-      BeginCloseStream(
-          LookupStream(stream_id.value()),
-          Http2ErrorCodeToFrameErrorCode(status.GetStreamErrorCode()),
-          CancelledServerMetadataFromStatus(status.GetAbslStreamError()),
-          whence);
-      return absl::OkStatus();
-    } else if (error_type == Http2Status::Http2ErrorType::kConnectionError) {
-      GRPC_HTTP2_CLIENT_ERROR_DLOG << "Connection Error: "
-                                   << status.DebugString();
-      absl::Status absl_status = status.GetAbslConnectionError();
-      MaybeSpawnCloseTransport(std::move(status), whence);
-      return absl_status;
-    }
-    GPR_UNREACHABLE_CODE(return absl::InternalError("Invalid error type"));
-  }
+                           Http2Status status, DebugLocation whence = {});
 
   bool should_reset_ping_clock_;
   bool is_first_write_;
@@ -574,24 +505,8 @@ class Http2ClientTransport final : public ClientTransport,
   absl::Status MaybeAddStreamToWritableStreamList(
       const RefCountedPtr<Stream> stream,
       const StreamDataQueue<ClientMetadataHandle>::StreamWritabilityUpdate
-          result) {
-    if (result.became_writable) {
-      GRPC_HTTP2_CLIENT_DLOG
-          << "Http2ClientTransport MaybeAddStreamToWritableStreamList "
-             " Stream id: "
-          << stream->GetStreamId() << " became writable";
-      absl::Status status =
-          writable_stream_list_.Enqueue(stream, result.priority);
-      if (!status.ok()) {
-        return HandleError(
-            std::nullopt,
-            Http2Status::Http2ConnectionError(
-                Http2ErrorCode::kRefusedStream,
-                "Failed to enqueue stream to writable stream list"));
-      }
-    }
-    return absl::OkStatus();
-  }
+          result);
+
   bool SetOnDone(CallHandler call_handler, RefCountedPtr<Stream> stream);
   absl::StatusOr<std::vector<Http2Frame>> DequeueStreamFrames(
       RefCountedPtr<Stream> stream);