// and it is functions. The code will be written iteratively.
// Do not use or edit any of these functions unless you are
// familiar with the PH2 project (Moving chttp2 to promises.)
-// TODO(tjagtap) : [PH2][P3] : Delete this comment when http2
-// rollout begins
+// TODO(tjagtap) : [PH2][P5] : Update the experimental status of the code when
+// http2 rollout is completed.
template <typename Factory>
void Http2ClientTransport::SpawnInfallible(RefCountedPtr<Party> party,
}
}
+absl::Status Http2ClientTransport::HandleError(
+ const std::optional<uint32_t> stream_id, Http2Status status,
+ DebugLocation whence) {
+ auto error_type = status.GetType();
+ GRPC_DCHECK(error_type != Http2Status::Http2ErrorType::kOk);
+
+ if (error_type == Http2Status::Http2ErrorType::kStreamError) {
+ GRPC_HTTP2_CLIENT_ERROR_DLOG << "Stream Error: " << status.DebugString();
+ GRPC_DCHECK(stream_id.has_value());
+ // Passing a cancelled server metadata handle to propagate the error
+ // to the upper layers.
+ BeginCloseStream(
+ LookupStream(stream_id.value()),
+ Http2ErrorCodeToFrameErrorCode(status.GetStreamErrorCode()),
+ CancelledServerMetadataFromStatus(status.GetAbslStreamError()), whence);
+ return absl::OkStatus();
+ } else if (error_type == Http2Status::Http2ErrorType::kConnectionError) {
+ GRPC_HTTP2_CLIENT_ERROR_DLOG << "Connection Error: "
+ << status.DebugString();
+ absl::Status absl_status = status.GetAbslConnectionError();
+ MaybeSpawnCloseTransport(std::move(status), whence);
+ return absl_status;
+ }
+ GPR_UNREACHABLE_CODE(return absl::InternalError("Invalid error type"));
+}
+
// This function MUST be idempotent. This function MUST be called from the
// transport party.
void Http2ClientTransport::CloseStream(RefCountedPtr<Stream> stream,
///////////////////////////////////////////////////////////////////////////////
// Stream Related Operations
+absl::StatusOr<uint32_t> Http2ClientTransport::NextStreamId() {
+ if (next_stream_id_ > GetMaxAllowedStreamId()) {
+ // TODO(tjagtap) : [PH2][P3] : Handle case if transport runs out of stream
+ // ids
+ // RFC9113 : Stream identifiers cannot be reused. Long-lived connections
+ // can result in an endpoint exhausting the available range of stream
+ // identifiers. A client that is unable to establish a new stream
+ // identifier can establish a new connection for new streams. A server
+ // that is unable to establish a new stream identifier can send a GOAWAY
+ // frame so that the client is forced to open a new connection for new
+ // streams.
+ return absl::ResourceExhaustedError("No more stream ids available");
+ }
+ // TODO(akshitpatel) : [PH2][P3] : There is a channel arg to delay
+ // starting new streams instead of failing them. This needs to be
+ // implemented.
+ {
+ MutexLock lock(&transport_mutex_);
+ if (GetActiveStreamCountLocked() >=
+ settings_->peer().max_concurrent_streams()) {
+ return absl::ResourceExhaustedError("Reached max concurrent streams");
+ }
+ }
+
+ // RFC9113 : Streams initiated by a client MUST use odd-numbered stream
+ // identifiers.
+ uint32_t new_stream_id = std::exchange(next_stream_id_, next_stream_id_ + 2);
+ if (GPR_UNLIKELY(next_stream_id_ > GetMaxAllowedStreamId())) {
+ ReportDisconnection(
+ absl::ResourceExhaustedError("Transport Stream IDs exhausted"),
+ {}, // TODO(tjagtap) : [PH2][P2] : Report better disconnect info.
+ "no_more_stream_ids");
+ }
+ return new_stream_id;
+}
+
+absl::Status Http2ClientTransport::MaybeAddStreamToWritableStreamList(
+ const RefCountedPtr<Stream> stream,
+ const StreamDataQueue<ClientMetadataHandle>::StreamWritabilityUpdate
+ result) {
+ if (result.became_writable) {
+ GRPC_HTTP2_CLIENT_DLOG
+ << "Http2ClientTransport MaybeAddStreamToWritableStreamList Stream id: "
+ << stream->GetStreamId() << " became writable";
+ absl::Status status =
+ writable_stream_list_.Enqueue(stream, result.priority);
+ if (!status.ok()) {
+ return HandleError(
+ std::nullopt,
+ Http2Status::Http2ConnectionError(
+ Http2ErrorCode::kRefusedStream,
+ "Failed to enqueue stream to writable stream list"));
+ }
+ }
+ return absl::OkStatus();
+}
+
RefCountedPtr<Stream> Http2ClientTransport::LookupStream(uint32_t stream_id) {
MutexLock lock(&transport_mutex_);
auto it = stream_list_.find(stream_id);
GRPC_HTTP2_CLIENT_DLOG << "Http2ClientTransport StartCall End";
}
+///////////////////////////////////////////////////////////////////////////////
+// Http2ClientTransport - Test Only Functions
+
+int64_t Http2ClientTransport::TestOnlyTransportFlowControlWindow() {
+ return flow_control_.remote_window();
+}
+
+int64_t Http2ClientTransport::TestOnlyGetStreamFlowControlWindow(
+ const uint32_t stream_id) {
+ RefCountedPtr<Stream> stream = LookupStream(stream_id);
+ if (stream == nullptr) {
+ return -1;
+ }
+ return stream->flow_control.remote_window_delta();
+}
+
///////////////////////////////////////////////////////////////////////////////
// Class PingSystemInterfaceImpl
// Experimental : The code will be written iteratively.
// Do not use or edit any of these functions unless you are
// familiar with the PH2 project (Moving chttp2 to promises.)
-// TODO(tjagtap) : [PH2][P3] : Update the experimental status of the code before
-// http2 rollout begins.
+// TODO(tjagtap) : [PH2][P5] : Update the experimental status of the code when
+// http2 rollout is completed.
class Http2ClientTransport final : public ClientTransport,
public channelz::DataSource {
// TODO(akshitpatel) [PH2][P1] : Functions that need a mutex to be held should
ServerTransport* server_transport() override { return nullptr; }
absl::string_view GetTransportName() const override { return "http2"; }
- // TODO(tjagtap) : [PH2][EXT] : These can be removed when event engine rollout
- // is complete.
+ // TODO(tjagtap) : [PH2][EXT] : Removed after event engine rollout
void SetPollset(grpc_stream*, grpc_pollset*) override {}
void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
void TestOnlySpawnPromise(absl::string_view name, Factory&& factory) {
general_party_->Spawn(name, std::forward<Factory>(factory), [](Empty) {});
}
-
- int64_t TestOnlyTransportFlowControlWindow() {
- return flow_control_.remote_window();
- }
-
- int64_t TestOnlyGetStreamFlowControlWindow(const uint32_t stream_id) {
- RefCountedPtr<Stream> stream = LookupStream(stream_id);
- if (stream == nullptr) {
- return -1;
- }
- return stream->flow_control.remote_window_delta();
- }
+ int64_t TestOnlyTransportFlowControlWindow();
+ int64_t TestOnlyGetStreamFlowControlWindow(const uint32_t stream_id);
bool AreTransportFlowControlTokensAvailable() {
return flow_control_.remote_window() > 0;
// Returns the next stream id. If the next stream id is not available, it
// returns std::nullopt. MUST be called from the transport party.
- absl::StatusOr<uint32_t> NextStreamId() {
- if (next_stream_id_ > GetMaxAllowedStreamId()) {
- // TODO(tjagtap) : [PH2][P3] : Handle case if transport runs out of stream
- // ids
- // RFC9113 : Stream identifiers cannot be reused. Long-lived connections
- // can result in an endpoint exhausting the available range of stream
- // identifiers. A client that is unable to establish a new stream
- // identifier can establish a new connection for new streams. A server
- // that is unable to establish a new stream identifier can send a GOAWAY
- // frame so that the client is forced to open a new connection for new
- // streams.
- return absl::ResourceExhaustedError("No more stream ids available");
- }
- // TODO(akshitpatel) : [PH2][P3] : There is a channel arg to delay
- // starting new streams instead of failing them. This needs to be
- // implemented.
- {
- MutexLock lock(&transport_mutex_);
- if (GetActiveStreamCountLocked() >=
- settings_->peer().max_concurrent_streams()) {
- return absl::ResourceExhaustedError("Reached max concurrent streams");
- }
- }
-
- // RFC9113 : Streams initiated by a client MUST use odd-numbered stream
- // identifiers.
- uint32_t new_stream_id =
- std::exchange(next_stream_id_, next_stream_id_ + 2);
- if (GPR_UNLIKELY(next_stream_id_ > GetMaxAllowedStreamId())) {
- ReportDisconnection(
- absl::ResourceExhaustedError("Transport Stream IDs exhausted"),
- {}, // TODO(tjagtap) : [PH2][P2] : Report better disconnect info.
- "no_more_stream_ids");
- }
- return new_stream_id;
- }
+ absl::StatusOr<uint32_t> NextStreamId();
// Returns the next stream id without incrementing it. MUST be called from the
// transport party.
// If the error is a connection error, it closes the transport and returns the
// corresponding (failed) absl status.
absl::Status HandleError(const std::optional<uint32_t> stream_id,
- Http2Status status, DebugLocation whence = {}) {
- auto error_type = status.GetType();
- GRPC_DCHECK(error_type != Http2Status::Http2ErrorType::kOk);
-
- if (error_type == Http2Status::Http2ErrorType::kStreamError) {
- GRPC_HTTP2_CLIENT_ERROR_DLOG << "Stream Error: " << status.DebugString();
- GRPC_DCHECK(stream_id.has_value());
- // Passing a cancelled server metadata handle to propagate the error
- // to the upper layers.
- BeginCloseStream(
- LookupStream(stream_id.value()),
- Http2ErrorCodeToFrameErrorCode(status.GetStreamErrorCode()),
- CancelledServerMetadataFromStatus(status.GetAbslStreamError()),
- whence);
- return absl::OkStatus();
- } else if (error_type == Http2Status::Http2ErrorType::kConnectionError) {
- GRPC_HTTP2_CLIENT_ERROR_DLOG << "Connection Error: "
- << status.DebugString();
- absl::Status absl_status = status.GetAbslConnectionError();
- MaybeSpawnCloseTransport(std::move(status), whence);
- return absl_status;
- }
- GPR_UNREACHABLE_CODE(return absl::InternalError("Invalid error type"));
- }
+ Http2Status status, DebugLocation whence = {});
bool should_reset_ping_clock_;
bool is_first_write_;
absl::Status MaybeAddStreamToWritableStreamList(
const RefCountedPtr<Stream> stream,
const StreamDataQueue<ClientMetadataHandle>::StreamWritabilityUpdate
- result) {
- if (result.became_writable) {
- GRPC_HTTP2_CLIENT_DLOG
- << "Http2ClientTransport MaybeAddStreamToWritableStreamList "
- " Stream id: "
- << stream->GetStreamId() << " became writable";
- absl::Status status =
- writable_stream_list_.Enqueue(stream, result.priority);
- if (!status.ok()) {
- return HandleError(
- std::nullopt,
- Http2Status::Http2ConnectionError(
- Http2ErrorCode::kRefusedStream,
- "Failed to enqueue stream to writable stream list"));
- }
- }
- return absl::OkStatus();
- }
+ result);
+
bool SetOnDone(CallHandler call_handler, RefCountedPtr<Stream> stream);
absl::StatusOr<std::vector<Http2Frame>> DequeueStreamFrames(
RefCountedPtr<Stream> stream);