Skip to content

Commit

Permalink
apache#2061 support "PARTITIONED BY" in CreateExternalTable DDL for datafusion
Browse files Browse the repository at this point in the history
  • Loading branch information
jychen7 committed Mar 27, 2022
1 parent 8159294 commit 6096139
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 1 deletion.
3 changes: 2 additions & 1 deletion datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ impl SessionContext {
ref location,
ref file_type,
ref has_header,
ref table_partition_cols,
}) => {
let (file_format, file_extension) = match file_type {
FileType::CSV => (
Expand All @@ -236,7 +237,7 @@ impl SessionContext {
collect_stat: false,
file_extension: file_extension.to_owned(),
target_partitions: self.copied_config().target_partitions,
table_partition_cols: vec![],
table_partition_cols: table_partition_cols.clone(),
};

// TODO make schema in CreateExternalTable optional instead of empty
Expand Down
2 changes: 2 additions & 0 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ pub struct CreateExternalTable {
pub file_type: FileType,
/// Whether the CSV file contains a header
pub has_header: bool,
// Partition Columns
pub table_partition_cols: Vec<String>,
}

/// Creates a schema.
Expand Down
66 changes: 66 additions & 0 deletions datafusion/src/sql/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ pub struct CreateExternalTable {
pub has_header: bool,
/// Path to file
pub location: String,
// Partition Columns
pub table_partition_cols: Vec<String>,
}

/// DataFusion Statement representations.
Expand Down Expand Up @@ -192,6 +194,35 @@ impl<'a> DFParser<'a> {
}
}

/// Parse a parenthesized, comma-separated list of partition column names,
/// e.g. the `(p1, p2)` in `PARTITIONED BY (p1, p2)`.
///
/// Returns an empty list when no `(` follows, or when the list is empty
/// (`()`). A trailing comma before the closing `)` is tolerated even though
/// it is not standard SQL. Partition entries must be bare identifiers —
/// a data type after the name is rejected.
fn parse_partitions(&mut self) -> Result<Vec<String>, ParserError> {
    let mut cols: Vec<String> = Vec::new();

    // No `(` at all, or an immediately closed `()`: nothing to collect.
    // (The `(` check short-circuits, so `)` is only probed after a `(`.)
    let saw_lparen = self.parser.consume_token(&Token::LParen);
    if !saw_lparen || self.parser.consume_token(&Token::RParen) {
        return Ok(cols);
    }

    loop {
        // Each entry must start with a word token (an identifier).
        match self.parser.peek_token() {
            Token::Word(_) => {
                let name = self.parser.parse_identifier()?;
                cols.push(name.to_string());
            }
            other => return self.expected("partition name", other),
        }

        // After a name: `,` continues the list, `)` ends it. Checking `)`
        // after consuming an optional `,` is what permits a trailing comma.
        let saw_comma = self.parser.consume_token(&Token::Comma);
        if self.parser.consume_token(&Token::RParen) {
            break;
        }
        if !saw_comma {
            return self.expected(
                "',' or ')' after partition definition",
                self.parser.peek_token(),
            );
        }
    }

    Ok(cols)
}

// This is a copy of the equivalent implementation in sqlparser.
fn parse_columns(
&mut self,
Expand Down Expand Up @@ -277,6 +308,12 @@ impl<'a> DFParser<'a> {

let has_header = self.parse_csv_has_header();

let has_partition = self.parse_has_partition();
let mut table_partition_cols: Vec<String> = vec![];
if has_partition {
table_partition_cols = self.parse_partitions()?;
}

self.parser.expect_keyword(Keyword::LOCATION)?;
let location = self.parser.parse_literal_string()?;

Expand All @@ -286,6 +323,7 @@ impl<'a> DFParser<'a> {
file_type,
has_header,
location,
table_partition_cols,
};
Ok(Statement::CreateExternalTable(create))
}
Expand Down Expand Up @@ -314,6 +352,11 @@ impl<'a> DFParser<'a> {
& self.consume_token(&Token::make_keyword("HEADER"))
& self.consume_token(&Token::make_keyword("ROW"))
}

/// Consume the `PARTITIONED BY` keyword pair, returning true only when both
/// keywords were present.
///
/// NOTE(review): the non-short-circuiting `&` means a lone `BY` token is
/// still consumed even when `PARTITIONED` is absent — kept deliberately to
/// match the original token-consumption behavior (same pattern as the
/// `WITH HEADER ROW` parsing in `parse_csv_has_header` above).
fn parse_has_partition(&mut self) -> bool {
    let saw_partitioned = self.consume_token(&Token::make_keyword("PARTITIONED"));
    let saw_by = self.consume_token(&Token::make_keyword("BY"));
    saw_partitioned & saw_by
}
}

#[cfg(test)]
Expand Down Expand Up @@ -376,6 +419,20 @@ mod tests {
file_type: FileType::CSV,
has_header: false,
location: "foo.csv".into(),
table_partition_cols: vec![],
});
expect_parse_ok(sql, expected)?;

// positive case: partitioned by
let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1, p2) LOCATION 'foo.csv'";
let display = None;
let expected = Statement::CreateExternalTable(CreateExternalTable {
name: "t".into(),
columns: vec![make_column_def("c1", DataType::Int(display))],
file_type: FileType::CSV,
has_header: false,
location: "foo.csv".into(),
table_partition_cols: vec!["p1".to_string(), "p2".to_string()],
});
expect_parse_ok(sql, expected)?;

