From a2cc8f82d80885c14ab22bd792f7e356df03e39c Mon Sep 17 00:00:00 2001 From: Fletcher Nichol Date: Wed, 27 Nov 2024 12:31:39 -0700 Subject: [PATCH] feat: add a NATS "dead letter queue" stream for failing messages This change introduces a new NATS Jetstream stream called `DEAD_LETTER_QUEUES` which will contain metadata of messages that reached their consumer's `max_deliver` limit. Currently, only the `forklift-server` consumer for the `AUDIT_LOGS` stream is configured, but more streams and consumers could be configured to use this in the future. The forklift/audit logs NATS consumer is configured to attempt `4` deliveries of each message with a linear backoff (i.e. when a message is `nack`d) of 5 seconds, then 10, then 15 for a total of 30 seconds before the message metadata is added to the "dead letter queue" stream. Note that a "failed" message is *not* deleted from its source stream, but rather skipped over by that consumer. This allows us to inspect each failed message in place in the stream and decide how to triage any remediations. 
Co-authored-by: Nick Gerace Signed-off-by: Fletcher Nichol --- Cargo.lock | 11 ++++ Cargo.toml | 3 +- lib/forklift-server/BUCK | 1 + lib/forklift-server/Cargo.toml | 1 + .../src/server/app/audit_logs.rs | 12 ++++ lib/nats-dead-letter-queue/BUCK | 14 +++++ lib/nats-dead-letter-queue/Cargo.toml | 15 +++++ lib/nats-dead-letter-queue/src/lib.rs | 59 +++++++++++++++++++ 8 files changed, 115 insertions(+), 1 deletion(-) create mode 100644 lib/nats-dead-letter-queue/BUCK create mode 100644 lib/nats-dead-letter-queue/Cargo.toml create mode 100644 lib/nats-dead-letter-queue/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index a32449bc47..9a52ef37a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2629,6 +2629,7 @@ dependencies = [ "data-warehouse-stream-client", "derive_builder", "futures", + "nats-dead-letter-queue", "naxum", "remain", "serde", @@ -4303,6 +4304,16 @@ dependencies = [ "getrandom 0.2.15", ] +[[package]] +name = "nats-dead-letter-queue" +version = "0.1.0" +dependencies = [ + "remain", + "si-data-nats", + "telemetry-nats", + "thiserror", +] + [[package]] name = "nats-multiplexer" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 5b1adc38ed..6f16be360d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ members = [ "lib/forklift-server", "lib/module-index-client", "lib/module-index-server", + "lib/nats-dead-letter-queue", "lib/nats-multiplexer", "lib/nats-multiplexer-client", "lib/nats-multiplexer-core", @@ -37,9 +38,9 @@ members = [ "lib/naxum-api-types", "lib/object-tree", "lib/pending-events", + "lib/permissions", "lib/pinga-core", "lib/pinga-server", - "lib/permissions", "lib/rebaser-client", "lib/rebaser-core", "lib/rebaser-server", diff --git a/lib/forklift-server/BUCK b/lib/forklift-server/BUCK index 8c9b699235..27640261c2 100644 --- a/lib/forklift-server/BUCK +++ b/lib/forklift-server/BUCK @@ -7,6 +7,7 @@ rust_library( "//lib/billing-events:billing-events", "//lib/buck2-resources:buck2-resources", 
"//lib/data-warehouse-stream-client:data-warehouse-stream-client", + "//lib/nats-dead-letter-queue:nats-dead-letter-queue", "//lib/naxum:naxum", "//lib/si-data-nats:si-data-nats", "//lib/si-events-rs:si-events", diff --git a/lib/forklift-server/Cargo.toml b/lib/forklift-server/Cargo.toml index 15bf19d4b6..c0e1f63cc2 100644 --- a/lib/forklift-server/Cargo.toml +++ b/lib/forklift-server/Cargo.toml @@ -13,6 +13,7 @@ audit-logs = { path = "../../lib/audit-logs" } billing-events = { path = "../../lib/billing-events" } buck2-resources = { path = "../../lib/buck2-resources" } data-warehouse-stream-client = { path = "../../lib/data-warehouse-stream-client" } +nats-dead-letter-queue = { path = "../../lib/nats-dead-letter-queue" } naxum = { path = "../../lib/naxum" } si-data-nats = { path = "../../lib/si-data-nats" } si-events = { path = "../../lib/si-events-rs" } diff --git a/lib/forklift-server/src/server/app/audit_logs.rs b/lib/forklift-server/src/server/app/audit_logs.rs index 92aeb4b00f..befd64eea4 100644 --- a/lib/forklift-server/src/server/app/audit_logs.rs +++ b/lib/forklift-server/src/server/app/audit_logs.rs @@ -2,6 +2,7 @@ use std::{ future::{Future, IntoFuture as _}, io, sync::Arc, + time::Duration, }; use app_state::AppState; @@ -9,6 +10,7 @@ use audit_logs::{ database::{AuditDatabaseConfig, AuditDatabaseContext, AuditDatabaseContextError}, AuditLogsStream, AuditLogsStreamError, }; +use nats_dead_letter_queue::NatsDeadLetterQueueError; use naxum::{ extract::MatchedSubject, handler::Handler as _, @@ -46,6 +48,8 @@ pub enum AuditLogsAppSetupError { AuditDatabaseContext(#[from] AuditDatabaseContextError), #[error("audit logs stream error: {0}")] AuditLogsStream(#[from] AuditLogsStreamError), + #[error("failed to create dead letter stream: {0}")] + NatsDeadLetterQueue(#[from] NatsDeadLetterQueueError), } type Result = std::result::Result; @@ -65,6 +69,8 @@ pub(crate) async fn build_and_run( audit_database_config: &AuditDatabaseConfig, token: CancellationToken, ) -> 
Result> + Unpin + Send>> { + nats_dead_letter_queue::create_stream(&jetstream_context).await?; + let incoming = { let stream = AuditLogsStream::get_or_create(jetstream_context).await?; let consumer_subject = stream.consuming_subject_for_all_workspaces(); @@ -74,6 +80,12 @@ pub(crate) async fn build_and_run( .create_consumer(async_nats::jetstream::consumer::pull::Config { durable_name: Some(durable_consumer_name), filter_subject: consumer_subject.into_string(), + max_deliver: 4, + backoff: vec![ + Duration::from_secs(5), + Duration::from_secs(10), + Duration::from_secs(15), + ], ..Default::default() }) .await? diff --git a/lib/nats-dead-letter-queue/BUCK b/lib/nats-dead-letter-queue/BUCK new file mode 100644 index 0000000000..b38f57c5cb --- /dev/null +++ b/lib/nats-dead-letter-queue/BUCK @@ -0,0 +1,14 @@ +load("@prelude-si//:macros.bzl", "rust_library") + +rust_library( + name = "nats-dead-letter-queue", + deps = [ + "//lib/si-data-nats:si-data-nats", + "//lib/telemetry-rs:telemetry", + "//third-party/rust:remain", + "//third-party/rust:thiserror", + ], + srcs = glob([ + "src/**/*.rs", + ]), +) diff --git a/lib/nats-dead-letter-queue/Cargo.toml b/lib/nats-dead-letter-queue/Cargo.toml new file mode 100644 index 0000000000..a6c6e2b809 --- /dev/null +++ b/lib/nats-dead-letter-queue/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "nats-dead-letter-queue" +version.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true +edition.workspace = true +rust-version.workspace = true +publish.workspace = true + +[dependencies] +remain = { workspace = true } +si-data-nats = { path = "../../lib/si-data-nats" } +telemetry-nats = { path = "../../lib/telemetry-nats-rs" } +thiserror = { workspace = true } diff --git a/lib/nats-dead-letter-queue/src/lib.rs b/lib/nats-dead-letter-queue/src/lib.rs new file mode 100644 index 0000000000..2106aab72d --- /dev/null +++ b/lib/nats-dead-letter-queue/src/lib.rs @@ -0,0 +1,59 @@ +use si_data_nats::{ + 
async_nats::jetstream::{
+        context::CreateStreamError,
+        stream::{Config, RetentionPolicy},
+    },
+    jetstream::Context,
+};
+use thiserror::Error;
+
+const STREAM_NAME: &str = "DEAD_LETTER_QUEUES";
+const STREAM_DESCRIPTION: &str = "Dead Letter Queues";
+// Subscribe to *all* stream and consumer max deliveries events. This subject is of the form:
+// `$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.<STREAM>.<CONSUMER>`
+//
+// See: https://docs.nats.io/running-a-nats-service/nats_admin/monitoring/monitoring_jetstream
+const STREAM_SUBJECTS: &str = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.*.*";
+
+#[allow(missing_docs)]
+#[remain::sorted]
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("create stream error: {0}")]
+    CreateStream(#[from] CreateStreamError),
+}
+
+pub type NatsDeadLetterQueueError = Error;
+
+type Result<T> = std::result::Result<T, Error>;
+
+/// Ensures that the "dead letter queue" stream is created
+pub async fn create_stream(context: &Context) -> Result<()> {
+    let prefix = context.metadata().subject_prefix();
+
+    context
+        .get_or_create_stream(Config {
+            name: prefixed_stream_name(prefix, STREAM_NAME),
+            description: Some(STREAM_DESCRIPTION.to_string()),
+            retention: RetentionPolicy::Limits,
+            subjects: vec![prefixed_subject(prefix, STREAM_SUBJECTS)],
+            ..Default::default()
+        })
+        .await?;
+
+    Ok(())
+}
+
+fn prefixed_stream_name(prefix: Option<&str>, stream_name: &str) -> String {
+    match prefix {
+        Some(prefix) => format!("{prefix}_{stream_name}"),
+        None => stream_name.to_owned(),
+    }
+}
+
+fn prefixed_subject(prefix: Option<&str>, subject: &str) -> String {
+    match prefix {
+        Some(prefix) => format!("{prefix}.{subject}"),
+        None => subject.to_owned(),
+    }
+}