diff --git a/Cargo.lock b/Cargo.lock index c06129cc..681d21ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -128,6 +128,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "aws-sdk-lambda" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ce37ff9119422e2428461a1285f6c64d4a423d1ba268e93e915d6ab8f712fb0" +dependencies = [ + "aws-endpoint", + "aws-http", + "aws-sig-auth", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-types", + "aws-types", + "bytes", + "http", + "tokio-stream", + "tower", +] + [[package]] name = "aws-sdk-sqs" version = "0.8.0" @@ -1403,6 +1425,7 @@ name = "oura" version = "1.2.2" dependencies = [ "aws-config", + "aws-sdk-lambda", "aws-sdk-sqs", "bech32", "clap", diff --git a/Cargo.toml b/Cargo.toml index d18802f9..50a32e96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ murmur3 = { version = "0.5.1", optional = true } # feature: aws aws-config = { version = "0.8.0", optional = true } aws-sdk-sqs = { version = "0.8.0", optional = true } +aws-sdk-lambda = { version = "0.8.0", optional = true } # features: elasticsearch || aws tokio = { version = "1.17.0", optional = true, features = ["rt"] } diff --git a/src/bin/oura/daemon.rs b/src/bin/oura/daemon.rs index d902f418..74d49065 100644 --- a/src/bin/oura/daemon.rs +++ b/src/bin/oura/daemon.rs @@ -36,6 +36,7 @@ use oura::sinks::kafka::Config as KafkaConfig; use oura::sinks::elastic::Config as ElasticConfig; #[cfg(feature = "aws")] +use oura::sinks::aws_lambda::Config as AwsLambdaConfig; use oura::sinks::aws_sqs::Config as AwsSqsConfig; #[cfg(feature = "fingerprint")] @@ -104,6 +105,7 @@ enum Sink { Elastic(ElasticConfig), #[cfg(feature = "aws")] + AwsLambda(AwsLambdaConfig), AwsSqs(AwsSqsConfig), } @@ -126,6 +128,7 @@ fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc) -> Boot Sink::Elastic(c) => WithUtils::new(c, utils).bootstrap(input), #[cfg(feature = "aws")] + Sink::AwsLambda(c) => WithUtils::new(c, utils).bootstrap(input), Sink::AwsSqs(c) => WithUtils::new(c, utils).bootstrap(input), } } diff --git a/src/sinks/aws_lambda/mod.rs b/src/sinks/aws_lambda/mod.rs new file mode 100644 index 00000000..0a447c1d --- /dev/null +++ b/src/sinks/aws_lambda/mod.rs @@ -0,0 +1,4 @@ +mod run; +mod setup; + +pub use setup::*; diff --git a/src/sinks/aws_lambda/run.rs b/src/sinks/aws_lambda/run.rs new file mode 100644 index 00000000..597f9f17 --- /dev/null +++ b/src/sinks/aws_lambda/run.rs @@ -0,0 +1,54 @@ +use aws_sdk_lambda::{types::Blob, Client}; +use serde_json::json; +use std::sync::Arc; + +use crate::{model::Event, pipelining::StageReceiver, utils::Utils, Error}; + +async fn invoke_lambda_function( + client: Arc, + function_name: &str, + event: &Event, +) -> Result<(), Error> { + let body = json!(event).to_string(); + + let req = client + .invoke() + .function_name(function_name) + .payload(Blob::new(body)); + + let res = req.send().await?; + + log::trace!("Lambda invoke response: {:?}", res); + + Ok(()) +} + +pub fn writer_loop( + input: StageReceiver, + client: Client, + function_name: &str, + utils: Arc, +) -> Result<(), Error> { + let client = Arc::new(client); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_time() + .enable_io() + .build()?; + + loop { + let event = input.recv().unwrap(); + + // notify the pipeline where we are + utils.track_sink_progress(&event); + + let client = client.clone(); + + let result = rt.block_on(invoke_lambda_function(client, function_name, &event)); + + if let Err(err) = result { + log::error!("unrecoverable error invoking lambda funcion: {:?}", err); + break Err(err); + } + } +} diff --git a/src/sinks/aws_lambda/setup.rs b/src/sinks/aws_lambda/setup.rs new file mode 100644 index 00000000..07528280 --- /dev/null +++ b/src/sinks/aws_lambda/setup.rs @@ -0,0 +1,48 @@ +use aws_config::{self, meta::region::RegionProviderChain, RetryConfig}; +use aws_sdk_lambda::{Client, Region}; +use serde::Deserialize; + +use crate::{ + pipelining::{BootstrapResult, SinkProvider, StageReceiver}, + utils::WithUtils, +}; + +use super::run::writer_loop; + +const DEFAULT_MAX_RETRIES: u32 = 5; + +#[derive(Default, Debug, Deserialize)] +pub struct Config { + pub region: String, + pub function_name: String, + pub max_retries: Option, +} + +impl SinkProvider for WithUtils { + fn bootstrap(&self, input: StageReceiver) -> BootstrapResult { + let explicit_region = self.inner.region.to_owned(); + + let region_provider = + RegionProviderChain::first_try(Region::new(explicit_region)).or_default_provider(); + + let aws_config = tokio::runtime::Runtime::new()? + .block_on(aws_config::from_env().region(region_provider).load()); + + let retry_config = RetryConfig::new() + .with_max_attempts(self.inner.max_retries.unwrap_or(DEFAULT_MAX_RETRIES)); + + let sqs_config = aws_sdk_lambda::config::Builder::from(&aws_config) + .retry_config(retry_config) + .build(); + + let client = Client::from_conf(sqs_config); + let function_name = self.inner.function_name.clone(); + + let utils = self.utils.clone(); + let handle = std::thread::spawn(move || { + writer_loop(input, client, &function_name, utils).expect("writer loop failed") + }); + + Ok(handle) + } +} diff --git a/src/sinks/aws_sqs/setup.rs b/src/sinks/aws_sqs/setup.rs index 9cae80eb..b6ad76b5 100644 --- a/src/sinks/aws_sqs/setup.rs +++ b/src/sinks/aws_sqs/setup.rs @@ -1,5 +1,3 @@ -use std::borrow::Cow; - use aws_config::{self, meta::region::RegionProviderChain, RetryConfig}; use aws_sdk_sqs::{Client, Region}; use serde::Deserialize; @@ -24,7 +22,7 @@ pub struct Config { impl SinkProvider for WithUtils { fn bootstrap(&self, input: StageReceiver) -> BootstrapResult { - let explicit_region = Cow::Owned(self.inner.region.to_owned()); + let explicit_region = self.inner.region.to_owned(); let region_provider = RegionProviderChain::first_try(Region::new(explicit_region)).or_default_provider(); diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index d499269d..1b6089ce 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -15,4 +15,5 @@ pub mod kafka; pub mod elastic; #[cfg(feature = "aws")] +pub mod aws_lambda; pub mod aws_sqs;