Skip to content

Commit

Permalink
low level csv writer api refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
scMarkus committed Aug 23, 2023
1 parent 3512a7a commit c730d58
Showing 1 changed file with 95 additions and 70 deletions.
165 changes: 95 additions & 70 deletions lib/codecs/src/encoding/format/csv.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::encoding::BuildError;
use bytes::BytesMut;
use chrono::SecondsFormat;
use csv_core::{WriteResult, Writer, WriterBuilder};
use lookup::lookup_v2::ConfigTargetPath;
use tokio_util::codec::Encoder;
use vector_core::{
Expand Down Expand Up @@ -180,7 +181,7 @@ impl CsvSerializerOptions {
pub struct CsvSerializer {
// Box because of clippy error: 'large size difference between variants'
// in SerializerConfig enum
writer: Box<csv_core::Writer>,
writer: Box<Writer>,
fields: Vec<ConfigTargetPath>,
internal_buffer: Vec<u8>,
}
Expand All @@ -190,7 +191,7 @@ impl CsvSerializer {
pub fn new(config: CsvSerializerConfig) -> Self {
// 'flexible' is not needed since every event is a single, context-free CSV line
let writer = Box::new(
csv_core::WriterBuilder::new()
WriterBuilder::new()
.delimiter(config.csv.delimiter)
.double_quote(config.csv.double_quote)
.escape(config.csv.escape)
Expand All @@ -213,89 +214,113 @@ impl CsvSerializer {
}
}

/// Encodes one CSV field, staging the output in `internal_buffer`.
///
/// `field_value` is handed to `writer.field` in a loop. Every call writes into
/// the unused tail of `internal_buffer` (everything from `used_buffer_bytes`
/// onwards). Whenever the writer reports `WriteResult::OutputFull`, the staged
/// bytes are appended to `buffer` and staging restarts at offset 0, so a field
/// larger than the staging area is emitted in several chunks.
///
/// Returns the number of bytes staged in `internal_buffer` that have not been
/// copied to `buffer` yet. The caller threads this value into the next write
/// and is responsible for the final flush.
///
/// NOTE(review): a zero-length `internal_buffer` could never make progress
/// here; this presumably relies on the serializer allocating a non-empty
/// staging buffer — confirm against the constructor.
fn write_field(
    writer: &mut Writer,
    mut field_value: &[u8],
    internal_buffer: &mut [u8],
    buffer: &mut BytesMut,
    mut used_buffer_bytes: usize,
) -> usize {
    loop {
        let (res, bytes_read, bytes_written) =
            writer.field(field_value, &mut internal_buffer[used_buffer_bytes..]);

        // Drop the part of the input the writer has already consumed.
        field_value = &field_value[bytes_read..];
        used_buffer_bytes += bytes_written;

        match res {
            WriteResult::InputEmpty => return used_buffer_bytes,
            WriteResult::OutputFull => {
                // Staging area is full: hand its contents downstream and reuse it.
                buffer.extend_from_slice(&internal_buffer[..used_buffer_bytes]);
                used_buffer_bytes = 0;
            }
        }
    }
}

/// Runs a writer "finishing" step, staging its output in `internal_buffer`.
///
/// `finish_func` is called repeatedly with the unused tail of
/// `internal_buffer` (everything from `used_buffer_bytes` onwards) until it
/// reports `WriteResult::InputEmpty`. Each time it reports
/// `WriteResult::OutputFull`, the staged bytes are appended to `buffer` and
/// staging restarts at offset 0 before the step is retried.
///
/// Returns the number of bytes staged in `internal_buffer` that have not been
/// copied to `buffer` yet.
///
/// NOTE(review): a zero-length `internal_buffer` could never make progress
/// here; this presumably relies on the serializer allocating a non-empty
/// staging buffer — confirm against the constructor.
fn finish_field(
    writer: &mut Writer,
    finish_func: fn(&mut Writer, &mut [u8]) -> (WriteResult, usize),
    internal_buffer: &mut [u8],
    buffer: &mut BytesMut,
    mut used_buffer_bytes: usize,
) -> usize {
    loop {
        let (res, bytes_written) = finish_func(writer, &mut internal_buffer[used_buffer_bytes..]);
        used_buffer_bytes += bytes_written;

        match res {
            WriteResult::InputEmpty => return used_buffer_bytes,
            WriteResult::OutputFull => {
                // Staging area is full: hand its contents downstream and reuse it.
                buffer.extend_from_slice(&internal_buffer[..used_buffer_bytes]);
                used_buffer_bytes = 0;
            }
        }
    }
}

fn write_delimiter(writer: &mut Writer, field: &mut [u8]) -> (WriteResult, usize) {
writer.delimiter(field)
}

fn finish_event(writer: &mut Writer, field: &mut [u8]) -> (WriteResult, usize) {
writer.finish(field)
}

/// Renders an optional event value as the text of a single CSV cell.
///
/// * Bytes are decoded as UTF-8, lossily.
/// * Integers, floats and booleans use their `to_string` representation.
/// * Timestamps are formatted with `to_rfc3339_opts(SecondsFormat::AutoSi, true)`.
/// * A missing field, `Null`, and every other value type produce an empty
///   string.
fn field_to_csv_string(field_value: Option<&Value>) -> String {
    field_value
        .map(|value| match value {
            Value::Bytes(raw) => String::from_utf8_lossy(raw).into_owned(),
            Value::Integer(number) => number.to_string(),
            Value::Float(number) => number.to_string(),
            Value::Boolean(flag) => flag.to_string(),
            Value::Timestamp(moment) => moment.to_rfc3339_opts(SecondsFormat::AutoSi, true),
            // Null, as well as Array, Regex and Object, which are not supported by
            // the CSV format, all render as an empty cell.
            _ => String::new(),
        })
        .unwrap_or_default()
}

impl Encoder<Event> for CsvSerializer {
type Error = vector_common::Error;

fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, event: Event, upstream_buffer: &mut BytesMut) -> Result<(), Self::Error> {
let log = event.into_log();

let mut used_buffer_bytes = 0;
for (fields_written, field) in self.fields.iter().enumerate() {
let field_value = log.get(field);

// write field delimiter
if fields_written > 0 {
loop {
let (res, bytes_written) = self
.writer
.delimiter(&mut self.internal_buffer[used_buffer_bytes..]);
used_buffer_bytes += bytes_written;
match res {
csv_core::WriteResult::InputEmpty => {
break;
}
csv_core::WriteResult::OutputFull => {
buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
used_buffer_bytes = 0;
}
}
}
used_buffer_bytes = finish_field(
&mut self.writer,
write_delimiter,
&mut self.internal_buffer,
upstream_buffer,
used_buffer_bytes,
);
}

// get string value of current field
let field_value = match field_value {
Some(Value::Bytes(bytes)) => String::from_utf8_lossy(bytes).into_owned(),
Some(Value::Integer(int)) => int.to_string(),
Some(Value::Float(float)) => float.to_string(),
Some(Value::Boolean(bool)) => bool.to_string(),
Some(Value::Timestamp(timestamp)) => {
timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true)
}
Some(Value::Null) => String::new(),
// Other value types: Array, Regex, Object are not supported by the CSV format.
Some(_) => String::new(),
None => String::new(),
};

// mutable byte_slice so it can be written in chunks if internal_buffer fills up
let mut field_value = field_value.as_bytes();
// write field_value to internal buffer
loop {
let (res, bytes_read, bytes_written) = self
.writer
.field(field_value, &mut self.internal_buffer[used_buffer_bytes..]);

field_value = &field_value[bytes_read..];
used_buffer_bytes += bytes_written;

match res {
csv_core::WriteResult::InputEmpty => break,
csv_core::WriteResult::OutputFull => {
buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
used_buffer_bytes = 0;
}
}
}
let field_data = field_to_csv_string(field_value);
used_buffer_bytes = write_field(
&mut self.writer,
&field_data.as_bytes(),
&mut self.internal_buffer,
upstream_buffer,
used_buffer_bytes,
);
}

// finish current event (potentially add closing quotes)
loop {
let (res, bytes_written) = self
.writer
.finish(&mut self.internal_buffer[used_buffer_bytes..]);
used_buffer_bytes += bytes_written;
match res {
csv_core::WriteResult::InputEmpty => break,
csv_core::WriteResult::OutputFull => {
buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
used_buffer_bytes = 0;
}
}
}
used_buffer_bytes = finish_field(
&mut self.writer,
finish_event,
&mut self.internal_buffer,
upstream_buffer,
used_buffer_bytes,
);

// final flush of internal_buffer
if used_buffer_bytes > 0 {
buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
upstream_buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
}

Ok(())
Expand Down Expand Up @@ -558,7 +583,7 @@ mod tests {

#[test]
fn multiple_events() {
let (fields, event1) = make_event_with_fields(vec![("field1", "foo\"")]);
let (fields, event1) = make_event_with_fields(vec![("field1", "foo,")]);
let (_, event2) = make_event_with_fields(vec![("field1", "\"bar")]);
let opts = CsvSerializerOptions {
fields,
Expand All @@ -571,6 +596,6 @@ mod tests {
serializer.encode(event1, &mut bytes).unwrap();
serializer.encode(event2, &mut bytes).unwrap();

assert_eq!(bytes.freeze(), b"\"foo\"\"\"\"\"bar\"".as_slice());
assert_eq!(bytes.freeze(), b"\"foo,\"\"\nbar\"".as_slice());
}
}

0 comments on commit c730d58

Please sign in to comment.