From 32140d89670ef2a092c74dff4fb139c83339dab8 Mon Sep 17 00:00:00 2001 From: Stanislas Polu Date: Sat, 1 Jun 2024 12:31:11 +0200 Subject: [PATCH 1/9] feat: return Response on UnexpectedResponse so that the body can be parsed for errors --- eventsource-client/src/client.rs | 2 +- eventsource-client/src/error.rs | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/eventsource-client/src/client.rs b/eventsource-client/src/client.rs index f8220b2..0ca6d02 100644 --- a/eventsource-client/src/client.rs +++ b/eventsource-client/src/client.rs @@ -467,7 +467,7 @@ where self.as_mut().reset_redirects(); self.as_mut().project().state.set(State::New); - return Poll::Ready(Some(Err(Error::UnexpectedResponse(resp.status())))); + return Poll::Ready(Some(Err(Error::UnexpectedResponse(resp)))); } Err(e) => { // This seems basically impossible. AFAIK we can only get this way if we diff --git a/eventsource-client/src/error.rs b/eventsource-client/src/error.rs index 80e4ccb..24cfdf3 100644 --- a/eventsource-client/src/error.rs +++ b/eventsource-client/src/error.rs @@ -1,4 +1,4 @@ -use hyper::StatusCode; +use hyper::{Body, Response}; /// Error type returned from this library's functions. #[derive(Debug)] @@ -8,7 +8,7 @@ pub enum Error { /// An invalid request parameter InvalidParameter(Box), /// The HTTP response could not be handled. - UnexpectedResponse(StatusCode), + UnexpectedResponse(Response), /// An error reading from the HTTP response body. HttpStream(Box), /// The HTTP response stream ended @@ -32,7 +32,10 @@ impl std::fmt::Display for Error { TimedOut => write!(f, "timed out"), StreamClosed => write!(f, "stream closed"), InvalidParameter(err) => write!(f, "invalid parameter: {err}"), - UnexpectedResponse(status_code) => write!(f, "unexpected response: {status_code}"), + UnexpectedResponse(resp) => { + let status = resp.status(); + write!(f, "unexpected response: {status}") + } HttpStream(err) => write!(f, "http error: {err}"), Eof => write!(f, "eof"), UnexpectedEof => write!(f, "unexpected eof"), From b1bc711c008f873ecb54f174a122edd478b9a0a3 Mon Sep 17 00:00:00 2001 From: Stanislas Polu Date: Sun, 2 Jun 2024 15:45:40 +0200 Subject: [PATCH 2/9] wrapper to simplify hyper stuff --- eventsource-client/src/client.rs | 7 ++++-- eventsource-client/src/error.rs | 40 ++++++++++++++++++++++++++++---- 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/eventsource-client/src/client.rs b/eventsource-client/src/client.rs index 0ca6d02..8ee7077 100644 --- a/eventsource-client/src/client.rs +++ b/eventsource-client/src/client.rs @@ -27,8 +27,8 @@ use tokio::{ time::Sleep, }; -use crate::config::ReconnectOptions; use crate::error::{Error, Result}; +use crate::{config::ReconnectOptions, ResponseWrapper}; use hyper::client::HttpConnector; use hyper_timeout::TimeoutConnector; @@ -467,7 +467,10 @@ where self.as_mut().reset_redirects(); self.as_mut().project().state.set(State::New); - return Poll::Ready(Some(Err(Error::UnexpectedResponse(resp)))); + + return Poll::Ready(Some(Err(Error::UnexpectedResponse( + ResponseWrapper::new(resp), + )))); } Err(e) => { // This seems basically impossible. AFAIK we can only get this way if we diff --git a/eventsource-client/src/error.rs b/eventsource-client/src/error.rs index 24cfdf3..053c28d 100644 --- a/eventsource-client/src/error.rs +++ b/eventsource-client/src/error.rs @@ -1,4 +1,36 @@ -use hyper::{Body, Response}; +use hyper::{body::Buf, Body, Response}; + +pub struct ResponseWrapper { + response: Response, +} + +impl ResponseWrapper { + pub fn new(response: Response) -> Self { + Self { response } + } + pub fn status(&self) -> hyper::http::StatusCode { + self.response.status() + } + + pub async fn body_bytes(self) -> Result> { + let body = self.response.into_body(); + + let buf = match hyper::body::aggregate(body).await { + Ok(buf) => buf, + Err(err) => return Err(Error::HttpStream(Box::new(err))), + }; + + Ok(buf.chunk().to_vec()) + } +} + +impl std::fmt::Debug for ResponseWrapper { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ResponseWrapper") + .field("status", &self.status()) + .finish() + } +} /// Error type returned from this library's functions. #[derive(Debug)] @@ -8,7 +40,7 @@ pub enum Error { /// An invalid request parameter InvalidParameter(Box), /// The HTTP response could not be handled. - UnexpectedResponse(Response), + UnexpectedResponse(ResponseWrapper), /// An error reading from the HTTP response body. HttpStream(Box), /// The HTTP response stream ended @@ -32,8 +64,8 @@ impl std::fmt::Display for Error { TimedOut => write!(f, "timed out"), StreamClosed => write!(f, "stream closed"), InvalidParameter(err) => write!(f, "invalid parameter: {err}"), - UnexpectedResponse(resp) => { - let status = resp.status(); + UnexpectedResponse(r) => { + let status = r.status(); write!(f, "unexpected response: {status}") } HttpStream(err) => write!(f, "http error: {err}"), From dbd95fa1a05864f748ed90e9faa9aa3fd721f2e0 Mon Sep 17 00:00:00 2001 From: Stanislas Polu Date: Sun, 2 Jun 2024 16:04:25 +0200 Subject: [PATCH 3/9] Status as u16 --- eventsource-client/src/error.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eventsource-client/src/error.rs b/eventsource-client/src/error.rs index 053c28d..4f36e7d 100644 --- a/eventsource-client/src/error.rs +++ b/eventsource-client/src/error.rs @@ -8,8 +8,8 @@ impl ResponseWrapper { pub fn new(response: Response) -> Self { Self { response } } - pub fn status(&self) -> hyper::http::StatusCode { - self.response.status() + pub fn status(&self) -> u16 { + self.response.status().as_u16() } pub async fn body_bytes(self) -> Result> { From c8771d3b16fb65cab2f04f367e561eb9cde8e94a Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Mon, 3 Jun 2024 10:17:56 +0200 Subject: [PATCH 4/9] enh: expose headers in ResponseWrapper --- eventsource-client/src/error.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/eventsource-client/src/error.rs b/eventsource-client/src/error.rs index 4f36e7d..2585e47 100644 --- a/eventsource-client/src/error.rs +++ b/eventsource-client/src/error.rs @@ -11,6 +11,9 @@ impl ResponseWrapper { pub fn status(&self) -> u16 { self.response.status().as_u16() } + pub fn headers(&self) -> &hyper::header::HeaderMap { + self.response.headers() + } pub async fn body_bytes(self) -> Result> { let body = self.response.into_body(); From 470c8fd8589097e0a4e25a3f5bc1d4fcaf4c93c1 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Mon, 3 Jun 2024 10:38:51 +0200 Subject: [PATCH 5/9] fix: don't expose hyper HeaderMap --- eventsource-client/src/error.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/eventsource-client/src/error.rs b/eventsource-client/src/error.rs index 2585e47..1b6f101 100644 --- a/eventsource-client/src/error.rs +++ b/eventsource-client/src/error.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use hyper::{body::Buf, Body, Response}; pub struct ResponseWrapper { @@ -11,8 +13,18 @@ impl ResponseWrapper { pub fn status(&self) -> u16 { self.response.status().as_u16() } - pub fn headers(&self) -> &hyper::header::HeaderMap { - self.response.headers() + pub fn headers(&self) -> Result> { + let headers = self.response.headers(); + let mut map = HashMap::new(); + for (key, value) in headers.iter() { + let key = key.as_str(); + let value = match value.to_str() { + Ok(value) => value, + Err(err) => return Err(Error::InvalidParameter(Box::new(err))), + }; + map.insert(key, value); + } + Ok(map) } pub async fn body_bytes(self) -> Result> { From b0a09838ebd8b43218800d70dd89eab98ab84f3f Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Mon, 3 Jun 2024 10:53:49 +0200 Subject: [PATCH 6/9] add error type --- eventsource-client/src/error.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/eventsource-client/src/error.rs b/eventsource-client/src/error.rs index 1b6f101..ede60b6 100644 --- a/eventsource-client/src/error.rs +++ b/eventsource-client/src/error.rs @@ -20,7 +20,7 @@ impl ResponseWrapper { let key = key.as_str(); let value = match value.to_str() { Ok(value) => value, - Err(err) => return Err(Error::InvalidParameter(Box::new(err))), + Err(err) => return Err(Error::InvalidResponseHeader(Box::new(err))), }; map.insert(key, value); } @@ -70,6 +70,8 @@ pub enum Error { MalformedLocationHeader(Box), /// Reached maximum redirect limit after encountering Location headers. MaxRedirectLimitReached(u32), + // Invalid response header. + InvalidResponseHeader(Box), } impl std::fmt::Display for Error { @@ -90,6 +92,7 @@ impl std::fmt::Display for Error { InvalidEvent => write!(f, "invalid event"), MalformedLocationHeader(err) => write!(f, "malformed header: {err}"), MaxRedirectLimitReached(limit) => write!(f, "maximum redirect limit reached: {limit}"), + InvalidResponseHeader(err) => write!(f, "invalid response header: {err}"), } } } From 8d0cf7fa0aec2a859be7fe31d4c2da7eb39db666 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Mon, 3 Jun 2024 11:05:39 +0200 Subject: [PATCH 7/9] fix: move header error out of enum --- eventsource-client/src/error.rs | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/eventsource-client/src/error.rs b/eventsource-client/src/error.rs index ede60b6..e5fac41 100644 --- a/eventsource-client/src/error.rs +++ b/eventsource-client/src/error.rs @@ -13,14 +13,14 @@ impl ResponseWrapper { pub fn status(&self) -> u16 { self.response.status().as_u16() } - pub fn headers(&self) -> Result> { + pub fn headers(&self) -> std::result::Result, HeaderError> { let headers = self.response.headers(); let mut map = HashMap::new(); for (key, value) in headers.iter() { let key = key.as_str(); let value = match value.to_str() { Ok(value) => value, - Err(err) => return Err(Error::InvalidResponseHeader(Box::new(err))), + Err(err) => return Err(HeaderError::new(Box::new(err))), }; map.insert(key, value); } @@ -70,8 +70,6 @@ pub enum Error { MalformedLocationHeader(Box), /// Reached maximum redirect limit after encountering Location headers. MaxRedirectLimitReached(u32), - // Invalid response header. - InvalidResponseHeader(Box), } impl std::fmt::Display for Error { @@ -92,7 +90,6 @@ impl std::fmt::Display for Error { InvalidEvent => write!(f, "invalid event"), MalformedLocationHeader(err) => write!(f, "malformed header: {err}"), MaxRedirectLimitReached(limit) => write!(f, "maximum redirect limit reached: {limit}"), - InvalidResponseHeader(err) => write!(f, "invalid response header: {err}"), } } } @@ -128,3 +125,29 @@ impl Error { } pub type Result = std::result::Result; + +/// Error type for invalid response headers encountered in ResponseWrapper. +#[derive(Debug)] +pub struct HeaderError { + /// Wrapped inner error providing details about the header issue. + inner_error: Box, +} + +impl HeaderError { + /// Constructs a new `HeaderError` wrapping an existing error. + pub fn new(err: Box) -> Self { + HeaderError { inner_error: err } + } +} + +impl std::fmt::Display for HeaderError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Invalid response header: {}", self.inner_error) + } +} + +impl std::error::Error for HeaderError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(self.inner_error.as_ref()) + } +} From 54af18f82c99783bc978be436577c5ccb37845fb Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Mon, 3 Jun 2024 12:22:05 +0200 Subject: [PATCH 8/9] move new error struct close to ResponseWrapper --- eventsource-client/src/error.rs | 52 ++++++++++++++++----------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/eventsource-client/src/error.rs b/eventsource-client/src/error.rs index e5fac41..00bb912 100644 --- a/eventsource-client/src/error.rs +++ b/eventsource-client/src/error.rs @@ -47,6 +47,32 @@ impl std::fmt::Debug for ResponseWrapper { } } +/// Error type for invalid response headers encountered in ResponseWrapper. +#[derive(Debug)] +pub struct HeaderError { + /// Wrapped inner error providing details about the header issue. + inner_error: Box, +} + +impl HeaderError { + /// Constructs a new `HeaderError` wrapping an existing error. + pub fn new(err: Box) -> Self { + HeaderError { inner_error: err } + } +} + +impl std::fmt::Display for HeaderError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Invalid response header: {}", self.inner_error) + } +} + +impl std::error::Error for HeaderError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(self.inner_error.as_ref()) + } +} + /// Error type returned from this library's functions. #[derive(Debug)] pub enum Error { @@ -125,29 +151,3 @@ impl Error { } pub type Result = std::result::Result; - -/// Error type for invalid response headers encountered in ResponseWrapper. -#[derive(Debug)] -pub struct HeaderError { - /// Wrapped inner error providing details about the header issue. - inner_error: Box, -} - -impl HeaderError { - /// Constructs a new `HeaderError` wrapping an existing error. - pub fn new(err: Box) -> Self { - HeaderError { inner_error: err } - } -} - -impl std::fmt::Display for HeaderError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Invalid response header: {}", self.inner_error) - } -} - -impl std::error::Error for HeaderError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - Some(self.inner_error.as_ref()) - } -} From a04669b61f3c332a4176f9fa4ddca75c875497ad Mon Sep 17 00:00:00 2001 From: Stanislas Polu Date: Mon, 10 Jun 2024 14:06:09 +0200 Subject: [PATCH 9/9] SSE::Connected event to return status an headers --- contract-tests/src/bin/sse-test-api/main.rs | 17 ++++++++++++++--- eventsource-client/examples/tail.rs | 3 +++ eventsource-client/src/client.rs | 18 +++++++++++++++++- eventsource-client/src/event_parser.rs | 7 ++++++- eventsource-client/src/lib.rs | 3 ++- 5 files changed, 42 insertions(+), 6 deletions(-) diff --git a/contract-tests/src/bin/sse-test-api/main.rs b/contract-tests/src/bin/sse-test-api/main.rs index 0cea4bd..22f4e78 100644 --- a/contract-tests/src/bin/sse-test-api/main.rs +++ b/contract-tests/src/bin/sse-test-api/main.rs @@ -50,14 +50,25 @@ struct Config { #[derive(Serialize, Debug)] #[serde(tag = "kind", rename_all = "camelCase")] enum EventType { - Event { event: Event }, - Comment { comment: String }, - Error { error: String }, + Connected { + status: u16, + headers: HashMap, + }, + Event { + event: Event, + }, + Comment { + comment: String, + }, + Error { + error: String, + }, } impl From for EventType { fn from(event: es::SSE) -> Self { match event { + es::SSE::Connected((status, headers)) => Self::Connected { status, headers }, es::SSE::Event(evt) => Self::Event { event: Event { event_type: evt.event_type, diff --git a/eventsource-client/examples/tail.rs b/eventsource-client/examples/tail.rs index 5df399c..77c2c05 100644 --- a/eventsource-client/examples/tail.rs +++ b/eventsource-client/examples/tail.rs @@ -40,6 +40,9 @@ fn tail_events(client: impl es::Client) -> impl Stream> { client .stream() .map_ok(|event| match event { + es::SSE::Connected((status, _)) => { + println!("got connected: \nstatus={}", status) + } es::SSE::Event(ev) => { println!("got an event: {}\n{}", ev.event_type, ev.data) } diff --git a/eventsource-client/src/client.rs b/eventsource-client/src/client.rs index 8ee7077..79ffc1e 100644 --- a/eventsource-client/src/client.rs +++ b/eventsource-client/src/client.rs @@ -13,6 +13,7 @@ use log::{debug, info, trace, warn}; use pin_project::pin_project; use std::{ boxed, + collections::HashMap, fmt::{self, Debug, Display, Formatter}, future::Future, io::ErrorKind, @@ -393,6 +394,7 @@ where let this = self.as_mut().project(); if let Some(event) = this.event_parser.get_event() { return match event { + SSE::Connected(_) => Poll::Ready(Some(Ok(event))), SSE::Event(ref evt) => { *this.last_event_id = evt.id.clone(); @@ -438,11 +440,25 @@ where if resp.status().is_success() { self.as_mut().project().retry_strategy.reset(Instant::now()); self.as_mut().reset_redirects(); + + let headers = resp.headers(); + let mut map = HashMap::new(); + for (key, value) in headers.iter() { + let key = key.to_string(); + let value = match value.to_str() { + Ok(value) => value.to_string(), + Err(_) => String::from(""), + }; + map.insert(key, value); + } + let status = resp.status().as_u16(); + self.as_mut() .project() .state .set(State::Connected(resp.into_body())); - continue; + + return Poll::Ready(Some(Ok(SSE::Connected((status, map))))); } if resp.status() == 301 || resp.status() == 307 { diff --git a/eventsource-client/src/event_parser.rs b/eventsource-client/src/event_parser.rs index 4e66891..0920be3 100644 --- a/eventsource-client/src/event_parser.rs +++ b/eventsource-client/src/event_parser.rs @@ -1,4 +1,8 @@ -use std::{collections::VecDeque, convert::TryFrom, str::from_utf8}; +use std::{ + collections::{HashMap, VecDeque}, + convert::TryFrom, + str::from_utf8, +}; use hyper::body::Bytes; use log::{debug, log_enabled, trace}; @@ -32,6 +36,7 @@ impl EventData { #[derive(Debug, Eq, PartialEq)] pub enum SSE { + Connected((u16, HashMap)), Event(Event), Comment(String), } diff --git a/eventsource-client/src/lib.rs b/eventsource-client/src/lib.rs index 9865f39..7677f4f 100644 --- a/eventsource-client/src/lib.rs +++ b/eventsource-client/src/lib.rs @@ -14,7 +14,8 @@ //! let mut stream = Box::pin(client.stream()) //! .map_ok(|event| match event { //! SSE::Comment(comment) => println!("got a comment event: {:?}", comment), -//! SSE::Event(evt) => println!("got an event: {}", evt.event_type) +//! SSE::Event(evt) => println!("got an event: {}", evt.event_type), +//! SSE::Connected(_) => println!("got connected") //! }) //! .map_err(|e| println!("error streaming events: {:?}", e)); //! # while let Ok(Some(_)) = stream.try_next().await {}