]> git.feebdaed.xyz Git - 0xmirror/grpc.git/commitdiff
[PH2][FlowControl][Bug] Adding missing flow control plumbing
authorTanvi Jagtap <tjagtap@google.com>
Mon, 15 Dec 2025 06:09:41 +0000 (22:09 -0800)
committerCopybara-Service <copybara-worker@google.com>
Mon, 15 Dec 2025 06:12:15 +0000 (22:12 -0800)
This is a hack. The actual fix needs some work for Call V3 stack which is scheduled for later.

PiperOrigin-RevId: 844589014

14 files changed:
src/core/BUILD
src/core/ext/transport/chttp2/transport/flow_control.cc
src/core/ext/transport/chttp2/transport/flow_control.h
src/core/ext/transport/chttp2/transport/flow_control_manager.h
src/core/ext/transport/chttp2/transport/frame.cc
src/core/ext/transport/chttp2/transport/frame.h
src/core/ext/transport/chttp2/transport/http2_client_transport.cc
src/core/ext/transport/chttp2/transport/http2_transport.cc
src/core/ext/transport/chttp2/transport/http2_transport.h
src/core/ext/transport/chttp2/transport/stream.h
src/core/ext/transport/chttp2/transport/writable_streams.h
test/core/end2end/end2end_ph2_config.cc
test/core/transport/chttp2/flow_control_manager_test.cc
test/core/transport/chttp2/frame_test.cc

index 49a794d71aec8f4a4778b1820d88063d7f2301fc..ee4cd72bd990b9efefb22bda3766c4ccab331b9b 100644 (file)
@@ -8118,6 +8118,7 @@ grpc_cc_library(
     ],
     deps = [
         ":chttp2_flow_control",
+        ":grpc_check",
         ":http2_settings",
         "//:chttp2_frame",
     ],
index b64c4f845b6f375db488f362a14fe3259f042c4f..a15a69482f2882f1ee5df7287f68e5874756f09e 100644 (file)
@@ -404,6 +404,18 @@ FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) {
   return action;
 }
 
