From 9d816017f2c11d0197c35e4d8a98c249840a6f96 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Thu, 9 May 2024 14:19:58 +0300 Subject: [PATCH] Fix failing tests --- datafusion/common/src/config.rs | 6 +- .../core/src/datasource/file_format/csv.rs | 15 ++- datafusion/core/src/datasource/stream.rs | 13 ++- datafusion/core/tests/sql/mod.rs | 2 +- .../tests/cases/roundtrip_logical_plan.rs | 4 +- datafusion/sql/src/parser.rs | 105 ++++-------------- datafusion/sql/src/statement.rs | 31 +----- .../test_files/create_external_table.slt | 8 -- .../test_files/repartition_scan.slt | 1 - 9 files changed, 49 insertions(+), 136 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index a1445dfb2f40..584e69cd8591 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1566,7 +1566,7 @@ config_namespace! { pub struct CsvOptions { /// Specifies whether there is a CSV header (i.e. the first line /// consists of is column names). If not specified, uses default from - /// the `CREATE TABLE` command, if any. + /// the session state, if any. pub has_header: Option, default = None pub delimiter: u8, default = b',' pub quote: u8, default = b'"' @@ -1609,8 +1609,8 @@ impl CsvOptions { /// Returns true if the first line is a header. If format options does not /// specify whether there is a header, consults the configuration. - pub fn has_header(&self, config_opt: &ConfigOptions) -> bool { - self.has_header.unwrap_or(config_opt.catalog.has_header) + pub fn has_header(&self) -> Option { + self.has_header } /// The character separating values within a row. diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 3e5c7af1f82e..bfd0975a52a9 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -40,7 +40,7 @@ use arrow::array::RecordBatch; use arrow::csv::WriterBuilder; use arrow::datatypes::SchemaRef; use arrow::datatypes::{DataType, Field, Fields, Schema}; -use datafusion_common::config::{ConfigOptions, CsvOptions}; +use datafusion_common::config::CsvOptions; use datafusion_common::file_options::csv_writer::CsvWriterOptions; use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType}; use datafusion_execution::TaskContext; @@ -142,8 +142,8 @@ impl CsvFormat { } /// True if the first line is a header. - pub fn has_header(&self, config_opt: &ConfigOptions) -> bool { - self.options.has_header(config_opt) + pub fn has_header(&self) -> Option { + self.options.has_header } /// The character separating values within a row. @@ -245,7 +245,9 @@ impl FileFormat for CsvFormat { conf, // If format options does not specify whether there is a header, // we consult configuration options. - self.options.has_header(state.config_options()), + self.options + .has_header + .unwrap_or(state.config_options().catalog.has_header), self.options.delimiter, self.options.quote, self.options.escape, @@ -303,7 +305,10 @@ impl CsvFormat { while let Some(chunk) = stream.next().await.transpose()? { let format = arrow::csv::reader::Format::default() .with_header( - self.options.has_header(state.config_options()) && first_chunk, + self.options + .has_header + .unwrap_or(state.config_options().catalog.has_header) + && first_chunk, ) .with_delimiter(self.options.delimiter); diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs index 5e485f42b516..41296c1b8d23 100644 --- a/datafusion/core/src/datasource/stream.rs +++ b/datafusion/core/src/datasource/stream.rs @@ -30,7 +30,7 @@ use arrow_schema::SchemaRef; use async_trait::async_trait; use futures::StreamExt; -use datafusion_common::{plan_err, Constraints, DataFusionError, Result}; +use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result}; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::{CreateExternalTable, Expr, TableType}; @@ -58,11 +58,22 @@ impl TableProviderFactory for StreamTableFactory { let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into()); let location = cmd.location.clone(); let encoding = cmd.file_type.parse()?; + let header = if let Ok(opt) = cmd + .options + .get("format.has_header") + .map(|has_header| bool::from_str(has_header)) + .transpose() + { + opt.unwrap_or(false) + } else { + return config_err!("format.has_header can either be true or false"); + }; let config = StreamConfig::new_file(schema, location.into()) .with_encoding(encoding) .with_order(cmd.order_exprs.clone()) .with_batch_size(state.config().batch_size()) + .with_header(header) .with_constraints(cmd.constraints.clone()); Ok(Arc::new(StreamTable(Arc::new(config)))) diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 00f091d77cff..995ce35c5bc2 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -85,7 +85,7 @@ async fn register_aggregate_csv_by_sql(ctx: &SessionContext) { c13 VARCHAR NOT NULL ) STORED AS CSV - LOCATION '{testdata}/csv/aggregate_test_100.csv + LOCATION '{testdata}/csv/aggregate_test_100.csv' OPTIONS ('format.has_header' 'true') " )) diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 74df7178b0f2..63e9177ffaf8 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -238,7 +238,7 @@ async fn roundtrip_custom_listing_tables() -> Result<()> { STORED AS CSV WITH ORDER (a ASC, b ASC) WITH ORDER (c ASC) - LOCATION '../core/tests/data/window_2.csv'; + LOCATION '../core/tests/data/window_2.csv' OPTIONS ('format.has_header' 'true')"; let plan = ctx.state().create_logical_plan(query).await?; @@ -268,7 +268,7 @@ async fn roundtrip_logical_plan_aggregation_with_pk() -> Result<()> { STORED AS CSV WITH ORDER (a ASC, b ASC) WITH ORDER (c ASC) - LOCATION '../core/tests/data/window_2.csv'; + LOCATION '../core/tests/data/window_2.csv' OPTIONS ('format.has_header' 'true')", ) .await?; diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs index 7468527662f6..d3ae7a13ff3d 100644 --- a/datafusion/sql/src/parser.rs +++ b/datafusion/sql/src/parser.rs @@ -101,12 +101,10 @@ pub struct CopyToStatement { pub target: String, /// Partition keys pub partitioned_by: Vec, - /// Indicates whether there is a header row (e.g. CSV) - pub has_header: bool, /// File type (Parquet, NDJSON, CSV etc.) pub stored_as: Option, /// Target specific options - pub options: Vec<(String, Value)>, + pub options: HashMap, } impl fmt::Display for CopyToStatement { @@ -129,7 +127,10 @@ impl fmt::Display for CopyToStatement { } if !options.is_empty() { - let opts: Vec<_> = options.iter().map(|(k, v)| format!("{k} {v}")).collect(); + let opts: Vec<_> = options + .iter() + .map(|(k, v)| format!("'{k}' '{v}'")) + .collect(); write!(f, " OPTIONS ({})", opts.join(", "))?; } @@ -386,8 +387,7 @@ impl<'a> DFParser<'a> { stored_as: Option, target: Option, partitioned_by: Option>, - has_header: Option, - options: Option>, + options: Option>, } let mut builder = Builder::default(); @@ -423,7 +423,7 @@ impl<'a> DFParser<'a> { } Keyword::OPTIONS => { ensure_not_set(&builder.options, "OPTIONS")?; - builder.options = Some(self.parse_value_options()?); + builder.options = Some(self.parse_string_options()?); } _ => { unreachable!() @@ -451,9 +451,8 @@ impl<'a> DFParser<'a> { source, target, partitioned_by: builder.partitioned_by.unwrap_or(vec![]), - has_header: builder.has_header.unwrap_or(false), stored_as: builder.stored_as, - options: builder.options.unwrap_or(vec![]), + options: builder.options.unwrap_or(HashMap::new()), })) } @@ -835,33 +834,6 @@ impl<'a> DFParser<'a> { } Ok(options) } - - /// Parses (key value) style options into a map of String --> [`Value`]. - /// - /// Unlike [`Self::parse_string_options`], this method supports - /// keywords as key names as well as multiple value types such as - /// Numbers as well as Strings. - fn parse_value_options(&mut self) -> Result, ParserError> { - let mut options = vec![]; - self.parser.expect_token(&Token::LParen)?; - - loop { - let key = self.parse_option_key()?; - let value = self.parse_option_value()?; - options.push((key, value)); - let comma = self.parser.consume_token(&Token::Comma); - if self.parser.consume_token(&Token::RParen) { - // allow a trailing comma, even though it's not in standard - break; - } else if !comma { - return self.expected( - "',' or ')' after option definition", - self.parser.peek_token(), - ); - } - } - Ok(options) - } } #[cfg(test)] @@ -997,27 +969,6 @@ mod tests { }); expect_parse_ok(sql, expected)?; - // positive case: it is ok for case insensitive sql stmt with has_header option tokens - let sqls = vec![ - "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' OPTIONS ('FORMAT.HAS_HEADER' 'TRUE')", - "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' OPTIONS ('format.has_header' 'true')", - ]; - for sql in sqls { - let expected = Statement::CreateExternalTable(CreateExternalTable { - name: "t".into(), - columns: vec![make_column_def("c1", DataType::Int(display))], - file_type: "CSV".to_string(), - location: "foo.csv".into(), - table_partition_cols: vec![], - order_exprs: vec![], - if_not_exists: false, - unbounded: false, - options: HashMap::from([("format.has_header".into(), "true".into())]), - constraints: vec![], - }); - expect_parse_ok(sql, expected)?; - } - // positive case: it is ok for sql stmt with `COMPRESSION TYPE GZIP` tokens let sqls = vec![ ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' OPTIONS @@ -1357,9 +1308,8 @@ mod tests { source: object_name("foo"), target: "bar".to_string(), partitioned_by: vec![], - has_header: false, stored_as: Some("CSV".to_owned()), - options: vec![], + options: HashMap::new(), }); assert_eq!(verified_stmt(sql), expected); @@ -1393,9 +1343,8 @@ mod tests { source: object_name("foo"), target: "bar".to_string(), partitioned_by: vec![], - has_header: false, stored_as: Some("PARQUET".to_owned()), - options: vec![], + options: HashMap::new(), }); let expected = Statement::Explain(ExplainStatement { analyze, @@ -1430,9 +1379,8 @@ mod tests { source: CopyToSource::Query(query), target: "bar".to_string(), partitioned_by: vec![], - has_header: true, stored_as: Some("CSV".to_owned()), - options: vec![], + options: HashMap::from([("format.has_header".into(), "true".into())]), }); assert_eq!(verified_stmt(sql), expected); Ok(()) @@ -1445,12 +1393,8 @@ mod tests { source: object_name("foo"), target: "bar".to_string(), partitioned_by: vec![], - has_header: false, stored_as: Some("CSV".to_owned()), - options: vec![( - "row_group_size".to_string(), - Value::Number("55".to_string(), false), - )], + options: HashMap::from([("row_group_size".into(), "55".into())]), }); assert_eq!(verified_stmt(sql), expected); Ok(()) @@ -1458,17 +1402,13 @@ mod tests { #[test] fn copy_to_partitioned_by() -> Result<(), ParserError> { - let sql = "COPY foo TO bar STORED AS CSV PARTITIONED BY (a) OPTIONS (row_group_size 55)"; + let sql = "COPY foo TO bar STORED AS CSV PARTITIONED BY (a) OPTIONS ('row_group_size' '55')"; let expected = Statement::CopyTo(CopyToStatement { source: object_name("foo"), target: "bar".to_string(), partitioned_by: vec!["a".to_string()], - has_header: false, stored_as: Some("CSV".to_owned()), - options: vec![( - "row_group_size".to_string(), - Value::Number("55".to_string(), false), - )], + options: HashMap::from([("row_group_size".to_string(), "55".into())]), }); assert_eq!(verified_stmt(sql), expected); Ok(()) @@ -1478,18 +1418,12 @@ mod tests { fn copy_to_multi_options() -> Result<(), ParserError> { // order of options is preserved let sql = - "COPY foo TO bar STORED AS parquet OPTIONS ('format.row_group_size' 55, 'format.compression' snappy)"; + "COPY foo TO bar STORED AS parquet OPTIONS ('format.row_group_size' '55', 'format.compression' 'snappy')"; - let expected_options = vec![ - ( - "format.row_group_size".to_string(), - Value::Number("55".to_string(), false), - ), - ( - "format.compression".to_string(), - Value::UnQuotedString("snappy".to_string()), - ), - ]; + let expected_options = HashMap::from([ + ("format.row_group_size".to_string(), "55".into()), + ("format.compression".to_string(), "snappy".into()), + ]); let mut statements = DFParser::parse_sql(sql).unwrap(); assert_eq!(statements.len(), 1); @@ -1527,6 +1461,7 @@ mod tests { /// `canonical` sql string fn one_statement_parses_to(sql: &str, canonical: &str) -> Statement { let mut statements = DFParser::parse_sql(sql).unwrap(); + println!("{:?}", statements[0]); assert_eq!(statements.len(), 1); if sql != canonical { diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index e6eda0b2787d..e5eef838b301 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -849,36 +849,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } }; - let mut options = HashMap::new(); - for (key, value) in statement.options { - let value_string = match value { - Value::SingleQuotedString(s) => s.to_string(), - Value::DollarQuotedString(s) => s.to_string(), - Value::UnQuotedString(s) => s.to_string(), - Value::Number(_, _) | Value::Boolean(_) => value.to_string(), - Value::DoubleQuotedString(_) - | Value::EscapedStringLiteral(_) - | Value::NationalStringLiteral(_) - | Value::SingleQuotedByteStringLiteral(_) - | Value::DoubleQuotedByteStringLiteral(_) - | Value::RawStringLiteral(_) - | Value::HexStringLiteral(_) - | Value::Null - | Value::Placeholder(_) => { - return plan_err!("Unsupported Value in COPY statement {}", value); - } - }; - if !(&key.contains('.')) { - // If config does not belong to any namespace, assume it is - // a format option and apply the format prefix for backwards - // compatibility. - - let renamed_key = format!("format.{}", key); - options.insert(renamed_key.to_lowercase(), value_string.to_lowercase()); - } else { - options.insert(key.to_lowercase(), value_string.to_lowercase()); - } - } + let options = statement.options; let file_type = if let Some(file_type) = statement.stored_as { FileType::from_str(&file_type).map_err(|_| { diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index 4724bc87a099..9068c77ab422 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -52,14 +52,6 @@ CREATE EXTERNAL TABLE t STORED AS CSV WITH HEADER LOCATION 'abc' statement error DataFusion error: SQL error: ParserError\("Expected BY, found: LOCATION"\) CREATE EXTERNAL TABLE t STORED AS CSV PARTITIONED LOCATION 'abc' -# Missing `TYPE` in COMPRESSION clause -statement error DataFusion error: SQL error: ParserError\("Expected TYPE, found: LOCATION"\) -CREATE EXTERNAL TABLE t STORED AS CSV COMPRESSION LOCATION 'abc' - -# Invalid compression type -statement error DataFusion error: SQL error: ParserError\("Unsupported file compression type ZZZ"\) -CREATE EXTERNAL TABLE t STORED AS CSV COMPRESSION TYPE ZZZ LOCATION 'blahblah' - # Duplicate `STORED AS` clause statement error DataFusion error: SQL error: ParserError\("STORED AS specified more than once"\) CREATE EXTERNAL TABLE t STORED AS CSV STORED AS PARQUET LOCATION 'foo.parquet' diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt index 2d4ca1baf23d..6484d437e179 100644 --- a/datafusion/sqllogictest/test_files/repartition_scan.slt +++ b/datafusion/sqllogictest/test_files/repartition_scan.slt @@ -278,7 +278,6 @@ statement ok CREATE EXTERNAL TABLE avro_table STORED AS AVRO LOCATION '../../testing/data/avro/simple_enum.avro' -OPTIONS ('format.has_header' 'true'); # It would be great to see the file read as "4" groups with even sizes (offsets) eventually