Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(new sink): new postgres sink #22481

Closed
wants to merge 13 commits into from
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ stream-cancel = { version = "0.8.2", default-features = false }
strip-ansi-escapes = { version = "0.2.1", default-features = false }
syslog = { version = "6.1.1", default-features = false, optional = true }
tikv-jemallocator = { version = "0.6.0", default-features = false, features = ["unprefixed_malloc_on_supported_platforms"], optional = true }
tokio-postgres = { version = "0.7.13", default-features = false, features = ["runtime", "with-chrono-0_4"], optional = true }
tokio-postgres = { version = "0.7.13", default-features = false, features = ["runtime", "with-chrono-0_4", "with-serde_json-1"], optional = true }
tokio-tungstenite = { version = "0.20.1", default-features = false, features = ["connect"], optional = true }
toml.workspace = true
tonic = { workspace = true, optional = true }
Expand All @@ -391,6 +391,7 @@ heim = { git = "https://github.com/vectordotdev/heim.git", branch = "update-nix"
mlua = { version = "0.10.3", default-features = false, features = ["lua54", "send", "vendored", "macros"], optional = true }
sysinfo = "0.32.1"
byteorder = "1.5.0"
postgres-protocol = { version = "0.6.8", optional = true }

[target.'cfg(windows)'.dependencies]
windows-service = "0.7.0"
Expand Down Expand Up @@ -737,6 +738,7 @@ sinks-logs = [
"sinks-new_relic_logs",
"sinks-opentelemetry",
"sinks-papertrail",
"sinks-postgres",
"sinks-pulsar",
"sinks-redis",
"sinks-sematext",
Expand Down Expand Up @@ -805,6 +807,7 @@ sinks-new_relic_logs = ["sinks-http"]
sinks-new_relic = []
sinks-opentelemetry = ["sinks-http"]
sinks-papertrail = ["dep:syslog"]
sinks-postgres = ["dep:tokio-postgres", "dep:postgres-protocol"]
sinks-prometheus = ["dep:base64", "dep:prost", "vector-lib/prometheus"]
sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"]
sinks-redis = ["dep:redis"]
Expand Down Expand Up @@ -993,6 +996,7 @@ codecs-benches = []
loki-benches = ["sinks-loki"]
enrichment-tables-benches = ["enrichment-tables-geoip", "enrichment-tables-mmdb", "enrichment-tables-memory"]
proptest = ["dep:proptest", "dep:proptest-derive", "vrl/proptest"]
postgres-protocol = ["dep:postgres-protocol"]

[[bench]]
name = "default"
Expand Down
2 changes: 2 additions & 0 deletions src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ pub mod opendal_common;
pub mod opentelemetry;
#[cfg(feature = "sinks-papertrail")]
pub mod papertrail;
#[cfg(feature = "sinks-postgres")]
pub mod postgres;
#[cfg(feature = "sinks-prometheus")]
pub mod prometheus;
#[cfg(feature = "sinks-pulsar")]
Expand Down
258 changes: 258 additions & 0 deletions src/sinks/postgres/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
mod wrapper;

use crate::sinks::prelude::*;
use itertools::Itertools;
use std::{collections::BTreeMap, fmt::Debug, sync::Arc};
use tokio_postgres::{types::BorrowToSql, NoTls};
use vector_lib::event::{
metric::{MetricData, MetricSeries, MetricTime},
EventMetadata, Metric, MetricValue,
};
use vrl::value::KeyString;
use wrapper::{JsonObjWrapper, Wrapper};

#[configurable_component(sink("postgres"))]
#[derive(Clone, Debug, Default)]
/// Write the input to a PostgreSQL table.
pub struct PostgresConfig {
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
    /// The PostgreSQL host.
    pub host: String,
    /// The PostgreSQL port.
    pub port: u16,
    /// The PostgreSQL table events are inserted into.
    pub table: String,
    /// The PostgreSQL schema (default: "public").
    pub schema: Option<String>,
    /// The PostgreSQL database (default: same as the user name, matching libpq behavior).
    pub database: Option<String>,

    /// The PostgreSQL user (default: "postgres").
    pub user: Option<String>,
    /// The PostgreSQL password.
    pub password: Option<String>,
}

impl_generate_config_from_default!(PostgresConfig);

#[async_trait::async_trait]
#[typetag::serde(name = "postgres")]
impl SinkConfig for PostgresConfig {
    /// Connect to PostgreSQL, discover the target table's columns, and build
    /// a prepared-statement streaming sink plus a `SELECT 1` healthcheck.
    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let PostgresConfig {
            host,
            port,
            user,
            password,
            table,
            database,
            schema,
            ..
        } = self;

        let user = user.as_deref().unwrap_or("postgres");
        let schema = schema.as_deref().unwrap_or("public");
        // dbname defaults to the user name if omitted, matching libpq behavior.
        let database = database.as_deref().unwrap_or(user);
        // TODO(review): a hard-coded fallback password is surprising for a
        // production sink; consider requiring an explicit password instead.
        let password = password.as_deref().unwrap_or("mysecretpassword");

        // NOTE(review): values containing spaces or quotes are not escaped in
        // this key=value connection string and will fail to parse.
        let (client, connection) = tokio_postgres::connect(
            &format!("host={host} user={user} port={port} password={password} dbname={database}"),
            NoTls,
        )
        .await?;

        let client = Arc::new(client);
        let health_client = Arc::clone(&client);

        // Healthcheck: a trivial round-trip query proves the connection works.
        let healthcheck = Box::pin(async move {
            health_client.query_one("SELECT 1", &[]).await?;
            Ok(())
        });

        // The connection object drives the socket and must be polled on its
        // own task for the client to make progress.
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                error!("connection error: {}", e);
            }
        });

        // Discover the target table's columns so the INSERT can name them
        // explicitly; this order also fixes the statement-parameter order.
        let columns: Vec<String> = client
            .query(
                "SELECT column_name from INFORMATION_SCHEMA.COLUMNS WHERE table_name = $1 AND table_schema = $2",
                &[&table, &schema],
            )
            .await?
            .into_iter()
            .map(|x| x.get(0))
            .collect();

        // An empty column list means the table does not exist in the given
        // schema; fail fast instead of preparing a malformed INSERT.
        if columns.is_empty() {
            return Err(format!("table {schema}.{table} not found or has no columns").into());
        }

        // Quote and schema-qualify both identifiers. Previously only the
        // column names were quoted and the schema was ignored entirely, so a
        // non-default schema outside the search_path inserted into the wrong
        // (or a nonexistent) table.
        let statement = client
            .prepare(&format!(
                "INSERT INTO \"{schema}\".\"{table}\" ({}) VALUES ({})",
                columns.iter().map(|v| format!("\"{v}\"")).join(","),
                columns
                    .iter()
                    .enumerate()
                    .map(|(i, _)| format!("${}", i + 1))
                    .join(",")
            ))
            .await?;

        let sink = VectorSink::from_event_streamsink(PostgresSink {
            client,
            statement,
            columns,
        });

        Ok((sink, healthcheck))
    }

    /// Declared input type. NOTE(review): the sink's run loop also handles
    /// metrics and traces, but only log input is advertised here — confirm
    /// whether `Input::all()` was intended.
    fn input(&self) -> Input {
        Input::log()
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

/// Streaming sink that writes each event as one row via a prepared INSERT.
struct PostgresSink {
    /// Shared connection handle (the healthcheck holds another `Arc` clone).
    client: Arc<tokio_postgres::Client>,
    /// Prepared `INSERT INTO … VALUES …` statement built at config time.
    statement: tokio_postgres::Statement,
    /// Target-table column names, in statement-parameter order.
    columns: Vec<String>,
}

#[async_trait::async_trait]
impl StreamSink<Event> for PostgresSink {
    /// Drive the sink: consume the event stream until it is exhausted.
    async fn run(
        self: Box<Self>,
        input: futures_util::stream::BoxStream<'_, Event>,
    ) -> Result<(), ()> {
        self.run_inner(input).await
    }
}

impl PostgresSink {
    /// Main event loop: dispatch each event type to its insert routine.
    /// Returns `Err(())` (stopping the sink) only when `store_log` receives
    /// an unstorable value shape.
    async fn run_inner(self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
        while let Some(event) = input.next().await {
            match event {
                Event::Log(log) => self.store_log(log).await?,
                Event::Trace(trace) => self.store_trace(trace.into_parts()).await?,
                Event::Metric(metric) => self.store_metric(metric).await?,
            }
        }
        Ok(())
    }

    /// Insert a log event. Object values map fields to columns by name (via
    /// `store_trace`); any other value shape is only accepted when the table
    /// has exactly one column.
    async fn store_log(&self, log_event: LogEvent) -> Result<(), ()> {
        let (v, mut metadata) = log_event.into_parts();

        match v {
            Value::Object(btree_map) => {
                self.store_trace((btree_map, metadata)).await?;
            }
            v if self.columns.len() == 1 => {
                // NOTE(review): the database error is discarded here
                // (`map_err(|_| ())`, nothing logged), unlike store_trace/
                // store_metric which log it; and the status is set via
                // `metadata.update_status` rather than
                // `metadata.take_finalizers().update_status` as the other
                // paths do — confirm both differences are intentional.
                match self
                    .client
                    .execute(&self.statement, &[&Wrapper(&v)])
                    .await
                    .map_err(|_| ())
                {
                    Ok(_) => metadata.update_status(EventStatus::Delivered),
                    Err(_) => metadata.update_status(EventStatus::Rejected),
                }
            }
            _ => {
                // Non-object value with a multi-column table: no sensible
                // column mapping exists, so reject the event and stop.
                error!(
                    "Either the Value must be an object or the tables must have exactly one column"
                );
                metadata
                    .take_finalizers()
                    .update_status(EventStatus::Rejected);
                return Err(());
            }
        }

        Ok(())
    }

    /// Insert a field map as one row: each table column is filled from the
    /// field of the same name, or SQL NULL when the field is absent.
    async fn store_trace(
        &self,
        event: (BTreeMap<KeyString, Value>, EventMetadata),
    ) -> Result<(), ()> {
        let (v, mut metadata) = event;

        // Parameters must follow the prepared statement's column order.
        let p = self
            .columns
            .iter()
            .map(|k| v.get(k.as_str()).unwrap_or(&Value::Null))
            .map(Wrapper);

        let status = match self.client.execute_raw(&self.statement, p).await {
            Ok(_) => EventStatus::Delivered,
            Err(err) => {
                error!("{err}");
                EventStatus::Rejected
            }
        };
        metadata.take_finalizers().update_status(status);
        Ok(())
    }

    /// Insert a metric as one row, matching well-known column names
    /// ("name", "tags", "timestamp", "interval_ms", "kind", and one column
    /// per metric value variant). Unmatched columns receive SQL NULL.
    async fn store_metric(&self, metric: Metric) -> Result<(), ()> {
        let (series, data, mut metadata) = metric.into_parts();
        let MetricSeries { name, tags } = series;
        let tags = tags.map(JsonObjWrapper);
        let MetricData {
            time: MetricTime {
                timestamp,
                interval_ms,
            },
            kind,
            value,
        } = data;
        let interval_ms = interval_ms.map(|i| i.get());
        let value_wrapped = JsonObjWrapper(value);

        // Same semantics as serializing the metric into a JSON object
        // and then indexing into the resulting map, but without allocation.
        // The value column is only populated when its name matches the
        // metric's actual variant (e.g. "counter" for a Counter value).
        let p = self
            .columns
            .iter()
            .map(|c| match (c.as_str(), &value_wrapped.0) {
                ("name", _) => name.name.borrow_to_sql(),
                ("tags", _) => tags.borrow_to_sql(),
                ("timestamp", _) => timestamp.borrow_to_sql(),
                ("interval_ms", _) => interval_ms.borrow_to_sql(),
                ("kind", _) => match kind {
                    vector_lib::event::MetricKind::Incremental => "Incremental".borrow_to_sql(),
                    vector_lib::event::MetricKind::Absolute => "Absolute".borrow_to_sql(),
                },
                ("aggregated_histogram", MetricValue::AggregatedHistogram { .. }) => {
                    value_wrapped.borrow_to_sql()
                }
                ("aggregated_summary", MetricValue::AggregatedSummary { .. }) => {
                    value_wrapped.borrow_to_sql()
                }
                ("counter", MetricValue::Counter { .. }) => value_wrapped.borrow_to_sql(),
                ("distribution", MetricValue::Distribution { .. }) => value_wrapped.borrow_to_sql(),
                ("gauge", MetricValue::Gauge { .. }) => value_wrapped.borrow_to_sql(),
                ("set", MetricValue::Set { .. }) => value_wrapped.borrow_to_sql(),
                ("sketch", MetricValue::Sketch { .. }) => value_wrapped.borrow_to_sql(),
                // Any other column (or a value column that doesn't match the
                // metric's variant) gets SQL NULL.
                _ => Wrapper(&Value::Null).borrow_to_sql(),
            });

        let status = match self.client.execute_raw(&self.statement, p).await {
            Ok(_) => EventStatus::Delivered,
            Err(err) => {
                error!("{err}");
                EventStatus::Rejected
            }
        };
        metadata.take_finalizers().update_status(status);
        Ok(())
    }
}
Loading
Loading