From 8007a414c40966180235839db1693a8e66f62810 Mon Sep 17 00:00:00 2001
From: "r.4ntix"
Date: Mon, 27 Mar 2023 05:28:39 +0800
Subject: [PATCH] Upgrade datafusion to 20.0.0 & sqlparser to 0.32.0 (#711)

* Upgrade datafusion & sqlparser

* Move the benchmark's ballista_round_trip tests into a separate feature to
  avoid stack overflow

* Fix failing scheduler tests
---
 .github/workflows/rust.yml               |  46 ++-
 Cargo.toml                               |  19 +
 ballista-cli/Cargo.toml                  |   4 +-
 ballista/client/Cargo.toml               |   6 +-
 ballista/client/README.md                |   2 +-
 ballista/core/Cargo.toml                 |   9 +-
 ballista/executor/Cargo.toml             |   8 +-
 ballista/executor/src/execution_loop.rs  |   6 +-
 ballista/executor/src/executor_server.rs |   6 +-
 ballista/scheduler/Cargo.toml            |   6 +-
 ballista/scheduler/src/planner.rs        |  43 +-
 .../src/state/execution_graph_dot.rs      |  88 ++---
 benchmarks/Cargo.toml                    |   5 +-
 benchmarks/src/bin/tpch.rs               | 367 +++++++++---------
 examples/Cargo.toml                      |   2 +-
 15 files changed, 342 insertions(+), 275 deletions(-)

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index b9f04342b..d15c5099a 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -268,6 +268,50 @@ jobs:
         # do not produce debug symbols to keep memory usage down
         RUSTFLAGS: "-C debuginfo=0"
 
+  # verify that the benchmark queries return the correct results
+  verify-benchmark-results:
+    name: verify benchmark results (amd64)
+    needs: [linux-build-lib]
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        arch: [amd64]
+        rust: [stable]
+    container:
+      image: ${{ matrix.arch }}/rust
+      env:
+        # Disable full debug symbol generation to speed up CI build and keep memory down
+        # "1" means line tables only, which is useful for panic tracebacks.
+        RUSTFLAGS: "-C debuginfo=1"
+    steps:
+      - uses: actions/checkout@v3
+        with:
+          submodules: true
+      - name: Install protobuf compiler
+        shell: bash
+        run: |
+          apt-get -qq update && apt-get -y -qq install protobuf-compiler
+          protoc --version
+      - name: Cache Cargo
+        uses: actions/cache@v3
+        with:
+          path: /github/home/.cargo
+          # this key equals the ones on `linux-build-lib` for re-use
+          key: cargo-cache-
+      - name: Cache Rust dependencies
+        uses: actions/cache@v3
+        with:
+          path: /github/home/target
+          # this key equals the ones on `linux-build-lib` for re-use
+          key: ${{ runner.os }}-${{ matrix.arch }}-target-cache-${{ matrix.rust }}
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup-builder
+        with:
+          rust-version: ${{ matrix.rust }}
+      - name: Verify that benchmark queries return expected results
+        run: |
+          cargo test --package ballista-benchmarks --profile release-nonlto --features=ci -- --test-threads=1
+
   lint:
     name: Lint
     runs-on: ubuntu-latest
@@ -313,7 +357,7 @@ jobs:
           if [[ $DOCKER_TAG =~ ^[0-9\.]+-rc[0-9]+$ ]]
           then
             echo "publishing docker tag $DOCKER_TAG"
-            docker tag arrow-ballista-standalone:latest ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG
+            docker tag arrow-ballista-standalone:latest ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG
             docker login ghcr.io -u $DOCKER_USER -p "$DOCKER_PASS"
             docker push ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG
           fi
diff --git a/Cargo.toml b/Cargo.toml
index ab4fd3c7b..29e6bc980 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,28 @@ members = [
 ]
 exclude = ["python"]
 
+[workspace.dependencies]
+arrow = { version = "34.0.0" }
+arrow-flight = { version = "34.0.0", features = ["flight-sql-experimental"] }
+datafusion = "20.0.0"
+datafusion-proto = "20.0.0"
+
 # cargo build --profile release-lto
 [profile.release-lto]
 codegen-units = 1
 inherits = "release"
 lto = true
+# the release profile takes a long time to build so we can use this profile during development to save time
+# cargo build --profile release-nonlto
+[profile.release-nonlto]
+codegen-units = 16
+debug = false
+debug-assertions = false
+incremental = false
+inherits = "release"
+lto = false
+opt-level = 3
+overflow-checks = false
+panic = 'unwind'
+rpath = false
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 844e6c422..695fba9a3 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -33,8 +33,8 @@ ballista = { path = "../ballista/client", version = "0.11.0", features = [
     "standalone",
 ] }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "19.0.0"
-datafusion-cli = "19.0.0"
+datafusion = { workspace = true }
+datafusion-cli = "20.0.0"
 dirs = "4.0.0"
 env_logger = "0.10"
 mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml
index 44c0a1432..88b93378f 100644
--- a/ballista/client/Cargo.toml
+++ b/ballista/client/Cargo.toml
@@ -31,12 +31,12 @@ rust-version = "1.63"
 ballista-core = { path = "../core", version = "0.11.0" }
 ballista-executor = { path = "../executor", version = "0.11.0", optional = true }
 ballista-scheduler = { path = "../scheduler", version = "0.11.0", optional = true }
-datafusion = "19.0.0"
-datafusion-proto = "19.0.0"
+datafusion = { workspace = true }
+datafusion-proto = { workspace = true }
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.12"
-sqlparser = "0.30.0"
+sqlparser = "0.32.0"
 tempfile = "3"
 tokio = "1.0"
diff --git a/ballista/client/README.md b/ballista/client/README.md
index 4618b264b..525d6987b 100644
--- a/ballista/client/README.md
+++ b/ballista/client/README.md
@@ -85,7 +85,7 @@ To build a simple ballista example, add the following dependencies to your `Carg
 ```toml
 [dependencies]
 ballista = "0.10"
-datafusion = "19.0.0"
+datafusion = "20.0.0"
 tokio = "1.0"
 ```
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index f3c6c341a..95282a367 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -45,14 +45,13 @@ simd = ["datafusion/simd"]
 [dependencies]
 ahash = { version = "0.8", default-features = false }
-
-arrow-flight = { version = "33.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { workspace = true }
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "19.0.0"
+datafusion = { workspace = true }
 datafusion-objectstore-hdfs = { version = "0.1.1", default-features = false, optional = true }
-datafusion-proto = "19.0.0"
+datafusion-proto = { workspace = true }
 futures = "0.3"
 hashbrown = "0.13"
@@ -68,7 +67,7 @@ prost = "0.11"
 prost-types = "0.11"
 rand = "0.8"
 serde = { version = "1", features = ["derive"] }
-sqlparser = "0.30.0"
+sqlparser = "0.32.0"
 sys-info = "0.9.0"
 tokio = "1.0"
 tokio-stream = { version = "0.1", features = ["net"] }
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index be605b92e..1167e40a0 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -38,15 +38,15 @@ default = ["mimalloc"]
 [dependencies]
 anyhow = "1"
-arrow = { version = "33.0.0" }
-arrow-flight = { version = "33.0.0" }
+arrow = { workspace = true }
+arrow-flight = { workspace = true }
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.11.0", features = ["s3"] }
 chrono = { version = "0.4", default-features = false }
 configure_me = "0.4.0"
dashmap = "5.4.0" -datafusion = "19.0.0" -datafusion-proto = "19.0.0" +datafusion = { workspace = true } +datafusion-proto = { workspace = true } futures = "0.3" hyper = "0.14.4" log = "0.4" diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index 8cf6a4dac..6b7070b89 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use datafusion::config::Extensions; use datafusion::physical_plan::ExecutionPlan; use ballista_core::serde::protobuf::{ @@ -184,14 +185,15 @@ async fn run_received_task = U::try_decode(task.plan.as_slice()).and_then(|proto| { diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs index 4468be98a..7256d8e29 100644 --- a/ballista/executor/src/executor_server.rs +++ b/ballista/executor/src/executor_server.rs @@ -16,6 +16,7 @@ // under the License. use ballista_core::BALLISTA_VERSION; +use datafusion::config::Extensions; use std::collections::HashMap; use std::convert::TryInto; use std::ops::Deref; @@ -319,14 +320,15 @@ impl ExecutorServer= 8766 AND l_receiptdate@3 < 9131 - CsvExec: source=Path(testdata/lineitem: [testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), has_header=false + ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_shipmode@4 as l_shipmode] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: (l_shipmode@4 = SHIP OR l_shipmode@4 = MAIL) AND l_commitdate@2 < l_receiptdate@3 AND l_shipdate@1 < l_commitdate@2 AND l_receiptdate@3 >= 8766 AND l_receiptdate@3 < 9131 + CsvExec: files={2 groups: [[testdata/lineitem/partition0.tbl], [testdata/lineitem/partition1.tbl]]}, has_header=false, limit=None, projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode] ShuffleWriterExec: Some(Hash([Column { name: "o_orderkey", index: 0 }], 2)) - CsvExec: source=Path(testdata/orders: [testdata/orders/orders.tbl]), has_header=false + CsvExec: files={1 group: [[testdata/orders/orders.tbl]]}, has_header=false, limit=None, projection=[o_orderkey, o_orderpriority] ShuffleWriterExec: Some(Hash([Column { name: "l_shipmode", index: 0 }], 2)) - AggregateExec: mode=Partial, gby=[l_shipmode@4 as l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)] - CoalesceBatchesExec: target_batch_size=4096 - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })] - CoalesceBatchesExec: target_batch_size=4096 - UnresolvedShuffleExec - CoalesceBatchesExec: target_batch_size=4096 - UnresolvedShuffleExec + AggregateExec: mode=Partial, gby=[l_shipmode@0 as l_shipmode], aggr=[SUM(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)] + ProjectionExec: expr=[l_shipmode@1 as l_shipmode, o_orderpriority@3 as o_orderpriority] + CoalesceBatchesExec: target_batch_size=8192 + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })] + 
+            UnresolvedShuffleExec
+          CoalesceBatchesExec: target_batch_size=8192
+            UnresolvedShuffleExec
 
 ShuffleWriterExec: None
-  ProjectionExec: expr=[l_shipmode@0 as l_shipmode, SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@1 as high_line_count, SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@2 as low_line_count]
-    AggregateExec: mode=FinalPartitioned, gby=[l_shipmode@0 as l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
-      CoalesceBatchesExec: target_batch_size=4096
-        UnresolvedShuffleExec
+  SortExec: expr=[l_shipmode@0 ASC NULLS LAST]
+    ProjectionExec: expr=[l_shipmode@0 as l_shipmode, SUM(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@1 as high_line_count, SUM(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@2 as low_line_count]
+      AggregateExec: mode=FinalPartitioned, gby=[l_shipmode@0 as l_shipmode], aggr=[SUM(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
+        CoalesceBatchesExec: target_batch_size=8192
+          UnresolvedShuffleExec
 
 ShuffleWriterExec: None
-  SortExec: [l_shipmode@0 ASC]
-    CoalescePartitionsExec
-      UnresolvedShuffleExec
+  SortPreservingMergeExec: [l_shipmode@0 ASC NULLS LAST]
+    UnresolvedShuffleExec
 */
 
         assert_eq!(5, stages.len());
@@ -537,7 +539,10 @@ order by
 
         let hash_agg = downcast_exec!(input, AggregateExec);
 
-        let coalesce_batches = hash_agg.children()[0].clone();
+        let projection = hash_agg.children()[0].clone();
+        let projection = downcast_exec!(projection, ProjectionExec);
+
+        let coalesce_batches = projection.children()[0].clone();
         let coalesce_batches = downcast_exec!(coalesce_batches, CoalesceBatchesExec);
 
         let join = coalesce_batches.children()[0].clone();
diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs b/ballista/scheduler/src/state/execution_graph_dot.rs
index 853fa21dd..5b4f744d6 100644
--- a/ballista/scheduler/src/state/execution_graph_dot.rs
+++ b/ballista/scheduler/src/state/execution_graph_dot.rs
@@ -469,27 +469,25 @@ filter_expr="]
 subgraph cluster4 {
     label = "Stage 5 [Unresolved]";
     stage_5_0 [shape=box, label="ShuffleWriter [48 partitions]"]
-    stage_5_0_0 [shape=box, label="Projection: a@0, b@1, a@2, b@3, a@4, b@5"]
-    stage_5_0_0_0 [shape=box, label="CoalesceBatches [batchSize=4096]"]
-    stage_5_0_0_0_0 [shape=box, label="HashJoin
+    stage_5_0_0 [shape=box, label="CoalesceBatches [batchSize=4096]"]
+    stage_5_0_0_0 [shape=box, label="HashJoin
 join_expr=b@3 = b@1
 filter_expr="]
-    stage_5_0_0_0_0_0 [shape=box, label="CoalesceBatches [batchSize=4096]"]
-    stage_5_0_0_0_0_0_0 [shape=box, label="UnresolvedShuffleExec [stage_id=3]"]
-    stage_5_0_0_0_0_0_0 -> stage_5_0_0_0_0_0
+    stage_5_0_0_0_0 [shape=box, label="CoalesceBatches [batchSize=4096]"]
+    stage_5_0_0_0_0_0 [shape=box, label="UnresolvedShuffleExec [stage_id=3]"]
[stage_id=3]"] stage_5_0_0_0_0_0 -> stage_5_0_0_0_0 - stage_5_0_0_0_0_1 [shape=box, label="CoalesceBatches [batchSize=4096]"] - stage_5_0_0_0_0_1_0 [shape=box, label="UnresolvedShuffleExec [stage_id=4]"] - stage_5_0_0_0_0_1_0 -> stage_5_0_0_0_0_1 - stage_5_0_0_0_0_1 -> stage_5_0_0_0_0 stage_5_0_0_0_0 -> stage_5_0_0_0 + stage_5_0_0_0_1 [shape=box, label="CoalesceBatches [batchSize=4096]"] + stage_5_0_0_0_1_0 [shape=box, label="UnresolvedShuffleExec [stage_id=4]"] + stage_5_0_0_0_1_0 -> stage_5_0_0_0_1 + stage_5_0_0_0_1 -> stage_5_0_0_0 stage_5_0_0_0 -> stage_5_0_0 stage_5_0_0 -> stage_5_0 } stage_1_0 -> stage_3_0_0_0_0_0 stage_2_0 -> stage_3_0_0_0_1_0 - stage_3_0 -> stage_5_0_0_0_0_0_0 - stage_4_0 -> stage_5_0_0_0_0_1_0 + stage_3_0 -> stage_5_0_0_0_0_0 + stage_4_0 -> stage_5_0_0_0_1_0 } "#; assert_eq!(expected, &dot); @@ -552,36 +550,34 @@ filter_expr="] subgraph cluster3 { label = "Stage 4 [Unresolved]"; stage_4_0 [shape=box, label="ShuffleWriter [48 partitions]"] - stage_4_0_0 [shape=box, label="Projection: a@0, a@1, a@2"] - stage_4_0_0_0 [shape=box, label="CoalesceBatches [batchSize=4096]"] - stage_4_0_0_0_0 [shape=box, label="HashJoin + stage_4_0_0 [shape=box, label="CoalesceBatches [batchSize=4096]"] + stage_4_0_0_0 [shape=box, label="HashJoin join_expr=a@1 = a@0 filter_expr="] - stage_4_0_0_0_0_0 [shape=box, label="CoalesceBatches [batchSize=4096]"] - stage_4_0_0_0_0_0_0 [shape=box, label="HashJoin + stage_4_0_0_0_0 [shape=box, label="CoalesceBatches [batchSize=4096]"] + stage_4_0_0_0_0_0 [shape=box, label="HashJoin join_expr=a@0 = a@0 filter_expr="] - stage_4_0_0_0_0_0_0_0 [shape=box, label="CoalesceBatches [batchSize=4096]"] - stage_4_0_0_0_0_0_0_0_0 [shape=box, label="UnresolvedShuffleExec [stage_id=1]"] - stage_4_0_0_0_0_0_0_0_0 -> stage_4_0_0_0_0_0_0_0 + stage_4_0_0_0_0_0_0 [shape=box, label="CoalesceBatches [batchSize=4096]"] + stage_4_0_0_0_0_0_0_0 [shape=box, label="UnresolvedShuffleExec [stage_id=1]"] stage_4_0_0_0_0_0_0_0 -> stage_4_0_0_0_0_0_0 - stage_4_0_0_0_0_0_0_1 [shape=box, label="CoalesceBatches [batchSize=4096]"] - stage_4_0_0_0_0_0_0_1_0 [shape=box, label="UnresolvedShuffleExec [stage_id=2]"] - stage_4_0_0_0_0_0_0_1_0 -> stage_4_0_0_0_0_0_0_1 - stage_4_0_0_0_0_0_0_1 -> stage_4_0_0_0_0_0_0 stage_4_0_0_0_0_0_0 -> stage_4_0_0_0_0_0 + stage_4_0_0_0_0_0_1 [shape=box, label="CoalesceBatches [batchSize=4096]"] + stage_4_0_0_0_0_0_1_0 [shape=box, label="UnresolvedShuffleExec [stage_id=2]"] + stage_4_0_0_0_0_0_1_0 -> stage_4_0_0_0_0_0_1 + stage_4_0_0_0_0_0_1 -> stage_4_0_0_0_0_0 stage_4_0_0_0_0_0 -> stage_4_0_0_0_0 - stage_4_0_0_0_0_1 [shape=box, label="CoalesceBatches [batchSize=4096]"] - stage_4_0_0_0_0_1_0 [shape=box, label="UnresolvedShuffleExec [stage_id=3]"] - stage_4_0_0_0_0_1_0 -> stage_4_0_0_0_0_1 - stage_4_0_0_0_0_1 -> stage_4_0_0_0_0 stage_4_0_0_0_0 -> stage_4_0_0_0 + stage_4_0_0_0_1 [shape=box, label="CoalesceBatches [batchSize=4096]"] + stage_4_0_0_0_1_0 [shape=box, label="UnresolvedShuffleExec [stage_id=3]"] + stage_4_0_0_0_1_0 -> stage_4_0_0_0_1 + stage_4_0_0_0_1 -> stage_4_0_0_0 stage_4_0_0_0 -> stage_4_0_0 stage_4_0_0 -> stage_4_0 } - stage_1_0 -> stage_4_0_0_0_0_0_0_0_0 - stage_2_0 -> stage_4_0_0_0_0_0_0_1_0 - stage_3_0 -> stage_4_0_0_0_0_1_0 + stage_1_0 -> stage_4_0_0_0_0_0_0_0 + stage_2_0 -> stage_4_0_0_0_0_0_1_0 + stage_3_0 -> stage_4_0_0_0_1_0 } "#; assert_eq!(expected, &dot); @@ -596,30 +592,28 @@ filter_expr="] let expected = r#"digraph G { stage_4_0 [shape=box, label="ShuffleWriter [48 partitions]"] - stage_4_0_0 [shape=box, label="Projection: a@0, 
a@1, a@2"] - stage_4_0_0_0 [shape=box, label="CoalesceBatches [batchSize=4096]"] - stage_4_0_0_0_0 [shape=box, label="HashJoin + stage_4_0_0 [shape=box, label="CoalesceBatches [batchSize=4096]"] + stage_4_0_0_0 [shape=box, label="HashJoin join_expr=a@1 = a@0 filter_expr="] - stage_4_0_0_0_0_0 [shape=box, label="CoalesceBatches [batchSize=4096]"] - stage_4_0_0_0_0_0_0 [shape=box, label="HashJoin + stage_4_0_0_0_0 [shape=box, label="CoalesceBatches [batchSize=4096]"] + stage_4_0_0_0_0_0 [shape=box, label="HashJoin join_expr=a@0 = a@0 filter_expr="] - stage_4_0_0_0_0_0_0_0 [shape=box, label="CoalesceBatches [batchSize=4096]"] - stage_4_0_0_0_0_0_0_0_0 [shape=box, label="UnresolvedShuffleExec [stage_id=1]"] - stage_4_0_0_0_0_0_0_0_0 -> stage_4_0_0_0_0_0_0_0 + stage_4_0_0_0_0_0_0 [shape=box, label="CoalesceBatches [batchSize=4096]"] + stage_4_0_0_0_0_0_0_0 [shape=box, label="UnresolvedShuffleExec [stage_id=1]"] stage_4_0_0_0_0_0_0_0 -> stage_4_0_0_0_0_0_0 - stage_4_0_0_0_0_0_0_1 [shape=box, label="CoalesceBatches [batchSize=4096]"] - stage_4_0_0_0_0_0_0_1_0 [shape=box, label="UnresolvedShuffleExec [stage_id=2]"] - stage_4_0_0_0_0_0_0_1_0 -> stage_4_0_0_0_0_0_0_1 - stage_4_0_0_0_0_0_0_1 -> stage_4_0_0_0_0_0_0 stage_4_0_0_0_0_0_0 -> stage_4_0_0_0_0_0 + stage_4_0_0_0_0_0_1 [shape=box, label="CoalesceBatches [batchSize=4096]"] + stage_4_0_0_0_0_0_1_0 [shape=box, label="UnresolvedShuffleExec [stage_id=2]"] + stage_4_0_0_0_0_0_1_0 -> stage_4_0_0_0_0_0_1 + stage_4_0_0_0_0_0_1 -> stage_4_0_0_0_0_0 stage_4_0_0_0_0_0 -> stage_4_0_0_0_0 - stage_4_0_0_0_0_1 [shape=box, label="CoalesceBatches [batchSize=4096]"] - stage_4_0_0_0_0_1_0 [shape=box, label="UnresolvedShuffleExec [stage_id=3]"] - stage_4_0_0_0_0_1_0 -> stage_4_0_0_0_0_1 - stage_4_0_0_0_0_1 -> stage_4_0_0_0_0 stage_4_0_0_0_0 -> stage_4_0_0_0 + stage_4_0_0_0_1 [shape=box, label="CoalesceBatches [batchSize=4096]"] + stage_4_0_0_0_1_0 [shape=box, label="UnresolvedShuffleExec [stage_id=3]"] + stage_4_0_0_0_1_0 -> stage_4_0_0_0_1 + stage_4_0_0_0_1 -> stage_4_0_0_0 stage_4_0_0_0 -> stage_4_0_0 stage_4_0_0 -> stage_4_0 } diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 538f9107a..53ad3d136 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -28,14 +28,15 @@ publish = false rust-version = "1.63" [features] +ci = [] default = ["mimalloc"] simd = ["datafusion/simd"] snmalloc = ["snmalloc-rs"] [dependencies] ballista = { path = "../ballista/client", version = "0.11.0" } -datafusion = "19.0.0" -datafusion-proto = "19.0.0" +datafusion = { workspace = true } +datafusion-proto = { workspace = true } env_logger = "0.10" futures = "0.3" mimalloc = { version = "0.1", optional = true, default-features = false } diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index a72c92ecb..b4314359e 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -1545,200 +1545,201 @@ mod tests { Ok(()) } +} - mod ballista_round_trip { - use super::*; - use ballista_core::serde::BallistaCodec; - use datafusion::datasource::listing::ListingTableUrl; - use datafusion::execution::options::ReadOptions; - use datafusion::physical_plan::ExecutionPlan; - use datafusion_proto::logical_plan::AsLogicalPlan; - use datafusion_proto::physical_plan::AsExecutionPlan; - use std::ops::Deref; - - async fn round_trip_logical_plan(n: usize) -> Result<()> { - let config = SessionConfig::new() - .with_target_partitions(1) - .with_batch_size(10); - let ctx = SessionContext::with_config(config); - let session_state = ctx.state(); - let codec: 
-                datafusion_proto::protobuf::LogicalPlanNode,
-                datafusion_proto::protobuf::PhysicalPlanNode,
-            > = BallistaCodec::default();
-
-            // set tpch_data_path to dummy value and skip physical plan serde test when TPCH_DATA
-            // is not set.
-            let tpch_data_path =
-                env::var("TPCH_DATA").unwrap_or_else(|_| "./".to_string());
-            let path = ListingTableUrl::parse(tpch_data_path)?;
-
-            for &table in TABLES {
-                let schema = get_schema(table);
-                let options = CsvReadOptions::new()
-                    .schema(&schema)
-                    .delimiter(b'|')
-                    .has_header(false)
-                    .file_extension(".tbl");
-                let cfg = SessionConfig::new();
-                let listing_options = options.to_listing_options(&cfg);
-                let config = ListingTableConfig::new(path.clone())
-                    .with_listing_options(listing_options)
-                    .with_schema(Arc::new(schema));
-                let provider = ListingTable::try_new(config)?;
-                ctx.register_table(table, Arc::new(provider))?;
-            }
+#[cfg(test)]
+#[cfg(feature = "ci")]
+mod ballista_round_trip {
+    use super::*;
+    use ballista_core::serde::BallistaCodec;
+    use datafusion::datasource::listing::ListingTableUrl;
+    use datafusion::execution::options::ReadOptions;
+    use datafusion::physical_plan::ExecutionPlan;
+    use datafusion_proto::logical_plan::AsLogicalPlan;
+    use datafusion_proto::physical_plan::AsExecutionPlan;
+    use std::env;
+    use std::ops::Deref;
-            // test logical plan round trip
-            let plans = create_logical_plans(&ctx, n).await?;
-            for plan in plans {
-                // test optimized logical plan round trip
-                let plan = session_state.optimize(&plan)?;
-                let proto: datafusion_proto::protobuf::LogicalPlanNode =
-                    datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(
-                        &plan,
-                        codec.logical_extension_codec(),
-                    )
-                    .unwrap();
-                let round_trip: LogicalPlan = proto
-                    .try_into_logical_plan(&ctx, codec.logical_extension_codec())
-                    .unwrap();
-                assert_eq!(
-                    format!("{plan:?}"),
-                    format!("{round_trip:?}"),
-                    "optimized logical plan round trip failed"
-                );
-            }
+    async fn round_trip_logical_plan(n: usize) -> Result<()> {
+        let config = SessionConfig::new()
+            .with_target_partitions(1)
+            .with_batch_size(10);
+        let ctx = SessionContext::with_config(config);
+        let session_state = ctx.state();
+        let codec: BallistaCodec<
+            datafusion_proto::protobuf::LogicalPlanNode,
+            datafusion_proto::protobuf::PhysicalPlanNode,
+        > = BallistaCodec::default();
+
+        // set tpch_data_path to dummy value and skip physical plan serde test when TPCH_DATA
+        // is not set.
+        let tpch_data_path = env::var("TPCH_DATA").unwrap_or_else(|_| "./".to_string());
+        let path = ListingTableUrl::parse(tpch_data_path)?;
-            Ok(())
+        for &table in TABLES {
+            let schema = get_schema(table);
+            let options = CsvReadOptions::new()
+                .schema(&schema)
+                .delimiter(b'|')
+                .has_header(false)
+                .file_extension(".tbl");
+            let cfg = SessionConfig::new();
+            let listing_options = options.to_listing_options(&cfg);
+            let config = ListingTableConfig::new(path.clone())
+                .with_listing_options(listing_options)
+                .with_schema(Arc::new(schema));
+            let provider = ListingTable::try_new(config)?;
+            ctx.register_table(table, Arc::new(provider))?;
         }
-        async fn round_trip_physical_plan(n: usize) -> Result<()> {
-            let config = SessionConfig::new()
-                .with_target_partitions(1)
-                .with_batch_size(10);
-            let ctx = SessionContext::with_config(config);
-            let session_state = ctx.state();
-            let codec: BallistaCodec<
-                datafusion_proto::protobuf::LogicalPlanNode,
-                datafusion_proto::protobuf::PhysicalPlanNode,
-            > = BallistaCodec::default();
-
-            // set tpch_data_path to dummy value and skip physical plan serde test when TPCH_DATA
-            // is not set.
-            let tpch_data_path =
-                env::var("TPCH_DATA").unwrap_or_else(|_| "./".to_string());
-            let path = ListingTableUrl::parse(tpch_data_path)?;
-
-            for &table in TABLES {
-                let schema = get_schema(table);
-                let options = CsvReadOptions::new()
-                    .schema(&schema)
-                    .delimiter(b'|')
-                    .has_header(false)
-                    .file_extension(".tbl");
-                let cfg = SessionConfig::new();
-                let listing_options = options.to_listing_options(&cfg);
-                let config = ListingTableConfig::new(path.clone())
-                    .with_listing_options(listing_options)
-                    .with_schema(Arc::new(schema));
-                let provider = ListingTable::try_new(config)?;
-                ctx.register_table(table, Arc::new(provider))?;
-            }
+        // test logical plan round trip
+        let plans = create_logical_plans(&ctx, n).await?;
+        for plan in plans {
+            // test optimized logical plan round trip
+            let plan = session_state.optimize(&plan)?;
+            let proto: datafusion_proto::protobuf::LogicalPlanNode =
+                datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(
+                    &plan,
+                    codec.logical_extension_codec(),
+                )
+                .unwrap();
+            let round_trip: LogicalPlan = proto
+                .try_into_logical_plan(&ctx, codec.logical_extension_codec())
+                .unwrap();
+            assert_eq!(
+                format!("{plan:?}"),
+                format!("{round_trip:?}"),
+                "optimized logical plan round trip failed"
+            );
+        }
-            // test logical plan round trip
-            let plans = create_logical_plans(&ctx, n).await?;
-            for plan in plans {
-                let plan = session_state.optimize(&plan)?;
-
-                // test physical plan roundtrip
-                let physical_plan = session_state.create_physical_plan(&plan).await?;
-                let proto: datafusion_proto::protobuf::PhysicalPlanNode =
-                    datafusion_proto::protobuf::PhysicalPlanNode::try_from_physical_plan(
-                        physical_plan.clone(),
-                        codec.physical_extension_codec(),
-                    )
-                    .unwrap();
-                let runtime = ctx.runtime_env();
-                let round_trip: Arc<dyn ExecutionPlan> = proto
-                    .try_into_physical_plan(
-                        &ctx,
-                        runtime.deref(),
-                        codec.physical_extension_codec(),
-                    )
-                    .unwrap();
-                assert_eq!(
-                    format!("{}", displayable(physical_plan.as_ref()).indent()),
-                    format!("{}", displayable(round_trip.as_ref()).indent()),
-                    "physical plan round trip failed"
-                );
-            }
+        Ok(())
+    }
-            Ok(())
-        }
+    async fn round_trip_physical_plan(n: usize) -> Result<()> {
+        let config = SessionConfig::new()
+            .with_target_partitions(1)
+            .with_batch_size(10);
+        let ctx = SessionContext::with_config(config);
+        let session_state = ctx.state();
+        let codec: BallistaCodec<
+            datafusion_proto::protobuf::LogicalPlanNode,
+            datafusion_proto::protobuf::PhysicalPlanNode,
+        > = BallistaCodec::default();
+
+        // set tpch_data_path to dummy value and skip physical plan serde test when TPCH_DATA
+        // is not set.
+        let tpch_data_path = env::var("TPCH_DATA").unwrap_or_else(|_| "./".to_string());
+        let path = ListingTableUrl::parse(tpch_data_path)?;
+
+        for &table in TABLES {
+            let schema = get_schema(table);
+            let options = CsvReadOptions::new()
+                .schema(&schema)
+                .delimiter(b'|')
+                .has_header(false)
+                .file_extension(".tbl");
+            let cfg = SessionConfig::new();
+            let listing_options = options.to_listing_options(&cfg);
+            let config = ListingTableConfig::new(path.clone())
+                .with_listing_options(listing_options)
+                .with_schema(Arc::new(schema));
+            let provider = ListingTable::try_new(config)?;
+            ctx.register_table(table, Arc::new(provider))?;
         }
+        // test logical plan round trip
+        let plans = create_logical_plans(&ctx, n).await?;
+        for plan in plans {
+            let plan = session_state.optimize(&plan)?;
+
+            // test physical plan roundtrip
+            let physical_plan = session_state.create_physical_plan(&plan).await?;
+            let proto: datafusion_proto::protobuf::PhysicalPlanNode =
+                datafusion_proto::protobuf::PhysicalPlanNode::try_from_physical_plan(
+                    physical_plan.clone(),
+                    codec.physical_extension_codec(),
+                )
+                .unwrap();
+            let runtime = ctx.runtime_env();
+            let round_trip: Arc<dyn ExecutionPlan> = proto
+                .try_into_physical_plan(
+                    &ctx,
+                    runtime.deref(),
+                    codec.physical_extension_codec(),
+                )
+                .unwrap();
+            assert_eq!(
+                format!("{}", displayable(physical_plan.as_ref()).indent()),
+                format!("{}", displayable(round_trip.as_ref()).indent()),
+                "physical plan round trip failed"
+            );
         }
+        Ok(())
     }
-        macro_rules! test_round_trip_logical {
-            ($tn:ident, $query:expr) => {
-                #[tokio::test]
-                async fn $tn() -> Result<()> {
-                    round_trip_logical_plan($query).await
-                }
-            };
         }
-        test_round_trip_logical!(q1, 1);
-        test_round_trip_logical!(q2, 2);
-        test_round_trip_logical!(q3, 3);
-        test_round_trip_logical!(q4, 4);
-        test_round_trip_logical!(q5, 5);
-        test_round_trip_logical!(q6, 6);
-        test_round_trip_logical!(q7, 7);
-        test_round_trip_logical!(q8, 8);
-        test_round_trip_logical!(q9, 9);
-        test_round_trip_logical!(q10, 10);
-        test_round_trip_logical!(q11, 11);
-        test_round_trip_logical!(q12, 12);
-        test_round_trip_logical!(q13, 13);
-        test_round_trip_logical!(q14, 14);
-        //test_round_trip_logical!(q15, 15); // https://github.com/apache/arrow-ballista/issues/330
-        test_round_trip_logical!(q16, 16);
-        test_round_trip_logical!(q17, 17);
-        test_round_trip_logical!(q18, 18);
-        test_round_trip_logical!(q19, 19);
-        test_round_trip_logical!(q20, 20);
-        test_round_trip_logical!(q21, 21);
-        test_round_trip_logical!(q22, 22);
-
-        macro_rules! test_round_trip_physical {
-            ($tn:ident, $query:expr) => {
-                #[tokio::test]
-                async fn $tn() -> Result<()> {
-                    round_trip_physical_plan($query).await
-                }
-            };
         }
-        test_round_trip_physical!(physical_round_trip_q1, 1);
-        test_round_trip_physical!(physical_round_trip_q2, 2);
-        test_round_trip_physical!(physical_round_trip_q3, 3);
-        test_round_trip_physical!(physical_round_trip_q4, 4);
-        test_round_trip_physical!(physical_round_trip_q5, 5);
-        test_round_trip_physical!(physical_round_trip_q6, 6);
-        test_round_trip_physical!(physical_round_trip_q7, 7);
-        test_round_trip_physical!(physical_round_trip_q8, 8);
-        test_round_trip_physical!(physical_round_trip_q9, 9);
-        test_round_trip_physical!(physical_round_trip_q10, 10);
-        test_round_trip_physical!(physical_round_trip_q11, 11);
-        test_round_trip_physical!(physical_round_trip_q12, 12);
-        test_round_trip_physical!(physical_round_trip_q13, 13);
-        test_round_trip_physical!(physical_round_trip_q14, 14);
-        // test_round_trip_physical!(physical_round_trip_q15, 15); // https://github.com/apache/arrow-ballista/issues/330
-        test_round_trip_physical!(physical_round_trip_q16, 16);
-        test_round_trip_physical!(physical_round_trip_q17, 17);
-        test_round_trip_physical!(physical_round_trip_q18, 18);
-        test_round_trip_physical!(physical_round_trip_q19, 19);
-        test_round_trip_physical!(physical_round_trip_q20, 20);
-        test_round_trip_physical!(physical_round_trip_q21, 21);
-        test_round_trip_physical!(physical_round_trip_q22, 22);
     }
+    macro_rules! test_round_trip_logical {
+        ($tn:ident, $query:expr) => {
+            #[tokio::test]
+            async fn $tn() -> Result<()> {
+                round_trip_logical_plan($query).await
+            }
+        };
+    }
+
+    test_round_trip_logical!(q1, 1);
+    test_round_trip_logical!(q2, 2);
+    test_round_trip_logical!(q3, 3);
+    test_round_trip_logical!(q4, 4);
+    test_round_trip_logical!(q5, 5);
+    test_round_trip_logical!(q6, 6);
+    test_round_trip_logical!(q7, 7);
+    test_round_trip_logical!(q8, 8);
+    test_round_trip_logical!(q9, 9);
+    test_round_trip_logical!(q10, 10);
+    test_round_trip_logical!(q11, 11);
+    test_round_trip_logical!(q12, 12);
+    test_round_trip_logical!(q13, 13);
+    test_round_trip_logical!(q14, 14);
+    //test_round_trip_logical!(q15, 15); // https://github.com/apache/arrow-ballista/issues/330
+    test_round_trip_logical!(q16, 16);
+    test_round_trip_logical!(q17, 17);
+    test_round_trip_logical!(q18, 18);
+    test_round_trip_logical!(q19, 19);
+    test_round_trip_logical!(q20, 20);
+    test_round_trip_logical!(q21, 21);
+    test_round_trip_logical!(q22, 22);
+
+    macro_rules! test_round_trip_physical {
+        ($tn:ident, $query:expr) => {
+            #[tokio::test]
+            async fn $tn() -> Result<()> {
+                round_trip_physical_plan($query).await
+            }
+        };
+    }
+
+    test_round_trip_physical!(physical_round_trip_q1, 1);
+    test_round_trip_physical!(physical_round_trip_q2, 2);
+    test_round_trip_physical!(physical_round_trip_q3, 3);
+    test_round_trip_physical!(physical_round_trip_q4, 4);
+    test_round_trip_physical!(physical_round_trip_q5, 5);
+    test_round_trip_physical!(physical_round_trip_q6, 6);
+    test_round_trip_physical!(physical_round_trip_q7, 7);
+    test_round_trip_physical!(physical_round_trip_q8, 8);
+    test_round_trip_physical!(physical_round_trip_q9, 9);
+    test_round_trip_physical!(physical_round_trip_q10, 10);
+    test_round_trip_physical!(physical_round_trip_q11, 11);
+    test_round_trip_physical!(physical_round_trip_q12, 12);
+    test_round_trip_physical!(physical_round_trip_q13, 13);
+    test_round_trip_physical!(physical_round_trip_q14, 14);
+    // test_round_trip_physical!(physical_round_trip_q15, 15); // https://github.com/apache/arrow-ballista/issues/330
+    test_round_trip_physical!(physical_round_trip_q16, 16);
+    test_round_trip_physical!(physical_round_trip_q17, 17);
+    test_round_trip_physical!(physical_round_trip_q18, 18);
+    test_round_trip_physical!(physical_round_trip_q19, 19);
+    test_round_trip_physical!(physical_round_trip_q20, 20);
+    test_round_trip_physical!(physical_round_trip_q21, 21);
+    test_round_trip_physical!(physical_round_trip_q22, 22);
 }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 5f6543832..5eeb66b35 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
 
 [dependencies]
 ballista = { path = "../ballista/client", version = "0.11.0" }
-datafusion = "19.0.0"
+datafusion = { workspace = true }
 futures = "0.3"
 num_cpus = "1.13.0"
 prost = "0.11"
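The dependency changes in this patch all flow through the new `[workspace.dependencies]` table, so the arrow and datafusion versions, which have to move in lockstep (datafusion 20.0.0 pairs with arrow 34.0.0), are pinned in exactly one place. Below is a minimal sketch of the inheritance mechanism; the crate layout and member name are illustrative, only the versions are the ones this patch pins:

```toml
# Workspace root Cargo.toml: shared versions are declared once.
[workspace]
members = ["my-crate"]  # hypothetical member crate

[workspace.dependencies]
arrow = { version = "34.0.0" }
datafusion = "20.0.0"

# my-crate/Cargo.toml would then opt in per dependency; the version is
# inherited from the workspace table, and the member crate can still
# layer extra features on top of the inherited entry:
#
# [dependencies]
# arrow = { workspace = true }
# datafusion = { workspace = true }
```

A later upgrade then touches only the workspace table rather than every member manifest; `datafusion-cli` stays pinned directly in `ballista-cli/Cargo.toml` above simply because it is not listed in the workspace table.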