chore(architecture): consolidate sink I/O driver logic into reusable component #9215

Merged: 14 commits, Oct 5, 2021
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions lib/vector-core/Cargo.toml
@@ -44,6 +44,7 @@ tokio = { version = "1.11.0", default-features = false }
tokio-stream = { version = "0.1", default-features = false, optional = true }
tokio-util = { version = "0.6.8", default-features = false, features = ["time"] }
toml = { version = "0.5.8", default-features = false }
tower = { version = "0.4", default-features = false }
tracing = { version = "0.1.27", default-features = false }
tracing-core = { version = "0.1.20", default-features = false }
tracing-log = { version = "0.1.2", default-features = false }
10 changes: 10 additions & 0 deletions lib/vector-core/buffers/src/acker.rs
@@ -3,6 +3,16 @@ use metrics::counter;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// A value that can be acknowledged.
///
/// This is used to define how many events should be acknowledged when this value has been
/// processed. Since the value might be tied to a single event, or to multiple events, this
/// provides a generic mechanism for gathering the number of events to acknowledge.
pub trait Ackable {
/// Number of events to acknowledge for this value.
fn ack_size(&self) -> usize;
}

#[derive(Debug, Clone)]
pub enum Acker {
Disk(Arc<AtomicUsize>, Arc<AtomicWaker>),
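For illustration, a minimal sketch of what an implementor of `Ackable` might look like, using a hypothetical batched request type that is not part of this PR:

```rust
use buffers::Ackable;

/// Hypothetical request type wrapping a batch of encoded events.
struct BatchRequest {
    encoded_events: Vec<Vec<u8>>,
}

impl Ackable for BatchRequest {
    fn ack_size(&self) -> usize {
        // One acknowledgement per event in the batch once the whole
        // request has been processed.
        self.encoded_events.len()
    }
}
```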
2 changes: 1 addition & 1 deletion lib/vector-core/buffers/src/lib.rs
@@ -20,7 +20,7 @@ mod test;
mod variant;

use crate::bytes::{DecodeBytes, EncodeBytes};
pub use acker::Acker;
pub use acker::{Ackable, Acker};
use futures::{channel::mpsc, Sink, SinkExt, Stream};
use pin_project::pin_project;
#[cfg(test)]
1 change: 1 addition & 0 deletions lib/vector-core/src/stream.rs
@@ -1 +1,2 @@
pub mod batcher;
pub mod driver;
149 changes: 149 additions & 0 deletions lib/vector-core/src/stream/driver.rs
@@ -0,0 +1,149 @@
use std::{collections::HashMap, fmt, marker::PhantomData};

use buffers::{Ackable, Acker};
use futures::{stream::FuturesUnordered, FutureExt, Stream, StreamExt, TryFutureExt};
use tokio::{pin, select, sync::oneshot};
use tower::{Service, ServiceExt};
use tracing::Instrument;

use crate::event::{EventStatus, Finalizable};

/// Drives the interaction between a stream of items and a service which processes them
/// asynchronously.
///
/// `Driver`, at a high level, facilitates taking items from an arbitrary `Stream` and pushing them
/// through a `Service`, spawning each call to the service so that work can be run concurrently,
/// managing waiting for the service to be ready before processing more items, and so on.
///
/// Additionally, `Driver` handles two event-specific facilities: finalization and acknowledgement.
///
/// This capability is parameterized so any implementation which can define how to interpret the
/// response for each request, as well as define how many events a request is comprised of, can be
/// used with `Driver`.
pub struct Driver<St, Svc, Req>
where
Svc: Service<Req>,
{
input: St,
service: Svc,
acker: Acker,
_req: PhantomData<Req>,
}

impl<St, Svc, Req> Driver<St, Svc, Req>
where
Svc: Service<Req>,
{
pub fn new(input: St, service: Svc, acker: Acker) -> Self {
Self {
input,
service,
acker,
_req: PhantomData,
}
}
}

impl<St, Svc, Req> Driver<St, Svc, Req>
where
St: Stream<Item = Req>,
Svc: Service<Req>,
Svc::Error: fmt::Debug + 'static,
Svc::Future: Send + 'static,
Svc::Response: AsRef<EventStatus>,
Req: Ackable + Finalizable,
{
/// Runs the driver until the input stream is exhausted.
///
/// All in-flight calls to the provided `service` will also be completed before `run` returns.
///
/// # Errors
///
/// No errors are currently returned. The return type is purely to simplify caller code, but may
/// return an error for a legitimate reason in the future.
pub async fn run(self) -> Result<(), ()> {
let in_flight = FuturesUnordered::new();
let mut pending_acks = HashMap::new();
Contributor:

Small note that we could probably parameterize this with twox hash or some such if we wanted.

Contributor Author:

I had the exact same thought. There's actually a nohash-hasher crate since our keys are already u64. I guess if we've both had this thought, maybe I should just go ahead and do it.

Contributor:

TIL about nohash-hasher.

Member:

This is almost a glorified VecDeque. I wonder if it could be emulated with a wrapper around VecDeque<Option<(u64, u64)>> where inserting inserts "blanks" if it's not actually sequentially next.

Contributor Author:

That's a good point regarding VecDeque. I'll have to think about your idea... off the top of my head, I can't visualize it, so it might just be one of those things I need to write down first to grok.

Contributor Author (@tobz, Oct 4, 2021):

In 05bc5f3, I reworked this as a BinaryHeap because I didn't like the idea of having to insert blanks that we would then have to linearly scan for when a call finishes and we want to mark the pending acknowledgement as completed.

Admittedly, though, the code using BinaryHeap is longer than the HashMap approach.
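For illustration, the parameterization discussed above might look roughly like the following sketch. It assumes the nohash-hasher crate is added as a dependency, which this PR does not do:

```rust
use std::collections::HashMap;

use nohash_hasher::BuildNoHashHasher;

// Sequence-number-keyed ack sizes. The keys are already unique u64 sequence
// numbers, so hashing them again buys nothing; nohash-hasher simply passes
// the key through as the hash.
type PendingAcks = HashMap<u64, usize, BuildNoHashHasher<u64>>;

fn main() {
    let mut pending_acks: PendingAcks = HashMap::with_hasher(BuildNoHashHasher::default());
    pending_acks.insert(0, 128);
    assert_eq!(pending_acks.remove(&0), Some(128));
}
```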

let mut seq_head: u64 = 0;
let mut seq_tail: u64 = 0;

let Self {
input,
mut service,
acker,
..
} = self;

pin!(input);
pin!(in_flight);

loop {
select! {
// We've received an item from the input stream.
Some(req) = input.next() => {
// Rebind the variable to avoid a bug with the pattern matching
// in `select!`: https://github.com/tokio-rs/tokio/issues/4076
let mut req = req;
let seqno = seq_head;
seq_head += 1;

let (tx, rx) = oneshot::channel();

in_flight.push(rx);

trace!(
message = "Submitting service request.",
in_flight_requests = in_flight.len()
);
let ack_size = req.ack_size();
let finalizers = req.take_finalizers();

let svc = service.ready().await.expect("should not get error when waiting for svc readiness");
let fut = svc.call(req)
.err_into()
.map(move |result: Result<Svc::Response, Svc::Error>| {
let status = match result {
Err(error) => {
error!(message = "Service call failed.", ?error, seqno);
EventStatus::Failed
},
Ok(response) => {
trace!(message = "Service call succeeded.", seqno);
*response.as_ref()
}
};
finalizers.update_status(status);

// The receiver could drop before we reach this point if `Driver`
// goes away as part of a sink closing. We can't do anything
// about it, so just silently ignore the error.
let _ = tx.send((seqno, ack_size));
})
.instrument(info_span!("request", request_id = %seqno));
tokio::spawn(fut);
},

// One of our service calls has completed.
Some(Ok((seqno, ack_size))) = in_flight.next() => {
trace!(message = "Sending request.", seqno, ack_size);
pending_acks.insert(seqno, ack_size);

let mut num_to_ack = 0;
while let Some(ack_size) = pending_acks.remove(&seq_tail) {
num_to_ack += ack_size;
seq_tail += 1;
}

if num_to_ack > 0 {
trace!(message = "Acking events.", ack_size = num_to_ack);
acker.ack(num_to_ack);
}
},

else => break
}
}

Ok(())
}
}
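To show how a sink might wire the new component up end to end, here is a rough usage sketch. `MockRequest`, `MockResponse`, and `MockService` are made-up types for illustration, and the no-op `Acker::Null` variant is assumed to be available from the buffers crate; none of this is taken from the PR itself:

```rust
use std::task::{Context, Poll};

use buffers::{Ackable, Acker};
use futures::{future::BoxFuture, stream};
use tower::Service;
use vector_core::event::{EventFinalizers, EventStatus, Finalizable};
use vector_core::stream::driver::Driver;

/// Hypothetical response type; `Driver` only needs to read an `EventStatus` out of it.
struct MockResponse(EventStatus);

impl AsRef<EventStatus> for MockResponse {
    fn as_ref(&self) -> &EventStatus {
        &self.0
    }
}

/// Hypothetical request carrying its finalizers and the number of events it represents.
struct MockRequest {
    finalizers: EventFinalizers,
    event_count: usize,
}

impl Ackable for MockRequest {
    fn ack_size(&self) -> usize {
        self.event_count
    }
}

impl Finalizable for MockRequest {
    fn take_finalizers(&mut self) -> EventFinalizers {
        std::mem::take(&mut self.finalizers)
    }
}

/// Hypothetical service that "delivers" every request it is handed.
struct MockService;

impl Service<MockRequest> for MockService {
    type Response = MockResponse;
    type Error = &'static str;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, _req: MockRequest) -> Self::Future {
        Box::pin(async { Ok(MockResponse(EventStatus::Delivered)) })
    }
}

#[tokio::main]
async fn main() {
    // Ten single-event requests with no finalizers attached.
    let requests = stream::iter((0..10).map(|_| MockRequest {
        finalizers: EventFinalizers::default(),
        event_count: 1,
    }));

    // Acker::Null is assumed here as a no-op acker for the sketch.
    Driver::new(requests, MockService, Acker::Null)
        .run()
        .await
        .expect("driver run should not fail");
}
```

The only contract the service has to uphold is that its response can be read as an `EventStatus`, which `Driver` uses to update the request's finalizers before acknowledging the request's events in order.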