+void StreamFlowControl::IncomingUpdateContext::HackIncrementPendingSize(
+    int64_t pending_size) {
+  GRPC_CHECK_GE(pending_size, 0);
+  if (sfc_->pending_size_.has_value()) {
+    int64_t final_size = Clamp(sfc_->pending_size_.value() + pending_size,
+                               int64_t{0}, kMaxWindowUpdateSize);
+    *sfc_->pending_size_ = final_size;
+  } else {
+    sfc_->pending_size_ = pending_size;
+  }
+}
+
 void StreamFlowControl::IncomingUpdateContext::SetPendingSize(
     int64_t pending_size) {
   GRPC_CHECK_GE(pending_size, 0);
index e6f79e51bd418218ff04bca5e94b6c88aed8d335..385b3c731d80c0917b35669f306383c969042e5c 100644 (file)
@@ -513,9 +513,17 @@ class StreamFlowControl final {
     // for application to read. Call this when a complete message is assembled
     // but not yet pulled by the application. This helps flow control decide
     // whether to send a WINDOW_UPDATE to the peer.
-    // TODO(tjagtap) [PH2][P2] Plumb with PH2 flow control.
+    // TODO(tjagtap) [PH2][P1] Plumb with PH2 flow control.
     void SetPendingSize(int64_t pending_size);
 
+    // This is a hack in place till SetPendingSize is fully plumbed. This hack
+    // function just pretends that the application needs more bytes. Since we
+    // dont actually know how many bytes the application needs, we just want to
+    // refill the used up tokens. The only way to refill used up tokens is to
+    // call this function for each DATA frame.
+    // TODO(tjagtap) [PH2][P1] Remove hack after SetPendingSize is plumbed.
+    void HackIncrementPendingSize(int64_t pending_size);
+
    private:
     TransportFlowControl::IncomingUpdateContext tfc_upd_;
     StreamFlowControl* const sfc_;
index c34901e4b91831dfe785de40836c1ad9d6bc479e..d484e25f29d5e8c8dc157830d39a661e246d81c8 100644 (file)
@@ -26,6 +26,7 @@
 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
 #include "src/core/ext/transport/chttp2/transport/frame.h"
 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
+#include "src/core/util/grpc_check.h"
 #include "absl/container/flat_hash_map.h"
 
 namespace grpc_core {
@@ -35,6 +36,7 @@ constexpr chttp2::FlowControlAction::Urgency kNoActionNeeded =
     chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED;
 constexpr chttp2::FlowControlAction::Urgency kUpdateImmediately =
     chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
+constexpr int64_t kZero = 0;
 
 #define GRPC_HTTP2_FLOW_CONTROL_HELPERS \
   DLOG_IF(INFO, GRPC_TRACE_FLAG_ENABLED(http2_ph2_transport))
@@ -58,14 +60,29 @@ inline void ActOnFlowControlActionSettings(
   }
 }
 
+// RFC9113 : A sender MUST NOT allow a flow-control window to exceed ((2^31)-1)
+// octets. If a sender receives a WINDOW_UPDATE that causes a flow-control
+// window to exceed this maximum, it MUST terminate either the stream or the
+// connection, as appropriate.
+// While the return value is int64_t for compatibility with CHTTP2 flow control,
+// we dont expect this value to exceed ((2^31)-1) i.e kMaxSize31Bit
+inline int64_t GetStreamFlowControlTokens(
+    chttp2::StreamFlowControl& stream_flow_control,
+    const Http2Settings& peer_settings) {
+  GRPC_DCHECK(stream_flow_control.remote_window_delta() +
+                  peer_settings.initial_window_size() <=
+              RFC9113::kMaxSize31Bit);
+  return std::max(kZero, stream_flow_control.remote_window_delta() +
+                             peer_settings.initial_window_size());
+}
+
 inline uint32_t GetMaxPermittedDequeue(
     chttp2::TransportFlowControl& transport_flow_control,
     chttp2::StreamFlowControl& stream_flow_control, const size_t upper_limit,
     const Http2Settings& peer_settings) {
   const int64_t flow_control_tokens =
       std::min(transport_flow_control.remote_window(),
-               stream_flow_control.remote_window_delta() +
-                   peer_settings.initial_window_size());
+               GetStreamFlowControlTokens(stream_flow_control, peer_settings));
   uint32_t max_dequeue = 0;
   if (flow_control_tokens > 0) {
     max_dequeue = static_cast<uint32_t>(
@@ -78,6 +95,10 @@ inline uint32_t GetMaxPermittedDequeue(
   return max_dequeue;
 }
 
+// TODO(tjagtap) [PH2][P4] : Ensure that the total transport flow control window
+// or stream flow control window does not exceed the max permitted limit of
+// 2^31-1
+
 }  // namespace http2
 }  // namespace grpc_core
 
index 4d22665e96bb8c2eceea6b86bcb6e5c2fb6336d4..8e578b0eae73151f01cf42c0cbd6a08389a53985 100644 (file)
@@ -224,7 +224,7 @@ class SerializeHeaderAndPayload {
         << "SerializeHeaderAndPayload Http2DataFrame Type:0 { stream_id:"
         << frame.stream_id << ", end_stream:" << frame.end_stream
         << ", payload_length:" << frame.payload.Length()
-        << ", payload:" << frame.payload.JoinIntoString() << "}";
+        << ", payload:" << MaybeTruncatePayload(frame.payload) << "}";
     auto hdr = extra_bytes_.TakeFirst(kFrameHeaderSize);
     Http2FrameHeader{static_cast<uint32_t>(frame.payload.Length()),
                      static_cast<uint8_t>(FrameType::kData),
@@ -242,7 +242,7 @@ class SerializeHeaderAndPayload {
         << frame.stream_id << ", end_headers:" << frame.end_headers
         << ", end_stream:" << frame.end_stream
         << ", payload_length:" << frame.payload.Length()
-        << ", payload:" << frame.payload.JoinIntoString() << "}";
+        << ", payload:" << MaybeTruncatePayload(frame.payload) << "}";
     auto hdr = extra_bytes_.TakeFirst(kFrameHeaderSize);
     Http2FrameHeader{
         static_cast<uint32_t>(frame.payload.Length()),
@@ -262,7 +262,7 @@ class SerializeHeaderAndPayload {
                           << frame.stream_id
                           << ", end_headers:" << frame.end_headers
                           << ", payload_length:" << frame.payload.Length()
-                          << ", payload:" << frame.payload.JoinIntoString()
+                          << ", payload:" << MaybeTruncatePayload(frame.payload)
                           << "}";
     auto hdr = extra_bytes_.TakeFirst(kFrameHeaderSize);
     Http2FrameHeader{
@@ -881,4 +881,13 @@ Http2Status ValidateFrameHeader(const uint32_t max_frame_size_setting,
   return Http2Status::Ok();
 }
 
+std::string MaybeTruncatePayload(SliceBuffer& payload, const uint32_t length) {
+  if (payload.Length() <= length) {
+    return payload.JoinIntoString();
+  }
+  std::string result(length, '\0');
+  payload.CopyFirstNBytesIntoBuffer(length, result.data());
+  return absl::StrCat(result, "<clipped>");
+}
+
 }  // namespace grpc_core
index 19cae7c8b96499d75b77920b644d1e22b8c70ecc..335109e05340883d3b97e5fe6d8cc0f6c8847046 100644 (file)
 
 namespace grpc_core {
 
+// Prints the first `length` bytes of the payload. If the payload is longer than
+// `length`, it appends "<clipped>" to the output.
+std::string MaybeTruncatePayload(SliceBuffer& payload, uint32_t length = 15);
+
 ///////////////////////////////////////////////////////////////////////////////
 // Frame types
 //
index 03189fb359a123c8a9effd5627d3bdce26fbcd3e..bc6786d565219fd702d24074ce1c3156394c50c1 100644 (file)
@@ -246,7 +246,7 @@ Http2Status Http2ClientTransport::ProcessHttp2DataFrame(Http2DataFrame frame) {
   GRPC_HTTP2_CLIENT_DLOG
       << "Http2ClientTransport ProcessHttp2DataFrame { stream_id="
       << frame.stream_id << ", end_stream=" << frame.end_stream
-      << ", payload=" << frame.payload.JoinIntoString()
+      << ", payload=" << MaybeTruncatePayload(frame.payload)
       << ", payload length=" << frame.payload.Length() << "}";
 
   // TODO(akshitpatel) : [PH2][P3] : Investigate if we should do this even if
@@ -340,7 +340,7 @@ Http2Status Http2ClientTransport::ProcessHttp2HeaderFrame(
       << "Http2ClientTransport ProcessHttp2HeaderFrame Promise { stream_id="
       << frame.stream_id << ", end_headers=" << frame.end_headers
       << ", end_stream=" << frame.end_stream
-      << ", payload=" << frame.payload.JoinIntoString() << " }";
+      << ", payload=" << MaybeTruncatePayload(frame.payload) << " }";
   // State update MUST happen before processing the frame.
   incoming_headers_.OnHeaderReceived(frame);
 
@@ -621,6 +621,18 @@ Http2Status Http2ClientTransport::ProcessHttp2WindowUpdateFrame(
   if (frame.stream_id != 0) {
     stream = LookupStream(frame.stream_id);
   }
+  if (stream != nullptr) {
+    StreamWritabilityUpdate update =
+        stream->ReceivedFlowControlWindowUpdate(frame.increment);
+    if (update.became_writable) {
+      absl::Status status = writable_stream_list_.EnqueueWrapper(
+          stream, update.priority, AreTransportFlowControlTokensAvailable());
+      if (!status.ok()) {
+        return ToHttpOkOrConnError(status);
+      }
+    }
+  }
+
   bool should_trigger_write =
       ProcessIncomingWindowUpdateFrameFlowControl(frame, flow_control_, stream);
   if (should_trigger_write) {
@@ -636,7 +648,7 @@ Http2Status Http2ClientTransport::ProcessHttp2ContinuationFrame(
       << "Http2ClientTransport ProcessHttp2ContinuationFrame Promise { "
          "stream_id="
       << frame.stream_id << ", end_headers=" << frame.end_headers
-      << ", payload=" << frame.payload.JoinIntoString() << " }";
+      << ", payload=" << MaybeTruncatePayload(frame.payload) << " }";
 
   // State update MUST happen before processing the frame.
   incoming_headers_.OnContinuationReceived(frame);
@@ -828,7 +840,7 @@ auto Http2ClientTransport::ReadAndProcessOneFrame() {
           SliceBuffer payload) -> absl::StatusOr<Http2Frame> {
         GRPC_HTTP2_CLIENT_DLOG
             << "Http2ClientTransport ReadAndProcessOneFrame ParseFramePayload "
-            << payload.JoinIntoString();
+            << MaybeTruncatePayload(payload);
         ValueOrHttp2Status<Http2Frame> frame =
             ParseFramePayload(self->current_frame_header_, std::move(payload));
         if (!frame.IsOk()) {
@@ -910,16 +922,21 @@ auto Http2ClientTransport::FlowControlPeriodicUpdateLoop() {
 // Equivalent to grpc_chttp2_act_on_flowctl_action in chttp2_transport.cc
 void Http2ClientTransport::ActOnFlowControlAction(
     const chttp2::FlowControlAction& action, RefCountedPtr<Stream> stream) {
-  GRPC_HTTP2_CLIENT_DLOG << "Http2ClientTransport::ActOnFlowControlAction";
+  GRPC_HTTP2_CLIENT_DLOG << "Http2ClientTransport::ActOnFlowControlAction"
+                         << action.DebugString();
   if (action.send_stream_update() != kNoActionNeeded) {
     if (GPR_LIKELY(stream != nullptr)) {
       GRPC_DCHECK_GT(stream->GetStreamId(), 0u);
       if (stream->CanSendWindowUpdateFrames()) {
         window_update_list_.insert(stream->GetStreamId());
+        GRPC_HTTP2_CLIENT_DLOG
+            << "Http2ClientTransport::ActOnFlowControlAction "
+               "added stream "
+            << stream->GetStreamId() << " to window_update_list_";
       }
     } else {
       GRPC_HTTP2_CLIENT_DLOG
-          << "Http2ClientTransport ActOnFlowControlAction stream is null";
+          << "Http2ClientTransport::ActOnFlowControlAction stream is null";
     }
   }
 
@@ -1037,25 +1054,23 @@ Http2ClientTransport::DequeueStreamFrames(RefCountedPtr<Stream> stream) {
   // data frames when write_bytes_remaining_ is very low. As the
   // available transport tokens can only range from 0 to 2^31 - 1,
   // we are clamping the write_bytes_remaining_ to that range.
-  const uint32_t max_dequeue_size =
+  const uint32_t tokens =
       GetMaxPermittedDequeue(flow_control_, stream->flow_control,
                              write_bytes_remaining_, settings_->peer());
+  const uint32_t stream_flow_control_tokens = static_cast<uint32_t>(
+      GetStreamFlowControlTokens(stream->flow_control, settings_->peer()));
   stream->flow_control.ReportIfStalled(
       /*is_client=*/true, stream->GetStreamId(), settings_->peer());
   StreamDataQueue<ClientMetadataHandle>::DequeueResult result =
-      stream->DequeueFrames(max_dequeue_size,
+      stream->DequeueFrames(tokens, stream_flow_control_tokens,
                             settings_->peer().max_frame_size(), encoder_);
   ProcessOutgoingDataFrameFlowControl(stream->flow_control,
                                       result.flow_control_tokens_consumed);
   if (result.is_writable) {
     // Stream is still writable. Enqueue it back to the writable
     // stream list.
-    absl::Status status;
-    if (AreTransportFlowControlTokensAvailable()) {
-      status = writable_stream_list_.Enqueue(stream, result.priority);
-    } else {
-      status = writable_stream_list_.BlockedOnTransportFlowControl(stream);
-    }
+    absl::Status status = writable_stream_list_.EnqueueWrapper(
+        stream, result.priority, AreTransportFlowControlTokensAvailable());
 
     if (GPR_UNLIKELY(!status.ok())) {
       GRPC_HTTP2_CLIENT_DLOG
@@ -1325,16 +1340,7 @@ void Http2ClientTransport::MaybeGetWindowUpdateFrames(SliceBuffer& output_buf) {
   }
   for (const uint32_t stream_id : window_update_list_) {
     RefCountedPtr<Stream> stream = LookupStream(stream_id);
-    if (stream != nullptr && stream->CanSendWindowUpdateFrames()) {
-      const uint32_t increment = stream->flow_control.MaybeSendUpdate();
-      if (increment > 0) {
-        GRPC_HTTP2_CLIENT_DLOG
-            << "Http2ClientTransport::MaybeGetWindowUpdateFrames Stream Window "
-               "Update { "
-            << stream_id << ", " << window_size << " }";
-        frames.emplace_back(Http2WindowUpdateFrame{stream_id, increment});
-      }
-    }
+    MaybeAddStreamWindowUpdateFrame(stream, frames);
   }
   window_update_list_.clear();
   if (!frames.empty()) {
@@ -1425,8 +1431,6 @@ void Http2ClientTransport::ReadChannelArgs(const ChannelArgs& channel_args,
                          flow_control_,
                          /*is_client=*/true);
 
-  // TODO(akshitpatel) : [PH2][P3] : Add a persistent struct for channel args
-  // to avoid copying these channel args to member variables.
   // Assign the channel args to the member variables.
   keepalive_time_ = args.keepalive_time;
   incoming_headers_.set_soft_limit(args.max_header_list_size_soft_limit);
index 48f34212260e2c55d259b0444401613b70d095ee..6e53864390abc7c0c03d413146b11ba188d92e46 100644 (file)
@@ -266,6 +266,9 @@ ProcessIncomingDataFrameFlowControl(Http2FrameHeader& frame_header,
           &flow_control);
       absl::Status fc_status = transport_fc.RecvData(frame_header.length);
       chttp2::FlowControlAction action = transport_fc.MakeAction();
+      GRPC_HTTP2_COMMON_DLOG
+          << "ProcessIncomingDataFrameFlowControl Transport RecvData status: "
+          << fc_status << " action: " << action.DebugString();
       if (!fc_status.ok()) {
         LOG(ERROR) << "Flow control error: " << fc_status.message();
         // RFC9113 : A receiver MAY respond with a stream error or connection
@@ -280,6 +283,9 @@ ProcessIncomingDataFrameFlowControl(Http2FrameHeader& frame_header,
           &stream->flow_control);
       absl::Status fc_status = stream_fc.RecvData(frame_header.length);
       chttp2::FlowControlAction action = stream_fc.MakeAction();
+      GRPC_HTTP2_COMMON_DLOG
+          << "ProcessIncomingDataFrameFlowControl Stream RecvData status: "
+          << fc_status << " action: " << action.DebugString();
       if (!fc_status.ok()) {
         LOG(ERROR) << "Flow control error: " << fc_status.message();
         // RFC9113 : A receiver MAY respond with a stream error or connection
@@ -288,6 +294,8 @@ ProcessIncomingDataFrameFlowControl(Http2FrameHeader& frame_header,
             Http2ErrorCode::kFlowControlError,
             std::string(fc_status.message()));
       }
+      // TODO(tjagtap) [PH2][P1][FlowControl] This is a HACK. Fix this.
+      stream_fc.HackIncrementPendingSize(frame_header.length);
       return action;
     }
   }
@@ -299,14 +307,23 @@ bool ProcessIncomingWindowUpdateFrameFlowControl(
     chttp2::TransportFlowControl& flow_control, RefCountedPtr<Stream> stream) {
   if (frame.stream_id != 0) {
     if (stream != nullptr) {
+      GRPC_HTTP2_COMMON_DLOG
+          << "ProcessIncomingWindowUpdateFrameFlowControl stream "
+          << frame.stream_id << " increment " << frame.increment;
       chttp2::StreamFlowControl::OutgoingUpdateContext fc_update(
           &stream->flow_control);
       fc_update.RecvUpdate(frame.increment);
     } else {
       // If stream id is non zero, and stream is nullptr, maybe the stream was
       // closed. Ignore this WINDOW_UPDATE frame.
+      GRPC_HTTP2_COMMON_DLOG
+          << "ProcessIncomingWindowUpdateFrameFlowControl stream "
+          << frame.stream_id << " not found. Ignoring.";
     }
   } else {
+    GRPC_HTTP2_COMMON_DLOG
+        << "ProcessIncomingWindowUpdateFrameFlowControl transport increment "
+        << frame.increment;
     chttp2::TransportFlowControl::OutgoingUpdateContext fc_update(
         &flow_control);
     fc_update.RecvUpdate(frame.increment);
@@ -316,12 +333,37 @@ bool ProcessIncomingWindowUpdateFrameFlowControl(
       // write cycle and attempt to send data from these streams.
       // Although it's possible no streams were blocked, triggering an
       // unnecessary write cycle in that super-rare case is acceptable.
+      GRPC_HTTP2_COMMON_DLOG << "ProcessIncomingWindowUpdateFrameFlowControl "
+                                "Transport Unstalled";
       return true;
     }
   }
   return false;
 }
 
+void MaybeAddStreamWindowUpdateFrame(RefCountedPtr<Stream> stream,
+                                     std::vector<Http2Frame>& frames) {
+  GRPC_HTTP2_COMMON_DLOG << "MaybeAddStreamWindowUpdateFrame stream="
+                         << ((stream == nullptr)
+                                 ? "null"
+                                 : absl::StrCat(
+                                       stream->GetStreamId(),
+                                       " CanSendWindowUpdateFrames=",
+                                       stream->CanSendWindowUpdateFrames()));
+  if (stream != nullptr && stream->CanSendWindowUpdateFrames()) {
+    const uint32_t increment = stream->flow_control.MaybeSendUpdate();
+    GRPC_HTTP2_COMMON_DLOG
+        << "MaybeAddStreamWindowUpdateFrame MaybeSendUpdate { "
+        << stream->GetStreamId() << ", " << increment << " }"
+        << (increment == 0 ? ". The frame will NOT be sent for increment 0"
+                           : "");
+    if (increment > 0) {
+      frames.emplace_back(
+          Http2WindowUpdateFrame{stream->GetStreamId(), increment});
+    }
+  }
+}
+
 // /////////////////////////////////////////////////////////////////////////////
 // Header and Continuation frame processing helpers
 Http2Status ParseAndDiscardHeaders(HPackParser& parser, SliceBuffer&& buffer,
index 8f331007091a4b6bd33666f4f1a6176bc70f69bd..1d12097edbddaed575fae15deb14195d46ba5c25 100644 (file)
@@ -172,6 +172,9 @@ bool ProcessIncomingWindowUpdateFrameFlowControl(
     const Http2WindowUpdateFrame& frame,
     chttp2::TransportFlowControl& flow_control, RefCountedPtr<Stream> stream);
 
+void MaybeAddStreamWindowUpdateFrame(RefCountedPtr<Stream> stream,
+                                     std::vector<Http2Frame>& frames);
+
 ///////////////////////////////////////////////////////////////////////////////
 // Header and Continuation frame processing helpers
 
index 8d4fa334882e13c497386d484a094d6a3580e7b4..c642e083802368cebfc4bfe6c65f730b756a11fe 100644 (file)
@@ -99,19 +99,17 @@ struct Stream : public RefCounted<Stream> {
   // All enqueue methods are called from the call party.
 
   auto EnqueueInitialMetadata(ClientMetadataHandle&& metadata) {
-    GRPC_HTTP2_STREAM_LOG << "Stream::EnqueueInitialMetadata stream_id="
-                          << stream_id;
+    GRPC_HTTP2_STREAM_LOG << "Stream::EnqueueInitialMetadata";
     return data_queue->EnqueueInitialMetadata(std::move(metadata));
   }
 
   auto EnqueueTrailingMetadata(ClientMetadataHandle&& metadata) {
-    GRPC_HTTP2_STREAM_LOG << "Stream::EnqueueTrailingMetadata stream_id="
-                          << stream_id;
+    GRPC_HTTP2_STREAM_LOG << "Stream::EnqueueTrailingMetadata";
     return data_queue->EnqueueTrailingMetadata(std::move(metadata));
   }
 
   auto EnqueueMessage(MessageHandle&& message) {
-    GRPC_HTTP2_STREAM_LOG << "Stream::EnqueueMessage stream_id=" << stream_id
+    GRPC_HTTP2_STREAM_LOG << "Stream::EnqueueMessage"
                           << " with payload size = "
                           << message->payload()->Length()
                           << " and flags = " << message->flags();
@@ -119,31 +117,28 @@ struct Stream : public RefCounted<Stream> {
   }
 
   auto EnqueueHalfClosed() {
-    GRPC_HTTP2_STREAM_LOG << "Stream::EnqueueHalfClosed stream_id="
-                          << stream_id;
+    GRPC_HTTP2_STREAM_LOG << "Stream::EnqueueHalfClosed";
     return data_queue->EnqueueHalfClosed();
   }
 
   auto EnqueueResetStream(const uint32_t error_code) {
-    GRPC_HTTP2_STREAM_LOG << "Stream::EnqueueResetStream stream_id="
-                          << stream_id << " with error_code = " << error_code;
+    GRPC_HTTP2_STREAM_LOG << "Stream::EnqueueResetStream"
+                          << " with error_code = " << error_code;
     return data_queue->EnqueueResetStream(error_code);
   }
 
   // Called from the transport party
-  auto DequeueFrames(const uint32_t transport_tokens,
+  auto DequeueFrames(const uint32_t tokens,
+                     const uint32_t stream_flow_control_tokens,
                      const uint32_t max_frame_length,
                      HPackCompressor& encoder) {
     HttpStreamState state = stream_state;
     // Reset stream MUST not be sent if the stream is idle or closed.
-    // TODO(tjagtap) : [PH2][P1][FlowControl] : Populate the correct stream flow
-    // control tokens.
-    return data_queue->DequeueFrames(
-        transport_tokens, max_frame_length,
-        /*stream_fc_tokens=*/std::numeric_limits<uint32_t>::max(), encoder,
-        /*can_send_reset_stream=*/
-        !(state == HttpStreamState::kIdle ||
-          state == HttpStreamState::kClosed));
+    return data_queue->DequeueFrames(tokens, max_frame_length,
+                                     stream_flow_control_tokens, encoder,
+                                     /*can_send_reset_stream=*/
+                                     !(state == HttpStreamState::kIdle ||
+                                       state == HttpStreamState::kClosed));
   }
 
   auto ReceivedFlowControlWindowUpdate(const uint32_t stream_fc_tokens) {
index b336f158be67a332cf8689ce26c0d1102f6f9479..9cdc25a2a2b0d569477e6d7ad444268136502a7e 100644 (file)
@@ -74,6 +74,16 @@ class WritableStreams {
   WritableStreams(WritableStreams&&) = delete;
   WritableStreams& operator=(WritableStreams&&) = delete;
 
+  absl::Status EnqueueWrapper(const StreamPtr stream,
+                              const WritableStreamPriority priority,
+                              bool transport_tokens_available) {
+    if (transport_tokens_available) {
+      return Enqueue(stream, priority);
+    } else {
+      return BlockedOnTransportFlowControl(stream);
+    }
+  }
+
   // Enqueues a stream id with the given priority.
   // If this returns error, transport MUST be closed.
   absl::Status Enqueue(const StreamPtr stream,
index 7c055ef0f588d32b8c076f54295189e9665c8d31..72c4dcbb5d0c9b1b57fd116652258d71f233e2ad 100644 (file)
@@ -56,9 +56,8 @@ class Ph2InsecureFixture : public InsecureFixture {
 };
 
 // This macro defines a set of cancellation and deadline tests that are
-// frequently broken and have been temporarily disabled. Grouping them here
-// allows them to be added to the GRPC_HTTP2_PROMISE_CLIENT_TRANSPORT_AVOID_LIST
-// list easily.
+// frequently broken. Grouping them here allows them to be added to the
+// GRPC_HTTP2_PROMISE_CLIENT_TRANSPORT_AVOID_LIST list easily.
 #define CANCEL_SUITE                        \
   "|CoreEnd2endTests.CancelAfterAccept"     \
   "|CoreEnd2endTests.CancelAfterClientDone" \
@@ -99,7 +98,6 @@ class Ph2InsecureFixture : public InsecureFixture {
 
 #define GRPC_HTTP2_PROMISE_CLIENT_TRANSPORT_AVOID_LIST       \
   LARGE_METADATA_SUITE                                       \
-  "|Http2SingleHopTests.InvokeLargeRequest"                  \
   "|Http2SingleHopTests.MaxConcurrentStreams"                \
   "|Http2SingleHopTests.MaxConcurrentStreamsTimeoutOnFirst"  \
   "|Http2SingleHopTests.MaxConcurrentStreamsTimeoutOnSecond" \
index 14a9a4274868ecc8cae2af1c487501bb3db10f48..86ef7fe3bf3c255bc5abd9f70b1602c6deb6ac25 100644 (file)
@@ -121,6 +121,49 @@ TEST(FlowControlManagerTest, ActOnFlowControlActionSettingsNoAction) {
             preferred_receive_crypto_message_size);
 }
 
+TEST(FlowControlManagerTest, GetStreamFlowControlTokens) {
+  chttp2::TransportFlowControl transport_flow_control(
+      /*name=*/"TestFlowControl", /*enable_bdp_probe=*/false,
+      /*memory_owner=*/nullptr);
+  chttp2::StreamFlowControl stream_flow_control(&transport_flow_control);
+  Http2Settings peer_settings;
+
+  // Initial state: stream delta 0, initial window 65535.
+  EXPECT_EQ(chttp2::kDefaultWindow,
+            GetStreamFlowControlTokens(stream_flow_control, peer_settings));
+
+  // Send 1000 bytes, stream delta becomes -1000.
+  // 65535 - 1000 = 64535
+  {
+    chttp2::StreamFlowControl::OutgoingUpdateContext sfc_upd(
+        &stream_flow_control);
+    sfc_upd.SentData(1000);
+  }
+  EXPECT_EQ(chttp2::kDefaultWindow - 1000,
+            GetStreamFlowControlTokens(stream_flow_control, peer_settings));
+
+  // Receive stream window update of 500, stream delta becomes -500.
+  // 65535 - 1000 + 500 = 65035
+  {
+    chttp2::StreamFlowControl::OutgoingUpdateContext sfc_upd(
+        &stream_flow_control);
+    sfc_upd.RecvUpdate(500);
+  }
+  EXPECT_EQ(chttp2::kDefaultWindow - 500,
+            GetStreamFlowControlTokens(stream_flow_control, peer_settings));
+
+  // If peer settings initial window size changes.
+  // 1000 - 500 = 500
+  peer_settings.SetInitialWindowSize(1000);
+  EXPECT_EQ(500,
+            GetStreamFlowControlTokens(stream_flow_control, peer_settings));
+
+  // If stream flow control tokens becomes negative, it's clamped to 0.
+  // 100 - 500 = -400
+  peer_settings.SetInitialWindowSize(100);
+  EXPECT_EQ(0, GetStreamFlowControlTokens(stream_flow_control, peer_settings));
+}
+
 TEST(FlowControlManagerTest, GetMaxPermittedDequeue) {
   chttp2::TransportFlowControl transport_flow_control(
       /*name=*/"TestFlowControl", /*enable_bdp_probe=*/false,
index efa28c7b91c2b1fda95155232a59116b0fdc2d0b..5b549dd5c58416d35d4a6c49220226a54e981638 100644 (file)
@@ -1354,6 +1354,28 @@ TEST(FrameSize, Http2FrameSizeTest) {
   EXPECT_EQ(GetFrameMemoryUsage(Http2EmptyFrame{}), sizeof(Http2EmptyFrame));
 }
 
+TEST(MaybeTruncatePayloadTest, Truncation) {
+  // Test with an empty buffer.
+  SliceBuffer sb_empty;
+  EXPECT_EQ(MaybeTruncatePayload(sb_empty, 10), "");
+
+  // Test with a non-empty buffer.
+  SliceBuffer sb;
+  sb.Append(Slice::FromCopiedString("hello world"));
+  // Case: Length > payload length.
+  EXPECT_EQ(MaybeTruncatePayload(sb, 20), "hello world");
+  // Case: Length == payload length + 1.
+  EXPECT_EQ(MaybeTruncatePayload(sb, 12), "hello world");
+  // Case: Length == payload length.
+  EXPECT_EQ(MaybeTruncatePayload(sb, 11), "hello world");
+  // Case: Length == payload length - 1.
+  EXPECT_EQ(MaybeTruncatePayload(sb, 10), "hello worl<clipped>");
+  // Case: Length < payload length.
+  EXPECT_EQ(MaybeTruncatePayload(sb, 5), "hello<clipped>");
+  // Case: Length == 0.
+  EXPECT_EQ(MaybeTruncatePayload(sb, 0), "<clipped>");
+}
+
 }  // namespace
 }  // namespace grpc_core