Skip to content

Commit

Permalink
feat(new sink): add AWS Simple Notification Service aws_sns sink (v…
Browse files Browse the repository at this point in the history
…ectordotdev#18141)

* ci: speed up tests by only running sqs tests

* chore: prepare split in sqs and sns

* chore: wrap up split for foundation

* refactor: separate config

* refactor: implement separate publisher

* refactor: drop no longer needed message builder

* refactor: extract error type

* style: name variable correctly

* feat: add sns 🎉

* refactor: abstract retry logic

* chore: remove sqs from shared modules

* chore: cleanup

* chore: drop temporary changes

* chore: update mod to include sns module

* refactor: move healthcheck out of config into separate function

* refactor: simplify request builder setup

* refactor: message id

* nit: make functions just visibile in sub module

* fix: add pub(super)

* fix: sub methods attributes pub(super)

* Update Cargo.toml

Co-authored-by: neuronull <[email protected]>

* Update src/sinks/aws_s_s/mod.rs

Co-authored-by: neuronull <[email protected]>

* Update src/sinks/aws_s_s/retry.rs

Co-authored-by: neuronull <[email protected]>

* Update src/sinks/mod.rs

Co-authored-by: neuronull <[email protected]>

* Update src/sinks/aws_s_s/sns/config.rs

Co-authored-by: neuronull <[email protected]>

* chore: first bunch of comments

* chore: second bunch of comments

* chore: enable all integration tests

* fix: move test

* fix: dead_code warning

* fix: dead code warning

* docs: autogenerated aws_sns docs

* fix: move region serde to downstream impl

* update licenses

---------

Co-authored-by: Kristof Herrmann <[email protected]>
Co-authored-by: ArzelaAscoIi <[email protected]>
Co-authored-by: neuronull <[email protected]>
  • Loading branch information
4 people authored Aug 18, 2023
1 parent 833ac19 commit 7b2bddc
Show file tree
Hide file tree
Showing 26 changed files with 1,464 additions and 341 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ metrics-tracing-context = { version = "0.14.0", default-features = false }
# depending on a fork to circumvent https://github.com/awslabs/aws-sdk-rust/issues/749
aws-sdk-s3 = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-sqs = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-sns = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-cloudwatch = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-cloudwatchlogs = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-elasticsearch = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true }
Expand Down Expand Up @@ -622,6 +623,7 @@ sinks-logs = [
"sinks-aws_kinesis_streams",
"sinks-aws_s3",
"sinks-aws_sqs",
"sinks-aws_sns",
"sinks-axiom",
"sinks-azure_blob",
"sinks-azure_monitor_logs",
Expand Down Expand Up @@ -681,6 +683,7 @@ sinks-aws_kinesis_firehose = ["aws-core", "dep:aws-sdk-firehose"]
sinks-aws_kinesis_streams = ["aws-core", "dep:aws-sdk-kinesis"]
sinks-aws_s3 = ["dep:base64", "dep:md-5", "aws-core", "dep:aws-sdk-s3"]
sinks-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"]
sinks-aws_sns = ["aws-core", "dep:aws-sdk-sns"]
sinks-axiom = ["sinks-elasticsearch"]
sinks-azure_blob = ["dep:azure_core", "dep:azure_identity", "dep:azure_storage", "dep:azure_storage_blobs"]
sinks-azure_monitor_logs = []
Expand Down Expand Up @@ -789,6 +792,7 @@ aws-integration-tests = [
"aws-kinesis-streams-integration-tests",
"aws-s3-integration-tests",
"aws-sqs-integration-tests",
"aws-sns-integration-tests",
]

azure-integration-tests = [
Expand All @@ -802,7 +806,8 @@ aws-ecs-metrics-integration-tests = ["sources-aws_ecs_metrics"]
aws-kinesis-firehose-integration-tests = ["sinks-aws_kinesis_firehose", "dep:aws-sdk-elasticsearch", "sinks-elasticsearch"]
aws-kinesis-streams-integration-tests = ["sinks-aws_kinesis_streams"]
aws-s3-integration-tests = ["sinks-aws_s3", "sources-aws_s3"]
aws-sqs-integration-tests = ["sinks-aws_sqs", "sources-aws_sqs"]
aws-sqs-integration-tests = ["sinks-aws_sqs"]
aws-sns-integration-tests = ["sinks-aws_sns"]
axiom-integration-tests = ["sinks-axiom"]
azure-blob-integration-tests = ["sinks-azure_blob"]
chronicle-integration-tests = ["sinks-gcp"]
Expand Down
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ aws-sdk-cloudwatchlogs,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS R
aws-sdk-firehose,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
aws-sdk-kinesis,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
aws-sdk-s3,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
aws-sdk-sns,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
aws-sdk-sqs,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
aws-sdk-sso,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
aws-sdk-sts,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
Expand Down
2 changes: 1 addition & 1 deletion scripts/integration/aws/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ services:
mock-localstack:
image: docker.io/localstack/localstack-full:0.11.6
environment:
- SERVICES=kinesis,s3,cloudwatch,elasticsearch,es,firehose,sqs
- SERVICES=kinesis,s3,cloudwatch,elasticsearch,es,firehose,sqs,sns
mock-watchlogs:
image: docker.io/luciofranco/mockwatchlogs:latest
mock-ecs:
Expand Down
1 change: 1 addition & 0 deletions scripts/integration/aws/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ env:
KINESIS_ADDRESS: http://mock-localstack:4566
S3_ADDRESS: http://mock-localstack:4566
SQS_ADDRESS: http://mock-localstack:4566
SNS_ADDRESS: http://mock-localstack:4566
WATCHLOGS_ADDRESS: http://mock-watchlogs:6000

matrix:
Expand Down
14 changes: 14 additions & 0 deletions src/sinks/aws_s_s/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use super::{request_builder::SendMessageEntry, service::SendMessageResponse};
use aws_sdk_sqs::types::SdkError;

#[async_trait::async_trait]
pub(super) trait Client<R>
where
R: std::fmt::Debug + std::fmt::Display + std::error::Error,
{
async fn send_message(
&self,
entry: SendMessageEntry,
byte_size: usize,
) -> Result<SendMessageResponse, SdkError<R>>;
}
99 changes: 99 additions & 0 deletions src/sinks/aws_s_s/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::convert::TryFrom;

use snafu::{ResultExt, Snafu};

use vector_config::configurable_component;

use crate::{
aws::AwsAuthentication,
codecs::EncodingConfig,
config::AcknowledgementsConfig,
sinks::util::TowerRequestConfig,
template::{Template, TemplateParseError},
tls::TlsConfig,
};

#[derive(Debug, Snafu)]
pub(super) enum BuildError {
#[snafu(display("`message_group_id` should be defined for FIFO queue."))]
MessageGroupIdMissing,
#[snafu(display("`message_group_id` is not allowed with non-FIFO queue."))]
MessageGroupIdNotAllowed,
#[snafu(display("invalid topic template: {}", source))]
TopicTemplate { source: TemplateParseError },
#[snafu(display("invalid message_deduplication_id template: {}", source))]
MessageDeduplicationIdTemplate { source: TemplateParseError },
}

/// Base Configuration `aws_s_s` for sns and sqs sink.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub(super) struct BaseSSSinkConfig {
#[configurable(derived)]
pub(super) encoding: EncodingConfig,

/// The tag that specifies that a message belongs to a specific message group.
///
/// Can be applied only to FIFO queues.
#[configurable(metadata(docs::examples = "vector"))]
#[configurable(metadata(docs::examples = "vector-%Y-%m-%d"))]
pub(super) message_group_id: Option<String>,

/// The message deduplication ID value to allow AWS to identify duplicate messages.
///
/// This value is a template which should result in a unique string for each event. See the [AWS
/// documentation][deduplication_id_docs] for more about how AWS does message deduplication.
///
/// [deduplication_id_docs]: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html
#[configurable(metadata(docs::examples = "{{ transaction_id }}"))]
pub(super) message_deduplication_id: Option<String>,

#[configurable(derived)]
#[serde(default)]
pub(super) request: TowerRequestConfig,

#[configurable(derived)]
pub(super) tls: Option<TlsConfig>,

/// The ARN of an [IAM role][iam_role] to assume at startup.
///
/// [iam_role]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html
#[configurable(deprecated)]
#[configurable(metadata(docs::hidden))]
pub(super) assume_role: Option<String>,

#[configurable(derived)]
#[serde(default)]
pub(super) auth: AwsAuthentication,

#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
pub(super) acknowledgements: AcknowledgementsConfig,
}

pub(super) fn message_group_id(
message_group_id: Option<String>,
fifo: bool,
) -> crate::Result<Option<Template>> {
match (message_group_id.as_ref(), fifo) {
(Some(value), true) => Ok(Some(
Template::try_from(value.clone()).context(TopicTemplateSnafu)?,
)),
(Some(_), false) => Err(Box::new(BuildError::MessageGroupIdNotAllowed)),
(None, true) => Err(Box::new(BuildError::MessageGroupIdMissing)),
(None, false) => Ok(None),
}
}
pub(super) fn message_deduplication_id(
message_deduplication_id: Option<String>,
) -> crate::Result<Option<Template>> {
Ok(message_deduplication_id
.clone()
.map(Template::try_from)
.transpose()?)
}
12 changes: 12 additions & 0 deletions src/sinks/aws_s_s/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
mod client;
mod config;
mod request_builder;
mod retry;
mod service;
mod sink;

#[cfg(feature = "sinks-aws_sqs")]
mod sqs;

#[cfg(feature = "sinks-aws_sns")]
mod sns;
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use bytes::Bytes;
use vector_common::request_metadata::{MetaDescriptive, RequestMetadata};
use vector_core::ByteSizeOf;

use super::config::SqsSinkConfig;
use crate::codecs::EncodingConfig;
use crate::{
codecs::{Encoder, Transformer},
event::{Event, EventFinalizers, Finalizable},
Expand All @@ -15,37 +15,39 @@ use crate::{
};

#[derive(Clone)]
pub struct SqsMetadata {
pub finalizers: EventFinalizers,
pub message_group_id: Option<String>,
pub message_deduplication_id: Option<String>,
pub(super) struct SSMetadata {
pub(super) finalizers: EventFinalizers,
pub(super) message_group_id: Option<String>,
pub(super) message_deduplication_id: Option<String>,
}

#[derive(Clone)]
pub(crate) struct SqsRequestBuilder {
pub(super) struct SSRequestBuilder {
encoder: (Transformer, Encoder<()>),
message_group_id: Option<Template>,
message_deduplication_id: Option<Template>,
queue_url: String,
}

impl SqsRequestBuilder {
pub fn new(config: SqsSinkConfig) -> crate::Result<Self> {
let transformer = config.encoding.transformer();
let serializer = config.encoding.build()?;
impl SSRequestBuilder {
pub(super) fn new(
message_group_id: Option<Template>,
message_deduplication_id: Option<Template>,
encoding_config: EncodingConfig,
) -> crate::Result<Self> {
let transformer = encoding_config.transformer();
let serializer = encoding_config.build()?;
let encoder = Encoder::<()>::new(serializer);

Ok(Self {
encoder: (transformer, encoder),
message_group_id: config.message_group_id()?,
message_deduplication_id: config.message_deduplication_id()?,
queue_url: config.queue_url,
message_group_id,
message_deduplication_id,
})
}
}

impl RequestBuilder<Event> for SqsRequestBuilder {
type Metadata = SqsMetadata;
impl RequestBuilder<Event> for SSRequestBuilder {
type Metadata = SSMetadata;
type Events = Event;
type Encoder = (Transformer, Encoder<()>);
type Payload = Bytes;
Expand Down Expand Up @@ -95,17 +97,17 @@ impl RequestBuilder<Event> for SqsRequestBuilder {

let builder = RequestMetadataBuilder::from_event(&event);

let sqs_metadata = SqsMetadata {
let metadata = SSMetadata {
finalizers: event.take_finalizers(),
message_group_id,
message_deduplication_id,
};
(sqs_metadata, builder, event)
(metadata, builder, event)
}

fn build_request(
&self,
sqs_metadata: Self::Metadata,
client_metadata: Self::Metadata,
metadata: RequestMetadata,
payload: EncodeResult<Self::Payload>,
) -> Self::Request {
Expand All @@ -114,23 +116,21 @@ impl RequestBuilder<Event> for SqsRequestBuilder {

SendMessageEntry {
message_body,
message_group_id: sqs_metadata.message_group_id,
message_deduplication_id: sqs_metadata.message_deduplication_id,
queue_url: self.queue_url.clone(),
finalizers: sqs_metadata.finalizers,
message_group_id: client_metadata.message_group_id,
message_deduplication_id: client_metadata.message_deduplication_id,
finalizers: client_metadata.finalizers,
metadata,
}
}
}

#[derive(Debug, Clone)]
pub(crate) struct SendMessageEntry {
pub message_body: String,
pub message_group_id: Option<String>,
pub message_deduplication_id: Option<String>,
pub queue_url: String,
finalizers: EventFinalizers,
pub metadata: RequestMetadata,
pub(super) struct SendMessageEntry {
pub(super) message_body: String,
pub(super) message_group_id: Option<String>,
pub(super) message_deduplication_id: Option<String>,
pub(super) finalizers: EventFinalizers,
pub(super) metadata: RequestMetadata,
}

impl ByteSizeOf for SendMessageEntry {
Expand Down
44 changes: 44 additions & 0 deletions src/sinks/aws_s_s/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use aws_sdk_sqs::types::SdkError;
use std::marker::PhantomData;

use super::service::SendMessageResponse;
use crate::{aws::is_retriable_error, sinks::util::retries::RetryLogic};

#[derive(Debug)]
pub(super) struct SSRetryLogic<E> {
_phantom: PhantomData<fn() -> E>,
}

impl<E> SSRetryLogic<E>
where
E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
{
pub(super) fn new() -> SSRetryLogic<E> {
Self {
_phantom: PhantomData,
}
}
}

impl<E> RetryLogic for SSRetryLogic<E>
where
E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
{
type Error = SdkError<E>;
type Response = SendMessageResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
is_retriable_error(error)
}
}

impl<E> Clone for SSRetryLogic<E>
where
E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
{
fn clone(&self) -> SSRetryLogic<E> {
SSRetryLogic {
_phantom: PhantomData,
}
}
}
Loading

0 comments on commit 7b2bddc

Please sign in to comment.