Skip to content

Commit

Permalink
fix(kafka sink): performance improvements and fix memory leak (vector…
Browse files Browse the repository at this point in the history
…dotdev#18634)

* fix(kafka sink): performance improvements and fix memory leak

* clippy

* Update src/sinks/kafka/service.rs

Co-authored-by: Bruce Guenter <[email protected]>

---------

Co-authored-by: Bruce Guenter <[email protected]>
  • Loading branch information
dsmith3197 and bruceg authored Sep 22, 2023
1 parent 2e7e7e7 commit 3c662f3
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 132 deletions.
124 changes: 58 additions & 66 deletions src/sinks/kafka/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ use crate::{
},
};

pub(crate) const QUEUED_MIN_MESSAGES: u64 = 100000;

/// Configuration for the `kafka` sink.
#[serde_as]
#[configurable_component(sink(
Expand Down Expand Up @@ -159,79 +157,73 @@ impl KafkaSinkConfig {

self.auth.apply(&mut client_config)?;

match kafka_role {
// All batch options are producer only.
KafkaRole::Producer => {
client_config
.set("compression.codec", &to_string(self.compression))
.set(
"message.timeout.ms",
&self.message_timeout_ms.as_millis().to_string(),
);

if let Some(value) = self.batch.timeout_secs {
// Delay in milliseconds to wait for messages in the producer queue to accumulate before
// constructing message batches (MessageSets) to transmit to brokers. A higher value
// allows larger and more effective (less overhead, improved compression) batches of
// messages to accumulate at the expense of increased message delivery latency.
// Type: float
let key = "queue.buffering.max.ms";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.timeout_secs` sets `librdkafka_options.{}={}`.\
// All batch options are producer only.
if kafka_role == KafkaRole::Producer {
client_config
.set("compression.codec", &to_string(self.compression))
.set(
"message.timeout.ms",
&self.message_timeout_ms.as_millis().to_string(),
);

if let Some(value) = self.batch.timeout_secs {
// Delay in milliseconds to wait for messages in the producer queue to accumulate before
// constructing message batches (MessageSets) to transmit to brokers. A higher value
// allows larger and more effective (less overhead, improved compression) batches of
// messages to accumulate at the expense of increased message delivery latency.
// Type: float
let key = "queue.buffering.max.ms";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.timeout_secs` sets `librdkafka_options.{}={}`.\
The config already sets this as `librdkafka_options.queue.buffering.max.ms={}`.\
Please delete one.", key, value, val).into());
}
debug!(
librdkafka_option = key,
batch_option = "timeout_secs",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &((value * 1000.0).round().to_string()));
}
if let Some(value) = self.batch.max_events {
// Maximum number of messages batched in one MessageSet. The total MessageSet size is
// also limited by batch.size and message.max.bytes.
// Type: integer
let key = "batch.num.messages";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.max_events` sets `librdkafka_options.{}={}`.\
debug!(
librdkafka_option = key,
batch_option = "timeout_secs",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &((value * 1000.0).round().to_string()));
}
if let Some(value) = self.batch.max_events {
// Maximum number of messages batched in one MessageSet. The total MessageSet size is
// also limited by batch.size and message.max.bytes.
// Type: integer
let key = "batch.num.messages";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.max_events` sets `librdkafka_options.{}={}`.\
The config already sets this as `librdkafka_options.batch.num.messages={}`.\
Please delete one.", key, value, val).into());
}
debug!(
librdkafka_option = key,
batch_option = "max_events",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &value.to_string());
}
if let Some(value) = self.batch.max_bytes {
// Maximum size (in bytes) of all messages batched in one MessageSet, including protocol
// framing overhead. This limit is applied after the first message has been added to the
// batch, regardless of the first message's size, this is to ensure that messages that
// exceed batch.size are produced. The total MessageSet size is also limited by
// batch.num.messages and message.max.bytes.
// Type: integer
let key = "batch.size";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.max_bytes` sets `librdkafka_options.{}={}`.\
debug!(
librdkafka_option = key,
batch_option = "max_events",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &value.to_string());
}
if let Some(value) = self.batch.max_bytes {
// Maximum size (in bytes) of all messages batched in one MessageSet, including protocol
// framing overhead. This limit is applied after the first message has been added to the
// batch, regardless of the first message's size, this is to ensure that messages that
// exceed batch.size are produced. The total MessageSet size is also limited by
// batch.num.messages and message.max.bytes.
// Type: integer
let key = "batch.size";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.max_bytes` sets `librdkafka_options.{}={}`.\
The config already sets this as `librdkafka_options.batch.size={}`.\
Please delete one.", key, value, val).into());
}
debug!(
librdkafka_option = key,
batch_option = "max_bytes",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &value.to_string());
}
}

