Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incomplete attempt at reading from audit database #5038

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

240 changes: 173 additions & 67 deletions lib/audit-logs/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
//! Contains functionality for setting up and communicating with the audit database.

use std::str::FromStr;

use chrono::DateTime;
use chrono::Utc;
use serde::Deserialize;
use serde::Serialize;
use si_data_pg::PgError;
use si_data_pg::PgPoolError;
use si_data_pg::PgRow;
use si_events::audit_log::AuditLogKind;
use si_events::audit_log::AuditLogMetadata;
use si_events::ulid;
use si_events::Actor;
use si_events::ChangeSetId;
use si_events::UserPk;
use si_events::WorkspacePk;
use telemetry::prelude::*;
use thiserror::Error;
Expand All @@ -33,77 +40,176 @@ pub enum AuditDatabaseError {
PgPool(#[from] PgPoolError),
#[error("serde json error: {0}")]
SerdeJson(#[from] serde_json::Error),
#[error("ulid decode error: {0}")]
UlidDecode(#[from] ulid::DecodeError),
}

type Result<T> = std::result::Result<T, AuditDatabaseError>;

#[allow(clippy::too_many_arguments, missing_docs)]
#[instrument(
name = "audit_log.insert",
level = "debug",
skip_all,
fields(
si.workspace.id = %workspace_id,
),
)]
pub async fn insert(
context: &AuditDatabaseContext,
workspace_id: WorkspacePk,
kind: AuditLogKind,
timestamp: String,
change_set_id: Option<ChangeSetId>,
actor: Actor,
entity_name: Option<String>,
) -> Result<()> {
let kind_as_string = kind.to_string();
let user_id = match actor {
Actor::System => None,
Actor::User(user_id) => Some(user_id),
};
/// A row in the audit logs table of the audit database.
///
/// Every field corresponds to a column of the same name in the `audit_logs` table.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AuditLogRow {
/// Indicates the workspace that the row belongs to.
pub workspace_id: WorkspacePk,
/// The [kind](AuditLogKind) of the audit log, converted into a string because enum discriminants are not
/// serializable.
pub kind: String,
/// The timestamp of the audit log, held as a UTC [`DateTime`].
pub timestamp: DateTime<Utc>,
/// The title of the audit log. It will likely be combined with the `entity_type` to make a full display name.
pub title: String,
/// The identifier of the change set, which will only be empty for actions taken outside of the workspace.
pub change_set_id: Option<ChangeSetId>,
/// The identifier of the user. If this is empty, it is the system user.
pub user_id: Option<UserPk>,
/// The entity name, if there is one.
pub entity_name: Option<String>,
/// The entity type, if there is one.
pub entity_type: Option<String>,
/// Serialized version of [`AuditLogMetadata`](si_events::audit_log::AuditLogMetadata), which is an
/// untagged version of the specific [`AuditLogKind`](si_events::audit_log::AuditLogKind).
pub metadata: Option<serde_json::Value>,
}

impl AuditLogRow {
    /// Inserts a new row into the audit logs table of the audit database.
    ///
    /// The title, entity type and serialized metadata are all derived from `kind`. The `actor`
    /// is flattened into an optional user id (`None` for [`Actor::System`]). Identifiers are
    /// bound to the statement as their string representations.
    ///
    /// # Errors
    ///
    /// Returns an error if the metadata cannot be serialized to JSON, if `timestamp` cannot be
    /// parsed into a [`DateTime<Utc>`], if a connection cannot be acquired from the pool, or if
    /// the `INSERT` statement fails.
    #[allow(clippy::too_many_arguments)]
    #[instrument(
        name = "audit_log.database.insert",
        level = "debug",
        skip_all,
        fields(
            si.workspace.id = %workspace_id,
        ),
    )]
    pub async fn insert(
        context: &AuditDatabaseContext,
        workspace_id: WorkspacePk,
        kind: AuditLogKind,
        timestamp: String,
        change_set_id: Option<ChangeSetId>,
        actor: Actor,
        entity_name: Option<String>,
    ) -> Result<()> {
        // Capture the string form of the kind before it is consumed by the metadata conversion.
        let kind_as_string = kind.to_string();
        let user_id = match actor {
            Actor::System => None,
            Actor::User(user_id) => Some(user_id),
        };

        let metadata = AuditLogMetadata::from(kind);
        let (title, entity_type) = metadata.title_and_entity_type();
        let serialized_metadata = serde_json::to_value(metadata)?;
        let timestamp: DateTime<Utc> = timestamp.parse()?;

        context
            .pg_pool()
            .get()
            .await?
            .query_one(
                "INSERT INTO audit_logs (
                    workspace_id,
                    kind,
                    timestamp,
                    title,
                    change_set_id,
                    user_id,
                    entity_name,
                    entity_type,
                    metadata
                ) VALUES (
                    $1,
                    $2,
                    $3,
                    $4,
                    $5,
                    $6,
                    $7,
                    $8,
                    $9
                ) RETURNING *",
                &[
                    &workspace_id.to_string(),
                    &kind_as_string,
                    &timestamp,
                    &title,
                    &change_set_id.map(|id| id.to_string()),
                    &user_id.map(|id| id.to_string()),
                    &entity_name,
                    &entity_type,
                    &serialized_metadata,
                ],
            )
            .await?;
        Ok(())
    }

    /// Lists rows of the audit logs table in the audit database.
    ///
    /// Returns at most `size` rows belonging to the given workspace and change set, ordered from
    /// the newest timestamp to the oldest.
    ///
    /// The second element of the returned tuple is currently always `false`.
    /// NOTE(review): presumably it is meant to signal that more rows are available — confirm
    /// before relying on it.
    ///
    /// # Errors
    ///
    /// Returns an error if a connection cannot be acquired from the pool, if the query fails, or
    /// if any returned row cannot be converted into an [`AuditLogRow`].
    #[instrument(
        name = "audit_log.database.list",
        level = "debug",
        skip_all,
        fields(
            si.workspace.id = %workspace_id,
        ),
    )]
    pub async fn list(
        context: &AuditDatabaseContext,
        workspace_id: WorkspacePk,
        change_set_id: ChangeSetId,
        size: usize,
    ) -> Result<(Vec<Self>, bool)> {
        // Saturate rather than `as`-cast: a wrapping cast could hand a negative LIMIT to the
        // database for very large `size` values.
        let limit = i64::try_from(size).unwrap_or(i64::MAX);

        // Bind identifiers as strings, matching how `insert` writes them and how rows are read
        // back out of the table.
        let workspace_id_text = workspace_id.to_string();
        let change_set_id_text = change_set_id.to_string();

        let rows = context
            .pg_pool()
            .get()
            .await?
            .query(
                "SELECT * from audit_logs WHERE workspace_id = $1 AND change_set_id = $2 ORDER BY timestamp DESC LIMIT $3",
                &[&workspace_id_text, &change_set_id_text, &limit],
            )
            .await?;

        // Stop at the first row that fails to convert.
        let audit_logs = rows
            .into_iter()
            .map(Self::try_from)
            .collect::<Result<Vec<_>>>()?;

        Ok((audit_logs, false))
    }
}

