Merge remote-tracking branch 'apache/main' into alamb/nullary
alamb committed Dec 18, 2024
2 parents e1c1294 + 01ffb64 commit 4f1dcee
Showing 37 changed files with 1,256 additions and 290 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions datafusion/common/src/config.rs
@@ -1655,7 +1655,10 @@ config_namespace! {
pub timestamp_format: Option<String>, default = None
pub timestamp_tz_format: Option<String>, default = None
pub time_format: Option<String>, default = None
/// The output string used for null values in the CSV writer.
pub null_value: Option<String>, default = None
/// Regex to match null values when loading CSVs.
pub null_regex: Option<String>, default = None
pub comment: Option<u8>, default = None
}
}
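The reader treats a raw field as null when it matches `null_regex`. A minimal standalone sketch of that matching semantics using the `regex` crate; the pattern `^NULL$|^$` is the one exercised in the benchmark and tests below, and this snippet is an illustration rather than DataFusion code:

```rust
// Sketch (not DataFusion code): how a null-matching regex such as
// "^NULL$|^$" classifies raw CSV fields. The anchors make it an exact
// match, so it is case-sensitive: "null" is NOT treated as null here.
use regex::Regex;

fn main() {
    let null_regex = Regex::new("^NULL$|^$").unwrap();
    for field in ["NULL", "", "null", "42"] {
        println!("{:?} -> null? {}", field, null_regex.is_match(field));
    }
}
```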
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
@@ -121,6 +121,7 @@ object_store = { workspace = true }
parking_lot = { workspace = true }
parquet = { workspace = true, optional = true, default-features = true }
rand = { workspace = true }
regex = { workspace = true }
sqlparser = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
10 changes: 10 additions & 0 deletions datafusion/core/benches/csv_load.rs
@@ -75,6 +75,16 @@ fn criterion_benchmark(c: &mut Criterion) {
)
})
});

group.bench_function("null regex override", |b| {
b.iter(|| {
load_csv(
ctx.clone(),
test_file.path().to_str().unwrap(),
CsvReadOptions::default().null_regex(Some("^NULL$|^$".to_string())),
)
})
});
}

criterion_group!(benches, criterion_benchmark);
24 changes: 24 additions & 0 deletions datafusion/core/src/dataframe/mod.rs
@@ -2380,6 +2380,30 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn aggregate_assert_no_empty_batches() -> Result<()> {
// build plan using DataFrame API
let df = test_table().await?;
let group_expr = vec![col("c1")];
let aggr_expr = vec![
min(col("c12")),
max(col("c12")),
avg(col("c12")),
sum(col("c12")),
count(col("c12")),
count_distinct(col("c12")),
median(col("c12")),
];

let df: Vec<RecordBatch> = df.aggregate(group_expr, aggr_expr)?.collect().await?;
// Empty batches should not be produced
for batch in df {
assert!(batch.num_rows() > 0);
}

Ok(())
}

#[tokio::test]
async fn test_aggregate_with_pk() -> Result<()> {
// create the dataframe
119 changes: 88 additions & 31 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -59,6 +59,7 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
use regex::Regex;

#[derive(Default)]
/// Factory struct used to create [CsvFormatFactory]
@@ -218,6 +219,13 @@ impl CsvFormat {
self
}

/// Set the regex used to match null values in the CSV reader.
/// Defaults to treating only empty values as null.
pub fn with_null_regex(mut self, null_regex: Option<String>) -> Self {
self.options.null_regex = null_regex;
self
}

/// Returns `Some(true)` if the first line is a header, `Some(false)` if
/// it is not, and `None` if it is not specified.
pub fn has_header(&self) -> Option<bool> {
@@ -502,6 +510,12 @@ impl CsvFormat {
.with_delimiter(self.options.delimiter)
.with_quote(self.options.quote);

if let Some(null_regex) = &self.options.null_regex {
let regex = Regex::new(null_regex.as_str())
.expect("Unable to parse CSV null regex.");
format = format.with_null_regex(regex);
}

if let Some(escape) = self.options.escape {
format = format.with_escape(escape);
}
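This wiring hands the compiled pattern to arrow-csv's `Format` before schema inference runs. A hedged sketch of that underlying API, assuming the `arrow` version pinned by DataFusion and a hypothetical `data.csv` input:

```rust
// Sketch under the assumptions noted above: columns whose sampled values
// all match the null regex are inferred as DataType::Null, which is what
// the c14/c15 assertions in the tests below expect.
use std::fs::File;

use arrow::csv::reader::Format;
use regex::Regex;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let format = Format::default()
        .with_header(true)
        .with_null_regex(Regex::new("^NULL$|^$")?);
    let file = File::open("data.csv")?; // hypothetical input path
    let (schema, _records_read) = format.infer_schema(file, Some(100))?;
    println!("{schema:#?}");
    Ok(())
}
```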
@@ -813,8 +827,67 @@ mod tests {
let state = session_ctx.state();

let projection = None;
let exec =
get_exec(&state, "aggregate_test_100.csv", projection, None, true).await?;
let root = "./tests/data/csv";
let format = CsvFormat::default().with_has_header(true);
let exec = scan_format(
&state,
&format,
root,
"aggregate_test_100_with_nulls.csv",
projection,
None,
)
.await?;

let x: Vec<String> = exec
.schema()
.fields()
.iter()
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
.collect();
assert_eq!(
vec![
"c1: Utf8",
"c2: Int64",
"c3: Int64",
"c4: Int64",
"c5: Int64",
"c6: Int64",
"c7: Int64",
"c8: Int64",
"c9: Int64",
"c10: Utf8",
"c11: Float64",
"c12: Float64",
"c13: Utf8",
"c14: Null",
"c15: Utf8"
],
x
);

Ok(())
}

#[tokio::test]
async fn infer_schema_with_null_regex() -> Result<()> {
let session_ctx = SessionContext::new();
let state = session_ctx.state();

let projection = None;
let root = "./tests/data/csv";
let format = CsvFormat::default()
.with_has_header(true)
.with_null_regex(Some("^NULL$|^$".to_string()));
let exec = scan_format(
&state,
&format,
root,
"aggregate_test_100_with_nulls.csv",
projection,
None,
)
.await?;

let x: Vec<String> = exec
.schema()
@@ -836,7 +909,9 @@
"c10: Utf8",
"c11: Float64",
"c12: Float64",
"c13: Utf8"
"c13: Utf8",
"c14: Null",
"c15: Null"
],
x
);
@@ -1259,73 +1334,57 @@
Ok(())
}

/// Read a single empty csv file in parallel
/// Read a single empty csv file
///
/// empty_0_byte.csv:
/// (file is empty)
#[rstest(n_partitions, case(1), case(2), case(3), case(4))]
#[tokio::test]
async fn test_csv_parallel_empty_file(n_partitions: usize) -> Result<()> {
let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
let ctx = SessionContext::new_with_config(config);
async fn test_csv_empty_file() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_csv(
"empty",
"tests/data/empty_0_byte.csv",
CsvReadOptions::new().has_header(false),
)
.await?;

// Require a predicate to enable repartition for the optimizer
let query = "select * from empty where random() > 0.5;";
let query_result = ctx.sql(query).await?.collect().await?;
let actual_partitions = count_query_csv_partitions(&ctx, query).await?;

#[rustfmt::skip]
let expected = ["++",
"++"];
assert_batches_eq!(expected, &query_result);
assert_eq!(1, actual_partitions); // Won't get partitioned if all files are empty

Ok(())
}

/// Read a single empty csv file with header in parallel
/// Read a single empty csv file with header
///
/// empty.csv:
/// c1,c2,c3
#[rstest(n_partitions, case(1), case(2), case(3))]
#[tokio::test]
async fn test_csv_parallel_empty_with_header(n_partitions: usize) -> Result<()> {
let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
let ctx = SessionContext::new_with_config(config);
async fn test_csv_empty_with_header() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_csv(
"empty",
"tests/data/empty.csv",
CsvReadOptions::new().has_header(true),
)
.await?;

// Require a predicate to enable repartition for the optimizer
let query = "select * from empty where random() > 0.5;";
let query_result = ctx.sql(query).await?.collect().await?;
let actual_partitions = count_query_csv_partitions(&ctx, query).await?;

#[rustfmt::skip]
let expected = ["++",
"++"];
assert_batches_eq!(expected, &query_result);
assert_eq!(n_partitions, actual_partitions);

Ok(())
}

/// Read multiple empty csv files in parallel
/// Read multiple empty csv files
///
/// all_empty
/// ├── empty0.csv
Expand All @@ -1334,13 +1393,13 @@ mod tests {
///
/// empty0.csv/empty1.csv/empty2.csv:
/// (file is empty)
#[rstest(n_partitions, case(1), case(2), case(3), case(4))]
#[tokio::test]
async fn test_csv_parallel_multiple_empty_files(n_partitions: usize) -> Result<()> {
async fn test_csv_multiple_empty_files() -> Result<()> {
// Testing that partitioning doesn't break with empty files
let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
.with_target_partitions(4);
let ctx = SessionContext::new_with_config(config);
let file_format = Arc::new(CsvFormat::default().with_has_header(false));
let listing_options = ListingOptions::new(file_format.clone())
Expand All @@ -1358,13 +1417,11 @@ mod tests {
// Require a predicate to enable repartition for the optimizer
let query = "select * from empty where random() > 0.5;";
let query_result = ctx.sql(query).await?.collect().await?;
let actual_partitions = count_query_csv_partitions(&ctx, query).await?;

#[rustfmt::skip]
let expected = ["++",
"++"];
assert_batches_eq!(expected, &query_result);
assert_eq!(1, actual_partitions); // Won't get partitioned if all files are empty

Ok(())
}
8 changes: 2 additions & 6 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -619,13 +619,11 @@ mod tests {
Ok(())
}

#[rstest(n_partitions, case(1), case(2), case(3), case(4))]
#[tokio::test]
async fn it_can_read_empty_ndjson_in_parallel(n_partitions: usize) -> Result<()> {
async fn it_can_read_empty_ndjson() -> Result<()> {
let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
.with_repartition_file_min_size(0);

let ctx = SessionContext::new_with_config(config);

Expand All @@ -638,7 +636,6 @@ mod tests {
let query = "SELECT * FROM json_parallel_empty WHERE random() > 0.5;";

let result = ctx.sql(query).await?.collect().await?;
let actual_partitions = count_num_partitions(&ctx, query).await?;

#[rustfmt::skip]
let expected = [
Expand All @@ -647,7 +644,6 @@ mod tests {
];

assert_batches_eq!(expected, &result);
assert_eq!(1, actual_partitions);

Ok(())
}
12 changes: 11 additions & 1 deletion datafusion/core/src/datasource/file_format/options.rs
@@ -87,6 +87,8 @@ pub struct CsvReadOptions<'a> {
pub file_compression_type: FileCompressionType,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<SortExpr>>,
/// Optional regex to match null values
pub null_regex: Option<String>,
}

impl Default for CsvReadOptions<'_> {
Expand All @@ -112,6 +114,7 @@ impl<'a> CsvReadOptions<'a> {
file_compression_type: FileCompressionType::UNCOMPRESSED,
file_sort_order: vec![],
comment: None,
null_regex: None,
}
}

@@ -212,6 +215,12 @@ impl<'a> CsvReadOptions<'a> {
self.file_sort_order = file_sort_order;
self
}

/// Configure the regex used to match null values when parsing CSV.
pub fn null_regex(mut self, null_regex: Option<String>) -> Self {
self.null_regex = null_regex;
self
}
}

/// Options that control the reading of Parquet files.
@@ -534,7 +543,8 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_terminator(self.terminator)
.with_newlines_in_values(self.newlines_in_values)
.with_schema_infer_max_rec(self.schema_infer_max_records)
.with_file_compression_type(self.file_compression_type.to_owned());
.with_file_compression_type(self.file_compression_type.to_owned())
.with_null_regex(self.null_regex.clone());

ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
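End to end, the new `CsvReadOptions::null_regex` builder can be used as below; a usage sketch in which the file path and the `tokio` main are illustrative, not part of this commit:

```rust
// Usage sketch: read a CSV treating empty strings and the literal "NULL"
// as SQL NULLs. "example.csv" is a hypothetical path.
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx
        .read_csv(
            "example.csv",
            CsvReadOptions::default().null_regex(Some("^NULL$|^$".to_string())),
        )
        .await?;
    df.show().await?;
    Ok(())
}
```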
(Diffs for the remaining 29 changed files were not loaded.)