Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow running handler with a callback #694

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions examples/basic-lambda-with-callback/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "basic-lambda-with-callback"
version = "0.1.0"
edition = "2021"


# Use cargo-edit (https://github.com/killercup/cargo-edit#installation)
# to manage dependencies.
# Running `cargo add DEPENDENCY_NAME` will
# add the latest version of a dependency to the list,
# and it will keep the alphabetic ordering for you.

[dependencies]
lambda_runtime = { path = "../../lambda-runtime" }
serde = "1.0.136"
tokio = { version = "1", features = ["macros"] }
tokio-test = "0.4.2"
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
11 changes: 11 additions & 0 deletions examples/basic-lambda-with-callback/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# AWS Lambda Function with Invocation Callback example

## Build & Deploy

1. Install [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
2. Build the function with `cargo lambda build --release`
3. Deploy the function to AWS Lambda with `cargo lambda deploy --iam-role YOUR_ROLE`

## Build for ARM 64

Build the function with `cargo lambda build --release --arm64`
61 changes: 61 additions & 0 deletions examples/basic-lambda-with-callback/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// This example requires the following input to succeed:
// { "command": "do something" }
use std::sync::Arc;

use lambda_runtime::{service_fn, Error, LambdaEvent};
use serde::{Deserialize, Serialize};

/// This is also a made-up example. Requests come into the runtime as unicode
/// strings in json format, which can map to any structure that implements `serde::Deserialize`.
/// The runtime pays no attention to the contents of the request payload.
#[derive(Debug, Deserialize)]
struct Request {
    // The command to execute, e.g. { "command": "do something" }.
    command: String,
}

/// This is a made-up example of what a response structure may look like.
/// There is no restriction on what it can be. The runtime requires responses
/// to be serialized into json. The runtime pays no attention
/// to the contents of the response payload.
#[derive(Debug, Serialize)]
struct Response {
    // Echo of the Lambda request id, useful for correlating logs.
    req_id: String,
    // Human-readable confirmation of the executed command.
    msg: String,
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    // Required to enable CloudWatch error logging by the runtime.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // Omit the module name from every log line.
        .with_target(false)
        // CloudWatch adds the ingestion timestamp, so skip ours.
        .without_time()
        .init();

    // The Lambda is frozen between invocations, so this per-invocation hook
    // is a good place to e.g. flush traces/logs/metrics.
    let flush_hook = Arc::new(|| println!("Callback called!"));

    let handler = service_fn(my_handler);
    lambda_runtime::run_with_callback(handler, flush_hook).await?;
    Ok(())
}

pub(crate) async fn my_handler(event: LambdaEvent<Request>) -> Result<Response, Error> {
// extract some useful info from the request
let command = event.payload.command;

// prepare the response
let resp = Response {
req_id: event.context.request_id,
msg: format!("Command {} executed.", command),
};

// return `Response` (it will be serialized to JSON automatically by the runtime)
Ok(resp)
}
65 changes: 60 additions & 5 deletions lambda-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{
env,
fmt::{self, Debug, Display},
future::Future,
panic,
panic, sync::Arc,
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::{Stream, StreamExt};
Expand Down Expand Up @@ -101,6 +101,7 @@ where
&self,
incoming: impl Stream<Item = Result<http::Response<hyper::Body>, Error>> + Send,
mut handler: F,
callback: Option<Arc<impl Fn()>>
) -> Result<(), Error>
where
F: Service<LambdaEvent<A>>,
Expand Down Expand Up @@ -202,7 +203,13 @@ where
}
.instrument(request_span)
.await?;

if let Some(callback) = callback.clone() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you do the following to avoid the clone?

if let Some(ref callback) = callback {
    ...
}

callback();
}
}


Ok(())
}
}
Expand Down Expand Up @@ -258,7 +265,52 @@ where

let client = &runtime.client;
let incoming = incoming(client);
runtime.run(incoming, handler).await
let callback : Option<Arc<fn()>>= None;
runtime.run(incoming, handler, callback).await
}

/// Starts the Lambda Rust runtime and begins polling for events on the [Lambda
/// Runtime APIs](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html).
///
/// The `callback` closure is invoked at the end of every single invocation of
/// the runtime, just before the execution environment may be frozen — a good
/// place to flush traces, logs, or metrics.
///
/// # Example
/// ```no_run
/// use std::sync::Arc;
/// use lambda_runtime::{Error, service_fn, LambdaEvent};
/// use serde_json::Value;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
///     let func = service_fn(func);
///     // The callback must be a sync `Fn()` wrapped in an `Arc`.
///     let callback = Arc::new(|| println!("Callback function called!"));
///     lambda_runtime::run_with_callback(func, callback).await?;
///     Ok(())
/// }
///
/// async fn func(event: LambdaEvent<Value>) -> Result<Value, Error> {
///     Ok(event.payload)
/// }
/// ```
pub async fn run_with_callback<A, B, F>(handler: F, callback: Arc<impl Fn()>) -> Result<(), Error>
where
    F: Service<LambdaEvent<A>>,
    F::Future: Future<Output = Result<B, F::Error>>,
    F::Error: fmt::Debug + fmt::Display,
    A: for<'de> Deserialize<'de>,
    B: Serialize,
{
    trace!("Loading config from env");
    let config = Config::from_env()?;
    let client = Client::builder().build().expect("Unable to create a runtime client");
    let runtime = Runtime { client, config };

    let client = &runtime.client;
    let incoming = incoming(client);
    runtime.run(incoming, handler, Some(callback)).await
}

fn type_name_of_val<T>(_: T) -> &'static str {
Expand Down Expand Up @@ -293,7 +345,7 @@ mod endpoint_tests {
use lambda_runtime_api_client::Client;
use serde_json::json;
use simulated::DuplexStreamWrapper;
use std::{convert::TryFrom, env};
use std::{convert::TryFrom, env, sync::Arc};
use tokio::{
io::{self, AsyncRead, AsyncWrite},
select,
Expand Down Expand Up @@ -525,7 +577,8 @@ mod endpoint_tests {
let runtime = Runtime { client, config };
let client = &runtime.client;
let incoming = incoming(client).take(1);
runtime.run(incoming, f).await?;
let callback: Option<Arc<fn()>> = None;
runtime.run(incoming, f, callback).await?;

// shutdown server
tx.send(()).expect("Receiver has been dropped");
Expand Down Expand Up @@ -568,7 +621,9 @@ mod endpoint_tests {
let runtime = Runtime { client, config };
let client = &runtime.client;
let incoming = incoming(client).take(1);
runtime.run(incoming, f).await?;
let callback: Option<Arc<fn()>> = None;

runtime.run(incoming, f, callback).await?;

match server.await {
Ok(_) => Ok(()),
Expand Down