From ca7115794deb628a1c594ec36c2e22d2dba0a00b Mon Sep 17 00:00:00 2001 From: Nick Gerace Date: Thu, 21 Nov 2024 13:53:24 -0500 Subject: [PATCH] Incomplete attempt at reading from audit database Signed-off-by: Nick Gerace --- Cargo.lock | 2 + lib/audit-logs/src/database.rs | 240 ++++++++++++----- lib/audit-logs/src/database/config.rs | 1 + lib/dal-test/BUCK | 2 + lib/dal-test/Cargo.toml | 2 + lib/dal-test/src/helpers.rs | 31 +++ lib/dal-test/src/lib.rs | 26 +- lib/dal/src/audit_logging.rs | 249 ------------------ .../tests/integration_test/audit_logging.rs | 194 ++++++++------ .../src/server/app/audit_logs/handlers.rs | 10 +- lib/sdf-server/src/app.rs | 7 + lib/sdf-server/src/app_state.rs | 8 + lib/sdf-server/src/server.rs | 2 + lib/sdf-server/src/service/v2/audit_log.rs | 14 +- .../service/v2/audit_log/list_audit_logs.rs | 118 ++++++++- lib/si-events-rs/src/ulid.rs | 4 +- lib/si-frontend-types-rs/src/audit_log.rs | 13 - lib/si-test-macros/src/dal_test.rs | 10 + lib/si-test-macros/src/expand.rs | 14 + lib/si-test-macros/src/sdf_test.rs | 10 + 20 files changed, 523 insertions(+), 434 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a32449bc47..8c25263fce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1927,6 +1927,7 @@ name = "dal-test" version = "0.1.0" dependencies = [ "async-recursion", + "audit-logs", "base64 0.22.1", "buck2-resources", "color-eyre", @@ -1949,6 +1950,7 @@ dependencies = [ "si-crypto", "si-data-nats", "si-data-pg", + "si-events", "si-layer-cache", "si-pkg", "si-runtime", diff --git a/lib/audit-logs/src/database.rs b/lib/audit-logs/src/database.rs index 55daa1cd2b..cdade35ab9 100644 --- a/lib/audit-logs/src/database.rs +++ b/lib/audit-logs/src/database.rs @@ -1,13 +1,20 @@ //! Contains functionality for setting up and communicating with the audit database. +use std::str::FromStr; + use chrono::DateTime; use chrono::Utc; +use serde::Deserialize; +use serde::Serialize; use si_data_pg::PgError; use si_data_pg::PgPoolError; +use si_data_pg::PgRow; use si_events::audit_log::AuditLogKind; use si_events::audit_log::AuditLogMetadata; +use si_events::ulid; use si_events::Actor; use si_events::ChangeSetId; +use si_events::UserPk; use si_events::WorkspacePk; use telemetry::prelude::*; use thiserror::Error; @@ -33,77 +40,176 @@ pub enum AuditDatabaseError { PgPool(#[from] PgPoolError), #[error("serde json error: {0}")] SerdeJson(#[from] serde_json::Error), + #[error("ulid decode error: {0}")] + UlidDecode(#[from] ulid::DecodeError), } type Result = std::result::Result; -#[allow(clippy::too_many_arguments, missing_docs)] -#[instrument( - name = "audit_log.insert", - level = "debug", - skip_all, - fields( - si.workspace.id = %workspace_id, - ), -)] -pub async fn insert( - context: &AuditDatabaseContext, - workspace_id: WorkspacePk, - kind: AuditLogKind, - timestamp: String, - change_set_id: Option, - actor: Actor, - entity_name: Option, -) -> Result<()> { - let kind_as_string = kind.to_string(); - let user_id = match actor { - Actor::System => None, - Actor::User(user_id) => Some(user_id), - }; +/// A row in the audit logs table of the audit database. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct AuditLogRow { + /// Indicates the workspace that the row belongs to. + pub workspace_id: WorkspacePk, + /// The [kind](AuditLogKind) of the [`AuditLog`] (converted into a string because enum discriminants are not + /// serializable). + pub kind: String, + /// The timestamp that can be used in ISO RFC 3339 format. 
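+    /// (Likely stored as a `timestamptz` column; it is parsed from the RFC 3339 string
+    /// supplied at insert time, so it round-trips cleanly through `to_rfc3339()`.)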
+ pub timestamp: DateTime, + /// The title of the [`AuditLog`]. It will likely be combined with the `entity_type` to make a full display name. + pub title: String, + /// The identifier of the change set, which will only be empty for actions taken outside of the workspace. + pub change_set_id: Option, + /// The identifier of the user. If this is empty, it is the system user. + pub user_id: Option, + /// The entity name. + pub entity_name: Option, + /// The entity type. + pub entity_type: Option, + /// Serialized version of [`AuditLogMetadata`](si_events::audit_log::AuditLogMetadata), which is an + /// untagged version of the specific [`AuditLogKind`](si_events::audit_log::AuditLogKind). + pub metadata: Option, +} + +impl AuditLogRow { + /// Inserts a new row into the audit logs table of the audit database. + #[allow(clippy::too_many_arguments)] + #[instrument( + name = "audit_log.database.insert", + level = "debug", + skip_all, + fields( + si.workspace.id = %workspace_id, + ), + )] + pub async fn insert( + context: &AuditDatabaseContext, + workspace_id: WorkspacePk, + kind: AuditLogKind, + timestamp: String, + change_set_id: Option, + actor: Actor, + entity_name: Option, + ) -> Result<()> { + let kind_as_string = kind.to_string(); + let user_id = match actor { + Actor::System => None, + Actor::User(user_id) => Some(user_id), + }; + + let metadata = AuditLogMetadata::from(kind); + let (title, entity_type) = metadata.title_and_entity_type(); + let serialized_metadata = serde_json::to_value(metadata)?; + let timestamp: DateTime = timestamp.parse()?; + + context + .pg_pool() + .get() + .await? + .query_one( + "INSERT INTO audit_logs ( + workspace_id, + kind, + timestamp, + title, + change_set_id, + user_id, + entity_name, + entity_type, + metadata + ) VALUES ( + $1, + $2, + $3, + $4, + $5, + $6, + $7, + $8, + $9 + ) RETURNING *", + &[ + &workspace_id.to_string(), + &kind_as_string, + ×tamp, + &title, + &change_set_id.map(|id| id.to_string()), + &user_id.map(|id| id.to_string()), + &entity_name, + &entity_type, + &serialized_metadata, + ], + ) + .await?; + Ok(()) + } + + /// Lists rows of the audit logs table in the audit database. + #[instrument( + name = "audit_log.database.list", + level = "debug", + skip_all, + fields( + si.workspace.id = %workspace_id, + ), + )] + pub async fn list( + context: &AuditDatabaseContext, + workspace_id: WorkspacePk, + change_set_id: ChangeSetId, + size: usize, + ) -> Result<(Vec, bool)> { + let size = size as i64; + let mut result = Vec::new(); + let rows = context + .pg_pool() + .get() + .await? + .query( + "SELECT * from audit_logs WHERE workspace_id = $1 AND change_set_id = $2 ORDER BY timestamp DESC LIMIT $3", + &[&workspace_id, &change_set_id, &size], + ) + .await?; + for row in rows { + result.push(Self::try_from(row)?); + } + Ok((result, false)) + } +} + +impl TryFrom for AuditLogRow { + type Error = AuditDatabaseError; - let metadata = AuditLogMetadata::from(kind); - let (title, entity_type) = metadata.title_and_entity_type(); - let serialized_metadata = serde_json::to_value(metadata)?; - let timestamp: DateTime = timestamp.parse()?; + fn try_from(value: PgRow) -> std::result::Result { + let workspace_id = { + let inner: String = value.try_get("workspace_id")?; + WorkspacePk::from_str(&inner)? 
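+            // NOTE: ids are persisted as text columns, so they are parsed back into
+            // typed ULIDs here; a failed parse surfaces as UlidDecode.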
+ }; + let change_set_id = { + let maybe_inner: Option = value.try_get("change_set_id")?; + match maybe_inner { + Some(inner) => Some(ChangeSetId::from_str(&inner)?), + None => None, + } + }; + let user_id = { + let maybe_inner: Option = value.try_get("user_id")?; + match maybe_inner { + Some(inner) => Some(UserPk::from_str(&inner)?), + None => None, + } + }; - context - .pg_pool() - .get() - .await? - .query_one( - "INSERT INTO audit_logs ( - workspace_id, - kind, - timestamp, - title, - change_set_id, - user_id, - entity_name, - entity_type, - metadata - ) VALUES ( - $1, - $2, - $3, - $4, - $5, - $6, - $7, - $8, - $9 - ) RETURNING *", - &[ - &workspace_id, - &kind_as_string, - ×tamp, - &title, - &change_set_id.map(|id| id.to_string()), - &user_id.map(|id| id.to_string()), - &entity_name, - &entity_type, - &serialized_metadata, - ], - ) - .await?; - Ok(()) + Ok(Self { + workspace_id, + kind: value.try_get("kind")?, + timestamp: value.try_get("timestamp")?, + title: value.try_get("title")?, + change_set_id, + user_id, + entity_name: value.try_get("entity_name")?, + entity_type: value.try_get("entity_type")?, + metadata: value.try_get("metadata")?, + }) + } } diff --git a/lib/audit-logs/src/database/config.rs b/lib/audit-logs/src/database/config.rs index 8e3d63924c..38a04d15ea 100644 --- a/lib/audit-logs/src/database/config.rs +++ b/lib/audit-logs/src/database/config.rs @@ -3,6 +3,7 @@ use si_data_pg::PgPoolConfig; /// The name of the audit database. pub const DBNAME: &str = "si_audit"; +// pub const DBNAME: &str = "si_test_audit"; const APPLICATION_NAME: &str = "si-audit"; /// The configuration used for communicating with and setting up the audit database. diff --git a/lib/dal-test/BUCK b/lib/dal-test/BUCK index 0d6cb34a6a..40e4790c55 100644 --- a/lib/dal-test/BUCK +++ b/lib/dal-test/BUCK @@ -3,6 +3,7 @@ load("@prelude-si//:macros.bzl", "rust_library") rust_library( name = "dal-test", deps = [ + "//lib/audit-logs:audit-logs", "//lib/buck2-resources:buck2-resources", "//lib/dal:dal", "//lib/forklift-server:forklift-server", @@ -12,6 +13,7 @@ rust_library( "//lib/si-crypto:si-crypto", "//lib/si-data-nats:si-data-nats", "//lib/si-data-pg:si-data-pg", + "//lib/si-events-rs:si-events", "//lib/si-layer-cache:si-layer-cache", "//lib/si-pkg:si-pkg", "//lib/si-runtime-rs:si-runtime", diff --git a/lib/dal-test/Cargo.toml b/lib/dal-test/Cargo.toml index ec6aef0879..e2a655b145 100644 --- a/lib/dal-test/Cargo.toml +++ b/lib/dal-test/Cargo.toml @@ -10,6 +10,7 @@ publish.workspace = true [dependencies] async-recursion = { workspace = true } +audit-logs = { path = "../../lib/audit-logs" } base64 = { workspace = true } buck2-resources = { path = "../../lib/buck2-resources" } color-eyre = { workspace = true } @@ -31,6 +32,7 @@ serde_json = { workspace = true } si-crypto = { path = "../../lib/si-crypto" } si-data-nats = { path = "../../lib/si-data-nats" } si-data-pg = { path = "../../lib/si-data-pg" } +si-events = { path = "../../lib/si-events-rs" } si-layer-cache = { path = "../../lib/si-layer-cache" } si-pkg = { path = "../../lib/si-pkg" } si-runtime = { path = "../../lib/si-runtime-rs" } diff --git a/lib/dal-test/src/helpers.rs b/lib/dal-test/src/helpers.rs index afe939cc5f..baa2248de8 100644 --- a/lib/dal-test/src/helpers.rs +++ b/lib/dal-test/src/helpers.rs @@ -2,6 +2,7 @@ use std::time::Duration; +use audit_logs::database::{AuditDatabaseContext, AuditLogRow}; use color_eyre::eyre::eyre; use color_eyre::Result; use dal::component::socket::{ComponentInputSocket, ComponentOutputSocket}; @@ -409,3 +410,33 
@@ pub async fn confirm_jetstream_stream_has_no_messages( "hit timeout and stream still has at least one message: {message_count}" )) } + +/// Retries listing audit logs until the expected number of rows are returned. +pub async fn list_audit_logs_until_expected_number_of_rows( + context: &AuditDatabaseContext, + workspace_id: si_events::WorkspacePk, + change_set_id: si_events::ChangeSetId, + size: usize, + expected_number_of_rows: usize, + timeout_seconds: u64, + interval_milliseconds: u64, +) -> Result> { + let timeout = Duration::from_secs(timeout_seconds); + let interval = Duration::from_millis(interval_milliseconds); + + let start = Instant::now(); + let mut actual_number_of_rows = 0; + + while start.elapsed() < timeout { + let (audit_logs, _) = AuditLogRow::list(context, workspace_id, change_set_id, size).await?; + actual_number_of_rows = audit_logs.len(); + if actual_number_of_rows == expected_number_of_rows { + return Ok(audit_logs); + } + tokio::time::sleep(interval).await; + } + + Err(eyre!( + "hit timeout before audit logs query returns expected number of rows (expected: {expected_number_of_rows}, actual: {actual_number_of_rows})" + )) +} diff --git a/lib/dal-test/src/lib.rs b/lib/dal-test/src/lib.rs index 5023fff901..45641a1ccc 100644 --- a/lib/dal-test/src/lib.rs +++ b/lib/dal-test/src/lib.rs @@ -33,6 +33,7 @@ use std::{ sync::{Arc, Once}, }; +use audit_logs::database::{AuditDatabaseConfig, AuditDatabaseContext}; use buck2_resources::Buck2Resources; use dal::{ builtins::func, @@ -682,6 +683,7 @@ pub async fn forklift_server( let config: forklift_server::Config = { let mut config_file = forklift_server::ConfigFile::default(); config_file.nats = nats_config; + config_file.enable_audit_logs_app = true; config_file .try_into() .wrap_err("failed to build forklift server config")? @@ -762,15 +764,21 @@ async fn global_setup(test_context_builer: TestContextBuilder) -> Result<()> { .wrap_err("failed to drop and create layer db database")?; info!("running database migrations"); - dal::migrate(services_ctx.pg_pool()) - .await - .wrap_err("failed to migrate database")?; - - services_ctx - .layer_db() - .pg_migrate() - .await - .wrap_err("failed to migrate layerdb")?; + { + dal::migrate(services_ctx.pg_pool()) + .await + .wrap_err("failed to migrate database")?; + services_ctx + .layer_db() + .pg_migrate() + .await + .wrap_err("failed to migrate layerdb")?; + // FIXME(nick): move the config to the right place AND let it be tunable for tests. 
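+        // A sketch of what "tunable" could look like (hypothetical field names; the
+        // test ConfigFile does not expose any of this yet):
+        //   let mut config = AuditDatabaseConfig::default();
+        //   config.pg.dbname = "si_test_audit".to_string();
+        //   let context = AuditDatabaseContext::from_config(&config).await?;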
+ let context = AuditDatabaseContext::from_config(&AuditDatabaseConfig::default()).await?; + audit_logs::database::migrate(&context) + .await + .wrap_err("failed to migrate audit database")?; + } // Startup up a Forklift server exclusively for migrations info!("starting Forklift server for initial migrations"); diff --git a/lib/dal/src/audit_logging.rs b/lib/dal/src/audit_logging.rs index 122a66ed4a..8d533b4783 100644 --- a/lib/dal/src/audit_logging.rs +++ b/lib/dal/src/audit_logging.rs @@ -39,34 +39,14 @@ use crate::WsPayload; #[remain::sorted] #[derive(Debug, Error)] pub enum AuditLoggingError { - #[error("async nats batch error: {0}")] - AsyncNatsBatch(#[from] async_nats::error::Error), - #[error("async nats consumer error: {0}")] - AsyncNatsConsumer(#[from] async_nats::error::Error), - #[error("async nats request error: {0}")] - AsyncNatsRequest(#[from] async_nats::error::Error), #[error("audit logs stream error: {0}")] AuditLogsStream(#[from] AuditLogsStreamError), - #[error("cannot return list of unbounded size: both page ({0}) and page size ({1})")] - CannotReturnListOfUnboundedSize(usize, usize), - #[error("change set error: {0}")] - ChangeSet(#[from] Box), - #[error("change set not found by id: {0}")] - ChangeSetNotFound(si_events::ChangeSetId), - #[error("message error: {0}")] - Message(#[source] Box), #[error("pending events error: {0}")] PendingEventsError(#[from] PendingEventsError), - #[error("serde json error: {0}")] - SerdeJson(#[from] serde_json::Error), #[error("shuttle error: {0}")] Shuttle(#[from] ShuttleError), #[error("transactions error: {0}")] Transactions(#[from] Box), - #[error("user error: {0}")] - User(#[from] Box), - #[error("user not found for id: {0}")] - UserNotFound(si_events::UserPk), } type Result = std::result::Result; @@ -273,235 +253,6 @@ pub(crate) async fn write_final_message(ctx: &DalContext) -> Result<()> { Ok(()) } -#[instrument(name = "audit_logging.list", level = "info", skip_all, fields(size))] -pub async fn list(ctx: &DalContext, size: usize) -> Result<(Vec, bool)> { - let start = tokio::time::Instant::now(); - let workspace_id = ctx.workspace_pk().map_err(Box::new)?; - - let change_set_id = ctx.change_set_id(); - let head_change_set_id = ctx - .get_workspace_default_change_set_id() - .await - .map_err(Box::new)?; - let working_on_head = head_change_set_id == change_set_id; - - let stream_wrapper = AuditLogsStream::get_or_create(ctx.jetstream_context()).await?; - let filter_subject = if working_on_head { - stream_wrapper.consuming_subject_for_workspace(workspace_id.into()) - } else { - stream_wrapper.subject_for_change_set(workspace_id.into(), change_set_id.into()) - }; - - let stream = stream_wrapper.stream().await?; - let last_sequence = stream.get_info().await?.state.last_sequence; - let start_sequence = match last_sequence.checked_sub((size as u64) + 1) { - Some(0) | None => 1, - Some(difference) => difference, - }; - - info!(%last_sequence, %start_sequence, ?filter_subject, "creating ephemeral pull consumer for listing audit logs"); - - let consumer = stream - .create_consumer(async_nats::jetstream::consumer::pull::Config { - filter_subject: filter_subject.to_string(), - deliver_policy: async_nats::jetstream::consumer::DeliverPolicy::ByStartSequence { - start_sequence, - }, - ..Default::default() - }) - .await?; - - let mut assembler = FrontendAuditLogAssembler::new(ctx).await?; - let mut frontend_audit_logs = Vec::new(); - - let mut counter = 0; - let mut can_load_more_logs = false; - let mut messages = consumer.fetch().max_messages(size 
+ 1).messages().await?; - - while let Some(message) = messages.next().await { - counter += 1; - - // These are the two conditions in which we can load more logs and exit early. - if size == 0 || counter == size + 1 { - can_load_more_logs = true; - break; - } - - let message = message.map_err(AuditLoggingError::Message)?; - let audit_log: AuditLog = serde_json::from_slice(&message.payload)?; - if let Some(frontend_audit_log) = assembler.assemble(ctx, audit_log).await? { - frontend_audit_logs.push(frontend_audit_log); - } - } - - // We must sort the logs on the way out because we cannot guarantee perfect ordering by the - // time all messages are published to the stream. - frontend_audit_logs.sort_by(|a, b| b.timestamp.cmp(&a.timestamp)); - - info!(elapsed = ?start.elapsed(), "listing audit logs complete"); - - Ok((frontend_audit_logs, can_load_more_logs)) -} - -#[derive(Debug)] -struct FrontendAuditLogAssembler { - change_set_cache: HashMap, - user_cache: HashMap, - change_set_id: si_events::ChangeSetId, - working_on_head: bool, -} - -impl FrontendAuditLogAssembler { - pub async fn new(ctx: &DalContext) -> Result { - let head_change_set_id = ctx - .get_workspace_default_change_set_id() - .await - .map_err(Box::new)?; - let change_set_id = ctx.change_set_id(); - Ok(Self { - change_set_cache: HashMap::new(), - user_cache: HashMap::new(), - change_set_id: change_set_id.into(), - working_on_head: head_change_set_id == change_set_id, - }) - } - - pub async fn assemble( - &mut self, - ctx: &DalContext, - audit_log: AuditLog, - ) -> Result> { - match audit_log { - AuditLog::V1(inner) => { - let change_set_metadata = self - .find_change_set_metadata(ctx, inner.change_set_id) - .await?; - - // Before we continue, we need to know if we need to filter out the audit log based - // on if we are working on HEAD. - // - // If we are working on HEAD, we show all audit logs without a change set, all - // audit logs on HEAD, and all audit logs for abandoned or applied change sets. - // - // If we are not working on HEAD, we only show audit logs for our own change set. 
- if self.working_on_head { - if let Some((change_set_id, _, change_set_status)) = change_set_metadata { - if change_set_id != self.change_set_id { - match change_set_status { - ChangeSetStatus::Abandoned | ChangeSetStatus::Applied => {} - ChangeSetStatus::Approved - | ChangeSetStatus::Failed - | ChangeSetStatus::NeedsAbandonApproval - | ChangeSetStatus::NeedsApproval - | ChangeSetStatus::Open - | ChangeSetStatus::Rejected => { - return Ok(None); - } - } - } - } - } else { - match change_set_metadata { - Some((change_set_id, _, _)) => { - if change_set_id != self.change_set_id { - return Ok(None); - } - } - None => { - return Ok(None); - } - } - } - - let (user_id, user_email, user_name) = - self.find_user_metadata(ctx, inner.actor).await?; - - let kind = inner.kind.to_string(); - let metadata = AuditLogMetadata::from(inner.kind); - let (title, entity_type) = metadata.title_and_entity_type(); - let entity_type = entity_type.unwrap_or(" "); - let (change_set_id, change_set_name) = match change_set_metadata { - Some((change_set_id, change_set_name, _)) => { - (Some(change_set_id), Some(change_set_name)) - } - None => (None, None), - }; - - Ok(Some(FrontendAuditLog { - title: title.to_owned(), - user_id, - user_email, - user_name, - kind, - entity_name: inner.entity_name, - entity_type: entity_type.to_owned(), - timestamp: inner.timestamp, - change_set_id, - change_set_name, - metadata: serde_json::to_value(metadata)?, - })) - } - } - } - - async fn find_change_set_metadata( - &mut self, - ctx: &DalContext, - change_set_id: Option, - ) -> Result> { - match change_set_id { - Some(change_set_id) => { - let (change_set_status, change_set_name) = - if let Some(change_set) = self.change_set_cache.get(&change_set_id) { - (change_set.status, change_set.name.to_owned()) - } else { - let change_set = ChangeSet::find(ctx, change_set_id.into()) - .await - .map_err(Box::new)? - .ok_or(AuditLoggingError::ChangeSetNotFound(change_set_id))?; - let found_data = (change_set.status, change_set.name.to_owned()); - self.change_set_cache.insert(change_set_id, change_set); - found_data - }; - - Ok(Some((change_set_id, change_set_name, change_set_status))) - } - None => Ok(None), - } - } - - async fn find_user_metadata( - &mut self, - ctx: &DalContext, - actor: Actor, - ) -> Result<(Option, Option, Option)> { - match actor { - Actor::System => Ok((None, None, None)), - Actor::User(user_id) => { - if let Some(user) = self.user_cache.get(&user_id) { - Ok(( - Some(user_id), - Some(user.email().to_owned()), - Some(user.name().to_owned()), - )) - } else { - let user = User::get_by_pk(ctx, user_id.into()) - .await - .map_err(Box::new)? 
- .ok_or(AuditLoggingError::UserNotFound(user_id))?; - let found_data = ( - Some(user_id), - Some(user.email().to_owned()), - Some(user.name().to_owned()), - ); - self.user_cache.insert(user_id, user); - Ok(found_data) - } - } - } - } -} - #[derive(Clone, Deserialize, Serialize, Debug, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct AuditLogsPublishedPayload { diff --git a/lib/dal/tests/integration_test/audit_logging.rs b/lib/dal/tests/integration_test/audit_logging.rs index c6df2a5452..5684c67cc4 100644 --- a/lib/dal/tests/integration_test/audit_logging.rs +++ b/lib/dal/tests/integration_test/audit_logging.rs @@ -1,19 +1,34 @@ +use audit_logs::database::AuditDatabaseContext; use audit_logs::AuditLogsStream; -use dal::{audit_logging, prop::PropPath, AttributeValue, DalContext, Prop, Schema, SchemaVariant}; +use dal::{prop::PropPath, AttributeValue, DalContext, Prop, Schema, SchemaVariant}; use dal_test::helpers::{ confirm_jetstream_stream_has_no_messages, create_named_component_for_schema_variant_on_default_view, + list_audit_logs_until_expected_number_of_rows, }; use dal_test::{helpers::ChangeSetTestHelpers, test}; use pending_events::PendingEventsStream; use pretty_assertions_sorted::assert_eq; use si_events::audit_log::AuditLogKind; -const TIMEOUT_SECONDS: u64 = 5; -const INTERVAL_MILLISECONDS: u64 = 100; +const DATABASE_RETRY_TIMEOUT_SECONDS: u64 = 2; +const DATABASE_RETRY_INTERVAL_MILLISECONDS: u64 = 100; + +const STREAM_RETRY_TIMEOUT_SECONDS: u64 = 5; +const STREAM_RETRY_INTERVAL_MILLISECONDS: u64 = 100; + +const SIZE: usize = 200; #[test] -async fn round_trip(ctx: &mut DalContext) { +async fn round_trip(ctx: &mut DalContext, audit_database_context: AuditDatabaseContext) { + let context = audit_database_context; + let workspace_id: si_events::WorkspacePk = ctx + .workspace_pk() + .expect("could not get workspace id") + .into(); + let change_set_id: si_events::ChangeSetId = ctx.change_set_id().into(); + + // Collect schema information. let schema = Schema::find_by_name(ctx, "swifty") .await .expect("could not perform find by name") @@ -70,35 +85,38 @@ async fn round_trip(ctx: &mut DalContext) { (source_stream, destination_stream) }; - // Check that the streams look as we expect. - confirm_jetstream_stream_has_no_messages( - &source_stream, - TIMEOUT_SECONDS, - INTERVAL_MILLISECONDS, - ) - .await - .expect("stream message count is greater than zero"); - let first_destination_stream_message_count = destination_stream - .get_info() - .await - .expect("could not get destination stream info") - .state - .messages; - assert!(first_destination_stream_message_count > 0); - - // List all audit logs twice to ensure we don't consume/ack them. After that, check that they - // look as we expect. - let (first_run_audit_logs, _) = audit_logging::list(ctx, 200) - .await - .expect("could not list audit logs"); - let (second_run_audit_logs, _) = audit_logging::list(ctx, 200) + // Check that everything looks as we expect. 
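+    // (Rows land in the audit database asynchronously, via NATS and the forklift
+    // server, so the database assertion below retries rather than reading once.)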
+ { + let expected_total = 5; + confirm_jetstream_stream_has_no_messages( + &source_stream, + STREAM_RETRY_TIMEOUT_SECONDS, + STREAM_RETRY_INTERVAL_MILLISECONDS, + ) + .await + .expect("stream message count is greater than zero"); + let destination_stream_message_count = destination_stream + .get_info() + .await + .expect("could not get destination stream info") + .state + .messages; + assert_eq!( + expected_total, // expected + destination_stream_message_count // actual + ); + list_audit_logs_until_expected_number_of_rows( + &context, + workspace_id, + change_set_id, + SIZE, + expected_total as usize, + DATABASE_RETRY_TIMEOUT_SECONDS, + DATABASE_RETRY_INTERVAL_MILLISECONDS, + ) .await .expect("could not list audit logs"); - assert_eq!(first_run_audit_logs, second_run_audit_logs); - assert_eq!( - first_destination_stream_message_count as usize, // expected - first_run_audit_logs.len() // actual - ); + } // Update a property editor value and commit. Mimic sdf by audit logging here. let prop_path_raw = ["root", "domain", "name"]; @@ -143,35 +161,38 @@ async fn round_trip(ctx: &mut DalContext) { .await .expect("could not commit and update snapshot to visibility"); - // Check that the streams look as we expect. - confirm_jetstream_stream_has_no_messages( - &source_stream, - TIMEOUT_SECONDS, - INTERVAL_MILLISECONDS, - ) - .await - .expect("stream message count is greater than zero"); - let second_destination_stream_message_count = destination_stream - .get_info() - .await - .expect("could not get destination stream info") - .state - .messages; - assert!(second_destination_stream_message_count > first_destination_stream_message_count); - - // List all audit logs twice to ensure we don't consume/ack them. After that, check that they - // look as we expect. - let (first_run_audit_logs, _) = audit_logging::list(ctx, 200) - .await - .expect("could not list audit logs"); - let (second_run_audit_logs, _) = audit_logging::list(ctx, 200) + // Check that everything looks as we expect. + { + let expected_total = 9; + confirm_jetstream_stream_has_no_messages( + &source_stream, + STREAM_RETRY_TIMEOUT_SECONDS, + STREAM_RETRY_INTERVAL_MILLISECONDS, + ) + .await + .expect("stream message count is greater than zero"); + let destination_stream_message_count = destination_stream + .get_info() + .await + .expect("could not get destination stream info") + .state + .messages; + assert_eq!( + expected_total, // expected + destination_stream_message_count // actual + ); + list_audit_logs_until_expected_number_of_rows( + &context, + workspace_id, + change_set_id, + SIZE, + expected_total as usize, + DATABASE_RETRY_TIMEOUT_SECONDS, + DATABASE_RETRY_INTERVAL_MILLISECONDS, + ) .await .expect("could not list audit logs"); - assert_eq!(first_run_audit_logs, second_run_audit_logs); - assert_eq!( - second_destination_stream_message_count as usize, // expected - first_run_audit_logs.len() // actual - ); + } // Delete a component and commit. Mimic sdf by audit logging here. ctx.write_audit_log( @@ -194,33 +215,36 @@ async fn round_trip(ctx: &mut DalContext) { .await .expect("could not commit and update snapshot to visibility"); - // Check that the streams look as we expect. 
- confirm_jetstream_stream_has_no_messages( - &source_stream, - TIMEOUT_SECONDS, - INTERVAL_MILLISECONDS, - ) - .await - .expect("stream message count is greater than zero"); - let third_destination_stream_message_count = destination_stream - .get_info() - .await - .expect("could not get destination stream info") - .state - .messages; - assert!(third_destination_stream_message_count > second_destination_stream_message_count); - - // List all audit logs twice to ensure we don't consume/ack them. After that, check that they - // look as we expect. - let (first_run_audit_logs, _) = audit_logging::list(ctx, 200) - .await - .expect("could not list audit logs"); - let (second_run_audit_logs, _) = audit_logging::list(ctx, 200) + // Check that everything looks as we expect. + { + let expected_total = 10; + confirm_jetstream_stream_has_no_messages( + &source_stream, + STREAM_RETRY_TIMEOUT_SECONDS, + STREAM_RETRY_INTERVAL_MILLISECONDS, + ) + .await + .expect("stream message count is greater than zero"); + let destination_stream_message_count = destination_stream + .get_info() + .await + .expect("could not get destination stream info") + .state + .messages; + assert_eq!( + expected_total, // expected + destination_stream_message_count // actual + ); + list_audit_logs_until_expected_number_of_rows( + &context, + workspace_id, + change_set_id, + SIZE, + expected_total as usize, + DATABASE_RETRY_TIMEOUT_SECONDS, + DATABASE_RETRY_INTERVAL_MILLISECONDS, + ) .await .expect("could not list audit logs"); - assert_eq!(first_run_audit_logs, second_run_audit_logs); - assert_eq!( - third_destination_stream_message_count as usize, // expected - first_run_audit_logs.len() // actual - ); + } } diff --git a/lib/forklift-server/src/server/app/audit_logs/handlers.rs b/lib/forklift-server/src/server/app/audit_logs/handlers.rs index 5567196571..1cf705ed99 100644 --- a/lib/forklift-server/src/server/app/audit_logs/handlers.rs +++ b/lib/forklift-server/src/server/app/audit_logs/handlers.rs @@ -1,13 +1,13 @@ use std::str::FromStr; -use audit_logs::database::AuditDatabaseError; +use audit_logs::database::{AuditDatabaseError, AuditLogRow}; use naxum::{ extract::State, response::{IntoResponse, Response}, Json, }; use si_data_nats::Subject; -use si_events::{audit_log::AuditLog, WorkspacePk}; +use si_events::WorkspacePk; use telemetry::prelude::*; use thiserror::Error; @@ -38,15 +38,15 @@ impl IntoResponse for HandlerError { pub(crate) async fn default( State(state): State, subject: Subject, - Json(audit_log): Json, + Json(audit_log): Json, ) -> Result<()> { // Hitting an error when finding the workspace id should be impossible as we match the subject using middleware // before we get here. 
let workspace_id = find_workspace_id(subject, state.using_prefix())?; match audit_log { - AuditLog::V1(inner) => { - audit_logs::database::insert( + si_events::audit_log::AuditLog::V1(inner) => { + AuditLogRow::insert( state.context(), workspace_id, inner.kind, diff --git a/lib/sdf-server/src/app.rs b/lib/sdf-server/src/app.rs index 7469812a89..2fd7073ceb 100644 --- a/lib/sdf-server/src/app.rs +++ b/lib/sdf-server/src/app.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use asset_sprayer::AssetSprayer; +use audit_logs::database::AuditDatabaseContext; use axum::Router; use dal::{JwtPublicSigningKey, ServicesContext}; use nats_multiplexer_client::MultiplexerClient; @@ -35,6 +36,7 @@ impl AxumApp { application_runtime_mode: Arc>, shutdown_token: CancellationToken, spicedb_client: Option, + audit_database_context: AuditDatabaseContext, ) -> Self { Self::inner_from_services( services_context, @@ -50,6 +52,7 @@ impl AxumApp { application_runtime_mode, shutdown_token, spicedb_client, + audit_database_context, ) } @@ -73,6 +76,7 @@ impl AxumApp { application_runtime_mode: Arc>, shutdown_token: CancellationToken, spicedb_client: SpiceDbClient, + audit_database_context: AuditDatabaseContext, ) -> Self { Self::inner_from_services( services_context, @@ -88,6 +92,7 @@ impl AxumApp { application_runtime_mode, shutdown_token, Some(spicedb_client), + audit_database_context, ) } @@ -110,6 +115,7 @@ impl AxumApp { application_runtime_mode: Arc>, shutdown_token: CancellationToken, spicedb_client: Option, + audit_database_context: AuditDatabaseContext, ) -> Self { let state = AppState::new( services_context, @@ -125,6 +131,7 @@ impl AxumApp { application_runtime_mode, shutdown_token, spicedb_client, + audit_database_context, ); let path_filter = Box::new(|path: &str| match path { diff --git a/lib/sdf-server/src/app_state.rs b/lib/sdf-server/src/app_state.rs index f900d5b44b..9e4f194ecf 100644 --- a/lib/sdf-server/src/app_state.rs +++ b/lib/sdf-server/src/app_state.rs @@ -1,6 +1,7 @@ use std::{ops::Deref, sync::Arc}; use asset_sprayer::AssetSprayer; +use audit_logs::database::AuditDatabaseContext; use axum::extract::FromRef; use dal::JwtPublicSigningKey; use nats_multiplexer_client::MultiplexerClient; @@ -36,6 +37,7 @@ pub struct AppState { pub application_runtime_mode: Arc>, shutdown_token: CancellationToken, spicedb_client: Option, + audit_database_context: AuditDatabaseContext, } impl AppState { @@ -54,6 +56,7 @@ impl AppState { application_runtime_mode: Arc>, shutdown_token: CancellationToken, spicedb_client: Option, + audit_database_context: AuditDatabaseContext, ) -> Self { let nats_multiplexer_clients = NatsMultiplexerClients { ws: Arc::new(Mutex::new(ws_multiplexer_client)), @@ -74,6 +77,7 @@ impl AppState { application_runtime_mode, shutdown_token, spicedb_client, + audit_database_context, } } @@ -120,6 +124,10 @@ impl AppState { pub fn spicedb_client_clone(&self) -> Option { self.spicedb_client.clone() } + + pub fn audit_database_context(&self) -> &AuditDatabaseContext { + &self.audit_database_context + } } #[derive(Clone, Debug, FromRef)] diff --git a/lib/sdf-server/src/server.rs b/lib/sdf-server/src/server.rs index a27cd610a3..e0ea6edd02 100644 --- a/lib/sdf-server/src/server.rs +++ b/lib/sdf-server/src/server.rs @@ -181,6 +181,8 @@ impl Server { application_runtime_mode, token.clone(), spicedb_client, + // TODO(nick): split the migrator context and the reader-only context (should be read-only pg pool). 
+ audit_database_context.clone(), ) .into_inner(); diff --git a/lib/sdf-server/src/service/v2/audit_log.rs b/lib/sdf-server/src/service/v2/audit_log.rs index 3d4f661723..45747454ab 100644 --- a/lib/sdf-server/src/service/v2/audit_log.rs +++ b/lib/sdf-server/src/service/v2/audit_log.rs @@ -1,8 +1,10 @@ +use audit_logs::database::AuditDatabaseError; use axum::{ response::{IntoResponse, Response}, routing::get, Router, }; +use si_events::{ChangeSetId, UserPk}; use thiserror::Error; use crate::{service::ApiError, AppState}; @@ -12,10 +14,18 @@ mod list_audit_logs; #[remain::sorted] #[derive(Debug, Error)] pub enum AuditLogError { - #[error("dal audit logging error: {0}")] - DalAuditLogging(#[from] dal::audit_logging::AuditLoggingError), + #[error("audit database error: {0}")] + AuditDatabase(#[from] AuditDatabaseError), + #[error("change set not found for id: {0}")] + ChangeSetNotFound(ChangeSetId), + #[error("dal change set error: {0}")] + DalChangeSet(#[from] dal::ChangeSetError), #[error("dal transactions error: {0}")] DalTransactions(#[from] dal::TransactionsError), + #[error("dal user error: {0}")] + DalUser(#[from] dal::UserError), + #[error("user not found for id: {0}")] + UserNotFound(UserPk), } pub type AuditLogResult = Result; diff --git a/lib/sdf-server/src/service/v2/audit_log/list_audit_logs.rs b/lib/sdf-server/src/service/v2/audit_log/list_audit_logs.rs index e19c99b925..708754baaa 100644 --- a/lib/sdf-server/src/service/v2/audit_log/list_audit_logs.rs +++ b/lib/sdf-server/src/service/v2/audit_log/list_audit_logs.rs @@ -1,12 +1,16 @@ +use std::collections::HashMap; + +use audit_logs::database::AuditLogRow; use axum::{ extract::{Path, Query}, Json, }; -use dal::audit_logging; +use dal::{ChangeSet, DalContext, User}; use serde::{Deserialize, Serialize}; +use si_events::{ChangeSetId, UserPk}; use si_frontend_types as frontend_types; -use super::AuditLogResult; +use super::{AuditLogError, AuditLogResult}; use crate::extract::{AccessBuilder, HandlerContext}; #[derive(Deserialize, Debug)] @@ -32,10 +36,118 @@ pub async fn list_audit_logs( .build(access_builder.build(change_set_id.into())) .await?; - let (logs, can_load_more) = audit_logging::list(&ctx, request.size.unwrap_or(0)).await?; + let (database_logs, can_load_more) = + AuditLogRow::list(todo!(), todo!(), todo!(), request.size.unwrap_or(0)).await?; + + let mut assembler = Assembler::new(); + let mut logs = Vec::with_capacity(database_logs.len()); + for database_log in database_logs { + logs.push(assembler.assemble(&ctx, database_log).await?); + } Ok(Json(ListAuditLogsResponse { logs, can_load_more, })) } + +#[derive(Debug)] +struct Assembler { + change_set_cache: HashMap, + user_cache: HashMap, +} + +impl Assembler { + pub fn new() -> Self { + Self { + change_set_cache: HashMap::new(), + user_cache: HashMap::new(), + } + } + + pub async fn assemble( + &mut self, + ctx: &DalContext, + audit_log: AuditLogRow, + ) -> AuditLogResult { + let (change_set_id, change_set_name) = self + .find_change_set_metadata(ctx, audit_log.change_set_id) + .await?; + let (user_id, user_email, user_name) = + self.find_user_metadata(ctx, audit_log.user_id).await?; + + Ok(si_frontend_types::AuditLog { + title: audit_log.title, + user_id, + user_email, + user_name, + kind: audit_log.kind, + // TODO(nick): allow this to be optional in the frontend. + entity_name: audit_log.entity_name.unwrap_or(" ".to_string()), + // NOTE(nick): this maintains compatibility from when these used to have whitespace-based names. 
+ // However, we should make this optional in the frontend. + entity_type: audit_log.entity_type.unwrap_or(" ".to_string()), + // NOTE(nick): this is specifically converted to ISO RFC 3339 for the frontend. + timestamp: audit_log.timestamp.to_rfc3339(), + change_set_id, + change_set_name, + // TODO(nick): allow this to be optional in the frontend. + metadata: audit_log.metadata.unwrap_or(serde_json::Value::Null), + }) + } + + async fn find_change_set_metadata( + &mut self, + ctx: &DalContext, + change_set_id: Option, + ) -> AuditLogResult<(Option, Option)> { + match change_set_id { + Some(change_set_id) => { + let change_set_name = + if let Some(change_set) = self.change_set_cache.get(&change_set_id) { + change_set.name.to_owned() + } else { + let change_set = ChangeSet::find(ctx, change_set_id.into()) + .await? + .ok_or(AuditLogError::ChangeSetNotFound(change_set_id))?; + let found_data = change_set.name.to_owned(); + self.change_set_cache.insert(change_set_id, change_set); + found_data + }; + + Ok((Some(change_set_id), Some(change_set_name))) + } + None => Ok((None, None)), + } + } + + async fn find_user_metadata( + &mut self, + ctx: &DalContext, + user_id: Option, + ) -> AuditLogResult<(Option, Option, Option)> { + match user_id { + None => Ok((None, None, None)), + Some(user_id) => { + if let Some(user) = self.user_cache.get(&user_id) { + Ok(( + Some(user_id), + Some(user.email().to_owned()), + Some(user.name().to_owned()), + )) + } else { + let user = User::get_by_pk(ctx, user_id.into()) + .await? + .ok_or(AuditLogError::UserNotFound(user_id))?; + let found_data = ( + Some(user_id), + Some(user.email().to_owned()), + Some(user.name().to_owned()), + ); + self.user_cache.insert(user_id, user); + Ok(found_data) + } + } + } + } +} diff --git a/lib/si-events-rs/src/ulid.rs b/lib/si-events-rs/src/ulid.rs index 4f29525fec..2fc79f5184 100644 --- a/lib/si-events-rs/src/ulid.rs +++ b/lib/si-events-rs/src/ulid.rs @@ -1,5 +1,7 @@ +pub use ulid::DecodeError; pub use ulid::ULID_LEN; -use ulid::{DecodeError, Ulid as CoreUlid}; + +use ulid::Ulid as CoreUlid; /// Size is the size in bytes, len is the string length const ULID_SIZE: usize = 16; diff --git a/lib/si-frontend-types-rs/src/audit_log.rs b/lib/si-frontend-types-rs/src/audit_log.rs index 6578c32757..15d3199ec8 100644 --- a/lib/si-frontend-types-rs/src/audit_log.rs +++ b/lib/si-frontend-types-rs/src/audit_log.rs @@ -4,28 +4,15 @@ use si_events::{ChangeSetId, UserPk}; #[derive(Debug, Serialize, Clone, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct AuditLog { - /// The title of the [`AuditLog`]. It will likely be combined with the `entity_type` to make a full display name. pub title: String, - /// The identifier of the user. If this is empty, it is the system user. pub user_id: Option, - /// The email of the user. pub user_email: Option, - /// The name of the user. pub user_name: Option, - /// The [kind](AuditLogKing) of the [`AuditLog`] (converted into a string because enum discriminants are not - /// serializable). pub kind: String, - /// The entity type. pub entity_type: String, - /// The entity name. pub entity_name: String, - /// The timestamp in ISO RFC 3339 format (converted into a string). pub timestamp: String, - /// The identifier of the change set, which will only be empty for actions taken outside of the workspace. pub change_set_id: Option, - /// The name of the change set. 
pub change_set_name: Option, - /// Serialized version of [`AuditLogMetadata`](si_events::audit_log::AuditLogMetadata), which is an - /// untagged version of the specific [`AuditLogKind`](si_events::audit_log::AuditLogKind). pub metadata: serde_json::Value, } diff --git a/lib/si-test-macros/src/dal_test.rs b/lib/si-test-macros/src/dal_test.rs index ecc5e3cecf..adc797b34c 100644 --- a/lib/si-test-macros/src/dal_test.rs +++ b/lib/si-test-macros/src/dal_test.rs @@ -94,6 +94,11 @@ fn fn_setup<'a>(params: impl Iterator) -> DalTestFnSetup { let var = var.0.as_ref(); expander.push_arg(parse_quote! {#var}); } + "AuditDatabaseContext" => { + let var = expander.setup_audit_database_context(); + let var = var.as_ref(); + expander.push_arg(parse_quote! {#var}); + } _ => panic!("unexpected argument type: {type_path:?}"), }; } @@ -145,6 +150,11 @@ fn fn_setup<'a>(params: impl Iterator) -> DalTestFnSetup { let var = var.0.as_ref(); expander.push_arg(parse_quote! {&#var}); } + "AuditDatabaseContext" => { + let var = expander.setup_audit_database_context(); + let var = var.as_ref(); + expander.push_arg(parse_quote! {#var}); + } _ => panic!("unexpected argument reference type: {type_ref:?}"), } } diff --git a/lib/si-test-macros/src/expand.rs b/lib/si-test-macros/src/expand.rs index 4c7b9a61d3..96dff1d730 100644 --- a/lib/si-test-macros/src/expand.rs +++ b/lib/si-test-macros/src/expand.rs @@ -677,4 +677,18 @@ pub(crate) trait FnSetupExpander { self.dal_context_head_mut_ref().unwrap().clone() } + + fn setup_audit_database_context(&mut self) -> Rc { + let var_audit_database_context = Ident::new("audit_database_context", Span::call_site()); + self.code_extend(quote! { + let #var_audit_database_context = { + let config = ::audit_logs::database::AuditDatabaseConfig::default(); + let r = ::audit_logs::database::AuditDatabaseContext::from_config(&config) + .await + .wrap_err("failed to create audit database context")?; + r + }; + }); + Rc::new(var_audit_database_context) + } } diff --git a/lib/si-test-macros/src/sdf_test.rs b/lib/si-test-macros/src/sdf_test.rs index 86169f7e86..4322129416 100644 --- a/lib/si-test-macros/src/sdf_test.rs +++ b/lib/si-test-macros/src/sdf_test.rs @@ -105,6 +105,11 @@ fn fn_setup<'a>(params: impl Iterator) -> SdfTestFnSetup { let var = var.0.as_ref(); expander.push_arg(parse_quote! {#var}); } + "AuditDatabaseContext" => { + let var = expander.setup_audit_database_context(); + let var = var.as_ref(); + expander.push_arg(parse_quote! {#var}); + } _ => panic!("unexpected argument type: {type_path:?}"), }; } @@ -156,6 +161,11 @@ fn fn_setup<'a>(params: impl Iterator) -> SdfTestFnSetup { let var = var.0.as_ref(); expander.push_arg(parse_quote! {&#var}); } + "AuditDatabaseContext" => { + let var = expander.setup_audit_database_context(); + let var = var.as_ref(); + expander.push_arg(parse_quote! {#var}); + } _ => panic!("unexpected argument reference type: {type_ref:?}"), } }
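
Note on the remaining `todo!()` placeholders in `list_audit_logs`: once the read-only
context is threaded from `AppState` into the v2 handlers, the call could plausibly be
wired as below. This is a sketch under assumptions: the `state` accessor and the source
of `workspace_pk` are illustrative, not the final API; only `audit_database_context()`
and the `AuditLogRow::list` signature come from this patch.

    // Hypothetical wiring (assumes an extractor exposing AppState and a workspace id
    // taken from the request path alongside change_set_id):
    let (database_logs, can_load_more) = AuditLogRow::list(
        state.audit_database_context(),
        workspace_pk.into(),       // si_events::WorkspacePk
        change_set_id.into(),      // si_events::ChangeSetId
        request.size.unwrap_or(0),
    )
    .await?;

Note also that `AuditLogRow::list` currently hardcodes `can_load_more` to `false`; a
follow-up could derive it by querying `size + 1` rows and dropping the extra, the same
trick the removed NATS-based `audit_logging::list` used with `max_messages(size + 1)`.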