diff --git a/Cargo.lock b/Cargo.lock index 89449a00..46ad9603 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -57,9 +57,9 @@ dependencies = [ [[package]] name = "allocator-api2" -version = "0.2.20" +version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" [[package]] name = "android-tzdata" @@ -78,9 +78,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.93" +version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" +checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04" [[package]] name = "arrayref" @@ -97,7 +97,7 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=f34f7eb3c2#f34f7eb3c2ee666f82ce5e042521ad2a0ddb62b9" dependencies = [ "arrow-arith", "arrow-array", @@ -117,7 +117,7 @@ dependencies = [ [[package]] name = "arrow-arith" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=f34f7eb3c2#f34f7eb3c2ee666f82ce5e042521ad2a0ddb62b9" dependencies = [ "arrow-array", "arrow-buffer", @@ -131,7 +131,7 @@ dependencies = [ [[package]] name = "arrow-array" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=f34f7eb3c2#f34f7eb3c2ee666f82ce5e042521ad2a0ddb62b9" dependencies = [ "ahash", "arrow-buffer", @@ -147,7 +147,7 @@ dependencies = [ [[package]] name = "arrow-buffer" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=f34f7eb3c2#f34f7eb3c2ee666f82ce5e042521ad2a0ddb62b9" dependencies = [ "bytes", "half", @@ -157,7 +157,7 @@ dependencies = [ [[package]] name = "arrow-cast" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=f34f7eb3c2#f34f7eb3c2ee666f82ce5e042521ad2a0ddb62b9" dependencies = [ "arrow-array", "arrow-buffer", @@ -177,7 +177,7 @@ dependencies = [ [[package]] name = "arrow-csv" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=f34f7eb3c2#f34f7eb3c2ee666f82ce5e042521ad2a0ddb62b9" dependencies = [ "arrow-array", "arrow-buffer", @@ -195,7 +195,7 @@ dependencies = [ [[package]] name = "arrow-data" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=f34f7eb3c2#f34f7eb3c2ee666f82ce5e042521ad2a0ddb62b9" dependencies = [ "arrow-buffer", 
"arrow-schema", @@ -206,7 +206,7 @@ dependencies = [ [[package]] name = "arrow-ipc" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=f34f7eb3c2#f34f7eb3c2ee666f82ce5e042521ad2a0ddb62b9" dependencies = [ "arrow-array", "arrow-buffer", @@ -220,7 +220,7 @@ dependencies = [ [[package]] name = "arrow-json" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=f34f7eb3c2#f34f7eb3c2ee666f82ce5e042521ad2a0ddb62b9" dependencies = [ "arrow-array", "arrow-buffer", @@ -239,7 +239,7 @@ dependencies = [ [[package]] name = "arrow-ord" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=f34f7eb3c2#f34f7eb3c2ee666f82ce5e042521ad2a0ddb62b9" dependencies = [ "arrow-array", "arrow-buffer", @@ -253,7 +253,7 @@ dependencies = [ [[package]] name = "arrow-row" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=f34f7eb3c2#f34f7eb3c2ee666f82ce5e042521ad2a0ddb62b9" dependencies = [ "ahash", "arrow-array", @@ -266,7 +266,7 @@ dependencies = [ [[package]] name = "arrow-schema" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=f34f7eb3c2#f34f7eb3c2ee666f82ce5e042521ad2a0ddb62b9" dependencies = [ "bitflags 2.6.0", "serde", @@ -275,7 +275,7 @@ dependencies = [ [[package]] name = "arrow-select" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=f34f7eb3c2#f34f7eb3c2ee666f82ce5e042521ad2a0ddb62b9" dependencies = [ "ahash", "arrow-array", @@ -288,7 +288,7 @@ dependencies = [ [[package]] name = "arrow-string" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=f34f7eb3c2#f34f7eb3c2ee666f82ce5e042521ad2a0ddb62b9" dependencies = [ "arrow-array", "arrow-buffer", @@ -303,9 +303,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.17" +version = "0.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cb8f1d480b0ea3783ab015936d2a55c87e219676f0c0b7dec61494043f21857" +checksum = "df895a515f70646414f4b45c0b79082783b80552b373a68283012928df56f522" dependencies = [ "bzip2", "flate2", @@ -327,7 +327,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", ] [[package]] @@ -401,9 +401,9 @@ dependencies = [ [[package]] name = "blake3" -version = "1.5.4" +version = "1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d82033247fd8e890df8f740e407ad4d038debb9eb1f40533fffb32e7d17dc6f7" +checksum = 
"b8ee0c1824c4dea5b5f81736aff91bae041d2c07ee1192bec91054e10e3e601e" dependencies = [ "arrayref", "arrayvec", @@ -431,6 +431,7 @@ dependencies = [ "panic-message", "paste", "prost 0.13.4", + "raw-cpuid", "tokio", ] @@ -500,9 +501,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytemuck" -version = "1.20.0" +version = "1.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b37c88a63ffd85d15b406896cc343916d7cf57838a847b3a6f2ca5d39a5695a" +checksum = "ef657dfab802224e671f5818e9a4935f9b1957ed18e58292690cc39e7a4092a3" [[package]] name = "byteorder" @@ -545,9 +546,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.1" +version = "1.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd9de9f2205d5ef3fd67e685b0df337994ddd4495e2a28d185500d0e1edfea47" +checksum = "c31a0499c1dc64f458ad13872de75c0eb7e3fdb0e67964610c914b034fc5956e" dependencies = [ "jobserver", "libc", @@ -568,9 +569,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.38" +version = "0.4.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" dependencies = [ "android-tzdata", "iana-time-zone", @@ -661,9 +662,9 @@ checksum = "ced507ab50aa0123e2c54db8b5f44fdfee04b1c93744d69e924307945fe57a85" [[package]] name = "cpufeatures" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ca741a962e1b0bff6d724a1a0958b686406e853bb14061f218562e1896f95e6" +checksum = "16b80225097f2e5ae4e7179dd2266824648f3e2f49d9134d584b76389d31c4c3" dependencies = [ "libc", ] @@ -679,9 +680,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.20" +version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "crunchy" @@ -737,7 +738,7 @@ dependencies = [ [[package]] name = "datafusion" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=e9e2128a7#e9e2128a7c1d2354342907afa511b7c12601b7a6" dependencies = [ "ahash", "arrow", @@ -793,7 +794,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=e9e2128a7#e9e2128a7c1d2354342907afa511b7c12601b7a6" dependencies = [ "arrow-schema", "async-trait", @@ -807,7 +808,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=e9e2128a7#e9e2128a7c1d2354342907afa511b7c12601b7a6" dependencies = [ "ahash", "arrow", @@ -830,7 +831,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "42.0.0" -source = 
"git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=e9e2128a7#e9e2128a7c1d2354342907afa511b7c12601b7a6" dependencies = [ "log", "tokio", @@ -839,7 +840,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=e9e2128a7#e9e2128a7c1d2354342907afa511b7c12601b7a6" dependencies = [ "arrow", "chrono", @@ -859,7 +860,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=e9e2128a7#e9e2128a7c1d2354342907afa511b7c12601b7a6" dependencies = [ "ahash", "arrow", @@ -880,7 +881,7 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=e9e2128a7#e9e2128a7c1d2354342907afa511b7c12601b7a6" dependencies = [ "arrow", "datafusion-common", @@ -995,7 +996,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=e9e2128a7#e9e2128a7c1d2354342907afa511b7c12601b7a6" dependencies = [ "arrow", "arrow-buffer", @@ -1021,7 +1022,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=e9e2128a7#e9e2128a7c1d2354342907afa511b7c12601b7a6" dependencies = [ "ahash", "arrow", @@ -1041,7 +1042,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=e9e2128a7#e9e2128a7c1d2354342907afa511b7c12601b7a6" dependencies = [ "ahash", "arrow", @@ -1054,7 +1055,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=e9e2128a7#e9e2128a7c1d2354342907afa511b7c12601b7a6" dependencies = [ "arrow", "arrow-array", @@ -1076,7 +1077,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=e9e2128a7#e9e2128a7c1d2354342907afa511b7c12601b7a6" dependencies = [ "datafusion-common", "datafusion-expr", @@ -1087,7 +1088,7 @@ 
dependencies = [ [[package]] name = "datafusion-optimizer" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=e9e2128a7#e9e2128a7c1d2354342907afa511b7c12601b7a6" dependencies = [ "arrow", "async-trait", @@ -1106,7 +1107,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=e9e2128a7#e9e2128a7c1d2354342907afa511b7c12601b7a6" dependencies = [ "ahash", "arrow", @@ -1137,7 +1138,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=e9e2128a7#e9e2128a7c1d2354342907afa511b7c12601b7a6" dependencies = [ "ahash", "arrow", @@ -1150,7 +1151,7 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=e9e2128a7#e9e2128a7c1d2354342907afa511b7c12601b7a6" dependencies = [ "arrow-schema", "datafusion-common", @@ -1163,7 +1164,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=e9e2128a7#e9e2128a7c1d2354342907afa511b7c12601b7a6" dependencies = [ "ahash", "arrow", @@ -1197,7 +1198,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=e9e2128a7#e9e2128a7c1d2354342907afa511b7c12601b7a6" dependencies = [ "arrow", "arrow-array", @@ -1240,7 +1241,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", ] [[package]] @@ -1257,12 +1258,12 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.9" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" +checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -1273,17 +1274,18 @@ checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" [[package]] name = "fastrand" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "faststr" -version = "0.2.23" +version = "0.2.27" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dc21a7d5a45182c2bb5ae9471b93f10919c0744b54403e54a9e2329c26ed5a3" +checksum = "9154486833a83cb5d99de8c4d831314b8ae810dd4ef18d89ceb7a9c7c728dd74" dependencies = [ "bytes", + "rkyv", "serde", "simdutf8", ] @@ -1296,9 +1298,9 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "flatbuffers" -version = "24.3.25" +version = "24.12.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f" +checksum = "4f1baf0dbf96932ec9a3038d57900329c015b0bfb7b63d904f3bc27e2b02a096" dependencies = [ "bitflags 1.3.2", "rustc_version", @@ -1391,7 +1393,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", ] [[package]] @@ -1480,9 +1482,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.15.1" +version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3" +checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" [[package]] name = "heck" @@ -1646,7 +1648,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", ] [[package]] @@ -1672,12 +1674,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.6.0" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" +checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f" dependencies = [ "equivalent", - "hashbrown 0.15.1", + "hashbrown 0.15.2", ] [[package]] @@ -1718,9 +1720,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.12" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a73e9fe3c49d7afb2ace819fa181a287ce54a0983eda4e0eb05c22f82ffe534" +checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" [[package]] name = "jemalloc-sys" @@ -1773,10 +1775,11 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.72" +version = "0.3.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" +checksum = "6717b6b5b077764fb5966237269cb3c64edddde4b14ce42647430a78ced9e7b7" dependencies = [ + "once_cell", "wasm-bindgen", ] @@ -1852,9 +1855,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.164" +version = "0.2.169" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" +checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" [[package]] name = "libm" @@ -1870,9 +1873,9 @@ checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" [[package]] name = "litemap" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "643cb0b8d4fcc284004d5fd0d67ccf61dfffadb7f75e1e71bc420f4688a3a704" +checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104" [[package]] name = "lock_api" @@ -1938,9 +1941,9 @@ checksum = 
"78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "miniz_oxide" -version = "0.8.0" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +checksum = "4ffbe83022cedc1d264172192511ae958937694cd57ce297164951b8b3568394" dependencies = [ "adler2", ] @@ -1951,6 +1954,26 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "munge" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64142d38c84badf60abf06ff9bd80ad2174306a5b11bd4706535090a30a419df" +dependencies = [ + "munge_macro", +] + +[[package]] +name = "munge_macro" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bb5c1d8184f13f7d0ccbeeca0def2f9a181bce2624302793005f5ca8aa62e5e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.91", +] + [[package]] name = "num" version = "0.4.3" @@ -2037,9 +2060,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.5" +version = "0.36.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" dependencies = [ "memchr", ] @@ -2074,7 +2097,7 @@ checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" [[package]] name = "orc-rust" version = "0.4.1" -source = "git+https://github.com/blaze-init/datafusion-orc.git?rev=7833d7d#7833d7dcbb22b613d882b9217a1ca35fb9e9bd70" +source = "git+https://github.com/blaze-init/datafusion-orc.git?rev=0d798f8#0d798f828d8ef033d482544427e012e2887caf68" dependencies = [ "arrow", "async-trait", @@ -2137,7 +2160,7 @@ dependencies = [ [[package]] name = "parquet" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=f34f7eb3c2#f34f7eb3c2ee666f82ce5e042521ad2a0ddb62b9" dependencies = [ "ahash", "arrow-array", @@ -2272,14 +2295,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" dependencies = [ "proc-macro2", - "syn 2.0.87", + "syn 2.0.91", ] [[package]] name = "proc-macro2" -version = "1.0.89" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e" +checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" dependencies = [ "unicode-ident", ] @@ -2306,11 +2329,10 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" +checksum = "d0f3e5beed80eb580c68e2c600937ac2c4eedabdfd5ef1e5b7ea4f3fba84497b" dependencies = [ - "bytes", "heck", "itertools 0.13.0", "log", @@ -2321,7 +2343,7 @@ dependencies = [ "prost 0.13.4", "prost-types", "regex", - "syn 2.0.87", + "syn 2.0.91", "tempfile", ] @@ -2335,7 +2357,7 @@ dependencies = [ "itertools 0.12.1", "proc-macro2", "quote", - "syn 2.0.87", + "syn 
2.0.91", ] [[package]] @@ -2348,18 +2370,38 @@ dependencies = [ "itertools 0.13.0", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", ] [[package]] name = "prost-types" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670" +checksum = "cc2f1e56baa61e93533aebc21af4d2134b70f66275e0fcdf3cbe43d77ff7e8fc" dependencies = [ "prost 0.13.4", ] +[[package]] +name = "ptr_meta" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe9e76f66d3f9606f44e45598d155cb13ecf09f4a28199e48daf8c8fc937ea90" +dependencies = [ + "ptr_meta_derive", +] + +[[package]] +name = "ptr_meta_derive" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca414edb151b4c8d125c12566ab0d74dc9cdba36fb80eb7b848c15f495fd32d1" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.91", +] + [[package]] name = "quote" version = "1.0.37" @@ -2381,6 +2423,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "019b4b213425016d7d84a153c4c73afb0946fbb4840e4eece7ba8848b9d6da22" +[[package]] +name = "rancor" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "caf5f7161924b9d1cea0e4cabc97c372cea92b5f927fc13c6bca67157a0ad947" +dependencies = [ + "ptr_meta", +] + [[package]] name = "rand" version = "0.8.5" @@ -2411,11 +2462,20 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "11.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "redox_syscall" -version = "0.5.7" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" +checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" dependencies = [ "bitflags 2.6.0", ] @@ -2437,7 +2497,7 @@ checksum = "bcc303e793d3734489387d205e9b186fac9c6cfacedd98cbb2e8a5943595f3e6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", ] [[package]] @@ -2469,6 +2529,41 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "rend" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a35e8a6bf28cd121053a66aa2e6a2e3eaffad4a60012179f0e864aa5ffeff215" + +[[package]] +name = "rkyv" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b11a153aec4a6ab60795f8ebe2923c597b16b05bb1504377451e705ef1a45323" +dependencies = [ + "bytes", + "hashbrown 0.15.2", + "indexmap", + "munge", + "ptr_meta", + "rancor", + "rend", + "rkyv_derive", + "tinyvec", + "uuid", +] + +[[package]] +name = "rkyv_derive" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "beb382a4d9f53bd5c0be86b10d8179c3f8a14c30bf774ff77096ed6581e35981" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.91", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -2486,15 +2581,15 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.41" +version = "0.38.42" 
source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7f649912bc1495e167a6edee79151c84b1bad49748cb4f1f1167f459f6224f6" +checksum = "f93dc38ecbab2eb790ff964bb77fa94faf256fd3e73285fd7ba0903b76bedb85" dependencies = [ "bitflags 2.6.0", "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -2526,9 +2621,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "semver" -version = "1.0.23" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" +checksum = "3cb6eb87a131f756572d7fb904f6e7b68633f09cca868c5df1c4b8d1a694bbba" [[package]] name = "seq-macro" @@ -2538,22 +2633,22 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.215" +version = "1.0.216" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" +checksum = "0b9781016e935a97e8beecf0c933758c97a5520d32930e460142b4cd80c6338e" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.215" +version = "1.0.216" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" +checksum = "46f859dbbf73865c6627ed570e78961cd3ac92407a2d117204c49232485da55e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", ] [[package]] @@ -2634,7 +2729,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", ] [[package]] @@ -2669,7 +2764,7 @@ dependencies = [ "simdutf8", "sonic-number", "sonic-simd", - "thiserror 2.0.3", + "thiserror 2.0.9", ] [[package]] @@ -2699,7 +2794,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", ] [[package]] @@ -2733,7 +2828,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.87", + "syn 2.0.91", ] [[package]] @@ -2755,9 +2850,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.87" +version = "2.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" +checksum = "d53cbcb5a243bd33b7858b1d7f4aca2153490815872d86d955d6ea29f743c035" dependencies = [ "proc-macro2", "quote", @@ -2772,7 +2867,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", ] [[package]] @@ -2791,7 +2886,7 @@ dependencies = [ "fastrand", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys", ] [[package]] @@ -2805,11 +2900,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.3" +version = "2.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa" +checksum = "f072643fd0190df67a8bab670c20ef5d8737177d6ac6b2e9a236cb096206b2cc" dependencies = [ - "thiserror-impl 2.0.3", + "thiserror-impl 2.0.9", ] [[package]] @@ -2820,18 +2915,18 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", ] [[package]] name = "thiserror-impl" -version = "2.0.3" +version = "2.0.9" source 
= "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" +checksum = "7b50fa271071aae2e6ee85f842e2e28ba8cd2c5fb67f11fcb1fd70b276f9e7d4" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", ] [[package]] @@ -2875,6 +2970,21 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinyvec" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "022db8904dfa342efe721985167e9fcd16c29b226db4397ed752a761cfce81e8" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.42.0" @@ -2895,14 +3005,14 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", ] [[package]] name = "tokio-util" -version = "0.7.12" +version = "0.7.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" +checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" dependencies = [ "bytes", "futures-core", @@ -2922,14 +3032,14 @@ dependencies = [ "prost-build", "prost-types", "quote", - "syn 2.0.87", + "syn 2.0.91", ] [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "pin-project-lite", "tracing-attributes", @@ -2938,20 +3048,20 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.27" +version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", ] [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", ] @@ -2998,9 +3108,9 @@ checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" [[package]] name = "url" -version = "2.5.3" +version = "2.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d157f1b96d14500ffdc1f10ba712e780825526c03d9a49b4d0324b0d9113ada" +checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" dependencies = [ "form_urlencoded", "idna", @@ -3052,9 +3162,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" +checksum = "a474f6281d1d70c17ae7aa6a613c87fce69a127e2624002df63dcb39d6cf6396" dependencies = [ "cfg-if", "once_cell", @@ -3063,24 +3173,23 @@ dependencies = [ 
[[package]] name = "wasm-bindgen-backend" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" +checksum = "5f89bb38646b4f81674e8f5c3fb81b562be1fd936d84320f3264486418519c79" dependencies = [ "bumpalo", "log", - "once_cell", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" +checksum = "2cc6181fd9a7492eef6fef1f33961e3695e4579b9872a6f7c83aee556666d4fe" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3088,28 +3197,28 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" +checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" +checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6" [[package]] name = "web-sys" -version = "0.3.72" +version = "0.3.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112" +checksum = "04dd7223427d52553d3702c004d3b2fe07c148165faa56313cb00211e31c12bc" dependencies = [ "js-sys", "wasm-bindgen", @@ -3121,7 +3230,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys", ] [[package]] @@ -3133,15 +3242,6 @@ dependencies = [ "windows-targets", ] -[[package]] -name = "windows-sys" -version = "0.52.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" -dependencies = [ - "windows-targets", -] - [[package]] name = "windows-sys" version = "0.59.0" @@ -3247,9 +3347,9 @@ dependencies = [ [[package]] name = "yoke" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5" +checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40" dependencies = [ "serde", "stable_deref_trait", @@ -3259,13 +3359,13 @@ dependencies = [ [[package]] name = "yoke-derive" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" +checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", "synstructure", ] @@ -3287,27 +3387,27 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + 
"syn 2.0.91", ] [[package]] name = "zerofrom" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91ec111ce797d0e0784a1116d0ddcdbea84322cd79e5d5ad173daeba4f93ab55" +checksum = "cff3ee08c995dee1859d998dea82f7374f2826091dd9cd47def953cae446cd2e" dependencies = [ "zerofrom-derive", ] [[package]] name = "zerofrom-derive" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5" +checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", "synstructure", ] @@ -3330,7 +3430,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.91", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 05e2bbb2..e280a566 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,27 +66,27 @@ serde_json = { version = "1.0.96" } [patch.crates-io] # datafusion: branch=v42-blaze -datafusion = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"} -datafusion-common = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"} -datafusion-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"} -datafusion-execution = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"} -datafusion-optimizer = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"} -datafusion-physical-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"} -orc-rust = { git = "https://github.com/blaze-init/datafusion-orc.git", rev = "7833d7d"} +datafusion = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "e9e2128a7"} +datafusion-common = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "e9e2128a7"} +datafusion-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "e9e2128a7"} +datafusion-execution = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "e9e2128a7"} +datafusion-optimizer = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "e9e2128a7"} +datafusion-physical-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "e9e2128a7"} +orc-rust = { git = "https://github.com/blaze-init/datafusion-orc.git", rev = "0d798f8"} # arrow: branch=v53-blaze -arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} -arrow-arith = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} -arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} -arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} -arrow-cast = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} -arrow-data = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} -arrow-ord = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} -arrow-row = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} -arrow-schema = { git = "https://github.com/blaze-init/arrow-rs.git", rev = 
"91dc27dedf"} -arrow-select = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} -arrow-string = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} -parquet = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} +arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"} +arrow-arith = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"} +arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"} +arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"} +arrow-cast = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"} +arrow-data = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"} +arrow-ord = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"} +arrow-row = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"} +arrow-schema = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"} +arrow-select = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"} +arrow-string = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"} +parquet = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"} # serde_json: branch=v1.0.96-blaze serde_json = { git = "https://github.com/blaze-init/json", branch = "v1.0.96-blaze" } diff --git a/native-engine/blaze-jni-bridge/src/conf.rs b/native-engine/blaze-jni-bridge/src/conf.rs index 440b3ce2..f75672d2 100644 --- a/native-engine/blaze-jni-bridge/src/conf.rs +++ b/native-engine/blaze-jni-bridge/src/conf.rs @@ -30,7 +30,6 @@ macro_rules! 
define_conf {
 define_conf!(IntConf, BATCH_SIZE);
 define_conf!(DoubleConf, MEMORY_FRACTION);
-define_conf!(IntConf, TOKIO_NUM_WORKER_THREADS);
 define_conf!(BooleanConf, SMJ_INEQUALITY_JOIN_ENABLE);
 define_conf!(BooleanConf, CASE_CONVERT_FUNCTIONS_ENABLE);
 define_conf!(BooleanConf, INPUT_BATCH_STATISTICS_ENABLE);
diff --git a/native-engine/blaze/Cargo.toml b/native-engine/blaze/Cargo.toml
index f5e63574..5456251d 100644
--- a/native-engine/blaze/Cargo.toml
+++ b/native-engine/blaze/Cargo.toml
@@ -25,6 +25,7 @@ once_cell = "1.20.2"
 panic-message = "0.3.0"
 paste = "1.0.15"
 prost = "0.13.4"
+raw-cpuid = "11.2.0"
 tokio = "=1.42.0"
 
 [target.'cfg(not(windows))'.dependencies]
diff --git a/native-engine/blaze/src/rt.rs b/native-engine/blaze/src/rt.rs
index 3afa18a5..33d05a7a 100644
--- a/native-engine/blaze/src/rt.rs
+++ b/native-engine/blaze/src/rt.rs
@@ -24,11 +24,9 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use blaze_jni_bridge::{
-    conf::{IntConf, TOKIO_NUM_WORKER_THREADS},
-    is_task_running,
-    jni_bridge::JavaClasses,
-    jni_call, jni_call_static, jni_convert_byte_array, jni_exception_check, jni_exception_occurred,
-    jni_new_global_ref, jni_new_object, jni_new_string,
+    is_task_running, jni_bridge::JavaClasses, jni_call, jni_call_static, jni_convert_byte_array,
+    jni_exception_check, jni_exception_occurred, jni_new_global_ref, jni_new_object,
+    jni_new_string,
 };
 use blaze_serde::protobuf::TaskDefinition;
 use datafusion::{
@@ -49,6 +47,7 @@ use datafusion_ext_plans::{
 use futures::{FutureExt, StreamExt};
 use jni::objects::{GlobalRef, JObject};
 use prost::Message;
+use raw_cpuid::CpuId;
 use tokio::{runtime::Runtime, task::JoinHandle};
 
 use crate::{
@@ -95,13 +94,29 @@ impl NativeExecutionRuntime {
             &ExecutionPlanMetricsSet::new(),
         );
 
+        // determine number of tokio worker threads
+        // use the real number of available physical cores
+        let default_parallelism = std::thread::available_parallelism()
+            .map(|v| v.get())
+            .unwrap_or(1);
+        let has_htt = CpuId::new()
+            .get_feature_info()
+            .map(|info| info.has_htt())
+            .unwrap_or(false);
+        let mut num_worker_threads = if has_htt {
+            default_parallelism / 2
+        } else {
+            default_parallelism
+        };
+        num_worker_threads = num_worker_threads.max(1);
+
         // create tokio runtime
         // propagate classloader and task context to spawned children threads
         let spark_task_context = jni_call_static!(JniBridge.getTaskContext() -> JObject)?;
         let spark_task_context_global = jni_new_global_ref!(spark_task_context.as_obj())?;
         let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
            .thread_name(format!("blaze-native-stage-{stage_id}-part-{partition_id}"))
-            .worker_threads(TOKIO_NUM_WORKER_THREADS.value()? as usize)
+            .worker_threads(num_worker_threads)
             .on_thread_start(move || {
                 let classloader = JavaClasses::get().classloader;
                 let _ = jni_call_static!(
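The rt.rs hunk above drops the TOKIO_NUM_WORKER_THREADS setting (also deleted from conf.rs) in favor of a derived default: take std::thread::available_parallelism(), halve it when CPUID reports the HTT flag, and floor the result at one thread. A minimal standalone sketch of that heuristic, using the raw-cpuid crate this PR adds; note HTT is a coarse signal (it is a capability bit, not proof that SMT is actually enabled), which is presumably why the patch still clamps to at least 1:

```rust
// Sketch: derive a tokio worker count from physical cores, as rt.rs now does.
use raw_cpuid::CpuId;

fn default_worker_threads() -> usize {
    // logical CPUs visible to this process (affinity-aware)
    let parallelism = std::thread::available_parallelism()
        .map(|v| v.get())
        .unwrap_or(1);
    // HTT flag set => assume two hardware threads per physical core
    let has_htt = CpuId::new()
        .get_feature_info()
        .map(|info| info.has_htt())
        .unwrap_or(false);
    let threads = if has_htt { parallelism / 2 } else { parallelism };
    threads.max(1)
}

fn main() {
    println!("would spawn {} tokio workers", default_worker_threads());
}
```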
diff --git a/native-engine/datafusion-ext-commons/src/algorithm/rdx_tournament_tree.rs b/native-engine/datafusion-ext-commons/src/algorithm/rdx_tournament_tree.rs
index 7c3f1938..663f8920 100644
--- a/native-engine/datafusion-ext-commons/src/algorithm/rdx_tournament_tree.rs
+++ b/native-engine/datafusion-ext-commons/src/algorithm/rdx_tournament_tree.rs
@@ -33,6 +33,7 @@ pub struct RadixTournamentTree<T: KeyForRadixTournamentTree> {
 #[allow(clippy::len_without_is_empty)]
 impl<T: KeyForRadixTournamentTree> RadixTournamentTree<T> {
     pub fn new(values: Vec<T>, num_keys: usize) -> Self {
+        let num_keys = num_keys + 1; // avoid overflow
         let num_values = values.len();
         let mut tree = unsafe {
             // safety:
diff --git a/native-engine/datafusion-ext-commons/src/algorithm/rdxsort.rs b/native-engine/datafusion-ext-commons/src/algorithm/rdxsort.rs
index f32c6104..d5ce81ca 100644
--- a/native-engine/datafusion-ext-commons/src/algorithm/rdxsort.rs
+++ b/native-engine/datafusion-ext-commons/src/algorithm/rdxsort.rs
@@ -12,46 +12,103 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::vec::IntoIter;
+use crate::unchecked;
 
-use radsort::Key;
+/// Perform radix sort on a single array
+///
+/// - array: the array to be sorted
+/// - counts: the counters to be used for counting, must be initialized to 0.
+///   will be filled with the number of elements in each bucket after sorting.
+/// - key: a function to extract the key from the array element
+pub fn radix_sort_by_key<T>(array: &mut [T], counts: &mut [usize], key: impl Fn(&T) -> usize) {
+    #[derive(Default, Clone, Copy)]
+    struct Part {
+        cur: usize,
+        end: usize,
+    }
 
-const STD_SORT_LIMIT: usize = 4096;
+    let num_keys = counts.len();
+    let mut counts = unchecked!(counts);
+    let mut parts = unchecked!(vec![Part::default(); num_keys]);
 
-pub fn radix_sort_unstable(array: &mut [impl Key + Ord]) {
-    radix_sort_unstable_by_key(array, |v| *v);
-}
+    // count
+    array.iter().for_each(|item| counts[key(item)] += 1);
+
+    // construct parts
+    let mut beg = 0;
+    for (idx, count) in counts.iter().enumerate() {
+        if *count > 0 {
+            parts[idx] = Part {
+                cur: beg,
+                end: beg + count,
+            };
+            beg += count;
+        }
+    }
 
-pub fn radix_sort_unstable_by_key<T, K: Key + Ord>(array: &mut [T], key: impl Fn(&T) -> K) {
-    if array.len() < STD_SORT_LIMIT {
-        array.sort_unstable_by_key(key);
-    } else {
-        radsort::sort_by_key(array, key);
+    // reorganize each partition
+    let mut inexhausted_part_indices = unchecked!(vec![0; num_keys]);
+    for i in 0..num_keys {
+        inexhausted_part_indices[i] = i;
+    }
+    while {
+        inexhausted_part_indices.retain(|&i| parts[i].cur < parts[i].end);
+        inexhausted_part_indices.len() > 1
+    } {
+        for &part_idx in inexhausted_part_indices.iter() {
+            let cur_part = &parts[part_idx];
+            let cur = cur_part.cur;
+            let end = cur_part.end;
+            for item_idx in cur..end {
+                let target_part_idx = key(&array[item_idx]);
+                let target_part = &mut parts[target_part_idx];
+                unsafe {
+                    // safety: skip bound check
+                    array.swap_unchecked(item_idx, target_part.cur);
+                }
+                target_part.cur += 1;
+            }
+        }
     }
 }
 
-pub trait RadixSortIterExt: Iterator {
-    fn radix_sorted_unstable(self) -> IntoIter<Self::Item>
-    where
-        Self: Sized,
-        Self::Item: Key + Ord,
-    {
-        let mut vec: Vec<Self::Item> = self.collect();
-        radix_sort_unstable(&mut vec);
-        vec.into_iter()
+#[cfg(test)]
+mod test {
+    use rand::Rng;
+
+    use super::*;
+
+    #[test]
+    fn fuzzytest_u16_small() {
+        for n in 0..1000 {
+            let mut array = vec![];
+            for _ in 0..n {
+                array.push(rand::thread_rng().gen::<u16>());
+            }
+
+            let mut array1 = array.clone();
+            radix_sort_by_key(&mut array1, &mut [0; 65536], |key| *key as usize);
+
+            let mut array2 = array.clone();
+            array2.sort_unstable();
+
+            assert_eq!(array1, array2);
+        }
     }
 
-    fn radix_sorted_unstable_by_key<K: Key + Ord>(
-        self,
-        key: impl Fn(&Self::Item) -> K,
-    ) -> IntoIter<Self::Item>
-    where
-        Self: Sized,
-    {
-        let mut vec: Vec<Self::Item> = self.collect();
-        radix_sort_unstable_by_key(&mut vec, key);
-        vec.into_iter()
+    #[test]
+    fn fuzzytest_u16_1m() {
+        let mut array = vec![];
+        for _ in 0..1000000 {
+            array.push(rand::thread_rng().gen::<u16>());
+        }
+
+        let mut array1 = array.clone();
+        radix_sort_by_key(&mut array1, &mut [0; 65536], |key| *key as usize);
+
+        let mut array2 = array.clone();
+        array2.sort_unstable();
+
+        assert_eq!(array1, array2);
     }
 }
-
-impl<I: Iterator> RadixSortIterExt for I {}
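A note on the algorithm above, since the agg_table.rs hunks below depend on one of its side effects: radix_sort_by_key is an in-place bucket sort. It histograms keys into counts, prefix-sums the histogram into one [cur, end) range per bucket, then keeps swapping items toward their target ranges until at most one range is unexhausted; when it returns, counts holds the final per-bucket sizes. A safe sketch of the same procedure (the in-tree version elides bounds checks via blaze's unchecked! macro and slice::swap_unchecked):

```rust
// Safe re-statement of radix_sort_by_key: same signature, bounds checks kept.
fn radix_sort_by_key<T>(array: &mut [T], counts: &mut [usize], key: impl Fn(&T) -> usize) {
    #[derive(Default, Clone, Copy)]
    struct Part {
        cur: usize,
        end: usize,
    }

    // histogram of bucket sizes (left in `counts` for the caller)
    for item in array.iter() {
        counts[key(item)] += 1;
    }

    // prefix-sum the histogram into one [cur, end) range per bucket
    let mut parts = vec![Part::default(); counts.len()];
    let mut beg = 0;
    for (idx, &count) in counts.iter().enumerate() {
        parts[idx] = Part {
            cur: beg,
            end: beg + count,
        };
        beg += count;
    }

    // swap items into their buckets; once at most one bucket still has
    // unplaced slots, everything left is already in position
    let mut live: Vec<usize> = (0..counts.len()).collect();
    loop {
        live.retain(|&i| parts[i].cur < parts[i].end);
        if live.len() <= 1 {
            break;
        }
        for &p in &live {
            for i in parts[p].cur..parts[p].end {
                let target = key(&array[i]);
                array.swap(i, parts[target].cur);
                parts[target].cur += 1;
            }
        }
    }
}

fn main() {
    let mut v = vec![3u8, 1, 2, 1, 0, 3];
    let mut counts = vec![0; 4];
    radix_sort_by_key(&mut v, &mut counts, |x| *x as usize);
    assert_eq!(v, [0, 1, 1, 2, 3, 3]);
    assert_eq!(counts, [1, 2, 1, 2]); // the histogram survives for the caller
}
```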
diff --git a/native-engine/datafusion-ext-commons/src/arrow/cast.rs b/native-engine/datafusion-ext-commons/src/arrow/cast.rs
index 93d5f04f..4ad7c9b5 100644
--- a/native-engine/datafusion-ext-commons/src/arrow/cast.rs
+++ b/native-engine/datafusion-ext-commons/src/arrow/cast.rs
@@ -16,6 +16,7 @@ use std::sync::Arc;
 
 use arrow::{array::*, datatypes::*};
 use datafusion::common::Result;
+use num::{Bounded, FromPrimitive, Integer, Signed};
 
 use crate::df_execution_err;
 
@@ -37,6 +38,11 @@ pub fn cast_impl(
 
         (_, &DataType::Null) => Arc::new(NullArray::new(array.len())),
 
+        // spark compatible str to int
+        (&DataType::Utf8, to_dt) if to_dt.is_signed_integer() => {
+            return try_cast_string_array_to_integer(array, to_dt);
+        }
+
         // float to int
        // use unchecked casting, which is compatible with spark
         (&DataType::Float32, &DataType::Int8) => {
@@ -201,6 +207,109 @@ pub fn cast_impl(
     })
 }
 
+fn try_cast_string_array_to_integer(array: &dyn Array, cast_type: &DataType) -> Result<ArrayRef> {
+    macro_rules! cast {
+        ($target_type:ident) => {{
+            type B = paste::paste! {[<$target_type Builder>]};
+            let array = array.as_any().downcast_ref::<StringArray>().unwrap();
+            let mut builder = B::new();
+
+            for v in array.iter() {
+                match v {
+                    Some(s) => builder.append_option(to_integer(s)),
+                    None => builder.append_null(),
+                }
+            }
+            Arc::new(builder.finish())
+        }};
+    }
+
+    Ok(match cast_type {
+        DataType::Int8 => cast!(Int8),
+        DataType::Int16 => cast!(Int16),
+        DataType::Int32 => cast!(Int32),
+        DataType::Int64 => cast!(Int64),
+        _ => arrow::compute::cast(array, cast_type)?,
+    })
+}
+
+// this implementation is originally copied from spark UTF8String.scala
+fn to_integer<T: Bounded + FromPrimitive + Integer + Signed + Copy>(input: &str) -> Option<T> {
+    let bytes = input.as_bytes();
+
+    if bytes.is_empty() {
+        return None;
+    }
+
+    let b = bytes[0];
+    let negative = b == b'-';
+    let mut offset = 0;
+
+    if negative || b == b'+' {
+        offset += 1;
+        if bytes.len() == 1 {
+            return None;
+        }
+    }
+
+    let separator = b'.';
+    let radix = T::from_usize(10).unwrap();
+    let stop_value = T::min_value() / radix;
+    let mut result = T::zero();
+
+    while offset < bytes.len() {
+        let b = bytes[offset];
+        offset += 1;
+        if b == separator {
+            // We allow decimals and will return a truncated integral in that case.
+            // Therefore we won't throw an exception here (checking the fractional
+            // part happens below.)
+            break;
+        }
+
+        let digit = if b.is_ascii_digit() {
+            b - b'0'
+        } else {
+            return None;
+        };
+
+        // We are going to process the new digit and accumulate the result. However,
+        // before doing this, if the result is already smaller than the
+        // stopValue(Long.MIN_VALUE / radix), then result * 10 will definitely
+        // be smaller than minValue, and we can stop.
+        if result < stop_value {
+            return None;
+        }
+
+        result = result * radix - T::from_u8(digit).unwrap();
+        // Since the previous result is less than or equal to stopValue(Long.MIN_VALUE /
+        // radix), we can just use `result > 0` to check overflow. If result
+        // overflows, we should stop.
+        if result > T::zero() {
+            return None;
+        }
+    }
+
+    // This is the case when we've encountered a decimal separator. The fractional
+    // part will not change the number, but we will verify that the fractional part
+    // is well formed.
+    while offset < bytes.len() {
+        let current_byte = bytes[offset];
+        if !current_byte.is_ascii_digit() {
+            return None;
+        }
+        offset += 1;
+    }
+
+    if !negative {
+        result = -result;
+        if result < T::zero() {
+            return None;
+        }
+    }
+    Some(result)
+}
+
 #[cfg(test)]
 mod test {
     use datafusion::common::cast::{as_decimal128_array, as_float64_array, as_int32_array};
@@ -343,4 +452,30 @@ mod test {
             ])
         );
     }
+
+    #[test]
+    fn test_string_to_bigint() {
+        let string_array: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            None,
+            Some("123"),
+            Some("987"),
+            Some("987.654"),
+            Some("123456789012345"),
+            Some("-123456789012345"),
+            Some("999999999999999999999999999999999"),
+        ]));
+        let casted = cast(&string_array, &DataType::Int64).unwrap();
+        assert_eq!(
+            casted.as_any().downcast_ref::<Int64Array>().unwrap(),
+            &Int64Array::from_iter(vec![
+                None,
+                Some(123),
+                Some(987),
+                Some(987),
+                Some(123456789012345),
+                Some(-123456789012345),
+                None,
+            ])
+        );
+    }
 }
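The core trick in to_integer above (inherited from Spark's UTF8String) is accumulating in the negative domain: since |T::min_value()| exceeds T::max_value() in two's complement, folding digits as result * 10 - digit lets T::MIN parse cleanly, a "." truncates the integral part while the fractional digits are still validated, and any overflow or malformed byte yields None, which the builder turns into a NULL. A self-contained i64-only rendition; the generic version relies on wraparound for its `result > 0` check, which this sketch makes explicit with wrapping ops so it also runs under debug overflow checks:

```rust
// i64-only sketch of the Spark-style string-to-integer cast above.
fn str_to_i64(input: &str) -> Option<i64> {
    let bytes = input.as_bytes();
    if bytes.is_empty() {
        return None;
    }
    let negative = bytes[0] == b'-';
    let mut offset = usize::from(negative || bytes[0] == b'+');
    if offset == bytes.len() {
        return None; // bare sign
    }

    let stop_value = i64::MIN / 10;
    let mut result: i64 = 0;
    while offset < bytes.len() {
        let b = bytes[offset];
        offset += 1;
        if b == b'.' {
            break; // truncate at the decimal separator, validate the rest below
        }
        if !b.is_ascii_digit() {
            return None;
        }
        if result < stop_value {
            return None; // result * 10 would pass i64::MIN
        }
        result = result.wrapping_mul(10).wrapping_sub((b - b'0') as i64);
        if result > 0 {
            return None; // wrapped around: magnitude exceeded |i64::MIN|
        }
    }
    // fractional digits cannot change the value, but must be well formed
    while offset < bytes.len() {
        if !bytes[offset].is_ascii_digit() {
            return None;
        }
        offset += 1;
    }
    if !negative {
        result = result.wrapping_neg();
        if result < 0 {
            return None; // i64::MIN has no positive counterpart
        }
    }
    Some(result)
}

fn main() {
    assert_eq!(str_to_i64("987.654"), Some(987)); // truncated, like Spark
    assert_eq!(str_to_i64("-9223372036854775808"), Some(i64::MIN));
    assert_eq!(str_to_i64("9223372036854775808"), None); // overflow -> NULL
    assert_eq!(str_to_i64("12ab"), None); // malformed -> NULL
    println!("ok");
}
```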
diff --git a/native-engine/datafusion-ext-commons/src/arrow/selection.rs b/native-engine/datafusion-ext-commons/src/arrow/selection.rs
index 6774e640..095ef85a 100644
--- a/native-engine/datafusion-ext-commons/src/arrow/selection.rs
+++ b/native-engine/datafusion-ext-commons/src/arrow/selection.rs
@@ -143,8 +143,13 @@ pub fn create_array_interleaver(
                 });
             }
         }
-        let v = interleaver.arrays[*a].value(*b);
-        values.push(v)
+        let array = &interleaver.arrays[*a];
+        if array.is_valid(*b) {
+            let v = interleaver.arrays[*a].value(*b);
+            values.push(v)
+        } else {
+            values.push(Default::default());
+        }
     }
 
     let array = PrimitiveArray::<T>::new(values.into(), nulls);
@@ -172,9 +177,12 @@ pub fn create_array_interleaver(
                 });
             }
         }
-        let o = interleaver.arrays[*a].value_offsets();
-        let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
-        capacity += element_len;
+        let array = &interleaver.arrays[*a];
+        if array.is_valid(*b) {
+            let o = array.value_offsets();
+            let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
+            capacity += element_len;
+        }
         offsets.append(T::Offset::from_usize(capacity).expect("overflow"));
     }
 
@@ -192,7 +200,10 @@ pub fn create_array_interleaver(
                 });
             }
         }
-        values.extend_from_slice(interleaver.arrays[*a].value(*b).as_ref());
+        let array = &interleaver.arrays[*a];
+        if array.is_valid(*b) {
+            values.extend_from_slice(interleaver.arrays[*a].value(*b).as_ref());
+        }
     }
 
     // Safety: safe by construction
diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs b/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs
index 6e526a0a..c612f6af 100644
--- a/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs
@@ -209,8 +209,10 @@ impl AggHashMap {
     }
 
     pub fn upsert_records(&mut self, keys: Vec) -> Vec {
-        self.map.reserve(keys.len());
-        self.map.upsert_many(keys)
+        tokio::task::block_in_place(|| {
+            self.map.reserve(keys.len());
+            self.map.upsert_many(keys)
+        })
     }
 
     pub fn take_keys(&mut self) -> Vec {
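Two robustness fixes land above. In selection.rs, the interleaver now consults is_valid() before touching data: a null slot may carry arbitrary values or offsets, so it contributes Default::default() (primitives) or zero bytes (byte arrays) and lets the null buffer speak for it. In agg_hash_map.rs, the upsert is wrapped in tokio::task::block_in_place, which tells the multi-threaded runtime that the current worker is about to run a long synchronous section (reserve plus a possible rehash), so queued tasks can migrate to other workers instead of stalling behind it. A minimal demonstration of the latter; assumes tokio with the rt-multi-thread and macros features, and note block_in_place panics on a current-thread runtime:

```rust
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
    // stand-in for a long synchronous section such as a large hash-map
    // reserve + rehash inside upsert_records
    let sum = tokio::task::block_in_place(|| (0u64..10_000_000).sum::<u64>());
    println!("{sum}");
}
```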
diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs b/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs index 6e526a0a..c612f6af 100644 --- a/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs +++ b/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs @@ -209,8 +209,10 @@ impl AggHashMap { } pub fn upsert_records(&mut self, keys: Vec) -> Vec { - self.map.reserve(keys.len()); - self.map.upsert_many(keys) + tokio::task::block_in_place(|| { + self.map.reserve(keys.len()); + self.map.upsert_many(keys) + }) } pub fn take_keys(&mut self) -> Vec { diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs index 5b07c9b1..ff7806a8 100644 --- a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs +++ b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs @@ -28,7 +28,7 @@ use datafusion::{ use datafusion_ext_commons::{ algorithm::{ rdx_tournament_tree::{KeyForRadixTournamentTree, RadixTournamentTree}, - rdxsort::radix_sort_unstable_by_key, + rdxsort::radix_sort_by_key, }, batch_size, df_execution_err, downcast_any, io::{read_bytes_slice, read_len, write_len}, @@ -480,33 +480,31 @@ impl HashingData { .enumerate() .map(|(record_idx, key)| (bucket_id(key) as u32, record_idx as u32)) .collect::<Vec<_>>(); - radix_sort_unstable_by_key(&mut entries, |v| v.0 as u16); + + let mut bucket_counts = vec![0; NUM_SPILL_BUCKETS]; + radix_sort_by_key(&mut entries, &mut bucket_counts, |(bucket_id, ..)| { + *bucket_id as usize + }); let mut writer = spill.get_compressed_writer(); - let mut begin = 0; - let mut end; - while begin < entries.len() { - let cur_bucket_id = entries[begin].0; - end = begin + 1; - while end < entries.len() && entries[end].0 == cur_bucket_id { - end += 1; + let mut offset = 0; + for (cur_bucket_id, bucket_count) in bucket_counts.into_iter().enumerate() { + if bucket_count == 0 { + continue; } - - write_len(cur_bucket_id as usize, &mut writer)?; - write_len(end - begin, &mut writer)?; + write_len(cur_bucket_id, &mut writer)?; + write_len(bucket_count, &mut writer)?; write_spill_bucket( &mut writer, &acc_table, - entries[begin..end] + entries[offset..][..bucket_count] .iter() .map(|&(_, record_idx)| &key_rows[record_idx as usize]), - entries[begin..end] + entries[offset..][..bucket_count] .iter() .map(|&(_, record_idx)| record_idx as usize), )?; - - // next bucket - begin = end; + offset += bucket_count; } // EOF write_len(NUM_SPILL_BUCKETS, &mut writer)?; @@ -584,24 +582,24 @@ impl MergingData { let mut entries = self.entries; let key_rows = self.key_rows; let acc_table = self.acc_table; - radix_sort_unstable_by_key(&mut entries, |(bucket_id, ..)| *bucket_id as u16); + + let mut bucket_counts = vec![0; NUM_SPILL_BUCKETS]; + radix_sort_by_key(&mut entries, &mut bucket_counts, |(bucket_id, ..)| { + *bucket_id as usize + }); let mut writer = spill.get_compressed_writer(); - let mut begin = 0; - let mut end; - while begin < entries.len() { - let cur_bucket_id = entries[begin].0; - end = begin + 1; - while end < entries.len() && entries[end].0 == cur_bucket_id { - end += 1; + let mut offset = 0; + for (cur_bucket_id, bucket_count) in bucket_counts.into_iter().enumerate() { + if bucket_count == 0 { + continue; } - - write_len(cur_bucket_id as usize, &mut writer)?; - write_len(end - begin, &mut writer)?; + write_len(cur_bucket_id, &mut writer)?; + write_len(bucket_count, &mut writer)?; write_spill_bucket( &mut writer, &acc_table, - entries[begin..end] + entries[offset..][..bucket_count] .iter() .map(|&(_, batch_idx, row_idx, _)| { key_rows[batch_idx as usize] .row(row_idx as usize) .as_ref() .as_raw_bytes() }), - entries[begin..end] + entries[offset..][..bucket_count] .iter() .map(|&(_, _, _, record_idx)| record_idx as usize), )?; - - // next bucket - begin = end; + offset += bucket_count; } // EOF
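The rewrite above relies on radix_sort_by_key filling bucket_counts as a side effect, so per-bucket slices can be recovered with a running offset instead of re-scanning entries for bucket boundaries. A hedged sketch of that contract using an ordinary counting sort (hypothetical helper, not the crate's implementation):

// counting sort keyed by a small usize; fills `counts` the way the
// patch assumes radix_sort_by_key does
fn counting_sort_by_key<T: Clone>(items: &mut Vec<T>, counts: &mut [usize], key: impl Fn(&T) -> usize) {
    for item in items.iter() {
        counts[key(item)] += 1;
    }
    // exclusive prefix sums give each bucket's write cursor
    let mut cursors = vec![0usize; counts.len()];
    for i in 1..counts.len() {
        cursors[i] = cursors[i - 1] + counts[i - 1];
    }
    let mut sorted = items.clone();
    for item in items.iter() {
        let k = key(item);
        sorted[cursors[k]] = item.clone();
        cursors[k] += 1;
    }
    *items = sorted;
}

fn main() {
    let mut entries = vec![(2u32, 'a'), (0, 'b'), (2, 'c'), (1, 'd')];
    let mut bucket_counts = vec![0usize; 3];
    counting_sort_by_key(&mut entries, &mut bucket_counts, |&(b, _)| b as usize);
    assert_eq!(bucket_counts, vec![1, 1, 2]);
    // consume buckets exactly like the spill writer: offset += count
    let mut offset = 0;
    for &count in &bucket_counts {
        let _bucket = &entries[offset..][..count];
        offset += count;
    }
    assert_eq!(offset, entries.len());
}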
diff --git a/native-engine/datafusion-ext-plans/src/agg/collect.rs b/native-engine/datafusion-ext-plans/src/agg/collect.rs index b684a013..1f988eae 100644 --- a/native-engine/datafusion-ext-plans/src/agg/collect.rs +++ b/native-engine/datafusion-ext-plans/src/agg/collect.rs @@ -136,7 +136,6 @@ impl<C: AccCollectionColumn> Agg for AggGenericCollect<C> { ) -> Result<()> { let accs = downcast_any!(accs, mut C).unwrap(); let merging_accs = downcast_any!(merging_accs, mut C).unwrap(); - idx_for_zipped! { ((acc_idx, merging_acc_idx) in (acc_idx, merging_acc_idx)) => { accs.merge_items(acc_idx, merging_accs, merging_acc_idx); @@ -152,7 +151,7 @@ impl<C: AccCollectionColumn> Agg for AggGenericCollect<C> { idx_for! { (acc_idx in acc_idx) => { list.push(ScalarValue::List(ScalarValue::new_list( - &accs.take_values(acc_idx, self.arg_type.clone()), + &accs.take_values(acc_idx), &self.arg_type, true, ))); @@ -168,7 +167,7 @@ pub trait AccCollectionColumn: AccColumn + Send + Sync + 'static { fn merge_items(&mut self, idx: usize, other: &mut Self, other_idx: usize); fn save_raw(&self, idx: usize, w: &mut impl Write) -> Result<()>; fn load_raw(&mut self, idx: usize, r: &mut impl Read) -> Result<()>; - fn take_values(&mut self, idx: usize, dt: DataType) -> Vec<ScalarValue>; + fn take_values(&mut self, idx: usize) -> Vec<ScalarValue>; fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { let mut array_idx = 0; @@ -195,25 +194,6 @@ pub trait AccCollectionColumn: AccColumn + Send + Sync + 'static { } Ok(()) } - - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { - idx_for! { - (idx in idx) => { - self.save_raw(idx, w)?; - } - } - Ok(()) - } - - fn unspill(&mut self, num_rows: usize, r: &mut SpillCompressedReader) -> Result<()> { - let idx = self.num_records(); - self.resize(idx + num_rows); - - while idx < self.num_records() { - self.load_raw(idx, r)?; - } - Ok(()) - } } pub struct AccSetColumn { @@ -265,10 +245,10 @@ impl AccCollectionColumn for AccSetColumn { Ok(()) } - fn take_values(&mut self, idx: usize, dt: DataType) -> Vec<ScalarValue> { + fn take_values(&mut self, idx: usize) -> Vec<ScalarValue> { self.mem_used -= self.set[idx].mem_size(); std::mem::take(&mut self.set[idx]) - .into_values(dt, false) + .into_values(self.dt.clone(), false) .collect() } } @@ -309,23 +289,37 @@ impl AccColumn for AccSetColumn { } fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { - AccCollectionColumn::spill(self, idx, w) + idx_for! { + (idx in idx) => { + self.save_raw(idx, w)?; + } + } + Ok(()) } fn unspill(&mut self, num_rows: usize, r: &mut SpillCompressedReader) -> Result<()> { - AccCollectionColumn::unspill(self, num_rows, r) + let mut idx = self.num_records(); + self.resize(idx + num_rows); + + while idx < self.num_records() { + self.load_raw(idx, r)?; + idx += 1; + } + Ok(()) } } pub struct AccListColumn { list: Vec<AccList>, + dt: DataType, mem_used: usize, } impl AccCollectionColumn for AccListColumn { - fn empty(_dt: DataType) -> Self { + fn empty(dt: DataType) -> Self { Self { list: vec![], + dt, mem_used: 0, } } @@ -360,10 +354,10 @@ impl AccCollectionColumn for AccListColumn { Ok(()) } - fn take_values(&mut self, idx: usize, dt: DataType) -> Vec<ScalarValue> { + fn take_values(&mut self, idx: usize) -> Vec<ScalarValue> { self.mem_used -= self.list[idx].mem_size(); std::mem::take(&mut self.list[idx]) - .into_values(dt, false) + .into_values(self.dt.clone(), false) .collect() } } @@ -404,11 +398,23 @@ impl AccColumn for AccListColumn { } fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { - AccCollectionColumn::spill(self, idx, w) + idx_for!
{ + (idx in idx) => { + self.save_raw(idx, w)?; + } + } + Ok(()) } fn unspill(&mut self, num_rows: usize, r: &mut SpillCompressedReader) -> Result<()> { - AccCollectionColumn::unspill(self, num_rows, r) + let mut idx = self.num_records(); + self.resize(idx + num_rows); + + while idx < self.num_records() { + self.load_raw(idx, r)?; + idx += 1; + } + Ok(()) } } @@ -622,3 +628,99 @@ fn acc_hash(value: impl AsRef<[u8]>) -> u64 { foldhash::fast::FixedState::with_seed(ACC_HASH_SEED as u64); HASHER.hash_one(value.as_ref()) } + +#[cfg(test)] +mod tests { + use arrow::datatypes::DataType; + use datafusion::common::ScalarValue; + + use super::*; + use crate::memmgr::spill::Spill; + + #[test] + fn test_acc_set_append() { + let mut acc_set = AccSet::default(); + let value1 = ScalarValue::Int32(Some(1)); + let value2 = ScalarValue::Int32(Some(2)); + + acc_set.append(&value1, false); + acc_set.append(&value2, false); + + assert_eq!(acc_set.list.raw.len(), 8); // 4 bytes for each int32 + assert_eq!(acc_set.set.len(), 2); + } + + #[test] + fn test_acc_set_merge() { + let mut acc_set1 = AccSet::default(); + let mut acc_set2 = AccSet::default(); + let value1 = ScalarValue::Int32(Some(1)); + let value2 = ScalarValue::Int32(Some(2)); + let value3 = ScalarValue::Int32(Some(3)); + + acc_set1.append(&value1, false); + acc_set1.append(&value2, false); + acc_set2.append(&value2, false); + acc_set2.append(&value3, false); + + acc_set1.merge(&mut acc_set2); + + assert_eq!(acc_set1.list.raw.len(), 12); // 4 bytes for each int32 + assert_eq!(acc_set1.set.len(), 3); + } + + #[test] + fn test_acc_set_into_values() { + let mut acc_set = AccSet::default(); + let value1 = ScalarValue::Int32(Some(1)); + let value2 = ScalarValue::Int32(Some(2)); + + acc_set.append(&value1, false); + acc_set.append(&value2, false); + + let values: Vec = acc_set.into_values(DataType::Int32, false).collect(); + assert_eq!(values, vec![value1, value2]); + } + + #[test] + fn test_acc_set_duplicate_append() { + let mut acc_set = AccSet::default(); + let value1 = ScalarValue::Int32(Some(1)); + + acc_set.append(&value1, false); + acc_set.append(&value1, false); + + assert_eq!(acc_set.list.raw.len(), 4); // 4 bytes for one int32 + assert_eq!(acc_set.set.len(), 1); + } + + #[test] + fn test_acc_set_spill() { + let mut acc_col = AccSetColumn::empty(DataType::Int32); + acc_col.resize(3); + acc_col.append_item(1, &ScalarValue::Int32(Some(1))); + acc_col.append_item(1, &ScalarValue::Int32(Some(2))); + acc_col.append_item(2, &ScalarValue::Int32(Some(3))); + acc_col.append_item(2, &ScalarValue::Int32(Some(4))); + acc_col.append_item(2, &ScalarValue::Int32(Some(5))); + acc_col.append_item(2, &ScalarValue::Int32(Some(6))); + acc_col.append_item(2, &ScalarValue::Int32(Some(7))); + + let mut spill: Box = Box::new(vec![]); + acc_col + .spill( + IdxSelection::Range(0, 3), + &mut spill.get_compressed_writer(), + ) + .unwrap(); + + let mut acc_col_unspill = AccSetColumn::empty(DataType::Int32); + acc_col_unspill + .unspill(3, &mut spill.get_compressed_reader()) + .unwrap(); + + assert_eq!(acc_col.take_values(0), acc_col_unspill.take_values(0)); + assert_eq!(acc_col.take_values(1), acc_col_unspill.take_values(1)); + assert_eq!(acc_col.take_values(2), acc_col_unspill.take_values(2)); + } +} diff --git a/native-engine/datafusion-ext-plans/src/agg/maxmin.rs b/native-engine/datafusion-ext-plans/src/agg/maxmin.rs index e1f45408..24800b1c 100644 --- a/native-engine/datafusion-ext-plans/src/agg/maxmin.rs +++ b/native-engine/datafusion-ext-plans/src/agg/maxmin.rs @@ -110,7 
+110,7 @@ impl Agg for AggMaxMin

{ if !accs.prim_valid(acc_idx) { accs.set_prim_valid(acc_idx, true); - accs.set_prim_value(acc_idx, partial_arg.value(partial_arg_idx)); + accs.set_prim_value(acc_idx, partial_value); continue; } if partial_value.partial_cmp(&accs.prim_value(acc_idx)) == Some(P::ORD) { diff --git a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs index 68ff8d89..105fccb8 100644 --- a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs +++ b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs @@ -17,11 +17,7 @@ use std::{ fmt::{Debug, Formatter}, fs::File, io::{BufReader, Cursor, Read, Seek, SeekFrom}, - sync::{ - atomic::{AtomicUsize, Ordering::SeqCst}, - mpsc::Receiver, - Arc, - }, + sync::{mpsc::Receiver, Arc}, }; use arrow::{ @@ -50,7 +46,7 @@ use datafusion_ext_commons::{ }; use jni::objects::{GlobalRef, JObject}; use once_cell::sync::OnceCell; -use parking_lot::Mutex; +use tokio::task::JoinHandle; use crate::common::{ execution_context::ExecutionContext, ipc_compression::IpcCompressionReader, @@ -147,15 +143,23 @@ impl ExecutionPlan for IpcReaderExec { // spawn a blocking thread for reading ipcs and providing batches let blocks = jni_new_global_ref!(blocks_local.as_obj())?; - let rx = read_ipc_into_channel(blocks, exec_ctx.clone()); - Ok( - exec_ctx.output_with_sender("IpcReader", move |sender| async move { - while let Some(batch) = rx.recv().expect("receive error").transpose()? { - sender.send(batch).await; + let (rx, handle) = read_ipc_into_channel(blocks, exec_ctx.clone()); + let output = exec_ctx.output_with_sender("IpcReader", move |sender| async move { + loop { + match rx.recv() { + Ok(batch) => { + sender.send(batch).await; + } + Err(_disconnected) => { + drop(rx); + handle.await.expect("tokio error")?; + break; + } } - Ok(()) - }), - ) + } + Ok(()) + }); + Ok(output) } fn metrics(&self) -> Option { @@ -170,118 +174,94 @@ impl ExecutionPlan for IpcReaderExec { fn read_ipc_into_channel( blocks: GlobalRef, exec_ctx: Arc, -) -> Receiver>> { - let (tx, rx) = std::sync::mpsc::channel(); - tokio::task::spawn_blocking(move || { +) -> (Receiver, JoinHandle>) { + let (tx, rx) = std::sync::mpsc::sync_channel(1); + let handle = tokio::task::spawn_blocking(move || { let elapsed_compute = exec_ctx.baseline_metrics().elapsed_compute().clone(); let _timer = elapsed_compute.timer(); log::info!("start ipc reading"); let size_counter = exec_ctx.register_counter_metric("size"); let batch_size = batch_size(); - let staging_cols: Arc>>> = Arc::new(Mutex::new(vec![])); - let staging_num_rows = AtomicUsize::new(0); - let staging_mem_size = AtomicUsize::new(0); - - let provide_batches = || -> Result<()> { - while is_task_running() { - // get next block - let blocks = blocks.clone(); - if !jni_call!(ScalaIterator(blocks.as_obj()).hasNext() -> bool)? { - break; + let output_batch_mem_size = suggested_output_batch_mem_size(); + let mut staging_cols: Vec> = vec![]; + let mut staging_num_rows = 0; + let mut staging_mem_size = 0; + + while is_task_running() { + // get next block + let blocks = blocks.clone(); + if !jni_call!(ScalaIterator(blocks.as_obj()).hasNext() -> bool)? { + break; + } + let next_block = jni_new_global_ref!( + jni_call!(ScalaIterator(blocks.as_obj()).next() -> JObject)?.as_obj() + )?; + + // get ipc reader + let mut reader = match next_block { + b if jni_call!(BlazeBlockObject(b.as_obj()).hasFileSegment() -> bool)? => { + get_file_reader(b.as_obj())? 
} - let next_block = jni_new_global_ref!( - jni_call!(ScalaIterator(blocks.as_obj()).next() -> JObject)?.as_obj() - )?; - - // get ipc reader - let mut reader = Box::pin(match next_block { - b if jni_call!(BlazeBlockObject(b.as_obj()).hasFileSegment() -> bool)? => { - get_file_reader(b.as_obj())? - } - b if jni_call!(BlazeBlockObject(b.as_obj()).hasByteBuffer() -> bool)? => { - get_byte_buffer_reader(b.as_obj())? - } - b => get_channel_reader(b.as_obj())?, - }); - - while let Some((num_rows, cols)) = - reader.as_mut().read_batch(&exec_ctx.output_schema())? - { - let (cur_staging_num_rows, cur_staging_mem_size) = { - let staging_cols_cloned = staging_cols.clone(); - let mut staging_cols = staging_cols_cloned.lock(); - let mut cols_mem_size = 0; - staging_cols.resize_with(cols.len(), || vec![]); - for (col_idx, col) in cols.into_iter().enumerate() { - cols_mem_size += col.get_array_mem_size(); - staging_cols[col_idx].push(col); - } - drop(staging_cols); - staging_num_rows.fetch_add(num_rows, SeqCst); - staging_mem_size.fetch_add(cols_mem_size, SeqCst); - (staging_num_rows.load(SeqCst), staging_mem_size.load(SeqCst)) - }; - - if cur_staging_num_rows >= batch_size - || cur_staging_mem_size >= suggested_output_batch_mem_size() - { - let coalesced_cols = std::mem::take(&mut *staging_cols.clone().lock()) - .into_iter() - .map(|cols| coalesce_arrays_unchecked(cols[0].data_type(), &cols)) - .collect::>(); - let batch = RecordBatch::try_new_with_options( - exec_ctx.output_schema(), - coalesced_cols, - &RecordBatchOptions::new().with_row_count(Some(cur_staging_num_rows)), - )?; - staging_num_rows.store(0, SeqCst); - staging_mem_size.store(0, SeqCst); - size_counter.add(batch.get_array_mem_size()); - exec_ctx.baseline_metrics().record_output(batch.num_rows()); - - if elapsed_compute - .exclude_timer(|| tx.send(Some(Ok(batch)))) - .is_err() - { - break; - } + b if jni_call!(BlazeBlockObject(b.as_obj()).hasByteBuffer() -> bool)? => { + get_byte_buffer_reader(b.as_obj())? + } + b => get_channel_reader(b.as_obj())?, + }; + + while let Some((num_rows, cols)) = reader.read_batch(&exec_ctx.output_schema())? 
{ + let mut cols_mem_size = 0; + staging_cols.resize_with(cols.len(), || vec![]); + for (col_idx, col) in cols.into_iter().enumerate() { + cols_mem_size += col.get_array_mem_size(); + staging_cols[col_idx].push(col); + } + staging_num_rows += num_rows; + staging_mem_size += cols_mem_size; + + if staging_num_rows >= batch_size || staging_mem_size >= output_batch_mem_size { + let coalesced_cols = std::mem::take(&mut staging_cols) + .into_iter() + .map(|cols| coalesce_arrays_unchecked(cols[0].data_type(), &cols)) + .collect::<Vec<_>>(); + let batch = RecordBatch::try_new_with_options( + exec_ctx.output_schema(), + coalesced_cols, + &RecordBatchOptions::new().with_row_count(Some(staging_num_rows)), + )?; + staging_num_rows = 0; + staging_mem_size = 0; + size_counter.add(batch.get_array_mem_size()); + exec_ctx.baseline_metrics().record_output(batch.num_rows()); + if elapsed_compute.exclude_timer(|| tx.send(batch)).is_err() { + break; + } } } + } - let cur_staging_num_rows = staging_num_rows.load(SeqCst); - if cur_staging_num_rows > 0 { - let coalesced_cols = std::mem::take(&mut *staging_cols.clone().lock()) - .into_iter() - .map(|cols| coalesce_arrays_unchecked(cols[0].data_type(), &cols)) - .collect::<Vec<_>>(); - let batch = RecordBatch::try_new_with_options( - exec_ctx.output_schema(), - coalesced_cols, - &RecordBatchOptions::new().with_row_count(Some(cur_staging_num_rows)), - )?; - size_counter.add(batch.get_array_mem_size()); - exec_ctx.baseline_metrics().record_output(batch.num_rows()); - let _ = elapsed_compute.exclude_timer(|| tx.send(Some(Ok(batch)))); - } - let _ = elapsed_compute.exclude_timer(|| tx.send(None)); - Ok::<_, DataFusionError>(()) - }; - - if let Err(err) = provide_batches() { - elapsed_compute - .exclude_timer(|| tx.send(Some(Err(err)))) - .expect("send error"); + if staging_num_rows > 0 { + let coalesced_cols = staging_cols + .into_iter() + .map(|cols| coalesce_arrays_unchecked(cols[0].data_type(), &cols)) + .collect::<Vec<_>>(); + let batch = RecordBatch::try_new_with_options( + exec_ctx.output_schema(), + coalesced_cols, + &RecordBatchOptions::new().with_row_count(Some(staging_num_rows)), + )?; + size_counter.add(batch.get_array_mem_size()); + exec_ctx.baseline_metrics().record_output(batch.num_rows()); + let _ = elapsed_compute.exclude_timer(|| tx.send(batch)); } + Ok::<_, DataFusionError>(()) }); - rx + (rx, handle) } fn get_channel_reader(block: JObject) -> Result<IpcCompressionReader<Box<dyn Read>>> { let channel_reader = ReadableByteChannelReader::try_new(block)?; - log::info!("start ipc channel reader"); Ok(IpcCompressionReader::new(Box::new( BufReader::with_capacity(65536, channel_reader), ))) diff --git a/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs b/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs index c31394aa..963d1d84 100644 --- a/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs +++ b/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs @@ -27,7 +27,6 @@ use arrow::{ }; use datafusion::{common::Result, physical_expr::PhysicalExprRef}; use datafusion_ext_commons::{ - algorithm::rdxsort::RadixSortIterExt, io::{read_len, read_raw_slice, write_len, write_raw_slice}, prefetch_read_data, spark_hash::create_hashes, @@ -117,7 +116,7 @@ impl Table { num_valid_items += 1; (idx as u32, hash) }) - .radix_sorted_unstable_by_key(|&(_idx, hash)| hash) + .sorted_unstable_by_key(|&(idx, hash)| (hash, idx)) .chunk_by(|(_, hash)| *hash) .into_iter() { @@ -403,7 +402,7 @@ impl JoinHashMap { } pub fn lookup_many(&self, hashes: Vec<u32>) -> Vec<MapValue> { - self.table.lookup_many(hashes) + tokio::task::block_in_place(|| self.table.lookup_many(hashes)) } pub fn get_range(&self, map_value: MapValue) -> &[u32] {
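For reference, the shape the ipc_reader_exec change above adopts: a bounded sync_channel(1) gives the blocking reader backpressure against the async consumer, and the returned JoinHandle is awaited once the channel disconnects so the producer's error, if any, is propagated rather than silently dropped. A minimal sketch of the same pattern (simplified types, not the engine's code):

use std::sync::mpsc::sync_channel;

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, rx) = sync_channel::<u64>(1); // at most one item in flight
    let handle = tokio::task::spawn_blocking(move || -> Result<(), String> {
        for i in 0..3 {
            if tx.send(i).is_err() {
                break; // receiver dropped: stop producing early
            }
        }
        Ok(()) // tx dropped here => rx.recv() returns Err(disconnected)
    });
    loop {
        match rx.recv() {
            Ok(v) => println!("received {v}"),
            Err(_disconnected) => {
                // surface panics/errors from the blocking task
                handle.await??;
                break;
            }
        }
    }
    Ok(())
}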
diff --git a/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs b/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs index a9055bd7..16ae27f5 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs @@ -14,20 +14,26 @@ use std::io::Write; -use arrow::{array::ArrayRef, record_batch::RecordBatch}; +use arrow::record_batch::RecordBatch; use blaze_jni_bridge::{is_task_running, jni_call}; +use bytesize::ByteSize; use count_write::CountWrite; use datafusion::{ common::Result, physical_plan::{metrics::Time, Partitioning}, }; use datafusion_ext_commons::{ - algorithm::rdx_tournament_tree::{KeyForRadixTournamentTree, RadixTournamentTree}, - arrow::{array_size::ArraySize, coalesce::coalesce_arrays_unchecked, selection::take_batch}, - assume, compute_suggested_batch_size_for_output, df_execution_err, unchecked, + algorithm::{ + rdx_tournament_tree::{KeyForRadixTournamentTree, RadixTournamentTree}, + rdxsort::radix_sort_by_key, + }, + arrow::{ + array_size::ArraySize, + selection::{create_batch_interleaver, BatchInterleaver}, + }, + compute_suggested_batch_size_for_output, df_execution_err, }; use jni::objects::GlobalRef; -use unchecked_index::UncheckedIndex; use crate::{ common::{ipc_compression::IpcCompressionWriter, timer_helper::TimerHelper}, @@ -38,56 +44,98 @@ use crate::{ pub struct BufferedData { partition_id: usize, + partitioning: Partitioning, + staging_batches: Vec<RecordBatch>, + staging_num_rows: usize, + staging_mem_used: usize, sorted_batches: Vec<RecordBatch>, - sorted_parts: Vec<Vec<PartitionInBatch>>, + sorted_offsets: Vec<Vec<u32>>, num_rows: usize, - mem_used: usize, + sorted_mem_used: usize, sort_time: Time, } impl BufferedData { - pub fn new(partition_id: usize, sort_time: Time) -> Self { + pub fn new(partitioning: Partitioning, partition_id: usize, sort_time: Time) -> Self { Self { partition_id, + partitioning, + staging_batches: vec![], + staging_num_rows: 0, + staging_mem_used: 0, sorted_batches: vec![], - sorted_parts: vec![], + sorted_offsets: vec![], num_rows: 0, - mem_used: 0, + sorted_mem_used: 0, sort_time, } } pub fn drain(&mut self) -> Self { - std::mem::replace(self, Self::new(self.partition_id, self.sort_time.clone())) + std::mem::replace( + self, + Self::new( + self.partitioning.clone(), + self.partition_id, + self.sort_time.clone(), + ), + ) } - pub fn add_batch(&mut self, batch: RecordBatch, partitioning: &Partitioning) -> Result<()> { - let current_num_rows = self.num_rows; + pub fn add_batch(&mut self, batch: RecordBatch) -> Result<()> { self.num_rows += batch.num_rows(); - let (parts, sorted_batch) = self.sort_time.with_timer(|| { - sort_batch_by_partition_id(batch, partitioning, current_num_rows, self.partition_id) + self.staging_num_rows += batch.num_rows(); + self.staging_mem_used += batch.get_array_mem_size(); + self.staging_batches.push(batch); + + let suggested_batch_size = + compute_suggested_batch_size_for_output(self.staging_mem_used, self.staging_num_rows); + if self.staging_mem_used > suggested_batch_size { + self.flush_staging()?; + } + Ok(()) + } + + fn flush_staging(&mut self) -> Result<()> { + let sorted_num_rows = self.num_rows - self.staging_num_rows; + let staging_batches = std::mem::take(&mut self.staging_batches); + let (offsets, sorted_batch) = self.sort_time.with_timer(|| { + sort_batches_by_partition_id( + staging_batches, + &self.partitioning, +
sorted_num_rows, + self.partition_id, + ) })?; - self.mem_used += - sorted_batch.get_array_mem_size() + parts.len() * size_of::(); + self.staging_num_rows = 0; + self.staging_mem_used = 0; + + self.sorted_mem_used += sorted_batch.get_array_mem_size() + offsets.len() * 4; self.sorted_batches.push(sorted_batch); - self.sorted_parts.push(parts); + self.sorted_offsets.push(offsets); Ok(()) } // write buffered data to spill/target file, returns uncompressed size and // offsets to each partition - pub fn write(self, mut w: W, partitioning: &Partitioning) -> Result> { - log::info!("draining all buffered data, total_mem={}", self.mem_used()); + pub fn write(mut self, mut w: W) -> Result> { + if !self.staging_batches.is_empty() { + self.flush_staging()?; + } + + let mem_used = ByteSize(self.mem_used() as u64); + log::info!("draining all buffered data, total_mem={mem_used}"); if self.num_rows == 0 { - return Ok(vec![0; partitioning.partition_count() + 1]); + return Ok(vec![0; self.partitioning.partition_count() + 1]); } + let num_partitions = self.partitioning.partition_count(); let mut writer = IpcCompressionWriter::new(CountWrite::from(&mut w)); let mut offsets = vec![]; let mut offset = 0; - let mut iter = self.into_sorted_batches(partitioning)?; + let mut iter = self.into_sorted_batches()?; - while (iter.cur_part_id() as usize) < partitioning.partition_count() { + while !iter.finished() { if !is_task_running() { df_execution_err!("task completed/killed")?; } @@ -98,39 +146,38 @@ impl BufferedData { // write all batches with this part id while iter.cur_part_id() == cur_part_id { - let (num_rows, cols) = iter.next_batch(); - writer.write_batch(num_rows, &cols)?; + let batch = iter.next_batch()?; + writer.write_batch(batch.num_rows(), batch.columns())?; } writer.finish_current_buf()?; offset = writer.inner().count(); offsets.push(offset); } - while offsets.len() <= partitioning.partition_count() { + while offsets.len() <= num_partitions { offsets.push(offset); // fill offsets of empty partitions } - let compressed_size = offsets.last().cloned().unwrap_or_default(); + let compressed_size = ByteSize(offsets.last().cloned().unwrap_or_default() as u64); log::info!("all buffered data drained, compressed_size={compressed_size}"); Ok(offsets) } // write buffered data to rss, returns uncompressed size - pub fn write_rss( - self, - rss_partition_writer: GlobalRef, - partitioning: &Partitioning, - ) -> Result<()> { + pub fn write_rss(mut self, rss_partition_writer: GlobalRef) -> Result<()> { + if !self.staging_batches.is_empty() { + self.flush_staging()?; + } + + let mem_used = ByteSize(self.mem_used() as u64); + log::info!("draining all buffered data to rss, total_mem={mem_used}"); + if self.num_rows == 0 { return Ok(()); } - log::info!( - "draining all buffered data to rss, total_mem={}", - self.mem_used() - ); - let mut iter = self.into_sorted_batches(partitioning)?; + let mut iter = self.into_sorted_batches()?; let mut writer = IpcCompressionWriter::new(RssWriter::new(rss_partition_writer.clone(), 0)); - while (iter.cur_part_id() as usize) < partitioning.partition_count() { + while !iter.finished() { if !is_task_running() { df_execution_err!("task completed/killed")?; } @@ -142,8 +189,8 @@ impl BufferedData { // write all batches with this part id while iter.cur_part_id() == cur_part_id { - let (num_rows, cols) = iter.next_batch(); - writer.write_batch(num_rows, &cols)?; + let batch = iter.next_batch()?; + writer.write_batch(batch.num_rows(), batch.columns())?; } writer.finish_current_buf()?; } @@ 
-152,48 +199,47 @@ impl BufferedData { Ok(()) } - fn into_sorted_batches( - self, - partitioning: &Partitioning, - ) -> Result { + fn into_sorted_batches(self) -> Result { let sub_batch_size = compute_suggested_batch_size_for_output(self.mem_used(), self.num_rows); Ok(PartitionedBatchesIterator { - batches: unchecked!(self.sorted_batches.clone()), + batch_interleaver: create_batch_interleaver(&self.sorted_batches, true)?, cursors: RadixTournamentTree::new( - self.sorted_parts + self.sorted_offsets .into_iter() .enumerate() - .map(|(idx, partition_indices)| { + .map(|(idx, offsets)| { let mut cur = PartCursor { idx, - parts: partition_indices, + offsets, parts_idx: 0, }; cur.skip_empty_parts(); cur }) .collect(), - partitioning.partition_count(), + self.partitioning.partition_count(), ), num_output_rows: 0, num_rows: self.num_rows, - num_cols: self.sorted_batches[0].num_columns(), batch_size: sub_batch_size, }) } pub fn mem_used(&self) -> usize { - self.mem_used + self.sorted_mem_used + self.staging_mem_used + } + + pub fn is_empty(&self) -> bool { + self.sorted_batches.is_empty() && self.staging_batches.is_empty() } } struct PartitionedBatchesIterator { - batches: UncheckedIndex>, + batch_interleaver: BatchInterleaver, cursors: RadixTournamentTree, num_output_rows: usize, num_rows: usize, - num_cols: usize, batch_size: usize, } @@ -202,121 +248,131 @@ impl PartitionedBatchesIterator { self.cursors.peek().rdx() as u32 } - fn next_batch(&mut self) -> (usize, Vec) { + pub fn finished(&self) -> bool { + self.num_output_rows >= self.num_rows + } + + pub fn next_batch(&mut self) -> Result { let cur_batch_size = self.batch_size.min(self.num_rows - self.num_output_rows); let cur_part_id = self.cur_part_id(); - let mut slices = vec![vec![]; self.num_cols]; - let mut slices_len = 0; + let mut indices = Vec::with_capacity(cur_batch_size); // add rows with same parition id under this cursor - while slices_len < cur_batch_size { + while indices.len() < cur_batch_size { let mut min_cursor = self.cursors.peek_mut(); if min_cursor.rdx() as u32 != cur_part_id { break; } - - let cur_part = min_cursor.parts[min_cursor.parts_idx]; - for i in 0..self.num_cols { - slices[i].push( - self.batches[min_cursor.idx] - .column(i) - .slice(cur_part.start as usize, cur_part.len as usize), - ); - } - slices_len += cur_part.len as usize; + let batch_idx = min_cursor.idx; + let min_offsets = &min_cursor.offsets; + let min_parts_idx = min_cursor.parts_idx; + let cur_offset_range = min_offsets[min_parts_idx]..min_offsets[min_parts_idx + 1]; + indices.extend(cur_offset_range.map(|offset| (batch_idx, offset as usize))); // forward to next non-empty partition min_cursor.parts_idx += 1; min_cursor.skip_empty_parts(); } - let output_slices = slices - .into_iter() - .map(|s| coalesce_arrays_unchecked(s[0].data_type(), &s)) - .collect::>(); - - self.num_output_rows += slices_len; - (slices_len, output_slices) + let batch_interleaver = &mut self.batch_interleaver; + let output_batch = batch_interleaver(&indices)?; + self.num_output_rows += output_batch.num_rows(); + Ok(output_batch) } } struct PartCursor { idx: usize, - parts: Vec, + offsets: Vec, parts_idx: usize, } impl PartCursor { fn skip_empty_parts(&mut self) { - while self.parts_idx < self.parts.len() && self.parts[self.parts_idx].len == 0 { - self.parts_idx += 1; + if self.parts_idx < self.num_partitions() { + if self.offsets[self.parts_idx + 1] == self.offsets[self.parts_idx] { + self.parts_idx += 1; + self.skip_empty_parts(); + } } } + + fn num_partitions(&self) -> usize { 
+ self.offsets.len() - 1 + } } impl KeyForRadixTournamentTree for PartCursor { fn rdx(&self) -> usize { - self.parts_idx.min(self.parts.len()) + self.parts_idx } } -#[derive(Clone, Copy, Default)] -struct PartitionInBatch { - start: u32, - len: u32, -} - -fn sort_batch_by_partition_id( - batch: RecordBatch, +fn sort_batches_by_partition_id( + batches: Vec, partitioning: &Partitioning, current_num_rows: usize, partition_id: usize, -) -> Result<(Vec, RecordBatch)> { +) -> Result<(Vec, RecordBatch)> { let num_partitions = partitioning.partition_count(); - let num_rows = batch.num_rows(); - - let part_ids: Vec = match partitioning { - Partitioning::Hash(..) => { - // compute partition indices - let hashes = evaluate_hashes(partitioning, &batch) - .expect(&format!("error evaluating hashes with {partitioning}")); - evaluate_partition_ids(hashes, partitioning.partition_count()) - } - Partitioning::RoundRobinBatch(..) => { - let start_rows = - (partition_id * 1000193 + current_num_rows) % partitioning.partition_count(); - evaluate_robin_partition_ids(partitioning, &batch, start_rows) - } - _ => unreachable!("unsupported partitioning: {:?}", partitioning), - }; + let mut round_robin_start_rows = + (partition_id * 1000193 + current_num_rows) % partitioning.partition_count(); + + // compute partition indices + let mut partition_indices = batches + .iter() + .enumerate() + .flat_map(|(batch_idx, batch)| { + let part_ids: Vec; + + match partitioning { + Partitioning::Hash(..) => { + // compute partition indices + let hashes = evaluate_hashes(partitioning, &batch) + .expect(&format!("error evaluating hashes with {partitioning}")); + part_ids = evaluate_partition_ids(hashes, partitioning.partition_count()); + } + Partitioning::RoundRobinBatch(..) => { + part_ids = + evaluate_robin_partition_ids(partitioning, &batch, round_robin_start_rows); + round_robin_start_rows += batch.num_rows(); + round_robin_start_rows %= partitioning.partition_count(); + } + _ => unreachable!("unsupported partitioning: {:?}", partitioning), + }; + part_ids + .into_iter() + .enumerate() + .map(move |(row_idx, part_id)| (part_id, batch_idx as u32, row_idx as u32)) + }) + .collect::>(); - // compute partitions - let mut partitions = vec![PartitionInBatch::default(); num_partitions]; - let mut start = 0; + // sort + let mut part_counts = vec![0; num_partitions]; + radix_sort_by_key( + &mut partition_indices, + &mut part_counts, + |&(part_id, ..)| part_id as usize, + ); - for &part_id in &part_ids { - assume!((part_id as usize) < partitions.len()); - partitions[part_id as usize].len += 1; - } - for part in &mut partitions { - part.start = start; - start += part.len; + // compute partitions + let mut partition_offsets = Vec::with_capacity(num_partitions + 1); + let mut offset = 0; + for part_count in part_counts { + partition_offsets.push(offset); + offset += part_count as u32; } + partition_offsets.push(offset); - // bucket sort - let mut sorted_row_indices = vec![0; num_rows]; - let mut bucket_starts = partitions.iter().map(|part| part.start).collect::>(); - - for (row_idx, part_id) in part_ids.into_iter().enumerate() { - let start = bucket_starts[part_id as usize]; - - assume!((part_id as usize) < bucket_starts.len()); - assume!((start as usize) < sorted_row_indices.len()); - bucket_starts[part_id as usize] += 1; - sorted_row_indices[start as usize] = row_idx as u32; - } - let sorted_batch = take_batch(batch, sorted_row_indices)?; - return Ok((partitions, sorted_batch)); + // get sorted batch + let batches_interleaver = 
create_batch_interleaver(&batches, true)?; + let sorted_batch = batches_interleaver( + &partition_indices + .into_iter() + .map(|(_, batch_idx, row_idx)| (batch_idx as usize, row_idx as usize)) + .collect::>(), + )?; + return Ok((partition_offsets, sorted_batch)); } #[cfg(test)] @@ -328,13 +384,9 @@ mod test { datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; - use datafusion::{ - assert_batches_eq, - common::Result, - physical_expr::{expressions::Column, Partitioning, PhysicalExpr}, - }; + use datafusion::{assert_batches_eq, common::Result, physical_expr::Partitioning}; - use crate::shuffle::buffered_data::sort_batch_by_partition_id; + use super::*; fn build_table_i32( a: (&str, &Vec), @@ -359,28 +411,16 @@ mod test { } #[tokio::test] - async fn sort_partition_test() -> Result<()> { + async fn test_round_robin() -> Result<()> { let record_batch = build_table_i32( ("a", &vec![19, 18, 17, 16, 15, 14, 13, 12, 11, 10]), ("b", &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), ("c", &vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4]), ); - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Int32, false), - Field::new("c", DataType::Int32, false), - ])); - - let partition_exprs_a: Vec> = vec![ - Arc::new(Column::new_with_schema("a", &schema).unwrap()), // Partition by column "a" - ]; - let round_robin_partitioning = Partitioning::RoundRobinBatch(4); - let hash_partitioning_a = Partitioning::Hash(partition_exprs_a, 4); - - let (parts, sorted_batch) = - sort_batch_by_partition_id(record_batch, &round_robin_partitioning, 3, 0)?; + let (_parts, sorted_batch) = + sort_batches_by_partition_id(vec![record_batch], &round_robin_partitioning, 3, 0)?; let expected = vec![ "+----+---+---+", diff --git a/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs index 78eb4a45..1bbcb2a6 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs @@ -20,7 +20,7 @@ use datafusion::{ common::Result, physical_plan::{metrics::Time, Partitioning}, }; -use datafusion_ext_commons::{arrow::array_size::ArraySize, df_execution_err}; +use datafusion_ext_commons::arrow::array_size::ArraySize; use futures::lock::Mutex; use jni::objects::GlobalRef; @@ -33,7 +33,6 @@ pub struct RssSortShuffleRepartitioner { name: String, mem_consumer_info: Option>, data: Mutex, - partitioning: Partitioning, rss: GlobalRef, } @@ -47,8 +46,7 @@ impl RssSortShuffleRepartitioner { Self { name: format!("RssSortShufflePartitioner[partition={}]", partition_id), mem_consumer_info: None, - data: Mutex::new(BufferedData::new(partition_id, sort_time)), - partitioning, + data: Mutex::new(BufferedData::new(partitioning, partition_id, sort_time)), rss: rss_partition_writer, } } @@ -73,11 +71,10 @@ impl MemConsumer for RssSortShuffleRepartitioner { async fn spill(&self) -> Result<()> { let data = self.data.lock().await.drain(); let rss = self.rss.clone(); - let partitioning = self.partitioning.clone(); - tokio::task::spawn_blocking(move || data.write_rss(rss, &partitioning)) + tokio::task::spawn_blocking(move || data.write_rss(rss)) .await - .or_else(|err| df_execution_err!("{err}"))??; + .expect("tokio error")?; self.update_mem_used(0).await?; Ok(()) } @@ -99,7 +96,7 @@ impl ShuffleRepartitioner for RssSortShuffleRepartitioner { // add batch to buffered data let mem_used = { let mut data = self.data.lock().await; 
- data.add_batch(input, &self.partitioning)?; + data.add_batch(input)?; data.mem_used() }; self.update_mem_used(mem_used).await?; @@ -115,7 +112,7 @@ impl ShuffleRepartitioner for RssSortShuffleRepartitioner { async fn shuffle_write(&self) -> Result<()> { self.set_spillable(false); - let has_data = self.data.lock().await.mem_used() > 0; + let has_data = !self.data.lock().await.is_empty(); if has_data { self.spill().await?; } diff --git a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs index f8eeb998..1366c852 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs @@ -49,7 +49,6 @@ pub struct SortShuffleRepartitioner { output_index_file: String, data: Mutex, spills: Mutex>, - partitioning: Partitioning, num_output_partitions: usize, output_io_time: Time, } @@ -71,9 +70,8 @@ impl SortShuffleRepartitioner { mem_consumer_info: None, output_data_file, output_index_file, - data: Mutex::new(BufferedData::new(partition_id, sort_time)), + data: Mutex::new(BufferedData::new(partitioning, partition_id, sort_time)), spills: Mutex::default(), - partitioning, num_output_partitions, output_io_time, } @@ -100,7 +98,7 @@ impl MemConsumer for SortShuffleRepartitioner { let data = self.data.lock().await.drain(); let mut spill = try_new_spill(self.exec_ctx.spill_metrics())?; - let offsets = data.write(spill.get_buf_writer(), &self.partitioning)?; + let offsets = data.write(spill.get_buf_writer())?; self.spills .lock() .await @@ -126,7 +124,7 @@ impl ShuffleRepartitioner for SortShuffleRepartitioner { // add batch to buffered data let mem_used = { let mut data = self.data.lock().await; - data.add_batch(input, &self.partitioning)?; + data.add_batch(input)?; data.mem_used() }; self.update_mem_used(mem_used).await?; @@ -163,7 +161,6 @@ impl ShuffleRepartitioner for SortShuffleRepartitioner { // no spills - directly write current batches into final file if spills.is_empty() { - let partitioning = self.partitioning.clone(); let output_io_time = self.output_io_time.clone(); tokio::task::spawn_blocking(move || { let mut output_data = output_io_time.wrap_writer( @@ -182,7 +179,7 @@ impl ShuffleRepartitioner for SortShuffleRepartitioner { ); // write data file - let offsets = data.write(&mut output_data, &partitioning)?; + let offsets = data.write(&mut output_data)?; // write index file let mut offsets_data = vec![]; @@ -221,10 +218,10 @@ impl ShuffleRepartitioner for SortShuffleRepartitioner { } // write rest data into an in-memory buffer - if data.mem_used() > 0 { + if !data.is_empty() { let mut spill = Box::new(vec![]); let writer = spill.get_buf_writer(); - let offsets = data.write(writer, &self.partitioning)?; + let offsets = data.write(writer)?; self.update_mem_used(spill.len()).await?; spills.push(ShuffleSpill { spill, offsets }); } diff --git a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java index a78f60bc..a376e25a 100644 --- a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java +++ b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java @@ -27,11 +27,6 @@ public enum BlazeConf { /// actual off-heap memory usage is expected to be spark.executor.memoryOverhead * fraction. 
MEMORY_FRACTION("spark.blaze.memoryFraction", 0.6), - /// number of worker threads used in tokio runtime, 0 to use the default available parallelism value. - /// for cpus that support hyperthreading, it is recommended to set this value to the number - /// of available physical cores. - TOKIO_NUM_WORKER_THREADS("spark.blaze.tokio.num.worker.threads", 1), - /// enable converting upper/lower functions to native, special cases may provide different - /// outputs from spark due to different unicode versions. CASE_CONVERT_FUNCTIONS_ENABLE("spark.blaze.enable.caseconvert.functions", true),
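One plausible connection worth noting (an inference, not stated in the patch): tokio::task::block_in_place, newly used in agg_hash_map.rs and join_hash_map.rs, is only valid on the multi-threaded runtime flavor and hands the current worker's queued tasks to the other workers while the closure runs, which pairs poorly with a hard-coded single worker thread like the removed default above. A sketch of the primitive's behavior:

fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2) // block_in_place panics on a current_thread runtime
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let sum = tokio::spawn(async {
            // mark this worker as blocked so its queued tasks migrate
            // to the other workers for the duration of the closure
            tokio::task::block_in_place(|| (0..1_000_000u64).sum::<u64>())
        })
        .await
        .unwrap();
        assert_eq!(sum, 499_999_500_000);
    });
}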