Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow sending non String payload with execute #665

Merged
merged 2 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions src/body.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use http_body_util::BodyExt;

use bytes::Bytes;
use http_body::Frame;
use snafu::{Backtrace, GenerateImplicitData};
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};

type BoxBody = http_body_util::combinators::BoxBody<Bytes, crate::Error>;
type BoxError = Box<dyn std::error::Error + Send + Sync>;

fn boxed<B>(body: B) -> BoxBody
where
B: http_body::Body<Data = Bytes> + Send + Sync + 'static,
B::Error: Into<BoxError>,
{
try_downcast(body).unwrap_or_else(|body| {
body.map_err(|e| crate::Error::Other {
source: e.into(),
backtrace: Backtrace::generate(),
})
.boxed()
})
}

fn try_downcast<T, K>(k: K) -> Result<T, K>
where
T: 'static,
K: Send + 'static,
{
let mut k = Some(k);
if let Some(k) = <dyn std::any::Any>::downcast_mut::<Option<T>>(&mut k) {
Ok(k.take().unwrap())
} else {
Err(k.unwrap())
}
}

// Define octocrab Body
#[derive(Debug)]
pub struct OctoBody(Arc<RwLock<BoxBody>>);

impl OctoBody {
/// Create a new `Body` that wraps another [`http_body::Body`].
pub fn new<B>(body: B) -> Self
where
B: http_body::Body<Data = Bytes> + Send + Sync + 'static,
B::Error: Into<BoxError>,
{
try_downcast(body).unwrap_or_else(|body| Self(Arc::new(RwLock::new(boxed(body)))))
}
/// Create an empty body.
pub fn empty() -> Self {
Self::new(http_body_util::Empty::new())
}
}

impl Default for OctoBody {
fn default() -> Self {
Self::empty()
}
}

// Implement standard Bodiesque casting
impl From<()> for OctoBody {
fn from(_: ()) -> Self {
Self::empty()
}
}

impl From<String> for OctoBody {
fn from(buf: String) -> Self {
Self::new(http_body_util::Full::from(buf))
}
}

impl From<Vec<u8>> for OctoBody {
fn from(buf: Vec<u8>) -> Self {
Self::new(http_body_util::Full::from(buf))
}
}

impl From<Bytes> for OctoBody {
fn from(buf: Bytes) -> Self {
Self::new(http_body_util::Full::from(buf))
}
}

impl From<&'static str> for OctoBody {
fn from(buf: &'static str) -> Self {
Self::new(http_body_util::Full::from(buf))
}
}

impl http_body::Body for OctoBody {
type Data = Bytes;
type Error = crate::Error;

#[inline]
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let inner = Pin::into_inner(self);
let mut boxed_body = inner.0.write().expect("RwLock write lock failed");
Pin::new(&mut *boxed_body).poll_frame(cx)
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
let b = self.0.read().expect("RwLock read lock failed");
b.size_hint()
}

#[inline]
fn is_end_stream(&self) -> bool {
let b = self.0.read().expect("RwLock read lock failed");
b.is_end_stream()
}
}

impl Clone for OctoBody {
fn clone(&self) -> Self {
OctoBody(Arc::clone(&self.0))
}
}
39 changes: 21 additions & 18 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@
#![cfg_attr(test, recursion_limit = "512")]

mod api;
mod body;
mod error;
mod from_response;
mod page;
Expand All @@ -191,6 +192,7 @@ pub mod models;
pub mod params;
pub mod service;

