From 8c5dea74215780c9079c0e3eaf4530e5b550cb4c Mon Sep 17 00:00:00 2001
From: Piotr Findeisen
Date: Wed, 30 Oct 2024 14:40:27 +0100
Subject: [PATCH] Report file location and offset on CSV schema mismatch

---
 .../core/src/datasource/file_format/csv.rs    | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 3cb5ae4f85ca..e0daab330ef1 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -56,6 +56,7 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement;
 use futures::stream::BoxStream;
 use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
+use object_store::path::Path;
 
 #[derive(Default)]
 /// Factory struct used to create [CsvFormatFactory]
@@ -324,7 +325,7 @@ impl FileFormat for CsvFormat {
         for object in objects {
             let stream = self.read_to_delimited_chunks(store, object).await;
             let (schema, records_read) = self
-                .infer_schema_from_stream(state, records_to_read, stream)
+                .infer_schema_from_stream(state, records_to_read, stream, &object.location)
                 .await?;
             records_to_read -= records_read;
             schemas.push(schema);
@@ -429,15 +430,18 @@ impl CsvFormat {
         state: &SessionState,
         mut records_to_read: usize,
         stream: impl Stream<Item = Result<Bytes>>,
+        location: &Path,
     ) -> Result<(Schema, usize)> {
         let mut total_records_read = 0;
         let mut column_names = vec![];
         let mut column_type_possibilities = vec![];
-        let mut first_chunk = true;
+        let mut record_number = -1;
 
         pin_mut!(stream);
 
         while let Some(chunk) = stream.next().await.transpose()? {
+            record_number += 1;
+            let first_chunk = record_number == 0;
             let mut format = arrow::csv::reader::Format::default()
                 .with_header(
                     first_chunk
@@ -471,14 +475,15 @@ impl CsvFormat {
                         (field.name().clone(), possibilities)
                     })
                     .unzip();
-                first_chunk = false;
             } else {
                 if fields.len() != column_type_possibilities.len() {
                     return exec_err!(
-                        "Encountered unequal lengths between records on CSV file whilst inferring schema. \
-                        Expected {} records, found {} records",
+                        "Encountered unequal lengths between records on CSV file {} whilst inferring schema. \
+                        Expected {} fields, found {} fields at record {}",
+                        location,
                         column_type_possibilities.len(),
-                        fields.len()
+                        fields.len(),
+                        record_number + 1
                     );
                 }
 
@@ -920,7 +925,7 @@ mod tests {
             .read_to_delimited_chunks_from_stream(compressed_stream.unwrap())
             .await;
         let (schema, records_read) = compressed_csv
-            .infer_schema_from_stream(&session_state, records_to_read, decoded_stream)
+            .infer_schema_from_stream(&session_state, records_to_read, decoded_stream, &path)
            .await?;

         assert_eq!(expected, schema);
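
For context, the new exec_err! message interpolates the object-store path and a
1-based record number. Below is a minimal standalone sketch of that reporting
pattern, not DataFusion's actual API: the helper name check_field_counts, the
sample path data/example.csv, and the per-record counting are assumptions for
illustration only (the patched code advances record_number once per
newline-delimited byte chunk pulled from the stream, not per CSV record).

// Sketch: fix the expected field count from the first record, then report
// the file and offending record offset on mismatch, mirroring the patched
// error format.
fn check_field_counts(location: &str, records: &[&str]) -> Result<usize, String> {
    let mut expected: Option<usize> = None;
    for (record_number, record) in records.iter().enumerate() {
        let fields = record.split(',').count();
        match expected {
            // First record establishes the expected field count.
            None => expected = Some(fields),
            // Later records must match it; on mismatch, name the file and
            // the 1-based record number.
            Some(n) if n != fields => {
                return Err(format!(
                    "Encountered unequal lengths between records on CSV file {location} \
                     whilst inferring schema. Expected {n} fields, found {fields} fields \
                     at record {}",
                    record_number + 1
                ));
            }
            Some(_) => {}
        }
    }
    Ok(expected.unwrap_or(0))
}

fn main() {
    let rows = ["c1,c2,c3", "1,2,3", "4,5"]; // last record is one field short
    match check_field_counts("data/example.csv", &rows) {
        Ok(n) => println!("schema has {n} columns"),
        Err(e) => eprintln!("{e}"),
    }
}

Running the sketch prints: "Encountered unequal lengths between records on CSV
file data/example.csv whilst inferring schema. Expected 3 fields, found 2
fields at record 3", which is the shape of message this patch produces.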