Skip to content

Commit

Permalink
Update DataFusion to 26 (#798)
Browse files Browse the repository at this point in the history
* Enable all tests

* Adapt

* Update DataFusion to 26

* Add physical_round_trip test

* Fmt

* Add cfg again

* Do not enable q15 just yet

* fmt

* Fix

* Fix

* Fix

* Fix

* Fix

* Schema fix

* Undo some

---------

Co-authored-by: Daniël Heres <[email protected]>
  • Loading branch information
Dandandan and Daniël Heres authored Jun 9, 2023
1 parent 8dbf9d2 commit efb3a7d
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 102 deletions.
13 changes: 7 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@
members = ["ballista-cli", "ballista/client", "ballista/core", "ballista/executor", "ballista/scheduler", "benchmarks", "examples"]

[workspace.dependencies]
arrow = { version = "39.0.0" }
arrow-flight = { version = "39.0.0", features = ["flight-sql-experimental"] }
arrow = { version = "40.0.0" }
arrow-flight = { version = "40.0.0", features = ["flight-sql-experimental"] }
arrow-schema = { version = "40.0.0", default-features = false }
configure_me = { version = "0.4.0" }
configure_me_codegen = { version = "0.4.4" }
datafusion = "25.0.0"
datafusion-cli = "25.0.0"
datafusion-proto = "25.0.0"
datafusion = "26.0.0"
datafusion-cli = "26.0.0"
datafusion-proto = "26.0.0"
object_store = "0.5.6"
sqlparser = "0.33.0"
sqlparser = "0.34.0"
tonic = { version = "0.9" }
tonic-build = { version = "0.9", default-features = false, features = ["transport", "prost"] }
tracing = "0.1.36"
Expand Down
2 changes: 1 addition & 1 deletion ballista-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ dirs = "4.0.0"
env_logger = "0.10"
mimalloc = { version = "0.1", default-features = false }
num_cpus = "1.13.0"
rustyline = "10.0"
rustyline = "11.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }

[features]
Expand Down
6 changes: 3 additions & 3 deletions ballista-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub async fn exec_from_files(

/// run and execute SQL statements and commands against a context with the given print options
pub async fn exec_from_repl(ctx: &BallistaContext, print_options: &mut PrintOptions) {
let mut rl = Editor::<CliHelper>::new().expect("created editor");
let mut rl = Editor::new().expect("created editor");
rl.set_helper(Some(CliHelper::default()));
rl.load_history(".history").ok();

Expand All @@ -99,7 +99,7 @@ pub async fn exec_from_repl(ctx: &BallistaContext, print_options: &mut PrintOpti
loop {
match rl.readline("❯ ") {
Ok(line) if line.starts_with('\\') => {
rl.add_history_entry(line.trim_end());
rl.add_history_entry(line.trim_end()).unwrap();
let command = line.split_whitespace().collect::<Vec<_>>().join(" ");
if let Ok(cmd) = &command[1..].parse::<Command>() {
match cmd {
Expand Down Expand Up @@ -133,7 +133,7 @@ pub async fn exec_from_repl(ctx: &BallistaContext, print_options: &mut PrintOpti
}
}
Ok(line) => {
rl.add_history_entry(line.trim_end());
rl.add_history_entry(line.trim_end()).unwrap();
match exec_and_print(ctx, &print_options, line).await {
Ok(_) => {}
Err(err) => eprintln!("{err:?}"),
Expand Down
44 changes: 22 additions & 22 deletions ballista/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ mod tests {
table_partition_cols: x.table_partition_cols.clone(),
collect_stat: x.collect_stat,
target_partitions: x.target_partitions,
file_sort_order: None,
file_sort_order: vec![],
infinite_source: false,
};

Expand Down Expand Up @@ -810,11 +810,11 @@ mod tests {
.unwrap();
let res = df.collect().await.unwrap();
let expected = vec![
"+-------------------------+",
"| APPROXDISTINCT(test.id) |",
"+-------------------------+",
"| 8 |",
"+-------------------------+",
"+--------------------------+",
"| APPROX_DISTINCT(test.id) |",
"+--------------------------+",
"| 8 |",
"+--------------------------+",
];
assert_result_eq(expected, &res);

Expand All @@ -825,7 +825,7 @@ mod tests {
let res = df.collect().await.unwrap();
let expected = vec![
"+--------------------------+",
"| ARRAYAGG(test.id) |",
"| ARRAY_AGG(test.id) |",
"+--------------------------+",
"| [4, 5, 6, 7, 2, 3, 0, 1] |",
"+--------------------------+",
Expand All @@ -849,11 +849,11 @@ mod tests {
.unwrap();
let res = df.collect().await.unwrap();
let expected = vec![
"+----------------------+",
"| VARIANCEPOP(test.id) |",
"+----------------------+",
"| 5.250000000000001 |",
"+----------------------+",
"+-----------------------+",
"| VARIANCE_POP(test.id) |",
"+-----------------------+",
"| 5.250000000000001 |",
"+-----------------------+",
];
assert_result_eq(expected, &res);

Expand Down Expand Up @@ -933,11 +933,11 @@ mod tests {
.unwrap();
let res = df.collect().await.unwrap();
let expected = vec![
"+---------------------------------------------------------------+",
"| APPROXPERCENTILECONTWITHWEIGHT(test.id,Int64(2),Float64(0.5)) |",
"+---------------------------------------------------------------+",
"| 1 |",
"+---------------------------------------------------------------+",
"+-------------------------------------------------------------------+",
"| APPROX_PERCENTILE_CONT_WITH_WEIGHT(test.id,Int64(2),Float64(0.5)) |",
"+-------------------------------------------------------------------+",
"| 1 |",
"+-------------------------------------------------------------------+",
];
assert_result_eq(expected, &res);

Expand All @@ -947,11 +947,11 @@ mod tests {
.unwrap();
let res = df.collect().await.unwrap();
let expected = vec![
"+----------------------------------------------------+",
"| APPROXPERCENTILECONT(test.double_col,Float64(0.5)) |",
"+----------------------------------------------------+",
"| 7.574999999999999 |",
"+----------------------------------------------------+",
"+------------------------------------------------------+",
"| APPROX_PERCENTILE_CONT(test.double_col,Float64(0.5)) |",
"+------------------------------------------------------+",
"| 7.574999999999999 |",
"+------------------------------------------------------+",
];

assert_result_eq(expected, &res);
Expand Down
48 changes: 24 additions & 24 deletions ballista/scheduler/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,51 +151,51 @@ pub fn get_tpch_schema(table: &str) -> Schema {

match table {
"part" => Schema::new(vec![
Field::new("p_partkey", DataType::Int32, false),
Field::new("p_partkey", DataType::Int64, false),
Field::new("p_name", DataType::Utf8, false),
Field::new("p_mfgr", DataType::Utf8, false),
Field::new("p_brand", DataType::Utf8, false),
Field::new("p_type", DataType::Utf8, false),
Field::new("p_size", DataType::Int32, false),
Field::new("p_container", DataType::Utf8, false),
Field::new("p_retailprice", DataType::Float64, false),
Field::new("p_retailprice", DataType::Decimal128(15, 2), false),
Field::new("p_comment", DataType::Utf8, false),
]),

"supplier" => Schema::new(vec![
Field::new("s_suppkey", DataType::Int32, false),
Field::new("s_suppkey", DataType::Int64, false),
Field::new("s_name", DataType::Utf8, false),
Field::new("s_address", DataType::Utf8, false),
Field::new("s_nationkey", DataType::Int32, false),
Field::new("s_nationkey", DataType::Int64, false),
Field::new("s_phone", DataType::Utf8, false),
Field::new("s_acctbal", DataType::Float64, false),
Field::new("s_acctbal", DataType::Decimal128(15, 2), false),
Field::new("s_comment", DataType::Utf8, false),
]),

"partsupp" => Schema::new(vec![
Field::new("ps_partkey", DataType::Int32, false),
Field::new("ps_suppkey", DataType::Int32, false),
Field::new("ps_partkey", DataType::Int64, false),
Field::new("ps_suppkey", DataType::Int64, false),
Field::new("ps_availqty", DataType::Int32, false),
Field::new("ps_supplycost", DataType::Float64, false),
Field::new("ps_supplycost", DataType::Decimal128(15, 2), false),
Field::new("ps_comment", DataType::Utf8, false),
]),

"customer" => Schema::new(vec![
Field::new("c_custkey", DataType::Int32, false),
Field::new("c_custkey", DataType::Int64, false),
Field::new("c_name", DataType::Utf8, false),
Field::new("c_address", DataType::Utf8, false),
Field::new("c_nationkey", DataType::Int32, false),
Field::new("c_nationkey", DataType::Int64, false),
Field::new("c_phone", DataType::Utf8, false),
Field::new("c_acctbal", DataType::Float64, false),
Field::new("c_acctbal", DataType::Decimal128(15, 2), false),
Field::new("c_mktsegment", DataType::Utf8, false),
Field::new("c_comment", DataType::Utf8, false),
]),

"orders" => Schema::new(vec![
Field::new("o_orderkey", DataType::Int32, false),
Field::new("o_custkey", DataType::Int32, false),
Field::new("o_orderkey", DataType::Int64, false),
Field::new("o_custkey", DataType::Int64, false),
Field::new("o_orderstatus", DataType::Utf8, false),
Field::new("o_totalprice", DataType::Float64, false),
Field::new("o_totalprice", DataType::Decimal128(15, 2), false),
Field::new("o_orderdate", DataType::Date32, false),
Field::new("o_orderpriority", DataType::Utf8, false),
Field::new("o_clerk", DataType::Utf8, false),
Expand All @@ -204,14 +204,14 @@ pub fn get_tpch_schema(table: &str) -> Schema {
]),

"lineitem" => Schema::new(vec![
Field::new("l_orderkey", DataType::Int32, false),
Field::new("l_partkey", DataType::Int32, false),
Field::new("l_suppkey", DataType::Int32, false),
Field::new("l_orderkey", DataType::Int64, false),
Field::new("l_partkey", DataType::Int64, false),
Field::new("l_suppkey", DataType::Int64, false),
Field::new("l_linenumber", DataType::Int32, false),
Field::new("l_quantity", DataType::Float64, false),
Field::new("l_extendedprice", DataType::Float64, false),
Field::new("l_discount", DataType::Float64, false),
Field::new("l_tax", DataType::Float64, false),
Field::new("l_quantity", DataType::Decimal128(15, 2), false),
Field::new("l_extendedprice", DataType::Decimal128(15, 2), false),
Field::new("l_discount", DataType::Decimal128(15, 2), false),
Field::new("l_tax", DataType::Decimal128(15, 2), false),
Field::new("l_returnflag", DataType::Utf8, false),
Field::new("l_linestatus", DataType::Utf8, false),
Field::new("l_shipdate", DataType::Date32, false),
Expand All @@ -223,14 +223,14 @@ pub fn get_tpch_schema(table: &str) -> Schema {
]),

"nation" => Schema::new(vec![
Field::new("n_nationkey", DataType::Int32, false),
Field::new("n_nationkey", DataType::Int64, false),
Field::new("n_name", DataType::Utf8, false),
Field::new("n_regionkey", DataType::Int32, false),
Field::new("n_regionkey", DataType::Int64, false),
Field::new("n_comment", DataType::Utf8, false),
]),

"region" => Schema::new(vec![
Field::new("r_regionkey", DataType::Int32, false),
Field::new("r_regionkey", DataType::Int64, false),
Field::new("r_name", DataType::Utf8, false),
Field::new("r_comment", DataType::Utf8, false),
]),
Expand Down
1 change: 1 addition & 0 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ simd = ["datafusion/simd"]
snmalloc = ["snmalloc-rs"]

[dependencies]
arrow-schema = { workspace = true }
ballista = { path = "../ballista/client", version = "0.11.0" }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
Expand Down
Loading

0 comments on commit efb3a7d

Please sign in to comment.