]> git.feebdaed.xyz Git - 0xmirror/grpc.git/commitdiff
[PH2][E2E] Fix a race condition in stream_data_queue
authorAkshit Patel <akshitpatel@google.com>
Wed, 3 Dec 2025 02:54:40 +0000 (18:54 -0800)
committerCopybara-Service <copybara-worker@google.com>
Wed, 3 Dec 2025 02:56:44 +0000 (18:56 -0800)
The `stream_id_` is currently accessed both in Enqueue and Dequeue operations resulting in the race. Technically, in the Enqueue flow `stream_id_` is only used for logs which is redundant and hence being removed.

PiperOrigin-RevId: 839517493

src/core/ext/transport/chttp2/transport/stream_data_queue.h

index 6695c056f48da47df8d3b0ca214e5a70955480d2..0c24d576425be6aeb789b9ea3d6a6c9480549369 100644 (file)
@@ -266,8 +266,8 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
         QueueEntry{InitialMetadataType{std::move(metadata)}}, /*tokens=*/0);
     if (GPR_UNLIKELY(!result.ok())) {
       GRPC_STREAM_DATA_QUEUE_DEBUG
-          << "Immediate enqueueing initial metadata for stream " << stream_id_
-          << " failed with status: " << result.status();
+          << "Immediate enqueueing initial metadata failed with status: "
+          << result.status();
       return result.status();
     }
     return UpdateWritableStateAndPriorityEnqueueLocked(
@@ -286,8 +286,7 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
     GRPC_DCHECK(!is_trailing_metadata_or_half_close_queued_);
 
     if (GPR_UNLIKELY(IsEnqueueClosed())) {
-      GRPC_STREAM_DATA_QUEUE_DEBUG << "Enqueue closed for stream "
-                                   << stream_id_;
+      GRPC_STREAM_DATA_QUEUE_DEBUG << "Enqueue closed.";
       return EnqueueResult{/*became_writable=*/false,
                            WritableStreamPriority::kStreamClosed};
     }
@@ -297,8 +296,8 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
         QueueEntry{TrailingMetadataType{std::move(metadata)}}, /*tokens=*/0);
     if (GPR_UNLIKELY(!result.ok())) {
       GRPC_STREAM_DATA_QUEUE_DEBUG
-          << "Immediate enqueueing trailing metadata for stream " << stream_id_
-          << " failed with status: " << result.status();
+          << "Immediate enqueueing trailing metadata failed with status: "
+          << result.status();
       return result.status();
     }
     return UpdateWritableStateAndPriorityEnqueueLocked(
@@ -325,17 +324,15 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
             tokens]() mutable -> Poll<absl::StatusOr<EnqueueResult>> {
       MutexLock lock(&self->mu_);
       if (GPR_UNLIKELY(self->IsEnqueueClosed())) {
-        GRPC_STREAM_DATA_QUEUE_DEBUG << "Enqueue closed for stream "
-                                     << self->stream_id_;
+        GRPC_STREAM_DATA_QUEUE_DEBUG << "Enqueue closed";
         return EnqueueResult{/*became_writable=*/false,
                              WritableStreamPriority::kStreamClosed};
       }
       Poll<bool> result = self->queue_.Enqueue(entry, tokens);
       if (result.ready()) {
-        GRPC_STREAM_DATA_QUEUE_DEBUG << "Enqueued message for stream "
-                                     << self->stream_id_
-                                     << " with tokens: " << tokens
-                                     << "became_non_empty: " << result.value();
+        GRPC_STREAM_DATA_QUEUE_DEBUG
+            << "Enqueued message with tokens: " << tokens
+            << "became_non_empty: " << result.value();
 
         return self->UpdateWritableStateAndPriorityEnqueueLocked(
             /*became_non_empty=*/result.value(),
@@ -357,8 +354,8 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
     if (GPR_UNLIKELY(IsEnqueueClosed() ||
                      is_trailing_metadata_or_half_close_queued_)) {
       GRPC_STREAM_DATA_QUEUE_DEBUG
-          << "Enqueue closed or trailing metadata/half close queued for stream "
-          << stream_id_ << " is_trailing_metadata_or_half_close_queued_ = "
+          << "Enqueue closed or trailing metadata/half close queued "
+          << " is_trailing_metadata_or_half_close_queued_ = "
           << is_trailing_metadata_or_half_close_queued_;
       return EnqueueResult{/*became_writable=*/false,
                            WritableStreamPriority::kStreamClosed};
@@ -369,8 +366,8 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
         queue_.ImmediateEnqueue(QueueEntry{HalfClosed{}}, /*tokens=*/0);
     if (GPR_UNLIKELY(!result.ok())) {
       GRPC_STREAM_DATA_QUEUE_DEBUG
-          << "Immediate enqueueing half closed for stream " << stream_id_
-          << " failed with status: " << result.status();
+          << "Immediate enqueueing half closed failed with status: "
+          << result.status();
       return result.status();
     }
     return UpdateWritableStateAndPriorityEnqueueLocked(
@@ -388,15 +385,13 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
     // This can happen when the transport tries to close the stream and the
     // stream is cancelled from the call stack.
     if (GPR_UNLIKELY(IsEnqueueClosed())) {
-      GRPC_STREAM_DATA_QUEUE_DEBUG << "Enqueue closed for stream "
-                                   << stream_id_;
+      GRPC_STREAM_DATA_QUEUE_DEBUG << "Enqueue closed";
       return EnqueueResult{/*became_writable=*/false,
                            WritableStreamPriority::kStreamClosed};
     }
 
     GRPC_STREAM_DATA_QUEUE_DEBUG
-        << "Immediate enqueueing reset stream for stream " << stream_id_
-        << " with error code: " << error_code;
+        << "Immediate enqueueing reset stream with error code: " << error_code;
     reset_stream_state_ = RstStreamState::kQueued;
     reset_stream_error_code_ = error_code;
 
@@ -469,8 +464,7 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
                               const bool can_send_reset_stream) {
     MutexLock lock(&mu_);
     GRPC_STREAM_DATA_QUEUE_DEBUG
-        << "Dequeueing frames for stream " << stream_id_
-        << " Max fc tokens: " << max_fc_tokens
+        << "Dequeueing frames. Max fc tokens: " << max_fc_tokens
         << " Max frame length: " << max_frame_length
         << " Message disassembler buffered length: "
         << message_disassembler_.GetBufferedLength()
@@ -525,8 +519,8 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
   bool ReceivedFlowControlWindowUpdate(const uint32_t stream_fc_tokens) {
     MutexLock lock(&mu_);
     GRPC_STREAM_DATA_QUEUE_DEBUG
-        << "Received flow control window update for stream " << stream_id_
-        << " stream_fc_tokens: " << stream_fc_tokens;
+        << "Received flow control window update. stream_fc_tokens: "
+        << stream_fc_tokens;
     return UpdateWritableStateDequeueLocked(stream_fc_tokens);
   }
 
@@ -722,16 +716,14 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
     if (!is_writable_ && became_non_empty) {
       is_writable_ = true;
       GRPC_STREAM_DATA_QUEUE_DEBUG
-          << "UpdateWritableStateLocked for stream id: " << stream_id_
-          << " became writable with priority: "
+          << "UpdateWritableStateLocked became writable with priority: "
           << GetWritableStreamPriorityString(priority_);
       return EnqueueResult{/*became_writable=*/true, priority_};
     }
 
     GRPC_STREAM_DATA_QUEUE_DEBUG
-        << "UpdateWritableStateAndPriorityEnqueueLocked for stream id: "
-        << stream_id_
-        << " with priority: " << GetWritableStreamPriorityString(priority_)
+        << "UpdateWritableStateAndPriorityEnqueueLocked with priority: "
+        << GetWritableStreamPriorityString(priority_)
         << " is_writable: " << is_writable_;
     return EnqueueResult{/*became_writable=*/false, priority_};
   }
@@ -761,10 +753,9 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
       is_writable_ = (available_stream_fc_tokens > 0);
     }
 
-    GRPC_STREAM_DATA_QUEUE_DEBUG
-        << "UpdateWritableStateLocked for stream id: " << stream_id_
-        << " with priority: " << GetWritableStreamPriorityString(priority_)
-        << " is_writable: " << is_writable_;
+    GRPC_STREAM_DATA_QUEUE_DEBUG << "UpdateWritableStateLocked with priority: "
+                                 << GetWritableStreamPriorityString(priority_)
+                                 << " is_writable: " << is_writable_;
     return is_writable_;
   }
 
@@ -781,8 +772,7 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
     switch (reset_stream_state_) {
       case RstStreamState::kDequeued:
         GRPC_STREAM_DATA_QUEUE_DEBUG
-            << "Reset stream is already dequeued for stream " << stream_id_
-            << ". Returning empty frames.";
+            << "Reset stream is already dequeued. Returning empty frames.";
         GRPC_DCHECK(queue_.IsEmpty());
         is_writable_ = false;
         return DequeueResult{
@@ -792,8 +782,7 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
       case RstStreamState::kQueued: {
         GRPC_STREAM_DATA_QUEUE_DEBUG
             << "Reset stream is queued. Skipping all frames (if any) for "
-               "dequeuing "
-            << stream_id_;
+               "dequeuing.";
         is_writable_ = false;
         std::vector<Http2Frame> frames;
         uint8_t flags = 0u;
@@ -828,8 +817,7 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
       case RstStreamState::kDequeued:
         // This can happen when the transport tries to close the stream and the
         // stream is cancelled from the call stack.
-        GRPC_STREAM_DATA_QUEUE_DEBUG
-            << "Reset stream already queued for stream " << stream_id_;
+        GRPC_STREAM_DATA_QUEUE_DEBUG << "Reset stream already queued.";
         return true;
       default:
         GRPC_CHECK(false) << "Invalid reset stream state: "