feat(postgres sink): Add postgres sink #21248

Open · wants to merge 16 commits into base: master
1 change: 1 addition & 0 deletions .github/actions/spelling/expect.txt
@@ -543,6 +543,7 @@ spencergilbert
spinlock
SPOF
spog
sqlx
srcaddr
srcport
SREs
5 changes: 5 additions & 0 deletions Cargo.toml
@@ -358,6 +358,7 @@ semver = { version = "1.0.23", default-features = false, features = ["serde", "s
smallvec = { version = "1", default-features = false, features = ["union", "serde"] }
snap = { version = "1.1.1", default-features = false }
socket2 = { version = "0.5.7", default-features = false }
sqlx = { version = "0.8.2", default-features = false, features = ["derive", "postgres", "runtime-tokio"], optional = true }
stream-cancel = { version = "0.8.2", default-features = false }
strip-ansi-escapes = { version = "0.2.0", default-features = false }
syslog = { version = "6.1.1", default-features = false, optional = true }
@@ -721,6 +722,7 @@ sinks-logs = [
"sinks-new_relic_logs",
"sinks-new_relic",
"sinks-papertrail",
"sinks-postgres",
"sinks-pulsar",
"sinks-redis",
"sinks-sematext",
@@ -787,6 +789,7 @@ sinks-new_relic_logs = ["sinks-http"]
sinks-new_relic = []
sinks-papertrail = ["dep:syslog"]
sinks-prometheus = ["dep:base64", "dep:prost", "vector-lib/prometheus"]
sinks-postgres = ["dep:sqlx"]
sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"]
sinks-redis = ["dep:redis"]
sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"]
@@ -835,6 +838,7 @@ all-integration-tests = [
"nginx-integration-tests",
"opentelemetry-integration-tests",
"postgresql_metrics-integration-tests",
"postgres_sink-integration-tests",
"prometheus-integration-tests",
"pulsar-integration-tests",
"redis-integration-tests",
@@ -899,6 +903,7 @@ nats-integration-tests = ["sinks-nats", "sources-nats"]
nginx-integration-tests = ["sources-nginx_metrics"]
opentelemetry-integration-tests = ["sources-opentelemetry", "dep:prost"]
postgresql_metrics-integration-tests = ["sources-postgresql_metrics"]
postgres_sink-integration-tests = ["sinks-postgres"]
prometheus-integration-tests = ["sinks-prometheus", "sources-prometheus", "sinks-influxdb"]
pulsar-integration-tests = ["sinks-pulsar", "sources-pulsar"]
redis-integration-tests = ["sinks-redis", "sources-redis"]
3 changes: 3 additions & 0 deletions changelog.d/15765_postgres_sink.feature.md
@@ -0,0 +1,3 @@
Add a new `postgres` sink which allows sending log events to a PostgreSQL database.

authors: jorgehermo9
2 changes: 2 additions & 0 deletions scripts/integration/postgres/test.yaml
@@ -1,5 +1,6 @@
features:
- postgresql_metrics-integration-tests
- postgres_sink-integration-tests

test_filter: ::postgres

@@ -18,6 +19,7 @@ matrix:
# expressions are evaluated using https://github.com/micromatch/picomatch
paths:
- "src/internal_events/postgresql_metrics.rs"
- "src/sinks/postgres/**"
- "src/sources/postgresql_metrics.rs"
- "src/sources/util/**"
- "scripts/integration/postgres/**"
2 changes: 2 additions & 0 deletions src/sinks/mod.rs
@@ -84,6 +84,8 @@ pub mod new_relic;
pub mod opendal_common;
#[cfg(feature = "sinks-papertrail")]
pub mod papertrail;
#[cfg(feature = "sinks-postgres")]
pub mod postgres;
#[cfg(feature = "sinks-prometheus")]
pub mod prometheus;
#[cfg(feature = "sinks-pulsar")]
146 changes: 146 additions & 0 deletions src/sinks/postgres/config.rs
@@ -0,0 +1,146 @@
use futures::FutureExt;
use tower::ServiceBuilder;
use vector_lib::{
    config::AcknowledgementsConfig,
    configurable::{component::GenerateConfig, configurable_component},
    sink::VectorSink,
};

use super::{
    service::{PostgresRetryLogic, PostgresService},
    sink::PostgresSink,
};
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};

use crate::{
    config::{Input, SinkConfig, SinkContext},
    sinks::{
        util::{
            BatchConfig, RealtimeSizeBasedDefaultBatchSettings, ServiceBuilderExt,
            TowerRequestConfig, UriSerde,
        },
        Healthcheck,
    },
};

const fn default_pool_size() -> u32 {
    5
}

/// Configuration for the `postgres` sink.
#[configurable_component(sink("postgres", "Deliver log data to a PostgreSQL database."))]
jorgehermo9 (Contributor, Author): Should I call this sink postgres or postgres_logs?

Member: Hm, good question. Could this evolve to handle both logs and metrics in the future?

jorgehermo9 (Contributor, Author, Oct 15, 2024): I'm wondering whether this could evolve to integrate with other postgres flavours such as TimescaleDB, which is oriented to time series.

My thoughts on this: #21308 (comment)

Timescaledb tracking issue: #939

jorgehermo9 (Contributor, Author): I think it would be interesting to change the input of this sink (currently Input::log()) to allow for metrics and traces too. I'll give it a try.
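
A minimal sketch of that change, assuming the rest of the sink also learns to encode metrics and traces (hypothetical; the PR as written keeps Input::log()):

    // Hypothetical sketch, not part of this PR: accept every event type.
    // The request builder would then need to encode metrics and traces as well.
    fn input(&self) -> Input {
        Input::all()
    }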

#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct PostgresConfig {
    // TODO: if I used UriSerde instead of String, I couldn't get a url string to use
    // in the connection pool, as the password would be redacted with UriSerde::to_string
    /// The connection string for the PostgreSQL server. It can contain the username and password.
    pub endpoint: String,

    /// The table that data is inserted into.
    pub table: String,
jorgehermo9 (Contributor, Author): Should we make the table templatable, like the clickhouse sink? That would complicate the code a little bit (with KeyPartitioner and so on). If yes, I would like some guidance about it if possible.

Member: It is a nice feature but not a must-have; we can do this incrementally. Once we have finalized the rest of the comments, we can come back to this if you are motivated to add this feature.

jorgehermo9 (Contributor, Author): Okay!
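
A minimal sketch of a templatable table option, assuming Vector's Template type, which renders {{ field }} placeholders per event (hypothetical, not part of this PR; the rendered name would act as the batch partition key):

    // Hypothetical sketch, not part of this PR: render the table name per event.
    // Assumes crate::template::Template, as used by the clickhouse sink.
    use crate::template::Template;
    use vector_lib::event::Event;

    fn render_table(template: &Template, event: &Event) -> Option<String> {
        // Events whose template fails to render would be dropped or
        // routed to an error output.
        template.render_string(event).ok()
    }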


    /// The postgres connection pool size. See [this](https://docs.rs/sqlx/latest/sqlx/struct.Pool.html#why-use-a-pool) for more
    /// information about why a connection pool should be used.
    #[serde(default = "default_pool_size")]
    pub pool_size: u32,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
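
For illustration, a fuller configuration exercising pool_size and the derived batch settings might parse like this (a sketch in the style of the parse_config test below; the values are made up):

    // Sketch: pool_size and batch.max_events are illustrative values, not defaults.
    let cfg = toml::from_str::<PostgresConfig>(
        r#"
        endpoint = "postgres://user:password@localhost/default"
        table = "mytable"
        pool_size = 10
        batch.max_events = 100
        "#,
    )
    .unwrap();
    assert_eq!(cfg.pool_size, 10);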

impl GenerateConfig for PostgresConfig {
    fn generate_config() -> toml::Value {
        toml::from_str(
            r#"endpoint = "postgres://user:password@localhost/default"
            table = "default"
            "#,
        )
        .unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "postgres")]
impl SinkConfig for PostgresConfig {
    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        // TODO: make the connection pool configurable. Or should we just have one connection per sink?
        // TODO: it seems that the number of connections in the pool does not affect the
        // throughput of the sink; does the sink execute batches in parallel?
        let connection_pool = PgPoolOptions::new()
            .max_connections(self.pool_size)
            .connect(&self.endpoint)
            .await?;

        let healthcheck = healthcheck(connection_pool.clone()).boxed();

        let batch_settings = self.batch.into_batcher_settings()?;
        let request_settings = self.request.into_settings();

        let endpoint_uri: UriSerde = self.endpoint.parse()?;
        let service = PostgresService::new(
            connection_pool,
            self.table.clone(),
            // TODO: this endpoint is used for metrics' tags. It could contain passwords,
            // will it be redacted there?
            endpoint_uri.to_string(),
        );
        let service = ServiceBuilder::new()
            .settings(request_settings, PostgresRetryLogic)
            .service(service);

        let sink = PostgresSink::new(service, batch_settings);

        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
    }

    // TODO: allow for Input::all()
    fn input(&self) -> Input {
        Input::log()
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

async fn healthcheck(connection_pool: Pool<Postgres>) -> crate::Result<()> {
    sqlx::query("SELECT 1").execute(&connection_pool).await?;
    Ok(())
}
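
One way the password-redaction TODO above could be handled is to strip credentials from the endpoint before using it as a tag value. A sketch with the url crate (an assumption; this PR does not do this):

    // Hypothetical sketch, not part of this PR: drop credentials from the
    // endpoint URL before exposing it as a metrics tag.
    fn redacted_endpoint(endpoint: &str) -> Option<String> {
        let mut url = url::Url::parse(endpoint).ok()?;
        // Setter errors are ignored for URLs that cannot carry credentials.
        let _ = url.set_password(None);
        let _ = url.set_username("");
        Some(url.to_string())
    }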

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PostgresConfig>();
    }

    #[test]
    fn parse_config() {
        let cfg = toml::from_str::<PostgresConfig>(
            r#"
            endpoint = "postgres://user:password@localhost/default"
            table = "mytable"
            "#,
        )
        .unwrap();
        assert_eq!(cfg.endpoint, "postgres://user:password@localhost/default");
        assert_eq!(cfg.table, "mytable");
    }
}
111 changes: 111 additions & 0 deletions src/sinks/postgres/integration_tests.rs
@@ -0,0 +1,111 @@
use crate::{
    config::{SinkConfig, SinkContext},
    sinks::{
        postgres::PostgresConfig,
        util::{test::load_sink, UriSerde},
    },
    test_util::{components::run_and_assert_sink_compliance, random_string, trace_init},
};
use futures::stream;
use serde::{Deserialize, Serialize};
use sqlx::{Connection, FromRow, PgConnection};
use std::future::ready;
use vector_lib::event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event, LogEvent};

fn pg_host() -> String {
jorgehermo9 (Contributor, Author, Sep 24, 2024): Copied these utility functions from the postgresql_metrics source integration tests.

Member: Let's do some small refactoring here.

  • The folder src/test_util seems like a good location to put these utils.
  • You can use #[cfg(any(feature = "postgresql_metrics-integration-tests", feature = "postgres_sink-integration-tests"))] in src/test_util/mod.rs.
std::env::var("PG_HOST").unwrap_or_else(|_| "localhost".into())
}

fn pg_url() -> String {
std::env::var("PG_URL")
.unwrap_or_else(|_| format!("postgres://vector:vector@{}/postgres", pg_host()))
}

fn gen_table() -> String {
format!("test_{}", random_string(10).to_lowercase())
}

fn make_event() -> (Event, BatchStatusReceiver) {
let (batch, receiver) = BatchNotifier::new_with_receiver();
let mut event = LogEvent::from("raw log line").with_batch_notifier(&batch);
event.insert("host", "example.com");
let event_payload = event.clone().into_parts().0;
event.insert("payload", event_payload);
(event.into(), receiver)
}

#[derive(Debug, Serialize, Deserialize, FromRow)]
struct TestEvent {
host: String,
timestamp: String,
message: String,
payload: serde_json::Value,
}

async fn prepare_config() -> (String, String, PgConnection) {
trace_init();

let table = gen_table();
let endpoint = pg_url();
let _endpoint: UriSerde = endpoint.parse().unwrap();

let cfg = format!(
r#"
endpoint = "{endpoint}"
table = "{table}"
batch.max_events = 1
"#,
);

let connection = PgConnection::connect(&endpoint)
.await
.expect("Failed to connect to Postgres");

(cfg, table, connection)
}

async fn insert_event_with_cfg(cfg: String, table: String, mut connection: PgConnection) {
// We store the timestamp as text and not as `timestamp with timezone` postgres type due to
// postgres not supporting nanosecond-resolution (it does support microsecond-resolution).
let create_table_sql =
format!("CREATE TABLE IF NOT EXISTS {table} (host text, timestamp text, message text, payload jsonb)",);
sqlx::query(&create_table_sql)
.execute(&mut connection)
.await
.unwrap();

let (config, _) = load_sink::<PostgresConfig>(&cfg).unwrap();
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let (input_event, mut receiver) = make_event();
run_and_assert_sink_compliance(
sink,
stream::once(ready(input_event.clone())),
&["endpoint", "protocol"],
)
.await;

let select_all_sql = format!("SELECT * FROM {table}");
let events: Vec<TestEvent> = sqlx::query_as(&select_all_sql)
.fetch_all(&mut connection)
.await
.unwrap();
dbg!(&events);
assert_eq!(1, events.len());

// drop input_event after comparing with response
{
let log_event = input_event.into_log();
let expected = serde_json::to_value(&log_event).unwrap();
let actual = serde_json::to_value(&events[0]).unwrap();
assert_eq!(expected, actual);
}

assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
}

#[tokio::test]
async fn test_postgres_sink() {
jorgehermo9 (Contributor, Author, Sep 9, 2024): I think just a single test is too little, but I couldn't figure out anything else to test. This test is very similar to the integration tests from the databend sink.

Member: There are some more interesting things we can do here; at the very least, send more than one event. Also, we could test failures such as sending a badly formatted payload.

jorgehermo9 (Contributor, Author): Okay, going to include more tests!

    let (cfg, table, connection) = prepare_config().await;
    insert_event_with_cfg(cfg, table, connection).await;
}
7 changes: 7 additions & 0 deletions src/sinks/postgres/mod.rs
@@ -0,0 +1,7 @@
mod config;
#[cfg(all(test, feature = "postgres-integration-tests"))]

Check failure on line 2 in src/sinks/postgres/mod.rs (GitHub Actions / Checks): unexpected `cfg` condition value: `postgres-integration-tests`

mod integration_tests;
mod service;
mod sink;

pub use self::config::PostgresConfig;