KafkaRole::Consumer => {
client_config.set("queued.min.messages", QUEUED_MIN_MESSAGES.to_string());
debug!(
librdkafka_option = key,
batch_option = "max_bytes",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &value.to_string());
}
}

Expand Down
77 changes: 38 additions & 39 deletions src/sinks/kafka/request_builder.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,43 @@
use std::num::NonZeroUsize;

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use lookup::OwnedTargetPath;
use rdkafka::message::{Header, OwnedHeaders};
use tokio_util::codec::Encoder as _;
use vrl::path::OwnedTargetPath;

use crate::{
codecs::{Encoder, Transformer},
event::{Event, Finalizable, Value},
internal_events::{KafkaHeaderExtractionError, TemplateRenderingError},
internal_events::KafkaHeaderExtractionError,
sinks::{
kafka::service::{KafkaRequest, KafkaRequestMetadata},
util::metadata::RequestMetadataBuilder,
prelude::*,
},
template::Template,
};

pub struct KafkaRequestBuilder {
pub key_field: Option<OwnedTargetPath>,
pub headers_key: Option<OwnedTargetPath>,
pub topic_template: Template,
pub transformer: Transformer,
pub encoder: Encoder<()>,
pub encoder: (Transformer, Encoder<()>),
}

impl KafkaRequestBuilder {
pub fn build_request(&mut self, mut event: Event) -> Option<KafkaRequest> {
let topic = self
.topic_template
.render_string(&event)
.map_err(|error| {
emit!(TemplateRenderingError {
field: None,
drop_event: true,
error,
});
})
.ok()?;
impl RequestBuilder<(String, Event)> for KafkaRequestBuilder {
type Metadata = KafkaRequestMetadata;
type Events = Event;
type Encoder = (Transformer, Encoder<()>);
type Payload = Bytes;
type Request = KafkaRequest;
type Error = std::io::Error;

fn compression(&self) -> Compression {
Compression::None
}

fn encoder(&self) -> &Self::Encoder {
&self.encoder
}

fn split_input(
&self,
input: (String, Event),
) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
let (topic, mut event) = input;
let builder = RequestMetadataBuilder::from_event(&event);

let metadata = KafkaRequestMetadata {
finalizers: event.take_finalizers(),
Expand All @@ -45,23 +46,21 @@ impl KafkaRequestBuilder {
headers: get_headers(&event, self.headers_key.as_ref()),
topic,
};
self.transformer.transform(&mut event);
let mut body = BytesMut::new();

// Ensure the metadata builder is built after transforming the event so we have the event
// size taking into account any dropped fields.
let metadata_builder = RequestMetadataBuilder::from_event(&event);
self.encoder.encode(event, &mut body).ok()?;
let body = body.freeze();

let bytes_len = NonZeroUsize::new(body.len()).expect("payload should never be zero length");
let request_metadata = metadata_builder.with_request_size(bytes_len);
(metadata, builder, event)
}

Some(KafkaRequest {
body,
fn build_request(
&self,
metadata: Self::Metadata,
request_metadata: RequestMetadata,
payload: EncodeResult<Self::Payload>,
) -> Self::Request {
KafkaRequest {
body: payload.into_payload(),
metadata,
request_metadata,
})
}
}
}

Expand Down
28 changes: 15 additions & 13 deletions src/sinks/kafka/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ use rdkafka::{
producer::{FutureProducer, FutureRecord},
util::Timeout,
};
use vector_core::internal_event::{
ByteSize, BytesSent, InternalEventHandle as _, Protocol, Registered,
};

use crate::{kafka::KafkaStatisticsContext, sinks::prelude::*};

Expand All @@ -29,6 +26,7 @@ pub struct KafkaRequestMetadata {

pub struct KafkaResponse {
event_byte_size: GroupedCountByteSize,
raw_byte_size: usize,
}

impl DriverResponse for KafkaResponse {
Expand All @@ -39,6 +37,10 @@ impl DriverResponse for KafkaResponse {
fn events_sent(&self) -> &GroupedCountByteSize {
&self.event_byte_size
}

fn bytes_sent(&self) -> Option<usize> {
Some(self.raw_byte_size)
}
}

impl Finalizable for KafkaRequest {
Expand All @@ -60,15 +62,13 @@ impl MetaDescriptive for KafkaRequest {
#[derive(Clone)]
pub struct KafkaService {
kafka_producer: FutureProducer<KafkaStatisticsContext>,
bytes_sent: Registered<BytesSent>,
}

impl KafkaService {
pub(crate) fn new(kafka_producer: FutureProducer<KafkaStatisticsContext>) -> KafkaService {
KafkaService {
kafka_producer,
bytes_sent: register!(BytesSent::from(Protocol("kafka".into()))),
}
pub(crate) const fn new(
kafka_producer: FutureProducer<KafkaStatisticsContext>,
) -> KafkaService {
KafkaService { kafka_producer }
}
}

Expand Down Expand Up @@ -104,10 +104,12 @@ impl Service<KafkaRequest> for KafkaService {
// rdkafka will internally retry forever if the queue is full
match this.kafka_producer.send(record, Timeout::Never).await {
Ok((_partition, _offset)) => {
this.bytes_sent.emit(ByteSize(
request.body.len() + request.metadata.key.map(|x| x.len()).unwrap_or(0),
));
Ok(KafkaResponse { event_byte_size })
let raw_byte_size =
request.body.len() + request.metadata.key.map_or(0, |x| x.len());
Ok(KafkaResponse {
event_byte_size,
raw_byte_size,
})
}
Err((kafka_err, _original_record)) => Err(kafka_err),
}
Expand Down
52 changes: 38 additions & 14 deletions src/sinks/kafka/sink.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use futures::future;
use std::num::NonZeroUsize;

use rdkafka::{
consumer::{BaseConsumer, Consumer},
error::KafkaError,
Expand All @@ -13,9 +14,7 @@ use vrl::path::OwnedTargetPath;
use super::config::{KafkaRole, KafkaSinkConfig};
use crate::{
kafka::KafkaStatisticsContext,
sinks::kafka::{
config::QUEUED_MIN_MESSAGES, request_builder::KafkaRequestBuilder, service::KafkaService,
},
sinks::kafka::{request_builder::KafkaRequestBuilder, service::KafkaService},
sinks::prelude::*,
};

Expand Down Expand Up @@ -65,22 +64,47 @@ impl KafkaSink {
}

async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
// rdkafka will internally retry forever, so we need some limit to prevent this from overflowing
let service = ConcurrencyLimit::new(self.service, QUEUED_MIN_MESSAGES as usize);
let mut request_builder = KafkaRequestBuilder {
// rdkafka will internally retry forever, so we need some limit to prevent this from overflowing.
// 64 should be plenty concurrency here, as a rdkafka send operation does not block until its underlying
// buffer is full.
let service = ConcurrencyLimit::new(self.service.clone(), 64);
let builder_limit = NonZeroUsize::new(64);

let request_builder = KafkaRequestBuilder {
key_field: self.key_field,
headers_key: self.headers_key,
topic_template: self.topic,
transformer: self.transformer,
encoder: self.encoder,
encoder: (self.transformer, self.encoder),
};

input
.filter_map(|event|
// request_builder is fallible but the places it can fail are emitting
// `Error` and `DroppedEvent` internal events appropriately so no need to here.
future::ready(request_builder.build_request(event)))
.filter_map(|event| {
// Compute the topic.
future::ready(
self.topic
.render_string(&event)
.map_err(|error| {
emit!(TemplateRenderingError {
field: None,
drop_event: true,
error,
});
})
.ok()
.map(|topic| (topic, event)),
)
})
.request_builder(builder_limit, request_builder)
.filter_map(|request| async {
match request {
Err(error) => {
emit!(SinkRequestBuildError { error });
None
}
Ok(req) => Some(req),
}
})
.into_driver(service)
.protocol("kafka")
.run()
.await
}
Expand Down

0 comments on commit 3c662f3

Please sign in to comment.