From 294c1ddfb3f0398105fb02b37c9b9a38e50a6a6c Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Thu, 17 Aug 2023 13:12:36 +0100 Subject: [PATCH] chore(nats sink): Refactor to use StreamSink components (#18243) * Move to stream based sink Signed-off-by: Stephen Wakely * Add request settings Signed-off-by: Stephen Wakely * Updated docs and comments Signed-off-by: Stephen Wakely * Dont need to take metadata Signed-off-by: Stephen Wakely * Clippy Signed-off-by: Stephen Wakely * Made Subject a Template in config Signed-off-by: Stephen Wakely --------- Signed-off-by: Stephen Wakely --- src/internal_events/mod.rs | 4 - src/internal_events/nats.rs | 33 - src/sinks/nats.rs | 716 ------------------ src/sinks/nats/config.rs | 130 ++++ src/sinks/nats/integration_tests.rs | 478 ++++++++++++ src/sinks/nats/mod.rs | 30 + src/sinks/nats/request_builder.rs | 117 +++ src/sinks/nats/service.rs | 63 ++ src/sinks/nats/sink.rs | 116 +++ src/sinks/nats/tests.rs | 6 + src/sinks/prelude.rs | 7 +- .../reference/components/sinks/base/nats.cue | 154 ++++ 12 files changed, 1098 insertions(+), 756 deletions(-) delete mode 100644 src/internal_events/nats.rs delete mode 100644 src/sinks/nats.rs create mode 100644 src/sinks/nats/config.rs create mode 100644 src/sinks/nats/integration_tests.rs create mode 100644 src/sinks/nats/mod.rs create mode 100644 src/sinks/nats/request_builder.rs create mode 100644 src/sinks/nats/service.rs create mode 100644 src/sinks/nats/sink.rs create mode 100644 src/sinks/nats/tests.rs diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index 7d30daba29d97..f1073573cb51c 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -86,8 +86,6 @@ mod lua; mod metric_to_log; #[cfg(feature = "sources-mongodb_metrics")] mod mongodb_metrics; -#[cfg(feature = "sinks-nats")] -mod nats; #[cfg(feature = "sources-nginx_metrics")] mod nginx_metrics; mod open; @@ -224,8 +222,6 @@ pub(crate) use self::loki::*; pub(crate) use self::lua::*; #[cfg(feature = "transforms-metric_to_log")] pub(crate) use self::metric_to_log::*; -#[cfg(feature = "sinks-nats")] -pub(crate) use self::nats::*; #[cfg(feature = "sources-nginx_metrics")] pub(crate) use self::nginx_metrics::*; pub(crate) use self::parser::*; diff --git a/src/internal_events/nats.rs b/src/internal_events/nats.rs deleted file mode 100644 index c68c756a3a377..0000000000000 --- a/src/internal_events/nats.rs +++ /dev/null @@ -1,33 +0,0 @@ -use crate::emit; -use metrics::counter; -use vector_common::internal_event::{ - error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL, -}; -use vector_core::internal_event::InternalEvent; - -#[derive(Debug)] -pub struct NatsEventSendError { - pub error: async_nats::Error, -} - -impl InternalEvent for NatsEventSendError { - fn emit(self) { - let reason = "Failed to send message."; - error!( - message = reason, - error = %self.error, - error_type = error_type::WRITER_FAILED, - stage = error_stage::SENDING, - internal_log_rate_limit = true, - ); - counter!( - "component_errors_total", 1, - "error_type" => error_type::WRITER_FAILED, - "stage" => error_stage::SENDING, - ); - emit!(ComponentEventsDropped:: { count: 1, reason }); - - // deprecated - counter!("send_errors_total", 1); - } -} diff --git a/src/sinks/nats.rs b/src/sinks/nats.rs deleted file mode 100644 index 02265df21972b..0000000000000 --- a/src/sinks/nats.rs +++ /dev/null @@ -1,716 +0,0 @@ -use std::convert::TryFrom; - -use async_trait::async_trait; -use bytes::BytesMut; -use codecs::JsonSerializerConfig; -use futures::{stream::BoxStream, FutureExt, StreamExt, TryFutureExt}; -use snafu::{ResultExt, Snafu}; -use tokio_util::codec::Encoder as _; -use vector_common::internal_event::{ - ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol, -}; -use vector_config::configurable_component; - -use crate::{ - codecs::{Encoder, EncodingConfig, Transformer}, - config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext}, - event::{EstimatedJsonEncodedSizeOf, Event, EventStatus, Finalizable}, - internal_events::{NatsEventSendError, TemplateRenderingError}, - nats::{from_tls_auth_config, NatsAuthConfig, NatsConfigError}, - sinks::util::StreamSink, - template::{Template, TemplateParseError}, - tls::TlsEnableableConfig, -}; - -#[derive(Debug, Snafu)] -enum BuildError { - #[snafu(display("invalid encoding: {}", source))] - Encoding { - source: codecs::encoding::BuildError, - }, - #[snafu(display("invalid subject template: {}", source))] - SubjectTemplate { source: TemplateParseError }, - #[snafu(display("NATS Config Error: {}", source))] - Config { source: NatsConfigError }, - #[snafu(display("NATS Connect Error: {}", source))] - Connect { source: async_nats::ConnectError }, -} - -/** - * Code dealing with the SinkConfig struct. - */ - -/// Configuration for the `nats` sink. -#[configurable_component(sink( - "nats", - "Publish observability data to subjects on the NATS messaging system." -))] -#[derive(Clone, Debug)] -#[serde(deny_unknown_fields)] -pub struct NatsSinkConfig { - #[configurable(derived)] - encoding: EncodingConfig, - - #[configurable(derived)] - #[serde( - default, - deserialize_with = "crate::serde::bool_or_struct", - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - pub acknowledgements: AcknowledgementsConfig, - - /// A NATS [name][nats_connection_name] assigned to the NATS connection. - /// - /// [nats_connection_name]: https://docs.nats.io/using-nats/developer/connecting/name - #[serde(default = "default_name", alias = "name")] - #[configurable(metadata(docs::examples = "foo"))] - connection_name: String, - - /// The NATS [subject][nats_subject] to publish messages to. - /// - /// [nats_subject]: https://docs.nats.io/nats-concepts/subjects - #[configurable(metadata(docs::templateable))] - #[configurable(metadata( - docs::examples = "{{ host }}", - docs::examples = "foo", - docs::examples = "time.us.east", - docs::examples = "time.*.east", - docs::examples = "time.>", - docs::examples = ">" - ))] - subject: String, - - /// The NATS [URL][nats_url] to connect to. - /// - /// The URL must take the form of `nats://server:port`. - /// If the port is not specified it defaults to 4222. - /// - /// [nats_url]: https://docs.nats.io/using-nats/developer/connecting#nats-url - #[configurable(metadata(docs::examples = "nats://demo.nats.io"))] - #[configurable(metadata(docs::examples = "nats://127.0.0.1:4242"))] - url: String, - - #[configurable(derived)] - tls: Option, - - #[configurable(derived)] - auth: Option, -} - -fn default_name() -> String { - String::from("vector") -} - -impl GenerateConfig for NatsSinkConfig { - fn generate_config() -> toml::Value { - toml::Value::try_from(Self { - acknowledgements: Default::default(), - auth: None, - connection_name: "vector".into(), - encoding: JsonSerializerConfig::default().into(), - subject: "from.vector".into(), - tls: None, - url: "nats://127.0.0.1:4222".into(), - }) - .unwrap() - } -} - -#[async_trait::async_trait] -#[typetag::serde(name = "nats")] -impl SinkConfig for NatsSinkConfig { - async fn build( - &self, - _cx: SinkContext, - ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let sink = NatsSink::new(self.clone()).await?; - let healthcheck = healthcheck(self.clone()).boxed(); - Ok((super::VectorSink::from_event_streamsink(sink), healthcheck)) - } - - fn input(&self) -> Input { - Input::new(self.encoding.config().input_type() & DataType::Log) - } - - fn acknowledgements(&self) -> &AcknowledgementsConfig { - &self.acknowledgements - } -} - -impl std::convert::TryFrom<&NatsSinkConfig> for async_nats::ConnectOptions { - type Error = NatsConfigError; - - fn try_from(config: &NatsSinkConfig) -> Result { - from_tls_auth_config(&config.connection_name, &config.auth, &config.tls) - } -} - -impl NatsSinkConfig { - async fn connect(&self) -> Result { - let options: async_nats::ConnectOptions = self.try_into().context(ConfigSnafu)?; - - options.connect(&self.url).await.context(ConnectSnafu) - } -} - -async fn healthcheck(config: NatsSinkConfig) -> crate::Result<()> { - config.connect().map_ok(|_| ()).map_err(|e| e.into()).await -} - -pub struct NatsSink { - transformer: Transformer, - encoder: Encoder<()>, - connection: async_nats::Client, - subject: Template, -} - -impl NatsSink { - async fn new(config: NatsSinkConfig) -> Result { - let connection = config.connect().await?; - let transformer = config.encoding.transformer(); - let serializer = config.encoding.build().context(EncodingSnafu)?; - let encoder = Encoder::<()>::new(serializer); - - Ok(NatsSink { - connection, - transformer, - encoder, - subject: Template::try_from(config.subject).context(SubjectTemplateSnafu)?, - }) - } -} - -#[async_trait] -impl StreamSink for NatsSink { - async fn run(mut self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { - let bytes_sent = register!(BytesSent::from(Protocol::TCP)); - let events_sent = register!(EventsSent::from(Output(None))); - - while let Some(mut event) = input.next().await { - let finalizers = event.take_finalizers(); - - let subject = match self.subject.render_string(&event) { - Ok(subject) => subject, - Err(error) => { - emit!(TemplateRenderingError { - error, - field: Some("subject"), - drop_event: true, - }); - finalizers.update_status(EventStatus::Rejected); - continue; - } - }; - - self.transformer.transform(&mut event); - - let event_byte_size = event.estimated_json_encoded_size_of(); - - let mut bytes = BytesMut::new(); - if self.encoder.encode(event, &mut bytes).is_err() { - // Error is handled by `Encoder`. - finalizers.update_status(EventStatus::Rejected); - continue; - } - - let message_size = bytes.len(); - match self - .connection - .publish(subject.clone(), bytes.freeze()) - .map_err(Into::into) - .and_then(|_| self.connection.flush().map_err(Into::into)) - .await - { - Err(error) => { - finalizers.update_status(EventStatus::Errored); - - emit!(NatsEventSendError { error }); - } - Ok(_) => { - finalizers.update_status(EventStatus::Delivered); - - events_sent.emit(CountByteSize(1, event_byte_size)); - bytes_sent.emit(ByteSize(message_size)); - } - } - } - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn generate_config() { - crate::test_util::test_generate_config::(); - } -} - -#[cfg(feature = "nats-integration-tests")] -#[cfg(test)] -mod integration_tests { - use codecs::TextSerializerConfig; - use std::time::Duration; - - use super::*; - use crate::nats::{NatsAuthCredentialsFile, NatsAuthNKey, NatsAuthToken, NatsAuthUserPassword}; - use crate::sinks::VectorSink; - use crate::test_util::{ - components::{run_and_assert_sink_compliance, SINK_TAGS}, - random_lines_with_stream, random_string, trace_init, - }; - use crate::tls::TlsConfig; - - async fn publish_and_check(conf: NatsSinkConfig) -> Result<(), BuildError> { - // Publish `N` messages to NATS. - // - // Verify with a separate subscriber that the messages were - // successfully published. - - // Create Sink - let sink = NatsSink::new(conf.clone()).await?; - let sink = VectorSink::from_event_streamsink(sink); - - // Establish the consumer subscription. - let subject = conf.subject.clone(); - let consumer = conf - .clone() - .connect() - .await - .expect("failed to connect with test consumer"); - let mut sub = consumer - .subscribe(subject) - .await - .expect("failed to subscribe with test consumer"); - consumer - .flush() - .await - .expect("failed to flush with the test consumer"); - - // Publish events. - let num_events = 1_000; - let (input, events) = random_lines_with_stream(100, num_events, None); - - run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await; - - // Unsubscribe from the channel. - tokio::time::sleep(Duration::from_secs(3)).await; - sub.unsubscribe().await.unwrap(); - - let mut output: Vec = Vec::new(); - while let Some(msg) = sub.next().await { - output.push(String::from_utf8_lossy(&msg.payload).to_string()) - } - - assert_eq!(output.len(), input.len()); - assert_eq!(output, input); - - Ok(()) - } - - #[tokio::test] - async fn nats_no_auth() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = - std::env::var("NATS_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: None, - auth: None, - }; - - let r = publish_and_check(conf).await; - assert!( - r.is_ok(), - "publish_and_check failed, expected Ok(()), got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_userpass_auth_valid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_USERPASS_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: None, - auth: Some(NatsAuthConfig::UserPassword { - user_password: NatsAuthUserPassword { - user: "natsuser".to_string(), - password: "natspass".to_string().into(), - }, - }), - }; - - publish_and_check(conf) - .await - .expect("publish_and_check failed"); - } - - #[tokio::test] - async fn nats_userpass_auth_invalid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_USERPASS_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: None, - auth: Some(NatsAuthConfig::UserPassword { - user_password: NatsAuthUserPassword { - user: "natsuser".to_string(), - password: "wrongpass".to_string().into(), - }, - }), - }; - - let r = publish_and_check(conf).await; - assert!( - matches!(r, Err(BuildError::Connect { .. })), - "publish_and_check failed, expected BuildError::Connect, got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_token_auth_valid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_TOKEN_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: None, - auth: Some(NatsAuthConfig::Token { - token: NatsAuthToken { - value: "secret".to_string().into(), - }, - }), - }; - - let r = publish_and_check(conf).await; - assert!( - r.is_ok(), - "publish_and_check failed, expected Ok(()), got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_token_auth_invalid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_TOKEN_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: None, - auth: Some(NatsAuthConfig::Token { - token: NatsAuthToken { - value: "wrongsecret".to_string().into(), - }, - }), - }; - - let r = publish_and_check(conf).await; - assert!( - matches!(r, Err(BuildError::Connect { .. })), - "publish_and_check failed, expected BuildError::Connect, got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_nkey_auth_valid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_NKEY_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: None, - auth: Some(NatsAuthConfig::Nkey { - nkey: NatsAuthNKey { - nkey: "UD345ZYSUJQD7PNCTWQPINYSO3VH4JBSADBSYUZOBT666DRASFRAWAWT".into(), - seed: "SUANIRXEZUROTXNFN3TJYMT27K7ZZVMD46FRIHF6KXKS4KGNVBS57YAFGY".into(), - }, - }), - }; - - let r = publish_and_check(conf).await; - assert!( - r.is_ok(), - "publish_and_check failed, expected Ok(()), got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_nkey_auth_invalid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_NKEY_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: None, - auth: Some(NatsAuthConfig::Nkey { - nkey: NatsAuthNKey { - nkey: "UAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".into(), - seed: "SBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB".into(), - }, - }), - }; - - let r = publish_and_check(conf).await; - assert!( - matches!(r, Err(BuildError::Connect { .. })), - "publish_and_check failed, expected BuildError::Config, got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_tls_valid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_TLS_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: Some(TlsEnableableConfig { - enabled: Some(true), - options: TlsConfig { - ca_file: Some("tests/data/nats/rootCA.pem".into()), - ..Default::default() - }, - }), - auth: None, - }; - - let r = publish_and_check(conf).await; - assert!( - r.is_ok(), - "publish_and_check failed, expected Ok(()), got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_tls_invalid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_TLS_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: None, - auth: None, - }; - - let r = publish_and_check(conf).await; - assert!( - matches!(r, Err(BuildError::Connect { .. })), - "publish_and_check failed, expected BuildError::Connect, got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_tls_client_cert_valid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_TLS_CLIENT_CERT_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: Some(TlsEnableableConfig { - enabled: Some(true), - options: TlsConfig { - ca_file: Some("tests/data/nats/rootCA.pem".into()), - crt_file: Some("tests/data/nats/nats-client.pem".into()), - key_file: Some("tests/data/nats/nats-client.key".into()), - ..Default::default() - }, - }), - auth: None, - }; - - let r = publish_and_check(conf).await; - assert!( - r.is_ok(), - "publish_and_check failed, expected Ok(()), got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_tls_client_cert_invalid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_TLS_CLIENT_CERT_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: Some(TlsEnableableConfig { - enabled: Some(true), - options: TlsConfig { - ca_file: Some("tests/data/nats/rootCA.pem".into()), - ..Default::default() - }, - }), - auth: None, - }; - - let r = publish_and_check(conf).await; - assert!( - matches!(r, Err(BuildError::Connect { .. })), - "publish_and_check failed, expected BuildError::Connect, got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_tls_jwt_auth_valid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_JWT_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: Some(TlsEnableableConfig { - enabled: Some(true), - options: TlsConfig { - ca_file: Some("tests/data/nats/rootCA.pem".into()), - ..Default::default() - }, - }), - auth: Some(NatsAuthConfig::CredentialsFile { - credentials_file: NatsAuthCredentialsFile { - path: "tests/data/nats/nats.creds".into(), - }, - }), - }; - - let r = publish_and_check(conf).await; - assert!( - r.is_ok(), - "publish_and_check failed, expected Ok(()), got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_tls_jwt_auth_invalid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_JWT_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: Some(TlsEnableableConfig { - enabled: Some(true), - options: TlsConfig { - ca_file: Some("tests/data/nats/rootCA.pem".into()), - ..Default::default() - }, - }), - auth: Some(NatsAuthConfig::CredentialsFile { - credentials_file: NatsAuthCredentialsFile { - path: "tests/data/nats/nats-bad.creds".into(), - }, - }), - }; - - let r = publish_and_check(conf).await; - assert!( - matches!(r, Err(BuildError::Connect { .. })), - "publish_and_check failed, expected BuildError::Connect, got: {:?}", - r - ); - } -} diff --git a/src/sinks/nats/config.rs b/src/sinks/nats/config.rs new file mode 100644 index 0000000000000..4ce334cda070a --- /dev/null +++ b/src/sinks/nats/config.rs @@ -0,0 +1,130 @@ +use codecs::JsonSerializerConfig; +use futures_util::TryFutureExt; +use snafu::ResultExt; +use vector_core::tls::TlsEnableableConfig; + +use crate::{ + nats::{from_tls_auth_config, NatsAuthConfig, NatsConfigError}, + sinks::prelude::*, +}; + +use super::{sink::NatsSink, ConfigSnafu, ConnectSnafu, NatsError}; + +/// Configuration for the `nats` sink. +#[configurable_component(sink( + "nats", + "Publish observability data to subjects on the NATS messaging system." +))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct NatsSinkConfig { + #[configurable(derived)] + pub(super) encoding: EncodingConfig, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + pub acknowledgements: AcknowledgementsConfig, + + /// A NATS [name][nats_connection_name] assigned to the NATS connection. + /// + /// [nats_connection_name]: https://docs.nats.io/using-nats/developer/connecting/name + #[serde(default = "default_name", alias = "name")] + #[configurable(metadata(docs::examples = "foo"))] + pub(super) connection_name: String, + + /// The NATS [subject][nats_subject] to publish messages to. + /// + /// [nats_subject]: https://docs.nats.io/nats-concepts/subjects + #[configurable(metadata(docs::templateable))] + #[configurable(metadata( + docs::examples = "{{ host }}", + docs::examples = "foo", + docs::examples = "time.us.east", + docs::examples = "time.*.east", + docs::examples = "time.>", + docs::examples = ">" + ))] + pub(super) subject: Template, + + /// The NATS [URL][nats_url] to connect to. + /// + /// The URL must take the form of `nats://server:port`. + /// If the port is not specified it defaults to 4222. + /// + /// [nats_url]: https://docs.nats.io/using-nats/developer/connecting#nats-url + #[configurable(metadata(docs::examples = "nats://demo.nats.io"))] + #[configurable(metadata(docs::examples = "nats://127.0.0.1:4242"))] + pub(super) url: String, + + #[configurable(derived)] + pub(super) tls: Option, + + #[configurable(derived)] + pub(super) auth: Option, + + #[configurable(derived)] + #[serde(default)] + pub(super) request: TowerRequestConfig, +} + +fn default_name() -> String { + String::from("vector") +} + +impl GenerateConfig for NatsSinkConfig { + fn generate_config() -> toml::Value { + toml::Value::try_from(Self { + acknowledgements: Default::default(), + auth: None, + connection_name: "vector".into(), + encoding: JsonSerializerConfig::default().into(), + subject: Template::try_from("from.vector").unwrap(), + tls: None, + url: "nats://127.0.0.1:4222".into(), + request: Default::default(), + }) + .unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "nats")] +impl SinkConfig for NatsSinkConfig { + async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let sink = NatsSink::new(self.clone()).await?; + let healthcheck = healthcheck(self.clone()).boxed(); + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } + + fn input(&self) -> Input { + Input::new(self.encoding.config().input_type() & DataType::Log) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +impl std::convert::TryFrom<&NatsSinkConfig> for async_nats::ConnectOptions { + type Error = NatsConfigError; + + fn try_from(config: &NatsSinkConfig) -> Result { + from_tls_auth_config(&config.connection_name, &config.auth, &config.tls) + } +} + +impl NatsSinkConfig { + pub(super) async fn connect(&self) -> Result { + let options: async_nats::ConnectOptions = self.try_into().context(ConfigSnafu)?; + + options.connect(&self.url).await.context(ConnectSnafu) + } +} + +async fn healthcheck(config: NatsSinkConfig) -> crate::Result<()> { + config.connect().map_ok(|_| ()).map_err(|e| e.into()).await +} diff --git a/src/sinks/nats/integration_tests.rs b/src/sinks/nats/integration_tests.rs new file mode 100644 index 0000000000000..c8972d8fb11e9 --- /dev/null +++ b/src/sinks/nats/integration_tests.rs @@ -0,0 +1,478 @@ +use codecs::TextSerializerConfig; +use std::time::Duration; + +use super::{config::NatsSinkConfig, sink::NatsSink, NatsError}; +use crate::{ + nats::{ + NatsAuthConfig, NatsAuthCredentialsFile, NatsAuthNKey, NatsAuthToken, NatsAuthUserPassword, + }, + sinks::prelude::*, + test_util::{ + components::{run_and_assert_sink_compliance, SINK_TAGS}, + random_lines_with_stream, random_string, trace_init, + }, + tls::TlsEnableableConfig, +}; + +async fn publish_and_check(conf: NatsSinkConfig) -> Result<(), NatsError> { + // Publish `N` messages to NATS. + // + // Verify with a separate subscriber that the messages were + // successfully published. + + // Create Sink + let sink = NatsSink::new(conf.clone()).await?; + let sink = VectorSink::from_event_streamsink(sink); + + // Establish the consumer subscription. + let subject = conf.subject.clone(); + let consumer = conf + .clone() + .connect() + .await + .expect("failed to connect with test consumer"); + let mut sub = consumer + .subscribe(subject.to_string()) + .await + .expect("failed to subscribe with test consumer"); + consumer + .flush() + .await + .expect("failed to flush with the test consumer"); + + // Publish events. + let num_events = 10; + let (input, events) = random_lines_with_stream(100, num_events, None); + + run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await; + + // Unsubscribe from the channel. + tokio::time::sleep(Duration::from_secs(3)).await; + sub.unsubscribe().await.unwrap(); + + let mut output: Vec = Vec::new(); + while let Some(msg) = sub.next().await { + output.push(String::from_utf8_lossy(&msg.payload).to_string()) + } + + assert_eq!(output.len(), input.len()); + assert_eq!(output, input); + + Ok(()) +} + +#[tokio::test] +async fn nats_no_auth() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = + std::env::var("NATS_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: None, + auth: None, + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_userpass_auth_valid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = std::env::var("NATS_USERPASS_ADDRESS") + .unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: None, + auth: Some(NatsAuthConfig::UserPassword { + user_password: NatsAuthUserPassword { + user: "natsuser".to_string(), + password: "natspass".to_string().into(), + }, + }), + request: Default::default(), + }; + + publish_and_check(conf) + .await + .expect("publish_and_check failed"); +} + +#[tokio::test] +async fn nats_userpass_auth_invalid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = std::env::var("NATS_USERPASS_ADDRESS") + .unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: None, + auth: Some(NatsAuthConfig::UserPassword { + user_password: NatsAuthUserPassword { + user: "natsuser".to_string(), + password: "wrongpass".to_string().into(), + }, + }), + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(NatsError::Connect { .. })), + "publish_and_check failed, expected NatsError::Connect, got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_token_auth_valid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = std::env::var("NATS_TOKEN_ADDRESS") + .unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: None, + auth: Some(NatsAuthConfig::Token { + token: NatsAuthToken { + value: "secret".to_string().into(), + }, + }), + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_token_auth_invalid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = std::env::var("NATS_TOKEN_ADDRESS") + .unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: None, + auth: Some(NatsAuthConfig::Token { + token: NatsAuthToken { + value: "wrongsecret".to_string().into(), + }, + }), + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(NatsError::Connect { .. })), + "publish_and_check failed, expected NatsError::Connect, got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_nkey_auth_valid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = std::env::var("NATS_NKEY_ADDRESS") + .unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: None, + auth: Some(NatsAuthConfig::Nkey { + nkey: NatsAuthNKey { + nkey: "UD345ZYSUJQD7PNCTWQPINYSO3VH4JBSADBSYUZOBT666DRASFRAWAWT".into(), + seed: "SUANIRXEZUROTXNFN3TJYMT27K7ZZVMD46FRIHF6KXKS4KGNVBS57YAFGY".into(), + }, + }), + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_nkey_auth_invalid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = std::env::var("NATS_NKEY_ADDRESS") + .unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: None, + auth: Some(NatsAuthConfig::Nkey { + nkey: NatsAuthNKey { + nkey: "UAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".into(), + seed: "SBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB".into(), + }, + }), + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(NatsError::Connect { .. })), + "publish_and_check failed, expected NatsError::Config, got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_tls_valid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = + std::env::var("NATS_TLS_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: Some(TlsEnableableConfig { + enabled: Some(true), + options: TlsConfig { + ca_file: Some("tests/data/nats/rootCA.pem".into()), + ..Default::default() + }, + }), + auth: None, + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_tls_invalid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = + std::env::var("NATS_TLS_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: None, + auth: None, + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(NatsError::Connect { .. })), + "publish_and_check failed, expected NatsError::Connect, got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_tls_client_cert_valid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = std::env::var("NATS_TLS_CLIENT_CERT_ADDRESS") + .unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: Some(TlsEnableableConfig { + enabled: Some(true), + options: TlsConfig { + ca_file: Some("tests/data/nats/rootCA.pem".into()), + crt_file: Some("tests/data/nats/nats-client.pem".into()), + key_file: Some("tests/data/nats/nats-client.key".into()), + ..Default::default() + }, + }), + auth: None, + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_tls_client_cert_invalid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = std::env::var("NATS_TLS_CLIENT_CERT_ADDRESS") + .unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: Some(TlsEnableableConfig { + enabled: Some(true), + options: TlsConfig { + ca_file: Some("tests/data/nats/rootCA.pem".into()), + ..Default::default() + }, + }), + auth: None, + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(NatsError::Connect { .. })), + "publish_and_check failed, expected NatsError::Connect, got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_tls_jwt_auth_valid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = + std::env::var("NATS_JWT_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: Some(TlsEnableableConfig { + enabled: Some(true), + options: TlsConfig { + ca_file: Some("tests/data/nats/rootCA.pem".into()), + ..Default::default() + }, + }), + auth: Some(NatsAuthConfig::CredentialsFile { + credentials_file: NatsAuthCredentialsFile { + path: "tests/data/nats/nats.creds".into(), + }, + }), + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_tls_jwt_auth_invalid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = + std::env::var("NATS_JWT_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: Some(TlsEnableableConfig { + enabled: Some(true), + options: TlsConfig { + ca_file: Some("tests/data/nats/rootCA.pem".into()), + ..Default::default() + }, + }), + auth: Some(NatsAuthConfig::CredentialsFile { + credentials_file: NatsAuthCredentialsFile { + path: "tests/data/nats/nats-bad.creds".into(), + }, + }), + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(NatsError::Connect { .. })), + "publish_and_check failed, expected NatsError::Connect, got: {:?}", + r + ); +} diff --git a/src/sinks/nats/mod.rs b/src/sinks/nats/mod.rs new file mode 100644 index 0000000000000..a9670aeea514a --- /dev/null +++ b/src/sinks/nats/mod.rs @@ -0,0 +1,30 @@ +//! `NATS` sink +//! Publishes data using [NATS](nats.io)(Neural Autonomic Transport System). + +use snafu::Snafu; + +use crate::nats::NatsConfigError; + +mod config; +#[cfg(feature = "nats-integration-tests")] +#[cfg(test)] +mod integration_tests; +mod request_builder; +mod service; +mod sink; +#[cfg(test)] +mod tests; + +#[derive(Debug, Snafu)] +enum NatsError { + #[snafu(display("invalid encoding: {}", source))] + Encoding { + source: codecs::encoding::BuildError, + }, + #[snafu(display("NATS Config Error: {}", source))] + Config { source: NatsConfigError }, + #[snafu(display("NATS Connect Error: {}", source))] + Connect { source: async_nats::ConnectError }, + #[snafu(display("NATS Server Error: {}", source))] + ServerError { source: async_nats::Error }, +} diff --git a/src/sinks/nats/request_builder.rs b/src/sinks/nats/request_builder.rs new file mode 100644 index 0000000000000..6ff785b15a28e --- /dev/null +++ b/src/sinks/nats/request_builder.rs @@ -0,0 +1,117 @@ +use std::io; + +use bytes::{Bytes, BytesMut}; +use tokio_util::codec::Encoder as _; +use vector_core::config::telemetry; + +use crate::sinks::prelude::*; + +use super::sink::NatsEvent; + +pub(super) struct NatsEncoder { + pub(super) transformer: Transformer, + pub(super) encoder: Encoder<()>, +} + +impl encoding::Encoder for NatsEncoder { + fn encode_input( + &self, + mut input: Event, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { + let mut body = BytesMut::new(); + self.transformer.transform(&mut input); + + let mut byte_size = telemetry().create_request_count_byte_size(); + byte_size.add_event(&input, input.estimated_json_encoded_size_of()); + + let mut encoder = self.encoder.clone(); + encoder + .encode(input, &mut body) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "unable to encode"))?; + + let body = body.freeze(); + write_all(writer, 1, body.as_ref())?; + + Ok((body.len(), byte_size)) + } +} + +pub(super) struct NatsMetadata { + subject: String, + finalizers: EventFinalizers, +} + +pub(super) struct NatsRequestBuilder { + pub(super) encoder: NatsEncoder, +} + +#[derive(Clone)] +pub(super) struct NatsRequest { + pub(super) bytes: Bytes, + pub(super) subject: String, + finalizers: EventFinalizers, + pub(super) metadata: RequestMetadata, +} + +impl Finalizable for NatsRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + std::mem::take(&mut self.finalizers) + } +} + +impl MetaDescriptive for NatsRequest { + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata + } +} + +impl RequestBuilder for NatsRequestBuilder { + type Metadata = NatsMetadata; + type Events = Event; + type Encoder = NatsEncoder; + type Payload = Bytes; + type Request = NatsRequest; + type Error = io::Error; + + fn compression(&self) -> Compression { + Compression::None + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoder + } + + fn split_input( + &self, + mut input: NatsEvent, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let builder = RequestMetadataBuilder::from_event(&input.event); + + let metadata = NatsMetadata { + subject: input.subject, + finalizers: input.event.take_finalizers(), + }; + + (metadata, builder, input.event) + } + + fn build_request( + &self, + nats_metadata: Self::Metadata, + metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + let body = payload.into_payload(); + NatsRequest { + bytes: body, + subject: nats_metadata.subject, + finalizers: nats_metadata.finalizers, + metadata, + } + } +} diff --git a/src/sinks/nats/service.rs b/src/sinks/nats/service.rs new file mode 100644 index 0000000000000..0eb2407ab5738 --- /dev/null +++ b/src/sinks/nats/service.rs @@ -0,0 +1,63 @@ +use std::{ + sync::Arc, + task::{Context, Poll}, +}; + +use futures_util::TryFutureExt; + +use crate::sinks::prelude::*; + +use super::{request_builder::NatsRequest, NatsError}; + +#[derive(Clone)] +pub(super) struct NatsService { + pub(super) connection: Arc, +} + +pub(super) struct NatsResponse { + metadata: RequestMetadata, +} + +impl DriverResponse for NatsResponse { + fn event_status(&self) -> EventStatus { + EventStatus::Delivered + } + + fn events_sent(&self) -> &GroupedCountByteSize { + self.metadata.events_estimated_json_encoded_byte_size() + } + + fn bytes_sent(&self) -> Option { + Some(self.metadata.request_encoded_size()) + } +} + +impl Service for NatsService { + type Response = NatsResponse; + + type Error = NatsError; + + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: NatsRequest) -> Self::Future { + let connection = Arc::clone(&self.connection); + + Box::pin(async move { + match connection + .publish(req.subject, req.bytes) + .map_err(async_nats::Error::from) + .and_then(|_| connection.flush().map_err(Into::into)) + .await + { + Err(error) => Err(NatsError::ServerError { source: error }), + Ok(_) => Ok(NatsResponse { + metadata: req.metadata, + }), + } + }) + } +} diff --git a/src/sinks/nats/sink.rs b/src/sinks/nats/sink.rs new file mode 100644 index 0000000000000..97fc747dc2f89 --- /dev/null +++ b/src/sinks/nats/sink.rs @@ -0,0 +1,116 @@ +use std::sync::Arc; + +use snafu::ResultExt; + +use crate::sinks::prelude::*; + +use super::{ + config::NatsSinkConfig, + request_builder::{NatsEncoder, NatsRequestBuilder}, + service::{NatsResponse, NatsService}, + EncodingSnafu, NatsError, +}; + +pub(super) struct NatsEvent { + pub(super) event: Event, + pub(super) subject: String, +} + +pub(super) struct NatsSink { + request: TowerRequestConfig, + transformer: Transformer, + encoder: Encoder<()>, + connection: Arc, + subject: Template, +} + +impl NatsSink { + fn make_nats_event(&self, event: Event) -> Option { + let subject = self + .subject + .render_string(&event) + .map_err(|missing_keys| { + emit!(TemplateRenderingError { + error: missing_keys, + field: Some("subject"), + drop_event: true, + }); + }) + .ok()?; + + Some(NatsEvent { event, subject }) + } + + pub(super) async fn new(config: NatsSinkConfig) -> Result { + let connection = Arc::new(config.connect().await?); + let transformer = config.encoding.transformer(); + let serializer = config.encoding.build().context(EncodingSnafu)?; + let encoder = Encoder::<()>::new(serializer); + let request = config.request; + let subject = config.subject; + + Ok(NatsSink { + request, + connection, + transformer, + encoder, + subject, + }) + } + + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let request = self.request.unwrap_with(&TowerRequestConfig { + concurrency: Concurrency::Fixed(1), + ..Default::default() + }); + + let request_builder = NatsRequestBuilder { + encoder: NatsEncoder { + encoder: self.encoder.clone(), + transformer: self.transformer.clone(), + }, + }; + + let service = ServiceBuilder::new() + .settings(request, NatsRetryLogic) + .service(NatsService { + connection: Arc::clone(&self.connection), + }); + + input + .filter_map(|event| std::future::ready(self.make_nats_event(event))) + .request_builder(None, request_builder) + .filter_map(|request| async move { + match request { + Err(e) => { + error!("Failed to build NATS request: {:?}.", e); + None + } + Ok(req) => Some(req), + } + }) + .into_driver(service) + .protocol("nats") + .run() + .await + } +} + +#[async_trait] +impl StreamSink for NatsSink { + async fn run(mut self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + self.run_inner(input).await + } +} + +#[derive(Debug, Clone)] +pub(super) struct NatsRetryLogic; + +impl RetryLogic for NatsRetryLogic { + type Error = NatsError; + type Response = NatsResponse; + + fn is_retriable_error(&self, _error: &Self::Error) -> bool { + true + } +} diff --git a/src/sinks/nats/tests.rs b/src/sinks/nats/tests.rs new file mode 100644 index 0000000000000..92e90e0d77fba --- /dev/null +++ b/src/sinks/nats/tests.rs @@ -0,0 +1,6 @@ +use super::config::NatsSinkConfig; + +#[test] +fn generate_config() { + crate::test_util::test_generate_config::(); +} diff --git a/src/sinks/prelude.rs b/src/sinks/prelude.rs index b66de220ed5ed..9c7e81cf39418 100644 --- a/src/sinks/prelude.rs +++ b/src/sinks/prelude.rs @@ -6,16 +6,16 @@ pub use crate::{ config::{DataType, GenerateConfig, SinkConfig, SinkContext}, event::{Event, LogEvent}, internal_events::{SinkRequestBuildError, TemplateRenderingError}, - sinks::util::retries::RetryLogic, sinks::{ util::{ builder::SinkBuilderExt, encoding::{self, write_all}, metadata::RequestMetadataBuilder, request_builder::EncodeResult, + retries::{RetryAction, RetryLogic}, service::{ServiceBuilderExt, Svc}, - BatchConfig, Compression, NoDefaultsBatchSettings, RequestBuilder, SinkBatchSettings, - TowerRequestConfig, + BatchConfig, Compression, Concurrency, NoDefaultsBatchSettings, RequestBuilder, + SinkBatchSettings, TowerRequestConfig, }, Healthcheck, HealthcheckError, }, @@ -33,6 +33,7 @@ pub use vector_common::{ request_metadata::{GetEventCountTags, GroupedCountByteSize, MetaDescriptive, RequestMetadata}, }; pub use vector_config::configurable_component; + pub use vector_core::{ config::{AcknowledgementsConfig, Input}, event::Value, diff --git a/website/cue/reference/components/sinks/base/nats.cue b/website/cue/reference/components/sinks/base/nats.cue index 2708c111c08cc..3565f375d2f67 100644 --- a/website/cue/reference/components/sinks/base/nats.cue +++ b/website/cue/reference/components/sinks/base/nats.cue @@ -266,6 +266,160 @@ base: components: sinks: nats: configuration: { } } } + request: { + description: """ + Middleware settings for outbound requests. + + Various settings can be configured, such as concurrency and rate limits, timeouts, etc. + """ + required: false + type: object: options: { + adaptive_concurrency: { + description: """ + Configuration of adaptive concurrency parameters. + + These parameters typically do not require changes from the default, and incorrect values can lead to meta-stable or + unstable performance and sink behavior. Proceed with caution. + """ + required: false + type: object: options: { + decrease_ratio: { + description: """ + The fraction of the current value to set the new concurrency limit when decreasing the limit. + + Valid values are greater than `0` and less than `1`. Smaller values cause the algorithm to scale back rapidly + when latency increases. + + Note that the new limit is rounded down after applying this ratio. + """ + required: false + type: float: default: 0.9 + } + ewma_alpha: { + description: """ + The weighting of new measurements compared to older measurements. + + Valid values are greater than `0` and less than `1`. + + ARC uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with + the current RTT. Smaller values cause this reference to adjust more slowly, which may be useful if a service has + unusually high response variability. + """ + required: false + type: float: default: 0.4 + } + initial_concurrency: { + description: """ + The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency). + + It is recommended to set this value to your service's average limit if you're seeing that it takes a + long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the + `adaptive_concurrency_limit` metric. + """ + required: false + type: uint: default: 1 + } + rtt_deviation_scale: { + description: """ + Scale of RTT deviations which are not considered anomalous. + + Valid values are greater than or equal to `0`, and we expect reasonable values to range from `1.0` to `3.0`. + + When calculating the past RTT average, we also compute a secondary “deviation” value that indicates how variable + those values are. We use that deviation when comparing the past RTT average to the current measurements, so we + can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to + an appropriate range. Larger values cause the algorithm to ignore larger increases in the RTT. + """ + required: false + type: float: default: 2.5 + } + } + } + concurrency: { + description: "Configuration for outbound request concurrency." + required: false + type: { + string: { + default: "none" + enum: { + adaptive: """ + Concurrency will be managed by Vector's [Adaptive Request Concurrency][arc] feature. + + [arc]: https://vector.dev/docs/about/under-the-hood/networking/arc/ + """ + none: """ + A fixed concurrency of 1. + + Only one request can be outstanding at any given time. + """ + } + } + uint: {} + } + } + rate_limit_duration_secs: { + description: "The time window used for the `rate_limit_num` option." + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + rate_limit_num: { + description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window." + required: false + type: uint: { + default: 9223372036854775807 + unit: "requests" + } + } + retry_attempts: { + description: """ + The maximum number of retries to make for failed requests. + + The default, for all intents and purposes, represents an infinite number of retries. + """ + required: false + type: uint: { + default: 9223372036854775807 + unit: "retries" + } + } + retry_initial_backoff_secs: { + description: """ + The amount of time to wait before attempting the first retry for a failed request. + + After the first retry has failed, the fibonacci sequence is used to select future backoffs. + """ + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + retry_max_duration_secs: { + description: "The maximum amount of time to wait between retries." + required: false + type: uint: { + default: 3600 + unit: "seconds" + } + } + timeout_secs: { + description: """ + The time a request can take before being aborted. + + Datadog highly recommends that you do not lower this value below the service's internal timeout, as this could + create orphaned requests, pile on retries, and result in duplicate data downstream. + """ + required: false + type: uint: { + default: 60 + unit: "seconds" + } + } + } + } subject: { description: """ The NATS [subject][nats_subject] to publish messages to.