Skip to content

Commit

Permalink
Implement Serialize for lambda telemetry (#759)
Browse files Browse the repository at this point in the history
* Add serialize testing mod and macro

* Use and derive serialize

* Skip serialize if None on Options

* Add unit tests for serialization
  • Loading branch information
nismotie authored Dec 20, 2023
1 parent 953b2d2 commit ead7f37
Showing 1 changed file with 216 additions and 15 deletions.
231 changes: 216 additions & 15 deletions lambda-extension/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use http::{Request, Response};
use http_body_util::BodyExt;
use hyper::body::Incoming;
use lambda_runtime_api_client::body::Body;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::{boxed::Box, fmt, sync::Arc};
use tokio::sync::Mutex;
use tower::Service;
use tracing::{error, trace};

/// Payload received from the Telemetry API
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct LambdaTelemetry {
/// Time when the telemetry was generated
pub time: DateTime<Utc>,
Expand All @@ -20,7 +20,7 @@ pub struct LambdaTelemetry {
}

/// Record in a LambdaTelemetry entry
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(tag = "type", content = "record", rename_all = "lowercase")]
pub enum LambdaTelemetryRecord {
/// Function log records
Expand All @@ -37,8 +37,10 @@ pub enum LambdaTelemetryRecord {
/// Phase of initialisation
phase: InitPhase,
/// Lambda runtime version
#[serde(skip_serializing_if = "Option::is_none")]
runtime_version: Option<String>,
/// Lambda runtime version ARN
#[serde(skip_serializing_if = "Option::is_none")]
runtime_version_arn: Option<String>,
},
/// Platform init runtime done record
Expand All @@ -47,10 +49,12 @@ pub enum LambdaTelemetryRecord {
/// Type of initialization
initialization_type: InitType,
/// Phase of initialisation
#[serde(skip_serializing_if = "Option::is_none")]
phase: Option<InitPhase>,
/// Status of initalization
status: Status,
/// When the status = failure, the error_type describes what kind of error occurred
#[serde(skip_serializing_if = "Option::is_none")]
error_type: Option<String>,
/// Spans
#[serde(default)]
Expand All @@ -75,8 +79,10 @@ pub enum LambdaTelemetryRecord {
/// Request identifier
request_id: String,
/// Version of the Lambda function
#[serde(skip_serializing_if = "Option::is_none")]
version: Option<String>,
/// Trace Context
#[serde(skip_serializing_if = "Option::is_none")]
tracing: Option<TraceContext>,
},
/// Record marking the completion of an invocation
Expand All @@ -87,13 +93,16 @@ pub enum LambdaTelemetryRecord {
/// Status of the invocation
status: Status,
/// When unsuccessful, the error_type describes what kind of error occurred
#[serde(skip_serializing_if = "Option::is_none")]
error_type: Option<String>,
/// Metrics corresponding to the runtime
#[serde(skip_serializing_if = "Option::is_none")]
metrics: Option<RuntimeDoneMetrics>,
/// Spans
#[serde(default)]
spans: Vec<Span>,
/// Trace Context
#[serde(skip_serializing_if = "Option::is_none")]
tracing: Option<TraceContext>,
},
/// Platfor report record
Expand All @@ -104,13 +113,15 @@ pub enum LambdaTelemetryRecord {
/// Status of the invocation
status: Status,
/// When unsuccessful, the error_type describes what kind of error occurred
#[serde(skip_serializing_if = "Option::is_none")]
error_type: Option<String>,
/// Metrics
metrics: ReportMetrics,
/// Spans
#[serde(default)]
spans: Vec<Span>,
/// Trace Context
#[serde(skip_serializing_if = "Option::is_none")]
tracing: Option<TraceContext>,
},

Expand Down Expand Up @@ -147,7 +158,7 @@ pub enum LambdaTelemetryRecord {
}

/// Type of Initialization
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(rename_all = "kebab-case")]
pub enum InitType {
/// Initialised on demand
Expand All @@ -159,7 +170,7 @@ pub enum InitType {
}

/// Phase in which initialization occurs
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(rename_all = "kebab-case")]
pub enum InitPhase {
/// Initialization phase
Expand All @@ -169,7 +180,7 @@ pub enum InitPhase {
}

/// Status of invocation/initialization
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(rename_all = "kebab-case")]
pub enum Status {
/// Success
Expand All @@ -183,7 +194,7 @@ pub enum Status {
}

/// Span
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Span {
/// Duration of the span
Expand All @@ -195,7 +206,7 @@ pub struct Span {
}

/// Tracing Context
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct TraceContext {
/// Span ID
Expand All @@ -207,23 +218,23 @@ pub struct TraceContext {
}

/// Type of tracing
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
pub enum TracingType {
/// Amazon trace type
#[serde(rename = "X-Amzn-Trace-Id")]
AmznTraceId,
}

///Init report metrics
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct InitReportMetrics {
/// Duration of initialization
pub duration_ms: f64,
}

/// Report metrics
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct ReportMetrics {
/// Duration in milliseconds
Expand All @@ -237,15 +248,15 @@ pub struct ReportMetrics {
#[serde(rename = "maxMemoryUsedMB")]
pub max_memory_used_mb: u64,
/// Init duration in case of a cold start
#[serde(default = "Option::default")]
#[serde(default = "Option::default", skip_serializing_if = "Option::is_none")]
pub init_duration_ms: Option<f64>,
/// Restore duration in milliseconds
#[serde(default = "Option::default")]
#[serde(default = "Option::default", skip_serializing_if = "Option::is_none")]
pub restore_duration_ms: Option<f64>,
}

/// Runtime done metrics
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct RuntimeDoneMetrics {
/// Duration in milliseconds
Expand Down Expand Up @@ -303,7 +314,7 @@ where
}

#[cfg(test)]
mod tests {
mod deserialization_tests {
use super::*;
use chrono::{Duration, TimeZone};

Expand Down Expand Up @@ -459,3 +470,193 @@ mod tests {
),
}
}

#[cfg(test)]
mod serialization_tests {
use chrono::{Duration, TimeZone};

use super::*;
macro_rules! serialize_tests {
($($name:ident: $value:expr,)*) => {
$(
#[test]
fn $name() {
let (input, expected) = $value;
let actual = serde_json::to_string(&input).expect("unable to serialize");
println!("Input: {:?}\n", input);
println!("Expected:\n {:?}\n", expected);
println!("Actual:\n {:?}\n", actual);

assert!(actual == expected);
}
)*
}
}

serialize_tests! {
// function
function: (
LambdaTelemetry {
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::Function("hello world".to_string()),
},
r#"{"time":"2023-11-28T12:00:09Z","type":"function","record":"hello world"}"#,
),
// extension
extension: (
LambdaTelemetry {
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::Extension("hello world".to_string()),
},
r#"{"time":"2023-11-28T12:00:09Z","type":"extension","record":"hello world"}"#,
),
//platform.Start
platform_start: (
LambdaTelemetry{
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::PlatformStart {
request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(),
version: Some("$LATEST".to_string()),
tracing: Some(TraceContext{
span_id: Some("24cd7d670fa455f0".to_string()),
r#type: TracingType::AmznTraceId,
value: "Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1".to_string(),
}),
}
},
r#"{"time":"2023-11-28T12:00:09Z","type":"platform.start","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","version":"$LATEST","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}"#,
),
// platform.initStart
platform_init_start: (
LambdaTelemetry{
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::PlatformInitStart {
initialization_type: InitType::OnDemand,
phase: InitPhase::Init,
runtime_version: None,
runtime_version_arn: None,
},
},
r#"{"time":"2023-11-28T12:00:09Z","type":"platform.initStart","record":{"initializationType":"on-demand","phase":"init"}}"#,
),
// platform.runtimeDone
platform_runtime_done: (
LambdaTelemetry{
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::PlatformRuntimeDone {
request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(),
status: Status::Success,
error_type: None,
metrics: Some(RuntimeDoneMetrics {
duration_ms: 2599.0,
produced_bytes: Some(8),
}),
spans: vec!(
Span {
name:"responseLatency".to_string(),
start: Utc
.with_ymd_and_hms(2022, 10, 21, 14, 5, 3)
.unwrap()
.checked_add_signed(Duration::milliseconds(165))
.unwrap(),
duration_ms: 2598.0
},
Span {
name:"responseDuration".to_string(),
start: Utc
.with_ymd_and_hms(2022, 10, 21, 14, 5, 5)
.unwrap()
.checked_add_signed(Duration::milliseconds(763))
.unwrap(),
duration_ms: 0.0
},
),
tracing: Some(TraceContext{
span_id: Some("24cd7d670fa455f0".to_string()),
r#type: TracingType::AmznTraceId,
value: "Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1".to_string(),
}),
},
},
r#"{"time":"2023-11-28T12:00:09Z","type":"platform.runtimeDone","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","status":"success","metrics":{"durationMs":2599.0,"producedBytes":8},"spans":[{"durationMs":2598.0,"name":"responseLatency","start":"2022-10-21T14:05:03.165Z"},{"durationMs":0.0,"name":"responseDuration","start":"2022-10-21T14:05:05.763Z"}],"tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}"#,
),
// platform.report
platform_report: (
LambdaTelemetry{
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::PlatformReport {
request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(),
status: Status::Success,
error_type: None,
metrics: ReportMetrics {
duration_ms: 2599.4,
billed_duration_ms: 2600,
memory_size_mb:128,
max_memory_used_mb:94,
init_duration_ms: Some(549.04),
restore_duration_ms: None,
},
spans: Vec::new(),
tracing: Some(TraceContext {
span_id: Some("24cd7d670fa455f0".to_string()),
r#type: TracingType::AmznTraceId,
value: "Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1".to_string(),
}),
},
},
r#"{"time":"2023-11-28T12:00:09Z","type":"platform.report","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","status":"success","metrics":{"durationMs":2599.4,"billedDurationMs":2600,"memorySizeMB":128,"maxMemoryUsedMB":94,"initDurationMs":549.04},"spans":[],"tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}"#,
),
// platform.telemetrySubscription
platform_telemetry_subscription: (
LambdaTelemetry{
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::PlatformTelemetrySubscription {
name: "my-extension".to_string(),
state: "Subscribed".to_string(),
types: vec!("platform".to_string(), "function".to_string()),
},
},
r#"{"time":"2023-11-28T12:00:09Z","type":"platform.telemetrySubscription","record":{"name":"my-extension","state":"Subscribed","types":["platform","function"]}}"#,
),
// platform.initRuntimeDone
platform_init_runtime_done: (
LambdaTelemetry{
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::PlatformInitRuntimeDone {
initialization_type: InitType::OnDemand,
status: Status::Success,
phase: None,
error_type: None,
spans: Vec::new(),
},
},
r#"{"time":"2023-11-28T12:00:09Z","type":"platform.initRuntimeDone","record":{"initializationType":"on-demand","status":"success","spans":[]}}"#,
),
// platform.extension
platform_extension: (
LambdaTelemetry {
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::PlatformExtension {
name: "my-extension".to_string(),
state: "Ready".to_string(),
events: vec!("SHUTDOWN".to_string(), "INVOKE".to_string()),
},
},
r#"{"time":"2023-11-28T12:00:09Z","type":"platform.extension","record":{"name":"my-extension","state":"Ready","events":["SHUTDOWN","INVOKE"]}}"#,
),
// platform.initReport
platform_init_report: (
LambdaTelemetry {
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::PlatformInitReport {
initialization_type: InitType::OnDemand,
phase: InitPhase::Init,
metrics: InitReportMetrics { duration_ms: 500.0 },
spans: Vec::new(),
},
},
r#"{"time":"2023-11-28T12:00:09Z","type":"platform.initReport","record":{"initializationType":"on-demand","phase":"init","metrics":{"durationMs":500.0},"spans":[]}}"#,
),

}
}

0 comments on commit ead7f37

Please sign in to comment.