Skip to content

Commit

Permalink
Update reqwest 0.12 and http 1.0 (#5536)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold authored Mar 29, 2024
1 parent ff86119 commit cdb7b6f
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 78 deletions.
8 changes: 5 additions & 3 deletions object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ walkdir = "2"

# Cloud storage support
base64 = { version = "0.22", default-features = false, features = ["std"], optional = true }
hyper = { version = "0.14", default-features = false, optional = true }
hyper = { version = "1.2", default-features = false, optional = true }
quick-xml = { version = "0.31.0", features = ["serialize", "overlapped-lists"], optional = true }
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
serde_json = { version = "1.0", default-features = false, optional = true }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true }
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls-native-roots"], optional = true }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2"], optional = true }
ring = { version = "0.17", default-features = false, features = ["std"], optional = true }
rustls-pemfile = { version = "2.0", default-features = false, features = ["std"], optional = true }
tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util"] }
Expand All @@ -69,7 +69,9 @@ tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"]

[dev-dependencies] # In alphabetical order
futures-test = "0.3"
hyper = { version = "0.14.24", features = ["server"] }
hyper = { version = "1.2", features = ["server"] }
hyper-util = "0.1"
http-body-util = "0.1"
rand = "0.8"
tempfile = "3.1.0"

Expand Down
18 changes: 9 additions & 9 deletions object_store/src/aws/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ struct CreateSessionOutput {
mod tests {
use super::*;
use crate::client::mock_server::MockServer;
use hyper::{Body, Response};
use hyper::Response;
use reqwest::{Client, Method};
use std::env;

Expand Down Expand Up @@ -939,7 +939,7 @@ mod tests {

#[tokio::test]
async fn test_mock() {
let server = MockServer::new();
let server = MockServer::new().await;

const IMDSV2_HEADER: &str = "X-aws-ec2-metadata-token";

Expand All @@ -955,7 +955,7 @@ mod tests {
server.push_fn(|req| {
assert_eq!(req.uri().path(), "/latest/api/token");
assert_eq!(req.method(), &Method::PUT);
Response::new(Body::from("cupcakes"))
Response::new("cupcakes".to_string())
});
server.push_fn(|req| {
assert_eq!(
Expand All @@ -965,14 +965,14 @@ mod tests {
assert_eq!(req.method(), &Method::GET);
let t = req.headers().get(IMDSV2_HEADER).unwrap().to_str().unwrap();
assert_eq!(t, "cupcakes");
Response::new(Body::from("myrole"))
Response::new("myrole".to_string())
});
server.push_fn(|req| {
assert_eq!(req.uri().path(), "/latest/meta-data/iam/security-credentials/myrole");
assert_eq!(req.method(), &Method::GET);
let t = req.headers().get(IMDSV2_HEADER).unwrap().to_str().unwrap();
assert_eq!(t, "cupcakes");
Response::new(Body::from(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#))
Response::new(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#.to_string())
});

let creds = instance_creds(&client, &retry_config, endpoint, true)
Expand All @@ -989,7 +989,7 @@ mod tests {
assert_eq!(req.method(), &Method::PUT);
Response::builder()
.status(StatusCode::FORBIDDEN)
.body(Body::empty())
.body(String::new())
.unwrap()
});
server.push_fn(|req| {
Expand All @@ -999,13 +999,13 @@ mod tests {
);
assert_eq!(req.method(), &Method::GET);
assert!(req.headers().get(IMDSV2_HEADER).is_none());
Response::new(Body::from("myrole"))
Response::new("myrole".to_string())
});
server.push_fn(|req| {
assert_eq!(req.uri().path(), "/latest/meta-data/iam/security-credentials/myrole");
assert_eq!(req.method(), &Method::GET);
assert!(req.headers().get(IMDSV2_HEADER).is_none());
Response::new(Body::from(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#))
Response::new(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#.to_string())
});

let creds = instance_creds(&client, &retry_config, endpoint, true)
Expand All @@ -1020,7 +1020,7 @@ mod tests {
server.push(
Response::builder()
.status(StatusCode::FORBIDDEN)
.body(Body::empty())
.body(String::new())
.unwrap(),
);

Expand Down
28 changes: 15 additions & 13 deletions object_store/src/azure/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -930,8 +930,8 @@ impl CredentialProvider for AzureCliCredential {
#[cfg(test)]
mod tests {
use futures::executor::block_on;
use hyper::body::to_bytes;
use hyper::{Body, Response, StatusCode};
use http_body_util::BodyExt;
use hyper::{Response, StatusCode};
use reqwest::{Client, Method};
use tempfile::NamedTempFile;

Expand All @@ -942,7 +942,7 @@ mod tests {

#[tokio::test]
async fn test_managed_identity() {
let server = MockServer::new();
let server = MockServer::new().await;

std::env::set_var(MSI_SECRET_ENV_KEY, "env-secret");

Expand All @@ -964,7 +964,7 @@ mod tests {
assert_eq!(t, "env-secret");
let t = req.headers().get("metadata").unwrap().to_str().unwrap();
assert_eq!(t, "true");
Response::new(Body::from(
Response::new(
r#"
{
"access_token": "TOKEN",
Expand All @@ -975,8 +975,9 @@ mod tests {
"resource": "https://management.azure.com/",
"token_type": "Bearer"
}
"#,
))
"#
.to_string(),
)
});

let credential = ImdsManagedIdentityProvider::new(
Expand All @@ -999,7 +1000,7 @@ mod tests {

#[tokio::test]
async fn test_workload_identity() {
let server = MockServer::new();
let server = MockServer::new().await;
let tokenfile = NamedTempFile::new().unwrap();
let tenant = "tenant";
std::fs::write(tokenfile.path(), "federated-token").unwrap();
Expand All @@ -1012,10 +1013,10 @@ mod tests {
server.push_fn(move |req| {
assert_eq!(req.uri().path(), format!("/{tenant}/oauth2/v2.0/token"));
assert_eq!(req.method(), &Method::POST);
let body = block_on(to_bytes(req.into_body())).unwrap();
let body = block_on(async move { req.into_body().collect().await.unwrap().to_bytes() });
let body = String::from_utf8(body.to_vec()).unwrap();
assert!(body.contains("federated-token"));
Response::new(Body::from(
Response::new(
r#"
{
"access_token": "TOKEN",
Expand All @@ -1026,8 +1027,9 @@ mod tests {
"resource": "https://management.azure.com/",
"token_type": "Bearer"
}
"#,
))
"#
.to_string(),
)
});

let credential = WorkloadIdentityOAuthProvider::new(
Expand All @@ -1050,7 +1052,7 @@ mod tests {

#[tokio::test]
async fn test_no_credentials() {
let server = MockServer::new();
let server = MockServer::new().await;

let endpoint = server.url();
let store = MicrosoftAzureBuilder::new()
Expand All @@ -1068,7 +1070,7 @@ mod tests {
assert!(req.headers().get("Authorization").is_none());
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("not found"))
.body("not found".to_string())
.unwrap()
});

Expand Down
82 changes: 48 additions & 34 deletions object_store/src/client/mock_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@

use futures::future::BoxFuture;
use futures::FutureExt;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::convert::Infallible;
use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::task::{JoinHandle, JoinSet};

pub type ResponseFn = Box<dyn FnOnce(Request<Body>) -> BoxFuture<'static, Response<Body>> + Send>;
pub type ResponseFn =
Box<dyn FnOnce(Request<Incoming>) -> BoxFuture<'static, Response<String>> + Send>;

/// A mock server
pub struct MockServer {
Expand All @@ -39,39 +44,48 @@ pub struct MockServer {
}

impl MockServer {
pub fn new() -> Self {
pub async fn new() -> Self {
let responses: Arc<Mutex<VecDeque<ResponseFn>>> =
Arc::new(Mutex::new(VecDeque::with_capacity(10)));

let r = Arc::clone(&responses);
let make_service = make_service_fn(move |_conn| {
let r = Arc::clone(&r);
async move {
Ok::<_, Infallible>(service_fn(move |req| {
let r = Arc::clone(&r);
let next = r.lock().pop_front();
async move {
Ok::<_, Infallible>(match next {
Some(r) => r(req).await,
None => Response::new(Body::from("Hello World")),
})
}
}))
}
});
let addr = SocketAddr::from(([127, 0, 0, 1], 0));
let listener = TcpListener::bind(addr).await.unwrap();

let (shutdown, rx) = oneshot::channel::<()>();
let server = Server::bind(&SocketAddr::from(([127, 0, 0, 1], 0))).serve(make_service);
let (shutdown, mut rx) = oneshot::channel::<()>();

let url = format!("http://{}", server.local_addr());
let url = format!("http://{}", listener.local_addr().unwrap());

let r = Arc::clone(&responses);
let handle = tokio::spawn(async move {
server
.with_graceful_shutdown(async {
rx.await.ok();
})
.await
.unwrap()
let mut set = JoinSet::new();

loop {
let (stream, _) = tokio::select! {
conn = listener.accept() => conn.unwrap(),
_ = &mut rx => break,
};

let r = Arc::clone(&r);
set.spawn(async move {
let _ = http1::Builder::new()
.serve_connection(
TokioIo::new(stream),
service_fn(move |req| {
let r = Arc::clone(&r);
let next = r.lock().pop_front();
async move {
Ok::<_, Infallible>(match next {
Some(r) => r(req).await,
None => Response::new("Hello World".to_string()),
})
}
}),
)
.await;
});
}

set.abort_all();
});

Self {
Expand All @@ -88,23 +102,23 @@ impl MockServer {
}

/// Add a response
pub fn push(&self, response: Response<Body>) {
pub fn push(&self, response: Response<String>) {
self.push_fn(|_| response)
}

/// Add a response function
pub fn push_fn<F>(&self, f: F)
where
F: FnOnce(Request<Body>) -> Response<Body> + Send + 'static,
F: FnOnce(Request<Incoming>) -> Response<String> + Send + 'static,
{
let f = Box::new(|req| async move { f(req) }.boxed());
self.responses.lock().push_back(f)
}

pub fn push_async_fn<F, Fut>(&self, f: F)
where
F: FnOnce(Request<Body>) -> Fut + Send + 'static,
Fut: Future<Output = Response<Body>> + Send + 'static,
F: FnOnce(Request<Incoming>) -> Fut + Send + 'static,
Fut: Future<Output = Response<String>> + Send + 'static,
{
self.responses.lock().push_back(Box::new(|r| f(r).boxed()))
}
Expand Down
Loading

0 comments on commit cdb7b6f

Please sign in to comment.