Skip to content

Commit

Permalink
Report file location and record offset when a CSV schema mismatch occurs
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Oct 30, 2024
1 parent 223bb02 commit 8c5dea7
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
use object_store::path::Path;

#[derive(Default)]
/// Factory struct used to create [CsvFormatFactory]
Expand Down Expand Up @@ -324,7 +325,7 @@ impl FileFormat for CsvFormat {
for object in objects {
let stream = self.read_to_delimited_chunks(store, object).await;
let (schema, records_read) = self
.infer_schema_from_stream(state, records_to_read, stream)
.infer_schema_from_stream(state, records_to_read, stream, &object.location)
.await?;
records_to_read -= records_read;
schemas.push(schema);
Expand Down Expand Up @@ -429,15 +430,18 @@ impl CsvFormat {
state: &SessionState,
mut records_to_read: usize,
stream: impl Stream<Item = Result<Bytes>>,
location: &Path,
) -> Result<(Schema, usize)> {
let mut total_records_read = 0;
let mut column_names = vec![];
let mut column_type_possibilities = vec![];
let mut first_chunk = true;
let mut record_number = -1;

pin_mut!(stream);

while let Some(chunk) = stream.next().await.transpose()? {
record_number += 1;
let first_chunk = record_number == 0;
let mut format = arrow::csv::reader::Format::default()
.with_header(
first_chunk
Expand Down Expand Up @@ -471,14 +475,15 @@ impl CsvFormat {
(field.name().clone(), possibilities)
})
.unzip();
first_chunk = false;
} else {
if fields.len() != column_type_possibilities.len() {
return exec_err!(
"Encountered unequal lengths between records on CSV file whilst inferring schema. \
Expected {} records, found {} records",
"Encountered unequal lengths between records on CSV file {} whilst inferring schema. \
Expected {} fields, found {} fields at record {}",
location,
column_type_possibilities.len(),
fields.len()
fields.len(),
record_number + 1
);
}

Expand Down Expand Up @@ -920,7 +925,7 @@ mod tests {
.read_to_delimited_chunks_from_stream(compressed_stream.unwrap())
.await;
let (schema, records_read) = compressed_csv
.infer_schema_from_stream(&session_state, records_to_read, decoded_stream)
.infer_schema_from_stream(&session_state, records_to_read, decoded_stream, &path)
.await?;

assert_eq!(expected, schema);
Expand Down

0 comments on commit 8c5dea7

Please sign in to comment.