feat(postgres sink): Add postgres sink #21248
base: master
Conversation
```rust
}

/// Configuration for the `postgres` sink.
#[configurable_component(sink("postgres", "Deliver log data to a PostgreSQL database."))]
```
Should I call this sink `postgres` or `postgres_logs`?
Hm, good question: could this evolve to handle both logs and metrics in the future?
I'm thinking about whether this could evolve to integrate with other Postgres flavours such as TimescaleDB, which is oriented to time series.
My thoughts on this: #21308 (comment)
TimescaleDB tracking issue: #939
I think it would be interesting to change the input:

vector/src/sinks/postgres/config.rs, line 110 in 5b1ca45:

```rust
Input::log()
```
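A rough sketch of what widening the input could look like, assuming `Input` can be built from `DataType` flags (the constructor and import paths here are assumptions, not code from this PR):

```rust
use vector_lib::config::{DataType, Input};

// Hypothetical replacement for the log-only Input::log(): accept both logs
// and metrics. Whether Input exposes this constructor is an assumption.
fn sink_input() -> Input {
    Input::new(DataType::Log | DataType::Metric)
}
```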
```rust
// TODO: If a single item of the batch fails, the whole batch will fail its insert.
// Is this intended behaviour?
sqlx::query(&format!(
    "INSERT INTO {table} SELECT * FROM jsonb_populate_recordset(NULL::{table}, $1)"
```
The table configuration can be a victim of SQL injection, but in my opinion we shouldn't guard against that kind of attack at this level; the user should be responsible for ensuring there is no SQL injection in the config... The `databend` sink works like this.
Hm, I suppose `sqlx` does not support parameterized table names? Does the query builder help here? If none of the above works, then we can leave it as is.
I don't think that would help in this case. See this statement about `sqlx`'s query builder.
And we cannot use a variable bind (`$` syntax) in Postgres for table names, as prepared statements are bound to a query plan, which cannot change if the target table changes.
I think this is the better way to do it... `sqlx` does not check for SQL injection.
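For illustration, a minimal sketch of the pattern under discussion: the JSONB payload goes through a regular `$1` bind, while the table name has to be interpolated into the SQL text and therefore must come from trusted configuration. This is a sketch of the approach, not the PR's exact code:

```rust
use serde_json::json;
use sqlx::{types::Json, PgPool};

async fn insert_batch(pool: &PgPool, table: &str) -> Result<(), sqlx::Error> {
    // The batch is encoded as a single JSONB array and bound as $1, so its
    // contents cannot inject SQL. The table name, however, is formatted
    // directly into the statement and is not escaped by sqlx.
    let batch = json!([{ "message": "hello", "host": "localhost" }]);
    sqlx::query(&format!(
        "INSERT INTO {table} SELECT * FROM jsonb_populate_recordset(NULL::{table}, $1)"
    ))
    .bind(Json(batch))
    .execute(pool)
    .await?;
    Ok(())
}
```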
```rust
pub endpoint: String,

/// The table that data is inserted into.
pub table: String,
```
Should we make the table templatable, like the `clickhouse` sink? That would complicate the code a little bit (with `KeyPartitioner` and so on). If yes, I would like some guidance about it if possible.
It is a nice feature but not a must-have; we can do this incrementally. Once we finalize the rest of the comments, we can come back to this if you are motivated to add the feature.
Okay!
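For reference, a loose sketch of what a templatable table could look like, modeled on the `clickhouse` sink; the wrapper struct and doc comment here are illustrative assumptions, not code from this PR:

```rust
use crate::template::Template;

// Hypothetical shape of the change (not this PR's code): `table` becomes a
// Template rendered against each event, so different events can target
// different tables; a KeyPartitioner would then group events per rendered
// table name before requests are built.
#[derive(Clone, Debug)]
pub struct TemplatableTableConfig {
    /// The table that data is inserted into, e.g. "logs_{{ application_id }}".
    pub table: Template,
}
```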
```rust
}

#[tokio::test]
async fn test_postgres_sink() {
```
I think just a single test is too little... but I couldn't figure out anything else to test. This test is very similar to the integration tests from the `databend` sink.
There are some more interesting things we can do here; at the very least, send more than one event. Also, we could test failures such as sending a badly formatted payload.
Okay, going to include more tests!
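Just to sketch the direction, a multi-event test could look something like the outline below. The sink construction and the row-count query are omitted because they depend on helpers from this PR that are not reproduced here; this is not the PR's test code.

```rust
use vector_lib::event::{BatchNotifier, Event, LogEvent};

#[tokio::test]
async fn insert_multiple_events() {
    let (batch, _receiver) = BatchNotifier::new_with_receiver();

    // Build a batch of ten distinct log events tied to one batch notifier.
    let events: Vec<Event> = (0..10)
        .map(|i| {
            let mut log = LogEvent::default();
            log.insert("message", format!("message {i}"));
            log.insert("id", i as i64);
            Event::from(log.with_batch_notifier(&batch))
        })
        .collect();
    drop(batch);

    // Run the sink over `events`, then assert that 10 rows landed in the
    // target table and that `_receiver` reports the batch as Delivered.
    assert_eq!(events.len(), 10);
}
```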
good for docs
```rust
use std::future::ready;
use vector_lib::event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event, LogEvent};

fn pg_host() -> String {
```
Copied those utility functions from the `postgres_metrics` source integration tests.
Let's do some small refactoring here:

- The folder `src/test_util` seems like a good location to put these utils.
- You can use `#[cfg(any(feature = "postgresql_metrics-integration-tests", feature = "postgres_sink-integration-tests"))]` in `src/test_util/mod.rs` (sketched below).
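A small sketch of what that gating could look like in `src/test_util/mod.rs`; the `postgres` module name is an assumption about where the shared helpers would live:

```rust
// src/test_util/mod.rs (sketch): expose shared Postgres helpers only when
// either integration-test feature is enabled.
#[cfg(any(
    feature = "postgresql_metrics-integration-tests",
    feature = "postgres_sink-integration-tests"
))]
pub mod postgres;
```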
Hi @jorgehermo9, thank you for this sizable contribution! On a high level, it looks great. I did a first review and left some comments. Don't hesitate to follow up, happy to discuss details.
src/sinks/postgres/config.rs
```rust
/// The table that data is inserted into.
pub table: String,

/// The postgres connection pool size.
```
Here it would be useful to explain what this pool is used for. Maybe a link to relevant docs would suffice.
Done in 21f3ad3. Do you think it is enough?
I also have doubts about using a connection pool. Can the event batches be executed in parallel for this sink? I don't know the specifics of Vector's internals...

vector/src/sinks/postgres/sink.rs, line 24 in 21f3ad3:

```rust
.batched(self.batch_settings.as_byte_size_config())
```

If the batches of events can be processed in parallel, then a connection pool is beneficial. If the batches are processed sequentially, then we should use a single Postgres connection, as a pooled connection would not make sense.
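For context, a minimal sketch of the two connection strategies being weighed here, using sqlx directly; the `endpoint` and `pool_size` names follow this PR's options, the rest is illustrative:

```rust
use sqlx::postgres::{PgConnection, PgPool, PgPoolOptions};
use sqlx::Connection;

// Pooled: useful if batches are written concurrently, since each in-flight
// insert can check out its own connection.
async fn connect_pool(endpoint: &str, pool_size: u32) -> Result<PgPool, sqlx::Error> {
    PgPoolOptions::new()
        .max_connections(pool_size)
        .connect(endpoint)
        .await
}

// Single connection: sufficient if batches are inserted strictly one after another.
async fn connect_single(endpoint: &str) -> Result<PgConnection, sqlx::Error> {
    PgConnection::connect(endpoint).await
}
```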
Thank you very much for the review @pront! I'm kinda busy these days but I will revisit this as soon as I can :)

There are a few failing checks. Also, let's add a new

I will work on this PR these days, I'll ping you whenever it is ready for another round. Thank you so much @pront!
Closes #15765
This PR is not 100% ready on my side and there will likely be a few things wrong, but I had a few questions and wanted to know if the direction seems right... so I would like an initial round of review if possible.
I tested the sink and it seems to be working, but I lack a lot of knowledge about Vector's internals and I'm not sure if the implementation is okay.
I was inspired a lot by the `databend` and `clickhouse` sinks, but left a few questions as TODOs in the source. I found this sink a bit different from the others, as the others have the `request_builder` machinery and encode the payload into bytes (since most of the sinks are HTTP based)... But I didn't think that fitted well in this case, as with the `sqlx` API I should wrap the events in the `sqlx::types::Json` type, which does all the encoding with `serde` internally.
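A sketch of that idea, to make the contrast with the byte-encoding sinks concrete; the helper name is made up and this is not the PR's exact code:

```rust
use serde_json::Value;
use sqlx::types::Json;
use vector_lib::event::LogEvent;

// Instead of a request_builder that encodes events to bytes, each LogEvent is
// serialized to a serde_json::Value and the whole batch is wrapped in
// sqlx::types::Json, which sqlx encodes with serde when binding the JSONB
// parameter.
fn batch_to_param(events: &[LogEvent]) -> Json<Vec<Value>> {
    let rows: Vec<Value> = events
        .iter()
        .map(|event| serde_json::to_value(event).expect("log events serialize to JSON"))
        .collect();
    Json(rows)
}
```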
If someone wants to manually test it, I used this Vector config:
Run a postgres server with `podman run -e POSTGRES_PASSWORD=postgres -p 5432:5432 docker.io/postgres` and execute the following with `psql -h localhost -U postgres`:

Then execute `\c test` and, last:

And then, you will see logs in that table: