Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(http2): add HTTP2 keep-alive support for client and server #2151

Merged
merged 1 commit into from
Mar 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use http::HeaderMap;
use http_body::{Body as HttpBody, SizeHint};

use crate::common::{task, watch, Future, Never, Pin, Poll};
use crate::proto::h2::bdp;
use crate::proto::h2::ping;
use crate::proto::DecodedLength;
use crate::upgrade::OnUpgrade;

Expand All @@ -38,7 +38,7 @@ enum Kind {
rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
},
H2 {
bdp: bdp::Sampler,
ping: ping::Recorder,
content_length: DecodedLength,
recv: h2::RecvStream,
},
Expand Down Expand Up @@ -180,10 +180,10 @@ impl Body {
pub(crate) fn h2(
recv: h2::RecvStream,
content_length: DecodedLength,
bdp: bdp::Sampler,
ping: ping::Recorder,
) -> Self {
let body = Body::new(Kind::H2 {
bdp,
ping,
content_length,
recv,
});
Expand Down Expand Up @@ -265,14 +265,14 @@ impl Body {
}
}
Kind::H2 {
ref bdp,
ref ping,
recv: ref mut h2,
content_length: ref mut len,
} => match ready!(h2.poll_data(cx)) {
Some(Ok(bytes)) => {
let _ = h2.flow_control().release_capacity(bytes.len());
len.sub_if(bytes.len() as u64);
bdp.sample(bytes.len());
ping.record_data(bytes.len());
Poll::Ready(Some(Ok(bytes)))
}
Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
Expand Down Expand Up @@ -321,9 +321,14 @@ impl HttpBody for Body {
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
match self.kind {
Kind::H2 {
recv: ref mut h2, ..
recv: ref mut h2,
ref ping,
..
} => match ready!(h2.poll_trailers(cx)) {
Ok(t) => Poll::Ready(Ok(t)),
Ok(t) => {
ping.record_non_data();
Poll::Ready(Ok(t))
}
Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
},
_ => Poll::Ready(Ok(None)),
Expand Down
56 changes: 56 additions & 0 deletions src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
//!
//! If don't have need to manage connections yourself, consider using the
//! higher-level [Client](super) API.

use std::fmt;
use std::mem;
use std::sync::Arc;
#[cfg(feature = "runtime")]
use std::time::Duration;

use bytes::Bytes;
use futures_util::future::{self, Either, FutureExt as _};
Expand Down Expand Up @@ -517,6 +520,59 @@ impl Builder {
self
}

/// Sets an interval for HTTP2 Ping frames should be sent to keep a
/// connection alive.
///
/// Pass `None` to disable HTTP2 keep-alive.
///
/// Default is currently disabled.
///
/// # Cargo Feature
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
pub fn http2_keep_alive_interval(
&mut self,
interval: impl Into<Option<Duration>>,
) -> &mut Self {
self.h2_builder.keep_alive_interval = interval.into();
self
}

/// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
///
/// If the ping is not acknowledged within the timeout, the connection will
/// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
///
/// Default is 20 seconds.
///
/// # Cargo Feature
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
self.h2_builder.keep_alive_timeout = timeout;
self
}

/// Sets whether HTTP2 keep-alive should apply while the connection is idle.
///
/// If disabled, keep-alive pings are only sent while there are open
/// request/responses streams. If enabled, pings are also sent when no
/// streams are active. Does nothing if `http2_keep_alive_interval` is
/// disabled.
///
/// Default is `false`.
///
/// # Cargo Feature
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
self.h2_builder.keep_alive_while_idle = enabled;
self
}

/// Constructs a connection with the configured options and IO.
pub fn handshake<T, B>(
&self,
Expand Down
54 changes: 54 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,7 @@ impl Builder {
self.pool_config.max_idle_per_host = max_idle;
self
}

// HTTP/1 options

/// Set whether HTTP/1 connections should try to use vectored writes,
Expand Down Expand Up @@ -1036,6 +1037,59 @@ impl Builder {
self
}

/// Sets an interval for HTTP2 Ping frames should be sent to keep a
/// connection alive.
///
/// Pass `None` to disable HTTP2 keep-alive.
///
/// Default is currently disabled.
///
/// # Cargo Feature
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
pub fn http2_keep_alive_interval(
&mut self,
interval: impl Into<Option<Duration>>,
) -> &mut Self {
self.conn_builder.http2_keep_alive_interval(interval);
self
}

/// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
///
/// If the ping is not acknowledged within the timeout, the connection will
/// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
///
/// Default is 20 seconds.
///
/// # Cargo Feature
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
self.conn_builder.http2_keep_alive_timeout(timeout);
self
}

/// Sets whether HTTP2 keep-alive should apply while the connection is idle.
///
/// If disabled, keep-alive pings are only sent while there are open
/// request/responses streams. If enabled, pings are also sent when no
/// streams are active. Does nothing if `http2_keep_alive_interval` is
/// disabled.
///
/// Default is `false`.
///
/// # Cargo Feature
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
self.conn_builder.http2_keep_alive_while_idle(enabled);
self
}

/// Set whether to retry requests that get disrupted before ever starting
/// to write.
///
Expand Down
37 changes: 31 additions & 6 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ pub(crate) enum User {
ManualUpgrade,
}

// Sentinel type to indicate the error was caused by a timeout.
#[derive(Debug)]
pub(crate) struct TimedOut;

impl Error {
/// Returns true if this was an HTTP parse error.
pub fn is_parse(&self) -> bool {
Expand Down Expand Up @@ -133,6 +137,11 @@ impl Error {
self.inner.kind == Kind::BodyWriteAborted
}

/// Returns true if the error was caused by a timeout.
pub fn is_timeout(&self) -> bool {
self.find_source::<TimedOut>().is_some()
}

/// Consumes the error, returning its cause.
pub fn into_cause(self) -> Option<Box<dyn StdError + Send + Sync>> {
self.inner.cause
Expand All @@ -153,19 +162,25 @@ impl Error {
&self.inner.kind
}

pub(crate) fn h2_reason(&self) -> h2::Reason {
// Find an h2::Reason somewhere in the cause stack, if it exists,
// otherwise assume an INTERNAL_ERROR.
fn find_source<E: StdError + 'static>(&self) -> Option<&E> {
let mut cause = self.source();
while let Some(err) = cause {
if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
return h2_err.reason().unwrap_or(h2::Reason::INTERNAL_ERROR);
if let Some(ref typed) = err.downcast_ref() {
return Some(typed);
}
cause = err.source();
}

// else
h2::Reason::INTERNAL_ERROR
None
}

pub(crate) fn h2_reason(&self) -> h2::Reason {
// Find an h2::Reason somewhere in the cause stack, if it exists,
// otherwise assume an INTERNAL_ERROR.
self.find_source::<h2::Error>()
.and_then(|h2_err| h2_err.reason())
.unwrap_or(h2::Reason::INTERNAL_ERROR)
}

pub(crate) fn new_canceled() -> Error {
Expand Down Expand Up @@ -397,6 +412,16 @@ trait AssertSendSync: Send + Sync + 'static {}
#[doc(hidden)]
impl AssertSendSync for Error {}

// ===== impl TimedOut ====

impl fmt::Display for TimedOut {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("operation timed out")
}
}

impl StdError for TimedOut {}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading