From 37cc5de8a6427284d6dbc4042d031c678732a59b Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 19 Dec 2022 09:10:33 +0000 Subject: [PATCH 1/2] Add CSV build_buffered (#3338) --- arrow-csv/src/reader/mod.rs | 37 +++++++++++++++++++++---------------- arrow/benches/csv_reader.rs | 2 +- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/arrow-csv/src/reader/mod.rs b/arrow-csv/src/reader/mod.rs index 877876b77c9a..14f10fd55137 100644 --- a/arrow-csv/src/reader/mod.rs +++ b/arrow-csv/src/reader/mod.rs @@ -47,7 +47,7 @@ use regex::{Regex, RegexSet}; use std::collections::HashSet; use std::fmt; use std::fs::File; -use std::io::{BufReader, Read, Seek, SeekFrom}; +use std::io::{BufRead, BufReader as StdBufReader, Read, Seek, SeekFrom}; use std::sync::Arc; use arrow_array::builder::Decimal128Builder; @@ -325,14 +325,17 @@ pub fn infer_schema_from_files( // optional bounds of the reader, of the form (min line, max line). type Bounds = Option<(usize, usize)>; +/// CSV file reader using [`std::io::BufReader`] +pub type Reader = BufReader>; + /// CSV file reader -pub struct Reader { +pub struct BufReader { /// Explicit schema for the CSV file schema: SchemaRef, /// Optional projection for which columns to load (zero-based column indices) projection: Option>, /// File reader - reader: RecordReader>, + reader: RecordReader, /// Rows to skip to_skip: usize, /// Current line number @@ -347,9 +350,9 @@ pub struct Reader { datetime_format: Option, } -impl fmt::Debug for Reader +impl fmt::Debug for BufReader where - R: Read, + R: BufRead, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Reader") @@ -394,7 +397,7 @@ impl Reader { if let Some(format) = datetime_format { builder = builder.with_datetime_format(format) } - builder.build_with_schema(reader, schema) + builder.build_with_schema(StdBufReader::new(reader), schema) } /// Returns the schema of the reader, useful for getting the schema without reading @@ -441,7 +444,7 @@ impl Reader { } } -impl Iterator for Reader { +impl Iterator for BufReader { type Item = Result; fn next(&mut self) -> Option { @@ -1041,10 +1044,15 @@ impl ReaderBuilder { } /// Create a new `Reader` from the `ReaderBuilder` - pub fn build( + pub fn build(self, reader: R) -> Result, ArrowError> { + self.build_buffered(StdBufReader::new(reader)) + } + + /// Create a new `BufReader` + pub fn build_buffered( mut self, mut reader: R, - ) -> Result, ArrowError> { + ) -> Result, ArrowError> { // check if schema should be inferred let delimiter = self.delimiter.unwrap_or(b','); let schema = match self.schema.take() { @@ -1068,7 +1076,7 @@ impl ReaderBuilder { Ok(self.build_with_schema(reader, schema)) } - fn build_with_schema(self, reader: R, schema: SchemaRef) -> Reader { + fn build_with_schema(self, reader: R, schema: SchemaRef) -> BufReader { let mut reader_builder = csv_core::ReaderBuilder::new(); reader_builder.escape(self.escape); @@ -1081,11 +1089,8 @@ impl ReaderBuilder { if let Some(t) = self.terminator { reader_builder.terminator(csv_core::Terminator::Any(t)); } - let reader = RecordReader::new( - BufReader::new(reader), - reader_builder.build(), - schema.fields().len(), - ); + let delimiter = reader_builder.build(); + let reader = RecordReader::new(reader, delimiter, schema.fields().len()); let header = self.has_header as usize; @@ -1094,7 +1099,7 @@ impl ReaderBuilder { None => (header, usize::MAX), }; - Reader { + BufReader { schema, projection: self.projection, reader, diff --git a/arrow/benches/csv_reader.rs b/arrow/benches/csv_reader.rs index f6353fb851f5..02c8ca2d2993 100644 --- a/arrow/benches/csv_reader.rs +++ b/arrow/benches/csv_reader.rs @@ -44,7 +44,7 @@ fn do_bench(c: &mut Criterion, name: &str, cols: Vec) { .with_schema(batch.schema()) .with_batch_size(batch_size) .has_header(true) - .build(cursor) + .build_buffered(cursor) .unwrap(); for next in reader { From 1a67e5db6a5029cd6657d24917bbb12480b0df1f Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 19 Dec 2022 20:15:09 +0000 Subject: [PATCH 2/2] Doc tweaks --- arrow-csv/src/reader/mod.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/arrow-csv/src/reader/mod.rs b/arrow-csv/src/reader/mod.rs index 14f10fd55137..bc6b016ec9cf 100644 --- a/arrow-csv/src/reader/mod.rs +++ b/arrow-csv/src/reader/mod.rs @@ -1043,12 +1043,15 @@ impl ReaderBuilder { self } - /// Create a new `Reader` from the `ReaderBuilder` + /// Create a new `Reader` from a non-buffered reader + /// + /// If `R: BufRead` consider using [`Self::build_buffered`] to avoid unnecessary additional + /// buffering, as internally this method wraps `reader` in [`std::io::BufReader`] pub fn build(self, reader: R) -> Result, ArrowError> { self.build_buffered(StdBufReader::new(reader)) } - /// Create a new `BufReader` + /// Create a new `BufReader` from a buffered reader pub fn build_buffered( mut self, mut reader: R,