Skip to content

Commit

Permalink
feat: Introduce AWS Lambda sink (#208)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Mar 18, 2022
1 parent c8d888b commit 81fb6f5
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 3 deletions.
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ murmur3 = { version = "0.5.1", optional = true }
# feature: aws
aws-config = { version = "0.8.0", optional = true }
aws-sdk-sqs = { version = "0.8.0", optional = true }
aws-sdk-lambda = { version = "0.8.0", optional = true }

# features: elasticsearch || aws
tokio = { version = "1.17.0", optional = true, features = ["rt"] }
Expand Down
3 changes: 3 additions & 0 deletions src/bin/oura/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use oura::sinks::kafka::Config as KafkaConfig;
use oura::sinks::elastic::Config as ElasticConfig;

#[cfg(feature = "aws")]
use oura::sinks::aws_lambda::Config as AwsLambdaConfig;
use oura::sinks::aws_sqs::Config as AwsSqsConfig;

#[cfg(feature = "fingerprint")]
Expand Down Expand Up @@ -104,6 +105,7 @@ enum Sink {
Elastic(ElasticConfig),

#[cfg(feature = "aws")]
AwsLambda(AwsLambdaConfig),
AwsSqs(AwsSqsConfig),
}

Expand All @@ -126,6 +128,7 @@ fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc<Utils>) -> Boot
Sink::Elastic(c) => WithUtils::new(c, utils).bootstrap(input),

#[cfg(feature = "aws")]
Sink::AwsLambda(c) => WithUtils::new(c, utils).bootstrap(input),
Sink::AwsSqs(c) => WithUtils::new(c, utils).bootstrap(input),
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/sinks/aws_lambda/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
mod run;
mod setup;

pub use setup::*;
54 changes: 54 additions & 0 deletions src/sinks/aws_lambda/run.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use aws_sdk_lambda::{types::Blob, Client};
use serde_json::json;
use std::sync::Arc;

use crate::{model::Event, pipelining::StageReceiver, utils::Utils, Error};

async fn invoke_lambda_function(
client: Arc<Client>,
function_name: &str,
event: &Event,
) -> Result<(), Error> {
let body = json!(event).to_string();

let req = client
.invoke()
.function_name(function_name)
.payload(Blob::new(body));

let res = req.send().await?;

log::trace!("Lambda invoke response: {:?}", res);

Ok(())
}

pub fn writer_loop(
input: StageReceiver,
client: Client,
function_name: &str,
utils: Arc<Utils>,
) -> Result<(), Error> {
let client = Arc::new(client);

let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.enable_io()
.build()?;

loop {
let event = input.recv().unwrap();

// notify the pipeline where we are
utils.track_sink_progress(&event);

let client = client.clone();

let result = rt.block_on(invoke_lambda_function(client, function_name, &event));

if let Err(err) = result {
log::error!("unrecoverable error invoking lambda funcion: {:?}", err);
break Err(err);
}
}
}
48 changes: 48 additions & 0 deletions src/sinks/aws_lambda/setup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use aws_config::{self, meta::region::RegionProviderChain, RetryConfig};
use aws_sdk_lambda::{Client, Region};
use serde::Deserialize;

use crate::{
pipelining::{BootstrapResult, SinkProvider, StageReceiver},
utils::WithUtils,
};

use super::run::writer_loop;

const DEFAULT_MAX_RETRIES: u32 = 5;

#[derive(Default, Debug, Deserialize)]
pub struct Config {
pub region: String,
pub function_name: String,
pub max_retries: Option<u32>,
}

impl SinkProvider for WithUtils<Config> {
fn bootstrap(&self, input: StageReceiver) -> BootstrapResult {
let explicit_region = self.inner.region.to_owned();

let region_provider =
RegionProviderChain::first_try(Region::new(explicit_region)).or_default_provider();

let aws_config = tokio::runtime::Runtime::new()?
.block_on(aws_config::from_env().region(region_provider).load());

let retry_config = RetryConfig::new()
.with_max_attempts(self.inner.max_retries.unwrap_or(DEFAULT_MAX_RETRIES));

let sqs_config = aws_sdk_lambda::config::Builder::from(&aws_config)
.retry_config(retry_config)
.build();

let client = Client::from_conf(sqs_config);
let function_name = self.inner.function_name.clone();

let utils = self.utils.clone();
let handle = std::thread::spawn(move || {
writer_loop(input, client, &function_name, utils).expect("writer loop failed")
});

Ok(handle)
}
}
4 changes: 1 addition & 3 deletions src/sinks/aws_sqs/setup.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::borrow::Cow;

use aws_config::{self, meta::region::RegionProviderChain, RetryConfig};
use aws_sdk_sqs::{Client, Region};
use serde::Deserialize;
Expand All @@ -24,7 +22,7 @@ pub struct Config {

impl SinkProvider for WithUtils<Config> {
fn bootstrap(&self, input: StageReceiver) -> BootstrapResult {
let explicit_region = Cow::Owned(self.inner.region.to_owned());
let explicit_region = self.inner.region.to_owned();

let region_provider =
RegionProviderChain::first_try(Region::new(explicit_region)).or_default_provider();
Expand Down
1 change: 1 addition & 0 deletions src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ pub mod kafka;
pub mod elastic;

#[cfg(feature = "aws")]
pub mod aws_lambda;
pub mod aws_sqs;

0 comments on commit 81fb6f5

Please sign in to comment.