diff --git a/lib/codecs/src/encoding/format/csv.rs b/lib/codecs/src/encoding/format/csv.rs
index f3fe524abdc493..bfbcecec6d7545 100644
--- a/lib/codecs/src/encoding/format/csv.rs
+++ b/lib/codecs/src/encoding/format/csv.rs
@@ -1,6 +1,7 @@
 use crate::encoding::BuildError;
 use bytes::BytesMut;
 use chrono::SecondsFormat;
+use csv_core::{WriteResult, Writer, WriterBuilder};
 use lookup::lookup_v2::ConfigTargetPath;
 use tokio_util::codec::Encoder;
 use vector_core::{
@@ -180,7 +181,7 @@ impl CsvSerializerOptions {
 pub struct CsvSerializer {
     // Box because of clippy error: 'large size difference between variants'
     // in SerializerConfig enum
-    writer: Box<csv_core::Writer>,
+    writer: Box<Writer>,
     fields: Vec<ConfigTargetPath>,
     internal_buffer: Vec<u8>,
 }
@@ -190,7 +191,7 @@ impl CsvSerializer {
     pub fn new(config: CsvSerializerConfig) -> Self {
         // 'flexible' is not needed since every event is a single context free csv line
         let writer = Box::new(
-            csv_core::WriterBuilder::new()
+            WriterBuilder::new()
                 .delimiter(config.csv.delimiter)
                 .double_quote(config.csv.double_quote)
                 .escape(config.csv.escape)
@@ -213,89 +214,113 @@ impl CsvSerializer {
     }
 }
 
+fn write_field(
+    writer: &mut Writer,
+    mut field_value: &[u8],
+    internal_buffer: &mut Vec<u8>,
+    buffer: &mut BytesMut,
+    mut used_buffer_bytes: usize,
+) -> usize {
+    loop {
+        let (res, bytes_read, bytes_written) =
+            writer.field(field_value, &mut internal_buffer[used_buffer_bytes..]);
+
+        field_value = &field_value[bytes_read..];
+        used_buffer_bytes += bytes_written;
+
+        match res {
+            WriteResult::InputEmpty => return used_buffer_bytes,
+            WriteResult::OutputFull => {
+                buffer.extend_from_slice(&internal_buffer[..used_buffer_bytes]);
+                used_buffer_bytes = 0;
+            }
+        }
+    }
+}
+
+fn finish_field(
+    writer: &mut Writer,
+    finish_func: fn(&mut Writer, &mut [u8]) -> (WriteResult, usize),
+    internal_buffer: &mut Vec<u8>,
+    buffer: &mut BytesMut,
+    mut used_buffer_bytes: usize,
+) -> usize {
+    loop {
+        let (res, bytes_written) = finish_func(writer, &mut internal_buffer[used_buffer_bytes..]);
+        used_buffer_bytes += bytes_written;
+        match res {
+            WriteResult::InputEmpty => return used_buffer_bytes,
+            WriteResult::OutputFull => {
+                buffer.extend_from_slice(&internal_buffer[..used_buffer_bytes]);
+                used_buffer_bytes = 0;
+            }
+        }
+    }
+}
+
+fn write_delimiter(writer: &mut Writer, field: &mut [u8]) -> (WriteResult, usize) {
+    writer.delimiter(field)
+}
+
+fn finish_event(writer: &mut Writer, field: &mut [u8]) -> (WriteResult, usize) {
+    writer.finish(field)
+}
+
+fn field_to_csv_string(field_value: Option<&Value>) -> String {
+    match field_value {
+        Some(Value::Bytes(bytes)) => String::from_utf8_lossy(bytes).into_owned(),
+        Some(Value::Integer(int)) => int.to_string(),
+        Some(Value::Float(float)) => float.to_string(),
+        Some(Value::Boolean(bool)) => bool.to_string(),
+        Some(Value::Timestamp(timestamp)) => timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true),
+        Some(Value::Null) => String::new(),
+        // Other value types: Array, Regex, Object are not supported by the CSV format.
+        Some(_) => String::new(),
+        None => String::new(),
+    }
+}
+
 impl Encoder<Event> for CsvSerializer {
     type Error = vector_common::Error;
 
-    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
+    fn encode(&mut self, event: Event, upstream_buffer: &mut BytesMut) -> Result<(), Self::Error> {
         let log = event.into_log();
 
         let mut used_buffer_bytes = 0;
         for (fields_written, field) in self.fields.iter().enumerate() {
             let field_value = log.get(field);
 
-            // write field delimiter
             if fields_written > 0 {
-                loop {
-                    let (res, bytes_written) = self
-                        .writer
-                        .delimiter(&mut self.internal_buffer[used_buffer_bytes..]);
-                    used_buffer_bytes += bytes_written;
-                    match res {
-                        csv_core::WriteResult::InputEmpty => {
-                            break;
-                        }
-                        csv_core::WriteResult::OutputFull => {
-                            buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
-                            used_buffer_bytes = 0;
-                        }
-                    }
-                }
+                used_buffer_bytes = finish_field(
+                    &mut self.writer,
+                    write_delimiter,
+                    &mut self.internal_buffer,
+                    upstream_buffer,
+                    used_buffer_bytes,
+                );
             }
 
-            // get string value of current field
-            let field_value = match field_value {
-                Some(Value::Bytes(bytes)) => String::from_utf8_lossy(bytes).into_owned(),
-                Some(Value::Integer(int)) => int.to_string(),
-                Some(Value::Float(float)) => float.to_string(),
-                Some(Value::Boolean(bool)) => bool.to_string(),
-                Some(Value::Timestamp(timestamp)) => {
-                    timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true)
-                }
-                Some(Value::Null) => String::new(),
-                // Other value types: Array, Regex, Object are not supported by the CSV format.
-                Some(_) => String::new(),
-                None => String::new(),
-            };
-
-            // mutable byte_slice so it can be written in chunks if internal_buffer fills up
-            let mut field_value = field_value.as_bytes();
-            // write field_value to internal buffer
-            loop {
-                let (res, bytes_read, bytes_written) = self
-                    .writer
-                    .field(field_value, &mut self.internal_buffer[used_buffer_bytes..]);
-
-                field_value = &field_value[bytes_read..];
-                used_buffer_bytes += bytes_written;
-
-                match res {
-                    csv_core::WriteResult::InputEmpty => break,
-                    csv_core::WriteResult::OutputFull => {
-                        buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
-                        used_buffer_bytes = 0;
-                    }
-                }
-            }
+            let field_data = field_to_csv_string(field_value);
+            used_buffer_bytes = write_field(
+                &mut self.writer,
+                &field_data.as_bytes(),
+                &mut self.internal_buffer,
+                upstream_buffer,
+                used_buffer_bytes,
+            );
         }
 
-        // finish current event (potentially add closing quotes)
-        loop {
-            let (res, bytes_written) = self
-                .writer
-                .finish(&mut self.internal_buffer[used_buffer_bytes..]);
-            used_buffer_bytes += bytes_written;
-            match res {
-                csv_core::WriteResult::InputEmpty => break,
-                csv_core::WriteResult::OutputFull => {
-                    buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
-                    used_buffer_bytes = 0;
-                }
-            }
-        }
+        used_buffer_bytes = finish_field(
+            &mut self.writer,
+            finish_event,
+            &mut self.internal_buffer,
+            upstream_buffer,
+            used_buffer_bytes,
+        );
 
         // final flush of internal_buffer
         if used_buffer_bytes > 0 {
-            buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
+            upstream_buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
         }
 
         Ok(())
@@ -558,7 +583,7 @@ mod tests {
 
     #[test]
     fn multiple_events() {
-        let (fields, event1) = make_event_with_fields(vec![("field1", "foo\"")]);
+        let (fields, event1) = make_event_with_fields(vec![("field1", "foo,")]);
         let (_, event2) = make_event_with_fields(vec![("field1", "\"bar")]);
         let opts = CsvSerializerOptions {
             fields,
@@ -571,6 +596,6 @@ mod tests {
         serializer.encode(event1, &mut bytes).unwrap();
         serializer.encode(event2, &mut bytes).unwrap();
 
-        assert_eq!(bytes.freeze(), b"\"foo\"\"\"\"\"bar\"".as_slice());
+        assert_eq!(bytes.freeze(), b"\"foo,\"\"\nbar\"".as_slice());
     }
 }