use body::OctoBody;
use chrono::{DateTime, Utc};
use http::{HeaderMap, HeaderValue, Method, Uri};
use http_body_util::combinators::BoxBody;
Expand Down Expand Up @@ -422,7 +424,7 @@ impl<Config, Auth> OctocrabBuilder<NoSvc, Config, Auth, NotLayerReady> {

impl<Svc, Config, Auth, B> OctocrabBuilder<Svc, Config, Auth, LayerReady>
where
Svc: Service<Request<String>, Response = Response<B>> + Send + 'static,
Svc: Service<Request<OctoBody>, Response = Response<B>> + Send + 'static,
Svc::Future: Send + 'static,
Svc::Error: Into<BoxError>,
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
Expand Down Expand Up @@ -467,7 +469,7 @@ impl<Svc, Auth, LayerState> OctocrabBuilder<Svc, NoConfig, Auth, LayerState> {

impl<Svc, B, LayerState> OctocrabBuilder<Svc, NoConfig, AuthState, LayerState>
where
Svc: Service<Request<String>, Response = Response<B>> + Send + 'static,
Svc: Service<Request<OctoBody>, Response = Response<B>> + Send + 'static,
Svc::Future: Send + 'static,
Svc::Error: Into<BoxError>,
B: http_body::Body<Data = bytes::Bytes> + Send + Sync + 'static,
Expand Down Expand Up @@ -584,8 +586,8 @@ impl OctocrabBuilder<NoSvc, DefaultOctocrabBuilderConfig, NoAuth, NotLayerReady>
#[cfg(feature = "retry")]
pub fn set_connector_retry_service<S>(
&self,
connector: hyper_util::client::legacy::Client<S, String>,
) -> Retry<RetryConfig, hyper_util::client::legacy::Client<S, String>> {
connector: hyper_util::client::legacy::Client<S, OctoBody>,
) -> Retry<RetryConfig, hyper_util::client::legacy::Client<S, OctoBody>> {
let retry_layer = RetryLayer::new(self.config.retry_config.clone());

retry_layer.layer(connector)
Expand All @@ -610,7 +612,7 @@ impl OctocrabBuilder<NoSvc, DefaultOctocrabBuilderConfig, NoAuth, NotLayerReady>
/// Build a [`Client`] instance with the current [`Service`] stack.
#[cfg(feature = "default-client")]
pub fn build(self) -> Result<Octocrab> {
let client: hyper_util::client::legacy::Client<_, String> = {
let client: hyper_util::client::legacy::Client<_, OctoBody> = {
#[cfg(all(not(feature = "opentls"), not(feature = "rustls")))]
let mut connector = hyper::client::conn::http1::HttpConnector::new();

Expand Down Expand Up @@ -646,7 +648,7 @@ impl OctocrabBuilder<NoSvc, DefaultOctocrabBuilderConfig, NoAuth, NotLayerReady>

#[cfg(feature = "tracing")]
let client = TraceLayer::new_for_http()
.make_span_with(|req: &Request<String>| {
.make_span_with(|req: &Request<OctoBody>| {
tracing::debug_span!(
"HTTP",
http.method = %req.method(),
Expand All @@ -657,7 +659,7 @@ impl OctocrabBuilder<NoSvc, DefaultOctocrabBuilderConfig, NoAuth, NotLayerReady>
otel.status_code = tracing::field::Empty,
)
})
.on_request(|_req: &Request<String>, _span: &Span| {
.on_request(|_req: &Request<OctoBody>, _span: &Span| {
tracing::debug!("requesting");
})
.on_response(
Expand Down Expand Up @@ -913,8 +915,8 @@ pub enum AuthState {
}

pub type OctocrabService = Buffer<
BoxService<http::Request<String>, http::Response<BoxBody<Bytes, Error>>, BoxError>,
http::Request<String>,
BoxService<http::Request<OctoBody>, http::Response<BoxBody<Bytes, Error>>, BoxError>,
http::Request<OctoBody>,
>;

/// The GitHub API client.
Expand Down Expand Up @@ -954,7 +956,7 @@ impl Octocrab {
/// Creates a new `Octocrab`.
fn new<S>(service: S, auth_state: AuthState) -> Self
where
S: Service<Request<String>, Response = Response<BoxBody<Bytes, crate::Error>>>
S: Service<Request<OctoBody>, Response = Response<BoxBody<Bytes, crate::Error>>>
+ Send
+ 'static,
S::Future: Send + 'static,
Expand Down Expand Up @@ -1363,7 +1365,7 @@ impl Octocrab {
&self,
mut builder: Builder,
body: Option<&B>,
) -> Result<http::Request<String>> {
) -> Result<http::Request<OctoBody>> {
// Since Octocrab doesn't require streamable bodies(aka, file upload) because it is serde::Serialize),
// we can just use String body, since it is both http_body::Body(required by Hyper::Client), and Clone(required by BoxService).

Expand All @@ -1372,14 +1374,14 @@ impl Octocrab {

if let Some(body) = body {
builder = builder.header(http::header::CONTENT_TYPE, "application/json");
let request = builder
.body(serde_json::to_string(body).context(SerdeSnafu)?)
.context(HttpSnafu)?;
let serialized = serde_json::to_string(body).context(SerdeSnafu)?;
let body: OctoBody = serialized.into();
let request = builder.body(body).context(HttpSnafu)?;
Ok(request)
} else {
Ok(builder
.header(http::header::CONTENT_LENGTH, "0")
.body(String::new())
.body(OctoBody::empty())
.context(HttpSnafu)?)
}
}
Expand Down Expand Up @@ -1442,7 +1444,7 @@ impl Octocrab {
.method(http::Method::POST)
.uri(uri);
let response = self
.send(request.body("{}".to_string()).context(HttpSnafu)?)
.send(request.body("{}".into()).context(HttpSnafu)?)
.await?;
let _status = response.status();

Expand Down Expand Up @@ -1470,7 +1472,7 @@ impl Octocrab {
/// Send the given request to the underlying service
pub async fn send(
&self,
request: Request<String>,
request: Request<OctoBody>,
) -> Result<http::Response<BoxBody<Bytes, crate::Error>>> {
let mut svc = self.client.clone();
let response: Response<BoxBody<Bytes, crate::Error>> = svc
Expand All @@ -1496,9 +1498,10 @@ impl Octocrab {
/// Execute the given `request` using octocrab's Client.
pub async fn execute(
&self,
request: http::Request<String>,
request: http::Request<impl Into<OctoBody>>,
) -> Result<http::Response<BoxBody<Bytes, crate::Error>>> {
let (mut parts, body) = request.into_parts();
let body: OctoBody = body.into();
// Saved request that we can retry later if necessary
let auth_header: Option<HeaderValue> = match self.auth_state {
AuthState::None => None,
Expand Down
9 changes: 5 additions & 4 deletions src/service/middleware/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ use http::{Request, Response};
use hyper_util::client::legacy::Error;
use tower::retry::Policy;

use crate::body::OctoBody;

#[derive(Clone)]
pub enum RetryConfig {
None,
Simple(usize),
}

impl<B> Policy<Request<String>, Response<B>, Error> for RetryConfig {
impl<B> Policy<Request<OctoBody>, Response<B>, Error> for RetryConfig {
type Future = futures_util::future::Ready<Self>;

fn retry(
&self,
_req: &Request<String>,
_req: &Request<OctoBody>,
result: Result<&Response<B>, &Error>,
) -> Option<Self::Future> {
match self {
Expand Down Expand Up @@ -42,7 +44,7 @@ impl<B> Policy<Request<String>, Response<B>, Error> for RetryConfig {
}
}

fn clone_request(&self, req: &Request<String>) -> Option<Request<String>> {
fn clone_request(&self, req: &Request<OctoBody>) -> Option<Request<OctoBody>> {
match self {
RetryConfig::None => None,
_ => {
Expand All @@ -59,7 +61,6 @@ impl<B> Policy<Request<String>, Response<B>, Error> for RetryConfig {
let new_req = new_req.body(body).expect(
"This should never panic, as we are cloning a components from existing request",
);

Some(new_req)
}
}
Expand Down
Loading