Skip to content

Commit

Permalink
Support empty projection in CSV and JSON readers (#2604)
Browse files Browse the repository at this point in the history
* Add support for CSV and JSON readers

* Fix projection option not set

* Fix doc
  • Loading branch information
Dandandan authored Aug 29, 2022
1 parent 81f1f81 commit 248fa30
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 15 deletions.
45 changes: 43 additions & 2 deletions arrow/src/csv/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use crate::array::{
};
use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;
use crate::record_batch::{RecordBatch, RecordBatchOptions};
use crate::util::reader_parser::Parser;

use csv_crate::{ByteRecord, StringRecord};
Expand Down Expand Up @@ -671,7 +671,16 @@ fn parse(
Some(metadata) => Schema::new_with_metadata(projected_fields, metadata),
});

arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr))
arrays.and_then(|arr| {
RecordBatch::try_new_with_options(
projected_schema,
arr,
&RecordBatchOptions {
match_field_names: true,
row_count: Some(rows.len()),
},
)
})
}
fn parse_item<T: Parser>(string: &str) -> Option<T::Native> {
T::parse(string)
Expand Down Expand Up @@ -1869,6 +1878,38 @@ mod tests {
assert!(csv.next().is_none());
}

#[test]
fn test_empty_projection() {
    // Two single-value rows, serialized as newline-separated CSV text.
    let rows = vec![vec!["0"], vec!["1"]];
    let lines: Vec<String> = rows.iter().map(|row| row.join(",")).collect();
    let csv_text = lines.join("\n");

    let schema = Arc::new(Schema::new(vec![Field::new(
        "int",
        DataType::UInt32,
        false,
    )]));

    let mut csv = Reader::new(
        std::io::Cursor::new(csv_text.as_bytes()),
        schema,
        false,
        None,
        2,
        None,
        // An empty (but present) projection: no columns are selected.
        Some(vec![]),
        None,
    );

    // The batch must carry no columns yet still report both input rows.
    let batch = csv.next().unwrap().unwrap();
    assert_eq!(batch.columns().len(), 0);
    assert_eq!(batch.num_rows(), 2);

    // Both rows were consumed by the first batch, so the reader is exhausted.
    assert!(csv.next().is_none());
}

#[test]
fn test_parsing_bool() {
// Encode the expected behavior of boolean parsing
Expand Down
58 changes: 45 additions & 13 deletions arrow/src/json/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use serde_json::{map::Map as JsonMap, Value};
use crate::buffer::MutableBuffer;
use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;
use crate::record_batch::{RecordBatch, RecordBatchOptions};
use crate::util::bit_util;
use crate::util::reader_parser::Parser;
use crate::{array::*, buffer::Buffer};
Expand Down Expand Up @@ -698,22 +698,34 @@ impl Decoder {
}

let rows = &rows[..];
let projection = self.options.projection.clone().unwrap_or_default();
let arrays = self.build_struct_array(rows, self.schema.fields(), &projection);

let projected_fields: Vec<Field> = if projection.is_empty() {
self.schema.fields().to_vec()
} else {
let arrays =
self.build_struct_array(rows, self.schema.fields(), &self.options.projection);

let projected_fields = if let Some(projection) = self.options.projection.as_ref()
{
projection
.iter()
.filter_map(|name| self.schema.column_with_name(name))
.map(|(_, field)| field.clone())
.collect()
} else {
self.schema.fields().to_vec()
};

let projected_schema = Arc::new(Schema::new(projected_fields));

arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr).map(Some))
arrays.and_then(|arr| {
RecordBatch::try_new_with_options(
projected_schema,
arr,
&RecordBatchOptions {
match_field_names: true,
row_count: Some(rows.len()),
},
)
.map(Some)
})
}

fn build_wrapped_list_array(
Expand Down Expand Up @@ -1138,7 +1150,7 @@ impl Decoder {
})
.collect();
let arrays =
self.build_struct_array(rows.as_slice(), fields.as_slice(), &[])?;
self.build_struct_array(rows.as_slice(), fields.as_slice(), &None)?;
let data_type = DataType::Struct(fields.clone());
let buf = null_buffer.into();
unsafe {
Expand Down Expand Up @@ -1171,18 +1183,23 @@ impl Decoder {
///
/// *Note*: The function is recursive, and will read nested structs.
///
/// If `projection` is not empty, then all values are returned. The first level of projection
/// If `projection` is &None, then all values are returned. The first level of projection
/// occurs at the `RecordBatch` level. No further projection currently occurs, but would be
/// useful if plucking values from a struct, e.g. getting `a.b.c.e` from `a.b.c.{d, e}`.
fn build_struct_array(
&self,
rows: &[Value],
struct_fields: &[Field],
projection: &[String],
projection: &Option<Vec<String>>,
) -> Result<Vec<ArrayRef>> {
let arrays: Result<Vec<ArrayRef>> = struct_fields
.iter()
.filter(|field| projection.is_empty() || projection.contains(field.name()))
.filter(|field| {
projection
.as_ref()
.map(|p| p.contains(field.name()))
.unwrap_or(true)
})
.map(|field| {
match field.data_type() {
DataType::Null => {
Expand Down Expand Up @@ -1345,7 +1362,7 @@ impl Decoder {
})
.collect::<Vec<Value>>();
let arrays =
self.build_struct_array(&struct_rows, fields, &[])?;
self.build_struct_array(&struct_rows, fields, &None)?;
// construct a struct array's data in order to set null buffer
let data_type = DataType::Struct(fields.clone());
let data = ArrayDataBuilder::new(data_type)
Expand Down Expand Up @@ -1442,7 +1459,7 @@ impl Decoder {
let struct_children = self.build_struct_array(
struct_rows.as_slice(),
&[key_field.clone(), value_field.clone()],
&[],
&None,
)?;

unsafe {
Expand Down Expand Up @@ -1806,6 +1823,21 @@ mod tests {
assert_eq!("text", dd.value(8));
}

#[test]
fn test_json_empty_projection() {
    let file = File::open("test/data/basic.json").unwrap();

    // Project onto an empty column list while letting the schema be inferred.
    let mut reader: Reader<File> = ReaderBuilder::new()
        .infer_schema(None)
        .with_batch_size(64)
        .with_projection(vec![])
        .build::<File>(file)
        .unwrap();

    // The batch must carry no columns yet still report the rows that were read.
    let batch = reader.next().unwrap().unwrap();
    assert_eq!(0, batch.num_columns());
    assert_eq!(12, batch.num_rows());
}

#[test]
fn test_json_basic_with_nulls() {
let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64);
Expand Down

0 comments on commit 248fa30

Please sign in to comment.