feat: Implement RFC for layering of runtime #845

Merged: 7 commits, Mar 22, 2024
6 changes: 3 additions & 3 deletions .github/workflows/build-events.yml
@@ -3,18 +3,18 @@ name: Check Lambda Events
on:
push:
paths:
-      - 'lambda-events/**'
+      - "lambda-events/**"
pull_request:
paths:
-      - 'lambda-events/**'
+      - "lambda-events/**"

jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
toolchain:
-        - "1.66.0" # Current MSRV
+        - "1.70.0" # Current MSRV
- stable
env:
RUST_BACKTRACE: 1
2 changes: 1 addition & 1 deletion .github/workflows/build-extension.yml
@@ -18,7 +18,7 @@ jobs:
strategy:
matrix:
toolchain:
-        - "1.66.0" # Current MSRV
+        - "1.70.0" # Current MSRV
- stable
env:
RUST_BACKTRACE: 1
2 changes: 1 addition & 1 deletion .github/workflows/build-runtime.yml
@@ -19,7 +19,7 @@ jobs:
strategy:
matrix:
toolchain:
-        - "1.66.0" # Current MSRV
+        - "1.70.0" # Current MSRV
- stable
env:
RUST_BACKTRACE: 1
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -6,7 +6,7 @@ members = [
"lambda-runtime-api-client",
"lambda-runtime",
"lambda-extension",
-    "lambda-events"
+    "lambda-events",
]

exclude = ["examples"]
@@ -26,4 +26,5 @@ hyper = "1.0"
hyper-util = "0.1.1"
pin-project-lite = "0.2"
tower = "0.4"
+tower-layer = "0.3"
tower-service = "0.3"
2 changes: 1 addition & 1 deletion README.md
@@ -458,7 +458,7 @@ This will make your function compile much faster.

## Supported Rust Versions (MSRV)

-The AWS Lambda Rust Runtime requires a minimum of Rust 1.66, and is not guaranteed to build on compiler versions earlier than that.
+The AWS Lambda Rust Runtime requires a minimum of Rust 1.70, and is not guaranteed to build on compiler versions earlier than that.
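For consumers who want to build a local checkout against the stated MSRV, one option is a rust-toolchain.toml at the workspace root. This is a sketch only; the file is hypothetical and not part of this PR, and it assumes a rustup-managed toolchain:

```toml
# rust-toolchain.toml (hypothetical; pins local builds to the MSRV)
[toolchain]
channel = "1.70.0"
```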

## Security

36 changes: 36 additions & 0 deletions examples/opentelemetry-tracing/Cargo.toml
@@ -0,0 +1,36 @@
[package]
name = "opentelemetry-tracing"
version = "0.1.0"
edition = "2021"

[dependencies]
# Library dependencies
lambda_runtime = { path = "../../lambda-runtime" }
pin-project = "1"
opentelemetry-semantic-conventions = "0.14"
tower = "0.4"
tracing = "0.1"

# Binary dependencies
opentelemetry = { version = "0.22", optional = true }
opentelemetry_sdk = { version = "0.22", features = ["rt-tokio"], optional = true }
opentelemetry-stdout = { version = "0.3", features = ["trace"], optional = true }
serde_json = { version = "1.0", optional = true }
tokio = { version = "1", optional = true }
tracing-opentelemetry = { version = "0.23", optional = true }
tracing-subscriber = { version = "0.3", optional = true }

[features]
build-binary = [
"opentelemetry",
"opentelemetry_sdk",
"opentelemetry-stdout",
"serde_json",
"tokio",
"tracing-opentelemetry",
"tracing-subscriber",
]

[[bin]]
name = "opentelemetry-tracing"
required-features = ["build-binary"]
113 changes: 113 additions & 0 deletions examples/opentelemetry-tracing/src/lib.rs
@@ -0,0 +1,113 @@
use std::future::Future;
[Review thread]
Contributor: Can you add a src/main.rs file with a lambda function that shows how to use this library?

Contributor (author): I added a dummy implementation that logs traces to stdout. Unfortunately, OpenTelemetry setup in Rust isn't trivial, so there are a few more dependencies in the example's Cargo.toml. I attempted to clearly indicate which dependencies are required by the library portion of the example.

Contributor: Thanks!

use std::pin::Pin;
use std::task;

use lambda_runtime::LambdaInvocation;
use opentelemetry_semantic_conventions::trace as traceconv;
use pin_project::pin_project;
use tower::{Layer, Service};
use tracing::instrument::Instrumented;
use tracing::Instrument;

/// Tower layer to add OpenTelemetry tracing to a Lambda function invocation. The layer accepts
/// a function to flush OpenTelemetry after the end of the invocation.
pub struct OpenTelemetryLayer<F> {
flush_fn: F,
}

impl<F> OpenTelemetryLayer<F>
where
F: Fn() + Clone,
{
pub fn new(flush_fn: F) -> Self {
Self { flush_fn }
}
}

impl<S, F> Layer<S> for OpenTelemetryLayer<F>
where
F: Fn() + Clone,
{
type Service = OpenTelemetryService<S, F>;

fn layer(&self, inner: S) -> Self::Service {
OpenTelemetryService {
inner,
flush_fn: self.flush_fn.clone(),
coldstart: true,
}
}
}

/// Tower service created by [OpenTelemetryLayer].
pub struct OpenTelemetryService<S, F> {
inner: S,
flush_fn: F,
coldstart: bool,
}

impl<S, F> Service<LambdaInvocation> for OpenTelemetryService<S, F>
where
S: Service<LambdaInvocation, Response = ()>,
F: Fn() + Clone,
{
type Error = S::Error;
type Response = ();
type Future = OpenTelemetryFuture<Instrumented<S::Future>, F>;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: LambdaInvocation) -> Self::Future {
let span = tracing::info_span!(
"Lambda function invocation",
"otel.name" = req.context.env_config.function_name,
{ traceconv::FAAS_TRIGGER } = "http",
{ traceconv::FAAS_INVOCATION_ID } = req.context.request_id,
{ traceconv::FAAS_COLDSTART } = self.coldstart
);

// After the first execution, we can set 'coldstart' to false
self.coldstart = false;

let fut = self.inner.call(req).instrument(span);
OpenTelemetryFuture {
future: Some(fut),
flush_fn: self.flush_fn.clone(),
}
}
}

/// Future created by [OpenTelemetryService].
#[pin_project]
pub struct OpenTelemetryFuture<Fut, F> {
#[pin]
future: Option<Fut>,
flush_fn: F,
}

impl<Fut, F> Future for OpenTelemetryFuture<Fut, F>
where
Fut: Future,
F: Fn(),
{
type Output = Fut::Output;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
// First, try to get the ready value of the future
let ready = task::ready!(self
.as_mut()
.project()
.future
.as_pin_mut()
.expect("future polled after completion")
.poll(cx));

// If we got the ready value, we first drop the future: this ensures that the
// OpenTelemetry span attached to it is closed and included in the subsequent flush.
Pin::set(&mut self.as_mut().project().future, None);
(self.project().flush_fn)();
task::Poll::Ready(ready)
}
}
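The comment in `poll` above says the future must be dropped before `flush_fn` runs, so that the span attached to it is closed and included in the flush. That ordering can be sketched in isolation with the standard library only (illustrative; the real code uses the tracing and OpenTelemetry crates, not this toy exporter):

```rust
use std::sync::Mutex;

// Toy exporter: a span is handed to the export queue only when it is
// *closed* (dropped). This mirrors why OpenTelemetryFuture::poll drops the
// instrumented future before calling flush_fn: a still-open span would be
// missed by the flush.
static EXPORT_QUEUE: Mutex<Vec<&'static str>> = Mutex::new(Vec::new());

struct Span(&'static str);

impl Drop for Span {
    fn drop(&mut self) {
        // Closing the span enqueues it for export.
        EXPORT_QUEUE.lock().unwrap().push(self.0);
    }
}

// The "flush_fn": drain whatever spans have been closed so far.
fn flush() -> Vec<&'static str> {
    EXPORT_QUEUE.lock().unwrap().drain(..).collect()
}

fn main() {
    let span = Span("lambda invocation");
    // Flushing while the span is still open exports nothing...
    assert!(flush().is_empty());
    // ...so drop first, then flush: now the span is part of the batch.
    drop(span);
    assert_eq!(flush(), vec!["lambda invocation"]);
}
```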
34 changes: 34 additions & 0 deletions examples/opentelemetry-tracing/src/main.rs
@@ -0,0 +1,34 @@
use lambda_runtime::{LambdaEvent, Runtime};
use opentelemetry::trace::TracerProvider;
use opentelemetry_sdk::{runtime, trace};
use opentelemetry_tracing::OpenTelemetryLayer;
use tower::{service_fn, BoxError};
use tracing_subscriber::prelude::*;

async fn echo(event: LambdaEvent<serde_json::Value>) -> Result<serde_json::Value, &'static str> {
Ok(event.payload)
}

#[tokio::main]
async fn main() -> Result<(), BoxError> {
// Set up OpenTelemetry tracer provider that writes spans to stdout for debugging purposes
let exporter = opentelemetry_stdout::SpanExporter::default();
let tracer_provider = trace::TracerProvider::builder()
.with_batch_exporter(exporter, runtime::Tokio)
.build();

// Set up link between OpenTelemetry and tracing crate
tracing_subscriber::registry()
.with(tracing_opentelemetry::OpenTelemetryLayer::new(
tracer_provider.tracer("my-app"),
))
.init();

// Initialize the Lambda runtime and add OpenTelemetry tracing
let runtime = Runtime::new(service_fn(echo)).layer(OpenTelemetryLayer::new(|| {
// Make sure that the trace is exported before the Lambda runtime is frozen
tracer_provider.force_flush();
}));
runtime.run().await?;
Ok(())
}
8 changes: 4 additions & 4 deletions lambda-events/src/custom_serde/mod.rs
@@ -177,18 +177,18 @@ mod test {

let test = r#"{"v": null}"#;
let decoded: Test = serde_json::from_str(test).unwrap();
-        assert_eq!(false, decoded.v);
+        assert!(!decoded.v);

let test = r#"{}"#;
let decoded: Test = serde_json::from_str(test).unwrap();
-        assert_eq!(false, decoded.v);
+        assert!(!decoded.v);

let test = r#"{"v": true}"#;
let decoded: Test = serde_json::from_str(test).unwrap();
-        assert_eq!(true, decoded.v);
+        assert!(decoded.v);

let test = r#"{"v": false}"#;
let decoded: Test = serde_json::from_str(test).unwrap();
-        assert_eq!(false, decoded.v);
+        assert!(!decoded.v);
}
}
2 changes: 1 addition & 1 deletion lambda-events/src/event/dynamodb/attributes.rs
@@ -83,7 +83,7 @@ mod test {

let attr: AttributeValue = serde_json::from_value(value.clone()).unwrap();
match attr {
-            AttributeValue::Bool(b) => assert_eq!(true, b),
+            AttributeValue::Bool(b) => assert!(b),
other => panic!("unexpected value {:?}", other),
}

15 changes: 11 additions & 4 deletions lambda-runtime-api-client/src/lib.rs
@@ -4,10 +4,11 @@

//! This crate includes a base HTTP client to interact with
//! the AWS Lambda Runtime API.
+use futures_util::{future::BoxFuture, FutureExt, TryFutureExt};
use http::{uri::PathAndQuery, uri::Scheme, Request, Response, Uri};
use hyper::body::Incoming;
use hyper_util::client::legacy::connect::HttpConnector;
-use std::{convert::TryInto, fmt::Debug};
+use std::{convert::TryInto, fmt::Debug, future};

const USER_AGENT_HEADER: &str = "User-Agent";
const DEFAULT_USER_AGENT: &str = concat!("aws-lambda-rust/", env!("CARGO_PKG_VERSION"));
@@ -42,9 +43,15 @@ impl Client {
impl Client {
/// Send a given request to the Runtime API.
/// Use the client's base URI to ensure the API endpoint is correct.
-    pub async fn call(&self, req: Request<body::Body>) -> Result<Response<Incoming>, BoxError> {
-        let req = self.set_origin(req)?;
-        self.client.request(req).await.map_err(Into::into)
+    pub fn call(&self, req: Request<body::Body>) -> BoxFuture<'static, Result<Response<Incoming>, BoxError>> {
+        // NOTE: This method returns a boxed future such that the future has a static lifetime.
+        //       Due to limitations around the Rust async implementation as of Mar 2024, this is
+        //       required to minimize constraints on the handler passed to [lambda_runtime::run].
+        let req = match self.set_origin(req) {
+            Ok(req) => req,
+            Err(err) => return future::ready(Err(err)).boxed(),
+        };
+        self.client.request(req).map_err(Into::into).boxed()
}

/// Create a new client with a given base URI and HTTP connector.
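The boxed-future pattern in this change can be sketched in isolation with the standard library only (illustrative; `call` here is a stand-in, not the client's real API, and the real code uses futures_util's `BoxFuture` and `FutureExt::boxed`). Returning a boxed future lets a non-async fn validate its input synchronously and return an already-resolved error future, while keeping a single `'static` return type:

```rust
use std::future::{self, Future};
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

// Stand-in for Client::call: validate synchronously, then hand back either
// an already-resolved error future or the "real" async work.
fn call(valid: bool) -> BoxFuture<Result<&'static str, &'static str>> {
    if !valid {
        // Synchronous failure: no async block needed, nothing borrowed.
        return Box::pin(future::ready(Err("invalid request")));
    }
    Box::pin(async { Ok::<&'static str, &'static str>("response") })
}

// A no-op waker so this example can poll futures by hand, without an executor.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    // Both branches resolve on the first poll here; a real request would
    // return Poll::Pending until the I/O completes.
    let mut fut = call(false);
    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Ready(Err("invalid request")));
    let mut fut = call(true);
    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Ready(Ok("response")));
}
```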
2 changes: 2 additions & 0 deletions lambda-runtime/Cargo.toml
@@ -37,6 +37,7 @@ hyper-util = { workspace = true, features = [
"tokio",
] }
lambda_runtime_api_client = { version = "0.10", path = "../lambda-runtime-api-client" }
+pin-project = "1"
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "^1"
serde_path_to_error = "0.1.11"
@@ -48,6 +49,7 @@ tokio = { version = "1.0", features = [
] }
tokio-stream = "0.1.2"
tower = { workspace = true, features = ["util"] }
+tower-layer = { workspace = true }
tracing = { version = "0.1", features = ["log"] }

[dev-dependencies]