// Conversion from a raw database row into an `AuditLogRow`.
impl TryFrom<PgRow> for AuditLogRow {
type Error = AuditDatabaseError;

// NOTE(review): the four `let` statements below sit outside any function body; they look like
// removed-side lines of the diff (the old free-standing `insert`) — confirm against the real file.
let metadata = AuditLogMetadata::from(kind);
let (title, entity_type) = metadata.title_and_entity_type();
let serialized_metadata = serde_json::to_value(metadata)?;
let timestamp: DateTime<Utc> = timestamp.parse()?;
/// Builds an [`AuditLogRow`] from a row of the audit logs table.
///
/// The `workspace_id`, `change_set_id` and `user_id` columns are read as strings and parsed into
/// their typed identifiers; the two optional ones stay `None` when the column is empty. All other
/// columns are read directly by name.
///
/// Fails if a column cannot be read or if an identifier string cannot be parsed.
fn try_from(value: PgRow) -> std::result::Result<Self, Self::Error> {
// The workspace id is mandatory, so it is read as a plain `String`.
let workspace_id = {
let inner: String = value.try_get("workspace_id")?;
WorkspacePk::from_str(&inner)?
};
// Optional identifiers are only parsed when the column holds a value.
let change_set_id = {
let maybe_inner: Option<String> = value.try_get("change_set_id")?;
match maybe_inner {
Some(inner) => Some(ChangeSetId::from_str(&inner)?),
None => None,
}
};
let user_id = {
let maybe_inner: Option<String> = value.try_get("user_id")?;
match maybe_inner {
Some(inner) => Some(UserPk::from_str(&inner)?),
None => None,
}
};

// NOTE(review): the statement from `context` through `Ok(())` below references names that are
// not defined in `try_from`; it looks like removed-side diff lines from the old `insert` —
// confirm against the real file.
context
.pg_pool()
.get()
.await?
.query_one(
"INSERT INTO audit_logs (
workspace_id,
kind,
timestamp,
title,
change_set_id,
user_id,
entity_name,
entity_type,
metadata
) VALUES (
$1,
$2,
$3,
$4,
$5,
$6,
$7,
$8,
$9
) RETURNING *",
&[
&workspace_id,
&kind_as_string,
&timestamp,
&title,
&change_set_id.map(|id| id.to_string()),
&user_id.map(|id| id.to_string()),
&entity_name,
&entity_type,
&serialized_metadata,
],
)
.await?;
Ok(())
// Assemble the row: parsed identifiers from above, everything else straight from the column.
Ok(Self {
workspace_id,
kind: value.try_get("kind")?,
timestamp: value.try_get("timestamp")?,
title: value.try_get("title")?,
change_set_id,
user_id,
entity_name: value.try_get("entity_name")?,
entity_type: value.try_get("entity_type")?,
metadata: value.try_get("metadata")?,
})
}
}
1 change: 1 addition & 0 deletions lib/audit-logs/src/database/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use si_data_pg::PgPoolConfig;

