Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(rust): bump datafusion to 36 #2249

Merged
merged 8 commits into from
Apr 1, 2024
Merged
13 changes: 7 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ object_store = { version = "0.9" }
parquet = { version = "50" }

# datafusion
datafusion = { version = "35" }
datafusion-expr = { version = "35" }
datafusion-common = { version = "35" }
datafusion-proto = { version = "35" }
datafusion-sql = { version = "35" }
datafusion-physical-expr = { version = "35" }
datafusion = { version = "36" }
datafusion-expr = { version = "36" }
datafusion-common = { version = "36" }
datafusion-proto = { version = "36" }
datafusion-sql = { version = "36" }
datafusion-physical-expr = { version = "36" }
datafusion-functions = { version = "36" }

# serde
serde = { version = "1.0.194", features = ["derive"] }
Expand Down
4 changes: 3 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ datafusion-common = { workspace = true, optional = true }
datafusion-proto = { workspace = true, optional = true }
datafusion-sql = { workspace = true, optional = true }
datafusion-physical-expr = { workspace = true, optional = true }
datafusion-functions = { workspace = true, optional = true }

# serde
serde = { workspace = true, features = ["derive"] }
Expand Down Expand Up @@ -95,7 +96,7 @@ reqwest = { version = "0.11.18", default-features = false, features = [
"rustls-tls",
"json",
], optional = true }
sqlparser = { version = "0.41", optional = true }
sqlparser = { version = "0.44", optional = true }

[dev-dependencies]
criterion = "0.5"
Expand All @@ -121,6 +122,7 @@ datafusion = [
"datafusion-proto",
"datafusion-physical-expr",
"datafusion-sql",
"datafusion-functions",
"sqlparser",
]
datafusion-ext = ["datafusion"]
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/data_catalog/unity/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;

use dashmap::DashMap;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogList, CatalogProvider};
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
use datafusion::datasource::TableProvider;
use tracing::error;

Expand Down Expand Up @@ -49,7 +49,7 @@ impl UnityCatalogList {
}
}

impl CatalogList for UnityCatalogList {
impl CatalogProviderList for UnityCatalogList {
fn as_any(&self) -> &dyn Any {
self
}
Expand Down
34 changes: 20 additions & 14 deletions crates/core/src/delta_datafusion/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ use datafusion_expr::{
expr::InList, AggregateUDF, Between, BinaryExpr, Cast, Expr, GetIndexedField, Like, TableSource,
};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use sqlparser::ast::escape_quoted_string;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
use sqlparser::tokenizer::Tokenizer;
use datafusion_sql::sqlparser::ast::escape_quoted_string;
use datafusion_sql::sqlparser::dialect::GenericDialect;
use datafusion_sql::sqlparser::parser::Parser;
use datafusion_sql::sqlparser::tokenizer::Tokenizer;

use crate::{DeltaResult, DeltaTableError};

Expand Down Expand Up @@ -275,13 +275,18 @@ impl<'a> Display for SqlFormat<'a> {
datafusion_expr::GetFieldAccess::ListIndex { key } => {
write!(f, "{}[{}]", SqlFormat { expr }, SqlFormat { expr: key })
}
datafusion_expr::GetFieldAccess::ListRange { start, stop } => {
datafusion_expr::GetFieldAccess::ListRange {
start,
stop,
stride,
} => {
write!(
f,
"{}[{}:{}]",
SqlFormat { expr },
SqlFormat { expr: start },
SqlFormat { expr: stop }
"{expr}[{start}:{stop}:{stride}]",
expr = SqlFormat { expr },
start = SqlFormat { expr: start },
stop = SqlFormat { expr: stop },
stride = SqlFormat { expr: stride }
)
}
},
Expand Down Expand Up @@ -367,8 +372,9 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> {
mod test {
use arrow_schema::DataType as ArrowDataType;
use datafusion::prelude::SessionContext;
use datafusion_common::{Column, DFSchema, ScalarValue};
use datafusion_expr::{cardinality, col, decode, lit, substring, Cast, Expr, ExprSchemable};
use datafusion_common::{Column, ScalarValue, ToDFSchema};
use datafusion_expr::{cardinality, col, lit, substring, Cast, Expr, ExprSchemable};
use datafusion_functions::encoding::expr_fn::decode;

use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext};
use crate::kernel::{ArrayType, DataType, PrimitiveType, StructField, StructType};
Expand Down Expand Up @@ -572,7 +578,7 @@ mod test {
),
simple!(
col("value")
.cast_to::<DFSchema>(
.cast_to(
&arrow_schema::DataType::Utf8,
&table
.snapshot()
Expand All @@ -581,7 +587,7 @@ mod test {
.unwrap()
.as_ref()
.to_owned()
.try_into()
.to_dfschema()
.unwrap()
)
.unwrap()
Expand All @@ -602,7 +608,7 @@ mod test {
),
simple!(
cardinality(col("_list").range(col("value"), lit(10_i64))),
"cardinality(_list[value:10])".to_string()
"cardinality(_list[value:10:1])".to_string()
),
];

Expand Down
8 changes: 7 additions & 1 deletion crates/core/src/delta_datafusion/find_files/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,14 @@ impl ExecutionPlan for FindFilesExec {

fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if !children.is_empty() {
return Err(datafusion::error::DataFusionError::Plan(
"Children cannot be replaced in FindFilesExec".to_string(),
));
}

Ok(self)
}

Expand Down
13 changes: 12 additions & 1 deletion crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,18 @@ impl ExecutionPlan for DeltaScan {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
ExecutionPlan::with_new_children(self.parquet_scan.clone(), children)
if children.len() != 1 {
return Err(DataFusionError::Plan(format!(
"DeltaScan wrong number of children {}",
children.len()
)));
}
Ok(Arc::new(DeltaScan {
table_uri: self.table_uri.clone(),
config: self.config.clone(),
parquet_scan: children[0].clone(),
logical_schema: self.logical_schema.clone(),
}))
}

fn execute(
Expand Down
10 changes: 9 additions & 1 deletion crates/core/src/kernel/snapshot/log_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,15 @@ mod datafusion {
_ => None,
})
.collect::<Option<Vec<_>>>()
.map(|o| Precision::Exact(ScalarValue::Struct(Some(o), fields.clone())))
.map(|o| {
let arrays = o
.into_iter()
.map(|sv| sv.to_array())
.collect::<Result<Vec<_>, datafusion_common::DataFusionError>>()
.unwrap();
let sa = StructArray::new(fields.clone(), arrays, None);
Precision::Exact(ScalarValue::Struct(Arc::new(sa)))
})
.unwrap_or(Precision::Absent);
}
_ => Precision::Absent,
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/operations/merge/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ impl ExecutionPlan for MergeBarrierExec {
self: std::sync::Arc<Self>,
children: Vec<std::sync::Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<std::sync::Arc<dyn ExecutionPlan>> {
if children.len() != 1 {
return Err(DataFusionError::Plan(
"MergeBarrierExec wrong number of children".to_string(),
));
}
Ok(Arc::new(MergeBarrierExec::new(
children[0].clone(),
self.file_column.clone(),
Expand Down
2 changes: 1 addition & 1 deletion crates/sql/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl<'a> DeltaParser<'a> {
}

pub fn parse_vacuum(&mut self) -> Result<Statement, ParserError> {
let table_name = self.parser.parse_object_name()?;
let table_name = self.parser.parse_object_name(false)?;
match self.parser.peek_token().token {
Token::Word(w) => match w.keyword {
Keyword::RETAIN => {
Expand Down
Loading