From 022406f2c870736a496b85f540cac5602783986f Mon Sep 17 00:00:00 2001 From: Michael Meyer Date: Wed, 19 Feb 2025 15:34:53 +0100 Subject: [PATCH 01/13] Create postgres sink --- Cargo.lock | 2 + Cargo.toml | 4 +- src/sinks/mod.rs | 2 + src/sinks/postgres/mod.rs | 205 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 212 insertions(+), 1 deletion(-) create mode 100644 src/sinks/postgres/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 5852fe7dd92c5..59f38c5f6393f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7288,6 +7288,8 @@ dependencies = [ "chrono", "fallible-iterator", "postgres-protocol", + "serde", + "serde_json", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 868849bd87790..b6e26bd18152d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -371,7 +371,7 @@ stream-cancel = { version = "0.8.2", default-features = false } strip-ansi-escapes = { version = "0.2.1", default-features = false } syslog = { version = "6.1.1", default-features = false, optional = true } tikv-jemallocator = { version = "0.6.0", default-features = false, features = ["unprefixed_malloc_on_supported_platforms"], optional = true } -tokio-postgres = { version = "0.7.13", default-features = false, features = ["runtime", "with-chrono-0_4"], optional = true } +tokio-postgres = { version = "0.7.13", default-features = false, features = ["runtime", "with-chrono-0_4", "with-serde_json-1"], optional = true } tokio-tungstenite = { version = "0.20.1", default-features = false, features = ["connect"], optional = true } toml.workspace = true tonic = { workspace = true, optional = true } @@ -737,6 +737,7 @@ sinks-logs = [ "sinks-new_relic_logs", "sinks-opentelemetry", "sinks-papertrail", + "sinks-postgres", "sinks-pulsar", "sinks-redis", "sinks-sematext", @@ -805,6 +806,7 @@ sinks-new_relic_logs = ["sinks-http"] sinks-new_relic = [] sinks-opentelemetry = ["sinks-http"] sinks-papertrail = ["dep:syslog"] +sinks-postgres = ["dep:tokio-postgres"] sinks-prometheus = ["dep:base64", "dep:prost", "vector-lib/prometheus"] sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"] sinks-redis = ["dep:redis"] diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index 784f74b1e6005..b5a45a462566e 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -88,6 +88,8 @@ pub mod opendal_common; pub mod opentelemetry; #[cfg(feature = "sinks-papertrail")] pub mod papertrail; +#[cfg(feature = "sinks-postgres")] +pub mod postgres; #[cfg(feature = "sinks-prometheus")] pub mod prometheus; #[cfg(feature = "sinks-pulsar")] diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs new file mode 100644 index 0000000000000..18f5766cd3106 --- /dev/null +++ b/src/sinks/postgres/mod.rs @@ -0,0 +1,205 @@ +use crate::sinks::prelude::*; +use bytes::Buf; +use chrono::{DateTime, Utc}; +use itertools::Itertools; +use tokio_postgres::{ + types::{to_sql_checked, IsNull, ToSql}, + NoTls, +}; + +#[configurable_component(sink("postgres"))] +#[derive(Clone, Debug)] +/// A basic sink that dumps its output to stdout. +pub struct BasicConfig { + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::is_default" + )] + pub acknowledgements: AcknowledgementsConfig, + /// The postgress host + pub host: String, + /// The postgress port + pub port: u16, + /// The postgress table + pub table: String, + /// The postgress schema (default: public) + pub schema: Option, + /// The postgress database (default: postgres) + pub database: Option, + + /// The postgres user + pub user: Option, + /// The postgres password + pub password: Option, +} + +impl GenerateConfig for BasicConfig { + fn generate_config() -> toml::Value { + toml::from_str("").unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "postgres")] +impl SinkConfig for BasicConfig { + async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let healthcheck = Box::pin(async move { Ok(()) }); + + let BasicConfig { + host, + port, + user, + password, + table, + database, + schema, + .. + } = self; + + let user = user.as_deref().unwrap_or("postgres"); + + let schema = schema.as_deref().unwrap_or("public"); + + // dbname defaults to username if omitted so we do the same here + let database = database.as_deref().unwrap_or(user); + let password = password.as_deref().unwrap_or("mysecretpassword"); + + let (client, connection) = tokio_postgres::connect( + &format!("host={host} user={user} port={port} password={password} dbname={database}"), + NoTls, + ) + .await?; + + tokio::spawn(async move { + if let Err(e) = connection.await { + error!("connection error: {}", e); + } + }); + + let columns = client.query("SELECT column_name from INFORMATION_SCHEMA.COLUMNS WHERE table_name = $1 AND table_schema = $2", &[&table, &schema]).await?.into_iter().map(|x| x.get(0)).collect(); + + let sink = VectorSink::from_event_streamsink(PostgresSink { + client, + table: table.to_owned(), + columns, + }); + + Ok((sink, healthcheck)) + } + + fn input(&self) -> Input { + Input::log() + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +struct PostgresSink { + client: tokio_postgres::Client, + table: String, + columns: Vec, +} + +#[async_trait::async_trait] +impl StreamSink for PostgresSink { + async fn run( + self: Box, + input: futures_util::stream::BoxStream<'_, Event>, + ) -> Result<(), ()> { + self.run_inner(input).await + } +} + +#[derive(Debug)] +struct Wrapper<'a>(&'a Value); + +impl<'a> ToSql for Wrapper<'a> { + fn to_sql( + &self, + ty: &tokio_postgres::types::Type, + out: &mut bytes::BytesMut, + ) -> Result> + where + Self: Sized, + { + match self.0 { + Value::Bytes(bytes) => bytes.chunk().to_sql(ty, out), + Value::Regex(value_regex) => value_regex.as_str().to_sql(ty, out), + Value::Integer(i) => i.to_sql(ty, out), + Value::Float(not_nan) => not_nan.to_sql(ty, out), + Value::Boolean(b) => b.to_sql(ty, out), + Value::Timestamp(date_time) => date_time.to_sql(ty, out), + Value::Object(btree_map) => serde_json::to_value(btree_map).unwrap().to_sql(ty, out), + Value::Array(values) => values.iter().map(Wrapper).collect_vec().to_sql(ty, out), + Value::Null => Ok(IsNull::Yes), + } + } + + fn accepts(ty: &tokio_postgres::types::Type) -> bool + where + Self: Sized, + { + <&[u8]>::accepts(ty) + || <&str>::accepts(ty) + || i64::accepts(ty) + || f64::accepts(ty) + || bool::accepts(ty) + || DateTime::::accepts(ty) + || serde_json::Value::accepts(ty) + || Option::::accepts(ty) + } + + to_sql_checked!(); +} + +impl PostgresSink { + async fn run_inner(self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { + let Self { table, .. } = self.as_ref(); + + let statement = self + .client + .prepare(&format!( + "INSERT INTO {table} ({}) VALUES ({})", + self.columns.iter().map(|v| format!("\"{v}\"")).join(","), + self.columns + .iter() + .enumerate() + .map(|(i, _)| format!("${}", i + 1)) + .join(",") + )) + .await + .unwrap(); + + while let Some(event) = input.next().await { + match event { + Event::Log(log_event) => { + let (v, mut metadata) = log_event.into_parts(); + + let v = v.into_object().unwrap(); + + let p = self + .columns + .iter() + .map(|k| v.get(k.as_str()).unwrap_or(&Value::Null)) + .map(Wrapper); + + let status = match self.client.execute_raw(&statement, p).await { + Ok(_) => EventStatus::Delivered, + Err(err) => { + error!("{err}"); + EventStatus::Rejected + } + }; + metadata.take_finalizers().update_status(status) + } + _ => todo!("Only logs are implemented so far"), + } + } + + Ok(()) + } +} From 8f93c5274617366897dbba5611d7df0400e71398 Mon Sep 17 00:00:00 2001 From: Michael Meyer Date: Wed, 19 Feb 2025 15:37:32 +0100 Subject: [PATCH 02/13] Fix docstring --- src/sinks/postgres/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs index 18f5766cd3106..0b3be83447f2e 100644 --- a/src/sinks/postgres/mod.rs +++ b/src/sinks/postgres/mod.rs @@ -9,7 +9,7 @@ use tokio_postgres::{ #[configurable_component(sink("postgres"))] #[derive(Clone, Debug)] -/// A basic sink that dumps its output to stdout. +/// Write the input to a postgres tables pub struct BasicConfig { #[configurable(derived)] #[serde( From 13a5fe640532d9b0be61b9f0869649b32774002d Mon Sep 17 00:00:00 2001 From: Michael Meyer Date: Thu, 20 Feb 2025 09:41:34 +0100 Subject: [PATCH 03/13] Remove unwrap --- src/sinks/postgres/mod.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs index 0b3be83447f2e..a0581443aea8b 100644 --- a/src/sinks/postgres/mod.rs +++ b/src/sinks/postgres/mod.rs @@ -179,7 +179,16 @@ impl PostgresSink { Event::Log(log_event) => { let (v, mut metadata) = log_event.into_parts(); - let v = v.into_object().unwrap(); + let v = match v.into_object() { + Some(object) => object, + None => { + error!("Log value was not an object"); + metadata + .take_finalizers() + .update_status(EventStatus::Rejected); + return Err(()); + } + }; let p = self .columns From 7de11c6081598f89785bf4fc669be8d12052eaa1 Mon Sep 17 00:00:00 2001 From: Michael Meyer Date: Thu, 20 Feb 2025 09:43:08 +0100 Subject: [PATCH 04/13] Move prepare statement to sink construction --- src/sinks/postgres/mod.rs | 39 +++++++++++++++++++-------------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs index a0581443aea8b..f7abdb90959c8 100644 --- a/src/sinks/postgres/mod.rs +++ b/src/sinks/postgres/mod.rs @@ -78,11 +78,24 @@ impl SinkConfig for BasicConfig { } }); - let columns = client.query("SELECT column_name from INFORMATION_SCHEMA.COLUMNS WHERE table_name = $1 AND table_schema = $2", &[&table, &schema]).await?.into_iter().map(|x| x.get(0)).collect(); + let columns: Vec<_> = client.query("SELECT column_name from INFORMATION_SCHEMA.COLUMNS WHERE table_name = $1 AND table_schema = $2", &[&table, &schema]).await?.into_iter().map(|x| x.get(0)).collect(); + + let statement = client + .prepare(&format!( + "INSERT INTO {table} ({}) VALUES ({})", + columns.iter().map(|v| format!("\"{v}\"")).join(","), + columns + .iter() + .enumerate() + .map(|(i, _)| format!("${}", i + 1)) + .join(",") + )) + .await + .unwrap(); let sink = VectorSink::from_event_streamsink(PostgresSink { client, - table: table.to_owned(), + statement, columns, }); @@ -100,7 +113,7 @@ impl SinkConfig for BasicConfig { struct PostgresSink { client: tokio_postgres::Client, - table: String, + statement: tokio_postgres::Statement, columns: Vec, } @@ -158,23 +171,9 @@ impl<'a> ToSql for Wrapper<'a> { impl PostgresSink { async fn run_inner(self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { - let Self { table, .. } = self.as_ref(); - - let statement = self - .client - .prepare(&format!( - "INSERT INTO {table} ({}) VALUES ({})", - self.columns.iter().map(|v| format!("\"{v}\"")).join(","), - self.columns - .iter() - .enumerate() - .map(|(i, _)| format!("${}", i + 1)) - .join(",") - )) - .await - .unwrap(); + let Self { statement, .. } = self.as_ref(); - while let Some(event) = input.next().await { + while let Some(event) = input.next().await { match event { Event::Log(log_event) => { let (v, mut metadata) = log_event.into_parts(); @@ -196,7 +195,7 @@ impl PostgresSink { .map(|k| v.get(k.as_str()).unwrap_or(&Value::Null)) .map(Wrapper); - let status = match self.client.execute_raw(&statement, p).await { + let status = match self.client.execute_raw(statement, p).await { Ok(_) => EventStatus::Delivered, Err(err) => { error!("{err}"); From 97443808fe8e0bf84a2c04992ef7dae968f880ba Mon Sep 17 00:00:00 2001 From: Michael Meyer Date: Thu, 20 Feb 2025 09:43:53 +0100 Subject: [PATCH 05/13] Make Value serializaion zero-copy --- Cargo.lock | 1 + Cargo.toml | 4 +++- src/sinks/postgres/mod.rs | 50 +++++++++++++++++++++++++++++++++++---- 3 files changed, 50 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 59f38c5f6393f..ada0c19784fd2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11078,6 +11078,7 @@ dependencies = [ "pin-project", "portpicker", "postgres-openssl", + "postgres-protocol", "proptest", "proptest-derive", "prost 0.12.6", diff --git a/Cargo.toml b/Cargo.toml index b6e26bd18152d..9f01dc0815270 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -391,6 +391,7 @@ heim = { git = "https://github.com/vectordotdev/heim.git", branch = "update-nix" mlua = { version = "0.10.3", default-features = false, features = ["lua54", "send", "vendored", "macros"], optional = true } sysinfo = "0.32.1" byteorder = "1.5.0" +postgres-protocol = { version = "0.6.8", optional = true } [target.'cfg(windows)'.dependencies] windows-service = "0.7.0" @@ -806,7 +807,7 @@ sinks-new_relic_logs = ["sinks-http"] sinks-new_relic = [] sinks-opentelemetry = ["sinks-http"] sinks-papertrail = ["dep:syslog"] -sinks-postgres = ["dep:tokio-postgres"] +sinks-postgres = ["dep:tokio-postgres", "dep:postgres-protocol"] sinks-prometheus = ["dep:base64", "dep:prost", "vector-lib/prometheus"] sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"] sinks-redis = ["dep:redis"] @@ -995,6 +996,7 @@ codecs-benches = [] loki-benches = ["sinks-loki"] enrichment-tables-benches = ["enrichment-tables-geoip", "enrichment-tables-mmdb", "enrichment-tables-memory"] proptest = ["dep:proptest", "dep:proptest-derive", "vrl/proptest"] +postgres-protocol = ["dep:postgres-protocol"] [[bench]] name = "default" diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs index f7abdb90959c8..79f1ade5f059b 100644 --- a/src/sinks/postgres/mod.rs +++ b/src/sinks/postgres/mod.rs @@ -1,5 +1,5 @@ use crate::sinks::prelude::*; -use bytes::Buf; +use bytes::{Buf, BufMut}; use chrono::{DateTime, Utc}; use itertools::Itertools; use tokio_postgres::{ @@ -146,8 +146,46 @@ impl<'a> ToSql for Wrapper<'a> { Value::Float(not_nan) => not_nan.to_sql(ty, out), Value::Boolean(b) => b.to_sql(ty, out), Value::Timestamp(date_time) => date_time.to_sql(ty, out), - Value::Object(btree_map) => serde_json::to_value(btree_map).unwrap().to_sql(ty, out), - Value::Array(values) => values.iter().map(Wrapper).collect_vec().to_sql(ty, out), + Value::Object(btree_map) => { + serde_json::to_writer(out.writer(), btree_map)?; + Ok(IsNull::No) + } + Value::Array(values) => { + // Taken from postgres-types/lib.rs `impl ToSql for &[T]` + // + // There is no function that serializes an iterator, only a method on slices, + // but we should not have to allocate a new `Vec>` just to + // serialize the `Vec` we already have + + let member_type = match *ty.kind() { + tokio_postgres::types::Kind::Array(ref member) => member, + _ => panic!("expected array type"), + }; + + // Arrays are normally one indexed by default but oidvector and int2vector *require* zero indexing + let lower_bound = match *ty { + tokio_postgres::types::Type::OID_VECTOR + | tokio_postgres::types::Type::INT2_VECTOR => 0, + _ => 1, + }; + + let dimension = postgres_protocol::types::ArrayDimension { + len: values.len().try_into()?, + lower_bound, + }; + + postgres_protocol::types::array_to_sql( + Some(dimension), + member_type.oid(), + values.iter().map(Wrapper), + |e, w| match e.to_sql(member_type, w)? { + IsNull::No => Ok(postgres_protocol::IsNull::No), + IsNull::Yes => Ok(postgres_protocol::IsNull::Yes), + }, + out, + )?; + Ok(IsNull::No) + } Value::Null => Ok(IsNull::Yes), } } @@ -164,6 +202,10 @@ impl<'a> ToSql for Wrapper<'a> { || DateTime::::accepts(ty) || serde_json::Value::accepts(ty) || Option::::accepts(ty) + || match *ty.kind() { + tokio_postgres::types::Kind::Array(ref member) => Self::accepts(member), + _ => false, + } } to_sql_checked!(); @@ -173,7 +215,7 @@ impl PostgresSink { async fn run_inner(self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { let Self { statement, .. } = self.as_ref(); - while let Some(event) = input.next().await { + while let Some(event) = input.next().await { match event { Event::Log(log_event) => { let (v, mut metadata) = log_event.into_parts(); From 67bdc46e1dea30f8ed62976c6e18a4be4663a0b7 Mon Sep 17 00:00:00 2001 From: Michael Meyer Date: Thu, 20 Feb 2025 10:48:42 +0100 Subject: [PATCH 06/13] Remove potential panics --- src/sinks/postgres/mod.rs | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs index 79f1ade5f059b..3535a2db63637 100644 --- a/src/sinks/postgres/mod.rs +++ b/src/sinks/postgres/mod.rs @@ -10,7 +10,7 @@ use tokio_postgres::{ #[configurable_component(sink("postgres"))] #[derive(Clone, Debug)] /// Write the input to a postgres tables -pub struct BasicConfig { +pub struct PostgresConfig { #[configurable(derived)] #[serde( default, @@ -35,19 +35,15 @@ pub struct BasicConfig { pub password: Option, } -impl GenerateConfig for BasicConfig { - fn generate_config() -> toml::Value { - toml::from_str("").unwrap() - } -} +impl_generate_config_from_default!(PostgresConfig); #[async_trait::async_trait] #[typetag::serde(name = "postgres")] -impl SinkConfig for BasicConfig { +impl SinkConfig for PostgresConfig { async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { let healthcheck = Box::pin(async move { Ok(()) }); - let BasicConfig { + let PostgresConfig { host, port, user, @@ -90,8 +86,7 @@ impl SinkConfig for BasicConfig { .map(|(i, _)| format!("${}", i + 1)) .join(",") )) - .await - .unwrap(); + .await?; let sink = VectorSink::from_event_streamsink(PostgresSink { client, @@ -159,7 +154,11 @@ impl<'a> ToSql for Wrapper<'a> { let member_type = match *ty.kind() { tokio_postgres::types::Kind::Array(ref member) => member, - _ => panic!("expected array type"), + _ => { + return Err(Box::new( + tokio_postgres::types::WrongType::new::>(ty.clone()), + )) + } }; // Arrays are normally one indexed by default but oidvector and int2vector *require* zero indexing @@ -215,7 +214,7 @@ impl PostgresSink { async fn run_inner(self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { let Self { statement, .. } = self.as_ref(); - while let Some(event) = input.next().await { + while let Some(mut event) = input.next().await { match event { Event::Log(log_event) => { let (v, mut metadata) = log_event.into_parts(); @@ -246,7 +245,10 @@ impl PostgresSink { }; metadata.take_finalizers().update_status(status) } - _ => todo!("Only logs are implemented so far"), + _ => { + error!("Only logs are implemented so far"); + event.take_finalizers().update_status(EventStatus::Rejected); + } } } From cb7266d1de1b6cba366442461ce3b5d43785f68d Mon Sep 17 00:00:00 2001 From: Michael Meyer Date: Thu, 20 Feb 2025 12:24:46 +0100 Subject: [PATCH 07/13] Add functionality for traces --- src/sinks/postgres/mod.rs | 47 ++++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs index 3535a2db63637..96248e0a8cbd1 100644 --- a/src/sinks/postgres/mod.rs +++ b/src/sinks/postgres/mod.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeMap; + use crate::sinks::prelude::*; use bytes::{Buf, BufMut}; use chrono::{DateTime, Utc}; @@ -6,9 +8,11 @@ use tokio_postgres::{ types::{to_sql_checked, IsNull, ToSql}, NoTls, }; +use vector_lib::event::EventMetadata; +use vrl::value::KeyString; #[configurable_component(sink("postgres"))] -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] /// Write the input to a postgres tables pub struct PostgresConfig { #[configurable(derived)] @@ -212,8 +216,6 @@ impl<'a> ToSql for Wrapper<'a> { impl PostgresSink { async fn run_inner(self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { - let Self { statement, .. } = self.as_ref(); - while let Some(mut event) = input.next().await { match event { Event::Log(log_event) => { @@ -229,29 +231,38 @@ impl PostgresSink { return Err(()); } }; - - let p = self - .columns - .iter() - .map(|k| v.get(k.as_str()).unwrap_or(&Value::Null)) - .map(Wrapper); - - let status = match self.client.execute_raw(statement, p).await { - Ok(_) => EventStatus::Delivered, - Err(err) => { - error!("{err}"); - EventStatus::Rejected - } - }; - metadata.take_finalizers().update_status(status) + self.store_log((v, metadata)).await? } + Event::Trace(trace) => self.store_log(trace.into_parts()).await?, _ => { error!("Only logs are implemented so far"); event.take_finalizers().update_status(EventStatus::Rejected); } } } + Ok(()) + } + async fn store_log( + &self, + event: (BTreeMap, EventMetadata), + ) -> Result<(), ()> { + let (v, mut metadata) = event; + + let p = self + .columns + .iter() + .map(|k| v.get(k.as_str()).unwrap_or(&Value::Null)) + .map(Wrapper); + + let status = match self.client.execute_raw(&self.statement, p).await { + Ok(_) => EventStatus::Delivered, + Err(err) => { + error!("{err}"); + EventStatus::Rejected + } + }; + metadata.take_finalizers().update_status(status); Ok(()) } } From 441466f2669543f8d6d778fb54c61e00aa43b692 Mon Sep 17 00:00:00 2001 From: Michael Meyer Date: Thu, 20 Feb 2025 13:23:20 +0100 Subject: [PATCH 08/13] Add proper healthcheck --- src/sinks/postgres/mod.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs index 96248e0a8cbd1..bed10c38ecb38 100644 --- a/src/sinks/postgres/mod.rs +++ b/src/sinks/postgres/mod.rs @@ -1,4 +1,4 @@ -use std::collections::BTreeMap; +use std::{collections::BTreeMap, sync::Arc}; use crate::sinks::prelude::*; use bytes::{Buf, BufMut}; @@ -45,9 +45,7 @@ impl_generate_config_from_default!(PostgresConfig); #[typetag::serde(name = "postgres")] impl SinkConfig for PostgresConfig { async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - let healthcheck = Box::pin(async move { Ok(()) }); - - let PostgresConfig { + let PostgresConfig { host, port, user, @@ -72,6 +70,17 @@ impl SinkConfig for PostgresConfig { ) .await?; + let client = Arc::new(client); + + let health_client = client.clone(); + + let healthcheck = Box::pin(async move { + health_client + .query_one("SELECT 1", Default::default()) + .await?; + Ok(()) + }); + tokio::spawn(async move { if let Err(e) = connection.await { error!("connection error: {}", e); @@ -111,7 +120,7 @@ impl SinkConfig for PostgresConfig { } struct PostgresSink { - client: tokio_postgres::Client, + client: Arc, statement: tokio_postgres::Statement, columns: Vec, } From 3162544d69b105242620db100fb5511fbf543108 Mon Sep 17 00:00:00 2001 From: Michael Meyer Date: Thu, 20 Feb 2025 13:24:36 +0100 Subject: [PATCH 09/13] Distinguish storage for traces and logs --- src/sinks/postgres/mod.rs | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs index bed10c38ecb38..c8774ed57fdf3 100644 --- a/src/sinks/postgres/mod.rs +++ b/src/sinks/postgres/mod.rs @@ -45,7 +45,7 @@ impl_generate_config_from_default!(PostgresConfig); #[typetag::serde(name = "postgres")] impl SinkConfig for PostgresConfig { async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - let PostgresConfig { + let PostgresConfig { host, port, user, @@ -230,19 +230,26 @@ impl PostgresSink { Event::Log(log_event) => { let (v, mut metadata) = log_event.into_parts(); - let v = match v.into_object() { - Some(object) => object, - None => { - error!("Log value was not an object"); + match v { + Value::Object(btree_map) => { + self.store_trace((btree_map, metadata)).await?; + } + v if self.columns.len() == 1 => { + self.client + .execute(&self.statement, &[&Wrapper(&v)]) + .await + .map_err(|_| ())?; + } + _ => { + error!("Either the Value must be an object or the tables must have exactly one column"); metadata .take_finalizers() .update_status(EventStatus::Rejected); return Err(()); } - }; - self.store_log((v, metadata)).await? + } } - Event::Trace(trace) => self.store_log(trace.into_parts()).await?, + Event::Trace(trace) => self.store_trace(trace.into_parts()).await?, _ => { error!("Only logs are implemented so far"); event.take_finalizers().update_status(EventStatus::Rejected); @@ -252,7 +259,7 @@ impl PostgresSink { Ok(()) } - async fn store_log( + async fn store_trace( &self, event: (BTreeMap, EventMetadata), ) -> Result<(), ()> { From b7551a988014f81c302bcad6da3d8906e8a6f5be Mon Sep 17 00:00:00 2001 From: Michael Meyer Date: Thu, 20 Feb 2025 14:05:29 +0100 Subject: [PATCH 10/13] Make Clippy happy --- src/sinks/postgres/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs index c8774ed57fdf3..bcfb9b37b0427 100644 --- a/src/sinks/postgres/mod.rs +++ b/src/sinks/postgres/mod.rs @@ -72,7 +72,7 @@ impl SinkConfig for PostgresConfig { let client = Arc::new(client); - let health_client = client.clone(); + let health_client = Arc::clone(&client); let healthcheck = Box::pin(async move { health_client @@ -138,7 +138,7 @@ impl StreamSink for PostgresSink { #[derive(Debug)] struct Wrapper<'a>(&'a Value); -impl<'a> ToSql for Wrapper<'a> { +impl ToSql for Wrapper<'_> { fn to_sql( &self, ty: &tokio_postgres::types::Type, From 42e62af4d59d03621b8ee197c3e32980ae4995f2 Mon Sep 17 00:00:00 2001 From: Michael Meyer Date: Thu, 20 Feb 2025 21:19:49 +0100 Subject: [PATCH 11/13] Add missing status update --- src/sinks/postgres/mod.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs index bcfb9b37b0427..e96608b9ff44b 100644 --- a/src/sinks/postgres/mod.rs +++ b/src/sinks/postgres/mod.rs @@ -235,10 +235,15 @@ impl PostgresSink { self.store_trace((btree_map, metadata)).await?; } v if self.columns.len() == 1 => { - self.client + match self + .client .execute(&self.statement, &[&Wrapper(&v)]) .await - .map_err(|_| ())?; + .map_err(|_| ()) + { + Ok(_) => metadata.update_status(EventStatus::Delivered), + Err(_) => metadata.update_status(EventStatus::Rejected), + } } _ => { error!("Either the Value must be an object or the tables must have exactly one column"); From 875873fc3003f3ead71ea2e2d19f703fb87268fd Mon Sep 17 00:00:00 2001 From: Michael Meyer Date: Fri, 21 Feb 2025 12:27:32 +0100 Subject: [PATCH 12/13] Add metric storage --- src/sinks/postgres/mod.rs | 94 +++++++++++++++++++++++++++++++++++---- 1 file changed, 86 insertions(+), 8 deletions(-) diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs index e96608b9ff44b..e78a658ab107c 100644 --- a/src/sinks/postgres/mod.rs +++ b/src/sinks/postgres/mod.rs @@ -1,14 +1,18 @@ -use std::{collections::BTreeMap, sync::Arc}; +use std::{collections::BTreeMap, fmt::Debug, sync::Arc}; use crate::sinks::prelude::*; use bytes::{Buf, BufMut}; use chrono::{DateTime, Utc}; use itertools::Itertools; +use serde::Serialize; use tokio_postgres::{ - types::{to_sql_checked, IsNull, ToSql}, + types::{accepts, to_sql_checked, BorrowToSql, IsNull, ToSql}, NoTls, }; -use vector_lib::event::EventMetadata; +use vector_lib::event::{ + metric::{MetricData, MetricSeries, MetricTime}, + EventMetadata, Metric, MetricValue, +}; use vrl::value::KeyString; #[configurable_component(sink("postgres"))] @@ -223,9 +227,32 @@ impl ToSql for Wrapper<'_> { to_sql_checked!(); } +/// Allows for zero-copy SQL conversion for any struct that is +/// Serializable into a JSON object +#[derive(Debug)] +struct JsonObjWrapper(Inner); + +impl ToSql for JsonObjWrapper { + fn to_sql( + &self, + _: &tokio_postgres::types::Type, + out: &mut bytes::BytesMut, + ) -> Result> + where + Self: Sized, + { + serde_json::to_writer(out.writer(), &self.0)?; + Ok(IsNull::No) + } + + accepts!(JSON, JSONB); + + to_sql_checked!(); +} + impl PostgresSink { async fn run_inner(self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { - while let Some(mut event) = input.next().await { + while let Some(event) = input.next().await { match event { Event::Log(log_event) => { let (v, mut metadata) = log_event.into_parts(); @@ -255,10 +282,7 @@ impl PostgresSink { } } Event::Trace(trace) => self.store_trace(trace.into_parts()).await?, - _ => { - error!("Only logs are implemented so far"); - event.take_finalizers().update_status(EventStatus::Rejected); - } + Event::Metric(metric) => self.store_metric(metric).await?, } } Ok(()) @@ -286,4 +310,58 @@ impl PostgresSink { metadata.take_finalizers().update_status(status); Ok(()) } + + async fn store_metric(&self, metric: Metric) -> Result<(), ()> { + let (series, data, mut metadata) = metric.into_parts(); + let MetricSeries { ref name, tags } = series; + let tags = tags.map(JsonObjWrapper); + let MetricData { + time: MetricTime { + timestamp, + interval_ms, + }, + kind, + value, + } = data; + let interval_ms = interval_ms.map(|i| i.get()); + let value_wrapped = JsonObjWrapper(value); + + // Same semantics as serializing the metric into a JSON object + // and then indexing into the resulting map, but without allocation + let p = self + .columns + .iter() + .map(|c| match (c.as_str(), &value_wrapped.0) { + ("name", _) => name.name.borrow_to_sql(), + ("tags", _) => tags.borrow_to_sql(), + ("timestamp", _) => timestamp.borrow_to_sql(), + ("interval_ms", _) => interval_ms.borrow_to_sql(), + ("kind", _) => match kind { + vector_lib::event::MetricKind::Incremental => "Incremental".borrow_to_sql(), + vector_lib::event::MetricKind::Absolute => "Absolute".borrow_to_sql(), + }, + ("aggregated_histogram", MetricValue::AggregatedHistogram { .. }) => { + value_wrapped.borrow_to_sql() + } + ("aggregated_summary", MetricValue::AggregatedSummary { .. }) => { + value_wrapped.borrow_to_sql() + } + ("counter", MetricValue::Counter { .. }) => value_wrapped.borrow_to_sql(), + ("distribution", MetricValue::Distribution { .. }) => value_wrapped.borrow_to_sql(), + ("gauge", MetricValue::Gauge { .. }) => value_wrapped.borrow_to_sql(), + ("set", MetricValue::Set { .. }) => value_wrapped.borrow_to_sql(), + ("sketch", MetricValue::Sketch { .. }) => value_wrapped.borrow_to_sql(), + _ => Wrapper(&Value::Null).borrow_to_sql(), + }); + + let status = match self.client.execute_raw(&self.statement, p).await { + Ok(_) => EventStatus::Delivered, + Err(err) => { + error!("{err}"); + EventStatus::Rejected + } + }; + metadata.take_finalizers().update_status(status); + Ok(()) + } } From e9b0c8c7f5f3f61508b34c6d4974d505201970a7 Mon Sep 17 00:00:00 2001 From: Michael Meyer Date: Fri, 21 Feb 2025 12:47:40 +0100 Subject: [PATCH 13/13] Extract Wrappers --- src/sinks/postgres/mod.rs | 185 +++++++--------------------------- src/sinks/postgres/wrapper.rs | 118 ++++++++++++++++++++++ 2 files changed, 156 insertions(+), 147 deletions(-) create mode 100644 src/sinks/postgres/wrapper.rs diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs index e78a658ab107c..9ba2515160b0e 100644 --- a/src/sinks/postgres/mod.rs +++ b/src/sinks/postgres/mod.rs @@ -1,19 +1,15 @@ -use std::{collections::BTreeMap, fmt::Debug, sync::Arc}; +mod wrapper; use crate::sinks::prelude::*; -use bytes::{Buf, BufMut}; -use chrono::{DateTime, Utc}; use itertools::Itertools; -use serde::Serialize; -use tokio_postgres::{ - types::{accepts, to_sql_checked, BorrowToSql, IsNull, ToSql}, - NoTls, -}; +use std::{collections::BTreeMap, fmt::Debug, sync::Arc}; +use tokio_postgres::{types::BorrowToSql, NoTls}; use vector_lib::event::{ metric::{MetricData, MetricSeries, MetricTime}, EventMetadata, Metric, MetricValue, }; use vrl::value::KeyString; +use wrapper::{JsonObjWrapper, Wrapper}; #[configurable_component(sink("postgres"))] #[derive(Clone, Debug, Default)] @@ -139,148 +135,11 @@ impl StreamSink for PostgresSink { } } -#[derive(Debug)] -struct Wrapper<'a>(&'a Value); - -impl ToSql for Wrapper<'_> { - fn to_sql( - &self, - ty: &tokio_postgres::types::Type, - out: &mut bytes::BytesMut, - ) -> Result> - where - Self: Sized, - { - match self.0 { - Value::Bytes(bytes) => bytes.chunk().to_sql(ty, out), - Value::Regex(value_regex) => value_regex.as_str().to_sql(ty, out), - Value::Integer(i) => i.to_sql(ty, out), - Value::Float(not_nan) => not_nan.to_sql(ty, out), - Value::Boolean(b) => b.to_sql(ty, out), - Value::Timestamp(date_time) => date_time.to_sql(ty, out), - Value::Object(btree_map) => { - serde_json::to_writer(out.writer(), btree_map)?; - Ok(IsNull::No) - } - Value::Array(values) => { - // Taken from postgres-types/lib.rs `impl ToSql for &[T]` - // - // There is no function that serializes an iterator, only a method on slices, - // but we should not have to allocate a new `Vec>` just to - // serialize the `Vec` we already have - - let member_type = match *ty.kind() { - tokio_postgres::types::Kind::Array(ref member) => member, - _ => { - return Err(Box::new( - tokio_postgres::types::WrongType::new::>(ty.clone()), - )) - } - }; - - // Arrays are normally one indexed by default but oidvector and int2vector *require* zero indexing - let lower_bound = match *ty { - tokio_postgres::types::Type::OID_VECTOR - | tokio_postgres::types::Type::INT2_VECTOR => 0, - _ => 1, - }; - - let dimension = postgres_protocol::types::ArrayDimension { - len: values.len().try_into()?, - lower_bound, - }; - - postgres_protocol::types::array_to_sql( - Some(dimension), - member_type.oid(), - values.iter().map(Wrapper), - |e, w| match e.to_sql(member_type, w)? { - IsNull::No => Ok(postgres_protocol::IsNull::No), - IsNull::Yes => Ok(postgres_protocol::IsNull::Yes), - }, - out, - )?; - Ok(IsNull::No) - } - Value::Null => Ok(IsNull::Yes), - } - } - - fn accepts(ty: &tokio_postgres::types::Type) -> bool - where - Self: Sized, - { - <&[u8]>::accepts(ty) - || <&str>::accepts(ty) - || i64::accepts(ty) - || f64::accepts(ty) - || bool::accepts(ty) - || DateTime::::accepts(ty) - || serde_json::Value::accepts(ty) - || Option::::accepts(ty) - || match *ty.kind() { - tokio_postgres::types::Kind::Array(ref member) => Self::accepts(member), - _ => false, - } - } - - to_sql_checked!(); -} - -/// Allows for zero-copy SQL conversion for any struct that is -/// Serializable into a JSON object -#[derive(Debug)] -struct JsonObjWrapper(Inner); - -impl ToSql for JsonObjWrapper { - fn to_sql( - &self, - _: &tokio_postgres::types::Type, - out: &mut bytes::BytesMut, - ) -> Result> - where - Self: Sized, - { - serde_json::to_writer(out.writer(), &self.0)?; - Ok(IsNull::No) - } - - accepts!(JSON, JSONB); - - to_sql_checked!(); -} - impl PostgresSink { async fn run_inner(self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { while let Some(event) = input.next().await { match event { - Event::Log(log_event) => { - let (v, mut metadata) = log_event.into_parts(); - - match v { - Value::Object(btree_map) => { - self.store_trace((btree_map, metadata)).await?; - } - v if self.columns.len() == 1 => { - match self - .client - .execute(&self.statement, &[&Wrapper(&v)]) - .await - .map_err(|_| ()) - { - Ok(_) => metadata.update_status(EventStatus::Delivered), - Err(_) => metadata.update_status(EventStatus::Rejected), - } - } - _ => { - error!("Either the Value must be an object or the tables must have exactly one column"); - metadata - .take_finalizers() - .update_status(EventStatus::Rejected); - return Err(()); - } - } - } + Event::Log(log) => self.store_log(log).await?, Event::Trace(trace) => self.store_trace(trace.into_parts()).await?, Event::Metric(metric) => self.store_metric(metric).await?, } @@ -288,6 +147,38 @@ impl PostgresSink { Ok(()) } + async fn store_log(&self, log_event: LogEvent) -> Result<(), ()> { + let (v, mut metadata) = log_event.into_parts(); + + match v { + Value::Object(btree_map) => { + self.store_trace((btree_map, metadata)).await?; + } + v if self.columns.len() == 1 => { + match self + .client + .execute(&self.statement, &[&Wrapper(&v)]) + .await + .map_err(|_| ()) + { + Ok(_) => metadata.update_status(EventStatus::Delivered), + Err(_) => metadata.update_status(EventStatus::Rejected), + } + } + _ => { + error!( + "Either the Value must be an object or the tables must have exactly one column" + ); + metadata + .take_finalizers() + .update_status(EventStatus::Rejected); + return Err(()); + } + } + + Ok(()) + } + async fn store_trace( &self, event: (BTreeMap, EventMetadata), @@ -313,7 +204,7 @@ impl PostgresSink { async fn store_metric(&self, metric: Metric) -> Result<(), ()> { let (series, data, mut metadata) = metric.into_parts(); - let MetricSeries { ref name, tags } = series; + let MetricSeries { name, tags } = series; let tags = tags.map(JsonObjWrapper); let MetricData { time: MetricTime { diff --git a/src/sinks/postgres/wrapper.rs b/src/sinks/postgres/wrapper.rs new file mode 100644 index 0000000000000..3ac2d66e9f376 --- /dev/null +++ b/src/sinks/postgres/wrapper.rs @@ -0,0 +1,118 @@ +use std::fmt::Debug; + +use crate::sinks::prelude::*; +use bytes::{Buf, BufMut}; +use chrono::{DateTime, Utc}; +use serde::Serialize; +use tokio_postgres::types::{accepts, to_sql_checked, IsNull, ToSql}; + +#[derive(Debug)] +pub(crate) struct Wrapper<'a>(pub &'a Value); + +impl ToSql for Wrapper<'_> { + fn to_sql( + &self, + ty: &tokio_postgres::types::Type, + out: &mut bytes::BytesMut, + ) -> Result> + where + Self: Sized, + { + match self.0 { + Value::Bytes(bytes) => bytes.chunk().to_sql(ty, out), + Value::Regex(value_regex) => value_regex.as_str().to_sql(ty, out), + Value::Integer(i) => i.to_sql(ty, out), + Value::Float(not_nan) => not_nan.to_sql(ty, out), + Value::Boolean(b) => b.to_sql(ty, out), + Value::Timestamp(date_time) => date_time.to_sql(ty, out), + Value::Object(btree_map) => { + serde_json::to_writer(out.writer(), btree_map)?; + Ok(IsNull::No) + } + Value::Array(values) => { + // Taken from postgres-types/lib.rs `impl ToSql for &[T]` + // + // There is no function that serializes an iterator, only a method on slices, + // but we should not have to allocate a new `Vec>` just to + // serialize the `Vec` we already have + + let member_type = match *ty.kind() { + tokio_postgres::types::Kind::Array(ref member) => member, + _ => { + return Err(Box::new( + tokio_postgres::types::WrongType::new::>(ty.clone()), + )) + } + }; + + // Arrays are normally one indexed by default but oidvector and int2vector *require* zero indexing + let lower_bound = match *ty { + tokio_postgres::types::Type::OID_VECTOR + | tokio_postgres::types::Type::INT2_VECTOR => 0, + _ => 1, + }; + + let dimension = postgres_protocol::types::ArrayDimension { + len: values.len().try_into()?, + lower_bound, + }; + + postgres_protocol::types::array_to_sql( + Some(dimension), + member_type.oid(), + values.iter().map(Wrapper), + |e, w| match e.to_sql(member_type, w)? { + IsNull::No => Ok(postgres_protocol::IsNull::No), + IsNull::Yes => Ok(postgres_protocol::IsNull::Yes), + }, + out, + )?; + Ok(IsNull::No) + } + Value::Null => Ok(IsNull::Yes), + } + } + + fn accepts(ty: &tokio_postgres::types::Type) -> bool + where + Self: Sized, + { + <&[u8]>::accepts(ty) + || <&str>::accepts(ty) + || i64::accepts(ty) + || f64::accepts(ty) + || bool::accepts(ty) + || DateTime::::accepts(ty) + || serde_json::Value::accepts(ty) + || Option::::accepts(ty) + || match *ty.kind() { + tokio_postgres::types::Kind::Array(ref member) => Self::accepts(member), + _ => false, + } + } + + to_sql_checked!(); +} + +/// Allows for zero-copy SQL conversion for any struct that is +/// Serializable into a JSON object +#[derive(Debug)] +pub(crate) struct JsonObjWrapper(pub Inner); + +impl ToSql for JsonObjWrapper { + fn to_sql( + &self, + _: &tokio_postgres::types::Type, + out: &mut bytes::BytesMut, + ) -> Result> + where + Self: Sized, + { + serde_json::to_writer(out.writer(), &self.0)?; + Ok(IsNull::No) + } + + accepts!(JSON, JSONB); + + to_sql_checked!(); +}