diff --git a/examples/basic-streaming-response/src/main.rs b/examples/basic-streaming-response/src/main.rs index 25b7ddd6..c8932554 100644 --- a/examples/basic-streaming-response/src/main.rs +++ b/examples/basic-streaming-response/src/main.rs @@ -1,11 +1,15 @@ -use lambda_runtime::{body::Body, service_fn, Error, LambdaEvent, StreamResponse}; +use lambda_runtime::{ + service_fn, + streaming::{channel, Body, Response}, + Error, LambdaEvent, +}; use serde_json::Value; use std::{thread, time::Duration}; -async fn func(_event: LambdaEvent) -> Result, Error> { +async fn func(_event: LambdaEvent) -> Result, Error> { let messages = vec!["Hello", "world", "from", "Lambda!"]; - let (mut tx, rx) = Body::channel(); + let (mut tx, rx) = channel(); tokio::spawn(async move { for message in messages.iter() { @@ -14,10 +18,7 @@ async fn func(_event: LambdaEvent) -> Result, Error> } }); - Ok(StreamResponse { - metadata_prelude: Default::default(), - stream: rx, - }) + Ok(Response::from(rx)) } #[tokio::main] diff --git a/lambda-events/src/event/bedrock_agent_runtime/mod.rs b/lambda-events/src/event/bedrock_agent_runtime/mod.rs index 836d2f95..cf84d4d3 100644 --- a/lambda-events/src/event/bedrock_agent_runtime/mod.rs +++ b/lambda-events/src/event/bedrock_agent_runtime/mod.rs @@ -82,8 +82,6 @@ pub struct Agent { #[cfg(test)] mod tests { - use serde_json; - #[test] #[cfg(feature = "bedrock-agent-runtime")] fn example_bedrock_agent__runtime_event() { diff --git a/lambda-runtime-api-client/src/body/mod.rs b/lambda-runtime-api-client/src/body/mod.rs index e56f7727..9faf9998 100644 --- a/lambda-runtime-api-client/src/body/mod.rs +++ b/lambda-runtime-api-client/src/body/mod.rs @@ -21,7 +21,7 @@ macro_rules! ready { } mod channel; -mod sender; +pub mod sender; mod watch; type BoxBody = http_body_util::combinators::UnsyncBoxBody; diff --git a/lambda-runtime-api-client/src/body/sender.rs b/lambda-runtime-api-client/src/body/sender.rs index 3fb71b02..0e008454 100644 --- a/lambda-runtime-api-client/src/body/sender.rs +++ b/lambda-runtime-api-client/src/body/sender.rs @@ -96,11 +96,13 @@ impl Sender { .map_err(|err| err.into_inner().expect("just sent Ok")) } + /// Send a `SenderError::BodyWriteAborted` error and terminate the stream. #[allow(unused)] pub fn abort(mut self) { self.send_error(Error::new(SenderError::BodyWriteAborted)); } + /// Terminate the stream with an error. pub fn send_error(&mut self, err: Error) { let _ = self .data_tx diff --git a/lambda-runtime/src/lib.rs b/lambda-runtime/src/lib.rs index 938bd318..e048bbd2 100644 --- a/lambda-runtime/src/lib.rs +++ b/lambda-runtime/src/lib.rs @@ -30,6 +30,8 @@ mod deserializer; mod requests; #[cfg(test)] mod simulated; +/// Utilities for Lambda Streaming functions. +pub mod streaming; /// Types available to a Lambda function. mod types; diff --git a/lambda-runtime/src/streaming.rs b/lambda-runtime/src/streaming.rs new file mode 100644 index 00000000..4f0c8083 --- /dev/null +++ b/lambda-runtime/src/streaming.rs @@ -0,0 +1,35 @@ +pub use lambda_runtime_api_client::body::{sender::Sender, Body}; + +pub use crate::types::StreamResponse as Response; + +/// Create a new `Body` stream with associated Sender half. +/// +/// Examples +/// +/// ``` +/// use lambda_runtime::{ +/// streaming::{channel, Body, Response}, +/// Error, LambdaEvent, +/// }; +/// use std::{thread, time::Duration}; +/// +/// async fn func(_event: LambdaEvent) -> Result, Error> { +/// let messages = vec!["Hello", "world", "from", "Lambda!"]; +/// +/// let (mut tx, rx) = channel(); +/// +/// tokio::spawn(async move { +/// for message in messages.iter() { +/// tx.send_data((message.to_string() + "\n").into()).await.unwrap(); +/// thread::sleep(Duration::from_millis(500)); +/// } +/// }); +/// +/// Ok(Response::from(rx)) +/// } +/// ``` +#[allow(unused)] +#[inline] +pub fn channel() -> (Sender, Body) { + Body::channel() +} diff --git a/lambda-runtime/src/types.rs b/lambda-runtime/src/types.rs index 98faa49a..f2a36073 100644 --- a/lambda-runtime/src/types.rs +++ b/lambda-runtime/src/types.rs @@ -272,6 +272,20 @@ where } } +impl From for StreamResponse +where + S: Stream> + Unpin + Send + 'static, + D: Into + Send, + E: Into + Send + Debug, +{ + fn from(value: S) -> Self { + StreamResponse { + metadata_prelude: Default::default(), + stream: value, + } + } +} + #[cfg(test)] mod test { use super::*;