From f350014bc322388f09426147b37a8bfa0d9f51a1 Mon Sep 17 00:00:00 2001 From: Toru Ogawa Date: Thu, 8 Feb 2024 01:07:18 +0900 Subject: [PATCH] Upgrade http/hyper 1.0 (#564) --- Cargo.toml | 18 +++-- examples/custom_client.rs | 4 +- src/api/actions.rs | 10 ++- src/api/repos.rs | 6 +- src/api/repos/releases.rs | 8 +- src/etag.rs | 2 +- src/from_response.rs | 16 ++-- src/lib.rs | 109 ++++++++++++++------------- src/models/repos.rs | 21 +++--- src/page.rs | 20 ++--- src/service/body.rs | 43 ----------- src/service/middleware/retry.rs | 6 +- src/service/mod.rs | 1 - tests/actions_self_hosted_runners.rs | 6 +- 14 files changed, 122 insertions(+), 148 deletions(-) delete mode 100644 src/service/body.rs diff --git a/Cargo.toml b/Cargo.toml index 3da46617..2a236426 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,12 +31,14 @@ futures = { version = "0.3.15" } futures-core = { version = "0.3.15", optional = true } futures-util = { version = "0.3.15", optional = true } jsonwebtoken = "9" -http = "0.2.5" -http-body = "0.4.5" -hyper = { version = "0.14.13", features = ["client", "http1", "stream", "tcp"] } -hyper-rustls = { version = "0.24.0", optional = true } -hyper-timeout = { version = "0.4.1", optional = true } -hyper-tls = { version = "0.5.0", optional = true } +http = "1.0.0" +http-body = "1.0.0" +http-body-util = "0.1.0" +hyper = "1.1.0" +hyper-rustls = { version = "0.26.0", optional = true } +hyper-timeout = { version = "0.5.1", optional = true } +hyper-tls = { version = "0.6.0", optional = true } +hyper-util = { version = "0.1.3", features = ["client-legacy", "http1"] } once_cell = "1.7.2" percent-encoding = "2.2.0" pin-project = "1.0.12" @@ -48,7 +50,7 @@ serde_urlencoded = "0.7.1" snafu = { version = "0.7", features = ["backtraces"] } tokio = { version = "1.17.0", default-features = false, optional = true } tower = { version = "0.4.13", default-features = false, features = ["util", "buffer"] } -tower-http = { version = "0.4.0", features = ["map-response-body", "trace"] } +tower-http = { version = "0.5.1", features = ["map-response-body", "trace"] } tracing = { version = "0.1.37", features = ["log"], optional = true } url = { version = "2.2.2", features = ["serde"] } @@ -73,5 +75,5 @@ retry = ["tower/retry", "futures-util"] rustls = ["hyper-rustls"] rustls-webpki-tokio = ["hyper-rustls/webpki-tokio"] opentls = ["hyper-tls"] -stream = ["futures-core", "futures-util", "hyper/stream"] +stream = ["futures-core", "futures-util"] timeout = ["hyper-timeout", "tokio", "tower/timeout"] diff --git a/examples/custom_client.rs b/examples/custom_client.rs index 8559ce91..12184bf7 100644 --- a/examples/custom_client.rs +++ b/examples/custom_client.rs @@ -11,11 +11,13 @@ use std::sync::Arc; async fn main() -> octocrab::Result<()> { let connector = HttpsConnectorBuilder::new() .with_native_roots() // enabled the `rustls-native-certs` feature in hyper-rustls + .unwrap() .https_only() .enable_http1() .build(); - let client = hyper::Client::builder().build(connector); + let client = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new()) + .build(connector); let octocrab = OctocrabBuilder::new_empty() .with_service(client) .with_layer(&BaseUriLayer::new(Uri::from_static( diff --git a/src/api/actions.rs b/src/api/actions.rs index 96704194..e0c2e3f7 100644 --- a/src/api/actions.rs +++ b/src/api/actions.rs @@ -1,10 +1,13 @@ //! GitHub Actions +use bytes::Bytes; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Collected}; use snafu::ResultExt; pub mod self_hosted_runners; use self::self_hosted_runners::{CreateJitRunnerConfigBuilder, ListSelfHostedRunnersBuilder}; -use crate::error::{HttpSnafu, HyperSnafu}; +use crate::error::HttpSnafu; use crate::etag::{EntityTag, Etagged}; use crate::models::{ workflows::WorkflowDispatch, workflows::WorkflowListArtifact, ArtifactId, RepositoryId, RunId, @@ -13,7 +16,6 @@ use crate::models::{RunnerGroupId, RunnerId}; use crate::{params, FromResponse, Octocrab, Page}; use http::request::Builder; use http::{header::HeaderMap, Method, StatusCode, Uri}; -use hyper::body; pub struct ListWorkflowRunArtifacts<'octo> { crab: &'octo Octocrab, @@ -268,13 +270,13 @@ impl<'octo> ActionsHandler<'octo> { async fn follow_location_to_data( &self, - response: http::Response, + response: http::Response>, ) -> crate::Result { let data_response = self.crab.follow_location_to_data(response).await?; let body = data_response.into_body(); - body::to_bytes(body).await.context(HyperSnafu) + body.collect().await.map(Collected::to_bytes) } /// Downloads and returns the raw data representing a zip of the logs from diff --git a/src/api/repos.rs b/src/api/repos.rs index dd8d53bc..065ed0ec 100644 --- a/src/api/repos.rs +++ b/src/api/repos.rs @@ -1,8 +1,10 @@ //! The repositories API. +use bytes::Bytes; use http::header::ACCEPT; use http::request::Builder; use http::Uri; +use http_body_util::combinators::BoxBody; use snafu::ResultExt; mod branches; @@ -561,7 +563,7 @@ impl<'octo> RepoHandler<'octo> { self, reference: impl Into, path: impl AsRef, - ) -> Result> { + ) -> Result>> { let route = format!( "/repos/{owner}/{repo}/contents/{path}", owner = self.owner, @@ -606,7 +608,7 @@ impl<'octo> RepoHandler<'octo> { pub async fn download_tarball( &self, reference: impl Into, - ) -> Result> { + ) -> Result>> { let route = format!( "/repos/{owner}/{repo}/tarball/{reference}", owner = self.owner, diff --git a/src/api/repos/releases.rs b/src/api/repos/releases.rs index ec9bfe70..9c84d99f 100644 --- a/src/api/repos/releases.rs +++ b/src/api/repos/releases.rs @@ -194,12 +194,8 @@ impl<'octo, 'r> ReleasesHandler<'octo, 'r> { let request = self.parent.crab.build_request(builder, None::<&()>)?; let response = self.parent.crab.execute(request).await?; let response = self.parent.crab.follow_location_to_data(response).await?; - Ok(response - .into_body() - .map_err(|source| crate::error::Error::Hyper { - source, - backtrace: snafu::Backtrace::generate(), - })) + Ok(http_body_util::BodyStream::new(response.into_body()) + .try_filter_map(|frame| futures_util::future::ok(frame.into_data().ok()))) } } diff --git a/src/etag.rs b/src/etag.rs index b5faf0d6..533811b1 100644 --- a/src/etag.rs +++ b/src/etag.rs @@ -64,7 +64,7 @@ pub struct EntityTag { } impl EntityTag { - pub fn extract_from_response(response: &http::Response) -> Option { + pub fn extract_from_response(response: &http::Response) -> Option { response .headers() .get("ETag") diff --git a/src/from_response.rs b/src/from_response.rs index f6d12797..db85738a 100644 --- a/src/from_response.rs +++ b/src/from_response.rs @@ -1,18 +1,24 @@ +use bytes::Bytes; +use http_body::Body; +use http_body_util::BodyExt; use snafu::ResultExt; /// A trait for mapping from a `http::Response` to an another type. #[async_trait::async_trait] pub trait FromResponse: Sized { - async fn from_response(response: http::Response) -> crate::Result; + async fn from_response(response: http::Response) -> crate::Result + where + B: Body + Send; } #[async_trait::async_trait] impl FromResponse for T { - async fn from_response(response: http::Response) -> crate::Result { + async fn from_response(response: http::Response) -> crate::Result + where + B: Body + Send, + { let (_, body) = response.into_parts(); - let body = hyper::body::to_bytes(body) - .await - .context(crate::error::HyperSnafu)?; + let body = body.collect().await?.to_bytes(); let de = &mut serde_json::Deserializer::from_slice(&body); return serde_path_to_error::deserialize(de).context(crate::error::JsonSnafu); } diff --git a/src/lib.rs b/src/lib.rs index 74bc5244..7b614d06 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -190,10 +190,11 @@ pub mod etag; pub mod models; pub mod params; pub mod service; -use crate::service::body::BodyStreamExt; use chrono::{DateTime, Utc}; use http::{HeaderMap, HeaderValue, Method, Uri}; +use http_body_util::combinators::BoxBody; +use http_body_util::BodyExt; use service::middleware::auth_header::AuthHeaderLayer; use std::convert::{Infallible, TryInto}; use std::fmt; @@ -206,7 +207,7 @@ use std::time::Duration; use http::{header::HeaderName, StatusCode}; #[cfg(all(not(feature = "opentls"), not(feature = "rustls")))] use hyper::client::HttpConnector; -use hyper::{body, Body, Request, Response}; +use hyper::{Request, Response}; use once_cell::sync::Lazy; use secrecy::{ExposeSecret, SecretString}; @@ -227,10 +228,7 @@ use hyper_rustls::HttpsConnectorBuilder; use tower::retry::{Retry, RetryLayer}; #[cfg(feature = "timeout")] -use { - hyper_timeout::TimeoutConnector, - tokio::io::{AsyncRead, AsyncWrite}, -}; +use hyper_timeout::TimeoutConnector; use tower_http::{classify::ServerErrorsFailureClass, map_response_body::MapResponseBodyLayer}; @@ -299,18 +297,14 @@ pub fn format_media_type(media_type: impl AsRef) -> String { /// Maps a GitHub error response into and `Err()` variant if the status is /// not a success. pub async fn map_github_error( - response: http::Response, -) -> Result> { + response: http::Response>, +) -> Result>> { if response.status().is_success() { Ok(response) } else { - let b: error::GitHubError = serde_json::from_slice( - body::to_bytes(response.into_body()) - .await - .context(error::HyperSnafu)? - .as_ref(), - ) - .context(error::SerdeSnafu)?; + let b: error::GitHubError = + serde_json::from_slice(response.into_body().collect().await?.to_bytes().as_ref()) + .context(error::SerdeSnafu)?; Err(error::Error::GitHub { source: b, @@ -459,12 +453,19 @@ where Svc: Service, Response = Response> + Send + 'static, Svc::Future: Send + 'static, Svc::Error: Into, - B: http_body::Body + Send + 'static, + B: http_body::Body + Send + Sync + 'static, B::Error: Into, { /// Build a [`Client`] instance with the current [`Service`] stack. pub fn build(self) -> Result { - Ok(Octocrab::new(self.service, self.auth)) + // Transform response body to `BoxBody` and use type erased error to avoid type parameters. + let service = MapResponseBodyLayer::new(|b: B| { + b.map_err(|e| ServiceSnafu.into_error(e.into())).boxed() + }) + .layer(self.service) + .map_err(|e| e.into()); + + Ok(Octocrab::new(service, self.auth)) } } @@ -565,8 +566,8 @@ impl OctocrabBuilder #[cfg(feature = "retry")] pub fn set_connector_retry_service( &self, - connector: hyper::Client, - ) -> Retry> { + connector: hyper_util::client::legacy::Client, + ) -> Retry> { let retry_layer = RetryLayer::new(self.config.retry_config.clone()); retry_layer.layer(connector) @@ -576,7 +577,7 @@ impl OctocrabBuilder pub fn set_connect_timeout_service(&self, connector: T) -> TimeoutConnector where T: Service + Send, - T::Response: AsyncRead + AsyncWrite + Send + Unpin, + T::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin, T::Future: Send + 'static, T::Error: Into, { @@ -590,7 +591,7 @@ impl OctocrabBuilder /// Build a [`Client`] instance with the current [`Service`] stack. pub fn build(self) -> Result { - let client: hyper::Client<_, String> = { + let client: hyper_util::client::legacy::Client<_, String> = { #[cfg(all(not(feature = "opentls"), not(feature = "rustls")))] let mut connector = HttpConnector::new(); @@ -600,7 +601,10 @@ impl OctocrabBuilder #[cfg(feature = "rustls-webpki-tokio")] let builder = builder.with_webpki_roots(); #[cfg(not(feature = "rustls-webpki-tokio"))] - let builder = builder.with_native_roots(); // enabled the `rustls-native-certs` feature in hyper-rustls + let builder = builder + .with_native_roots() + .map_err(Into::into) + .context(error::OtherSnafu)?; // enabled the `rustls-native-certs` feature in hyper-rustls builder .https_or_http() // Disable .https_only() during tests until: https://github.com/LukeMathWalker/wiremock-rs/issues/58 is resolved. Alternatively we can use conditional compilation to only enable this feature in tests, but it becomes rather ugly with integration tests. @@ -614,7 +618,8 @@ impl OctocrabBuilder #[cfg(feature = "timeout")] let connector = self.set_connect_timeout_service(connector); - hyper::Client::builder().build(connector) + hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new()) + .build(connector) }; #[cfg(feature = "retry")] @@ -637,7 +642,7 @@ impl OctocrabBuilder tracing::debug!("requesting"); }) .on_response( - |res: &Response, _latency: Duration, span: &Span| { + |res: &Response, _latency: Duration, span: &Span| { let status = res.status(); span.record("http.status_code", status.as_u16()); if status.is_client_error() || status.is_server_error() { @@ -726,7 +731,7 @@ impl OctocrabBuilder let client = ExtraHeadersLayer::new(Arc::new(hmap)).layer(client); let client = MapResponseBodyLayer::new(|body| { - Box::new(http_body::Body::map_err(body, BoxError::from)) as Box + BodyExt::map_err(body, |e| HyperSnafu.into_error(e)).boxed() }) .layer(client); @@ -784,8 +789,6 @@ impl DefaultOctocrabBuilderConfig { } } -pub type DynBody = dyn http_body::Body + Send + Unpin; - #[derive(Debug, Clone)] struct CachedTokenInner { expiration: Option>, @@ -892,7 +895,7 @@ pub enum AuthState { } pub type OctocrabService = Buffer< - BoxService, http::Response, BoxError>, + BoxService, http::Response>, BoxError>, http::Request, >; @@ -930,20 +933,15 @@ impl Octocrab { } /// Creates a new `Octocrab`. - fn new(service: S, auth_state: AuthState) -> Self + fn new(service: S, auth_state: AuthState) -> Self where - S: Service, Response = Response> + Send + 'static, + S: Service, Response = Response>> + + Send + + 'static, S::Future: Send + 'static, S::Error: Into, - B: http_body::Body + Send + 'static, - B::Error: Into, { - // Transform response body to `hyper::Body` and use type erased error to avoid type parameters. - let service = MapResponseBodyLayer::new(|b: B| Body::wrap_stream(b.into_stream())) - .layer(service) - .map_err(|e| e.into()); - - let service = Buffer::new(BoxService::new(service), 1024); + let service = Buffer::new(BoxService::new(service.map_err(Into::into)), 1024); Self { client: service, @@ -1182,7 +1180,7 @@ impl Octocrab { &self, uri: impl TryInto, body: Option<&P>, - ) -> Result> { + ) -> Result>> { let uri = uri .try_into() .map_err(|_| UriParseError {}) @@ -1204,7 +1202,10 @@ impl Octocrab { } /// Send a `GET` request with no additional post-processing. - pub async fn _get(&self, uri: impl TryInto) -> Result> { + pub async fn _get( + &self, + uri: impl TryInto, + ) -> Result>> { self._get_with_headers(uri, None).await } @@ -1234,8 +1235,11 @@ impl Octocrab { uri } - pub async fn body_to_string(&self, res: http::Response) -> Result { - let body_bytes = body::to_bytes(res.into_body()).await.context(HyperSnafu)?; + pub async fn body_to_string( + &self, + res: http::Response>, + ) -> Result { + let body_bytes = res.into_body().collect().await?.to_bytes(); String::from_utf8(body_bytes.to_vec()).context(InvalidUtf8Snafu) } @@ -1263,7 +1267,7 @@ impl Octocrab { &self, uri: impl TryInto, headers: Option, - ) -> Result> { + ) -> Result>> { let uri = uri .try_into() .map_err(|_| UriParseError {}) @@ -1297,7 +1301,7 @@ impl Octocrab { &self, uri: impl TryInto, body: Option<&B>, - ) -> Result> { + ) -> Result>> { let uri = uri .try_into() .map_err(|_| UriParseError {}) @@ -1326,7 +1330,7 @@ impl Octocrab { &self, uri: impl TryInto, body: Option<&B>, - ) -> Result> { + ) -> Result>> { let uri = uri .try_into() .map_err(|_| UriParseError {}) @@ -1380,7 +1384,7 @@ impl Octocrab { &self, uri: impl TryInto, body: Option<&B>, - ) -> Result> { + ) -> Result>> { let uri = uri .try_into() .map_err(|_| UriParseError {}) @@ -1445,9 +1449,12 @@ impl Octocrab { } /// Send the given request to the underlying service - pub async fn send(&self, request: Request) -> Result> { + pub async fn send( + &self, + request: Request, + ) -> Result>> { let mut svc = self.client.clone(); - let response: Response = svc + let response: Response> = svc .ready() .await .context(ServiceSnafu)? @@ -1471,7 +1478,7 @@ impl Octocrab { pub async fn execute( &self, request: http::Request, - ) -> Result> { + ) -> Result>> { let (mut parts, body) = request.into_parts(); // Saved request that we can retry later if necessary let auth_header: Option = match self.auth_state { @@ -1540,8 +1547,8 @@ impl Octocrab { pub async fn follow_location_to_data( &self, - response: http::Response, - ) -> crate::Result> { + response: http::Response>, + ) -> crate::Result>> { if let Some(redirect) = response.headers().get(http::header::LOCATION) { let location = redirect.to_str().expect("Location URL not valid str"); diff --git a/src/models/repos.rs b/src/models/repos.rs index 26f874df..dbe6fa2d 100644 --- a/src/models/repos.rs +++ b/src/models/repos.rs @@ -1,7 +1,9 @@ use super::*; -use crate::error; use crate::error::SerdeSnafu; -use hyper::{body, Response}; +use bytes::Bytes; +use http_body::Body; +use http_body_util::BodyExt; +use hyper::Response; use snafu::ResultExt; use url::Url; @@ -229,14 +231,13 @@ impl Content { #[async_trait::async_trait] impl crate::FromResponse for ContentItems { - async fn from_response(response: Response) -> crate::Result { - let json: serde_json::Value = serde_json::from_slice( - body::to_bytes(response.into_body()) - .await - .context(error::HyperSnafu)? - .as_ref(), - ) - .context(SerdeSnafu)?; + async fn from_response(response: Response) -> crate::Result + where + B: Body + Send, + { + let json: serde_json::Value = + serde_json::from_slice(response.into_body().collect().await?.to_bytes().as_ref()) + .context(SerdeSnafu)?; if json.is_array() { Ok(ContentItems { diff --git a/src/page.rs b/src/page.rs index 1eb0cec0..68d0e1ca 100644 --- a/src/page.rs +++ b/src/page.rs @@ -1,9 +1,10 @@ +use bytes::Bytes; use http::Uri; -use hyper::body; +use http_body::Body; +use http_body_util::BodyExt; use std::slice::Iter; use std::str::FromStr; -use crate::error; use crate::error::{SerdeSnafu, UriSnafu}; use snafu::{GenerateImplicitData, ResultExt}; use url::form_urlencoded; @@ -166,7 +167,10 @@ impl<'iter, T> IntoIterator for &'iter Page { #[async_trait::async_trait] impl crate::FromResponse for Page { - async fn from_response(response: http::Response) -> crate::Result { + async fn from_response(response: http::Response) -> crate::Result + where + B: Body + Send, + { let HeaderLinks { first, prev, @@ -174,13 +178,9 @@ impl crate::FromResponse for Page { last, } = get_links(response.headers())?; - let json: serde_json::Value = serde_json::from_slice( - body::to_bytes(response.into_body()) - .await - .context(error::HyperSnafu)? - .as_ref(), - ) - .context(SerdeSnafu)?; + let json: serde_json::Value = + serde_json::from_slice(response.into_body().collect().await?.to_bytes().as_ref()) + .context(SerdeSnafu)?; if json.is_array() { Ok(Self { diff --git a/src/service/body.rs b/src/service/body.rs deleted file mode 100644 index aec7116f..00000000 --- a/src/service/body.rs +++ /dev/null @@ -1,43 +0,0 @@ -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - -use futures::stream::Stream; -use http_body::Body; -use pin_project::pin_project; - -// Wrap `http_body::Body` to implement `Stream`. -#[pin_project] -pub struct IntoStream { - #[pin] - body: B, -} - -impl IntoStream { - pub(crate) fn new(body: B) -> Self { - Self { body } - } -} - -impl Stream for IntoStream -where - B: Body, -{ - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().body.poll_data(cx) - } -} - -pub trait BodyStreamExt: Body { - fn into_stream(self) -> IntoStream - where - Self: Sized, - { - IntoStream::new(self) - } -} - -impl BodyStreamExt for T where T: Body {} diff --git a/src/service/middleware/retry.rs b/src/service/middleware/retry.rs index 77113fc4..1dfce9f6 100644 --- a/src/service/middleware/retry.rs +++ b/src/service/middleware/retry.rs @@ -1,6 +1,6 @@ use futures_util::future; use http::{Request, Response}; -use hyper::{Body, Error}; +use hyper_util::client::legacy::Error; use tower::retry::Policy; #[derive(Clone)] @@ -9,13 +9,13 @@ pub enum RetryConfig { Simple(usize), } -impl Policy, Response, hyper::Error> for RetryConfig { +impl Policy, Response, Error> for RetryConfig { type Future = futures_util::future::Ready; fn retry( &self, _req: &Request, - result: Result<&Response, &Error>, + result: Result<&Response, &Error>, ) -> Option { match self { RetryConfig::None => None, diff --git a/src/service/mod.rs b/src/service/mod.rs index 7c44d053..8a2cf7a5 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -1,2 +1 @@ -pub mod body; pub mod middleware; diff --git a/tests/actions_self_hosted_runners.rs b/tests/actions_self_hosted_runners.rs index e829eaf1..457702e0 100644 --- a/tests/actions_self_hosted_runners.rs +++ b/tests/actions_self_hosted_runners.rs @@ -103,8 +103,8 @@ async fn test_context( resp_body: Option, ) -> TestContext { let template = match resp_body { - Some(resp_body) => ResponseTemplate::new(response_code).set_body_json(resp_body), - None => ResponseTemplate::new(response_code), + Some(resp_body) => ResponseTemplate::new(response_code.as_u16()).set_body_json(resp_body), + None => ResponseTemplate::new(response_code.as_u16()), }; let server = setup_api(scope, method_name, actions_uri, None::<()>, template).await; let client = Octocrab::builder() @@ -127,7 +127,7 @@ async fn test_context_with_request_body( request_body: impl Serialize, resp_body: Option, ) -> TestContext { - let template = ResponseTemplate::new(response_code).set_body_json(resp_body); + let template = ResponseTemplate::new(response_code.as_u16()).set_body_json(resp_body); let server = setup_api( scope, method_name,