Make CREATE EXTERNAL TABLE format options consistent, remove special syntax for `HEADER ROW`, `DELIMITER` and `COMPRESSION` (#10404)

* Simplify format options

* Keep PG copy from tests same

* Update datafusion/common/src/config.rs

Co-authored-by: Andrew Lamb <[email protected]>

* Update datafusion/core/src/datasource/file_format/csv.rs

Co-authored-by: Andrew Lamb <[email protected]>

* Remove WITH HEADER ROW

* Review Part 1

* .

* Fix failing tests

* Revert "Fix failing tests"

This reverts commit 9d81601.

* Final commit

* Minor

* Review

* Update avro.slt

* Apply suggestions

* Fix imports

---------

Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
3 people authored May 13, 2024
1 parent 8cc92a9 commit 58cc4e1
Showing 72 changed files with 563 additions and 734 deletions.
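
For orientation, a hedged before/after sketch of the user-facing DDL change (the removed clauses follow the commit title; the `format.delimiter` key matches the new tests below, while the exact spellings of the `has_header` and `compression` keys are an assumption here):

// Illustrative only — not part of the commit. Pairs the removed
// special-cased clauses with the uniform key/value OPTIONS form.
const OLD_DDL: &str = "CREATE EXTERNAL TABLE t STORED AS CSV \
    WITH HEADER ROW DELIMITER '|' COMPRESSION TYPE GZIP \
    LOCATION 'data.csv'";

const NEW_DDL: &str = "CREATE EXTERNAL TABLE t STORED AS CSV \
    LOCATION 'data.csv' \
    OPTIONS ('format.has_header' 'true', \
             'format.delimiter' '|', \
             'format.compression' 'gzip')";

fn main() {
    println!("before: {OLD_DDL}");
    println!("after:  {NEW_DDL}");
}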
86 changes: 49 additions & 37 deletions datafusion-cli/src/helper.rs
@@ -20,25 +20,20 @@
use std::borrow::Cow;

use crate::highlighter::{NoSyntaxHighlighter, SyntaxHighlighter};

use datafusion::common::sql_datafusion_err;
use datafusion::error::DataFusionError;
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;
use datafusion::sql::sqlparser::parser::ParserError;
use rustyline::completion::Completer;
use rustyline::completion::FilenameCompleter;
use rustyline::completion::Pair;

use rustyline::completion::{Completer, FilenameCompleter, Pair};
use rustyline::error::ReadlineError;
use rustyline::highlight::Highlighter;
use rustyline::hint::Hinter;
use rustyline::validate::ValidationContext;
use rustyline::validate::ValidationResult;
use rustyline::validate::Validator;
use rustyline::Context;
use rustyline::Helper;
use rustyline::Result;

use crate::highlighter::{NoSyntaxHighlighter, SyntaxHighlighter};
use rustyline::validate::{ValidationContext, ValidationResult, Validator};
use rustyline::{Context, Helper, Result};

pub struct CliHelper {
completer: FilenameCompleter,
@@ -259,52 +254,69 @@ mod tests {

// should be valid
let result = readline_direct(
Cursor::new(r"create external table test stored as csv location 'data.csv' delimiter ',';".as_bytes()),
&validator,
)?;
Cursor::new(
r"create external table test stored as csv location 'data.csv' options ('format.delimiter' ',');"
.as_bytes(),
),
&validator,
)?;
assert!(matches!(result, ValidationResult::Valid(None)));

let result = readline_direct(
Cursor::new(r"create external table test stored as csv location 'data.csv' delimiter '\0';".as_bytes()),
&validator,
)?;
Cursor::new(
r"create external table test stored as csv location 'data.csv' options ('format.delimiter' '\0');"
.as_bytes()),
&validator,
)?;
assert!(matches!(result, ValidationResult::Valid(None)));

let result = readline_direct(
Cursor::new(r"create external table test stored as csv location 'data.csv' delimiter '\n';".as_bytes()),
&validator,
)?;
Cursor::new(
r"create external table test stored as csv location 'data.csv' options ('format.delimiter' '\n');"
.as_bytes()),
&validator,
)?;
assert!(matches!(result, ValidationResult::Valid(None)));

let result = readline_direct(
Cursor::new(r"create external table test stored as csv location 'data.csv' delimiter '\r';".as_bytes()),
&validator,
)?;
Cursor::new(
r"create external table test stored as csv location 'data.csv' options ('format.delimiter' '\r');"
.as_bytes()),
&validator,
)?;
assert!(matches!(result, ValidationResult::Valid(None)));

let result = readline_direct(
Cursor::new(r"create external table test stored as csv location 'data.csv' delimiter '\t';".as_bytes()),
&validator,
)?;
Cursor::new(
r"create external table test stored as csv location 'data.csv' options ('format.delimiter' '\t');"
.as_bytes()),
&validator,
)?;
assert!(matches!(result, ValidationResult::Valid(None)));

let result = readline_direct(
Cursor::new(r"create external table test stored as csv location 'data.csv' delimiter '\\';".as_bytes()),
&validator,
)?;
Cursor::new(
r"create external table test stored as csv location 'data.csv' options ('format.delimiter' '\\');"
.as_bytes()),
&validator,
)?;
assert!(matches!(result, ValidationResult::Valid(None)));

// should be invalid
let result = readline_direct(
Cursor::new(r"create external table test stored as csv location 'data.csv' delimiter ',,';".as_bytes()),
&validator,
)?;
assert!(matches!(result, ValidationResult::Invalid(Some(_))));
Cursor::new(
r"create external table test stored as csv location 'data.csv' options ('format.delimiter' ',,');"
.as_bytes()),
&validator,
)?;
assert!(matches!(result, ValidationResult::Valid(None)));

// should be invalid
let result = readline_direct(
Cursor::new(r"create external table test stored as csv location 'data.csv' delimiter '\u{07}';".as_bytes()),
&validator,
)?;
Cursor::new(
r"create external table test stored as csv location 'data.csv' options ('format.delimiter' '\u{07}');"
.as_bytes()),
&validator,
)?;
assert!(matches!(result, ValidationResult::Invalid(Some(_))));

Ok(())
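A minimal sketch of what these validator tests exercise, assuming the `DFParser` API imported at the top of this file: delimiter strings are now opaque option values at parse time, so even a multi-character value parses, and the CSV format rejects bad delimiters later, at scan time.

use datafusion::sql::parser::DFParser;

fn main() {
    // Hypothetical input: the parser no longer special-cases DELIMITER,
    // so the value ',,' is accepted here and validated only when the
    // CSV format is actually instantiated.
    let sql = "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION 'data.csv' \
               OPTIONS ('format.delimiter' ',,')";
    let statements = DFParser::parse_sql(sql).expect("statement should parse");
    assert_eq!(statements.len(), 1);
}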
25 changes: 15 additions & 10 deletions datafusion/common/src/config.rs
@@ -1564,18 +1564,21 @@ config_namespace_with_hashmap! {
config_namespace! {
/// Options controlling CSV format
pub struct CsvOptions {
pub has_header: bool, default = true
/// Specifies whether there is a CSV header (i.e. whether the first line
/// consists of column names). The value `None` indicates that the
/// session configuration should be consulted.
pub has_header: Option<bool>, default = None
pub delimiter: u8, default = b','
pub quote: u8, default = b'"'
pub escape: Option<u8>, default = None
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
pub schema_infer_max_rec: usize, default = 100
pub date_format: Option<String>, default = None
pub datetime_format: Option<String>, default = None
pub timestamp_format: Option<String>, default = None
pub timestamp_tz_format: Option<String>, default = None
pub time_format: Option<String>, default = None
pub null_value: Option<String>, default = None
}
}

@@ -1600,12 +1603,14 @@ impl CsvOptions {
/// Set true to indicate that the first line is a header.
/// - default is `None`, in which case the session configuration is consulted
pub fn with_has_header(mut self, has_header: bool) -> Self {
self.has_header = has_header;
self.has_header = Some(has_header);
self
}

/// True if the first line is a header.
pub fn has_header(&self) -> bool {
/// Returns `Some(true)` if the first line is a header, `Some(false)` if it
/// is not, and `None` if the format options do not specify (in which case
/// the session configuration should be consulted).
pub fn has_header(&self) -> Option<bool> {
self.has_header
}

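A short sketch of the tri-state semantics introduced above, assuming the `CsvOptions` builder shown in this hunk; the `unwrap_or` fallback mirrors what the CSV format does with `catalog.has_header` later in this commit:

use datafusion_common::config::CsvOptions;

// A per-table setting wins; an unset option defers to the session default.
fn resolve_has_header(opts: &CsvOptions, catalog_default: bool) -> bool {
    opts.has_header().unwrap_or(catalog_default)
}

fn main() {
    let explicit = CsvOptions::default().with_has_header(false);
    assert!(!resolve_has_header(&explicit, true));

    let unset = CsvOptions::default(); // has_header: None
    assert!(resolve_has_header(&unset, true));
}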
2 changes: 1 addition & 1 deletion datafusion/common/src/file_options/csv_writer.rs
@@ -50,7 +50,7 @@ impl TryFrom<&CsvOptions> for CsvWriterOptions {

fn try_from(value: &CsvOptions) -> Result<Self> {
let mut builder = WriterBuilder::default()
.with_header(value.has_header)
.with_header(value.has_header.unwrap_or(false))
.with_delimiter(value.delimiter);

if let Some(v) = &value.date_format {
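One asymmetry worth noting, sketched under assumed module paths (`file_options::csv_writer` taken from this file's location): on the write path above, an unset `has_header` resolves to `false` (no header row is emitted), whereas the read path later in this commit falls back to the session's `catalog.has_header` instead.

use datafusion_common::config::CsvOptions;
use datafusion_common::file_options::csv_writer::CsvWriterOptions;

fn main() -> datafusion_common::Result<()> {
    let opts = CsvOptions::default(); // has_header: None
    // Per the TryFrom above, the writer resolves None to `false`.
    let _writer_opts = CsvWriterOptions::try_from(&opts)?;
    Ok(())
}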
7 changes: 0 additions & 7 deletions datafusion/core/src/catalog/listing_schema.rs
@@ -27,7 +27,6 @@ use crate::datasource::provider::TableProviderFactory;
use crate::datasource::TableProvider;
use crate::execution::context::SessionState;

use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{Constraints, DFSchema, DataFusionError, TableReference};
use datafusion_expr::CreateExternalTable;

@@ -58,7 +57,6 @@ pub struct ListingSchemaProvider {
store: Arc<dyn ObjectStore>,
tables: Arc<Mutex<HashMap<String, Arc<dyn TableProvider>>>>,
format: String,
has_header: bool,
}

impl ListingSchemaProvider {
@@ -77,7 +75,6 @@ impl ListingSchemaProvider {
factory: Arc<dyn TableProviderFactory>,
store: Arc<dyn ObjectStore>,
format: String,
has_header: bool,
) -> Self {
Self {
authority,
Expand All @@ -86,7 +83,6 @@ impl ListingSchemaProvider {
store,
tables: Arc::new(Mutex::new(HashMap::new())),
format,
has_header,
}
}

@@ -139,12 +135,9 @@ impl ListingSchemaProvider {
name,
location: table_url,
file_type: self.format.clone(),
has_header: self.has_header,
delimiter: ',',
table_partition_cols: vec![],
if_not_exists: false,
definition: None,
file_compression_type: CompressionTypeVariant::UNCOMPRESSED,
order_exprs: vec![],
unbounded: false,
options: Default::default(),
55 changes: 39 additions & 16 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -32,8 +32,9 @@ use crate::datasource::physical_plan::{
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::insert::{DataSink, DataSinkExec};
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, Statistics,
};

use arrow::array::RecordBatch;
use arrow::csv::WriterBuilder;
@@ -136,12 +137,13 @@ impl CsvFormat {
/// Set true to indicate that the first line is a header.
/// - default is `None`, in which case the session configuration is consulted
pub fn with_has_header(mut self, has_header: bool) -> Self {
self.options.has_header = has_header;
self.options.has_header = Some(has_header);
self
}

/// True if the first line is a header.
pub fn has_header(&self) -> bool {
/// Returns `Some(true)` if the first line is a header, `Some(false)` if
/// it is not, and `None` if it is not specified.
pub fn has_header(&self) -> Option<bool> {
self.options.has_header
}

@@ -200,7 +202,7 @@ impl FileFormat for CsvFormat {

async fn infer_schema(
&self,
_state: &SessionState,
state: &SessionState,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
Expand All @@ -211,7 +213,7 @@ impl FileFormat for CsvFormat {
for object in objects {
let stream = self.read_to_delimited_chunks(store, object).await;
let (schema, records_read) = self
.infer_schema_from_stream(records_to_read, stream)
.infer_schema_from_stream(state, records_to_read, stream)
.await?;
records_to_read -= records_read;
schemas.push(schema);
@@ -236,13 +238,17 @@

async fn create_physical_plan(
&self,
_state: &SessionState,
state: &SessionState,
conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let exec = CsvExec::new(
conf,
self.options.has_header,
// If the format options do not specify whether there is a header,
// we consult the session configuration options.
self.options
.has_header
.unwrap_or(state.config_options().catalog.has_header),
self.options.delimiter,
self.options.quote,
self.options.escape,
@@ -286,6 +292,7 @@ impl CsvFormat {
/// number of lines that were read
async fn infer_schema_from_stream(
&self,
state: &SessionState,
mut records_to_read: usize,
stream: impl Stream<Item = Result<Bytes>>,
) -> Result<(Schema, usize)> {
@@ -298,7 +305,13 @@

while let Some(chunk) = stream.next().await.transpose()? {
let format = arrow::csv::reader::Format::default()
.with_header(self.options.has_header && first_chunk)
.with_header(
first_chunk
&& self
.options
.has_header
.unwrap_or(state.config_options().catalog.has_header),
)
.with_delimiter(self.options.delimiter);

let (Schema { fields, .. }, records_read) =
@@ -538,6 +551,7 @@ mod tests {
use datafusion_common::cast::as_string_array;
use datafusion_common::stats::Precision;
use datafusion_common::{internal_err, GetExt};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_expr::{col, lit};

use chrono::DateTime;
@@ -554,7 +568,8 @@
let task_ctx = state.task_ctx();
// skip column 9 that overflows the automatically discovered column type of i64 (u64 would work)
let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]);
let exec = get_exec(&state, "aggregate_test_100.csv", projection, None).await?;
let exec =
get_exec(&state, "aggregate_test_100.csv", projection, None, true).await?;
let stream = exec.execute(0, task_ctx)?;

let tt_batches: i32 = stream
@@ -582,7 +597,7 @@
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![0, 1, 2, 3]);
let exec =
get_exec(&state, "aggregate_test_100.csv", projection, Some(1)).await?;
get_exec(&state, "aggregate_test_100.csv", projection, Some(1), true).await?;
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
assert_eq!(4, batches[0].num_columns());
@@ -597,7 +612,8 @@
let state = session_ctx.state();

let projection = None;
let exec = get_exec(&state, "aggregate_test_100.csv", projection, None).await?;
let exec =
get_exec(&state, "aggregate_test_100.csv", projection, None, true).await?;

let x: Vec<String> = exec
.schema()
@@ -633,7 +649,8 @@
let state = session_ctx.state();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![0]);
let exec = get_exec(&state, "aggregate_test_100.csv", projection, None).await?;
let exec =
get_exec(&state, "aggregate_test_100.csv", projection, None, true).await?;

let batches = collect(exec, task_ctx).await.expect("Collect batches");

@@ -716,6 +733,11 @@
async fn query_compress_data(
file_compression_type: FileCompressionType,
) -> Result<()> {
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new()).unwrap());
let mut cfg = SessionConfig::new();
cfg.options_mut().catalog.has_header = true;
let session_state = SessionState::new_with_config_rt(cfg, runtime);

let integration = LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap();

let path = Path::from("csv/aggregate_test_100.csv");
@@ -757,7 +779,7 @@
.read_to_delimited_chunks_from_stream(compressed_stream.unwrap())
.await;
let (schema, records_read) = compressed_csv
.infer_schema_from_stream(records_to_read, decoded_stream)
.infer_schema_from_stream(&session_state, records_to_read, decoded_stream)
.await?;

assert_eq!(expected, schema);
@@ -803,9 +825,10 @@
file_name: &str,
projection: Option<Vec<usize>>,
limit: Option<usize>,
has_header: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
let root = format!("{}/csv", crate::test_util::arrow_test_data());
let format = CsvFormat::default();
let format = CsvFormat::default().with_has_header(has_header);
scan_format(state, &format, &root, file_name, projection, limit).await
}

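Putting the pieces together, a hedged end-to-end sketch (assumes a local `data.csv` exists and that `SessionConfig`/`SessionContext` are re-exported from the prelude; `catalog.has_header` is the knob this commit's tests set):

use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Session-wide default, consulted whenever a table's CSV options
    // leave 'format.has_header' unset.
    let mut config = SessionConfig::new();
    config.options_mut().catalog.has_header = true;
    let ctx = SessionContext::new_with_config(config);

    // No 'format.has_header' in OPTIONS, so schema inference and scans
    // fall back to the catalog default configured above.
    ctx.sql(
        "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION 'data.csv' \
         OPTIONS ('format.delimiter' '|')",
    )
    .await?;

    ctx.sql("SELECT * FROM t").await?.show().await?;
    Ok(())
}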