From 3f91a20962730e6fc0a9c46a143b76cece29001f Mon Sep 17 00:00:00 2001 From: Sergey Zhukov <62326549+cj-zhukov@users.noreply.github.com> Date: Tue, 14 Jan 2025 16:36:09 +0300 Subject: [PATCH] Move JoinSelection into datafusion-physical-optimizer crate (#14073) (#14085) * Move JoinSelection into datafusion-physical-optimizer crate (#14073) * Fix issues causing GitHub checks to fail * Fix issues causing GitHub checks to fail * Fix issues causing GitHub checks to fail * Lock aws-sdk crates to fix MSRV check * fix comment * fix compilation --------- Co-authored-by: Sergey Zhukov Co-authored-by: Andrew Lamb --- datafusion-cli/Cargo.lock | 274 +++++++------ datafusion-cli/Cargo.toml | 12 +- datafusion/core/src/physical_optimizer/mod.rs | 1 - datafusion/core/src/test/mod.rs | 96 +---- datafusion/core/src/test_util/mod.rs | 140 +------ datafusion/physical-optimizer/Cargo.toml | 2 + .../src}/join_selection.rs | 362 +++++++++++++++--- datafusion/physical-optimizer/src/lib.rs | 1 + 8 files changed, 464 insertions(+), 424 deletions(-) rename datafusion/{core/src/physical_optimizer => physical-optimizer/src}/join_selection.rs (88%) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index dfc75f15b03b..9be9e50cc737 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -424,9 +424,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.83" +version = "0.1.85" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" +checksum = "3f934833b4b7233644e5848f235df3f57ed8c80f1528a26c3dfa13d2147fa056" dependencies = [ "proc-macro2", "quote", @@ -498,9 +498,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.4.4" +version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5ac934720fbb46206292d2c75b57e67acfc56fe7dfd34fb9a02334af08409ea" +checksum = 
"b16d1aa50accc11a4b4d5c50f7fb81cc0cf60328259c587d0e6b0f11385bde46" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -613,9 +613,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "1.2.1" +version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62220bc6e97f946ddd51b5f1361f78996e704677afc518a4ff66b7a72ea1378c" +checksum = "427cb637d15d63d6f9aae26358e1c9a9c09d5aa490d64b09354c8217cfef0f28" dependencies = [ "futures-util", "pin-project-lite", @@ -672,9 +672,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.4" +version = "1.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f20685047ca9d6f17b994a07f629c813f08b5bce65523e47124879e60103d45" +checksum = "a05dd41a70fc74051758ee75b5c4db2c0ca070ed9229c3df50e9475cda1cb985" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -716,9 +716,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.2.9" +version = "1.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fbd94a32b3a7d55d3806fe27d98d3ad393050439dd05eb53ece36ec5e3d3510" +checksum = "38ddc9bd6c28aeb303477170ddd183760a956a03e083b3902a990238a7e3792d" dependencies = [ "base64-simd", "bytes", @@ -822,9 +822,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.6.0" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +checksum = "1be3f42a67d6d345ecd59f675f3f012d6974981560836e938c22b424b85ce1be" [[package]] name = "blake2" @@ -880,9 +880,9 @@ dependencies = [ [[package]] name = "bstr" -version = "1.11.1" +version = "1.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "786a307d683a5bf92e6fd5fd69a7eb613751668d1d8d67d802846dfe367c62c8" +checksum = 
"531a9155a481e2ee699d4f98f43c0ca4ff8ee1bfd55c31e9e98fb29d2b176fe0" dependencies = [ "memchr", "regex-automata", @@ -950,9 +950,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.4" +version = "1.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9157bbaa6b165880c27a4293a474c91cdcf265cc68cc829bf10be0964a391caf" +checksum = "c8293772165d9345bdaaa39b45b2109591e63fe5e6fbc23c6ff930a048aa310b" dependencies = [ "jobserver", "libc", @@ -1013,9 +1013,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.23" +version = "4.5.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3135e7ec2ef7b10c6ed8950f0f792ed96ee093fa088608f1c76e569722700c84" +checksum = "a8eb5e908ef3a6efbe1ed62520fb7287959888c88485abe072543190ecc66783" dependencies = [ "clap_builder", "clap_derive", @@ -1023,9 +1023,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.23" +version = "4.5.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30582fc632330df2bd26877bde0c1f4470d57c582bbc070376afcd04d8cb4838" +checksum = "96b01801b5fc6a0a232407abc821660c9c6d25a1cafc0d4f85f29fb8d9afc121" dependencies = [ "anstream", "anstyle", @@ -1035,9 +1035,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.18" +version = "4.5.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab" +checksum = "54b755194d6389280185988721fffba69495eed5ee9feeee9a599b53db80318c" dependencies = [ "heck", "proc-macro2", @@ -1597,6 +1597,7 @@ dependencies = [ "datafusion-expr-common", "datafusion-physical-expr", "datafusion-physical-plan", + "futures", "itertools 0.14.0", "log", "recursive", @@ -1727,9 +1728,9 @@ checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" [[package]] name = "env_filter" -version = "0.1.2" +version = "0.1.3" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f2c92ceda6ceec50f43169f9ee8424fe2db276791afde7b2cd8bc084cb376ab" +checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0" dependencies = [ "log", "regex", @@ -1737,9 +1738,9 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.11.5" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e13fa619b91fb2381732789fc5de83b45675e882f66623b7d8cb4f643017018d" +checksum = "dcaee3d8e3cfc3fd92428d477bc97fc29ec8716d180c0d74c643bb26166660e0" dependencies = [ "anstream", "anstyle", @@ -1795,9 +1796,9 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" [[package]] name = "flatbuffers" -version = "24.3.25" +version = "24.12.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f" +checksum = "4f1baf0dbf96932ec9a3038d57900329c015b0bfb7b63d904f3bc27e2b02a096" dependencies = [ "bitflags 1.3.2", "rustc_version", @@ -1815,9 +1816,9 @@ dependencies = [ [[package]] name = "float-cmp" -version = "0.9.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" +checksum = "b09cf3155332e944990140d967ff5eceb70df778b34f77d8075db46e4704e6d8" dependencies = [ "num-traits", ] @@ -1963,9 +1964,9 @@ checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" [[package]] name = "glob" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" [[package]] name = "h2" @@ -2198,15 +2199,15 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.27.3" +version = "0.27.5" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" +checksum = "2d191583f3da1305256f22463b9bb0471acad48a4e534a5218b9963e9c1f59b2" dependencies = [ "futures-util", "http 1.2.0", "hyper 1.5.2", "hyper-util", - "rustls 0.23.20", + "rustls 0.23.21", "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", @@ -2538,9 +2539,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.168" +version = "0.2.169" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aaeb2981e0606ca11d79718f8bb01164f1d6ed75080182d3abf017e6d244b6d" +checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" [[package]] name = "libflate" @@ -2588,15 +2589,15 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.7.0", "libc", ] [[package]] name = "linux-raw-sys" -version = "0.4.14" +version = "0.4.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" [[package]] name = "litemap" @@ -2673,9 +2674,9 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "miniz_oxide" -version = "0.8.0" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +checksum = "4ffbe83022cedc1d264172192511ae958937694cd57ce297164951b8b3568394" dependencies = [ "adler2", ] @@ -2706,7 +2707,7 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.7.0", 
"cfg-if", "cfg_aliases 0.1.1", "libc", @@ -2801,18 +2802,18 @@ dependencies = [ [[package]] name = "object" -version = "0.36.5" +version = "0.36.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" dependencies = [ "memchr", ] [[package]] name = "object_store" -version = "0.11.1" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6eb4c22c6154a1e759d7099f9ffad7cc5ef8245f9efbab4a41b92623079c82f3" +checksum = "3cfccb68961a56facde1163f9319e0d15743352344e7808a11795fb99698dcaf" dependencies = [ "async-trait", "base64 0.22.1", @@ -2964,18 +2965,18 @@ dependencies = [ [[package]] name = "phf" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" +checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078" dependencies = [ "phf_shared", ] [[package]] name = "phf_codegen" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8d39688d359e6b34654d328e262234662d16cc0f60ec8dcbe5e718709342a5a" +checksum = "aef8048c789fa5e851558d709946d6d79a8ff88c0440c587967f8e94bfb1216a" dependencies = [ "phf_generator", "phf_shared", @@ -2983,9 +2984,9 @@ dependencies = [ [[package]] name = "phf_generator" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48e4cc64c2ad9ebe670cb8fd69dd50ae301650392e81c05f9bfcb2d5bdbc24b0" +checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d" dependencies = [ "phf_shared", "rand", @@ -2993,18 +2994,18 @@ dependencies = [ [[package]] name = "phf_shared" -version = "0.11.2" +version = "0.11.3" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" +checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5" dependencies = [ "siphasher", ] [[package]] name = "pin-project-lite" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" [[package]] name = "pin-utils" @@ -3035,9 +3036,9 @@ dependencies = [ [[package]] name = "predicates" -version = "3.1.2" +version = "3.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e9086cc7640c29a356d1a29fd134380bee9d8f79a17410aa76e7ad295f42c97" +checksum = "a5d19ee57562043d37e82899fade9a22ebab7be9cef5026b07fda9cdd4293573" dependencies = [ "anstyle", "difflib", @@ -3049,15 +3050,15 @@ dependencies = [ [[package]] name = "predicates-core" -version = "1.0.8" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae8177bee8e75d6846599c6b9ff679ed51e882816914eec639944d7c9aa11931" +checksum = "727e462b119fe9c93fd0eb1429a5f7647394014cf3c04ab2c0350eeb09095ffa" [[package]] name = "predicates-tree" -version = "1.0.11" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41b740d195ed3166cd147c8047ec98db0e22ec019eb8eeb76d343b795304fb13" +checksum = "72dd2d6d381dfb73a193c7fca536518d7caee39fc8503f74e7dc0be0531b425c" dependencies = [ "predicates-core", "termtree", @@ -3074,9 +3075,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.92" +version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" +checksum = "60946a68e5f9d28b0dc1c21bb8a97ee7d018a8b322fa57838ba31cc878e22d99" dependencies = [ 
"unicode-ident", ] @@ -3098,9 +3099,9 @@ checksum = "5a651516ddc9168ebd67b24afd085a718be02f8858fe406591b013d101ce2f40" [[package]] name = "quick-xml" -version = "0.36.2" +version = "0.37.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7649a7b4df05aed9ea7ec6f628c67c9953a43869b8bc50929569b2999d443fe" +checksum = "165859e9e55f79d67b96c5d96f4e88b6f2695a1972849c15a6a3f5c59fc2c003" dependencies = [ "memchr", "serde", @@ -3117,9 +3118,9 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.23.20", + "rustls 0.23.21", "socket2", - "thiserror 2.0.7", + "thiserror 2.0.11", "tokio", "tracing", ] @@ -3135,10 +3136,10 @@ dependencies = [ "rand", "ring", "rustc-hash", - "rustls 0.23.20", + "rustls 0.23.21", "rustls-pki-types", "slab", - "thiserror 2.0.7", + "thiserror 2.0.11", "tinyvec", "tracing", "web-time", @@ -3146,9 +3147,9 @@ dependencies = [ [[package]] name = "quinn-udp" -version = "0.5.8" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52cd4b1eff68bf27940dd39811292c49e007f4d0b4c357358dc9b0197be6b527" +checksum = "1c40286217b4ba3a71d644d752e6a0b71f13f1b6a2c5311acfcbe0c2418ed904" dependencies = [ "cfg_aliases 0.2.1", "libc", @@ -3160,9 +3161,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.37" +version = "1.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" +checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc" dependencies = [ "proc-macro2", ] @@ -3233,7 +3234,7 @@ version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.7.0", ] [[package]] @@ -3290,9 +3291,9 @@ checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" [[package]] name = "reqwest" -version = 
"0.12.9" +version = "0.12.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" +checksum = "43e734407157c3c2034e0258f5e4473ddb361b1e85f95a66690d67264d7cd1da" dependencies = [ "base64 0.22.1", "bytes", @@ -3303,7 +3304,7 @@ dependencies = [ "http-body 1.0.1", "http-body-util", "hyper 1.5.2", - "hyper-rustls 0.27.3", + "hyper-rustls 0.27.5", "hyper-util", "ipnet", "js-sys", @@ -3313,7 +3314,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.20", + "rustls 0.23.21", "rustls-native-certs 0.8.1", "rustls-pemfile 2.2.0", "rustls-pki-types", @@ -3324,6 +3325,7 @@ dependencies = [ "tokio", "tokio-rustls 0.26.1", "tokio-util", + "tower", "tower-service", "url", "wasm-bindgen", @@ -3407,11 +3409,11 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.42" +version = "0.38.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f93dc38ecbab2eb790ff964bb77fa94faf256fd3e73285fd7ba0903b76bedb85" +checksum = "a78891ee6bf2340288408954ac787aa063d8e8817e9f53abb37c695c6d834ef6" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.7.0", "errno", "libc", "linux-raw-sys", @@ -3432,9 +3434,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.20" +version = "0.23.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5065c3f250cbd332cd894be57c40fa52387247659b14a2d6041d121547903b1b" +checksum = "8f287924602bf649d949c63dc8ac8b235fa5387d394020705b80c4eb597ce5b8" dependencies = [ "once_cell", "ring", @@ -3465,7 +3467,7 @@ dependencies = [ "openssl-probe", "rustls-pki-types", "schannel", - "security-framework 3.0.1", + "security-framework 3.2.0", ] [[package]] @@ -3518,9 +3520,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.18" +version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" +checksum = "f7c45b9784283f1b2e7fb61b42047c2fd678ef0960d4f6f1eba131594cc369d4" [[package]] name = "rustyline" @@ -3528,7 +3530,7 @@ version = "14.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7803e8936da37efd9b6d4478277f4b2b9bb5cdb37a113e8d63222e58da647e63" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.7.0", "cfg-if", "clipboard-win", "fd-lock", @@ -3590,7 +3592,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.7.0", "core-foundation 0.9.4", "core-foundation-sys", "libc", @@ -3599,11 +3601,11 @@ dependencies = [ [[package]] name = "security-framework" -version = "3.0.1" +version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1415a607e92bec364ea2cf9264646dcce0f91e6d65281bd6f2819cca3bf39c8" +checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.7.0", "core-foundation 0.10.0", "core-foundation-sys", "libc", @@ -3612,9 +3614,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.12.1" +version = "2.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa39c7303dc58b5543c94d22c1766b0d31f2ee58306363ea622b10bbc075eaa2" +checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32" dependencies = [ "core-foundation-sys", "libc", @@ -3634,9 +3636,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.216" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b9781016e935a97e8beecf0c933758c97a5520d32930e460142b4cd80c6338e" +checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" 
dependencies = [ "serde_derive", ] @@ -3652,9 +3654,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.216" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46f859dbbf73865c6627ed570e78961cd3ac92407a2d117204c49232485da55e" +checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" dependencies = [ "proc-macro2", "quote", @@ -3663,9 +3665,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.133" +version = "1.0.135" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" +checksum = "2b0d7ba2887406110130a978386c4e1befb98c674b4fba677954e4db976630d9" dependencies = [ "itoa", "memchr", @@ -3713,9 +3715,9 @@ dependencies = [ [[package]] name = "siphasher" -version = "0.3.11" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" [[package]] name = "slab" @@ -3854,9 +3856,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.90" +version = "2.0.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "919d3b74a5dd0ccd15aeb8f93e7006bd9e14c295087c9896a110f490752bcf31" +checksum = "d5d0adab1ae378d7f53bdebc67a39f1f151407ef230f0ce2883572f5d8985c80" dependencies = [ "proc-macro2", "quote", @@ -3885,12 +3887,13 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.14.0" +version = "3.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" +checksum = "9a8a559c81686f576e8cd0290cd2a24a2a9ad80c98b3478856500fcbd7acd704" dependencies = [ "cfg-if", "fastrand", + "getrandom", "once_cell", "rustix", "windows-sys 
0.59.0", @@ -3898,9 +3901,9 @@ dependencies = [ [[package]] name = "termtree" -version = "0.4.1" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" +checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" [[package]] name = "thiserror" @@ -3913,11 +3916,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.7" +version = "2.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93605438cbd668185516ab499d589afb7ee1859ea3d5fc8f6b0755e1c7443767" +checksum = "d452f284b73e6d76dd36758a0c8684b1d5be31f92b89d07fd5822175732206fc" dependencies = [ - "thiserror-impl 2.0.7", + "thiserror-impl 2.0.11", ] [[package]] @@ -3933,9 +3936,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.7" +version = "2.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1d8749b4531af2117677a5fcd12b1348a3fe2b81e36e61ffeac5c4aa3273e36" +checksum = "26afc1baea8a989337eeb52b6e72a039780ce45c3edfcc9c5b9d112feeb173c2" dependencies = [ "proc-macro2", "quote", @@ -4004,9 +4007,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "445e881f4f6d382d5f27c034e25eb92edd7c784ceab92a0937db7f2e9471b938" +checksum = "022db8904dfa342efe721985167e9fcd16c29b226db4397ed752a761cfce81e8" dependencies = [ "tinyvec_macros", ] @@ -4019,9 +4022,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.42.0" +version = "1.43.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551" +checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" dependencies = [ "backtrace", "bytes", @@ -4037,9 +4040,9 @@ dependencies = [ 
[[package]] name = "tokio-macros" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" dependencies = [ "proc-macro2", "quote", @@ -4062,7 +4065,7 @@ version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f6d0975eaace0cf0fcadee4e4aaa5da15b5c079146f2cffb67c113be122bf37" dependencies = [ - "rustls 0.23.20", + "rustls 0.23.21", "tokio", ] @@ -4096,6 +4099,27 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.3" @@ -4242,9 +4266,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" +checksum = "b913a3b5fe84142e269d63cc62b64319ccaf89b748fc31fe025177f767a756c4" dependencies = [ "getrandom", "serde", @@ -4594,9 +4618,9 @@ checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winnow" -version = "0.6.20" +version = "0.6.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36c1fec1a2bb5866f07c25f68c26e565c4c200aebb96d7e55710c19d3e8ac49b" +checksum = 
"c8d71a593cc5c42ad7876e2c1fda56f314f3754c084128833e64f1345ff8a03a" dependencies = [ "memchr", ] diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 8754612bfb11..8b5bb901b713 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -31,10 +31,14 @@ readme = "README.md" [dependencies] arrow = { version = "53.0.0" } async-trait = "0.1.73" -aws-config = "1.5.5" -aws-sdk-sso = "1.43.0" -aws-sdk-ssooidc = "1.44.0" -aws-sdk-sts = "1.43.0" +## 1.5.13 requires a higher MSRV 1.81 so lock until DataFusion MSRV catches up +aws-config = "=1.5.10" +## 1.53.0 requires a higher MSRV 1.81 so lock until DataFusion MSRV catches up +aws-sdk-sso = "=1.50.0" +## 1.54.0 requires a higher MSRV 1.81 so lock until DataFusion MSRV catches up +aws-sdk-ssooidc = "=1.51.0" +## 1.54.1 requires a higher MSRV 1.81 so lock until DataFusion MSRV catches up +aws-sdk-sts = "=1.51.0" # end pin aws-sdk crates aws-credential-types = "1.2.0" clap = { version = "4.5.16", features = ["derive", "cargo"] } diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index 000c27effdb6..e2c497936afa 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -24,7 +24,6 @@ pub mod enforce_distribution; pub mod enforce_sorting; -pub mod join_selection; pub mod optimizer; pub mod projection_pushdown; pub mod replace_with_order_preserving_variants; diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index b4167900d4c2..e91785c7421a 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -19,7 +19,6 @@ #![allow(missing_docs)] -use std::any::Any; use std::fs::File; use std::io::prelude::*; use std::io::{BufReader, BufWriter}; @@ -42,12 +41,10 @@ use crate::test_util::{aggr_test_schema, arrow_test_data}; use arrow::array::{self, Array, ArrayRef, Decimal128Builder, Int32Array}; use arrow::datatypes::{DataType, Field, Schema, 
SchemaRef}; use arrow::record_batch::RecordBatch; -use datafusion_common::{DataFusionError, Statistics}; +use datafusion_common::DataFusionError; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; -use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr}; -use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; -use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PlanProperties}; #[cfg(feature = "compression")] use bzip2::write::BzEncoder; @@ -384,94 +381,5 @@ pub fn csv_exec_ordered( ) } -/// A mock execution plan that simply returns the provided statistics -#[derive(Debug, Clone)] -pub struct StatisticsExec { - stats: Statistics, - schema: Arc, - cache: PlanProperties, -} - -impl StatisticsExec { - pub fn new(stats: Statistics, schema: Schema) -> Self { - assert_eq!( - stats.column_statistics.len(), schema.fields().len(), - "if defined, the column statistics vector length should be the number of fields" - ); - let cache = Self::compute_properties(Arc::new(schema.clone())); - Self { - stats, - schema: Arc::new(schema), - cache, - } - } - - /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
- fn compute_properties(schema: SchemaRef) -> PlanProperties { - PlanProperties::new( - EquivalenceProperties::new(schema), - Partitioning::UnknownPartitioning(2), - EmissionType::Incremental, - Boundedness::Bounded, - ) - } -} - -impl DisplayAs for StatisticsExec { - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "StatisticsExec: col_count={}, row_count={:?}", - self.schema.fields().len(), - self.stats.num_rows, - ) - } - } - } -} - -impl ExecutionPlan for StatisticsExec { - fn name(&self) -> &'static str { - Self::static_name() - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn properties(&self) -> &PlanProperties { - &self.cache - } - - fn children(&self) -> Vec<&Arc> { - vec![] - } - - fn with_new_children( - self: Arc, - _: Vec>, - ) -> Result> { - Ok(self) - } - - fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> Result { - unimplemented!("This plan only serves for testing statistics") - } - - fn statistics(&self) -> Result { - Ok(self.stats.clone()) - } -} - pub mod object_store; pub mod variable; diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index b1a6f014380e..d608db25fe98 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -36,12 +36,8 @@ use crate::dataframe::DataFrame; use crate::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable}; use crate::datasource::{empty::EmptyTable, provider_as_source}; use crate::error::Result; -use crate::execution::context::TaskContext; use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE}; -use crate::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, - RecordBatchStream, SendableRecordBatchStream, -}; +use crate::physical_plan::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream}; use 
crate::prelude::{CsvReadOptions, SessionContext}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; @@ -52,8 +48,7 @@ use datafusion_expr::utils::COUNT_STAR_EXPANSION; use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType}; use datafusion_functions_aggregate::count::count_udaf; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; -use datafusion_physical_expr::{expressions, EquivalenceProperties, PhysicalExpr}; -use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion_physical_expr::{expressions, PhysicalExpr}; use async_trait::async_trait; use futures::Stream; @@ -230,137 +225,6 @@ impl TableProvider for TestTableProvider { } } -/// A mock execution plan that simply returns the provided data source characteristic -#[derive(Debug, Clone)] -pub struct UnboundedExec { - batch_produce: Option, - batch: RecordBatch, - cache: PlanProperties, -} -impl UnboundedExec { - /// Create new exec that clones the given record batch to its output. - /// - /// Set `batch_produce` to `Some(n)` to emit exactly `n` batches per partition. - pub fn new( - batch_produce: Option, - batch: RecordBatch, - partitions: usize, - ) -> Self { - let cache = Self::compute_properties(batch.schema(), batch_produce, partitions); - Self { - batch_produce, - batch, - cache, - } - } - - /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
- fn compute_properties( - schema: SchemaRef, - batch_produce: Option, - n_partitions: usize, - ) -> PlanProperties { - let boundedness = if batch_produce.is_none() { - Boundedness::Unbounded { - requires_infinite_memory: false, - } - } else { - Boundedness::Bounded - }; - PlanProperties::new( - EquivalenceProperties::new(schema), - Partitioning::UnknownPartitioning(n_partitions), - EmissionType::Incremental, - boundedness, - ) - } -} - -impl DisplayAs for UnboundedExec { - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "UnboundedExec: unbounded={}", - self.batch_produce.is_none(), - ) - } - } - } -} - -impl ExecutionPlan for UnboundedExec { - fn name(&self) -> &'static str { - Self::static_name() - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn properties(&self) -> &PlanProperties { - &self.cache - } - - fn children(&self) -> Vec<&Arc> { - vec![] - } - - fn with_new_children( - self: Arc, - _: Vec>, - ) -> Result> { - Ok(self) - } - - fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> Result { - Ok(Box::pin(UnboundedStream { - batch_produce: self.batch_produce, - count: 0, - batch: self.batch.clone(), - })) - } -} - -#[derive(Debug)] -struct UnboundedStream { - batch_produce: Option, - count: usize, - batch: RecordBatch, -} - -impl Stream for UnboundedStream { - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { - if let Some(val) = self.batch_produce { - if val <= self.count { - return Poll::Ready(None); - } - } - self.count += 1; - Poll::Ready(Some(Ok(self.batch.clone()))) - } -} - -impl RecordBatchStream for UnboundedStream { - fn schema(&self) -> SchemaRef { - self.batch.schema() - } -} - /// This function creates an unbounded sorted file for testing purposes. 
pub fn register_unbounded_file_with_ordering( ctx: &SessionContext, diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml index 3454209445dc..4e8b36546dae 100644 --- a/datafusion/physical-optimizer/Cargo.toml +++ b/datafusion/physical-optimizer/Cargo.toml @@ -41,6 +41,7 @@ datafusion-execution = { workspace = true } datafusion-expr-common = { workspace = true, default-features = true } datafusion-physical-expr = { workspace = true } datafusion-physical-plan = { workspace = true } +futures = { workspace = true } itertools = { workspace = true } log = { workspace = true } recursive = { workspace = true, optional = true } @@ -49,4 +50,5 @@ recursive = { workspace = true, optional = true } datafusion-expr = { workspace = true } datafusion-functions-aggregate = { workspace = true } datafusion-functions-nested = { workspace = true } +rstest = { workspace = true } tokio = { workspace = true } diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/physical-optimizer/src/join_selection.rs similarity index 88% rename from datafusion/core/src/physical_optimizer/join_selection.rs rename to datafusion/physical-optimizer/src/join_selection.rs index 736c3fbd0184..1cd05933bedb 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/physical-optimizer/src/join_selection.rs @@ -25,22 +25,22 @@ use std::sync::Arc; -use crate::config::ConfigOptions; -use crate::error::Result; -use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter}; -use crate::physical_plan::joins::{ - CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, - StreamJoinPartitionMode, SymmetricHashJoinExec, -}; -use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; +use crate::PhysicalOptimizerRule; +use datafusion_common::config::ConfigOptions; +use datafusion_common::error::Result; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use 
datafusion_common::{internal_err, JoinSide, JoinType}; -use datafusion_expr::sort_properties::SortProperties; +use datafusion_expr_common::sort_properties::SortProperties; use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr_common::sort_expr::LexOrdering; -use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_expr::LexOrdering; use datafusion_physical_plan::execution_plan::EmissionType; +use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter}; +use datafusion_physical_plan::joins::{ + CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, + StreamJoinPartitionMode, SymmetricHashJoinExec, +}; +use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; /// The [`JoinSelection`] rule tries to modify a given plan so that it can /// accommodate infinite sources and optimize joins in the plan according to @@ -592,20 +592,17 @@ fn apply_subrules( #[cfg(test)] mod tests_statistical { use super::*; - use crate::{ - physical_plan::{displayable, ColumnStatistics, Statistics}, - test::StatisticsExec, - }; + use util_tests::StatisticsExec; - use arrow::datatypes::{DataType, Field}; - use arrow_schema::Schema; - use datafusion_common::{stats::Precision, JoinType, ScalarValue}; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::{ + stats::Precision, ColumnStatistics, JoinType, ScalarValue, Statistics, + }; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::col; use datafusion_physical_expr::expressions::BinaryExpr; - use datafusion_physical_expr::PhysicalExprRef; - - use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; + use datafusion_physical_plan::displayable; use datafusion_physical_plan::projection::ProjectionExec; use rstest::rstest; @@ -878,32 +875,27 @@ mod tests_statistical { for join_type in join_types { let (big, small) = create_big_and_small(); - let 
join = Arc::new( - HashJoinExec::try_new( - Arc::clone(&big), - Arc::clone(&small), - vec![( - Arc::new( - Column::new_with_schema("big_col", &big.schema()).unwrap(), - ), - Arc::new( - Column::new_with_schema("small_col", &small.schema()) - .unwrap(), - ), - )], - None, - &join_type, - None, - PartitionMode::Partitioned, - false, - ) - .unwrap(), - ); + let join = HashJoinExec::try_new( + Arc::clone(&big), + Arc::clone(&small), + vec![( + Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()), + Arc::new( + Column::new_with_schema("small_col", &small.schema()).unwrap(), + ), + )], + None, + &join_type, + None, + PartitionMode::Partitioned, + false, + ) + .unwrap(); let original_schema = join.schema(); let optimized_join = JoinSelection::new() - .optimize(join.clone(), &ConfigOptions::new()) + .optimize(Arc::new(join), &ConfigOptions::new()) .unwrap(); let swapped_join = optimized_join @@ -1136,7 +1128,10 @@ mod tests_statistical { ); let optimized_join = JoinSelection::new() - .optimize(join.clone(), &ConfigOptions::new()) + .optimize( + Arc::::clone(&join), + &ConfigOptions::new(), + ) .unwrap(); let swapped_join = optimized_join @@ -1266,8 +1261,8 @@ mod tests_statistical { col("big_col", &big.schema()).unwrap(), )]; check_join_partition_mode( - small.clone(), - big.clone(), + Arc::::clone(&small), + Arc::::clone(&big), join_on, false, PartitionMode::CollectLeft, @@ -1279,7 +1274,7 @@ mod tests_statistical { )]; check_join_partition_mode( big, - small.clone(), + Arc::::clone(&small), join_on, true, PartitionMode::CollectLeft, @@ -1290,8 +1285,8 @@ mod tests_statistical { col("empty_col", &empty.schema()).unwrap(), )]; check_join_partition_mode( - small.clone(), - empty.clone(), + Arc::::clone(&small), + Arc::::clone(&empty), join_on, false, PartitionMode::CollectLeft, @@ -1333,8 +1328,8 @@ mod tests_statistical { as _, )]; check_join_partition_mode( - big.clone(), - bigger.clone(), + Arc::::clone(&big), + Arc::::clone(&bigger), join_on, false, 
PartitionMode::Partitioned, @@ -1347,7 +1342,7 @@ mod tests_statistical { )]; check_join_partition_mode( bigger, - big.clone(), + Arc::::clone(&big), join_on, true, PartitionMode::Partitioned, @@ -1358,8 +1353,8 @@ mod tests_statistical { Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()) as _, )]; check_join_partition_mode( - empty.clone(), - big.clone(), + Arc::::clone(&empty), + Arc::::clone(&big), join_on, false, PartitionMode::Partitioned, @@ -1423,13 +1418,258 @@ mod tests_statistical { #[cfg(test)] mod util_tests { - use std::sync::Arc; + use std::{ + any::Any, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + }; - use arrow_schema::{DataType, Field, Schema}; + use arrow::{ + array::RecordBatch, + datatypes::{DataType, Field, Schema, SchemaRef}, + }; + use datafusion_common::{Result, Statistics}; + use datafusion_execution::{ + RecordBatchStream, SendableRecordBatchStream, TaskContext, + }; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{BinaryExpr, Column, NegativeExpr}; use datafusion_physical_expr::intervals::utils::check_support; - use datafusion_physical_expr::PhysicalExpr; + use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; + use datafusion_physical_plan::{ + execution_plan::{Boundedness, EmissionType}, + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, + }; + use futures::Stream; + + #[derive(Debug)] + struct UnboundedStream { + batch_produce: Option, + count: usize, + batch: RecordBatch, + } + + impl Stream for UnboundedStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + if let Some(val) = self.batch_produce { + if val <= self.count { + return Poll::Ready(None); + } + } + self.count += 1; + Poll::Ready(Some(Ok(self.batch.clone()))) + } + } + + impl RecordBatchStream for UnboundedStream { + fn schema(&self) -> SchemaRef { + self.batch.schema() + } + } + + /// A mock execution plan that simply 
returns the provided data source characteristic + #[derive(Debug, Clone)] + pub struct UnboundedExec { + batch_produce: Option, + batch: RecordBatch, + cache: PlanProperties, + } + + impl UnboundedExec { + /// Create new exec that clones the given record batch to its output. + /// + /// Set `batch_produce` to `Some(n)` to emit exactly `n` batches per partition. + pub fn new( + batch_produce: Option, + batch: RecordBatch, + partitions: usize, + ) -> Self { + let cache = + Self::compute_properties(batch.schema(), batch_produce, partitions); + Self { + batch_produce, + batch, + cache, + } + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + fn compute_properties( + schema: SchemaRef, + batch_produce: Option, + n_partitions: usize, + ) -> PlanProperties { + let boundedness = if batch_produce.is_none() { + Boundedness::Unbounded { + requires_infinite_memory: false, + } + } else { + Boundedness::Bounded + }; + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(n_partitions), + EmissionType::Incremental, + boundedness, + ) + } + } + + impl DisplayAs for UnboundedExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "UnboundedExec: unbounded={}", + self.batch_produce.is_none(), + ) + } + } + } + } + + impl ExecutionPlan for UnboundedExec { + fn name(&self) -> &'static str { + Self::static_name() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + Ok(Box::pin(UnboundedStream { + batch_produce: self.batch_produce, + 
count: 0, + batch: self.batch.clone(), + })) + } + } + + #[derive(Eq, PartialEq, Debug)] + pub enum SourceType { + Unbounded, + Bounded, + } + + /// A mock execution plan that simply returns the provided statistics + #[derive(Debug, Clone)] + pub struct StatisticsExec { + stats: Statistics, + schema: Arc, + cache: PlanProperties, + } + + impl StatisticsExec { + pub fn new(stats: Statistics, schema: Schema) -> Self { + assert_eq!( + stats.column_statistics.len(), schema.fields().len(), + "if defined, the column statistics vector length should be the number of fields" + ); + let cache = Self::compute_properties(Arc::new(schema.clone())); + Self { + stats, + schema: Arc::new(schema), + cache, + } + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + fn compute_properties(schema: SchemaRef) -> PlanProperties { + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(2), + EmissionType::Incremental, + Boundedness::Bounded, + ) + } + } + + impl DisplayAs for StatisticsExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "StatisticsExec: col_count={}, row_count={:?}", + self.schema.fields().len(), + self.stats.num_rows, + ) + } + } + } + } + + impl ExecutionPlan for StatisticsExec { + fn name(&self) -> &'static str { + Self::static_name() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unimplemented!("This plan only serves for testing statistics") + } + + fn statistics(&self) -> Result { + Ok(self.stats.clone()) + 
} + } #[test] fn check_expr_supported() { @@ -1463,12 +1703,10 @@ mod util_tests { #[cfg(test)] mod hash_join_tests { use super::*; - use crate::physical_optimizer::test_utils::SourceType; - use crate::test_util::UnboundedExec; + use util_tests::{SourceType, UnboundedExec}; - use arrow::datatypes::{DataType, Field}; + use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; - use arrow_schema::Schema; use datafusion_physical_expr::expressions::col; use datafusion_physical_plan::projection::ProjectionExec; @@ -1849,7 +2087,7 @@ mod hash_join_tests { .expect( "A proj is required to swap columns back to their original order", ); - proj.input().clone() + Arc::::clone(proj.input()) } else { optimized_join_plan }; diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index c4f5fa74e122..ee1249febba8 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -21,6 +21,7 @@ pub mod aggregate_statistics; pub mod coalesce_batches; pub mod combine_partial_final_agg; +pub mod join_selection; pub mod limit_pushdown; pub mod limited_distinct_aggregation; mod optimizer;