diff --git a/lib/codecs/src/encoding/format/csv.rs b/lib/codecs/src/encoding/format/csv.rs index bfbcecec6d754..f3fe524abdc49 100644 --- a/lib/codecs/src/encoding/format/csv.rs +++ b/lib/codecs/src/encoding/format/csv.rs @@ -1,7 +1,6 @@ use crate::encoding::BuildError; use bytes::BytesMut; use chrono::SecondsFormat; -use csv_core::{WriteResult, Writer, WriterBuilder}; use lookup::lookup_v2::ConfigTargetPath; use tokio_util::codec::Encoder; use vector_core::{ @@ -181,7 +180,7 @@ impl CsvSerializerOptions { pub struct CsvSerializer { // Box because of clippy error: 'large size difference between variants' // in SerializerConfig enum - writer: Box, + writer: Box, fields: Vec, internal_buffer: Vec, } @@ -191,7 +190,7 @@ impl CsvSerializer { pub fn new(config: CsvSerializerConfig) -> Self { // 'flexible' is not needed since every event is a single context free csv line let writer = Box::new( - WriterBuilder::new() + csv_core::WriterBuilder::new() .delimiter(config.csv.delimiter) .double_quote(config.csv.double_quote) .escape(config.csv.escape) @@ -214,113 +213,89 @@ impl CsvSerializer { } } -fn write_field( - writer: &mut Writer, - mut field_value: &[u8], - internal_buffer: &mut Vec, - buffer: &mut BytesMut, - mut used_buffer_bytes: usize, -) -> usize { - loop { - let (res, bytes_read, bytes_written) = - writer.field(field_value, &mut internal_buffer[used_buffer_bytes..]); - - field_value = &field_value[bytes_read..]; - used_buffer_bytes += bytes_written; - - match res { - WriteResult::InputEmpty => return used_buffer_bytes, - WriteResult::OutputFull => { - buffer.extend_from_slice(&internal_buffer[..used_buffer_bytes]); - used_buffer_bytes = 0; - } - } - } -} - -fn finish_field( - writer: &mut Writer, - finish_func: fn(&mut Writer, &mut [u8]) -> (WriteResult, usize), - internal_buffer: &mut Vec, - buffer: &mut BytesMut, - mut used_buffer_bytes: usize, -) -> usize { - loop { - let (res, bytes_written) = finish_func(writer, &mut internal_buffer[used_buffer_bytes..]); - used_buffer_bytes += bytes_written; - match res { - WriteResult::InputEmpty => return used_buffer_bytes, - WriteResult::OutputFull => { - buffer.extend_from_slice(&internal_buffer[..used_buffer_bytes]); - used_buffer_bytes = 0; - } - } - } -} - -fn write_delimiter(writer: &mut Writer, field: &mut [u8]) -> (WriteResult, usize) { - writer.delimiter(field) -} - -fn finish_event(writer: &mut Writer, field: &mut [u8]) -> (WriteResult, usize) { - writer.finish(field) -} - -fn field_to_csv_string(field_value: Option<&Value>) -> String { - match field_value { - Some(Value::Bytes(bytes)) => String::from_utf8_lossy(bytes).into_owned(), - Some(Value::Integer(int)) => int.to_string(), - Some(Value::Float(float)) => float.to_string(), - Some(Value::Boolean(bool)) => bool.to_string(), - Some(Value::Timestamp(timestamp)) => timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true), - Some(Value::Null) => String::new(), - // Other value types: Array, Regex, Object are not supported by the CSV format. - Some(_) => String::new(), - None => String::new(), - } -} - impl Encoder for CsvSerializer { type Error = vector_common::Error; - fn encode(&mut self, event: Event, upstream_buffer: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> { let log = event.into_log(); let mut used_buffer_bytes = 0; for (fields_written, field) in self.fields.iter().enumerate() { let field_value = log.get(field); + // write field delimiter if fields_written > 0 { - used_buffer_bytes = finish_field( - &mut self.writer, - write_delimiter, - &mut self.internal_buffer, - upstream_buffer, - used_buffer_bytes, - ); + loop { + let (res, bytes_written) = self + .writer + .delimiter(&mut self.internal_buffer[used_buffer_bytes..]); + used_buffer_bytes += bytes_written; + match res { + csv_core::WriteResult::InputEmpty => { + break; + } + csv_core::WriteResult::OutputFull => { + buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]); + used_buffer_bytes = 0; + } + } + } } - let field_data = field_to_csv_string(field_value); - used_buffer_bytes = write_field( - &mut self.writer, - &field_data.as_bytes(), - &mut self.internal_buffer, - upstream_buffer, - used_buffer_bytes, - ); + // get string value of current field + let field_value = match field_value { + Some(Value::Bytes(bytes)) => String::from_utf8_lossy(bytes).into_owned(), + Some(Value::Integer(int)) => int.to_string(), + Some(Value::Float(float)) => float.to_string(), + Some(Value::Boolean(bool)) => bool.to_string(), + Some(Value::Timestamp(timestamp)) => { + timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true) + } + Some(Value::Null) => String::new(), + // Other value types: Array, Regex, Object are not supported by the CSV format. + Some(_) => String::new(), + None => String::new(), + }; + + // mutable byte_slice so it can be written in chunks if internal_buffer fills up + let mut field_value = field_value.as_bytes(); + // write field_value to internal buffer + loop { + let (res, bytes_read, bytes_written) = self + .writer + .field(field_value, &mut self.internal_buffer[used_buffer_bytes..]); + + field_value = &field_value[bytes_read..]; + used_buffer_bytes += bytes_written; + + match res { + csv_core::WriteResult::InputEmpty => break, + csv_core::WriteResult::OutputFull => { + buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]); + used_buffer_bytes = 0; + } + } + } } - used_buffer_bytes = finish_field( - &mut self.writer, - finish_event, - &mut self.internal_buffer, - upstream_buffer, - used_buffer_bytes, - ); + // finish current event (potentially add closing quotes) + loop { + let (res, bytes_written) = self + .writer + .finish(&mut self.internal_buffer[used_buffer_bytes..]); + used_buffer_bytes += bytes_written; + match res { + csv_core::WriteResult::InputEmpty => break, + csv_core::WriteResult::OutputFull => { + buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]); + used_buffer_bytes = 0; + } + } + } // final flush of internal_buffer if used_buffer_bytes > 0 { - upstream_buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]); + buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]); } Ok(()) @@ -583,7 +558,7 @@ mod tests { #[test] fn multiple_events() { - let (fields, event1) = make_event_with_fields(vec![("field1", "foo,")]); + let (fields, event1) = make_event_with_fields(vec![("field1", "foo\"")]); let (_, event2) = make_event_with_fields(vec![("field1", "\"bar")]); let opts = CsvSerializerOptions { fields, @@ -596,6 +571,6 @@ mod tests { serializer.encode(event1, &mut bytes).unwrap(); serializer.encode(event2, &mut bytes).unwrap(); - assert_eq!(bytes.freeze(), b"\"foo,\"\"\nbar\"".as_slice()); + assert_eq!(bytes.freeze(), b"\"foo\"\"\"\"\"bar\"".as_slice()); } }