From ccd3027baa7f4346b57b19ad00fda19ae5288e32 Mon Sep 17 00:00:00 2001 From: "Daniel P. Purkhus" Date: Tue, 5 Sep 2023 15:38:02 +0000 Subject: [PATCH] Allow running handler with a callback ## What? This is a pretty naive approach that allows the end user to supply a callback that will be invoked once a single event loop has been finished. ## Why? Because currently there is no good way(that I know of) to flush logs/traces/metrics. By having a callback that is called after all event handling is finished allows the end user to do a cleanup at the end of each lambda invocation. --- .../basic-lambda-with-callback/Cargo.toml | 19 ++++++ examples/basic-lambda-with-callback/README.md | 11 ++++ .../basic-lambda-with-callback/src/main.rs | 61 +++++++++++++++++ lambda-runtime/src/lib.rs | 65 +++++++++++++++++-- 4 files changed, 151 insertions(+), 5 deletions(-) create mode 100644 examples/basic-lambda-with-callback/Cargo.toml create mode 100644 examples/basic-lambda-with-callback/README.md create mode 100644 examples/basic-lambda-with-callback/src/main.rs diff --git a/examples/basic-lambda-with-callback/Cargo.toml b/examples/basic-lambda-with-callback/Cargo.toml new file mode 100644 index 00000000..a320da2b --- /dev/null +++ b/examples/basic-lambda-with-callback/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "basic-lambda-with-callback" +version = "0.1.0" +edition = "2021" + + +# Use cargo-edit(https://github.com/killercup/cargo-edit#installation) +# to manage dependencies. +# Running `cargo add DEPENDENCY_NAME` will +# add the latest version of a dependency to the list, +# and it will keep the alphabetic ordering for you. + +[dependencies] +lambda_runtime = { path = "../../lambda-runtime" } +serde = "1.0.136" +tokio = { version = "1", features = ["macros"] } +tracing = { version = "0.1", features = ["log"] } +tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } +tokio-test = "0.4.2" diff --git a/examples/basic-lambda-with-callback/README.md b/examples/basic-lambda-with-callback/README.md new file mode 100644 index 00000000..498f8a50 --- /dev/null +++ b/examples/basic-lambda-with-callback/README.md @@ -0,0 +1,11 @@ +# AWS Lambda Function example + +## Build & Deploy + +1. Install [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation) +2. Build the function with `cargo lambda build --release` +3. Deploy the function to AWS Lambda with `cargo lambda deploy --iam-role YOUR_ROLE` + +## Build for ARM 64 + +Build the function with `cargo lambda build --release --arm64` diff --git a/examples/basic-lambda-with-callback/src/main.rs b/examples/basic-lambda-with-callback/src/main.rs new file mode 100644 index 00000000..c3b505f2 --- /dev/null +++ b/examples/basic-lambda-with-callback/src/main.rs @@ -0,0 +1,61 @@ +// This example requires the following input to succeed: +// { "command": "do something" } +use std::sync::Arc; + +use lambda_runtime::{service_fn, Error, LambdaEvent}; +use serde::{Deserialize, Serialize}; + +/// This is also a made-up example. Requests come into the runtime as unicode +/// strings in json format, which can map to any structure that implements `serde::Deserialize` +/// The runtime pays no attention to the contents of the request payload. +#[derive(Deserialize)] +struct Request { + command: String, +} + +/// This is a made-up example of what a response structure may look like. +/// There is no restriction on what it can be. The runtime requires responses +/// to be serialized into json. The runtime pays no attention +/// to the contents of the response payload. +#[derive(Serialize)] +struct Response { + req_id: String, + msg: String, +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + // required to enable CloudWatch error logging by the runtime + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + // disable printing the name of the module in every log line. + .with_target(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + .init(); + + // Since the Lambda is fozen in between invocations you can use this callback + // to e.g. flush traces/logs/metrics + let cleanup = Arc::new(|| { + println!("Callback called!"); + }); + + + let func = service_fn(my_handler); + lambda_runtime::run_with_callback(func, cleanup).await?; + Ok(()) +} + +pub(crate) async fn my_handler(event: LambdaEvent) -> Result { + // extract some useful info from the request + let command = event.payload.command; + + // prepare the response + let resp = Response { + req_id: event.context.request_id, + msg: format!("Command {} executed.", command), + }; + + // return `Response` (it will be serialized to JSON automatically by the runtime) + Ok(resp) +} diff --git a/lambda-runtime/src/lib.rs b/lambda-runtime/src/lib.rs index e3ffd49d..a01d2cb7 100644 --- a/lambda-runtime/src/lib.rs +++ b/lambda-runtime/src/lib.rs @@ -20,7 +20,7 @@ use std::{ env, fmt::{self, Debug, Display}, future::Future, - panic, + panic, sync::Arc, }; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_stream::{Stream, StreamExt}; @@ -101,6 +101,7 @@ where &self, incoming: impl Stream, Error>> + Send, mut handler: F, + callback: Option> ) -> Result<(), Error> where F: Service>, @@ -202,7 +203,13 @@ where } .instrument(request_span) .await?; + + if let Some(callback) = callback.clone() { + callback(); + } } + + Ok(()) } } @@ -258,7 +265,52 @@ where let client = &runtime.client; let incoming = incoming(client); - runtime.run(incoming, handler).await + let callback : Option>= None; + runtime.run(incoming, handler, callback).await +} + +/// Starts the Lambda Rust runtime and begins polling for events on the [Lambda +/// Runtime APIs](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html). +/// +/// The callback function is called at the end of a single invocation of the runtime. +/// +/// # Example +/// ```no_run +/// use lambda_runtime::{Error, service_fn, LambdaEvent}; +/// use serde_json::Value; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Error> { +/// let func = service_fn(func); +/// lambda_runtime::run_with_callback(func, callback_func).await?; +/// Ok(()) +/// } +/// +/// async fn func(event: LambdaEvent) -> Result { +/// Ok(event.payload) +/// } +/// +/// async fn callback_func() -> () { +/// println!("Callback function called!"); +/// () +/// } +/// ``` +pub async fn run_with_callback(handler: F, callback: Arc) -> Result<(), Error> +where + F: Service>, + F::Future: Future>, + F::Error: fmt::Debug + fmt::Display, + A: for<'de> Deserialize<'de>, + B: Serialize, +{ + trace!("Loading config from env"); + let config = Config::from_env()?; + let client = Client::builder().build().expect("Unable to create a runtime client"); + let runtime = Runtime { client, config }; + + let client = &runtime.client; + let incoming = incoming(client); + runtime.run(incoming, handler, Some(callback)).await } fn type_name_of_val(_: T) -> &'static str { @@ -293,7 +345,7 @@ mod endpoint_tests { use lambda_runtime_api_client::Client; use serde_json::json; use simulated::DuplexStreamWrapper; - use std::{convert::TryFrom, env}; + use std::{convert::TryFrom, env, sync::Arc}; use tokio::{ io::{self, AsyncRead, AsyncWrite}, select, @@ -525,7 +577,8 @@ mod endpoint_tests { let runtime = Runtime { client, config }; let client = &runtime.client; let incoming = incoming(client).take(1); - runtime.run(incoming, f).await?; + let callback: Option> = None; + runtime.run(incoming, f, callback).await?; // shutdown server tx.send(()).expect("Receiver has been dropped"); @@ -568,7 +621,9 @@ mod endpoint_tests { let runtime = Runtime { client, config }; let client = &runtime.client; let incoming = incoming(client).take(1); - runtime.run(incoming, f).await?; + let callback: Option> = None; + + runtime.run(incoming, f, callback).await?; match server.await { Ok(_) => Ok(()),