chore(architecture): consolidate sink I/O driver logic into reusable component #9215
…component Signed-off-by: Toby Lawrence <[email protected]>
lib/vector-core/src/stream/driver.rs
Outdated
/// return an error for a legitimate reason in the future.
pub async fn run(self) -> Result<(), ()> {
    let in_flight = FuturesUnordered::new();
    let mut pending_acks = HashMap::new();
Small note that we could probably parameterize this with twox hash or some such if we wanted.
I had the exact same thought. There's actually a `nohash-hasher` crate since our keys are already `u64`. I guess if we've both had this thought, maybe I should just go ahead and do it.
TIL about nohash-hasher.
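For readers unfamiliar with the idea, here is a minimal standard-library sketch of a pass-through hasher for `u64` keys. It is not the `nohash-hasher` crate itself; `IdentityHasher` and `SeqMap` are hypothetical names used only for illustration.

```rust
use std::collections::HashMap;
use std::hash::{BuildHasherDefault, Hasher};

/// Hypothetical pass-through hasher: the "hash" of a u64 key is the key itself.
#[derive(Default)]
struct IdentityHasher(u64);

impl Hasher for IdentityHasher {
    fn finish(&self) -> u64 {
        self.0
    }

    // Only u64 keys are expected; any other key type lands here and panics.
    fn write(&mut self, _bytes: &[u8]) {
        unimplemented!("IdentityHasher only supports u64 keys")
    }

    // u64's Hash implementation calls write_u64, so no real hashing happens.
    fn write_u64(&mut self, n: u64) {
        self.0 = n;
    }
}

/// Hypothetical alias: a map keyed by sequence number that skips hashing work.
type SeqMap<V> = HashMap<u64, V, BuildHasherDefault<IdentityHasher>>;
```

A map built with `SeqMap::default()` behaves like an ordinary `HashMap<u64, V>`; only the hashing step changes.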
This is almost a glorified `VecDeque`. I wonder if it could be emulated with a wrapper around `VecDeque<Option<(u64, u64)>>` where inserting adds "blanks" if the item is not actually sequentially next.
That's a good point regarding `VecDeque`. I'll have to think about your idea... off the top of my head, I can't visualize it, so might just be one of those things I need to write down first to grok.
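One possible reading of the `VecDeque` suggestion, written down as a rough sketch. It assumes sequence numbers start at zero and increase by one, and it stores only an acknowledgement size per slot rather than the `(u64, u64)` pair from the comment. `AckWindow`, `complete`, and `drain_ready` are hypothetical names, not code from this PR.

```rust
use std::collections::VecDeque;

/// Hypothetical tracker: slot `i` holds the ack size for sequence number `base + i`.
/// A `None` slot is a "blank": that request has not completed yet.
struct AckWindow {
    base: u64,
    slots: VecDeque<Option<usize>>,
}

impl AckWindow {
    fn new() -> Self {
        Self { base: 0, slots: VecDeque::new() }
    }

    /// Record a completed request, padding with blanks if it is not sequentially next.
    fn complete(&mut self, seq_no: u64, ack_size: usize) {
        if seq_no < self.base {
            return; // already acknowledged; ignored in this sketch
        }
        let idx = (seq_no - self.base) as usize;
        while self.slots.len() <= idx {
            self.slots.push_back(None);
        }
        self.slots[idx] = Some(ack_size);
    }

    /// Pop the contiguous run of completed slots at the front and return
    /// the total number of events that can now be acknowledged in order.
    fn drain_ready(&mut self) -> usize {
        let mut total = 0;
        while let Some(Some(size)) = self.slots.front().copied() {
            self.slots.pop_front();
            self.base += 1;
            total += size;
        }
        total
    }
}
```

In this reading, a completion is located by its offset from `base`, and acknowledgements are released only once every earlier sequence number has completed.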
In 05bc5f3, I reworked this as a `BinaryHeap` because I didn't like the idea of inserting blanks that we would then have to find with a linear scan when the call finishes and we want to mark the pending acknowledgement as completed.
Admittedly, though, the code using `BinaryHeap` is longer than the `HashMap` approach.
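A rough sketch of how in-order acknowledgement can be tracked with a min-heap. It is not the code from 05bc5f3: `PendingAcks`, `complete`, and `drain_ready` are hypothetical names, and the sketch assumes sequence numbers start at zero and increase by one.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical tracker: completed requests wait in a min-heap keyed by sequence number.
struct PendingAcks {
    next_seq: u64,
    completed: BinaryHeap<Reverse<(u64, usize)>>,
}

impl PendingAcks {
    fn new() -> Self {
        Self { next_seq: 0, completed: BinaryHeap::new() }
    }

    /// Record that the request with `seq_no` finished and covers `ack_size` events.
    fn complete(&mut self, seq_no: u64, ack_size: usize) {
        // `Reverse` turns the std max-heap into a min-heap on (seq_no, ack_size).
        self.completed.push(Reverse((seq_no, ack_size)));
    }

    /// Release completions while the smallest one is the next expected sequence
    /// number, returning the total number of events to acknowledge.
    fn drain_ready(&mut self) -> usize {
        let mut total = 0;
        while let Some(Reverse((seq_no, size))) = self.completed.peek().copied() {
            if seq_no != self.next_seq {
                break; // an earlier request is still in flight
            }
            self.completed.pop();
            self.next_seq += 1;
            total += size;
        }
        total
    }
}
```

Nothing is stored for requests that are still in flight, so there are no blanks to look for; the cost is an `O(log n)` push and pop per request.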
I really like where this is headed.
A big 👍🏻 to the reusable component in `vector-core`.
a8a2430 to d24f943
Co-authored-by: Bruce Guenter <[email protected]>
d24f943 to 45d95cd
…eamify-new-sink-io-task Signed-off-by: Toby Lawrence <[email protected]>
1f30482 to 021a974
Still working through this, but a couple quick notes.
Looks great to me! Just one note about a log message.
let result: Result<(u64, usize), JoinError> = result;
match result {
    Ok((seq_no, ack_size)) => {
        trace!(message = "Sending request.", seq_no, ack_size);
Is this message accurate? It seems like we've already sent the request and gotten a response at this point.
Oh! Good catch.
@@ -220,7 +220,7 @@ mod integration_tests {

  let (_lines, events, receiver) = make_events_batch(1, 1);
  sink.run(events).await.unwrap();
- assert_eq!(receiver.await, BatchStatus::Errored);
+ assert_eq!(receiver.await, BatchStatus::Failed);
It's not clear to me why this switch would happen, but I'm not opposed. Mostly I want to flag that there's some logic behind this change, which I am missing. I have to look up the difference between 'failed' and 'errored' every time; they are "at least one event in the batch had a permanent failure" and "at least one event in the batch had a transient error in delivery" respectively. These definitions lead to tricky questions about priority if a batch has an event that suffered a permanent failure and also a transient error, so I would tend to imagine the line between them is blurry in practice in vector today.
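To make the priority question concrete, here is a small sketch of one possible policy, in which a permanent failure outranks a transient error when per-event statuses are combined. The enum below is a local stand-in defined only for this example, and `merge` is a hypothetical helper; this is an assumption, not a description of how Vector resolves the two today.

```rust
/// Local stand-in for a batch status, defined only for this sketch.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BatchStatus {
    /// Every event was delivered.
    Delivered,
    /// At least one event had a transient error in delivery.
    Errored,
    /// At least one event had a permanent failure.
    Failed,
}

impl BatchStatus {
    /// Hypothetical policy: permanent failure wins over transient error,
    /// which in turn wins over successful delivery.
    fn merge(self, other: Self) -> Self {
        match (self, other) {
            (Self::Failed, _) | (_, Self::Failed) => Self::Failed,
            (Self::Errored, _) | (_, Self::Errored) => Self::Errored,
            _ => Self::Delivered,
        }
    }
}
```

Under this policy a batch containing both kinds of problem reports `Failed`; the opposite ordering would be equally easy to write, which is the ambiguity the comment above points at.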
I've noticed you've left the Datadog log sink's `run_io` in place, here. I'd be happy to see that converted in this PR or in a follow-up issue, whichever your preference. I had a question about a follow-up issue for a movement of code into core as well.
That said, this is excellent and I look forward to it being merged up.
@blt I'll be working on an immediate follow-up PR where I handle some of the code movement into
I'm for it. Merge this thing.
…component (#9215) * chore(architecture): consolidate sink I/O driver logic into reusable component Signed-off-by: Toby Lawrence <[email protected]>
This PR moves the logic previously baked into the `run_io` method for the S3 sink into a reusable component called `Driver`. Simply put, you give it a `Stream` of items which can be used as the request for a `Service`, and it handles building calls for each item, as well as providing finalization and acking as the responses come through.

As it stands, we've got a lot of changes going on here as part of trying to make the boilerplate specific to sinks be as simple/flexible as possible. To name a few:

- `Encoder`, that allows defining a type that can "encode" a single event (`EncodingConfiguration` + `Encoding` enums)
- `StandardEncodings` enum that implements `Encoder` for the most common encodings
- `Compression` tweaks slightly to depend more on defined compression levels from `flate2` itself
- `RequestBuilder`, to try and provide a starting point for generic "how do we take events and make a request?" interface
- `SinkBuilderExt`, to wrap streams of events with common building blocks, such as batching, request building, `Stream` -> `Service` driving, for building a complete sink
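As an illustration of the `Encoder` idea only, the sketch below shows a trait that writes one event to a writer, plus an enum that implements it for two formats. The `Event` type, the `encode_event` signature, and the `Text` and `KeyValue` variants are all assumptions made for this example; the actual definitions in the PR may differ.

```rust
use std::io::{self, Write};

/// Hypothetical stand-in for an event; the real event type is much richer.
struct Event {
    message: String,
}

/// Hypothetical trait shape: encode a single event into a writer.
trait Encoder {
    fn encode_event(&self, event: &Event, writer: &mut dyn Write) -> io::Result<()>;
}

/// Hypothetical set of common encodings, selected by enum variant.
enum StandardEncodings {
    /// The raw message followed by a newline.
    Text,
    /// A `message="..."` pair followed by a newline.
    KeyValue,
}

impl Encoder for StandardEncodings {
    fn encode_event(&self, event: &Event, writer: &mut dyn Write) -> io::Result<()> {
        match self {
            StandardEncodings::Text => writeln!(writer, "{}", event.message),
            // `{:?}` quotes and escapes the string using Rust's debug formatting.
            StandardEncodings::KeyValue => writeln!(writer, "message={:?}", event.message),
        }
    }
}
```

Because the trait takes any `Write`, the same encoder can target an in-memory buffer or a compressing writer without changes.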