diff --git a/Cargo.lock b/Cargo.lock index 3508dd5271b88..025c52bd23f4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7756,6 +7756,7 @@ dependencies = [ "tokio-test", "tokio-util", "toml", + "tower", "tracing 0.1.28", "tracing-core 0.1.20", "tracing-log", diff --git a/lib/vector-core/Cargo.toml b/lib/vector-core/Cargo.toml index 29ff0c15d97a1..d466132fb08f2 100644 --- a/lib/vector-core/Cargo.toml +++ b/lib/vector-core/Cargo.toml @@ -45,6 +45,7 @@ tokio = { version = "1.12.0", default-features = false } tokio-stream = { version = "0.1", default-features = false, optional = true } tokio-util = { version = "0.6.8", default-features = false, features = ["time"] } toml = { version = "0.5.8", default-features = false } +tower = { version = "0.4", default-features = false } tracing = { version = "0.1.28", default-features = false } tracing-core = { version = "0.1.20", default-features = false } tracing-log = { version = "0.1.2", default-features = false } diff --git a/lib/vector-core/buffers/src/acker.rs b/lib/vector-core/buffers/src/acker.rs index 6ded32e8c75e2..6871b6512b3af 100644 --- a/lib/vector-core/buffers/src/acker.rs +++ b/lib/vector-core/buffers/src/acker.rs @@ -3,6 +3,16 @@ use metrics::counter; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +/// A value that can be acknowledged. +/// +/// This is used to define how many events should be acknowledged when this value has been +/// processed. Since the value might be tied to a single event, or to multiple events, this +/// provides a generic mechanism for gathering the number of events to acknowledge. +pub trait Ackable { + /// Number of events to acknowledge for this value. + fn ack_size(&self) -> usize; +} + #[derive(Debug, Clone)] pub enum Acker { Disk(Arc, Arc), diff --git a/lib/vector-core/buffers/src/lib.rs b/lib/vector-core/buffers/src/lib.rs index ce17632b76192..93dfdbccf7a18 100644 --- a/lib/vector-core/buffers/src/lib.rs +++ b/lib/vector-core/buffers/src/lib.rs @@ -20,7 +20,7 @@ mod test; mod variant; use crate::bytes::{DecodeBytes, EncodeBytes}; -pub use acker::Acker; +pub use acker::{Ackable, Acker}; use futures::{channel::mpsc, Sink, SinkExt, Stream}; use pin_project::pin_project; #[cfg(test)] diff --git a/lib/vector-core/src/event/mod.rs b/lib/vector-core/src/event/mod.rs index 05fe0095ec0f5..d01934398673a 100644 --- a/lib/vector-core/src/event/mod.rs +++ b/lib/vector-core/src/event/mod.rs @@ -66,6 +66,16 @@ impl Finalizable for Event { } } +impl Finalizable for Vec { + fn take_finalizers(&mut self) -> EventFinalizers { + self.iter_mut() + .fold(EventFinalizers::default(), |mut acc, x| { + acc.merge(x.take_finalizers()); + acc + }) + } +} + impl Event { #[must_use] pub fn new_empty_log() -> Self { diff --git a/lib/vector-core/src/stream.rs b/lib/vector-core/src/stream.rs index de0a2da5a96e1..8463eeb9759ad 100644 --- a/lib/vector-core/src/stream.rs +++ b/lib/vector-core/src/stream.rs @@ -1 +1,2 @@ pub mod batcher; +pub mod driver; diff --git a/lib/vector-core/src/stream/driver.rs b/lib/vector-core/src/stream/driver.rs new file mode 100644 index 0000000000000..1fe58687b56a5 --- /dev/null +++ b/lib/vector-core/src/stream/driver.rs @@ -0,0 +1,189 @@ +use std::{collections::BinaryHeap, fmt}; + +use buffers::{Ackable, Acker}; +use futures::{stream::FuturesUnordered, FutureExt, Stream, StreamExt, TryFutureExt}; +use tokio::{pin, select, task::JoinError}; +use tower::{Service, ServiceExt}; +use tracing::Instrument; + +use crate::event::{EventStatus, Finalizable}; + +#[derive(Eq)] +struct PendingAcknowledgement { 
+ seq_no: u64, + ack_size: usize, +} + +impl PartialEq for PendingAcknowledgement { + fn eq(&self, other: &Self) -> bool { + self.seq_no == other.seq_no + } +} + +impl PartialOrd for PendingAcknowledgement { + fn partial_cmp(&self, other: &Self) -> Option { + // Reverse ordering so that in a `BinaryHeap`, the lowest sequence number is the highest priority. + Some(other.seq_no.cmp(&self.seq_no)) + } +} + +impl Ord for PendingAcknowledgement { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + other + .partial_cmp(self) + .expect("PendingAcknowledgement should always return a valid comparison") + } +} + +/// Drives the interaction between a stream of items and a service which processes them +/// asynchronously. +/// +/// `Driver`, as a high-level, facilitates taking items from an arbitrary `Stream` and pushing them +/// through a `Service`, spawning each call to the service so that work can be run concurrently, +/// managing waiting for the service to be ready before processing more items, and so on. +/// +/// Additionally, `Driver` handles two event-specific facilities: finalization and acknowledgement. +/// +/// This capability is parameterized so any implementation which can define how to interpret the +/// response for each request, as well as define how many events a request is compromised of, can be +/// used with `Driver`. +pub struct Driver { + input: St, + service: Svc, + acker: Acker, +} + +impl Driver { + pub fn new(input: St, service: Svc, acker: Acker) -> Self { + Self { + input, + service, + acker, + } + } +} + +impl Driver +where + St: Stream, + St::Item: Ackable + Finalizable, + Svc: Service, + Svc::Error: fmt::Debug + 'static, + Svc::Future: Send + 'static, + Svc::Response: AsRef, +{ + /// Runs the driver until the input stream is exhausted. + /// + /// All in-flight calls to the provided `service` will also be completed before `run` returns. + /// + /// # Errors + /// + /// No errors are currently returned. The return type is purely to simplify caller code, but may + /// return an error for a legitimate reason in the future. + pub async fn run(self) -> Result<(), ()> { + let mut in_flight = FuturesUnordered::new(); + let mut pending_acks = BinaryHeap::new(); + let mut seq_head: u64 = 0; + let mut seq_tail: u64 = 0; + + let Self { + input, + mut service, + acker, + } = self; + + pin!(input); + + loop { + select! { + // Using `biased` ensures we check the branches in the order they're written, and + // the way they're ordered is to ensure that we're reacting to completed requests as + // soon as possible to acknowledge them and make room for more requests to be processed. + biased; + + // One of our service calls has completed. + Some(result) = in_flight.next() => { + // Rebind so we can annotate the type, otherwise the compiler is inexplicably angry. + let result: Result<(u64, usize), JoinError> = result; + match result { + Ok((seq_no, ack_size)) => { + trace!(message = "Sending request.", seq_no, ack_size); + pending_acks.push(PendingAcknowledgement { seq_no, ack_size }); + + let mut num_to_ack = 0; + while let Some(pending_ack) = pending_acks.peek() { + if pending_ack.seq_no == seq_tail { + let PendingAcknowledgement { ack_size, .. 
} = pending_acks.pop() + .expect("should not be here unless pending_acks is non-empty"); + num_to_ack += ack_size; + seq_tail += 1; + } else { + break + } + } + + if num_to_ack > 0 { + trace!(message = "Acking events.", ack_size = num_to_ack); + acker.ack(num_to_ack); + } + }, + Err(e) => { + if e.is_panic() { + error!("driver service call unexpectedly panicked"); + } + + if e.is_cancelled() { + error!("driver service call unexpectedly cancelled"); + } + }, + } + } + + // We've received an item from the input stream. + Some(req) = input.next() => { + // Rebind the variable to avoid a bug with the pattern matching + // in `select!`: https://github.com/tokio-rs/tokio/issues/4076 + let mut req = req; + let seqno = seq_head; + seq_head += 1; + + trace!( + message = "Submitting service request.", + in_flight_requests = in_flight.len() + ); + let ack_size = req.ack_size(); + let finalizers = req.take_finalizers(); + + let svc = service.ready().await.expect("should not get error when waiting for svc readiness"); + let fut = svc.call(req) + .err_into() + .map(move |result: Result| { + let status = match result { + Err(error) => { + error!(message = "Service call failed.", ?error, seqno); + EventStatus::Failed + }, + Ok(response) => { + trace!(message = "Service call succeeded.", seqno); + *response.as_ref() + } + }; + finalizers.update_status(status); + + (seqno, ack_size) + }) + .instrument(info_span!("request", request_id = %seqno)); + + let handle = tokio::spawn(fut); + in_flight.push(handle); + } + + else => { + break + } + } + } + + Ok(()) + } +} diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index d659f8109fca0..147573921e93a 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -1,5 +1,6 @@ use crate::config::SinkContext; use crate::sinks::s3_common::sink::S3Sink; +use crate::sinks::util::encoding::StandardEncodings; use crate::{ config::{DataType, GenerateConfig, ProxyConfig, SinkConfig}, rusoto::{AwsAuthentication, RegionOrEndpoint}, @@ -24,6 +25,8 @@ use std::num::NonZeroUsize; use tower::ServiceBuilder; use vector_core::sink::VectorSink; +use super::sink::S3RequestOptions; + const DEFAULT_REQUEST_LIMITS: TowerRequestConfig = { TowerRequestConfig::const_new(Concurrency::Fixed(50), Concurrency::Fixed(50)) .rate_limit_num(250) @@ -53,7 +56,7 @@ pub struct S3SinkConfig { pub options: S3Options, #[serde(flatten)] pub region: RegionOrEndpoint, - pub encoding: EncodingConfig, + pub encoding: EncodingConfig, #[serde(default = "Compression::gzip_default")] pub compression: Compression, #[serde(default)] @@ -66,24 +69,6 @@ pub struct S3SinkConfig { pub auth: AwsAuthentication, } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)] -#[serde(rename_all = "snake_case")] -pub enum Encoding { - Text, - Ndjson, -} - -#[derive(Clone)] -pub struct S3RequestOptions { - pub bucket: String, - pub filename_time_format: String, - pub filename_append_uuid: bool, - pub filename_extension: Option, - pub api_options: S3Options, - pub encoding: EncodingConfig, - pub compression: Compression, -} - impl GenerateConfig for S3SinkConfig { fn generate_config() -> toml::Value { toml::Value::try_from(Self { @@ -94,7 +79,7 @@ impl GenerateConfig for S3SinkConfig { filename_extension: None, options: S3Options::default(), region: RegionOrEndpoint::default(), - encoding: Encoding::Text.into(), + encoding: StandardEncodings::Text.into(), compression: Compression::gzip_default(), batch: BatchConfig::default(), request: TowerRequestConfig::default(), @@ 
-174,6 +159,8 @@ impl S3SinkConfig { service, request_options, partitioner, + self.encoding.clone(), + self.compression, batch_size_bytes, batch_size_events, batch_timeout, diff --git a/src/sinks/aws_s3/sink.rs b/src/sinks/aws_s3/sink.rs index 8eab55c0127cc..d7da622120f1d 100644 --- a/src/sinks/aws_s3/sink.rs +++ b/src/sinks/aws_s3/sink.rs @@ -1,261 +1,76 @@ -use crate::sinks::aws_s3::config::S3RequestOptions; use crate::{ - config::log_schema, event::Event, sinks::{ - aws_s3::config::Encoding, - s3_common::{ - service::S3Request, - sink::{process_event_batch, S3EventEncoding, S3RequestBuilder}, + s3_common::{config::S3Options, service::S3Request}, + util::{ + encoding::{EncodingConfig, StandardEncodings}, + Compression, RequestBuilder, }, - util::encoding::EncodingConfiguration, }, }; +use bytes::Bytes; use chrono::Utc; -use std::io::{self, Write}; use uuid::Uuid; - -impl S3EventEncoding for S3RequestOptions { - fn encode_event(&mut self, mut event: Event, mut writer: &mut dyn Write) -> io::Result<()> { - self.encoding.apply_rules(&mut event); - - let log = event.into_log(); - match self.encoding.codec() { - Encoding::Ndjson => { - let _ = serde_json::to_writer(&mut writer, &log)?; - writer.write_all(b"\n") - } - Encoding::Text => { - let buf = log - .get(log_schema().message_key()) - .map(|v| v.as_bytes()) - .unwrap_or_default(); - let _ = writer.write_all(&buf)?; - writer.write_all(b"\n") - } - } - } +use vector_core::event::{EventFinalizers, Finalizable}; + +#[derive(Clone)] +pub struct S3RequestOptions { + pub bucket: String, + pub filename_time_format: String, + pub filename_append_uuid: bool, + pub filename_extension: Option, + pub api_options: S3Options, + pub encoding: EncodingConfig, + pub compression: Compression, } -impl S3RequestBuilder for S3RequestOptions { - fn build_request(&mut self, key: String, batch: Vec) -> S3Request { - { - // Generate the filename for this batch, which involves a surprising amount - // of code. - let filename = { - /* - Since this is generic over the partitioner, for purposes of unit tests, - we can't get the compiler to let us define a conversion trait such that - we can get &Event from &P::Item, or I at least don't know how to - trivially do that. I'm leaving this snippet here because it embodies - the prior TODO comment of using the timestamp of the last event in the - batch rather than the current time. - - Now that I think of it... is that even right? Do customers want logs - with timestamps in them related to the last event contained within, or - do they want timestamps that include when the file was generated and - dropped into the bucket? My gut says "time when the log dropped" but - maybe not... - - let last_event_ts = batch - .items() - .iter() - .last() - .and_then(|e| match e.into() { - // If the event has a field called timestamp, in RFC3339 format, use that. - Event::Log(le) => le - .get(log_schema().timestamp_key()) - .cloned() - .and_then(|ts| match ts { - Value::Timestamp(ts) => Some(ts), - Value::Bytes(buf) => std::str::from_utf8(&buf) - .ok() - .and_then(|s| DateTime::parse_from_rfc3339(s).ok()) - .map(|dt| dt.with_timezone(&Utc)), - _ => None, - }), - // TODO: We don't ship metrics to the S3, but if we did, would this be right? or is - // there an actual field we should be checking similar to above? 
- Event::Metric(_) => Some(Utc::now()), - }) - .unwrap_or_else(|| Utc::now()); - let formatted_ts = last_event_ts.format(&time_format); - */ - let formatted_ts = Utc::now().format(self.filename_time_format.as_str()); +impl RequestBuilder<(String, Vec)> for S3RequestOptions { + type Metadata = (String, usize, EventFinalizers); + type Events = Vec; + type Payload = Bytes; + type Request = S3Request; - if self.filename_append_uuid { - let uuid = Uuid::new_v4(); - format!("{}-{}", formatted_ts, uuid.to_hyphenated()) - } else { - formatted_ts.to_string() - } - }; + fn split_input(&self, input: (String, Vec)) -> (Self::Metadata, Self::Events) { + let (partition_key, mut events) = input; + let finalizers = events.take_finalizers(); - let extension = self - .filename_extension - .as_ref() - .cloned() - .unwrap_or_else(|| self.compression.extension().into()); - let key = format!("{}/{}.{}", key, filename, extension); - - // Process our events. This does all of the necessary encoding rule - // application, as well as encoding and compressing the events. We're - // handed back a tidy `Bytes` instance we can send directly to S3. - let batch_size = batch.len(); - let (body, finalizers) = process_event_batch(batch, self, self.compression); - - debug!( - message = "Sending events.", - bytes = ?body.len(), - bucket = ?self.bucket, - key = ?key - ); - - S3Request { - body, - bucket: self.bucket.clone(), - key, - content_encoding: self.compression.content_encoding(), - options: self.api_options.clone(), - batch_size, - finalizers, - } - } + ((partition_key, events.len(), finalizers), events) } -} -#[cfg(test)] -mod tests { - use std::{collections::BTreeMap, io::Cursor}; + fn build_request(&self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request { + let (key, batch_size, finalizers) = metadata; - use crate::{sinks::s3_common::config::S3Options, sinks::util::encoding::EncodingConfig}; - use vector_core::partition::Partitioner; + let filename = { + let formatted_ts = Utc::now().format(self.filename_time_format.as_str()); - use super::*; - use crate::sinks::util::Compression; - - #[derive(Clone, Default)] - struct TestPartitioner; - - impl Partitioner for TestPartitioner { - type Item = Event; - type Key = &'static str; - - fn partition(&self, _: &Self::Item) -> Self::Key { - "key" - } - } - - #[test] - fn s3_encode_event_text() { - let message = "hello world".to_string(); - let mut writer = Cursor::new(Vec::new()); - let mut request_options = S3RequestOptions { - encoding: EncodingConfig::from(Encoding::Text), - ..request_options() + self.filename_append_uuid + .then(|| format!("{}-{}", formatted_ts, Uuid::new_v4().to_hyphenated())) + .unwrap_or_else(|| formatted_ts.to_string()) }; - let _ = request_options - .encode_event(message.clone().into(), &mut writer) - .expect("should not have failed to encode event"); - let encoded = writer.into_inner(); - - let encoded_message = message + "\n"; - assert_eq!(encoded.as_slice(), encoded_message.as_bytes()); - } - - #[test] - fn s3_encode_event_ndjson() { - let message = "hello world".to_string(); - let mut event = Event::from(message.clone()); - event.as_mut_log().insert("key", "value"); - - let mut writer = Cursor::new(Vec::new()); - let mut request_options = S3RequestOptions { - encoding: EncodingConfig::from(Encoding::Ndjson), - ..request_options() - }; - let _ = request_options - .encode_event(event, &mut writer) - .expect("should not have failed to encode event"); - let encoded = writer.into_inner(); - let map: BTreeMap = 
serde_json::from_slice(encoded.as_slice()).unwrap(); - - assert_eq!(map[&log_schema().message_key().to_string()], message); - assert_eq!(map["key"], "value".to_string()); - } - - #[test] - fn s3_encode_event_with_removed_key() { - let encoding_config = EncodingConfig { - codec: Encoding::Ndjson, - schema: None, - only_fields: None, - except_fields: Some(vec!["key".into()]), - timestamp_format: None, - }; - - let message = "hello world".to_string(); - let mut event = Event::from(message.clone()); - event.as_mut_log().insert("key", "value"); - - let mut writer = Cursor::new(Vec::new()); - let mut request_options = S3RequestOptions { - encoding: encoding_config, - ..request_options() - }; - let _ = request_options - .encode_event(event, &mut writer) - .expect("should not have failed to encode event"); - let encoded = writer.into_inner(); - let map: BTreeMap = serde_json::from_slice(encoded.as_slice()).unwrap(); - - assert_eq!(map[&log_schema().message_key().to_string()], message); - assert!(!map.contains_key("key")); - } - - #[test] - fn s3_build_request() { - let partitioner = TestPartitioner::default(); - - let event = "hello world".into(); - let partition_key = partitioner.partition(&event).to_string(); - let finished_batch = vec![event]; - - let mut settings = request_options(); - let req = settings.build_request(partition_key.clone(), finished_batch.clone()); - assert_eq!(req.key, "key/date.ext"); - - let mut settings = S3RequestOptions { - filename_extension: None, - ..request_options() - }; - let req = settings.build_request(partition_key.clone(), finished_batch.clone()); - assert_eq!(req.key, "key/date.log"); - - let mut settings = S3RequestOptions { - compression: Compression::gzip_default(), - ..settings - }; - let req = settings.build_request(partition_key.clone(), finished_batch.clone()); - assert_eq!(req.key, "key/date.log.gz"); - - let mut settings = S3RequestOptions { - filename_append_uuid: true, - ..settings - }; - let req = settings.build_request(partition_key, finished_batch); - assert_ne!(req.key, "key/date.log.gz"); - } - fn request_options() -> S3RequestOptions { - S3RequestOptions { - bucket: "bucket".into(), - filename_time_format: "date".into(), - filename_append_uuid: false, - filename_extension: Some("ext".into()), - api_options: S3Options::default(), - encoding: Encoding::Text.into(), - compression: Compression::None, + let extension = self + .filename_extension + .as_ref() + .cloned() + .unwrap_or_else(|| self.compression.extension().into()); + let key = format!("{}/{}.{}", key, filename, extension); + + trace!( + message = "Sending events.", + bytes = ?payload.len(), + events_len = ?batch_size, + bucket = ?self.bucket, + key = ?key + ); + + S3Request { + body: payload, + bucket: self.bucket.clone(), + key, + content_encoding: self.compression.content_encoding(), + options: self.api_options.clone(), + batch_size, + finalizers, } } } diff --git a/src/sinks/aws_s3/tests.rs b/src/sinks/aws_s3/tests.rs index 85588c348769b..c0f2c49cd5932 100644 --- a/src/sinks/aws_s3/tests.rs +++ b/src/sinks/aws_s3/tests.rs @@ -3,9 +3,9 @@ mod integration_tests { use crate::config::SinkContext; use crate::rusoto::RegionOrEndpoint; - use crate::sinks::aws_s3::config::Encoding; use crate::sinks::aws_s3::S3SinkConfig; use crate::sinks::s3_common::config::S3Options; + use crate::sinks::util::encoding::StandardEncodings; use crate::sinks::util::BatchConfig; use crate::sinks::util::Compression; use crate::sinks::util::TowerRequestConfig; @@ -220,7 +220,7 @@ mod integration_tests { let 
(_lines, events, receiver) = make_events_batch(1, 1); sink.run(events).await.unwrap(); - assert_eq!(receiver.await, BatchStatus::Errored); + assert_eq!(receiver.await, BatchStatus::Failed); let objects = list_objects(&bucket, prefix.unwrap()).await; assert_eq!(objects, None); @@ -268,7 +268,7 @@ mod integration_tests { filename_extension: None, options: S3Options::default(), region: RegionOrEndpoint::with_endpoint("http://localhost:4566".to_owned()), - encoding: Encoding::Text.into(), + encoding: StandardEncodings::Text.into(), compression: Compression::None, batch: BatchConfig { max_events: Some(batch_size), diff --git a/src/sinks/datadog/logs/sink.rs b/src/sinks/datadog/logs/sink.rs index 4bba1cb1de281..c4ab6d381a5af 100644 --- a/src/sinks/datadog/logs/sink.rs +++ b/src/sinks/datadog/logs/sink.rs @@ -1,7 +1,6 @@ use super::service::LogApiRequest; use crate::config::SinkContext; use crate::sinks::datadog::logs::config::Encoding; -use crate::sinks::util::buffer::GZIP_FAST; use crate::sinks::util::encoding::{EncodingConfigWithDefault, EncodingConfiguration}; use crate::sinks::util::Compression; use async_trait::async_trait; @@ -223,11 +222,8 @@ impl RequestBuilder { let (encoded_body, is_compressed) = match self.compression { Compression::None => (body, false), Compression::Gzip(level) => { - let level = level.unwrap_or(GZIP_FAST); - let mut encoder = GzEncoder::new( - Vec::with_capacity(serialized_payload_bytes_len), - flate2::Compression::new(level as u32), - ); + let mut encoder = + GzEncoder::new(Vec::with_capacity(serialized_payload_bytes_len), level); encoder .write_all(&body) diff --git a/src/sinks/datadog_archives.rs b/src/sinks/datadog_archives.rs index f4c7446e09be4..633d0d88d990a 100644 --- a/src/sinks/datadog_archives.rs +++ b/src/sinks/datadog_archives.rs @@ -4,10 +4,11 @@ use std::{ convert::TryFrom, io::{self, Write}, num::NonZeroUsize, + sync::atomic::{AtomicU32, Ordering}, time::Duration, }; -use bytes::{BufMut, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; use chrono::{SecondsFormat, Utc}; use rand::{thread_rng, Rng}; use rusoto_s3::S3Client; @@ -16,7 +17,7 @@ use snafu::Snafu; use tower::ServiceBuilder; use uuid::Uuid; -use vector_core::event::Event; +use vector_core::event::{Event, EventFinalizers, Finalizable}; use crate::sinks::s3_common; use crate::{ @@ -31,20 +32,24 @@ use crate::{ }, partitioner::KeyPartitioner, service::{S3Request, S3Service}, - sink::{process_event_batch, S3EventEncoding, S3RequestBuilder, S3Sink}, + sink::S3Sink, }, util::Concurrency, - util::{Compression, ServiceBuilderExt, TowerRequestConfig}, + util::{ServiceBuilderExt, TowerRequestConfig}, VectorSink, }, template::Template, }; +use super::util::{encoding::Encoder, Compression, RequestBuilder}; + const DEFAULT_REQUEST_LIMITS: TowerRequestConfig = { TowerRequestConfig::const_new(Concurrency::Fixed(50), Concurrency::Fixed(50)) .rate_limit_num(250) }; +const DEFAULT_COMPRESSION: Compression = Compression::gzip_default(); + #[derive(Deserialize, Serialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct DatadogArchivesSinkConfig { @@ -156,21 +161,22 @@ impl DatadogArchivesSinkConfig { let partitioner = DatadogArchivesSinkConfig::key_partitioner(); - let request_builder = DatadogS3RequestBuilder { - bucket: self.bucket.clone(), - key_prefix: self.key_prefix.clone(), - aws_s3: self - .aws_s3 - .as_ref() - .expect("s3 config wasn't provided") - .clone(), - }; + let s3_config = self + .aws_s3 + .as_ref() + .expect("s3 config wasn't provided") + .clone(); + let encoding = 
DatadogArchiveEncoding::default(); + let request_builder = + DatadogS3RequestBuilder::new(self.bucket.clone(), self.key_prefix.clone(), s3_config); let sink = S3Sink::new( cx, service, request_builder, partitioner, + encoding, + DEFAULT_COMPRESSION, batch_size_bytes, batch_size_events, batch_timeout, @@ -188,21 +194,55 @@ const RESERVED_ATTRIBUTES: [&str; 10] = [ "_id", "date", "message", "host", "source", "service", "status", "tags", "trace_id", "span_id", ]; -#[derive(Debug, Clone)] -struct DatadogArchivesSinkEncoding { +struct DatadogArchiveEncoding { reserved_attributes: HashSet<&'static str>, id_rnd_bytes: [u8; 8], - id_seq_number: u32, + id_seq_number: AtomicU32, +} + +impl DatadogArchiveEncoding { + /// Generates a unique event ID compatible with DD: + /// - 18 bytes; + /// - first 6 bytes represent a "now" timestamp in millis; + /// - the rest 12 bytes can be just any sequence unique for a given timestamp. + /// + /// To generate unique-ish trailing 12 bytes we use random 8 bytes, generated at startup, + /// and a rolling-over 4-bytes sequence number. + fn generate_log_id(&self) -> String { + let mut id = BytesMut::with_capacity(18); + // timestamp in millis - 6 bytes + let now = Utc::now(); + id.put_int(now.timestamp_millis(), 6); + + // 8 random bytes + id.put_slice(&self.id_rnd_bytes); + + // 4 bytes for the counter should be more than enough - it should be unique for 1 millisecond only + let id_seq_number = self.id_seq_number.fetch_add(1, Ordering::Relaxed); + id.put_u32(id_seq_number); + + base64::encode(id.freeze()) + } } -impl S3EventEncoding for DatadogArchivesSinkEncoding { +impl Default for DatadogArchiveEncoding { + fn default() -> Self { + Self { + reserved_attributes: RESERVED_ATTRIBUTES.to_vec().into_iter().collect(), + id_rnd_bytes: thread_rng().gen::<[u8; 8]>(), + id_seq_number: AtomicU32::new(0), + } + } +} + +impl Encoder for DatadogArchiveEncoding { /// Applies the following transformations to align event's schema with DD: /// - `_id` is generated in the sink(format described below); /// - `date` is set from the Global Log Schema's `timestamp` mapping, or to the current time if missing; /// - `message`,`host` are set from the corresponding Global Log Schema mappings; /// - `source`, `service`, `status`, `tags` and other reserved attributes are left as is; /// - the rest of the fields is moved to `attributes`. - fn encode_event(&mut self, event: Event, mut writer: &mut dyn Write) -> io::Result<()> { + fn encode_event(&self, event: Event, mut writer: &mut dyn Write) -> io::Result<()> { let mut log_event = event.into_log(); log_event.insert("_id", self.generate_log_id()); @@ -239,77 +279,64 @@ impl S3EventEncoding for DatadogArchivesSinkEncoding { writer.write_all(b"\n") } } +#[derive(Debug)] +struct DatadogS3RequestBuilder { + bucket: String, + key_prefix: Option, + config: S3Config, +} -impl DatadogArchivesSinkEncoding { - /// Generates a unique event ID compatible with DD: - /// - 18 bytes; - /// - first 6 bytes represent a "now" timestamp in millis; - /// - the rest 12 bytes can be just any sequence unique for a given timestamp. - /// - /// To generate unique-ish trailing 12 bytes we use random 8 bytes, generated at startup, - /// and a rolling-over 4-bytes sequence number. 
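// Aside (illustrative sketch, not part of the diff): the doc comment above pins the ID layout
// at 18 raw bytes (6-byte millisecond timestamp + 8 random bytes + 4-byte rolling counter),
// so every generated ID base64-encodes to exactly 24 characters with no padding. A test one
// could add alongside the existing ones in this file, assuming the same `base64` crate the
// sink already uses:
#[test]
fn datadog_log_id_is_18_bytes_of_base64() {
    let encoding = DatadogArchiveEncoding::default();
    let id = encoding.generate_log_id();
    // 18 bytes / 3 bytes-per-group * 4 chars-per-group = 24 characters, no '=' padding.
    assert_eq!(id.len(), 24);
    let raw = base64::decode(&id).expect("generated ID should be valid base64");
    assert_eq!(raw.len(), 18);
}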
- fn generate_log_id(&mut self) -> String { - let mut id = BytesMut::with_capacity(18); - // timestamp in millis - 6 bytes - let now = Utc::now(); - id.put_int(now.timestamp_millis(), 6); - - // 8 random bytes - id.put_slice(&self.id_rnd_bytes); - - // 4 bytes for the counter should be more than enough - it should be unique for 1 millisecond only - self.id_seq_number = self.id_seq_number.wrapping_add(1); - id.put_u32(self.id_seq_number); - - base64::encode(id.freeze()) +impl DatadogS3RequestBuilder { + pub const fn new(bucket: String, key_prefix: Option, config: S3Config) -> Self { + Self { + bucket, + key_prefix, + config, + } } } -fn default_encoding() -> DatadogArchivesSinkEncoding { - DatadogArchivesSinkEncoding { - reserved_attributes: RESERVED_ATTRIBUTES.to_vec().into_iter().collect(), - id_rnd_bytes: thread_rng().gen::<[u8; 8]>(), - id_seq_number: u32::default(), +impl RequestBuilder<(String, Vec)> for DatadogS3RequestBuilder { + type Metadata = (String, usize, EventFinalizers); + type Events = Vec; + type Payload = Bytes; + type Request = S3Request; + + fn split_input(&self, input: (String, Vec)) -> (Self::Metadata, Self::Events) { + let (partition_key, mut events) = input; + let finalizers = events.take_finalizers(); + + ((partition_key, events.len(), finalizers), events) } -} -#[derive(Debug, Clone)] -struct DatadogS3RequestBuilder { - pub bucket: String, - pub key_prefix: Option, - pub aws_s3: S3Config, -} + fn build_request(&self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request { + let (partition_key, batch_size, finalizers) = metadata; -impl S3RequestBuilder for DatadogS3RequestBuilder { - fn build_request(&mut self, key: String, batch: Vec) -> S3Request { let filename = Uuid::new_v4().to_string(); let key = format!( "{}/{}{}.{}", self.key_prefix.clone().unwrap_or_default(), - key, + partition_key, filename, "json.gz" ) .replace("//", "/"); - let batch_size = batch.len(); - let (body, finalizers) = - process_event_batch(batch, &mut default_encoding(), Compression::gzip_default()); - - debug!( + trace!( message = "Sending events.", - bytes = ?body.len(), + bytes = ?payload.len(), + events_len = ?batch_size, bucket = ?self.bucket, key = ?key ); - let s3_options = self.aws_s3.options.clone(); + let s3_options = self.config.options.clone(); S3Request { - body, + body: payload, bucket: self.bucket.clone(), key, - content_encoding: Compression::gzip_default().content_encoding(), + content_encoding: DEFAULT_COMPRESSION.content_encoding(), options: s3_common::config::S3Options { acl: s3_options.acl, grant_full_control: s3_options.grant_full_control, @@ -375,7 +402,7 @@ mod tests { log_mut.insert("timestamp", timestamp); let mut writer = Cursor::new(Vec::new()); - let mut encoding = default_encoding(); + let encoding = DatadogArchiveEncoding::default(); let _ = encoding.encode_event(event, &mut writer); let encoded = writer.into_inner(); @@ -461,7 +488,7 @@ mod tests { fn generates_valid_id() { let log1 = Event::from("test event 1"); let mut writer = Cursor::new(Vec::new()); - let mut encoding = default_encoding(); + let encoding = DatadogArchiveEncoding::default(); let _ = encoding.encode_event(log1, &mut writer); let encoded = writer.into_inner(); let json: BTreeMap = @@ -493,7 +520,7 @@ mod tests { fn generates_date_if_missing() { let log = Event::from("test message"); let mut writer = Cursor::new(Vec::new()); - let mut encoding = default_encoding(); + let encoding = DatadogArchiveEncoding::default(); let _ = encoding.encode_event(log, &mut writer); let encoded = 
writer.into_inner(); let json: BTreeMap = @@ -529,6 +556,7 @@ mod tests { #[test] fn s3_build_request() { + let fake_buf = Bytes::new(); let mut log = Event::from("test message"); let timestamp = DateTime::parse_from_rfc3339("2021-08-23T18:00:27.879+02:00") .expect("invalid test case") @@ -539,13 +567,14 @@ mod tests { .partition(&log) .expect("key wasn't provided"); - let mut request_builder = DatadogS3RequestBuilder { - bucket: "dd-logs".into(), - key_prefix: Some("audit".into()), - aws_s3: S3Config::default(), - }; + let request_builder = DatadogS3RequestBuilder::new( + "dd-logs".into(), + Some("audit".into()), + S3Config::default(), + ); - let req = request_builder.build_request(key, vec![log]); + let (metadata, _events) = request_builder.split_input((key, vec![log])); + let req = request_builder.build_request(metadata, fake_buf.clone()); let expected_key_prefix = "audit/dt=20210823/hour=16/"; let expected_key_ext = ".json.gz"; println!("{}", req.key); @@ -560,7 +589,8 @@ mod tests { let key = key_partitioner .partition(&log2) .expect("key wasn't provided"); - let req = request_builder.build_request(key, vec![log2]); + let (metadata, _events) = request_builder.split_input((key, vec![log2])); + let req = request_builder.build_request(metadata, fake_buf); let uuid2 = &req.key[expected_key_prefix.len()..req.key.len() - expected_key_ext.len()]; assert_ne!(uuid1, uuid2); } diff --git a/src/sinks/http.rs b/src/sinks/http.rs index 62517be7b94e0..8253eaee0ce92 100644 --- a/src/sinks/http.rs +++ b/src/sinks/http.rs @@ -4,7 +4,6 @@ use crate::{ http::{Auth, HttpClient, MaybeAuth}, internal_events::{HttpEventEncoded, HttpEventMissingMessage}, sinks::util::{ - buffer::compression::GZIP_DEFAULT, encoding::{EncodingConfig, EncodingConfiguration}, http::{BatchedHttpSink, HttpSink, RequestConfig}, BatchConfig, BatchSettings, Buffer, Compression, TowerRequestConfig, UriSerde, @@ -249,8 +248,7 @@ impl HttpSink for HttpSinkConfig { Compression::Gzip(level) => { builder = builder.header("Content-Encoding", "gzip"); - let level = level.unwrap_or(GZIP_DEFAULT) as u32; - let mut w = GzEncoder::new(Vec::new(), flate2::Compression::new(level)); + let mut w = GzEncoder::new(Vec::new(), level); w.write_all(&body).expect("Writing to Vec can't fail"); body = w.finish().expect("Writing to Vec can't fail"); } diff --git a/src/sinks/s3_common/service.rs b/src/sinks/s3_common/service.rs index 26f0f18424b00..ee86f2179ce26 100644 --- a/src/sinks/s3_common/service.rs +++ b/src/sinks/s3_common/service.rs @@ -1,4 +1,3 @@ -use crate::{internal_events::aws_s3::sink::S3EventsSent, serde::to_string}; use bytes::Bytes; use futures::{future::BoxFuture, stream}; use md5::Digest; @@ -7,9 +6,13 @@ use rusoto_s3::{PutObjectError, PutObjectOutput, PutObjectRequest, S3Client, S3} use std::task::{Context, Poll}; use tower::Service; use tracing_futures::Instrument; -use vector_core::event::{EventFinalizers, EventStatus, Finalizable}; +use vector_core::{ + buffers::Ackable, + event::{EventFinalizers, EventStatus, Finalizable}, +}; use super::config::S3Options; +use crate::{internal_events::aws_s3::sink::S3EventsSent, serde::to_string}; #[derive(Debug, Clone)] pub struct S3Request { @@ -22,6 +25,12 @@ pub struct S3Request { pub finalizers: EventFinalizers, } +impl Ackable for S3Request { + fn ack_size(&self) -> usize { + self.batch_size + } +} + impl Finalizable for S3Request { fn take_finalizers(&mut self) -> EventFinalizers { std::mem::take(&mut self.finalizers) diff --git a/src/sinks/s3_common/sink.rs b/src/sinks/s3_common/sink.rs 
index 17f7619fa03f0..082e1667cb41f 100644 --- a/src/sinks/s3_common/sink.rs +++ b/src/sinks/s3_common/sink.rs @@ -1,65 +1,38 @@ use crate::{ config::SinkContext, event::Event, - sinks::util::{buffer::GZIP_FAST, Compression}, + sinks::util::{encoding::Encoder, Compression, RequestBuilder, SinkBuilderExt}, }; use async_trait::async_trait; -use bytes::Bytes; -use flate2::write::GzEncoder; -use futures::{ - stream::{BoxStream, FuturesUnordered, StreamExt}, - FutureExt, TryFutureExt, -}; -use std::sync::Arc; -use std::{ - collections::HashMap, - fmt::Debug, - io::{self, Write}, - num::NonZeroUsize, - time::Duration, -}; -use tokio::sync::Barrier; -use tokio::{ - pin, select, - sync::{ - mpsc::{channel, Receiver}, - oneshot, - }, -}; -use tower::{Service, ServiceExt}; -use tracing_futures::Instrument; -use vector_core::{ - buffers::Acker, - event::{EventFinalizers, EventStatus, Finalizable}, - sink::StreamSink, - stream::batcher::Batcher, -}; +use futures::stream::BoxStream; +use futures_util::StreamExt; +use std::{fmt, num::NonZeroUsize, time::Duration}; +use tower::Service; +use vector_core::{buffers::Ackable, event::Finalizable, sink::StreamSink}; +use vector_core::{buffers::Acker, event::EventStatus}; use crate::sinks::s3_common::partitioner::KeyPartitioner; -use crate::sinks::s3_common::service::S3Request; -pub struct S3Sink -where - R: S3RequestBuilder, -{ +pub struct S3Sink { acker: Acker, - service: S, - request_builder: R, + service: Svc, + request_builder: RB, partitioner: KeyPartitioner, + encoding: E, + compression: Compression, batch_size_bytes: Option, batch_size_events: NonZeroUsize, batch_timeout: Duration, } -impl S3Sink -where - R: S3RequestBuilder, -{ +impl S3Sink { pub fn new( cx: SinkContext, - service: S, - request_builder: R, + service: Svc, + request_builder: RB, partitioner: KeyPartitioner, + encoding: E, + compression: Compression, batch_size_bytes: Option, batch_size_events: NonZeroUsize, batch_timeout: Duration, @@ -69,6 +42,8 @@ where service, request_builder, partitioner, + encoding, + compression, batch_size_bytes, batch_size_events, batch_timeout, @@ -76,225 +51,70 @@ where } } -#[async_trait] -impl StreamSink for S3Sink +impl S3Sink where - S: Service + Send + 'static, - S::Future: Send + 'static, - S::Response: AsRef + Send + 'static, - S::Error: Debug + Into + Send, - R: S3RequestBuilder + Send, + Svc: Service + Send + 'static, + Svc::Future: Send + 'static, + Svc::Response: AsRef + Send + 'static, + Svc::Error: fmt::Debug + Into + Send, + RB: RequestBuilder<(String, Vec)> + Send + Sync + 'static, + RB::Events: IntoIterator, + RB::Payload: From>, + RB::Request: Ackable + Finalizable + Send, + E: Encoder + Send + Sync + 'static, { - async fn run(mut self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { // All sinks do the same fundamental job: take in events, and ship them // out. Empirical testing shows that our number one priority for high // throughput needs to be servicing I/O as soon as we possibly can. In // order to do that, we'll spin up a separate task that deals // exclusively with I/O, while we deal with everything else here: // batching, ordering, and so on. 
- let (io_tx, io_rx) = channel(64); - let io_barrier = Arc::new(Barrier::new(2)); - - let io = run_io(io_rx, Arc::clone(&io_barrier), self.service, self.acker).in_current_span(); - let _ = tokio::spawn(io); - - let batcher = Batcher::new( - input, - self.partitioner, - self.batch_timeout, - self.batch_size_events, - self.batch_size_bytes, - ); - pin!(batcher); - - while let Some((key, batch)) = batcher.next().await { - match key { - Some(key) => { - // We could push this down to the I/O task if we wanted to. - let request = self.request_builder.build_request(key, batch); - if io_tx.send(request).await.is_err() { - error!( - "Sink I/O channel should not be closed before sink itself is closed." - ); - return Err(()); + let request_builder_concurrency_limit = NonZeroUsize::new(50); + + let sink = input + .batched( + self.partitioner, + self.batch_timeout, + self.batch_size_events, + self.batch_size_bytes, + ) + .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) }) + .request_builder( + request_builder_concurrency_limit, + self.request_builder, + self.encoding, + self.compression, + ) + .filter_map(|request| async move { + match request { + Err(e) => { + error!("Failed to build S3 request: {:?}.", e); + None } + Ok(req) => Some(req), } - // Partitioning failed for one or more events. - // - // TODO: this might be where we would insert something like the - // proposed error handling/dead letter queue stuff; events that - // _we_ can't handle, but some other component may be able to - // salvage - None => { - continue; - } - } - } - - // Wait for the I/O task to complete. - // - // TODO: the need for this synchronization means we should probably look into something more - // ergonomic like `buffered`/`buffer_unordered` + `tokio::spawn`, otherwise every sink will - // be reimplementing this logic for the common case. - drop(io_tx); - io_barrier.wait().await; + }) + .into_driver(self.service, self.acker); - Ok(()) + sink.run().await } } -async fn run_io(mut rx: Receiver, barrier: Arc, mut service: S, acker: Acker) +#[async_trait] +impl StreamSink for S3Sink where - S: Service, - S::Future: Send + 'static, - S::Response: AsRef + Send + 'static, - S::Error: Debug + Into + Send, + Svc: Service + Send + 'static, + Svc::Future: Send + 'static, + Svc::Response: AsRef + Send + 'static, + Svc::Error: fmt::Debug + Into + Send, + RB: RequestBuilder<(String, Vec)> + Send + Sync + 'static, + RB::Events: IntoIterator, + RB::Payload: From>, + RB::Request: Ackable + Finalizable + Send, + E: Encoder + Send + Sync + 'static, { - let in_flight = FuturesUnordered::new(); - let mut pending_acks = HashMap::new(); - let mut seq_head: u64 = 0; - let mut seq_tail: u64 = 0; - - pin!(in_flight); - - loop { - select! { - Some(req) = rx.recv() => { - // Rebind the variable to avoid a bug with the pattern matching - // in `select!`: https://github.com/tokio-rs/tokio/issues/4076 - let mut req = req; - let seqno = seq_head; - seq_head += 1; - - let (tx, rx) = oneshot::channel(); - - in_flight.push(rx); - - trace!( - message = "Submitting service request.", - in_flight_requests = in_flight.len() - ); - // TODO: I'm not entirely happy with how we're smuggling - // batch_size/finalizers this far through, from the finished - // batch all the way through to the concrete request type...we - // lifted this code from `ServiceSink` directly, but we should - // probably treat it like `PartitionBatcher` and shove it into a - // single, encapsulated type instead. 
- let batch_size = req.batch_size; - let finalizers = req.take_finalizers(); - - let svc = service.ready().await.expect("should not get error when waiting for svc readiness"); - let fut = svc.call(req) - .err_into() - .map(move |result| { - let status = match result { - Err(error) => { - error!("Sink IO failed with error: {}.", error); - EventStatus::Errored - }, - Ok(response) => { *response.as_ref() } - }; - finalizers.update_status(status); - // If the rx end is dropped we still completed - // the request so this is a weird case that we can - // ignore for now. - let _ = tx.send((seqno, batch_size)); - }) - .instrument(info_span!("request", request_id = %seqno)); - tokio::spawn(fut); - }, - - Some(Ok((seqno, batch_size))) = in_flight.next() => { - trace!("pending batch {} finished (n={})", seqno, batch_size); - pending_acks.insert(seqno, batch_size); - - let mut num_to_ack = 0; - while let Some(ack_size) = pending_acks.remove(&seq_tail) { - num_to_ack += ack_size; - seq_tail += 1 - } - trace!(message = "Acking events.", acking_num = num_to_ack); - acker.ack(num_to_ack); - }, - - else => break - } - } - - barrier.wait().await; -} - -pub trait S3RequestBuilder { - fn build_request(&mut self, key: String, batch: Vec) -> S3Request; -} - -pub trait S3EventEncoding { - fn encode_event(&mut self, event: Event, writer: &mut dyn Write) -> io::Result<()>; -} - -pub fn process_event_batch( - batch: Vec, - encoding: &mut E, - compression: Compression, -) -> (Bytes, EventFinalizers) { - enum Writer { - Plain(Vec), - GzipCompressed(GzEncoder>), - } - - impl Write for Writer { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - match self { - Writer::Plain(inner_buf) => inner_buf.write(buf), - Writer::GzipCompressed(writer) => writer.write(buf), - } - } - - fn flush(&mut self) -> std::io::Result<()> { - match self { - Writer::Plain(_) => Ok(()), - Writer::GzipCompressed(writer) => writer.flush(), - } - } - } - - // Build our compressor first, so that we can encode directly into it. - let mut writer = { - // This is a best guess, because encoding could add a good chunk of - // overhead to the raw, in-memory representation of an event, but if - // we're compressing, then we should end up net below the capacity. - let buffer = Vec::with_capacity(1_024); - match compression { - Compression::None => Writer::Plain(buffer), - Compression::Gzip(level) => { - let level = level.unwrap_or(GZIP_FAST); - Writer::GzipCompressed(GzEncoder::new( - buffer, - flate2::Compression::new(level as u32), - )) - } - } - }; - - let mut finalizers = EventFinalizers::default(); - - // Now encode each item into the writer. - for mut event in batch { - finalizers.merge(event.take_finalizers()); - - let _ = encoding - .encode_event(event, &mut writer) - .expect("failed to encode event into writer; this is a bug!"); + async fn run(mut self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + self.run_inner(input).await } - - // Extract the buffer and push it back in a frozen state. 
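// Aside (illustrative sketch, not part of the diff): the `Writer` enum being removed here is
// superseded by the new `Compressor` type added below in src/sinks/util/compressor.rs. A
// minimal usage sketch of that replacement, assuming only the API shown in this PR
// (`Compressor: From<Compression>`, the `io::Write` impl, and `into_inner`):
use std::io::Write;

use crate::sinks::util::{Compression, Compressor};

fn compress_payload(compression: Compression, payload: &[u8]) -> std::io::Result<Vec<u8>> {
    // Pick plain or gzip buffering based on the configured compression.
    let mut compressor = Compressor::from(compression);
    compressor.write_all(payload)?;
    // Finalizes the gzip stream (if any) and hands back the underlying buffer.
    Ok(compressor.into_inner())
}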
- let buf = match writer { - Writer::Plain(buf) => buf.into(), - Writer::GzipCompressed(writer) => writer - .finish() - .expect("gzip writer should not fail to finish") - .into(), - }; - - (buf, finalizers) } diff --git a/src/sinks/splunk_hec/conn.rs b/src/sinks/splunk_hec/conn.rs index 42f5ade75ff40..1615d64006304 100644 --- a/src/sinks/splunk_hec/conn.rs +++ b/src/sinks/splunk_hec/conn.rs @@ -165,7 +165,7 @@ mod tests { async fn test_build_request_compression_gzip_returns_expected_request() { let endpoint = "http://localhost:8888"; let token = "token"; - let compression = Compression::Gzip(None); + let compression = Compression::gzip_default(); let events = "events".as_bytes().to_vec(); let request = build_request(endpoint, token, compression, events.clone()) @@ -199,7 +199,7 @@ mod tests { async fn test_build_request_uri_invalid_uri_returns_error() { let endpoint = "invalid"; let token = "token"; - let compression = Compression::Gzip(None); + let compression = Compression::gzip_default(); let events = "events".as_bytes().to_vec(); let err = build_request(endpoint, token, compression, events.clone()) diff --git a/src/sinks/util/buffer/compression.rs b/src/sinks/util/buffer/compression.rs index 52b68e831206d..6c5a052efe826 100644 --- a/src/sinks/util/buffer/compression.rs +++ b/src/sinks/util/buffer/compression.rs @@ -2,22 +2,25 @@ use serde::{de, ser}; use serde_json::Value; use std::fmt; -pub const GZIP_NONE: usize = 0; -pub const GZIP_FAST: usize = 1; -pub const GZIP_DEFAULT: usize = 6; -pub const GZIP_BEST: usize = 9; +pub const GZIP_NONE: u32 = 0; +pub const GZIP_FAST: u32 = 1; +pub const GZIP_DEFAULT: u32 = 6; +pub const GZIP_BEST: u32 = 9; #[derive(Debug, Derivative, Copy, Clone, Eq, PartialEq)] #[derivative(Default)] pub enum Compression { #[derivative(Default)] None, - Gzip(Option), + Gzip(flate2::Compression), } impl Compression { pub const fn gzip_default() -> Compression { - Compression::Gzip(None) + // flate2 doesn't have a const `default` fn, since it actually implements the `Default` + // trait, and it doesn't have a constant for what the "default" level should be, so we + // hard-code it here. + Compression::Gzip(flate2::Compression::new(6)) } pub const fn content_encoding(&self) -> Option<&'static str> { @@ -38,7 +41,7 @@ impl fmt::Display for Compression { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { Compression::None => write!(f, "none"), - Compression::Gzip(ref level) => write!(f, "gzip({})", level.unwrap_or(GZIP_DEFAULT)), + Compression::Gzip(ref level) => write!(f, "gzip({})", level.level()), } } } @@ -49,8 +52,7 @@ impl From for rusoto_core::encoding::ContentEncoding { match compression { Compression::None => rusoto_core::encoding::ContentEncoding::Identity, Compression::Gzip(level) => { - let level = level.unwrap_or(GZIP_DEFAULT); - rusoto_core::encoding::ContentEncoding::Gzip(None, level as u32) + rusoto_core::encoding::ContentEncoding::Gzip(None, level.level()) } } } @@ -105,7 +107,9 @@ impl<'de> de::Deserialize<'de> for Compression { } level = Some(match map.next_value::()? 
{ Value::Number(level) => match level.as_u64() { - Some(value) if value <= 9 => value as usize, + Some(value) if value <= 9 => { + flate2::Compression::new(value as u32) + } Some(_) | None => { return Err(de::Error::invalid_value( de::Unexpected::Other(&level.to_string()), @@ -114,10 +118,10 @@ impl<'de> de::Deserialize<'de> for Compression { } }, Value::String(level) => match level.as_str() { - "none" => GZIP_NONE, - "fast" => GZIP_FAST, - "default" => GZIP_DEFAULT, - "best" => GZIP_BEST, + "none" => flate2::Compression::none(), + "fast" => flate2::Compression::fast(), + "default" => flate2::Compression::default(), + "best" => flate2::Compression::best(), level => { return Err(de::Error::invalid_value( de::Unexpected::Str(level), @@ -142,7 +146,9 @@ impl<'de> de::Deserialize<'de> for Compression { Some(_) => Err(de::Error::unknown_field("level", &[])), None => Ok(Compression::None), }, - "gzip" => Ok(Compression::Gzip(level)), + "gzip" => Ok(Compression::Gzip( + level.unwrap_or_else(flate2::Compression::default), + )), algorithm => Err(de::Error::unknown_variant(algorithm, &["none", "gzip"])), } } @@ -164,10 +170,12 @@ impl ser::Serialize for Compression { Compression::None => map.serialize_entry("algorithm", "none")?, Compression::Gzip(level) => { map.serialize_entry("algorithm", "gzip")?; - match level.unwrap_or(GZIP_DEFAULT) { + match level.level() { GZIP_NONE => map.serialize_entry("level", "none")?, GZIP_FAST => map.serialize_entry("level", "fast")?, - GZIP_DEFAULT => map.serialize_entry("level", "default")?, + // Don't serialize if at default level, we already utilize that when + // deserializing and it just clutters the resulting JSON. + GZIP_DEFAULT => {} GZIP_BEST => map.serialize_entry("level", "best")?, level => map.serialize_entry("level", &level)?, }; @@ -186,14 +194,17 @@ mod test { let fixtures_valid = [ (r#""none""#, Compression::None), (r#"{"algorithm": "none"}"#, Compression::None), - (r#"{"algorithm": "gzip"}"#, Compression::Gzip(None)), + ( + r#"{"algorithm": "gzip"}"#, + Compression::Gzip(flate2::Compression::default()), + ), ( r#"{"algorithm": "gzip", "level": "best"}"#, - Compression::Gzip(Some(9)), + Compression::Gzip(flate2::Compression::best()), ), ( r#"{"algorithm": "gzip", "level": 8}"#, - Compression::Gzip(Some(8)), + Compression::Gzip(flate2::Compression::new(8)), ), ]; for (sources, result) in fixtures_valid.iter() { diff --git a/src/sinks/util/buffer/mod.rs b/src/sinks/util/buffer/mod.rs index 2aa759a41be8e..688b902cc13a5 100644 --- a/src/sinks/util/buffer/mod.rs +++ b/src/sinks/util/buffer/mod.rs @@ -48,13 +48,7 @@ impl Buffer { let buffer = Vec::with_capacity(bytes); match compression { Compression::None => InnerBuffer::Plain(buffer), - Compression::Gzip(level) => { - let level = level.unwrap_or(GZIP_FAST); - InnerBuffer::Gzip(GzEncoder::new( - buffer, - flate2::Compression::new(level as u32), - )) - } + Compression::Gzip(level) => InnerBuffer::Gzip(GzEncoder::new(buffer, level)), } }) } diff --git a/src/sinks/util/builder.rs b/src/sinks/util/builder.rs new file mode 100644 index 0000000000000..6c80aedc8148f --- /dev/null +++ b/src/sinks/util/builder.rs @@ -0,0 +1,138 @@ +use std::{fmt, future::Future, io, num::NonZeroUsize, pin::Pin, sync::Arc, time::Duration}; + +use futures_util::Stream; +use tower::Service; +use vector_core::{ + buffers::{Ackable, Acker}, + event::{Event, EventStatus, Finalizable}, + partition::Partitioner, + stream::{ + batcher::{Batcher, ExpirationQueue}, + driver::Driver, + }, +}; + +use super::{encoding::Encoder, Compression, 
Compressor, ConcurrentMap, RequestBuilder}; + +impl SinkBuilderExt for T where T: Stream {} + +pub trait SinkBuilderExt: Stream { + /// Batches the stream based on the given partitioner and batch settings. + /// + /// The stream will yield batches of events, with their partition key, when either a batch fills + /// up or times out. [`Partitioner`] operates on a per-event basis, and has access to the event + /// itself, and so can access any and all fields of an event. + fn batched
<P>
( + self, + partitioner: P, + batch_timeout: Duration, + batch_item_limit: NonZeroUsize, + batch_allocation_limit: Option, + ) -> Batcher> + where + Self: Sized + Unpin, + P: Partitioner + Unpin, + { + Batcher::new( + self, + partitioner, + batch_timeout, + batch_item_limit, + batch_allocation_limit, + ) + } + + /// Maps the items in the stream concurrently, up to the configured limit. + /// + /// For every item, the given mapper is invoked, and the future that is returned is spawned + /// and awaited concurrently. A limit can be passed: `None` is self-describing, as it imposes + /// no concurrency limit, and `Some(n)` limits this stage to `n` concurrent operations at any + /// given time. + /// + /// If the spawned future panics, the panic will be carried through and resumed on the task + /// calling the stream. + fn concurrent_map(self, limit: Option, f: F) -> ConcurrentMap + where + Self: Sized, + F: Fn(Self::Item) -> Pin + Send + 'static>> + Send + 'static, + T: Send + 'static, + { + ConcurrentMap::new(self, limit, f) + } + + /// Constructs a [`Stream`] which transforms the input into a request suitable for sending to + /// downstream services. + /// + /// Each input is transformed concurrently, up to the given limit. A limit of `None` is + /// self-describing, as it imposes no concurrency limit, and `Some(n)` limits this stage to `n` + /// concurrent operations at any given time. + /// + /// Encoding and compression are handled internally, deferring to the builder are the necessary + /// checkpoints for adjusting the event before encoding/compression, as well as generating the + /// correct request object with the result of encoding/compressing the events. + fn request_builder( + self, + limit: Option, + builder: B, + encoding: E, + compression: Compression, + ) -> ConcurrentMap> + where + Self: Sized, + Self: Stream, + B: RequestBuilder + Send + Sync + 'static, + B::Events: IntoIterator, + B::Payload: From>, + B::Request: Send, + I: Send + 'static, + E: Encoder + Send + Sync + 'static, + { + let builder = Arc::new(builder); + let encoder = Arc::new(encoding); + + self.concurrent_map(limit, move |input| { + let builder = Arc::clone(&builder); + let encoder = Arc::clone(&encoder); + let mut compressor = Compressor::from(compression); + + Box::pin(async move { + // Split the input into metadata and events. + let (metadata, events) = builder.split_input(input); + + // Encode/compress each event. + for event in events.into_iter() { + // In practice, encoding should be infallible, since we're typically using `Vec` + // as the write target, but the `std::io::Write` interface _is_ fallible, and + // technically we might run out of memory when allocating for the vector, so we + // pass the error through. + let _ = encoder.encode_event(event, &mut compressor)?; + } + + // Now build the actual request. + let payload = compressor.into_inner().into(); + Ok(builder.build_request(metadata, payload)) + }) + }) + } + + /// Creates a [`Driver`] that uses the configured event stream as the input to the given + /// service. + /// + /// This is typically a terminal step in building a sink, bridging the gap from the processing + /// that must be performed by Vector (in the stream) to the underlying sink itself (the + /// service). + /// + /// As it is intended to be a terminal step, we require an [`Acker`] in order to be able to + /// provide acking based on the responses from the underlying service. 
+ fn into_driver(self, service: Svc, acker: Acker) -> Driver + where + Self: Sized, + Self::Item: Ackable + Finalizable, + Svc: Service, + Svc::Error: fmt::Debug + 'static, + Svc::Future: Send + 'static, + Svc::Response: AsRef, + { + Driver::new(self, service, acker) + } +} diff --git a/src/sinks/util/compressor.rs b/src/sinks/util/compressor.rs new file mode 100644 index 0000000000000..dd006673823d7 --- /dev/null +++ b/src/sinks/util/compressor.rs @@ -0,0 +1,73 @@ +use std::io; + +use flate2::write::GzEncoder; + +use super::Compression; + +enum Writer { + Plain(Vec), + Gzip(GzEncoder>), +} + +impl From for Writer { + fn from(compression: Compression) -> Self { + let buffer = Vec::with_capacity(1_024); + match compression { + Compression::None => Writer::Plain(buffer), + Compression::Gzip(level) => Writer::Gzip(GzEncoder::new(buffer, level)), + } + } +} + +impl io::Write for Writer { + fn write(&mut self, buf: &[u8]) -> io::Result { + match self { + Writer::Plain(inner_buf) => inner_buf.write(buf), + Writer::Gzip(writer) => writer.write(buf), + } + } + + fn flush(&mut self) -> io::Result<()> { + match self { + Writer::Plain(_) => Ok(()), + Writer::Gzip(writer) => writer.flush(), + } + } +} + +/// Simple compressor implementation based on [`Compression`]. +/// +/// Users can acquire a `Compressor` via [`Compressor::from`] based on the desired compression scheme. +pub struct Compressor { + inner: Writer, +} + +impl Compressor { + /// Consumes the compressor, returning the internal buffer used by the compressor. + pub fn into_inner(self) -> Vec { + match self.inner { + Writer::Plain(buf) => buf, + Writer::Gzip(writer) => writer + .finish() + .expect("gzip writer should not fail to finish"), + } + } +} + +impl io::Write for Compressor { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.inner.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.flush() + } +} + +impl From for Compressor { + fn from(compression: Compression) -> Self { + Compressor { + inner: compression.into(), + } + } +} diff --git a/src/sinks/util/concurrent_map.rs b/src/sinks/util/concurrent_map.rs new file mode 100644 index 0000000000000..505ec03bd3b85 --- /dev/null +++ b/src/sinks/util/concurrent_map.rs @@ -0,0 +1,104 @@ +use std::{ + future::Future, + num::NonZeroUsize, + panic, + pin::Pin, + task::{Context, Poll}, +}; + +use futures_util::{ + ready, + stream::{Fuse, FuturesOrdered}, + Stream, StreamExt, +}; +use pin_project::pin_project; +use tokio::task::JoinHandle; + +#[pin_project] +pub struct ConcurrentMap +where + St: Stream, + T: Send + 'static, +{ + #[pin] + stream: Fuse, + limit: Option, + in_flight: FuturesOrdered>, + f: Box Pin + Send + 'static>> + Send>, +} + +impl ConcurrentMap +where + St: Stream, + T: Send + 'static, +{ + pub fn new(stream: St, limit: Option, f: F) -> Self + where + F: Fn(St::Item) -> Pin + Send + 'static>> + Send + 'static, + { + Self { + stream: stream.fuse(), + limit, + in_flight: FuturesOrdered::new(), + f: Box::new(f), + } + } +} + +impl Stream for ConcurrentMap +where + St: Stream, + T: Send + 'static, +{ + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + // The underlying stream is done, and we have no more in-flight futures. 
+        if this.stream.is_done() && this.in_flight.is_empty() {
+            return Poll::Ready(None);
+        }
+
+        loop {
+            let can_poll_stream = match this.limit {
+                None => true,
+                Some(limit) => this.in_flight.len() < limit.get(),
+            };
+
+            if can_poll_stream {
+                match this.stream.as_mut().poll_next(cx) {
+                    // Even if there's no items from the underlying stream, we still have the in-flight
+                    // futures to check, so we don't return just yet.
+                    Poll::Pending | Poll::Ready(None) => break,
+                    Poll::Ready(Some(item)) => {
+                        let fut = (this.f)(item);
+                        let handle = tokio::spawn(fut);
+                        this.in_flight.push(handle);
+                    }
+                }
+            } else {
+                // We're at our in-flight limit, so stop generating tasks for the moment.
+                break;
+            }
+        }
+
+        match ready!(this.in_flight.poll_next_unpin(cx)) {
+            // Either nothing is in-flight, or nothing is ready.
+            None => Poll::Pending,
+            Some(result) => match result {
+                Ok(item) => Poll::Ready(Some(item)),
+                Err(e) => {
+                    if let Ok(reason) = e.try_into_panic() {
+                        // Resume the panic here on the calling task.
+                        panic::resume_unwind(reason);
+                    } else {
+                        // The task was cancelled, which makes no sense, because _we_ hold the join
+                        // handle. Only sensible thing to do is panic, because this is a bug.
+                        panic!("concurrent map task cancelled outside of our control");
+                    }
+                }
+            },
+        }
+    }
+}
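As an illustrative aside, and not part of the patch itself, `ConcurrentMap` can be exercised on its own. The item type, mapper, limit, and module path below are hypothetical, and a Tokio runtime is assumed because mapped futures are spawned as tasks:

use std::{future::Future, num::NonZeroUsize, pin::Pin};

use futures::{stream, StreamExt};

use crate::sinks::util::ConcurrentMap;

// Doubles each item, running at most four mapper futures at a time while
// preserving input order (FuturesOrdered under the hood).
async fn concurrent_map_sketch() -> Vec<u64> {
    let mapper = |n: u64| -> Pin<Box<dyn Future<Output = u64> + Send + 'static>> {
        Box::pin(async move { n * 2 })
    };

    ConcurrentMap::new(stream::iter(1..=8u64), NonZeroUsize::new(4), mapper)
        .collect()
        .await
}

The explicit boxed return type on the closure is what satisfies the `Fn(St::Item) -> Pin<Box<dyn Future<...>>>` bound that `tokio::spawn` requires here.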
diff --git a/src/sinks/util/encoding/codec.rs b/src/sinks/util/encoding/codec.rs
new file mode 100644
index 0000000000000..f53e7a1986271
--- /dev/null
+++ b/src/sinks/util/encoding/codec.rs
@@ -0,0 +1,152 @@
+use std::io;
+
+use serde::{Deserialize, Serialize};
+use vector_core::{config::log_schema, event::Event};
+
+use super::Encoder;
+
+static DEFAULT_TEXT_ENCODER: StandardTextEncoding = StandardTextEncoding;
+static DEFAULT_JSON_ENCODER: StandardJsonEncoding = StandardJsonEncoding;
+
+/// A standardized set of encodings with common sense behavior.
+///
+/// Each encoding utilizes a specific default set of behavior. For example, the standard JSON
+/// encoder will encode the entire event, while the standard text encoder will only encode the
+/// `message` field of an event, or fail if passed a metric.
+///
+/// These encodings are meant to cover the most common use cases, so if there is a need for
+/// specialization, you should prefer to use your own encoding enum with suitable implementations of
+/// the [`Encoder`] trait.
+#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
+#[serde(rename_all = "snake_case")]
+pub enum StandardEncodings {
+    Text,
+    Ndjson,
+}
+
+impl Encoder for StandardEncodings {
+    fn encode_event(&self, event: Event, writer: &mut dyn io::Write) -> io::Result<()> {
+        match self {
+            StandardEncodings::Text => DEFAULT_TEXT_ENCODER.encode_event(event, writer),
+            StandardEncodings::Ndjson => DEFAULT_JSON_ENCODER.encode_event(event, writer),
+        }
+    }
+}
+
+/// Standard implementation for encoding events as JSON.
+///
+/// All event types will be serialized to JSON, without pretty printing. Uses
+/// [`serde_json::to_writer`] under the hood, so all caveats mentioned therein apply here.
+///
+/// Each event is delimited with a newline character.
+pub struct StandardJsonEncoding;
+
+impl Encoder for StandardJsonEncoding {
+    fn encode_event(&self, event: Event, mut writer: &mut dyn io::Write) -> io::Result<()> {
+        match event {
+            Event::Log(log) => serde_json::to_writer(&mut writer, &log)?,
+            Event::Metric(metric) => serde_json::to_writer(&mut writer, &metric)?,
+        }
+        writer.write_all(b"\n")
+    }
+}
+
+/// Standard implementation for encoding events as text.
+///
+/// If given a log event, the value used in the field matching the global log schema's "message" key
+/// will be written out, otherwise an empty string will be written. If anything other than a log
+/// event is given, the encoder will panic.
+///
+/// Each event is delimited with a newline character.
+pub struct StandardTextEncoding;
+
+impl Encoder for StandardTextEncoding {
+    fn encode_event(&self, event: Event, writer: &mut dyn io::Write) -> io::Result<()> {
+        match event {
+            Event::Log(log) => {
+                let message = log
+                    .get(log_schema().message_key())
+                    .map(|v| v.as_bytes())
+                    .unwrap_or_default();
+                let _ = writer.write_all(&message[..]);
+                writer.write_all(b"\n")
+            }
+            _ => panic!("standard text encoding cannot be used for anything other than logs"),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::io;
+
+    use chrono::{SecondsFormat, Utc};
+    use vector_core::{
+        config::log_schema,
+        event::{Event, Metric, MetricKind, MetricValue},
+    };
+
+    use super::StandardEncodings;
+    use crate::sinks::util::encoding::Encoder;
+
+    fn encode_event(event: Event, encoding: StandardEncodings) -> io::Result<Vec<u8>> {
+        let mut buf = Vec::new();
+        let result = encoding.encode_event(event, &mut buf);
+        result.map(|()| buf)
+    }
+
+    #[test]
+    fn test_standard_text() {
+        let encoding = StandardEncodings::Text;
+
+        let message = "log event";
+        let log_event = Event::from(message.to_string());
+
+        let result = encode_event(log_event, encoding).expect("should not have failed");
+        let encoded = std::str::from_utf8(&result).expect("result should be valid UTF-8");
+
+        let expected = format!("{}\n", message);
+        assert_eq!(expected, encoded);
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_standard_text_panics_with_metric_event() {
+        let encoding = StandardEncodings::Text;
+
+        let metric_event = Metric::new(
+            "namespace",
+            MetricKind::Absolute,
+            MetricValue::Counter { value: 1.23 },
+        )
+        .into();
+
+        let _result = encode_event(metric_event, encoding);
+    }
+
+    #[test]
+    fn test_standard_json() {
+        let msg_key = log_schema().message_key();
+        let ts_key = log_schema().timestamp_key();
+        let now = Utc::now();
+        let encoding = StandardEncodings::Ndjson;
+
+        let message = "log event";
+        let mut log_event = Event::from(message.to_string());
+        log_event.as_mut_log().insert(ts_key, now);
+
+        let result = encode_event(log_event, encoding).expect("should not have failed");
+        let encoded = std::str::from_utf8(&result).expect("result should be valid UTF-8");
+
+        // We have to hard-code the transformation of the timestamp here, as `chrono::DateTime`
+        // uses a more timezone-explicit format in its `Display` implementation, while its
+        // `Serialize` implementation uses RFC3339.
+        let expected = format!(
+            "{{\"{}\":\"log event\",\"{}\":\"{}\"}}\n",
+            msg_key,
+            ts_key,
+            now.to_rfc3339_opts(SecondsFormat::AutoSi, true)
+        );
+        assert_eq!(expected, encoded);
+    }
+}
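Another illustrative aside, not taken from the patch: since `Compressor` implements `io::Write`, the new `Encoder` implementations can write straight into it. The event contents and module paths below are assumptions, and `Compression::None` is used so the returned buffer holds the raw NDJSON bytes; the gzip variant of `Compression` would yield compressed bytes instead.

use std::io;

use vector_core::event::Event;

use crate::sinks::util::{
    encoding::{Encoder, StandardEncodings},
    Compression, Compressor,
};

// Encodes a single log event as NDJSON into a (here, pass-through) compressor
// and returns the finished payload buffer.
fn encode_one_event_sketch() -> io::Result<Vec<u8>> {
    let event = Event::from("an example log line".to_string());

    let mut compressor = Compressor::from(Compression::None);
    StandardEncodings::Ndjson.encode_event(event, &mut compressor)?;

    Ok(compressor.into_inner())
}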
diff --git a/src/sinks/util/encoding/config.rs b/src/sinks/util/encoding/config.rs
index ab2850eb5e8c7..6462c8aed039f 100644
--- a/src/sinks/util/encoding/config.rs
+++ b/src/sinks/util/encoding/config.rs
@@ -33,20 +33,26 @@ pub struct EncodingConfig<E> {
     pub(crate) timestamp_format: Option<TimestampFormat>,
 }
 
-impl<E> EncodingConfiguration<E> for EncodingConfig<E> {
-    fn codec(&self) -> &E {
+impl<E> EncodingConfiguration for EncodingConfig<E> {
+    type Codec = E;
+
+    fn codec(&self) -> &Self::Codec {
         &self.codec
     }
+
     fn schema(&self) -> &Option<String> {
         &self.schema
     }
+
     // TODO(2410): Using PathComponents here is a hack for #2407, #2410 should fix this fully.
     fn only_fields(&self) -> &Option<Vec<Vec<PathComponent>>> {
         &self.only_fields
     }
+
     fn except_fields(&self) -> &Option<Vec<String>> {
         &self.except_fields
     }
+
     fn timestamp_format(&self) -> &Option<TimestampFormat> {
         &self.timestamp_format
     }
diff --git a/src/sinks/util/encoding/mod.rs b/src/sinks/util/encoding/mod.rs
index 2fe68e1df1a20..c47ff74b73ddd 100644
--- a/src/sinks/util/encoding/mod.rs
+++ b/src/sinks/util/encoding/mod.rs
@@ -29,6 +29,8 @@
 // `Encoder` that defines some `encode` function which this config then calls internally as
 // part of it's own (yet to be written) `encode() -> Vec<u8>` function.
 
+mod codec;
+pub use codec::StandardEncodings;
 mod config;
 pub use config::EncodingConfig;
 mod with_default;
@@ -39,13 +41,20 @@ use crate::{
     Result,
 };
 use serde::{Deserialize, Serialize};
-use std::fmt::Debug;
+use std::{fmt::Debug, io};
+
+/// Encodes an event.
+pub trait Encoder {
+    /// Encodes an individual event to the provided writer.
+    fn encode_event(&self, event: Event, writer: &mut dyn io::Write) -> io::Result<()>;
+}
 
 /// The behavior of a encoding configuration.
-pub trait EncodingConfiguration<E> {
+pub trait EncodingConfiguration {
+    type Codec;
     // Required Accessors
-    fn codec(&self) -> &E;
+    fn codec(&self) -> &Self::Codec;
     fn schema(&self) -> &Option<String>;
     // TODO(2410): Using PathComponents here is a hack for #2407, #2410 should fix this fully.
     fn only_fields(&self) -> &Option<Vec<Vec<PathComponent>>>;
@@ -152,6 +161,17 @@ pub trait EncodingConfiguration<E> {
     }
 }
 
+impl<E> Encoder for E
+where
+    E: EncodingConfiguration,
+    E::Codec: Encoder,
+{
+    fn encode_event(&self, mut event: Event, writer: &mut dyn io::Write) -> io::Result<()> {
+        self.apply_rules(&mut event);
+        self.codec().encode_event(event, writer)
+    }
+}
+
 #[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq)]
 #[serde(rename_all = "lowercase")]
 pub enum TimestampFormat {
diff --git a/src/sinks/util/encoding/with_default.rs b/src/sinks/util/encoding/with_default.rs
index f4bc6aa9c4cc8..28396f39872b7 100644
--- a/src/sinks/util/encoding/with_default.rs
+++ b/src/sinks/util/encoding/with_default.rs
@@ -36,20 +36,26 @@ pub struct EncodingConfigWithDefault<E: Default + PartialEq> {
     pub(crate) timestamp_format: Option<TimestampFormat>,
 }
 
-impl<E: Default + PartialEq> EncodingConfiguration<E> for EncodingConfigWithDefault<E> {
-    fn codec(&self) -> &E {
+impl<E: Default + PartialEq> EncodingConfiguration for EncodingConfigWithDefault<E> {
+    type Codec = E;
+
+    fn codec(&self) -> &Self::Codec {
         &self.codec
     }
+
     fn schema(&self) -> &Option<String> {
         &self.schema
     }
+
     // TODO(2410): Using PathComponents here is a hack for #2407, #2410 should fix this fully.
     fn only_fields(&self) -> &Option<Vec<Vec<PathComponent>>> {
         &self.only_fields
     }
+
     fn except_fields(&self) -> &Option<Vec<String>> {
         &self.except_fields
     }
+
     fn timestamp_format(&self) -> &Option<TimestampFormat> {
         &self.timestamp_format
     }
diff --git a/src/sinks/util/mod.rs b/src/sinks/util/mod.rs
index be81e548a6261..f07d2560fd0e2 100644
--- a/src/sinks/util/mod.rs
+++ b/src/sinks/util/mod.rs
@@ -1,8 +1,12 @@
 pub mod adaptive_concurrency;
 pub mod batch;
 pub mod buffer;
+pub mod builder;
+pub mod compressor;
+pub mod concurrent_map;
 pub mod encoding;
 pub mod http;
+pub mod request_builder;
 pub mod retries;
 pub mod service;
 pub mod sink;
@@ -28,6 +32,10 @@ pub use buffer::json::{BoxedRawValue, JsonArrayBuffer};
 pub use buffer::partition::Partition;
 pub use buffer::vec::{EncodedLength, VecBuffer};
 pub use buffer::{Buffer, Compression, PartitionBuffer, PartitionInnerBuffer};
+pub use builder::SinkBuilderExt;
+pub use compressor::Compressor;
+pub use concurrent_map::ConcurrentMap;
+pub use request_builder::RequestBuilder;
 pub use service::{
     Concurrency, ServiceBuilderExt, TowerBatchedSink, TowerPartitionSink, TowerRequestConfig,
     TowerRequestLayer, TowerRequestSettings,
diff --git a/src/sinks/util/request_builder.rs b/src/sinks/util/request_builder.rs
new file mode 100644
index 0000000000000..b0fdefd509d80
--- /dev/null
+++ b/src/sinks/util/request_builder.rs
@@ -0,0 +1,16 @@
+/// Generalized interface for defining how a batch of events will be turned into a request.
+pub trait RequestBuilder<Input> {
+    type Metadata;
+    type Events;
+    type Payload;
+    type Request;
+
+    /// Splits apart the input into the metadata and events portions.
+    ///
+    /// The metadata should be any information that needs to be passed back to `build_request`
+    /// as-is, such as event finalizers, while the events are the actual events to process.
+    fn split_input(&self, input: Input) -> (Self::Metadata, Self::Events);
+
+    /// Builds a request for the given metadata and payload.
+    fn build_request(&self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request;
+}
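To close, an illustrative sketch, not part of the patch, of how the pieces are meant to compose: a hypothetical `RequestBuilder` implementation that carries event finalizers as its metadata, followed by a comment outlining how it might be wired through `SinkBuilderExt`. All names (`FakeRequest`, `FakeRequestBuilder`, the concurrency limit) are invented, and the driver wiring stays in comment form because the `Driver` API itself is defined in `lib/vector-core/src/stream/driver.rs` rather than in this section.

use vector_core::event::{Event, EventFinalizers, Finalizable};

use crate::sinks::util::RequestBuilder;

// A request carries the encoded payload plus the finalizers of the events it
// was built from, so acknowledgement can happen once the service responds.
struct FakeRequest {
    finalizers: EventFinalizers,
    payload: Vec<u8>,
}

struct FakeRequestBuilder;

impl RequestBuilder<Vec<Event>> for FakeRequestBuilder {
    type Metadata = EventFinalizers;
    type Events = Vec<Event>;
    type Payload = Vec<u8>;
    type Request = FakeRequest;

    fn split_input(&self, mut input: Vec<Event>) -> (Self::Metadata, Self::Events) {
        // Detach the finalizers up front; they ride along as metadata while the
        // events themselves go through encoding/compression.
        let finalizers = input.take_finalizers();
        (finalizers, input)
    }

    fn build_request(&self, finalizers: Self::Metadata, payload: Self::Payload) -> Self::Request {
        FakeRequest { finalizers, payload }
    }
}

// Rough wiring inside a sink's run loop, assuming `input` is a
// `Stream<Item = Vec<Event>>`, `service` is a `tower::Service<FakeRequest>`
// whose response implements `AsRef<EventStatus>`, and `acker` is an `Acker`.
// `FakeRequest` would also need `Ackable` and `Finalizable` impls to satisfy
// `into_driver`:
//
//     input
//         .request_builder(
//             NonZeroUsize::new(64),
//             FakeRequestBuilder,
//             StandardEncodings::Ndjson,
//             Compression::None,
//         )
//         .filter_map(|request| async move { request.ok() })
//         .into_driver(service, acker)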