Skip to content

Commit

Permalink
Introduce a module to group all streaming functionality.
Browse files Browse the repository at this point in the history
This makes the streaming functionallity more concise.
It aliases other functionality to keep backwards compatibility.

Signed-off-by: David Calavera <[email protected]>
  • Loading branch information
calavera committed Dec 13, 2023
1 parent 6f1a457 commit 0b803db
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 10 deletions.
15 changes: 8 additions & 7 deletions examples/basic-streaming-response/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use lambda_runtime::{body::Body, service_fn, Error, LambdaEvent, StreamResponse};
use lambda_runtime::{
service_fn,
streaming::{channel, Body, Response},
Error, LambdaEvent,
};
use serde_json::Value;
use std::{thread, time::Duration};

async fn func(_event: LambdaEvent<Value>) -> Result<StreamResponse<Body>, Error> {
async fn func(_event: LambdaEvent<Value>) -> Result<Response<Body>, Error> {
let messages = vec!["Hello", "world", "from", "Lambda!"];

let (mut tx, rx) = Body::channel();
let (mut tx, rx) = channel();

tokio::spawn(async move {
for message in messages.iter() {
Expand All @@ -14,10 +18,7 @@ async fn func(_event: LambdaEvent<Value>) -> Result<StreamResponse<Body>, Error>
}
});

Ok(StreamResponse {
metadata_prelude: Default::default(),
stream: rx,
})
Ok(Response::from(rx))
}

#[tokio::main]
Expand Down
2 changes: 0 additions & 2 deletions lambda-events/src/event/bedrock_agent_runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ pub struct Agent {

#[cfg(test)]
mod tests {
use serde_json;

#[test]
#[cfg(feature = "bedrock-agent-runtime")]
fn example_bedrock_agent__runtime_event() {
Expand Down
2 changes: 1 addition & 1 deletion lambda-runtime-api-client/src/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ macro_rules! ready {
}

mod channel;
mod sender;
pub mod sender;
mod watch;

type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Error>;
Expand Down
2 changes: 2 additions & 0 deletions lambda-runtime-api-client/src/body/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,13 @@ impl Sender {
.map_err(|err| err.into_inner().expect("just sent Ok"))
}

/// Send a `SenderError::BodyWriteAborted` error and terminate the stream.
#[allow(unused)]
pub fn abort(mut self) {
self.send_error(Error::new(SenderError::BodyWriteAborted));
}

/// Terminate the stream with an error.
pub fn send_error(&mut self, err: Error) {
let _ = self
.data_tx
Expand Down
2 changes: 2 additions & 0 deletions lambda-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ mod deserializer;
mod requests;
#[cfg(test)]
mod simulated;
/// Utilities for Lambda Streaming functions.
pub mod streaming;
/// Types available to a Lambda function.
mod types;

Expand Down
35 changes: 35 additions & 0 deletions lambda-runtime/src/streaming.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
pub use lambda_runtime_api_client::body::{sender::Sender, Body};

pub use crate::types::StreamResponse as Response;

/// Create a new `Body` stream with associated Sender half.
///
/// Examples
///
/// ```
/// use lambda_runtime::{
/// streaming::{channel, Body, Response},
/// Error, LambdaEvent,
/// };
/// use std::{thread, time::Duration};
///
/// async fn func(_event: LambdaEvent<serde_json::Value>) -> Result<Response<Body>, Error> {
/// let messages = vec!["Hello", "world", "from", "Lambda!"];
///
/// let (mut tx, rx) = channel();
///
/// tokio::spawn(async move {
/// for message in messages.iter() {
/// tx.send_data((message.to_string() + "\n").into()).await.unwrap();
/// thread::sleep(Duration::from_millis(500));
/// }
/// });
///
/// Ok(Response::from(rx))
/// }
/// ```
#[allow(unused)]
#[inline]
pub fn channel() -> (Sender, Body) {
Body::channel()
}
14 changes: 14 additions & 0 deletions lambda-runtime/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,20 @@ where
}
}

impl<S, D, E> From<S> for StreamResponse<S>
where
S: Stream<Item = Result<D, E>> + Unpin + Send + 'static,
D: Into<Bytes> + Send,
E: Into<Error> + Send + Debug,
{
fn from(value: S) -> Self {
StreamResponse {
metadata_prelude: Default::default(),
stream: value,
}
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down

0 comments on commit 0b803db

Please sign in to comment.