Skip to content

Commit

Permalink
Merge branch 'main' into physicalexpr-eq-partialeq-hash
Browse files Browse the repository at this point in the history
# Conflicts:
#	datafusion/physical-expr/src/expressions/not.rs
  • Loading branch information
peter-toth committed Oct 31, 2024
2 parents f1917fa + f23360f commit 4a69b2c
Show file tree
Hide file tree
Showing 437 changed files with 22,548 additions and 13,681 deletions.
69 changes: 36 additions & 33 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ members = [
"datafusion/expr",
"datafusion/expr-common",
"datafusion/execution",
"datafusion/ffi",
"datafusion/functions",
"datafusion/functions-aggregate",
"datafusion/functions-aggregate-common",
Expand Down Expand Up @@ -59,7 +60,7 @@ license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/apache/datafusion"
rust-version = "1.79"
version = "42.0.0"
version = "42.1.0"

[workspace.dependencies]
# We turn off default-features for some dependencies here so the workspaces which inherit them can
Expand All @@ -70,51 +71,52 @@ version = "42.0.0"
ahash = { version = "0.8", default-features = false, features = [
"runtime-rng",
] }
arrow = { version = "53.1.0", features = [
arrow = { version = "53.2.0", features = [
"prettyprint",
] }
arrow-array = { version = "53.1.0", default-features = false, features = [
arrow-array = { version = "53.2.0", default-features = false, features = [
"chrono-tz",
] }
arrow-buffer = { version = "53.1.0", default-features = false }
arrow-flight = { version = "53.1.0", features = [
arrow-buffer = { version = "53.2.0", default-features = false }
arrow-flight = { version = "53.2.0", features = [
"flight-sql-experimental",
] }
arrow-ipc = { version = "53.1.0", default-features = false, features = [
arrow-ipc = { version = "53.2.0", default-features = false, features = [
"lz4",
] }
arrow-ord = { version = "53.1.0", default-features = false }
arrow-schema = { version = "53.1.0", default-features = false }
arrow-string = { version = "53.1.0", default-features = false }
arrow-ord = { version = "53.2.0", default-features = false }
arrow-schema = { version = "53.2.0", default-features = false }
arrow-string = { version = "53.2.0", default-features = false }
async-trait = "0.1.73"
bigdecimal = "=0.4.1"
bytes = "1.4"
chrono = { version = "0.4.38", default-features = false }
ctor = "0.2.0"
dashmap = "6.0.1"
datafusion = { path = "datafusion/core", version = "42.0.0", default-features = false }
datafusion-catalog = { path = "datafusion/catalog", version = "42.0.0" }
datafusion-common = { path = "datafusion/common", version = "42.0.0", default-features = false }
datafusion-common-runtime = { path = "datafusion/common-runtime", version = "42.0.0" }
datafusion-execution = { path = "datafusion/execution", version = "42.0.0" }
datafusion-expr = { path = "datafusion/expr", version = "42.0.0" }
datafusion-expr-common = { path = "datafusion/expr-common", version = "42.0.0" }
datafusion-functions = { path = "datafusion/functions", version = "42.0.0" }
datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "42.0.0" }
datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "42.0.0" }
datafusion-functions-nested = { path = "datafusion/functions-nested", version = "42.0.0" }
datafusion-functions-window = { path = "datafusion/functions-window", version = "42.0.0" }
datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "42.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "42.0.0", default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "42.0.0", default-features = false }
datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "42.0.0", default-features = false }
datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", version = "42.0.0" }
datafusion-physical-plan = { path = "datafusion/physical-plan", version = "42.0.0" }
datafusion-proto = { path = "datafusion/proto", version = "42.0.0" }
datafusion-proto-common = { path = "datafusion/proto-common", version = "42.0.0" }
datafusion-sql = { path = "datafusion/sql", version = "42.0.0" }
datafusion-sqllogictest = { path = "datafusion/sqllogictest", version = "42.0.0" }
datafusion-substrait = { path = "datafusion/substrait", version = "42.0.0" }
datafusion = { path = "datafusion/core", version = "42.1.0", default-features = false }
datafusion-catalog = { path = "datafusion/catalog", version = "42.1.0" }
datafusion-common = { path = "datafusion/common", version = "42.1.0", default-features = false }
datafusion-common-runtime = { path = "datafusion/common-runtime", version = "42.1.0" }
datafusion-execution = { path = "datafusion/execution", version = "42.1.0" }
datafusion-expr = { path = "datafusion/expr", version = "42.1.0" }
datafusion-expr-common = { path = "datafusion/expr-common", version = "42.1.0" }
datafusion-ffi = { path = "datafusion/ffi", version = "42.1.0" }
datafusion-functions = { path = "datafusion/functions", version = "42.1.0" }
datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "42.1.0" }
datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "42.1.0" }
datafusion-functions-nested = { path = "datafusion/functions-nested", version = "42.1.0" }
datafusion-functions-window = { path = "datafusion/functions-window", version = "42.1.0" }
datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "42.1.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "42.1.0", default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "42.1.0", default-features = false }
datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "42.1.0", default-features = false }
datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", version = "42.1.0" }
datafusion-physical-plan = { path = "datafusion/physical-plan", version = "42.1.0" }
datafusion-proto = { path = "datafusion/proto", version = "42.1.0" }
datafusion-proto-common = { path = "datafusion/proto-common", version = "42.1.0" }
datafusion-sql = { path = "datafusion/sql", version = "42.1.0" }
datafusion-sqllogictest = { path = "datafusion/sqllogictest", version = "42.1.0" }
datafusion-substrait = { path = "datafusion/substrait", version = "42.1.0" }
doc-comment = "0.3"
env_logger = "0.11"
futures = "0.3"
Expand All @@ -126,7 +128,7 @@ log = "^0.4"
num_cpus = "1.13.0"
object_store = { version = "0.11.0", default-features = false }
parking_lot = "0.12"
parquet = { version = "53.1.0", default-features = false, features = [
parquet = { version = "53.2.0", default-features = false, features = [
"arrow",
"async",
"object_store",
Expand Down Expand Up @@ -169,3 +171,4 @@ large_futures = "warn"

[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)"] }
unused_qualifications = "deny"
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,8 @@ For example, given the releases `1.78.0`, `1.79.0`, `1.80.0`, `1.80.1` and `1.81
If a hotfix is released for the minimum supported Rust version (MSRV), the MSRV will be the minor version with all hotfixes, even if it surpasses the four-month window.

We enforce this policy using a [MSRV CI Check](https://github.com/search?q=repo%3Aapache%2Fdatafusion+rust-version+language%3ATOML+path%3A%2F%5ECargo.toml%2F&type=code)

## DataFusion API evolution policy

Public methods in Apache DataFusion are subject to evolve as part of the API lifecycle.
Deprecated methods will be phased out in accordance with the [policy](https://datafusion.apache.org/library-user-guide/api-health.html), ensuring the API is stable and healthy.
38 changes: 38 additions & 0 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,16 @@ steps.
The tests sort the entire dataset using several different sort
orders.

## IMDB

Run Join Order Benchmark (JOB) on IMDB dataset.

The Internet Movie Database (IMDB) dataset contains real-world movie data. Unlike synthetic datasets like TPCH, which assume uniform data distribution and uncorrelated columns, the IMDB dataset includes skewed data and correlated columns (which are common for real dataset), making it more suitable for testing query optimizers, particularly for cardinality estimation.

This benchmark is derived from [Join Order Benchmark](https://github.com/gregrahn/join-order-benchmark).

See paper [How Good Are Query Optimizers, Really](http://www.vldb.org/pvldb/vol9/p204-leis.pdf) for more details.

## TPCH

Run the tpch benchmark.
Expand All @@ -342,6 +352,34 @@ This benchmarks is derived from the [TPC-H][1] version
[2]: https://github.com/databricks/tpch-dbgen.git,
[2.17.1]: https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-h_v2.17.1.pdf

## External Aggregation

Run the benchmark for aggregations with limited memory.

When the memory limit is exceeded, the aggregation intermediate results will be spilled to disk, and finally read back with sort-merge.

External aggregation benchmarks run several aggregation queries with different memory limits, on TPCH `lineitem` table. Queries can be found in [`external_aggr.rs`](src/bin/external_aggr.rs).

This benchmark is inspired by [DuckDB's external aggregation paper](https://hannes.muehleisen.org/publications/icde2024-out-of-core-kuiper-boncz-muehleisen.pdf), specifically Section VI.

### External Aggregation Example Runs
1. Run all queries with predefined memory limits:
```bash
# Under 'benchmarks/' directory
cargo run --release --bin external_aggr -- benchmark -n 4 --iterations 3 -p '....../data/tpch_sf1' -o '/tmp/aggr.json'
```

2. Run a query with specific memory limit:
```bash
cargo run --release --bin external_aggr -- benchmark -n 4 --iterations 3 -p '....../data/tpch_sf1' -o '/tmp/aggr.json' --query 1 --memory-limit 30M
```

3. Run all queries with `bench.sh` script:
```bash
./bench.sh data external_aggr
./bench.sh run external_aggr
```


# Older Benchmarks

Expand Down
29 changes: 26 additions & 3 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ sort: Benchmark of sorting speed
clickbench_1: ClickBench queries against a single parquet file
clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet
clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)
external_aggr: External aggregation benchmark
**********
* Supported Configuration (Environment Variables)
Expand Down Expand Up @@ -170,6 +171,10 @@ main() {
imdb)
data_imdb
;;
external_aggr)
# same data as for tpch
data_tpch "1"
;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for data generation"
usage
Expand Down Expand Up @@ -212,6 +217,7 @@ main() {
run_clickbench_partitioned
run_clickbench_extended
run_imdb
run_external_aggr
;;
tpch)
run_tpch "1"
Expand Down Expand Up @@ -243,6 +249,9 @@ main() {
imdb)
run_imdb
;;
external_aggr)
run_external_aggr
;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for run"
usage
Expand Down Expand Up @@ -357,15 +366,15 @@ run_parquet() {
RESULTS_FILE="${RESULTS_DIR}/parquet.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running parquet filter benchmark..."
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --scale-factor 1.0 --iterations 5 -o "${RESULTS_FILE}"
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o "${RESULTS_FILE}"
}

# Runs the sort benchmark
run_sort() {
RESULTS_FILE="${RESULTS_DIR}/sort.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running sort benchmark..."
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --scale-factor 1.0 --iterations 5 -o "${RESULTS_FILE}"
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o "${RESULTS_FILE}"
}


Expand Down Expand Up @@ -524,7 +533,21 @@ run_imdb() {
$CARGO_COMMAND --bin imdb -- benchmark datafusion --iterations 5 --path "${IMDB_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}"
}


# Runs the external aggregation benchmark
run_external_aggr() {
# Use TPC-H SF1 dataset
TPCH_DIR="${DATA_DIR}/tpch_sf1"
RESULTS_FILE="${RESULTS_DIR}/external_aggr.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running external aggregation benchmark..."

# Only parquet is supported.
# Since per-operator memory limit is calculated as (total-memory-limit /
# number-of-partitions), and by default `--partitions` is set to number of
# CPU cores, we set a constant number of partitions to prevent this
# benchmark to fail on some machines.
$CARGO_COMMAND --bin external_aggr -- benchmark --partitions 4 --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}"
}


compare_benchmarks() {
Expand Down
Loading

0 comments on commit 4a69b2c

Please sign in to comment.