Skip to content

Commit

Permalink
Upgrade datafusion to 20.0.0 & sqlparser to 0.32.0 (#711)
Browse files Browse the repository at this point in the history
* Upgrade datafusion & sqlparser

* Move ballista_round_trip tests of benchmark into a separate feature to avoid stack overflow

* Fix failed tests of scheduler
  • Loading branch information
r4ntix authored Mar 26, 2023
1 parent 9206bdb commit 8007a41
Show file tree
Hide file tree
Showing 15 changed files with 342 additions and 275 deletions.
46 changes: 45 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,50 @@ jobs:
# do not produce debug symbols to keep memory usage down
RUSTFLAGS: "-C debuginfo=0"

# verify that the benchmark queries return the correct results
verify-benchmark-results:
name: verify benchmark results (amd64)
needs: [linux-build-lib]
runs-on: ubuntu-latest
strategy:
matrix:
arch: [amd64]
rust: [stable]
container:
image: ${{ matrix.arch }}/rust
env:
# Disable full debug symbol generation to speed up CI build and keep memory down
# "1" means line tables only, which is useful for panic tracebacks.
RUSTFLAGS: "-C debuginfo=1"
steps:
- uses: actions/checkout@v3
with:
submodules: true
- name: Install protobuf compiler
shell: bash
run: |
apt-get -qq update && apt-get -y -qq install protobuf-compiler
protoc --version
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Cache Rust dependencies
uses: actions/cache@v3
with:
path: /github/home/target
# this key equals the ones on `linux-build-lib` for re-use
key: ${{ runner.os }}-${{ matrix.arch }}-target-cache-${{ matrix.rust }}
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
rust-version: ${{ matrix.rust }}
- name: Verify that benchmark queries return expected results
run: |
cargo test --package ballista-benchmarks --profile release-nonlto --features=ci -- --test-threads=1
lint:
name: Lint
runs-on: ubuntu-latest
Expand Down Expand Up @@ -313,7 +357,7 @@ jobs:
if [[ $DOCKER_TAG =~ ^[0-9\.]+-rc[0-9]+$ ]]
then
echo "publishing docker tag $DOCKER_TAG"
docker tag arrow-ballista-standalone:latest ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG
docker tag arrow-ballista-standalone:latest ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG
docker login ghcr.io -u $DOCKER_USER -p "$DOCKER_PASS"
docker push ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG
fi
Expand Down
19 changes: 19 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,28 @@ members = [
]
exclude = ["python"]

[workspace.dependencies]
arrow = { version = "34.0.0" }
arrow-flight = { version = "34.0.0", features = ["flight-sql-experimental"] }
datafusion = "20.0.0"
datafusion-proto = "20.0.0"

# cargo build --profile release-lto
[profile.release-lto]
codegen-units = 1
inherits = "release"
lto = true

# the release profile takes a long time to build so we can use this profile during development to save time
# cargo build --profile release-nonlto
[profile.release-nonlto]
codegen-units = 16
debug = false
debug-assertions = false
incremental = false
inherits = "release"
lto = false
opt-level = 3
overflow-checks = false
panic = 'unwind'
rpath = false
4 changes: 2 additions & 2 deletions ballista-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ ballista = { path = "../ballista/client", version = "0.11.0", features = [
"standalone",
] }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = "19.0.0"
datafusion-cli = "19.0.0"
datafusion = { workspace = true }
datafusion-cli = "20.0.0"
dirs = "4.0.0"
env_logger = "0.10"
mimalloc = { version = "0.1", default-features = false }
Expand Down
6 changes: 3 additions & 3 deletions ballista/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ rust-version = "1.63"
ballista-core = { path = "../core", version = "0.11.0" }
ballista-executor = { path = "../executor", version = "0.11.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "0.11.0", optional = true }
datafusion = "19.0.0"
datafusion-proto = "19.0.0"
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
sqlparser = "0.30.0"
sqlparser = "0.32.0"
tempfile = "3"
tokio = "1.0"

Expand Down
2 changes: 1 addition & 1 deletion ballista/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ To build a simple ballista example, add the following dependencies to your `Carg
```toml
[dependencies]
ballista = "0.10"
datafusion = "19.0.0"
datafusion = "20.0.0"
tokio = "1.0"
```

Expand Down
9 changes: 4 additions & 5 deletions ballista/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,13 @@ simd = ["datafusion/simd"]

[dependencies]
ahash = { version = "0.8", default-features = false }

arrow-flight = { version = "33.0.0", features = ["flight-sql-experimental"] }
arrow-flight = { workspace = true }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = "19.0.0"
datafusion = { workspace = true }
datafusion-objectstore-hdfs = { version = "0.1.1", default-features = false, optional = true }
datafusion-proto = "19.0.0"
datafusion-proto = { workspace = true }
futures = "0.3"
hashbrown = "0.13"

Expand All @@ -68,7 +67,7 @@ prost = "0.11"
prost-types = "0.11"
rand = "0.8"
serde = { version = "1", features = ["derive"] }
sqlparser = "0.30.0"
sqlparser = "0.32.0"
sys-info = "0.9.0"
tokio = "1.0"
tokio-stream = { version = "0.1", features = ["net"] }
Expand Down
8 changes: 4 additions & 4 deletions ballista/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ default = ["mimalloc"]

[dependencies]
anyhow = "1"
arrow = { version = "33.0.0" }
arrow-flight = { version = "33.0.0" }
arrow = { workspace = true }
arrow-flight = { workspace = true }
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.11.0", features = ["s3"] }
chrono = { version = "0.4", default-features = false }
configure_me = "0.4.0"
dashmap = "5.4.0"
datafusion = "19.0.0"
datafusion-proto = "19.0.0"
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
futures = "0.3"
hyper = "0.14.4"
log = "0.4"
Expand Down
6 changes: 4 additions & 2 deletions ballista/executor/src/execution_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::config::Extensions;
use datafusion::physical_plan::ExecutionPlan;

use ballista_core::serde::protobuf::{
Expand Down Expand Up @@ -184,14 +185,15 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
}
let runtime = executor.runtime.clone();
let session_id = task.session_id.clone();
let task_context = Arc::new(TaskContext::new(
let task_context = Arc::new(TaskContext::try_new(
task_identity.clone(),
session_id,
task_props,
task_scalar_functions,
task_aggregate_functions,
runtime.clone(),
));
Extensions::default(),
)?);

let plan: Arc<dyn ExecutionPlan> =
U::try_decode(task.plan.as_slice()).and_then(|proto| {
Expand Down
6 changes: 4 additions & 2 deletions ballista/executor/src/executor_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use ballista_core::BALLISTA_VERSION;
use datafusion::config::Extensions;
use std::collections::HashMap;
use std::convert::TryInto;
use std::ops::Deref;
Expand Down Expand Up @@ -319,14 +320,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,

let session_id = task.session_id;
let runtime = self.executor.runtime.clone();
let task_context = Arc::new(TaskContext::new(
let task_context = Arc::new(TaskContext::try_new(
task_identity.clone(),
session_id,
task_props,
task_scalar_functions,
task_aggregate_functions,
runtime.clone(),
));
Extensions::default(),
)?);

let encoded_plan = &task.plan.as_slice();

Expand Down
6 changes: 3 additions & 3 deletions ballista/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ sled = ["sled_package", "tokio-stream"]

[dependencies]
anyhow = "1"
arrow-flight = { version = "33.0.0", features = ["flight-sql-experimental"] }
arrow-flight = { workspace = true }
async-recursion = "1.0.0"
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.11.0", features = ["s3"] }
base64 = { version = "0.13", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = "0.4.0"
dashmap = "5.4.0"
datafusion = "19.0.0"
datafusion-proto = "19.0.0"
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
etcd-client = { version = "0.10", optional = true }
flatbuffers = { version = "22.9.29" }
futures = "0.3"
Expand Down
43 changes: 24 additions & 19 deletions ballista/scheduler/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,32 +462,34 @@ order by
/* Expected result:
ShuffleWriterExec: Some(Hash([Column { name: "l_orderkey", index: 0 }], 2))
CoalesceBatchesExec: target_batch_size=4096
FilterExec: l_shipmode@4 IN ([Literal { value: Utf8("MAIL") }, Literal { value: Utf8("SHIP") }]) AND l_commitdate@2 < l_receiptdate@3 AND l_shipdate@1 < l_commitdate@2 AND l_receiptdate@3 >= 8766 AND l_receiptdate@3 < 9131
CsvExec: source=Path(testdata/lineitem: [testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), has_header=false
ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_shipmode@4 as l_shipmode]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: (l_shipmode@4 = SHIP OR l_shipmode@4 = MAIL) AND l_commitdate@2 < l_receiptdate@3 AND l_shipdate@1 < l_commitdate@2 AND l_receiptdate@3 >= 8766 AND l_receiptdate@3 < 9131
CsvExec: files={2 groups: [[testdata/lineitem/partition0.tbl], [testdata/lineitem/partition1.tbl]]}, has_header=false, limit=None, projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode]
ShuffleWriterExec: Some(Hash([Column { name: "o_orderkey", index: 0 }], 2))
CsvExec: source=Path(testdata/orders: [testdata/orders/orders.tbl]), has_header=false
CsvExec: files={1 group: [[testdata/orders/orders.tbl]]}, has_header=false, limit=None, projection=[o_orderkey, o_orderpriority]
ShuffleWriterExec: Some(Hash([Column { name: "l_shipmode", index: 0 }], 2))
AggregateExec: mode=Partial, gby=[l_shipmode@4 as l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
CoalesceBatchesExec: target_batch_size=4096
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })]
CoalesceBatchesExec: target_batch_size=4096
UnresolvedShuffleExec
CoalesceBatchesExec: target_batch_size=4096
UnresolvedShuffleExec
AggregateExec: mode=Partial, gby=[l_shipmode@0 as l_shipmode], aggr=[SUM(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
ProjectionExec: expr=[l_shipmode@1 as l_shipmode, o_orderpriority@3 as o_orderpriority]
CoalesceBatchesExec: target_batch_size=8192
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })]
CoalesceBatchesExec: target_batch_size=8192
UnresolvedShuffleExec
CoalesceBatchesExec: target_batch_size=8192
UnresolvedShuffleExec
ShuffleWriterExec: None
ProjectionExec: expr=[l_shipmode@0 as l_shipmode, SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@1 as high_line_count, SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@2 as low_line_count]
AggregateExec: mode=FinalPartitioned, gby=[l_shipmode@0 as l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
CoalesceBatchesExec: target_batch_size=4096
UnresolvedShuffleExec
SortExec: expr=[l_shipmode@0 ASC NULLS LAST]
ProjectionExec: expr=[l_shipmode@0 as l_shipmode, SUM(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@1 as high_line_count, SUM(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@2 as low_line_count]
AggregateExec: mode=FinalPartitioned, gby=[l_shipmode@0 as l_shipmode], aggr=[SUM(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
CoalesceBatchesExec: target_batch_size=8192
UnresolvedShuffleExec
ShuffleWriterExec: None
SortExec: [l_shipmode@0 ASC]
CoalescePartitionsExec
UnresolvedShuffleExec
SortPreservingMergeExec: [l_shipmode@0 ASC NULLS LAST]
UnresolvedShuffleExec
*/

assert_eq!(5, stages.len());
Expand Down Expand Up @@ -537,7 +539,10 @@ order by

let hash_agg = downcast_exec!(input, AggregateExec);

let coalesce_batches = hash_agg.children()[0].clone();
let projection = hash_agg.children()[0].clone();
let projection = downcast_exec!(projection, ProjectionExec);

let coalesce_batches = projection.children()[0].clone();
let coalesce_batches = downcast_exec!(coalesce_batches, CoalesceBatchesExec);

let join = coalesce_batches.children()[0].clone();
Expand Down
Loading

0 comments on commit 8007a41

Please sign in to comment.