});
}
+template <typename Promise>
+auto Http2ClientTransport::UntilTransportClosed(Promise promise) {
+ return Race(Map(transport_closed_latch_.Wait(),
+ [](Empty) {
+ GRPC_HTTP2_CLIENT_DLOG << "Transport closed";
+ return absl::CancelledError("Transport closed");
+ }),
+ std::move(promise));
+}
+
void Http2ClientTransport::PerformOp(grpc_transport_op* op) {
// Notes : Refer : src/core/ext/transport/chaotic_good/client_transport.cc
// Functions : StartConnectivityWatch, StopConnectivityWatch, PerformOp
});
}
+auto Http2ClientTransport::AckPing(uint64_t opaque_data) {
+ bool valid_ping_ack_received = true;
+
+ if (!ping_manager_->AckPing(opaque_data)) {
+ GRPC_HTTP2_CLIENT_DLOG << "Unknown ping response received for ping id="
+ << opaque_data;
+ valid_ping_ack_received = false;
+ }
+
+ return If(
+ // It is possible that the PingRatePolicy may decide to not send a ping
+ // request (in cases like the number of inflight pings is too high).
+ // When this happens, it becomes important to ensure that if a ping ack
+ // is received and there is an "important" outstanding ping request, we
+ // should retry to send it out now.
+ valid_ping_ack_received && ping_manager_->ImportantPingRequested(),
+ [self = RefAsSubclass<Http2ClientTransport>()] {
+ return Map(self->TriggerWriteCycle(), [](const absl::Status status) {
+ return ToHttpOkOrConnError(status);
+ });
+ },
+ [] { return Immediate(Http2Status::Ok()); });
+}
+
void Http2ClientTransport::Orphan() {
GRPC_HTTP2_CLIENT_DLOG << "Http2ClientTransport Orphan Begin";
// Accessing general_party here is not advisable. It may so happen that
///////////////////////////////////////////////////////////////////////////////
// Read Related Promises and Promise Factories
+
auto Http2ClientTransport::EndpointReadSlice(const size_t num_bytes) {
return Map(
endpoint_.ReadSlice(num_bytes),
}
///////////////////////////////////////////////////////////////////////////////
-// Call Spine related operations
+// Http2ClientTransport - Call Spine related operations
auto Http2ClientTransport::CallOutboundLoop(CallHandler call_handler,
RefCountedPtr<Stream> stream,
GRPC_HTTP2_CLIENT_DLOG << "Http2ClientTransport StartCall End";
}
+///////////////////////////////////////////////////////////////////////////////
+// Class PingSystemInterfaceImpl
+
+std::unique_ptr<PingInterface>
+Http2ClientTransport::PingSystemInterfaceImpl::Make(
+ Http2ClientTransport* transport) {
+ return std::make_unique<PingSystemInterfaceImpl>(
+ PingSystemInterfaceImpl(transport));
+}
+
+Promise<absl::Status>
+Http2ClientTransport::PingSystemInterfaceImpl::TriggerWrite() {
+ return transport_->TriggerWriteCycle();
+}
+
+Promise<absl::Status>
+Http2ClientTransport::PingSystemInterfaceImpl::PingTimeout() {
+ GRPC_HTTP2_CLIENT_DLOG << "Ping timeout at time: " << Timestamp::Now();
+
+ // TODO(akshitpatel) : [PH2][P2] : The error code here has been chosen
+ // based on CHTTP2's usage of GRPC_STATUS_UNAVAILABLE (which corresponds
+ // to kRefusedStream). However looking at RFC9113, definition of
+ // kRefusedStream doesn't seem to fit this case. We should revisit this
+ // and update the error code.
+ return Immediate(transport_->HandleError(
+ std::nullopt,
+ Http2Status::Http2ConnectionError(Http2ErrorCode::kRefusedStream,
+ GRPC_CHTTP2_PING_TIMEOUT_STR)));
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// Class KeepAliveInterfaceImpl
+
+std::unique_ptr<KeepAliveInterface>
+Http2ClientTransport::KeepAliveInterfaceImpl::Make(
+ Http2ClientTransport* transport) {
+ return std::make_unique<KeepAliveInterfaceImpl>(
+ KeepAliveInterfaceImpl(transport));
+}
+
+Promise<absl::Status>
+Http2ClientTransport::KeepAliveInterfaceImpl::SendPingAndWaitForAck() {
+ return TrySeq(transport_->TriggerWriteCycle(), [transport = transport_] {
+ return transport->WaitForPingAck();
+ });
+}
+
+Promise<absl::Status>
+Http2ClientTransport::KeepAliveInterfaceImpl::OnKeepAliveTimeout() {
+ GRPC_HTTP2_CLIENT_DLOG << "Keepalive timeout triggered";
+ // TODO(akshitpatel) : [PH2][P2] : The error code here has been chosen
+ // based on CHTTP2's usage of GRPC_STATUS_UNAVAILABLE (which corresponds
+ // to kRefusedStream). However looking at RFC9113, definition of
+ // kRefusedStream doesn't seem to fit this case. We should revisit this
+ // and update the error code.
+ return Immediate(transport_->HandleError(
+ std::nullopt,
+ Http2Status::Http2ConnectionError(Http2ErrorCode::kRefusedStream,
+ GRPC_CHTTP2_KEEPALIVE_TIMEOUT_STR)));
+}
+
+bool Http2ClientTransport::KeepAliveInterfaceImpl::NeedToSendKeepAlivePing() {
+ bool need_to_send_ping = false;
+ {
+ MutexLock lock(&transport_->transport_mutex_);
+ need_to_send_ping = (transport_->keepalive_permit_without_calls_ ||
+ transport_->GetActiveStreamCountLocked() > 0);
+ }
+ return need_to_send_ping;
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// Class GoawayInterfaceImpl
+
+std::unique_ptr<GoawayInterface>
+Http2ClientTransport::GoawayInterfaceImpl::Make(
+ Http2ClientTransport* transport) {
+ return std::make_unique<GoawayInterfaceImpl>(GoawayInterfaceImpl(transport));
+}
+
+uint32_t Http2ClientTransport::GoawayInterfaceImpl::GetLastAcceptedStreamId() {
+ GRPC_DCHECK(false)
+ << "GetLastAcceptedStreamId is not implemented for client transport.";
+ LOG(ERROR) << "GetLastAcceptedStreamId is not implemented for client "
+ "transport.";
+ return 0;
+}
+
+// TODO(akshitpatel) : [PH2][P2] : Eventually there should be a separate ref
+// counted struct/class passed to all the transport promises/members. This
+// will help removing back references from the transport members to
+// transport and greatly simpilfy the cleanup path. Need to do this for
+// PingSystemInterfaceImpl, KeepAliveInterfaceImpl and GoawayInterfaceImpl.
+
} // namespace http2
} // namespace grpc_core
Latch<void> transport_closed_latch_;
template <typename Promise>
- auto UntilTransportClosed(Promise promise) {
- return Race(Map(transport_closed_latch_.Wait(),
- [](Empty) {
- GRPC_HTTP2_CLIENT_DLOG << "Transport closed";
- return absl::CancelledError("Transport closed");
- }),
- std::move(promise));
- }
+ auto UntilTransportClosed(Promise promise);
// Spawns an infallible promise on the given party.
template <typename Factory>
: Duration::Seconds(1);
}
- auto AckPing(uint64_t opaque_data) {
- bool valid_ping_ack_received = true;
-
- if (!ping_manager_->AckPing(opaque_data)) {
- GRPC_HTTP2_CLIENT_DLOG << "Unknown ping response received for ping id="
- << opaque_data;
- valid_ping_ack_received = false;
- }
-
- return If(
- // It is possible that the PingRatePolicy may decide to not send a ping
- // request (in cases like the number of inflight pings is too high).
- // When this happens, it becomes important to ensure that if a ping ack
- // is received and there is an "important" outstanding ping request, we
- // should retry to send it out now.
- valid_ping_ack_received && ping_manager_->ImportantPingRequested(),
- [self = RefAsSubclass<Http2ClientTransport>()] {
- return Map(self->TriggerWriteCycle(), [](const absl::Status status) {
- return ToHttpOkOrConnError(status);
- });
- },
- [] { return Immediate(Http2Status::Ok()); });
- }
+ auto AckPing(uint64_t opaque_data);
class PingSystemInterfaceImpl : public PingInterface {
public:
- static std::unique_ptr<PingInterface> Make(
- Http2ClientTransport* transport) {
- return std::make_unique<PingSystemInterfaceImpl>(
- PingSystemInterfaceImpl(transport));
- }
-
- Promise<absl::Status> TriggerWrite() override {
- return transport_->TriggerWriteCycle();
- }
-
- Promise<absl::Status> PingTimeout() override {
- GRPC_HTTP2_CLIENT_DLOG << "Ping timeout at time: " << Timestamp::Now();
-
- // TODO(akshitpatel) : [PH2][P2] : The error code here has been chosen
- // based on CHTTP2's usage of GRPC_STATUS_UNAVAILABLE (which corresponds
- // to kRefusedStream). However looking at RFC9113, definition of
- // kRefusedStream doesn't seem to fit this case. We should revisit this
- // and update the error code.
- return Immediate(transport_->HandleError(
- std::nullopt,
- Http2Status::Http2ConnectionError(Http2ErrorCode::kRefusedStream,
- GRPC_CHTTP2_PING_TIMEOUT_STR)));
- }
+ static std::unique_ptr<PingInterface> Make(Http2ClientTransport* transport);
+ Promise<absl::Status> TriggerWrite() override;
+ Promise<absl::Status> PingTimeout() override;
private:
- // TODO(akshitpatel) : [PH2][P2] : Eventually there should be a separate ref
- // counted struct/class passed to all the transport promises/members. This
- // will help removing back references from the transport members to
- // transport and greatly simpilfy the cleanup path.
Http2ClientTransport* transport_;
explicit PingSystemInterfaceImpl(Http2ClientTransport* transport)
: transport_(transport) {}
class KeepAliveInterfaceImpl : public KeepAliveInterface {
public:
static std::unique_ptr<KeepAliveInterface> Make(
- Http2ClientTransport* transport) {
- return std::make_unique<KeepAliveInterfaceImpl>(
- KeepAliveInterfaceImpl(transport));
- }
+ Http2ClientTransport* transport);
private:
explicit KeepAliveInterfaceImpl(Http2ClientTransport* transport)
: transport_(transport) {}
- Promise<absl::Status> SendPingAndWaitForAck() override {
- return TrySeq(transport_->TriggerWriteCycle(), [transport = transport_] {
- return transport->WaitForPingAck();
- });
- }
- Promise<absl::Status> OnKeepAliveTimeout() override {
- GRPC_HTTP2_CLIENT_DLOG << "Keepalive timeout triggered";
-
- // TODO(akshitpatel) : [PH2][P2] : The error code here has been chosen
- // based on CHTTP2's usage of GRPC_STATUS_UNAVAILABLE (which corresponds
- // to kRefusedStream). However looking at RFC9113, definition of
- // kRefusedStream doesn't seem to fit this case. We should revisit this
- // and update the error code.
- return Immediate(transport_->HandleError(
- std::nullopt, Http2Status::Http2ConnectionError(
- Http2ErrorCode::kRefusedStream,
- GRPC_CHTTP2_KEEPALIVE_TIMEOUT_STR)));
- }
-
- bool NeedToSendKeepAlivePing() override {
- bool need_to_send_ping = false;
- {
- MutexLock lock(&transport_->transport_mutex_);
- need_to_send_ping = (transport_->keepalive_permit_without_calls_ ||
- transport_->GetActiveStreamCountLocked() > 0);
- }
- return need_to_send_ping;
- }
+ Promise<absl::Status> SendPingAndWaitForAck() override;
+ Promise<absl::Status> OnKeepAliveTimeout() override;
+ bool NeedToSendKeepAlivePing() override;
- // TODO(akshitpatel) : [PH2][P2] : Eventually there should be a separate ref
- // counted struct/class passed to all the transport promises/members. This
- // will help removing back references from the transport members to
- // transport and greatly simpilfy the cleanup path.
Http2ClientTransport* transport_;
};
class GoawayInterfaceImpl : public GoawayInterface {
public:
static std::unique_ptr<GoawayInterface> Make(
- Http2ClientTransport* transport) {
- return std::make_unique<GoawayInterfaceImpl>(
- GoawayInterfaceImpl(transport));
- }
+ Http2ClientTransport* transport);
Promise<absl::Status> SendPingAndWaitForAck() override {
return transport_->ping_manager_->RequestPing(/*on_initiate=*/[] {},
}
void TriggerWriteCycle() override { transport_->TriggerWriteCycle(); }
-
- uint32_t GetLastAcceptedStreamId() override {
- GRPC_DCHECK(false)
- << "GetLastAcceptedStreamId is not implemented for client transport.";
- LOG(ERROR) << "GetLastAcceptedStreamId is not implemented for client "
- "transport.";
- return 0;
- }
+ uint32_t GetLastAcceptedStreamId() override;
private:
explicit GoawayInterfaceImpl(Http2ClientTransport* transport)