From 6fbf12cbb3167e0f301a52cef643ded5b5c2ce50 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Mon, 5 Oct 2020 13:14:37 +0200 Subject: [PATCH] ref(server): Bring back upstream response transformer --- relay-server/src/actors/upstream.rs | 87 ++++++++++++++++++++++++----- 1 file changed, 72 insertions(+), 15 deletions(-) diff --git a/relay-server/src/actors/upstream.rs b/relay-server/src/actors/upstream.rs index a4723213808..d41a3e5edf0 100644 --- a/relay-server/src/actors/upstream.rs +++ b/relay-server/src/actors/upstream.rs @@ -828,10 +828,42 @@ where } } -pub struct SendRequest { +pub trait ResponseTransformer: 'static { + type Result: 'static; + + fn transform_response(self, _: ClientResponse) -> Self::Result; +} + +impl ResponseTransformer for () { + type Result = ResponseFuture<(), UpstreamRequestError>; + + fn transform_response(self, response: ClientResponse) -> Self::Result { + // consume response bodies to allow connection keep-alive + let future = response + .payload() + .for_each(|_| Ok(())) + .map_err(UpstreamRequestError::PayloadFailed); + + Box::new(future) + } +} + +impl ResponseTransformer for F +where + F: FnOnce(ClientResponse) -> T + 'static, +{ + type Result = T; + + fn transform_response(self, response: ClientResponse) -> Self::Result { + self(response) + } +} + +pub struct SendRequest { method: Method, path: String, builder: B, + transformer: T, retry: bool, } @@ -841,6 +873,7 @@ impl SendRequest { method, path: path.into(), builder: (), + transformer: (), retry: true, } } @@ -850,39 +883,67 @@ impl SendRequest { } } -impl SendRequest { - pub fn build(self, callback: F) -> SendRequest +impl SendRequest { + pub fn build(self, callback: F) -> SendRequest where F: FnMut(&mut ClientRequestBuilder) -> Result + 'static, { SendRequest { method: self.method, path: self.path, - retry: self.retry, builder: callback, + transformer: self.transformer, + retry: self.retry, + } + } + + #[allow(dead_code)] + pub fn transform(self, callback: F) -> SendRequest + where + F: FnOnce(ClientResponse) -> R, + { + SendRequest { + method: self.method, + path: self.path, + builder: self.builder, + transformer: callback, + retry: self.retry, } } } -impl Message for SendRequest { - type Result = Result<(), UpstreamRequestError>; +impl Message for SendRequest +where + R: ResponseTransformer, + R::Result: IntoFuture, +{ + type Result = Result; } +// impl Message for SendRequest { +// type Result = Result<(), UpstreamRequestError>; +// } + /// SendRequest messages represent external messages that need to be sent to the upstream server /// and do not use Relay authentication. /// /// The handler adds the message to one of the message queues. -impl Handler> for UpstreamRelay +impl Handler> for UpstreamRelay where B: RequestBuilder + Send, + R: ResponseTransformer, + R::Result: IntoFuture, + T: Send, + E: From + Send, { - type Result = ResponseFuture<(), UpstreamRequestError>; + type Result = ResponseFuture; - fn handle(&mut self, message: SendRequest, ctx: &mut Self::Context) -> Self::Result { + fn handle(&mut self, message: SendRequest, ctx: &mut Self::Context) -> Self::Result { let SendRequest { method, path, mut builder, + transformer, retry, } = message; @@ -895,12 +956,8 @@ where move |b| builder.build_request(b), ctx, ) - .and_then(|client_response| { - client_response - .payload() - .for_each(|_| Ok(())) - .map_err(UpstreamRequestError::PayloadFailed) - }); + .from_err() + .and_then(move |r| transformer.transform_response(r)); Box::new(future) }