Expand All @@ -391,6 +448,7 @@ mod tests {
file_type: FileType::CSV,
has_header: true,
location: "foo.csv".into(),
table_partition_cols: vec![],
});
expect_parse_ok(sql, expected)?;
}
Expand All @@ -403,6 +461,7 @@ mod tests {
file_type: FileType::Parquet,
has_header: false,
location: "foo.parquet".into(),
table_partition_cols: vec![],
});
expect_parse_ok(sql, expected)?;

Expand All @@ -414,6 +473,7 @@ mod tests {
file_type: FileType::Parquet,
has_header: false,
location: "foo.parquet".into(),
table_partition_cols: vec![],
});
expect_parse_ok(sql, expected)?;

Expand All @@ -425,6 +485,7 @@ mod tests {
file_type: FileType::Avro,
has_header: false,
location: "foo.avro".into(),
table_partition_cols: vec![],
});
expect_parse_ok(sql, expected)?;

Expand All @@ -433,6 +494,11 @@ mod tests {
"CREATE EXTERNAL TABLE t(c1 int) STORED AS UNKNOWN_TYPE LOCATION 'foo.csv'";
expect_parse_error(sql, "expect one of PARQUET, AVRO, NDJSON, or CSV");

// Error cases: partition column does not support type
let sql =
"CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1 int) LOCATION 'foo.csv'";
expect_parse_error(sql, "sql parser error: Expected ',' or ')' after partition definition, found: int");

Ok(())
}
}
2 changes: 2 additions & 0 deletions datafusion/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
file_type,
has_header,
location,
table_partition_cols,
} = statement;

// semantic checks
Expand All @@ -333,6 +334,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
location,
file_type,
has_header,
table_partition_cols,
}))
}

Expand Down
15 changes: 15 additions & 0 deletions docs/source/user-guide/sql/ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,21 @@ WITH HEADER ROW
LOCATION '/path/to/aggregate_test_100.csv';
```

If data sources are already partitioned in Hive style, `PARTITIONED BY` can be used for partition pruning.

```
/mnt/nyctaxi/year=2022/month=01/tripdata.parquet
/mnt/nyctaxi/year=2021/month=12/tripdata.parquet
/mnt/nyctaxi/year=2021/month=11/tripdata.parquet
```

```sql
CREATE EXTERNAL TABLE taxi
STORED AS PARQUET
PARTITIONED BY (year, month)
LOCATION '/mnt/nyctaxi';
```

## CREATE MEMORY TABLE

Memory table can be created with query.
Expand Down

0 comments on commit 6096139

Please sign in to comment.