Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into alamb/join_on_docs
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Oct 14, 2023
2 parents 27917c0 + a86ee16 commit 49631f5
Show file tree
Hide file tree
Showing 42 changed files with 798 additions and 422 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -505,12 +505,11 @@ jobs:

- name: Check Cargo.toml formatting
run: |
# if you encounter error, try rerun the command below, finally run 'git diff' to
# check which Cargo.toml introduces formatting violation
# if you encounter an error, try running 'cargo tomlfmt -p path/to/Cargo.toml' to fix the formatting automatically.
# If the error still persists, you need to manually edit the Cargo.toml file that introduces the formatting violation.
#
# ignore ./Cargo.toml because putting workspaces in multi-line lists makes it easier to read
ci/scripts/rust_toml_fmt.sh
git diff --exit-code
config-docs-check:
name: check configs.md is up-to-date
Expand Down
8 changes: 7 additions & 1 deletion ci/scripts/rust_toml_fmt.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,11 @@
# specific language governing permissions and limitations
# under the License.

# Run cargo-tomlfmt with flag `-d` in dry run to check formatting
# without overwriting the file. If any error occurs, you may want to
# rerun 'cargo tomlfmt -p path/to/Cargo.toml' without '-d' to fix
# the formatting automatically.
set -ex
find . -mindepth 2 -name 'Cargo.toml' -exec cargo tomlfmt -p {} \;
for toml in $(find . -mindepth 2 -name 'Cargo.toml'); do
cargo tomlfmt -d -p $toml
done
49 changes: 33 additions & 16 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ tokio-util = { version = "0.7.4", features = ["io"] }
url = "2.2"
uuid = { version = "1.0", features = ["v4"] }
xz2 = { version = "0.1", optional = true }
zstd = { version = "0.12", optional = true, default-features = false }
zstd = { version = "0.13", optional = true, default-features = false }


[dev-dependencies]
Expand Down
41 changes: 22 additions & 19 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1738,8 +1738,11 @@ mod tests {
_t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
write!(f, "SortRequiredExec: [{}]", expr.join(","))
write!(
f,
"SortRequiredExec: [{}]",
PhysicalSortExpr::format_list(&self.expr)
)
}
}

Expand Down Expand Up @@ -3056,16 +3059,16 @@ mod tests {
vec![
top_join_plan.as_str(),
join_plan.as_str(),
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, sort_exprs=b1@1 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b1@1 ASC]",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=c@2 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
Expand All @@ -3082,21 +3085,21 @@ mod tests {
_ => vec![
top_join_plan.as_str(),
// The 4 operators below are the differences introduced when the join mode is changed
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"CoalescePartitionsExec",
join_plan.as_str(),
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, sort_exprs=b1@1 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b1@1 ASC]",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=c@2 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
Expand Down Expand Up @@ -3170,38 +3173,38 @@ mod tests {
JoinType::Inner | JoinType::Right => vec![
top_join_plan.as_str(),
join_plan.as_str(),
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, sort_exprs=b1@1 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b1@1 ASC]",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=c@2 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// Should include 8 RepartitionExecs (4 of them preserve order) and 4 SortExecs
JoinType::Left | JoinType::Full => vec![
top_join_plan.as_str(),
"SortPreservingRepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10, sort_exprs=b1@6 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b1@6 ASC]",
"CoalescePartitionsExec",
join_plan.as_str(),
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, sort_exprs=b1@1 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b1@1 ASC]",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=c@2 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
Expand Down Expand Up @@ -3292,7 +3295,7 @@ mod tests {

let expected_first_sort_enforcement = &[
"SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]",
"SortPreservingRepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=10, sort_exprs=b3@1 ASC,a3@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b3@1 ASC,a3@0 ASC]",
"CoalescePartitionsExec",
Expand All @@ -3303,7 +3306,7 @@ mod tests {
"AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([b2@1, a2@0], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([b2@1, a2@0], 10), input_partitions=10, sort_exprs=b2@1 ASC,a2@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b2@1 ASC,a2@0 ASC]",
"CoalescePartitionsExec",
Expand Down Expand Up @@ -4382,7 +4385,7 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [c@2 ASC]",
"FilterExec: c@2 = 0",
"SortPreservingRepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
"SortPreservingRepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, sort_exprs=c@2 ASC",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];

Expand Down
Loading

0 comments on commit 49631f5

Please sign in to comment.