/// The name of the audit database.
pub const DBNAME: &str = "si_audit";
// NOTE(review): presumably the application name reported on audit database connections —
// confirm against its use in this module.
const APPLICATION_NAME: &str = "si-audit";

/// The configuration used for communicating with and setting up the audit database.
Expand Down
2 changes: 2 additions & 0 deletions lib/dal-test/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@prelude-si//:macros.bzl", "rust_library")
rust_library(
name = "dal-test",
deps = [
"//lib/audit-logs:audit-logs",
"//lib/buck2-resources:buck2-resources",
"//lib/dal:dal",
"//lib/forklift-server:forklift-server",
Expand All @@ -12,6 +13,7 @@ rust_library(
"//lib/si-crypto:si-crypto",
"//lib/si-data-nats:si-data-nats",
"//lib/si-data-pg:si-data-pg",
"//lib/si-events-rs:si-events",
"//lib/si-layer-cache:si-layer-cache",
"//lib/si-pkg:si-pkg",
"//lib/si-runtime-rs:si-runtime",
Expand Down
2 changes: 2 additions & 0 deletions lib/dal-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ publish.workspace = true

[dependencies]
async-recursion = { workspace = true }
audit-logs = { path = "../../lib/audit-logs" }
base64 = { workspace = true }
buck2-resources = { path = "../../lib/buck2-resources" }
color-eyre = { workspace = true }
Expand All @@ -31,6 +32,7 @@ serde_json = { workspace = true }
si-crypto = { path = "../../lib/si-crypto" }
si-data-nats = { path = "../../lib/si-data-nats" }
si-data-pg = { path = "../../lib/si-data-pg" }
si-events = { path = "../../lib/si-events-rs" }
si-layer-cache = { path = "../../lib/si-layer-cache" }
si-pkg = { path = "../../lib/si-pkg" }
si-runtime = { path = "../../lib/si-runtime-rs" }
Expand Down
31 changes: 31 additions & 0 deletions lib/dal-test/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::time::Duration;

use audit_logs::database::{AuditDatabaseContext, AuditLogRow};
use color_eyre::eyre::eyre;
use color_eyre::Result;
use dal::component::socket::{ComponentInputSocket, ComponentOutputSocket};
Expand Down Expand Up @@ -409,3 +410,33 @@ pub async fn confirm_jetstream_stream_has_no_messages(
"hit timeout and stream still has at least one message: {message_count}"
))
}

/// Retries listing audit logs until the expected number of rows are returned.
///
/// The audit database is queried via [`AuditLogRow::list`] at least once, even when
/// `timeout_seconds` is zero. While the number of rows differs from `expected_number_of_rows`,
/// the query is retried every `interval_milliseconds` until `timeout_seconds` have elapsed. The
/// wait between attempts is capped to the time remaining, so the function does not sleep past
/// the deadline.
///
/// `size` is forwarded to the list query as the maximum number of rows to fetch.
///
/// # Errors
///
/// Returns an error if listing fails, or if the timeout is hit before the query returns exactly
/// the expected number of rows.
pub async fn list_audit_logs_until_expected_number_of_rows(
    context: &AuditDatabaseContext,
    workspace_id: si_events::WorkspacePk,
    change_set_id: si_events::ChangeSetId,
    size: usize,
    expected_number_of_rows: usize,
    timeout_seconds: u64,
    interval_milliseconds: u64,
) -> Result<Vec<AuditLogRow>> {
    let timeout = Duration::from_secs(timeout_seconds);
    let interval = Duration::from_millis(interval_milliseconds);

    let start = Instant::now();

    loop {
        let (audit_logs, _) = AuditLogRow::list(context, workspace_id, change_set_id, size).await?;
        let actual_number_of_rows = audit_logs.len();
        if actual_number_of_rows == expected_number_of_rows {
            return Ok(audit_logs);
        }

        // Give up once the deadline has passed; otherwise never sleep beyond it.
        let remaining = timeout.saturating_sub(start.elapsed());
        if remaining.is_zero() {
            return Err(eyre!(
                "hit timeout before audit logs query returns expected number of rows (expected: {expected_number_of_rows}, actual: {actual_number_of_rows})"
            ));
        }
        tokio::time::sleep(interval.min(remaining)).await;
    }
}
Loading