From: Tanvi Jagtap Date: Wed, 24 Dec 2025 06:36:51 +0000 (-0800) Subject: [PH2][Trivial] Tidy up client code - Part 2 X-Git-Url: https://git.feebdaed.xyz/?a=commitdiff_plain;h=a2baa27f2df4ceeafe918537b80331d2de313fa1;p=0xmirror%2Fgrpc.git [PH2][Trivial] Tidy up client code - Part 2 Moving code from header to cc file. Trying to manage the class Http2ClientTransport which has become 500+ lines and hard to work with. PiperOrigin-RevId: 848425647 --- diff --git a/src/core/ext/transport/chttp2/transport/http2_client_transport.cc b/src/core/ext/transport/chttp2/transport/http2_client_transport.cc index e3bf2a5531..f81e0972fe 100644 --- a/src/core/ext/transport/chttp2/transport/http2_client_transport.cc +++ b/src/core/ext/transport/chttp2/transport/http2_client_transport.cc @@ -142,6 +142,16 @@ void Http2ClientTransport::SpawnGuardedTransportParty(absl::string_view name, }); } +template +auto Http2ClientTransport::UntilTransportClosed(Promise promise) { + return Race(Map(transport_closed_latch_.Wait(), + [](Empty) { + GRPC_HTTP2_CLIENT_DLOG << "Transport closed"; + return absl::CancelledError("Transport closed"); + }), + std::move(promise)); +} + void Http2ClientTransport::PerformOp(grpc_transport_op* op) { // Notes : Refer : src/core/ext/transport/chaotic_good/client_transport.cc // Functions : StartConnectivityWatch, StopConnectivityWatch, PerformOp @@ -230,6 +240,30 @@ void Http2ClientTransport::NotifyStateWatcherOnDisconnectLocked( }); } +auto Http2ClientTransport::AckPing(uint64_t opaque_data) { + bool valid_ping_ack_received = true; + + if (!ping_manager_->AckPing(opaque_data)) { + GRPC_HTTP2_CLIENT_DLOG << "Unknown ping response received for ping id=" + << opaque_data; + valid_ping_ack_received = false; + } + + return If( + // It is possible that the PingRatePolicy may decide to not send a ping + // request (in cases like the number of inflight pings is too high). + // When this happens, it becomes important to ensure that if a ping ack + // is received and there is an "important" outstanding ping request, we + // should retry to send it out now. + valid_ping_ack_received && ping_manager_->ImportantPingRequested(), + [self = RefAsSubclass()] { + return Map(self->TriggerWriteCycle(), [](const absl::Status status) { + return ToHttpOkOrConnError(status); + }); + }, + [] { return Immediate(Http2Status::Ok()); }); +} + void Http2ClientTransport::Orphan() { GRPC_HTTP2_CLIENT_DLOG << "Http2ClientTransport Orphan Begin"; // Accessing general_party here is not advisable. It may so happen that @@ -802,6 +836,7 @@ Http2Status Http2ClientTransport::ParseAndDiscardHeaders( /////////////////////////////////////////////////////////////////////////////// // Read Related Promises and Promise Factories + auto Http2ClientTransport::EndpointReadSlice(const size_t num_bytes) { return Map( endpoint_.ReadSlice(num_bytes), @@ -1962,7 +1997,7 @@ void Http2ClientTransport::SetMaxAllowedStreamId( } /////////////////////////////////////////////////////////////////////////////// -// Call Spine related operations +// Http2ClientTransport - Call Spine related operations auto Http2ClientTransport::CallOutboundLoop(CallHandler call_handler, RefCountedPtr stream, @@ -2072,5 +2107,99 @@ void Http2ClientTransport::StartCall(CallHandler call_handler) { GRPC_HTTP2_CLIENT_DLOG << "Http2ClientTransport StartCall End"; } +/////////////////////////////////////////////////////////////////////////////// +// Class PingSystemInterfaceImpl + +std::unique_ptr +Http2ClientTransport::PingSystemInterfaceImpl::Make( + Http2ClientTransport* transport) { + return std::make_unique( + PingSystemInterfaceImpl(transport)); +} + +Promise +Http2ClientTransport::PingSystemInterfaceImpl::TriggerWrite() { + return transport_->TriggerWriteCycle(); +} + +Promise +Http2ClientTransport::PingSystemInterfaceImpl::PingTimeout() { + GRPC_HTTP2_CLIENT_DLOG << "Ping timeout at time: " << Timestamp::Now(); + + // TODO(akshitpatel) : [PH2][P2] : The error code here has been chosen + // based on CHTTP2's usage of GRPC_STATUS_UNAVAILABLE (which corresponds + // to kRefusedStream). However looking at RFC9113, definition of + // kRefusedStream doesn't seem to fit this case. We should revisit this + // and update the error code. + return Immediate(transport_->HandleError( + std::nullopt, + Http2Status::Http2ConnectionError(Http2ErrorCode::kRefusedStream, + GRPC_CHTTP2_PING_TIMEOUT_STR))); +} + +/////////////////////////////////////////////////////////////////////////////// +// Class KeepAliveInterfaceImpl + +std::unique_ptr +Http2ClientTransport::KeepAliveInterfaceImpl::Make( + Http2ClientTransport* transport) { + return std::make_unique( + KeepAliveInterfaceImpl(transport)); +} + +Promise +Http2ClientTransport::KeepAliveInterfaceImpl::SendPingAndWaitForAck() { + return TrySeq(transport_->TriggerWriteCycle(), [transport = transport_] { + return transport->WaitForPingAck(); + }); +} + +Promise +Http2ClientTransport::KeepAliveInterfaceImpl::OnKeepAliveTimeout() { + GRPC_HTTP2_CLIENT_DLOG << "Keepalive timeout triggered"; + // TODO(akshitpatel) : [PH2][P2] : The error code here has been chosen + // based on CHTTP2's usage of GRPC_STATUS_UNAVAILABLE (which corresponds + // to kRefusedStream). However looking at RFC9113, definition of + // kRefusedStream doesn't seem to fit this case. We should revisit this + // and update the error code. + return Immediate(transport_->HandleError( + std::nullopt, + Http2Status::Http2ConnectionError(Http2ErrorCode::kRefusedStream, + GRPC_CHTTP2_KEEPALIVE_TIMEOUT_STR))); +} + +bool Http2ClientTransport::KeepAliveInterfaceImpl::NeedToSendKeepAlivePing() { + bool need_to_send_ping = false; + { + MutexLock lock(&transport_->transport_mutex_); + need_to_send_ping = (transport_->keepalive_permit_without_calls_ || + transport_->GetActiveStreamCountLocked() > 0); + } + return need_to_send_ping; +} + +/////////////////////////////////////////////////////////////////////////////// +// Class GoawayInterfaceImpl + +std::unique_ptr +Http2ClientTransport::GoawayInterfaceImpl::Make( + Http2ClientTransport* transport) { + return std::make_unique(GoawayInterfaceImpl(transport)); +} + +uint32_t Http2ClientTransport::GoawayInterfaceImpl::GetLastAcceptedStreamId() { + GRPC_DCHECK(false) + << "GetLastAcceptedStreamId is not implemented for client transport."; + LOG(ERROR) << "GetLastAcceptedStreamId is not implemented for client " + "transport."; + return 0; +} + +// TODO(akshitpatel) : [PH2][P2] : Eventually there should be a separate ref +// counted struct/class passed to all the transport promises/members. This +// will help removing back references from the transport members to +// transport and greatly simpilfy the cleanup path. Need to do this for +// PingSystemInterfaceImpl, KeepAliveInterfaceImpl and GoawayInterfaceImpl. + } // namespace http2 } // namespace grpc_core diff --git a/src/core/ext/transport/chttp2/transport/http2_client_transport.h b/src/core/ext/transport/chttp2/transport/http2_client_transport.h index b65ec1eec8..a1588fa54b 100644 --- a/src/core/ext/transport/chttp2/transport/http2_client_transport.h +++ b/src/core/ext/transport/chttp2/transport/http2_client_transport.h @@ -363,14 +363,7 @@ class Http2ClientTransport final : public ClientTransport, Latch transport_closed_latch_; template - auto UntilTransportClosed(Promise promise) { - return Race(Map(transport_closed_latch_.Wait(), - [](Empty) { - GRPC_HTTP2_CLIENT_DLOG << "Transport closed"; - return absl::CancelledError("Transport closed"); - }), - std::move(promise)); - } + auto UntilTransportClosed(Promise promise); // Spawns an infallible promise on the given party. template @@ -525,61 +518,15 @@ class Http2ClientTransport final : public ClientTransport, : Duration::Seconds(1); } - auto AckPing(uint64_t opaque_data) { - bool valid_ping_ack_received = true; - - if (!ping_manager_->AckPing(opaque_data)) { - GRPC_HTTP2_CLIENT_DLOG << "Unknown ping response received for ping id=" - << opaque_data; - valid_ping_ack_received = false; - } - - return If( - // It is possible that the PingRatePolicy may decide to not send a ping - // request (in cases like the number of inflight pings is too high). - // When this happens, it becomes important to ensure that if a ping ack - // is received and there is an "important" outstanding ping request, we - // should retry to send it out now. - valid_ping_ack_received && ping_manager_->ImportantPingRequested(), - [self = RefAsSubclass()] { - return Map(self->TriggerWriteCycle(), [](const absl::Status status) { - return ToHttpOkOrConnError(status); - }); - }, - [] { return Immediate(Http2Status::Ok()); }); - } + auto AckPing(uint64_t opaque_data); class PingSystemInterfaceImpl : public PingInterface { public: - static std::unique_ptr Make( - Http2ClientTransport* transport) { - return std::make_unique( - PingSystemInterfaceImpl(transport)); - } - - Promise TriggerWrite() override { - return transport_->TriggerWriteCycle(); - } - - Promise PingTimeout() override { - GRPC_HTTP2_CLIENT_DLOG << "Ping timeout at time: " << Timestamp::Now(); - - // TODO(akshitpatel) : [PH2][P2] : The error code here has been chosen - // based on CHTTP2's usage of GRPC_STATUS_UNAVAILABLE (which corresponds - // to kRefusedStream). However looking at RFC9113, definition of - // kRefusedStream doesn't seem to fit this case. We should revisit this - // and update the error code. - return Immediate(transport_->HandleError( - std::nullopt, - Http2Status::Http2ConnectionError(Http2ErrorCode::kRefusedStream, - GRPC_CHTTP2_PING_TIMEOUT_STR))); - } + static std::unique_ptr Make(Http2ClientTransport* transport); + Promise TriggerWrite() override; + Promise PingTimeout() override; private: - // TODO(akshitpatel) : [PH2][P2] : Eventually there should be a separate ref - // counted struct/class passed to all the transport promises/members. This - // will help removing back references from the transport members to - // transport and greatly simpilfy the cleanup path. Http2ClientTransport* transport_; explicit PingSystemInterfaceImpl(Http2ClientTransport* transport) : transport_(transport) {} @@ -588,57 +535,22 @@ class Http2ClientTransport final : public ClientTransport, class KeepAliveInterfaceImpl : public KeepAliveInterface { public: static std::unique_ptr Make( - Http2ClientTransport* transport) { - return std::make_unique( - KeepAliveInterfaceImpl(transport)); - } + Http2ClientTransport* transport); private: explicit KeepAliveInterfaceImpl(Http2ClientTransport* transport) : transport_(transport) {} - Promise SendPingAndWaitForAck() override { - return TrySeq(transport_->TriggerWriteCycle(), [transport = transport_] { - return transport->WaitForPingAck(); - }); - } - Promise OnKeepAliveTimeout() override { - GRPC_HTTP2_CLIENT_DLOG << "Keepalive timeout triggered"; - - // TODO(akshitpatel) : [PH2][P2] : The error code here has been chosen - // based on CHTTP2's usage of GRPC_STATUS_UNAVAILABLE (which corresponds - // to kRefusedStream). However looking at RFC9113, definition of - // kRefusedStream doesn't seem to fit this case. We should revisit this - // and update the error code. - return Immediate(transport_->HandleError( - std::nullopt, Http2Status::Http2ConnectionError( - Http2ErrorCode::kRefusedStream, - GRPC_CHTTP2_KEEPALIVE_TIMEOUT_STR))); - } - - bool NeedToSendKeepAlivePing() override { - bool need_to_send_ping = false; - { - MutexLock lock(&transport_->transport_mutex_); - need_to_send_ping = (transport_->keepalive_permit_without_calls_ || - transport_->GetActiveStreamCountLocked() > 0); - } - return need_to_send_ping; - } + Promise SendPingAndWaitForAck() override; + Promise OnKeepAliveTimeout() override; + bool NeedToSendKeepAlivePing() override; - // TODO(akshitpatel) : [PH2][P2] : Eventually there should be a separate ref - // counted struct/class passed to all the transport promises/members. This - // will help removing back references from the transport members to - // transport and greatly simpilfy the cleanup path. Http2ClientTransport* transport_; }; class GoawayInterfaceImpl : public GoawayInterface { public: static std::unique_ptr Make( - Http2ClientTransport* transport) { - return std::make_unique( - GoawayInterfaceImpl(transport)); - } + Http2ClientTransport* transport); Promise SendPingAndWaitForAck() override { return transport_->ping_manager_->RequestPing(/*on_initiate=*/[] {}, @@ -646,14 +558,7 @@ class Http2ClientTransport final : public ClientTransport, } void TriggerWriteCycle() override { transport_->TriggerWriteCycle(); } - - uint32_t GetLastAcceptedStreamId() override { - GRPC_DCHECK(false) - << "GetLastAcceptedStreamId is not implemented for client transport."; - LOG(ERROR) << "GetLastAcceptedStreamId is not implemented for client " - "transport."; - return 0; - } + uint32_t GetLastAcceptedStreamId() override; private: explicit GoawayInterfaceImpl(Http2ClientTransport* transport)