From cb9068ce56a6870b84c4878db0255048e7df9e97 Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Fri, 7 Jun 2024 21:16:24 +0800
Subject: [PATCH] build(deps): update Arrow/Parquet to `52.0`, object-store to
 `0.10` (#10765)

* fix compile on default feature config

Signed-off-by: Ruihang Xia

* fix test of common, functions, optimizer and physical-expr

Signed-off-by: Ruihang Xia

* fix other tests

Signed-off-by: Ruihang Xia

* fix one last test

Signed-off-by: Ruihang Xia

* fix clippy warnings

Signed-off-by: Ruihang Xia

* fix datafusion-cli

Signed-off-by: Ruihang Xia

* switch to git deps

Signed-off-by: Ruihang Xia

* regen proto file

Signed-off-by: Ruihang Xia

* fix pyo3 feature

Signed-off-by: Ruihang Xia

* fix slt

Signed-off-by: Ruihang Xia

* fix symmetric hash join cases

Signed-off-by: Ruihang Xia

* update integration result

Signed-off-by: Ruihang Xia

* fix up spill test

Signed-off-by: Ruihang Xia

* shift to the released packages

Signed-off-by: Ruihang Xia

* Update cargo.lock

* Update datafusion/optimizer/src/analyzer/type_coercion.rs

Co-authored-by: Andrew Lamb

* update document

Signed-off-by: Ruihang Xia

* move memory limit to parameter pos

Signed-off-by: Ruihang Xia

---------

Signed-off-by: Ruihang Xia
Co-authored-by: Andrew Lamb
---
 Cargo.toml                                    |  34 +-
 datafusion-cli/Cargo.lock                     | 374 +++++++++++-------
 datafusion-cli/Cargo.toml                     |   6 +-
 datafusion-examples/examples/json_opener.rs   |   2 +-
 datafusion/common/Cargo.toml                  |   2 +-
 datafusion/common/src/hash_utils.rs           |   4 +-
 datafusion/common/src/pyarrow.rs              |  37 +-
 datafusion/common/src/scalar/mod.rs           |  65 ++-
 datafusion/core/Cargo.toml                    |   1 +
 datafusion/core/benches/sql_query_with_io.rs  |   2 +-
 .../core/src/datasource/file_format/mod.rs    |  22 +-
 .../src/datasource/file_format/parquet.rs     |  20 +-
 .../core/src/datasource/physical_plan/csv.rs  |   2 +-
 .../physical_plan/parquet/row_groups.rs       |   2 +-
 .../core/tests/expr_api/simplification.rs     |   8 +-
 .../core/tests/parquet/custom_reader.rs       |   2 +-
 datafusion/core/tests/path_partition.rs       |  22 +-
 datafusion/expr/Cargo.toml                    |   1 +
 datafusion/expr/src/interval_arithmetic.rs    |  51 ++-
 datafusion/functions-array/src/range.rs       |  12 +-
 datafusion/functions/Cargo.toml               |   1 +
 datafusion/functions/src/datetime/date_bin.rs |  67 +++-
 .../functions/src/datetime/to_timestamp.rs    |   2 +-
 datafusion/optimizer/Cargo.toml               |   2 +
 .../optimizer/src/analyzer/type_coercion.rs   |   6 +-
 .../physical-expr/src/expressions/in_list.rs  |   2 +
 .../physical-expr/src/intervals/cp_solver.rs  |  51 ++-
 .../physical-expr/src/intervals/utils.rs      |  29 +-
 .../src/aggregates/group_values/primitive.rs  |   2 +
 .../physical-plan/src/aggregates/mod.rs       |  22 +-
 .../src/aggregates/topk/hash_table.rs         |   2 +
 .../physical-plan/src/aggregates/topk/heap.rs |   2 +
 .../src/joins/symmetric_hash_join.rs          |  12 +-
 .../physical-plan/src/joins/test_utils.rs     |   8 +-
 .../proto/datafusion_common.proto             |   7 +-
 datafusion/proto-common/src/from_proto/mod.rs |   8 +-
 .../proto-common/src/generated/pbjson.rs      | 140 ++++++-
 .../proto-common/src/generated/prost.rs       |  14 +-
 datafusion/proto-common/src/to_proto/mod.rs   |  23 +-
 .../src/generated/datafusion_proto_common.rs  |  14 +-
 datafusion/sql/tests/sql_integration.rs       |  13 +-
 .../sqllogictest/test_files/arrow_typeof.slt  |   2 +-
 datafusion/sqllogictest/test_files/expr.slt   |   2 +-
 .../sqllogictest/test_files/group_by.slt      |   8 +-
 datafusion/sqllogictest/test_files/order.slt  |   4 +-
 .../sqllogictest/test_files/set_variable.slt  |   8 +-
 .../sqllogictest/test_files/timestamps.slt    |   2 +-
 datafusion/substrait/Cargo.toml               |   1 +
 .../substrait/src/logical_plan/consumer.rs    |  23 +-
 .../substrait/src/logical_plan/producer.rs    |   5 +-
 50 files changed, 777 insertions(+), 374 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index ccd54d7d2538..83faf4b6a8f2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -64,20 +64,28 @@ version = "38.0.0"
 ahash = { version = "0.8", default-features = false, features = [
     "runtime-rng",
 ] }
-arrow = { version = "51.0.0", features = ["prettyprint"] }
-arrow-array = { version = "51.0.0", default-features = false, features = ["chrono-tz"] }
-arrow-buffer = { version = "51.0.0", default-features = false }
-arrow-flight = { version = "51.0.0", features = ["flight-sql-experimental"] }
-arrow-ipc = { version = "51.0.0", default-features = false, features = ["lz4"] }
-arrow-ord = { version = "51.0.0", default-features = false }
-arrow-schema = { version = "51.0.0", default-features = false }
-arrow-string = { version = "51.0.0", default-features = false }
+arrow = { version = "52.0.0", features = [
+    "prettyprint",
+] }
+arrow-array = { version = "52.0.0", default-features = false, features = [
+    "chrono-tz",
+] }
+arrow-buffer = { version = "52.0.0", default-features = false }
+arrow-flight = { version = "52.0.0", features = [
+    "flight-sql-experimental",
+] }
+arrow-ipc = { version = "52.0.0", default-features = false, features = [
+    "lz4",
+] }
+arrow-ord = { version = "52.0.0", default-features = false }
+arrow-schema = { version = "52.0.0", default-features = false }
+arrow-string = { version = "52.0.0", default-features = false }
 async-trait = "0.1.73"
 bigdecimal = "=0.4.1"
 bytes = "1.4"
 chrono = { version = "0.4.34", default-features = false }
 ctor = "0.2.0"
-dashmap = "5.4.0"
+dashmap = "5.5.0"
 datafusion = { path = "datafusion/core", version = "38.0.0", default-features = false }
 datafusion-common = { path = "datafusion/common", version = "38.0.0", default-features = false }
 datafusion-common-runtime = { path = "datafusion/common-runtime", version = "38.0.0" }
@@ -104,9 +112,13 @@ indexmap = "2.0.0"
 itertools = "0.12"
 log = "^0.4"
 num_cpus = "1.13.0"
-object_store = { version = "0.9.1", default-features = false }
+object_store = { version = "0.10.1", default-features = false }
 parking_lot = "0.12"
-parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
+parquet = { version = "52.0.0", default-features = false, features = [
+    "arrow",
+    "async",
+    "object_store",
+] }
 rand = "0.8"
 regex = "1.8"
 rstest = "0.21.0"

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index c4a447d133a3..04ced84d9950 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -130,9 +130,9 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"

 [[package]]
 name = "arrow"
-version = "51.0.0"
+version = "52.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "219d05930b81663fd3b32e3bde8ce5bff3c4d23052a99f11a8fa50a3b47b2658"
+checksum = "7ae9728f104939be6d8d9b368a354b4929b0569160ea1641f0721b55a861ce38"
 dependencies = [
  "arrow-arith",
  "arrow-array",
  "arrow-buffer",
  "arrow-cast",
  "arrow-csv",
  "arrow-data",
  "arrow-ipc",
  "arrow-json",
  "arrow-ord",
  "arrow-row",
  "arrow-schema",
  "arrow-select",
  "arrow-string",
 ]

 [[package]]
 name = "arrow-arith"
-version = "51.0.0"
+version = "52.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0272150200c07a86a390be651abdd320a2d12e84535f0837566ca87ecd8f95e0"
+checksum = "a7029a5b3efbeafbf4a12d12dc16b8f9e9bff20a410b8c25c5d28acc089e1043"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
  "arrow-data",
  "arrow-schema",
  "chrono",
  "half",
  "num",
 ]

 [[package]]
 name = "arrow-array"
-version = "51.0.0"
+version
= "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8010572cf8c745e242d1b632bd97bd6d4f40fefed5ed1290a8f433abaa686fea" +checksum = "d33238427c60271710695f17742f45b1a5dc5bcfc5c15331c25ddfe7abf70d97" dependencies = [ "ahash", "arrow-buffer", @@ -183,9 +183,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d0a2432f0cba5692bf4cb757469c66791394bac9ec7ce63c1afe74744c37b27" +checksum = "fe9b95e825ae838efaf77e366c00d3fc8cca78134c9db497d6bda425f2e7b7c1" dependencies = [ "bytes", "half", @@ -194,9 +194,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9abc10cd7995e83505cc290df9384d6e5412b207b79ce6bdff89a10505ed2cba" +checksum = "87cf8385a9d5b5fcde771661dd07652b79b9139fea66193eda6a88664400ccab" dependencies = [ "arrow-array", "arrow-buffer", @@ -215,9 +215,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95cbcba196b862270bf2a5edb75927380a7f3a163622c61d40cbba416a6305f2" +checksum = "cea5068bef430a86690059665e40034625ec323ffa4dd21972048eebb0127adc" dependencies = [ "arrow-array", "arrow-buffer", @@ -234,9 +234,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2742ac1f6650696ab08c88f6dd3f0eb68ce10f8c253958a18c943a68cd04aec5" +checksum = "cb29be98f987bcf217b070512bb7afba2f65180858bca462edf4a39d84a23e10" dependencies = [ "arrow-buffer", "arrow-schema", @@ -246,9 +246,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a42ea853130f7e78b9b9d178cb4cd01dee0f78e64d96c2949dc0a915d6d9e19d" +checksum = "ffc68f6523970aa6f7ce1dc9a33a7d9284cfb9af77d4ad3e617dbe5d79cc6ec8" dependencies = [ "arrow-array", "arrow-buffer", @@ -261,9 +261,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaafb5714d4e59feae964714d724f880511500e3569cc2a94d02456b403a2a49" +checksum = "2041380f94bd6437ab648e6c2085a045e45a0c44f91a1b9a4fe3fed3d379bfb1" dependencies = [ "arrow-array", "arrow-buffer", @@ -281,9 +281,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3e6b61e3dc468f503181dccc2fc705bdcc5f2f146755fa5b56d0a6c5943f412" +checksum = "fcb56ed1547004e12203652f12fe12e824161ff9d1e5cf2a7dc4ff02ba94f413" dependencies = [ "arrow-array", "arrow-buffer", @@ -296,9 +296,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "848ee52bb92eb459b811fb471175ea3afcf620157674c8794f539838920f9228" +checksum = "575b42f1fc588f2da6977b94a5ca565459f5ab07b60545e17243fb9a7ed6d43e" dependencies = [ "ahash", "arrow-array", @@ -311,15 +311,15 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "02d9483aaabe910c4781153ae1b6ae0393f72d9ef757d38d09d450070cf2e528" +checksum = "32aae6a60458a2389c0da89c9de0b7932427776127da1a738e2efc21d32f3393" [[package]] name = "arrow-select" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "849524fa70e0e3c5ab58394c770cb8f514d0122d20de08475f7b472ed8075830" +checksum = "de36abaef8767b4220d7b4a8c2fe5ffc78b47db81b03d77e2136091c3ba39102" dependencies = [ "ahash", "arrow-array", @@ -331,9 +331,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9373cb5a021aee58863498c37eb484998ef13377f69989c6c5ccfbd258236cdb" +checksum = "e435ada8409bcafc910bc3e0077f532a4daa20e99060a496685c0e3e53cc2597" dependencies = [ "arrow-array", "arrow-buffer", @@ -375,8 +375,8 @@ dependencies = [ "pin-project-lite", "tokio", "xz2", - "zstd 0.13.1", - "zstd-safe 7.1.0", + "zstd 0.13.0", + "zstd-safe 7.0.0", ] [[package]] @@ -399,6 +399,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "atty" version = "0.2.14" @@ -436,8 +442,8 @@ dependencies = [ "bytes", "fastrand 1.9.0", "hex", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.29", "ring 0.16.20", "time", "tokio", @@ -469,7 +475,7 @@ dependencies = [ "aws-smithy-http", "aws-smithy-types", "aws-types", - "http", + "http 0.2.12", "regex", "tracing", ] @@ -485,8 +491,8 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "lazy_static", "percent-encoding", "pin-project-lite", @@ -511,7 +517,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "http", + "http 0.2.12", "regex", "tokio-stream", "tower", @@ -538,7 +544,7 @@ dependencies = [ "aws-smithy-xml", "aws-types", "bytes", - "http", + "http 0.2.12", "regex", "tower", "tracing", @@ -554,7 +560,7 @@ dependencies = [ "aws-sigv4", "aws-smithy-http", "aws-types", - "http", + "http 0.2.12", "tracing", ] @@ -568,7 +574,7 @@ dependencies = [ "form_urlencoded", "hex", "hmac", - "http", + "http 0.2.12", "once_cell", "percent-encoding", "regex", @@ -601,9 +607,9 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand 1.9.0", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.29", "hyper-rustls 0.23.2", "lazy_static", "pin-project-lite", @@ -623,9 +629,9 @@ dependencies = [ "bytes", "bytes-utils", "futures-core", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.29", "once_cell", "percent-encoding", "pin-project-lite", @@ -642,8 +648,8 @@ dependencies = [ "aws-smithy-http", "aws-smithy-types", "bytes", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "pin-project-lite", "tower", "tracing", @@ -701,7 +707,7 @@ dependencies = [ "aws-smithy-client", "aws-smithy-http", "aws-smithy-types", - "http", + "http 0.2.12", "rustc_version", "tracing", ] @@ -788,9 +794,9 @@ dependencies = [ [[package]] name = "brotli" -version = "3.5.0" +version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d640d25bc63c50fb1f0b545ffd80207d2e10a4c965530809b40ba3386825c391" +checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b" dependencies = [ 
"alloc-no-stdlib", "alloc-stdlib", @@ -799,9 +805,9 @@ dependencies = [ [[package]] name = "brotli-decompressor" -version = "2.5.1" +version = "4.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f" +checksum = "9a45bd2e4095a8b518033b128020dd4a55aab1c0a381ba4404a472630f4bc362" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", @@ -899,9 +905,9 @@ dependencies = [ [[package]] name = "chrono-tz" -version = "0.8.6" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e" +checksum = "93698b29de5e97ad0ae26447b344c482a7284c737d9ddc5f9e52b74a336671bb" dependencies = [ "chrono", "chrono-tz-build", @@ -910,9 +916,9 @@ dependencies = [ [[package]] name = "chrono-tz-build" -version = "0.2.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "433e39f13c9a060046954e0592a8d0a4bcb1040125cbf91cb8ee58964cfb350f" +checksum = "0c088aee841df9c3041febbb73934cfc39708749bf96dc827e3359cd39ef11b1" dependencies = [ "parse-zoneinfo", "phf", @@ -1166,7 +1172,7 @@ dependencies = [ "url", "uuid", "xz2", - "zstd 0.13.1", + "zstd 0.13.0", ] [[package]] @@ -1250,6 +1256,7 @@ dependencies = [ "ahash", "arrow", "arrow-array", + "arrow-buffer", "chrono", "datafusion-common", "paste", @@ -1500,15 +1507,6 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dca9240753cf90908d7e4aac30f630662b02aebaa1b58a3cadabdb23385b58b" -[[package]] -name = "encoding_rs" -version = "0.8.34" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" -dependencies = [ - "cfg-if", -] - [[package]] name = "endian-type" version = "0.1.2" @@ -1588,9 +1586,9 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "flatbuffers" -version = "23.5.26" +version = "24.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dac53e22462d78c16d64a1cd22371b54cc3fe94aa15e7886a2fa6e5d1ab8640" +checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f" dependencies = [ "bitflags 1.3.2", "rustc_version", @@ -1769,7 +1767,26 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", + "indexmap 2.2.6", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", "indexmap 2.2.6", "slab", "tokio", @@ -1857,6 +1874,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -1864,7 +1892,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", + "pin-project-lite", +] + +[[package]] +name = 
"http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" +dependencies = [ + "bytes", + "futures-core", + "http 1.1.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -1896,9 +1947,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1910,33 +1961,76 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.0", + "httparse", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-rustls" version = "0.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" dependencies = [ - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.29", "log", "rustls 0.20.9", - "rustls-native-certs", + "rustls-native-certs 0.6.3", "tokio", "tokio-rustls 0.23.4", ] [[package]] name = "hyper-rustls" -version = "0.24.2" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" dependencies = [ "futures-util", - "http", - "hyper", - "rustls 0.21.12", + "http 1.1.0", + "hyper 1.3.1", + "hyper-util", + "rustls 0.22.4", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.25.0", + "tower-service", +] + +[[package]] +name = "hyper-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.3.1", + "pin-project-lite", + "socket2", "tokio", - "tokio-rustls 0.24.1", + "tower", + "tower-service", + "tracing", ] [[package]] @@ -2395,17 +2489,17 @@ dependencies = [ [[package]] name = "object_store" -version = "0.9.1" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8718f8b65fdf67a45108d1548347d4af7d71fb81ce727bbf9e3b2535e079db3" +checksum = "fbebfd32c213ba1907fa7a9c9138015a8de2b43e30c5aa45b18f7deb46786ad6" dependencies = [ "async-trait", - "base64 0.21.7", + "base64 0.22.1", "bytes", "chrono", "futures", "humantime", - "hyper", + "hyper 1.3.1", "itertools", "md-5", "parking_lot", @@ -2482,9 +2576,9 @@ dependencies = [ [[package]] name = "parquet" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "096795d4f47f65fd3ee1ec5a98b77ab26d602f2cc785b0e4be5443add17ecc32" +checksum = "29c3b5322cc1bbf67f11c079c42be41a55949099b78732f7dba9e15edde40eab" dependencies = [ "ahash", "arrow-array", @@ -2512,7 +2606,8 @@ dependencies = [ "thrift", "tokio", 
"twox-hash", - "zstd 0.13.1", + "zstd 0.13.0", + "zstd-sys", ] [[package]] @@ -2819,20 +2914,21 @@ checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" [[package]] name = "reqwest" -version = "0.11.27" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" +checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" dependencies = [ - "base64 0.21.7", + "base64 0.22.1", "bytes", - "encoding_rs", "futures-core", "futures-util", - "h2", - "http", - "http-body", - "hyper", - "hyper-rustls 0.24.2", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.3.1", + "hyper-rustls 0.26.0", + "hyper-util", "ipnet", "js-sys", "log", @@ -2840,16 +2936,16 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.12", - "rustls-native-certs", - "rustls-pemfile 1.0.4", + "rustls 0.22.4", + "rustls-native-certs 0.7.0", + "rustls-pemfile 2.1.2", + "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", - "system-configuration", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls 0.25.0", "tokio-util", "tower-service", "url", @@ -2964,14 +3060,16 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.12" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" dependencies = [ "log", "ring 0.17.8", + "rustls-pki-types", "rustls-webpki", - "sct", + "subtle", + "zeroize", ] [[package]] @@ -2986,6 +3084,19 @@ dependencies = [ "security-framework", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.1.2", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -3013,11 +3124,12 @@ checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" [[package]] name = "rustls-webpki" -version = "0.101.7" +version = "0.102.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +checksum = "ff448f7e92e913c4b7d4c6d8e4540a1724b319b4152b8aef6d4cf8339712b33e" dependencies = [ "ring 0.17.8", + "rustls-pki-types", "untrusted 0.9.0", ] @@ -3373,27 +3485,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" -[[package]] -name = "system-configuration" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" -dependencies = [ - "bitflags 1.3.2", - "core-foundation", - "system-configuration-sys", -] - -[[package]] -name = "system-configuration-sys" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "tempfile" version = "3.10.1" @@ -3555,11 +3646,12 @@ dependencies = [ [[package]] 
name = "tokio-rustls" -version = "0.24.1" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" dependencies = [ - "rustls 0.21.12", + "rustls 0.22.4", + "rustls-pki-types", "tokio", ] @@ -4093,9 +4185,9 @@ checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" [[package]] name = "winreg" -version = "0.50.0" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5" dependencies = [ "cfg-if", "windows-sys 0.48.0", @@ -4153,11 +4245,11 @@ dependencies = [ [[package]] name = "zstd" -version = "0.13.1" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d789b1514203a1120ad2429eae43a7bd32b90976a7bb8a05f7ec02fa88cc23a" +checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110" dependencies = [ - "zstd-safe 7.1.0", + "zstd-safe 7.0.0", ] [[package]] @@ -4172,18 +4264,18 @@ dependencies = [ [[package]] name = "zstd-safe" -version = "7.1.0" +version = "7.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cd99b45c6bc03a018c8b8a86025678c87e55526064e38f9df301989dce7ec0a" +checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e" dependencies = [ "zstd-sys", ] [[package]] name = "zstd-sys" -version = "2.0.10+zstd.1.5.6" +version = "2.0.9+zstd.1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c253a4914af5bafc8fa8c86ee400827e83cf6ec01195ec1f1ed8441bf00d65aa" +checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656" dependencies = [ "cc", "pkg-config", diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 4e3d800cfe97..5578d7fe5839 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -30,7 +30,7 @@ rust-version = "1.73" readme = "README.md" [dependencies] -arrow = "51.0.0" +arrow = { version = "52.0.0" } async-trait = "0.1.41" aws-config = "0.55" aws-credential-types = "0.55" @@ -49,9 +49,9 @@ dirs = "4.0.0" env_logger = "0.9" futures = "0.3" mimalloc = { version = "0.1", default-features = false } -object_store = { version = "0.9.0", features = ["aws", "gcp", "http"] } +object_store = { version = "0.10.1", features = ["aws", "gcp", "http"] } parking_lot = { version = "0.12" } -parquet = { version = "51.0.0", default-features = false } +parquet = { version = "52.0.0", default-features = false } regex = "1.8" rustyline = "11.0" tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot", "signal"] } diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs index e32fb9b09630..7bc431c5c5ee 100644 --- a/datafusion-examples/examples/json_opener.rs +++ b/datafusion-examples/examples/json_opener.rs @@ -44,7 +44,7 @@ async fn main() -> Result<()> { {"num":2,"str":"hello"} {"num":4,"str":"foo"}"#, ); - object_store.put(&path, data).await.unwrap(); + object_store.put(&path, data.into()).await.unwrap(); let schema = Arc::new(Schema::new(vec![ Field::new("num", DataType::Int64, false), diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 7085732b562e..62ea85a4a33d 
100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -59,7 +59,7 @@ libc = "0.2.140" num_cpus = { workspace = true } object_store = { workspace = true, optional = true } parquet = { workspace = true, optional = true, default-features = true } -pyo3 = { version = "0.20.0", optional = true } +pyo3 = { version = "0.21.0", optional = true } sqlparser = { workspace = true } [target.'cfg(target_family = "wasm")'.dependencies] diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 9819fc7b344d..7eecdec8abef 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -24,6 +24,8 @@ use arrow::array::*; use arrow::datatypes::*; use arrow::row::Rows; use arrow::{downcast_dictionary_array, downcast_primitive_array}; +use arrow_buffer::IntervalDayTime; +use arrow_buffer::IntervalMonthDayNano; use crate::cast::{ as_boolean_array, as_fixed_size_list_array, as_generic_binary_array, @@ -72,7 +74,7 @@ macro_rules! hash_value { }; } hash_value!(i8, i16, i32, i64, i128, i256, u8, u16, u32, u64); -hash_value!(bool, str, [u8]); +hash_value!(bool, str, [u8], IntervalDayTime, IntervalMonthDayNano); macro_rules! hash_float_value { ($(($t:ty, $i:ty)),+) => { diff --git a/datafusion/common/src/pyarrow.rs b/datafusion/common/src/pyarrow.rs index f4356477532f..87254a499fb1 100644 --- a/datafusion/common/src/pyarrow.rs +++ b/datafusion/common/src/pyarrow.rs @@ -22,8 +22,8 @@ use arrow::pyarrow::{FromPyArrow, ToPyArrow}; use arrow_array::Array; use pyo3::exceptions::PyException; use pyo3::prelude::PyErr; -use pyo3::types::PyList; -use pyo3::{FromPyObject, IntoPy, PyAny, PyObject, PyResult, Python}; +use pyo3::types::{PyAnyMethods, PyList}; +use pyo3::{Bound, FromPyObject, IntoPy, PyAny, PyObject, PyResult, Python}; use crate::{DataFusionError, ScalarValue}; @@ -34,18 +34,18 @@ impl From for PyErr { } impl FromPyArrow for ScalarValue { - fn from_pyarrow(value: &PyAny) -> PyResult { + fn from_pyarrow_bound(value: &pyo3::Bound<'_, pyo3::PyAny>) -> PyResult { let py = value.py(); let typ = value.getattr("type")?; let val = value.call_method0("as_py")?; // construct pyarrow array from the python value and pyarrow type - let factory = py.import("pyarrow")?.getattr("array")?; - let args = PyList::new(py, [val]); + let factory = py.import_bound("pyarrow")?.getattr("array")?; + let args = PyList::new_bound(py, [val]); let array = factory.call1((args, typ))?; // convert the pyarrow array to rust array using C data interface - let array = arrow::array::make_array(ArrayData::from_pyarrow(array)?); + let array = arrow::array::make_array(ArrayData::from_pyarrow_bound(&array)?); let scalar = ScalarValue::try_from_array(&array, 0)?; Ok(scalar) @@ -64,8 +64,8 @@ impl ToPyArrow for ScalarValue { } impl<'source> FromPyObject<'source> for ScalarValue { - fn extract(value: &'source PyAny) -> PyResult { - Self::from_pyarrow(value) + fn extract_bound(value: &Bound<'source, PyAny>) -> PyResult { + Self::from_pyarrow_bound(value) } } @@ -86,19 +86,19 @@ mod tests { fn init_python() { prepare_freethreaded_python(); Python::with_gil(|py| { - if py.run("import pyarrow", None, None).is_err() { - let locals = PyDict::new(py); - py.run( + if py.run_bound("import pyarrow", None, None).is_err() { + let locals = PyDict::new_bound(py); + py.run_bound( "import sys; executable = sys.executable; python_path = sys.path", None, - Some(locals), + Some(&locals), ) .expect("Couldn't get python info"); - let executable = 
locals.get_item("executable").unwrap().unwrap(); + let executable = locals.get_item("executable").unwrap(); let executable: String = executable.extract().unwrap(); - let python_path = locals.get_item("python_path").unwrap().unwrap(); - let python_path: Vec<&str> = python_path.extract().unwrap(); + let python_path = locals.get_item("python_path").unwrap(); + let python_path: Vec = python_path.extract().unwrap(); panic!("pyarrow not found\nExecutable: {executable}\nPython path: {python_path:?}\n\ HINT: try `pip install pyarrow`\n\ @@ -125,9 +125,10 @@ mod tests { Python::with_gil(|py| { for scalar in example_scalars.iter() { - let result = - ScalarValue::from_pyarrow(scalar.to_pyarrow(py).unwrap().as_ref(py)) - .unwrap(); + let result = ScalarValue::from_pyarrow_bound( + scalar.to_pyarrow(py).unwrap().bind(py), + ) + .unwrap(); assert_eq!(scalar, &result); } }); diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index ba006247cd70..8073b21cdde0 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -52,7 +52,7 @@ use arrow::{ UInt16Type, UInt32Type, UInt64Type, UInt8Type, DECIMAL128_MAX_PRECISION, }, }; -use arrow_buffer::Buffer; +use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, ScalarBuffer}; use arrow_schema::{UnionFields, UnionMode}; use half::f16; @@ -266,11 +266,11 @@ pub enum ScalarValue { IntervalYearMonth(Option), /// Number of elapsed days and milliseconds (no leap seconds) /// stored as 2 contiguous 32-bit signed integers - IntervalDayTime(Option), + IntervalDayTime(Option), /// A triple of the number of elapsed months, days, and nanoseconds. /// Months and days are encoded as 32-bit signed integers. /// Nanoseconds is encoded as a 64-bit signed integer (no leap seconds). - IntervalMonthDayNano(Option), + IntervalMonthDayNano(Option), /// Duration in seconds DurationSecond(Option), /// Duration in milliseconds @@ -988,10 +988,10 @@ impl ScalarValue { ScalarValue::IntervalYearMonth(Some(0)) } DataType::Interval(IntervalUnit::DayTime) => { - ScalarValue::IntervalDayTime(Some(0)) + ScalarValue::IntervalDayTime(Some(IntervalDayTime::ZERO)) } DataType::Interval(IntervalUnit::MonthDayNano) => { - ScalarValue::IntervalMonthDayNano(Some(0)) + ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano::ZERO)) } DataType::Duration(TimeUnit::Second) => ScalarValue::DurationSecond(Some(0)), DataType::Duration(TimeUnit::Millisecond) => { @@ -2151,9 +2151,8 @@ impl ScalarValue { ), ScalarValue::Union(value, fields, _mode) => match value { Some((v_id, value)) => { - let mut field_type_ids = Vec::::with_capacity(fields.len()); - let mut child_arrays = - Vec::<(Field, ArrayRef)>::with_capacity(fields.len()); + let mut new_fields = Vec::with_capacity(fields.len()); + let mut child_arrays = Vec::::with_capacity(fields.len()); for (f_id, field) in fields.iter() { let ar = if f_id == *v_id { value.to_array_of_size(size)? 
@@ -2162,14 +2161,14 @@ impl ScalarValue { new_null_array(dt, size) }; let field = (**field).clone(); - child_arrays.push((field, ar)); - field_type_ids.push(f_id); + child_arrays.push(ar); + new_fields.push(field.clone()); } - let type_ids = repeat(*v_id).take(size).collect::>(); - let type_ids = Buffer::from_slice_ref(type_ids); - let value_offsets: Option = None; + let type_ids = repeat(*v_id).take(size); + let type_ids = ScalarBuffer::::from_iter(type_ids); + let value_offsets: Option> = None; let ar = UnionArray::try_new( - field_type_ids.as_slice(), + fields.clone(), type_ids, value_offsets, child_arrays, @@ -3219,9 +3218,13 @@ impl fmt::Display for ScalarValue { ScalarValue::Time32Millisecond(e) => format_option!(f, e)?, ScalarValue::Time64Microsecond(e) => format_option!(f, e)?, ScalarValue::Time64Nanosecond(e) => format_option!(f, e)?, - ScalarValue::IntervalDayTime(e) => format_option!(f, e)?, ScalarValue::IntervalYearMonth(e) => format_option!(f, e)?, - ScalarValue::IntervalMonthDayNano(e) => format_option!(f, e)?, + ScalarValue::IntervalMonthDayNano(e) => { + format_option!(f, e.map(|v| format!("{v:?}")))? + } + ScalarValue::IntervalDayTime(e) => { + format_option!(f, e.map(|v| format!("{v:?}")))?; + } ScalarValue::DurationSecond(e) => format_option!(f, e)?, ScalarValue::DurationMillisecond(e) => format_option!(f, e)?, ScalarValue::DurationMicrosecond(e) => format_option!(f, e)?, @@ -3447,6 +3450,7 @@ mod tests { use arrow::buffer::OffsetBuffer; use arrow::compute::{is_null, kernels}; use arrow::util::pretty::pretty_format_columns; + use arrow_buffer::Buffer; use arrow_schema::Fields; use chrono::NaiveDate; use rand::Rng; @@ -3988,7 +3992,11 @@ mod tests { #[test] fn test_interval_add_timestamp() -> Result<()> { - let interval = ScalarValue::IntervalMonthDayNano(Some(123)); + let interval = ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano { + months: 1, + days: 2, + nanoseconds: 3, + })); let timestamp = ScalarValue::TimestampNanosecond(Some(123), None); let result = interval.add(×tamp)?; let expect = timestamp.add(&interval)?; @@ -4000,7 +4008,10 @@ mod tests { let expect = timestamp.add(&interval)?; assert_eq!(result, expect); - let interval = ScalarValue::IntervalDayTime(Some(123)); + let interval = ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 1, + milliseconds: 23, + })); let timestamp = ScalarValue::TimestampNanosecond(Some(123), None); let result = interval.add(×tamp)?; let expect = timestamp.add(&interval)?; @@ -4650,6 +4661,17 @@ mod tests { let str_vals = [Some("foo"), None, Some("bar")]; + let interval_dt_vals = [ + Some(IntervalDayTime::MINUS_ONE), + None, + Some(IntervalDayTime::ONE), + ]; + let interval_mdn_vals = [ + Some(IntervalMonthDayNano::MINUS_ONE), + None, + Some(IntervalMonthDayNano::ONE), + ]; + /// Test each value in `scalar` with the corresponding element /// at `array`. 
Assumes each element is unique (aka not equal /// with all other indexes) @@ -4795,7 +4817,12 @@ mod tests { Some("UTC".into()) ), make_test_case!(i32_vals, IntervalYearMonthArray, IntervalYearMonth), - make_test_case!(i64_vals, IntervalDayTimeArray, IntervalDayTime), + make_test_case!(interval_dt_vals, IntervalDayTimeArray, IntervalDayTime), + make_test_case!( + interval_mdn_vals, + IntervalMonthDayNanoArray, + IntervalMonthDayNano + ), make_str_dict_test_case!(str_vals, Int8Type), make_str_dict_test_case!(str_vals, Int16Type), make_str_dict_test_case!(str_vals, Int32Type), diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 3946758ff937..7533e2cff198 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -134,6 +134,7 @@ xz2 = { version = "0.1", optional = true, features = ["static"] } zstd = { version = "0.13", optional = true, default-features = false } [dev-dependencies] +arrow-buffer = { workspace = true } async-trait = { workspace = true } bigdecimal = { workspace = true } criterion = { version = "0.5", features = ["async_tokio"] } diff --git a/datafusion/core/benches/sql_query_with_io.rs b/datafusion/core/benches/sql_query_with_io.rs index 916f48ce40c6..aef39a04e47e 100644 --- a/datafusion/core/benches/sql_query_with_io.rs +++ b/datafusion/core/benches/sql_query_with_io.rs @@ -96,7 +96,7 @@ async fn setup_files(store: Arc) { let location = Path::from(format!( "{table_name}/partition={partition}/{file}.parquet" )); - store.put(&location, data).await.unwrap(); + store.put(&location, data.into()).await.unwrap(); } } } diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 7cc3421ebb48..9462cde43610 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -121,10 +121,9 @@ pub(crate) mod test_util { use object_store::local::LocalFileSystem; use object_store::path::Path; use object_store::{ - GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, PutOptions, - PutResult, + Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, + PutMultipartOpts, PutOptions, PutPayload, PutResult, }; - use tokio::io::AsyncWrite; pub async fn scan_format( state: &SessionState, @@ -185,25 +184,17 @@ pub(crate) mod test_util { async fn put_opts( &self, _location: &Path, - _bytes: Bytes, + _payload: PutPayload, _opts: PutOptions, ) -> object_store::Result { unimplemented!() } - async fn put_multipart( + async fn put_multipart_opts( &self, _location: &Path, - ) -> object_store::Result<(MultipartId, Box)> - { - unimplemented!() - } - - async fn abort_multipart( - &self, - _location: &Path, - _multipart_id: &MultipartId, - ) -> object_store::Result<()> { + _opts: PutMultipartOpts, + ) -> object_store::Result> { unimplemented!() } @@ -229,6 +220,7 @@ pub(crate) mod test_util { version: None, }, range: Default::default(), + attributes: Attributes::default(), }) } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 39e6900ed53a..99c38d3f0980 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1115,7 +1115,6 @@ mod tests { use arrow::array::{Array, ArrayRef, StringArray}; use arrow_schema::Field; use async_trait::async_trait; - use bytes::Bytes; use datafusion_common::cast::{ as_binary_array, as_boolean_array, as_float32_array, as_float64_array, 
as_int32_array, as_timestamp_nanosecond_array, @@ -1129,7 +1128,8 @@ mod tests { use log::error; use object_store::local::LocalFileSystem; use object_store::{ - GetOptions, GetResult, ListResult, MultipartId, PutOptions, PutResult, + GetOptions, GetResult, ListResult, MultipartUpload, PutMultipartOpts, PutOptions, + PutPayload, PutResult, }; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::ParquetRecordBatchStreamBuilder; @@ -1252,25 +1252,17 @@ mod tests { async fn put_opts( &self, _location: &Path, - _bytes: Bytes, + _payload: PutPayload, _opts: PutOptions, ) -> object_store::Result { Err(object_store::Error::NotImplemented) } - async fn put_multipart( + async fn put_multipart_opts( &self, _location: &Path, - ) -> object_store::Result<(MultipartId, Box)> - { - Err(object_store::Error::NotImplemented) - } - - async fn abort_multipart( - &self, - _location: &Path, - _multipart_id: &MultipartId, - ) -> object_store::Result<()> { + _opts: PutMultipartOpts, + ) -> object_store::Result> { Err(object_store::Error::NotImplemented) } diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index a1e43b20a2da..8203e414de97 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -965,7 +965,7 @@ mod tests { let data = bytes::Bytes::from("a,b\n1,2\n3,4"); let path = object_store::path::Path::from("a.csv"); - store.put(&path, data).await.unwrap(); + store.put(&path, data.into()).await.unwrap(); let url = Url::parse("memory://").unwrap(); session_ctx.register_object_store(&url, Arc::new(store)); diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index e2548412cc9d..20656634c472 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -1376,7 +1376,7 @@ mod tests { }; let in_memory = object_store::memory::InMemory::new(); in_memory - .put(&object_meta.location, data) + .put(&object_meta.location, data.into()) .await .expect("put parquet file into in memory object store"); diff --git a/datafusion/core/tests/expr_api/simplification.rs b/datafusion/core/tests/expr_api/simplification.rs index 9d714df331c3..9ce47153ba4a 100644 --- a/datafusion/core/tests/expr_api/simplification.rs +++ b/datafusion/core/tests/expr_api/simplification.rs @@ -19,6 +19,7 @@ use arrow::datatypes::{DataType, Field, Schema}; use arrow_array::{ArrayRef, Int32Array}; +use arrow_buffer::IntervalDayTime; use chrono::{DateTime, TimeZone, Utc}; use datafusion::{error::Result, execution::context::ExecutionProps, prelude::*}; use datafusion_common::cast::as_int32_array; @@ -281,7 +282,10 @@ fn select_date_plus_interval() -> Result<()> { let date_plus_interval_expr = to_timestamp_expr(ts_string) .cast_to(&DataType::Date32, schema)? - + Expr::Literal(ScalarValue::IntervalDayTime(Some(123i64 << 32))); + + Expr::Literal(ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 123, + milliseconds: 0, + }))); let plan = LogicalPlanBuilder::from(table_scan.clone()) .project(vec![date_plus_interval_expr])? 
@@ -289,7 +293,7 @@ fn select_date_plus_interval() -> Result<()> { // Note that constant folder runs and folds the entire // expression down to a single constant (true) - let expected = r#"Projection: Date32("2021-01-09") AS to_timestamp(Utf8("2020-09-08T12:05:00+00:00")) + IntervalDayTime("528280977408") + let expected = r#"Projection: Date32("2021-01-09") AS to_timestamp(Utf8("2020-09-08T12:05:00+00:00")) + IntervalDayTime("IntervalDayTime { days: 123, milliseconds: 0 }") TableScan: test"#; let actual = get_optimized_plan_formatted(plan, &time); diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index 0e515fd4647b..7c1e199ceb95 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -192,7 +192,7 @@ async fn store_parquet_in_memory( let mut objects = Vec::with_capacity(parquet_batches.len()); for (meta, bytes) in parquet_batches { in_memory - .put(&meta.location, bytes) + .put(&meta.location, bytes.into()) .await .expect("put parquet file into in memory object store"); objects.push(meta); diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index ce71c890698e..bfc5b59f0952 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -45,10 +45,10 @@ use bytes::Bytes; use chrono::{TimeZone, Utc}; use futures::stream::{self, BoxStream}; use object_store::{ - path::Path, GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, - ObjectMeta, ObjectStore, PutOptions, PutResult, + path::Path, GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, + ObjectStore, PutOptions, PutResult, }; -use tokio::io::AsyncWrite; +use object_store::{Attributes, MultipartUpload, PutMultipartOpts, PutPayload}; use url::Url; #[tokio::test] @@ -631,24 +631,17 @@ impl ObjectStore for MirroringObjectStore { async fn put_opts( &self, _location: &Path, - _bytes: Bytes, + _put_payload: PutPayload, _opts: PutOptions, ) -> object_store::Result { unimplemented!() } - async fn put_multipart( + async fn put_multipart_opts( &self, _location: &Path, - ) -> object_store::Result<(MultipartId, Box)> { - unimplemented!() - } - - async fn abort_multipart( - &self, - _location: &Path, - _multipart_id: &MultipartId, - ) -> object_store::Result<()> { + _opts: PutMultipartOpts, + ) -> object_store::Result> { unimplemented!() } @@ -673,6 +666,7 @@ impl ObjectStore for MirroringObjectStore { range: 0..meta.size, payload: GetResultPayload::File(file, path), meta, + attributes: Attributes::default(), }) } diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index df91d9313746..1b6878b6f49e 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -41,6 +41,7 @@ path = "src/lib.rs" ahash = { workspace = true } arrow = { workspace = true } arrow-array = { workspace = true } +arrow-buffer = { workspace = true } chrono = { workspace = true } datafusion-common = { workspace = true, default-features = true } paste = "^1.0" diff --git a/datafusion/expr/src/interval_arithmetic.rs b/datafusion/expr/src/interval_arithmetic.rs index c4890b97e748..18f92334ff14 100644 --- a/datafusion/expr/src/interval_arithmetic.rs +++ b/datafusion/expr/src/interval_arithmetic.rs @@ -17,13 +17,13 @@ //! 
Interval arithmetic library +use crate::type_coercion::binary::get_result_type; +use crate::Operator; +use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use std::borrow::Borrow; use std::fmt::{self, Display, Formatter}; use std::ops::{AddAssign, SubAssign}; -use crate::type_coercion::binary::get_result_type; -use crate::Operator; - use arrow::compute::{cast_with_options, CastOptions}; use arrow::datatypes::DataType; use arrow::datatypes::{IntervalUnit, TimeUnit}; @@ -71,10 +71,10 @@ macro_rules! get_extreme_value { ScalarValue::IntervalYearMonth(Some(i32::$extreme)) } DataType::Interval(IntervalUnit::DayTime) => { - ScalarValue::IntervalDayTime(Some(i64::$extreme)) + ScalarValue::IntervalDayTime(Some(IntervalDayTime::$extreme)) } DataType::Interval(IntervalUnit::MonthDayNano) => { - ScalarValue::IntervalMonthDayNano(Some(i128::$extreme)) + ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano::$extreme)) } _ => unreachable!(), } @@ -119,8 +119,14 @@ macro_rules! value_transition { IntervalYearMonth(Some(value)) if value == i32::$bound => { IntervalYearMonth(None) } - IntervalDayTime(Some(value)) if value == i64::$bound => IntervalDayTime(None), - IntervalMonthDayNano(Some(value)) if value == i128::$bound => { + IntervalDayTime(Some(value)) + if value == arrow_buffer::IntervalDayTime::$bound => + { + IntervalDayTime(None) + } + IntervalMonthDayNano(Some(value)) + if value == arrow_buffer::IntervalMonthDayNano::$bound => + { IntervalMonthDayNano(None) } _ => next_value_helper::<$direction>($value), @@ -1013,6 +1019,25 @@ macro_rules! impl_OneTrait{ } impl_OneTrait! {u8, u16, u32, u64, i8, i16, i32, i64, i128} +impl OneTrait for IntervalDayTime { + fn one() -> Self { + IntervalDayTime { + days: 0, + milliseconds: 1, + } + } +} + +impl OneTrait for IntervalMonthDayNano { + fn one() -> Self { + IntervalMonthDayNano { + months: 0, + days: 0, + nanoseconds: 1, + } + } +} + /// This function either increments or decrements its argument, depending on /// the `INC` value (where a `true` value corresponds to the increment). fn increment_decrement( @@ -1075,11 +1100,15 @@ fn next_value_helper(value: ScalarValue) -> ScalarValue { IntervalYearMonth(Some(val)) => { IntervalYearMonth(Some(increment_decrement::(val))) } - IntervalDayTime(Some(val)) => { - IntervalDayTime(Some(increment_decrement::(val))) - } + IntervalDayTime(Some(val)) => IntervalDayTime(Some(increment_decrement::< + INC, + arrow_buffer::IntervalDayTime, + >(val))), IntervalMonthDayNano(Some(val)) => { - IntervalMonthDayNano(Some(increment_decrement::(val))) + IntervalMonthDayNano(Some(increment_decrement::< + INC, + arrow_buffer::IntervalMonthDayNano, + >(val))) } _ => value, // Unbounded values return without change. 
} diff --git a/datafusion/functions-array/src/range.rs b/datafusion/functions-array/src/range.rs index 8c73bd821346..269eaa560230 100644 --- a/datafusion/functions-array/src/range.rs +++ b/datafusion/functions-array/src/range.rs @@ -22,7 +22,9 @@ use arrow::array::{Array, ArrayRef, Int64Array, ListArray}; use arrow::datatypes::{DataType, Field}; use arrow_array::types::{Date32Type, IntervalMonthDayNanoType}; use arrow_array::{Date32Array, NullArray}; -use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer}; +use arrow_buffer::{ + BooleanBufferBuilder, IntervalMonthDayNano, NullBuffer, OffsetBuffer, +}; use arrow_schema::DataType::{Date32, Int64, Interval, List}; use arrow_schema::IntervalUnit::MonthDayNano; use datafusion_common::cast::{as_date32_array, as_int64_array, as_interval_mdn_array}; @@ -314,7 +316,13 @@ fn gen_range_date(args: &[ArrayRef], include_upper: bool) -> Result { for (idx, stop) in stop_array.iter().enumerate() { let mut stop = stop.unwrap_or(0); let start = start_array.as_ref().map(|x| x.value(idx)).unwrap_or(0); - let step = step_array.as_ref().map(|arr| arr.value(idx)).unwrap_or(1); + let step = step_array.as_ref().map(|arr| arr.value(idx)).unwrap_or( + IntervalMonthDayNano { + months: 0, + days: 0, + nanoseconds: 1, + }, + ); let (months, days, _) = IntervalMonthDayNanoType::to_parts(step); let neg = months < 0 || days < 0; if !include_upper { diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index efc12e71a9ac..20d6cbc37459 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -87,6 +87,7 @@ uuid = { version = "1.7", features = ["v4"], optional = true } [dev-dependencies] arrow = { workspace = true, features = ["test_utils"] } +arrow-buffer = { workspace = true } criterion = "0.5" rand = { workspace = true } rstest = { workspace = true } diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index a5404532ace6..e777e5ea95d0 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -445,6 +445,7 @@ mod tests { use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos; use arrow::datatypes::{DataType, TimeUnit}; + use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use datafusion_common::ScalarValue; use datafusion_expr::{ColumnarValue, ScalarUDFImpl}; @@ -453,7 +454,10 @@ mod tests { #[test] fn test_date_bin() { let res = DateBinFunc::new().invoke(&[ - ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1))), + ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 1, + }))), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ]); @@ -461,21 +465,33 @@ mod tests { let timestamps = Arc::new((1..6).map(Some).collect::()); let res = DateBinFunc::new().invoke(&[ - ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1))), + ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 1, + }))), ColumnarValue::Array(timestamps), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ]); assert!(res.is_ok()); let res = DateBinFunc::new().invoke(&[ - ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1))), + ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 1, + }))), 
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ]); assert!(res.is_ok()); // stride supports month-day-nano let res = DateBinFunc::new().invoke(&[ - ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(1))), + ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some( + IntervalMonthDayNano { + months: 0, + days: 0, + nanoseconds: 1, + }, + ))), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ]); @@ -486,8 +502,12 @@ mod tests { // // invalid number of arguments - let res = DateBinFunc::new() - .invoke(&[ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1)))]); + let res = DateBinFunc::new().invoke(&[ColumnarValue::Scalar( + ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 1, + })), + )]); assert_eq!( res.err().unwrap().strip_backtrace(), "Execution error: DATE_BIN expected two or three arguments" @@ -506,7 +526,10 @@ mod tests { // stride: invalid value let res = DateBinFunc::new().invoke(&[ - ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(0))), + ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 0, + }))), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ]); @@ -517,7 +540,9 @@ mod tests { // stride: overflow of day-time interval let res = DateBinFunc::new().invoke(&[ - ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(i64::MAX))), + ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some( + IntervalDayTime::MAX, + ))), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ]); @@ -550,7 +575,10 @@ mod tests { // origin: invalid type let res = DateBinFunc::new().invoke(&[ - ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1))), + ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 1, + }))), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)), ]); @@ -560,14 +588,26 @@ mod tests { ); let res = DateBinFunc::new().invoke(&[ - ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1))), + ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 1, + }))), ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ]); assert!(res.is_ok()); // unsupported array type for stride - let intervals = Arc::new((1..6).map(Some).collect::()); + let intervals = Arc::new( + (1..6) + .map(|x| { + Some(IntervalDayTime { + days: 0, + milliseconds: x, + }) + }) + .collect::(), + ); let res = DateBinFunc::new().invoke(&[ ColumnarValue::Array(intervals), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), @@ -581,7 +621,10 @@ mod tests { // unsupported array type for origin let timestamps = Arc::new((1..6).map(Some).collect::()); let res = DateBinFunc::new().invoke(&[ - ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1))), + ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 1, + }))), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Array(timestamps), ]); diff --git 
a/datafusion/functions/src/datetime/to_timestamp.rs b/datafusion/functions/src/datetime/to_timestamp.rs index af878b4505bc..7c6f2e42605a 100644 --- a/datafusion/functions/src/datetime/to_timestamp.rs +++ b/datafusion/functions/src/datetime/to_timestamp.rs @@ -591,7 +591,7 @@ mod tests { ColumnarValue::Array(Arc::new(date_string_builder.finish()) as ArrayRef); let expected_err = - "Arrow error: Parser error: Invalid timezone \"ZZ\": 'ZZ' is not a valid timezone"; + "Arrow error: Parser error: Invalid timezone \"ZZ\": failed to parse timezone"; match to_timestamp(&[string_array]) { Ok(_) => panic!("Expected error but got success"), Err(e) => { diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index 67d5c9b23b74..e703250c92e1 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -51,7 +51,9 @@ indexmap = { workspace = true } itertools = { workspace = true } log = { workspace = true } regex-syntax = "0.8.0" + [dev-dependencies] +arrow-buffer = { workspace = true } ctor = { workspace = true } datafusion-sql = { workspace = true } env_logger = { workspace = true } diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 31dc9028b915..0c8e4ae34a90 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -1082,13 +1082,13 @@ mod test { #[test] fn binary_op_date32_op_interval() -> Result<()> { - //CAST(Utf8("1998-03-18") AS Date32) + IntervalDayTime("386547056640") + // CAST(Utf8("1998-03-18") AS Date32) + IntervalDayTime("...") let expr = cast(lit("1998-03-18"), DataType::Date32) - + lit(ScalarValue::IntervalDayTime(Some(386547056640))); + + lit(ScalarValue::new_interval_dt(123, 456)); let empty = empty(); let plan = LogicalPlan::Projection(Projection::try_new(vec![expr], empty)?); let expected = - "Projection: CAST(Utf8(\"1998-03-18\") AS Date32) + IntervalDayTime(\"386547056640\")\n EmptyRelation"; + "Projection: CAST(Utf8(\"1998-03-18\") AS Date32) + IntervalDayTime(\"IntervalDayTime { days: 123, milliseconds: 456 }\")\n EmptyRelation"; assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan, expected)?; Ok(()) } diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index dd61fc802441..a36ec9c8ebdc 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -33,6 +33,7 @@ use arrow::compute::take; use arrow::datatypes::*; use arrow::util::bit_iterator::BitIndexIterator; use arrow::{downcast_dictionary_array, downcast_primitive_array}; +use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use datafusion_common::cast::{ as_boolean_array, as_generic_binary_array, as_string_array, }; @@ -258,6 +259,7 @@ macro_rules! is_equal { } is_equal!(i8, i16, i32, i64, i128, i256, u8, u16, u32, u64); is_equal!(bool, str, [u8]); +is_equal!(IntervalDayTime, IntervalMonthDayNano); macro_rules! 
is_equal_float { ($($t:ty),+) => { diff --git a/datafusion/physical-expr/src/intervals/cp_solver.rs b/datafusion/physical-expr/src/intervals/cp_solver.rs index 0c25e26d17aa..5ba628e7ce40 100644 --- a/datafusion/physical-expr/src/intervals/cp_solver.rs +++ b/datafusion/physical-expr/src/intervals/cp_solver.rs @@ -723,6 +723,7 @@ mod tests { use crate::intervals::test_utils::gen_conjunctive_numerical_expr; use arrow::datatypes::TimeUnit; + use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use arrow_schema::Field; use datafusion_common::ScalarValue; @@ -1390,9 +1391,17 @@ mod tests { )?; let right_child = Interval::try_new( // 1 day 321 ns - ScalarValue::IntervalMonthDayNano(Some(0x1_0000_0000_0000_0141)), + ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano { + months: 0, + days: 1, + nanoseconds: 321, + })), // 1 day 321 ns - ScalarValue::IntervalMonthDayNano(Some(0x1_0000_0000_0000_0141)), + ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano { + months: 0, + days: 1, + nanoseconds: 321, + })), )?; let children = vec![&left_child, &right_child]; let result = expression @@ -1415,9 +1424,17 @@ )?, Interval::try_new( // 1 day 321 ns in Duration type - ScalarValue::IntervalMonthDayNano(Some(0x1_0000_0000_0000_0141)), + ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano { + months: 0, + days: 1, + nanoseconds: 321, + })), // 1 day 321 ns in Duration type - ScalarValue::IntervalMonthDayNano(Some(0x1_0000_0000_0000_0141)), + ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano { + months: 0, + days: 1, + nanoseconds: 321, + })), )? ], result @@ -1446,10 +1463,16 @@ ScalarValue::TimestampMillisecond(Some(1_603_188_672_000), None), )?; let left_child = Interval::try_new( - // 2 days - ScalarValue::IntervalDayTime(Some(172_800_000)), - // 10 days - ScalarValue::IntervalDayTime(Some(864_000_000)), + // 2 days in milliseconds + ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 172_800_000, + })), + // 10 days in milliseconds + ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 864_000_000, + })), )?; let children = vec![&left_child, &right_child]; let result = expression @@ -1459,10 +1482,16 @@ assert_eq!( vec![ Interval::try_new( - // 2 days - ScalarValue::IntervalDayTime(Some(172_800_000)), + // 2 days in milliseconds + ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 172_800_000, + })), // 6 days - ScalarValue::IntervalDayTime(Some(518_400_000)), + ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 518_400_000, + })), )?, Interval::try_new( // 10.10.2020 - 10:11:12 AM diff --git a/datafusion/physical-expr/src/intervals/utils.rs b/datafusion/physical-expr/src/intervals/utils.rs index e188b2d56bae..b426a656fba9 100644 --- a/datafusion/physical-expr/src/intervals/utils.rs +++ b/datafusion/physical-expr/src/intervals/utils.rs @@ -24,15 +24,12 @@ use crate::{ PhysicalExpr, }; +use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use arrow_schema::{DataType, SchemaRef}; -use datafusion_common::{internal_datafusion_err, internal_err, Result, ScalarValue}; +use datafusion_common::{internal_err, Result, ScalarValue}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::Operator; -const MDN_DAY_MASK: i128 = 0xFFFF_FFFF_0000_0000_0000_0000; -const MDN_NS_MASK: i128 = 0xFFFF_FFFF_FFFF_FFFF; -const DT_MS_MASK: i64 = 0xFFFF_FFFF; - /// Indicates whether interval arithmetic is supported for the given
expression. /// Currently, we do not support all [`PhysicalExpr`]s for interval calculations. /// We do not support every type of [`Operator`]s either. Over time, this check @@ -172,15 +169,9 @@ fn convert_duration_bound_to_interval( /// If both the month and day fields of [`ScalarValue::IntervalMonthDayNano`] are zero, this function returns the nanoseconds part. /// Otherwise, it returns an error. -fn interval_mdn_to_duration_ns(mdn: &i128) -> Result { - let months = mdn >> 96; - let days = (mdn & MDN_DAY_MASK) >> 64; - let nanoseconds = mdn & MDN_NS_MASK; - - if months == 0 && days == 0 { - nanoseconds - .try_into() - .map_err(|_| internal_datafusion_err!("Resulting duration exceeds i64::MAX")) +fn interval_mdn_to_duration_ns(mdn: &IntervalMonthDayNano) -> Result { + if mdn.months == 0 && mdn.days == 0 { + Ok(mdn.nanoseconds) } else { internal_err!( "The interval cannot have a non-zero month or day value for duration convertibility" @@ -190,12 +181,10 @@ fn interval_mdn_to_duration_ns(mdn: &i128) -> Result { /// If the day field of the [`ScalarValue::IntervalDayTime`] is zero, this function returns the milliseconds part. /// Otherwise, it returns an error. -fn interval_dt_to_duration_ms(dt: &i64) -> Result { - let days = dt >> 32; - let milliseconds = dt & DT_MS_MASK; - - if days == 0 { - Ok(milliseconds) +fn interval_dt_to_duration_ms(dt: &IntervalDayTime) -> Result { + if dt.days == 0 { + // Safe to cast i32 to i64 + Ok(dt.milliseconds as i64) } else { internal_err!( "The interval cannot have a non-zero day value for duration convertibility" diff --git a/datafusion/physical-plan/src/aggregates/group_values/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/primitive.rs index 18d20f3c47e6..d5b7f1b11ac5 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/primitive.rs @@ -23,6 +23,7 @@ use arrow::datatypes::i256; use arrow::record_batch::RecordBatch; use arrow_array::cast::AsArray; use arrow_array::{ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, PrimitiveArray}; +use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use arrow_schema::DataType; use datafusion_common::Result; use datafusion_execution::memory_pool::proxy::VecAllocExt; @@ -53,6 +54,7 @@ macro_rules! hash_integer { } hash_integer!(i8, i16, i32, i64, i128, i256); hash_integer!(u8, u16, u32, u64); +hash_integer!(IntervalDayTime, IntervalMonthDayNano); macro_rules! 
hash_float { ($($t:ty),+) => { diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 2bb95852ff43..356eb7c86a69 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1484,6 +1484,7 @@ mod tests { ))]; let task_ctx = if spill { + // set to an appropriate value to trigger spill new_spill_ctx(2, 1600) } else { Arc::new(TaskContext::default()) @@ -1545,8 +1546,13 @@ mod tests { input_schema, )?); - let result = - common::collect(merged_aggregate.execute(0, task_ctx.clone())?).await?; + let task_ctx = if spill { + // enlarge memory limit to let the final aggregation finish + new_spill_ctx(2, 2600) + } else { + task_ctx.clone() + }; + let result = common::collect(merged_aggregate.execute(0, task_ctx)?).await?; let batch = concat_batches(&result[0].schema(), &result)?; assert_eq!(batch.num_columns(), 2); assert_eq!(batch.num_rows(), 3); @@ -1941,8 +1947,13 @@ mod tests { for use_coalesce_batches in [false, true] { for is_first_acc in [false, true] { for spill in [false, true] { - first_last_multi_partitions(use_coalesce_batches, is_first_acc, spill) - .await? + first_last_multi_partitions( + use_coalesce_batches, + is_first_acc, + spill, + 4200, + ) + .await? } } } @@ -2030,9 +2041,10 @@ mod tests { use_coalesce_batches: bool, is_first_acc: bool, spill: bool, + max_memory: usize, ) -> Result<()> { let task_ctx = if spill { - new_spill_ctx(2, 3200) + new_spill_ctx(2, max_memory) } else { Arc::new(TaskContext::default()) }; diff --git a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs index bae4c6133b9f..2b02fff1f573 100644 --- a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs +++ b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs @@ -26,6 +26,7 @@ use arrow_array::cast::AsArray; use arrow_array::{ downcast_primitive, Array, ArrayRef, ArrowPrimitiveType, PrimitiveArray, StringArray, }; +use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use arrow_schema::DataType; use datafusion_common::DataFusionError; use datafusion_common::Result; @@ -363,6 +364,7 @@ macro_rules! has_integer { has_integer!(i8, i16, i32, i64, i128, i256); has_integer!(u8, u16, u32, u64); +has_integer!(IntervalDayTime, IntervalMonthDayNano); hash_float!(f16, f32, f64); pub fn new_hash_table(limit: usize, kt: DataType) -> Result> { diff --git a/datafusion/physical-plan/src/aggregates/topk/heap.rs b/datafusion/physical-plan/src/aggregates/topk/heap.rs index 41826ed72853..51593f5c28ce 100644 --- a/datafusion/physical-plan/src/aggregates/topk/heap.rs +++ b/datafusion/physical-plan/src/aggregates/topk/heap.rs @@ -20,6 +20,7 @@ use arrow::datatypes::i256; use arrow_array::cast::AsArray; use arrow_array::{downcast_primitive, ArrayRef, ArrowPrimitiveType, PrimitiveArray}; +use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use arrow_schema::DataType; use datafusion_common::DataFusionError; use datafusion_common::Result; @@ -431,6 +432,7 @@ macro_rules! 
compare_integer { compare_integer!(i8, i16, i32, i64, i128, i256); compare_integer!(u8, u16, u32, u64); +compare_integer!(IntervalDayTime, IntervalMonthDayNano); compare_float!(f16, f32, f64); pub fn new_heap(limit: usize, desc: bool, vt: DataType) -> Result> { diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 449c42d69797..7b4d790479b1 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -2290,8 +2290,8 @@ mod tests { )] join_type: JoinType, #[values( - (4, 5), - (12, 17), + (4, 5), + (12, 17), )] cardinality: (i32, i32), #[values(0, 1, 2)] case_expr: usize, @@ -2375,8 +2375,8 @@ mod tests { )] join_type: JoinType, #[values( - (4, 5), - (12, 17), + (4, 5), + (12, 17), )] cardinality: (i32, i32), ) -> Result<()> { @@ -2452,8 +2452,8 @@ mod tests { )] join_type: JoinType, #[values( - (4, 5), - (12, 17), + (4, 5), + (12, 17), )] cardinality: (i32, i32), #[values(0, 1, 2, 3, 4, 5)] case_expr: usize, diff --git a/datafusion/physical-plan/src/joins/test_utils.rs b/datafusion/physical-plan/src/joins/test_utils.rs index 6fb3aef5d5bf..9598ed83aa58 100644 --- a/datafusion/physical-plan/src/joins/test_utils.rs +++ b/datafusion/physical-plan/src/joins/test_utils.rs @@ -33,6 +33,7 @@ use arrow_array::{ ArrayRef, Float64Array, Int32Array, IntervalDayTimeArray, RecordBatch, TimestampMillisecondArray, }; +use arrow_buffer::IntervalDayTime; use arrow_schema::{DataType, Schema}; use datafusion_common::{Result, ScalarValue}; use datafusion_execution::TaskContext; @@ -462,8 +463,11 @@ pub fn build_sides_record_batches( )); let interval_time: ArrayRef = Arc::new(IntervalDayTimeArray::from( initial_range - .map(|x| x as i64 * 100) // x * 100ms - .collect::>(), + .map(|x| IntervalDayTime { + days: 0, + milliseconds: x * 100, + }) // x * 100ms + .collect::>(), )); let float_asc = Arc::new(Float64Array::from_iter_values( diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index bd9901206404..d9ec7dbb5143 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -206,6 +206,11 @@ message ScalarDictionaryValue { ScalarValue value = 2; } +message IntervalDayTimeValue { + int32 days = 1; + int32 milliseconds = 2; +} + message IntervalMonthDayNanoValue { int32 months = 1; int32 days = 2; @@ -266,7 +271,6 @@ message ScalarValue{ int64 date_64_value = 21; int32 interval_yearmonth_value = 24; - int64 interval_daytime_value = 25; int64 duration_second_value = 35; int64 duration_millisecond_value = 36; @@ -278,6 +282,7 @@ message ScalarValue{ bytes binary_value = 28; bytes large_binary_value = 29; ScalarTime64Value time64_value = 30; + IntervalDayTimeValue interval_daytime_value = 25; IntervalMonthDayNanoValue interval_month_day_nano = 31; ScalarFixedSizeBinary fixed_size_binary_value = 34; UnionValue union_value = 42; diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index aa2f22a36560..3ae70318fa15 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -25,8 +25,8 @@ use arrow::array::{ArrayRef, AsArray}; use arrow::buffer::Buffer; use arrow::csv::WriterBuilder; use arrow::datatypes::{ - i256, DataType, Field, IntervalMonthDayNanoType, IntervalUnit, Schema, TimeUnit, - UnionFields, UnionMode, + 
i256, DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, + Schema, TimeUnit, UnionFields, UnionMode, }; use arrow::ipc::{reader::read_record_batch, root_as_message}; @@ -525,7 +525,6 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { } } Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)), - Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(*v)), Value::DurationSecondValue(v) => Self::DurationSecond(Some(*v)), Value::DurationMillisecondValue(v) => Self::DurationMillisecond(Some(*v)), Value::DurationMicrosecondValue(v) => Self::DurationMicrosecond(Some(*v)), @@ -573,6 +572,9 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { } Value::BinaryValue(v) => Self::Binary(Some(v.clone())), Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())), + Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some( + IntervalDayTimeType::make_value(v.days, v.milliseconds), + )), Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some( IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos), )), diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index a5f7ec298e87..6b2372433684 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -3417,6 +3417,118 @@ impl<'de> serde::Deserialize<'de> for FixedSizeList { deserializer.deserialize_struct("datafusion_common.FixedSizeList", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for IntervalDayTimeValue { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.days != 0 { + len += 1; + } + if self.milliseconds != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion_common.IntervalDayTimeValue", len)?; + if self.days != 0 { + struct_ser.serialize_field("days", &self.days)?; + } + if self.milliseconds != 0 { + struct_ser.serialize_field("milliseconds", &self.milliseconds)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for IntervalDayTimeValue { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "days", + "milliseconds", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Days, + Milliseconds, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "days" => Ok(GeneratedField::Days), + "milliseconds" => Ok(GeneratedField::Milliseconds), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = IntervalDayTimeValue; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion_common.IntervalDayTimeValue") + } + + 
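            // Editorial note on the generated impls above and below: `serialize`
            // skips zero-valued fields, so the default interval encodes as `{}`,
            // while `{"days":1,"milliseconds":500}` round-trips through `visit_map`
            // to `IntervalDayTimeValue { days: 1, milliseconds: 500 }`; keys missing
            // from the map fall back to zero via `unwrap_or_default`.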
fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut days__ = None; + let mut milliseconds__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Days => { + if days__.is_some() { + return Err(serde::de::Error::duplicate_field("days")); + } + days__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::Milliseconds => { + if milliseconds__.is_some() { + return Err(serde::de::Error::duplicate_field("milliseconds")); + } + milliseconds__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + } + } + Ok(IntervalDayTimeValue { + days: days__.unwrap_or_default(), + milliseconds: milliseconds__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion_common.IntervalDayTimeValue", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for IntervalMonthDayNanoValue { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -6186,10 +6298,6 @@ impl serde::Serialize for ScalarValue { scalar_value::Value::IntervalYearmonthValue(v) => { struct_ser.serialize_field("intervalYearmonthValue", v)?; } - scalar_value::Value::IntervalDaytimeValue(v) => { - #[allow(clippy::needless_borrow)] - struct_ser.serialize_field("intervalDaytimeValue", ToString::to_string(&v).as_str())?; - } scalar_value::Value::DurationSecondValue(v) => { #[allow(clippy::needless_borrow)] struct_ser.serialize_field("durationSecondValue", ToString::to_string(&v).as_str())?; @@ -6223,6 +6331,9 @@ impl serde::Serialize for ScalarValue { scalar_value::Value::Time64Value(v) => { struct_ser.serialize_field("time64Value", v)?; } + scalar_value::Value::IntervalDaytimeValue(v) => { + struct_ser.serialize_field("intervalDaytimeValue", v)?; + } scalar_value::Value::IntervalMonthDayNano(v) => { struct_ser.serialize_field("intervalMonthDayNano", v)?; } @@ -6292,8 +6403,6 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { "date64Value", "interval_yearmonth_value", "intervalYearmonthValue", - "interval_daytime_value", - "intervalDaytimeValue", "duration_second_value", "durationSecondValue", "duration_millisecond_value", @@ -6312,6 +6421,8 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { "largeBinaryValue", "time64_value", "time64Value", + "interval_daytime_value", + "intervalDaytimeValue", "interval_month_day_nano", "intervalMonthDayNano", "fixed_size_binary_value", @@ -6346,7 +6457,6 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { Decimal256Value, Date64Value, IntervalYearmonthValue, - IntervalDaytimeValue, DurationSecondValue, DurationMillisecondValue, DurationMicrosecondValue, @@ -6356,6 +6466,7 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { BinaryValue, LargeBinaryValue, Time64Value, + IntervalDaytimeValue, IntervalMonthDayNano, FixedSizeBinaryValue, UnionValue, @@ -6404,7 +6515,6 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { "decimal256Value" | "decimal256_value" => Ok(GeneratedField::Decimal256Value), "date64Value" | "date_64_value" => Ok(GeneratedField::Date64Value), "intervalYearmonthValue" | "interval_yearmonth_value" => Ok(GeneratedField::IntervalYearmonthValue), - "intervalDaytimeValue" | "interval_daytime_value" => Ok(GeneratedField::IntervalDaytimeValue), "durationSecondValue" | "duration_second_value" => Ok(GeneratedField::DurationSecondValue), "durationMillisecondValue" | "duration_millisecond_value" => Ok(GeneratedField::DurationMillisecondValue), "durationMicrosecondValue" | 
"duration_microsecond_value" => Ok(GeneratedField::DurationMicrosecondValue), @@ -6414,6 +6524,7 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { "binaryValue" | "binary_value" => Ok(GeneratedField::BinaryValue), "largeBinaryValue" | "large_binary_value" => Ok(GeneratedField::LargeBinaryValue), "time64Value" | "time64_value" => Ok(GeneratedField::Time64Value), + "intervalDaytimeValue" | "interval_daytime_value" => Ok(GeneratedField::IntervalDaytimeValue), "intervalMonthDayNano" | "interval_month_day_nano" => Ok(GeneratedField::IntervalMonthDayNano), "fixedSizeBinaryValue" | "fixed_size_binary_value" => Ok(GeneratedField::FixedSizeBinaryValue), "unionValue" | "union_value" => Ok(GeneratedField::UnionValue), @@ -6591,12 +6702,6 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { } value__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| scalar_value::Value::IntervalYearmonthValue(x.0)); } - GeneratedField::IntervalDaytimeValue => { - if value__.is_some() { - return Err(serde::de::Error::duplicate_field("intervalDaytimeValue")); - } - value__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| scalar_value::Value::IntervalDaytimeValue(x.0)); - } GeneratedField::DurationSecondValue => { if value__.is_some() { return Err(serde::de::Error::duplicate_field("durationSecondValue")); @@ -6652,6 +6757,13 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { return Err(serde::de::Error::duplicate_field("time64Value")); } value__ = map_.next_value::<::std::option::Option<_>>()?.map(scalar_value::Value::Time64Value) +; + } + GeneratedField::IntervalDaytimeValue => { + if value__.is_some() { + return Err(serde::de::Error::duplicate_field("intervalDaytimeValue")); + } + value__ = map_.next_value::<::std::option::Option<_>>()?.map(scalar_value::Value::IntervalDaytimeValue) ; } GeneratedField::IntervalMonthDayNano => { diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index c8f277c8f37e..48da143bc7ed 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -275,6 +275,14 @@ pub struct ScalarDictionaryValue { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct IntervalDayTimeValue { + #[prost(int32, tag = "1")] + pub days: i32, + #[prost(int32, tag = "2")] + pub milliseconds: i32, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct IntervalMonthDayNanoValue { #[prost(int32, tag = "1")] pub months: i32, @@ -318,7 +326,7 @@ pub struct ScalarFixedSizeBinary { pub struct ScalarValue { #[prost( oneof = "scalar_value::Value", - tags = "33, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 32, 20, 39, 21, 24, 25, 35, 36, 37, 38, 26, 27, 28, 29, 30, 31, 34, 42" + tags = "33, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 32, 20, 39, 21, 24, 35, 36, 37, 38, 26, 27, 28, 29, 30, 25, 31, 34, 42" )] pub value: ::core::option::Option, } @@ -378,8 +386,6 @@ pub mod scalar_value { Date64Value(i64), #[prost(int32, tag = "24")] IntervalYearmonthValue(i32), - #[prost(int64, tag = "25")] - IntervalDaytimeValue(i64), #[prost(int64, tag = "35")] DurationSecondValue(i64), #[prost(int64, tag = "36")] @@ -398,6 +404,8 @@ pub mod scalar_value { LargeBinaryValue(::prost::alloc::vec::Vec), #[prost(message, tag = "30")] Time64Value(super::ScalarTime64Value), + #[prost(message, tag = 
"25")] + IntervalDaytimeValue(super::IntervalDayTimeValue), #[prost(message, tag = "31")] IntervalMonthDayNano(super::IntervalMonthDayNanoValue), #[prost(message, tag = "34")] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index a92deaa88b1c..28f6952aac44 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -24,8 +24,8 @@ use crate::protobuf_common::{ use arrow::array::{ArrayRef, RecordBatch}; use arrow::csv::WriterBuilder; use arrow::datatypes::{ - DataType, Field, IntervalMonthDayNanoType, IntervalUnit, Schema, SchemaRef, TimeUnit, - UnionMode, + DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema, + SchemaRef, TimeUnit, UnionMode, }; use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator}; use datafusion_common::{ @@ -452,11 +452,6 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { Value::IntervalYearmonthValue(*s) }) } - ScalarValue::IntervalDayTime(val) => { - create_proto_scalar(val.as_ref(), &data_type, |s| { - Value::IntervalDaytimeValue(*s) - }) - } ScalarValue::Null => Ok(protobuf::ScalarValue { value: Some(Value::NullValue((&data_type).try_into()?)), }), @@ -526,6 +521,20 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { }) } + ScalarValue::IntervalDayTime(val) => { + let value = if let Some(v) = val { + let (days, milliseconds) = IntervalDayTimeType::to_parts(*v); + Value::IntervalDaytimeValue(protobuf::IntervalDayTimeValue { + days, + milliseconds, + }) + } else { + Value::NullValue((&data_type).try_into()?) + }; + + Ok(protobuf::ScalarValue { value: Some(value) }) + } + ScalarValue::IntervalMonthDayNano(v) => { let value = if let Some(v) = v { let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v); diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index c8f277c8f37e..48da143bc7ed 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -275,6 +275,14 @@ pub struct ScalarDictionaryValue { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct IntervalDayTimeValue { + #[prost(int32, tag = "1")] + pub days: i32, + #[prost(int32, tag = "2")] + pub milliseconds: i32, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct IntervalMonthDayNanoValue { #[prost(int32, tag = "1")] pub months: i32, @@ -318,7 +326,7 @@ pub struct ScalarFixedSizeBinary { pub struct ScalarValue { #[prost( oneof = "scalar_value::Value", - tags = "33, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 32, 20, 39, 21, 24, 25, 35, 36, 37, 38, 26, 27, 28, 29, 30, 31, 34, 42" + tags = "33, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 32, 20, 39, 21, 24, 35, 36, 37, 38, 26, 27, 28, 29, 30, 25, 31, 34, 42" )] pub value: ::core::option::Option, } @@ -378,8 +386,6 @@ pub mod scalar_value { Date64Value(i64), #[prost(int32, tag = "24")] IntervalYearmonthValue(i32), - #[prost(int64, tag = "25")] - IntervalDaytimeValue(i64), #[prost(int64, tag = "35")] DurationSecondValue(i64), #[prost(int64, tag = "36")] @@ -398,6 +404,8 @@ pub mod scalar_value { LargeBinaryValue(::prost::alloc::vec::Vec), #[prost(message, tag = "30")] Time64Value(super::ScalarTime64Value), + #[prost(message, tag = "25")] + IntervalDaytimeValue(super::IntervalDayTimeValue), 
#[prost(message, tag = "31")] IntervalMonthDayNano(super::IntervalMonthDayNanoValue), #[prost(message, tag = "34")] diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 1f064ea0f543..f7c4edbcc7ad 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -2139,7 +2139,7 @@ fn union_with_incompatible_data_type() { .expect_err("query should have failed") .strip_backtrace(); assert_eq!( - "Error during planning: UNION Column Int64(1) (type: Int64) is not compatible with column IntervalMonthDayNano(\"950737950189618795196236955648\") (type: Interval(MonthDayNano))", + "Error during planning: UNION Column Int64(1) (type: Int64) is not compatible with column IntervalMonthDayNano(\"IntervalMonthDayNano { months: 12, days: 1, nanoseconds: 0 }\") (type: Interval(MonthDayNano))", err ); } @@ -2829,7 +2829,7 @@ fn join_with_aliases() { fn negative_interval_plus_interval_in_projection() { let sql = "select -interval '2 days' + interval '5 days';"; let expected = - "Projection: IntervalMonthDayNano(\"79228162477370849446124847104\") + IntervalMonthDayNano(\"92233720368547758080\")\n EmptyRelation"; + "Projection: IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: -2, nanoseconds: 0 }\") + IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }\")\n EmptyRelation"; quick_test(sql, expected); } @@ -2837,7 +2837,7 @@ fn negative_interval_plus_interval_in_projection() { fn complex_interval_expression_in_projection() { let sql = "select -interval '2 days' + interval '5 days'+ (-interval '3 days' + interval '5 days');"; let expected = - "Projection: IntervalMonthDayNano(\"79228162477370849446124847104\") + IntervalMonthDayNano(\"92233720368547758080\") + IntervalMonthDayNano(\"79228162458924105372415295488\") + IntervalMonthDayNano(\"92233720368547758080\")\n EmptyRelation"; + "Projection: IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: -2, nanoseconds: 0 }\") + IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }\") + IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: -3, nanoseconds: 0 }\") + IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }\")\n EmptyRelation"; quick_test(sql, expected); } @@ -2845,7 +2845,7 @@ fn complex_interval_expression_in_projection() { fn negative_sum_intervals_in_projection() { let sql = "select -((interval '2 days' + interval '5 days') + -(interval '4 days' + interval '7 days'));"; let expected = - "Projection: (- IntervalMonthDayNano(\"36893488147419103232\") + IntervalMonthDayNano(\"92233720368547758080\") + (- IntervalMonthDayNano(\"73786976294838206464\") + IntervalMonthDayNano(\"129127208515966861312\")))\n EmptyRelation"; + "Projection: (- IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 2, nanoseconds: 0 }\") + IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }\") + (- IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 4, nanoseconds: 0 }\") + IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 7, nanoseconds: 0 }\")))\n EmptyRelation"; quick_test(sql, expected); } @@ -2853,8 +2853,7 @@ fn negative_sum_intervals_in_projection() { fn date_plus_interval_in_projection() { let sql = "select t_date32 + interval '5 days' FROM test"; let expected = - "Projection: test.t_date32 + IntervalMonthDayNano(\"92233720368547758080\")\ - \n TableScan: test"; + "Projection: test.t_date32 + 
IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }\")\n TableScan: test"; quick_test(sql, expected); } @@ -2866,7 +2865,7 @@ fn date_plus_interval_in_filter() { AND cast('1999-12-31' as date) + interval '30 days'"; let expected = "Projection: test.t_date64\ - \n Filter: test.t_date64 BETWEEN CAST(Utf8(\"1999-12-31\") AS Date32) AND CAST(Utf8(\"1999-12-31\") AS Date32) + IntervalMonthDayNano(\"553402322211286548480\")\ + \n Filter: test.t_date64 BETWEEN CAST(Utf8(\"1999-12-31\") AS Date32) AND CAST(Utf8(\"1999-12-31\") AS Date32) + IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 30, nanoseconds: 0 }\")\ \n TableScan: test"; quick_test(sql, expected); } diff --git a/datafusion/sqllogictest/test_files/arrow_typeof.slt b/datafusion/sqllogictest/test_files/arrow_typeof.slt index 94cce61245e1..c928b96e0321 100644 --- a/datafusion/sqllogictest/test_files/arrow_typeof.slt +++ b/datafusion/sqllogictest/test_files/arrow_typeof.slt @@ -336,7 +336,7 @@ select arrow_cast(timestamp '2000-01-01T00:00:00Z', 'Timestamp(Nanosecond, Some( ---- 2000-01-01T00:00:00+08:00 -statement error DataFusion error: Arrow error: Parser error: Invalid timezone "\+25:00": '\+25:00' is not a valid timezone +statement error DataFusion error: Arrow error: Parser error: Invalid timezone "\+25:00": failed to parse timezone select arrow_cast(timestamp '2000-01-01T00:00:00', 'Timestamp(Nanosecond, Some( "+25:00" ))'); diff --git a/datafusion/sqllogictest/test_files/expr.slt b/datafusion/sqllogictest/test_files/expr.slt index cb2bb9fad1b7..033ea2208f1a 100644 --- a/datafusion/sqllogictest/test_files/expr.slt +++ b/datafusion/sqllogictest/test_files/expr.slt @@ -63,7 +63,7 @@ SELECT NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL # test_array_cast_invalid_timezone_will_panic -statement error Parser error: Invalid timezone "Foo": 'Foo' is not a valid timezone +statement error Parser error: Invalid timezone "Foo": failed to parse timezone SELECT arrow_cast('2021-01-02T03:04:00', 'Timestamp(Nanosecond, Some("Foo"))') # test_array_index diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index efbf0df3830c..cbcfc9364565 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -4226,7 +4226,7 @@ logical_plan 01)Limit: skip=0, fetch=5 02)--Sort: time_chunks DESC NULLS FIRST, fetch=5 03)----Projection: date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts) AS time_chunks -04)------Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("900000000000"), unbounded_csv_with_timestamps.ts) AS date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)]], aggr=[[]] +04)------Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }"), unbounded_csv_with_timestamps.ts) AS date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)]], aggr=[[]] 05)--------TableScan: unbounded_csv_with_timestamps projection=[ts] physical_plan 01)GlobalLimitExec: skip=0, fetch=5 @@ -4235,7 +4235,7 @@ physical_plan 04)------AggregateExec: mode=FinalPartitioned, gby=[date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)@0 as date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted 05)--------CoalesceBatchesExec: target_batch_size=2 06)----------RepartitionExec: 
partitioning=Hash([date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)@0], 8), input_partitions=8, preserve_order=true, sort_exprs=date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)@0 DESC -07)------------AggregateExec: mode=Partial, gby=[date_bin(900000000000, ts@0) as date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted +07)------------AggregateExec: mode=Partial, gby=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@0) as date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted 08)--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 09)----------------StreamingTableExec: partition_sizes=1, projection=[ts], infinite_source=true, output_ordering=[ts@0 DESC] @@ -4328,12 +4328,12 @@ EXPLAIN SELECT name, date_bin('15 minutes', ts) as time_chunks logical_plan 01)Limit: skip=0, fetch=5 02)--Sort: unbounded_csv_with_timestamps2.name DESC NULLS FIRST, time_chunks DESC NULLS FIRST, fetch=5 -03)----Projection: unbounded_csv_with_timestamps2.name, date_bin(IntervalMonthDayNano("900000000000"), unbounded_csv_with_timestamps2.ts) AS time_chunks +03)----Projection: unbounded_csv_with_timestamps2.name, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }"), unbounded_csv_with_timestamps2.ts) AS time_chunks 04)------TableScan: unbounded_csv_with_timestamps2 projection=[name, ts] physical_plan 01)GlobalLimitExec: skip=0, fetch=5 02)--SortPreservingMergeExec: [name@0 DESC,time_chunks@1 DESC], fetch=5 -03)----ProjectionExec: expr=[name@0 as name, date_bin(900000000000, ts@1) as time_chunks] +03)----ProjectionExec: expr=[name@0 as name, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@1) as time_chunks] 04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 05)--------StreamingTableExec: partition_sizes=1, projection=[name, ts], infinite_source=true, output_ordering=[name@0 DESC, ts@1 DESC] diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 2678e8cbd1ba..51de40fb1972 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -465,11 +465,11 @@ ORDER BY db15; ---- logical_plan 01)Sort: db15 ASC NULLS LAST -02)--Projection: date_bin(IntervalMonthDayNano("900000000000"), csv_with_timestamps.ts, TimestampNanosecond(1659537600000000000, None)) AS db15 +02)--Projection: date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }"), csv_with_timestamps.ts, TimestampNanosecond(1659537600000000000, None)) AS db15 03)----TableScan: csv_with_timestamps projection=[ts] physical_plan 01)SortPreservingMergeExec: [db15@0 ASC NULLS LAST] -02)--ProjectionExec: expr=[date_bin(900000000000, ts@0, 1659537600000000000) as db15] +02)--ProjectionExec: expr=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@0, 1659537600000000000) as db15] 03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[ts], output_ordering=[ts@0 ASC NULLS LAST], has_header=false diff --git a/datafusion/sqllogictest/test_files/set_variable.slt b/datafusion/sqllogictest/test_files/set_variable.slt index fccd144a37fb..6f19c9f4d42f 100644 --- 
a/datafusion/sqllogictest/test_files/set_variable.slt +++ b/datafusion/sqllogictest/test_files/set_variable.slt @@ -216,19 +216,19 @@ set datafusion.catalog.information_schema = true statement ok SET TIME ZONE = '+08:00:00' -statement error Arrow error: Parser error: Invalid timezone "\+08:00:00": '\+08:00:00' is not a valid timezone +statement error Arrow error: Parser error: Invalid timezone "\+08:00:00": failed to parse timezone SELECT '2000-01-01T00:00:00'::TIMESTAMP::TIMESTAMPTZ statement ok SET TIME ZONE = '08:00' -statement error Arrow error: Parser error: Invalid timezone "08:00": '08:00' is not a valid timezone +statement error Arrow error: Parser error: Invalid timezone "08:00": failed to parse timezone SELECT '2000-01-01T00:00:00'::TIMESTAMP::TIMESTAMPTZ statement ok SET TIME ZONE = '08' -statement error Arrow error: Parser error: Invalid timezone "08": '08' is not a valid timezone +statement error Arrow error: Parser error: Invalid timezone "08": failed to parse timezone SELECT '2000-01-01T00:00:00'::TIMESTAMP::TIMESTAMPTZ statement ok @@ -242,5 +242,5 @@ SELECT '2000-01-01T00:00:00'::TIMESTAMP::TIMESTAMPTZ statement ok SET TIME ZONE = 'Asia/Taipei2' -statement error Arrow error: Parser error: Invalid timezone "Asia/Taipei2": 'Asia/Taipei2' is not a valid timezone +statement error Arrow error: Parser error: Invalid timezone "Asia/Taipei2": failed to parse timezone SELECT '2000-01-01T00:00:00'::TIMESTAMP::TIMESTAMPTZ diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt index 5f75bca4f0fa..7d5d601bbfdd 100644 --- a/datafusion/sqllogictest/test_files/timestamps.slt +++ b/datafusion/sqllogictest/test_files/timestamps.slt @@ -466,7 +466,7 @@ query error Cannot cast string '24:01:02' to value of Time64\(Nanosecond\) type SELECT TIME '24:01:02' as time; # invalid timezone -query error Arrow error: Parser error: Invalid timezone "ZZ": 'ZZ' is not a valid timezone +query error Arrow error: Parser error: Invalid timezone "ZZ": failed to parse timezone SELECT TIMESTAMP '2023-12-05T21:58:10.45ZZ'; statement ok diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml index f614fd6b3fd0..9322412c0ddb 100644 --- a/datafusion/substrait/Cargo.toml +++ b/datafusion/substrait/Cargo.toml @@ -32,6 +32,7 @@ rust-version = "1.73" workspace = true [dependencies] +arrow-buffer = { workspace = true } async-recursion = "1.0" chrono = { workspace = true } datafusion = { workspace = true, default-features = true } diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 597f34e89a02..d68711e8609c 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -23,11 +23,13 @@ use datafusion::common::{ not_impl_err, substrait_datafusion_err, substrait_err, DFSchema, DFSchemaRef, }; +use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::{ aggregate_function, expr::find_df_window_func, BinaryExpr, Case, EmptyRelation, Expr, LogicalPlan, Operator, ScalarUDF, Values, }; + use datafusion::logical_expr::{ expr, Cast, Extension, GroupingSet, Like, LogicalPlanBuilder, Partitioning, Repartition, Subquery, WindowFrameBound, WindowFrameUnits, @@ -1563,7 +1565,13 @@ fn from_substrait_literal( "Failed to parse interval day time value" ) })?; - ScalarValue::IntervalDayTime(Some(i64::from_le_bytes(value_slice))) + let days = 
i32::from_le_bytes(value_slice[0..4].try_into().unwrap()); + let milliseconds = + i32::from_le_bytes(value_slice[4..8].try_into().unwrap()); + ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days, + milliseconds, + })) } INTERVAL_MONTH_DAY_NANO_TYPE_REF => { let Some(Val::Value(raw_val)) = user_defined.val.as_ref() else { @@ -1575,9 +1583,16 @@ "Failed to parse interval month day nano value" ) })?; - ScalarValue::IntervalMonthDayNano(Some(i128::from_le_bytes( - value_slice, - ))) + let months = + i32::from_le_bytes(value_slice[0..4].try_into().unwrap()); + let days = i32::from_le_bytes(value_slice[4..8].try_into().unwrap()); + let nanoseconds = + i64::from_le_bytes(value_slice[8..16].try_into().unwrap()); + ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano { + months, + days, + nanoseconds, + })) } _ => { return not_impl_err!( diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index 0208b010c856..6c8be4aa9b12 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; +use arrow_buffer::ToByteSlice; use datafusion::arrow::datatypes::IntervalUnit; use datafusion::logical_expr::{ CrossJoin, Distinct, Like, Partitioning, WindowFrameUnits, @@ -1949,7 +1950,7 @@ fn to_substrait_literal(value: &ScalarValue) -> Result { } ScalarValue::IntervalMonthDayNano(Some(i)) => { - // treat `i128` as two contiguous `i64` - let bytes = i.to_le_bytes(); + // treat the 16-byte `IntervalMonthDayNano` as two contiguous `i64`s + let bytes = i.to_byte_slice(); let i64_param = Parameter { parameter: Some(parameter::Parameter::DataType(substrait::proto::Type { kind: Some(r#type::Kind::I64(r#type::I64 { @@ -1971,7 +1972,7 @@ ) } ScalarValue::IntervalDayTime(Some(i)) => { - let bytes = i.to_le_bytes(); + let bytes = i.to_byte_slice(); ( LiteralType::UserDefined(UserDefined { type_reference: INTERVAL_DAY_TIME_TYPE_REF,
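To make concrete the byte layout that the producer's `to_byte_slice` and the consumer's `from_le_bytes` slicing above agree on (fields in declaration order: `months: i32 | days: i32 | nanoseconds: i64`, and `days: i32 | milliseconds: i32`), here is a minimal, self-contained round-trip sketch. It assumes a little-endian target, as the slicing code itself does, and depends only on `arrow_buffer`:

    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, ToByteSlice};

    fn main() {
        // 16-byte month-day-nano layout: months | days | nanoseconds
        let mdn = IntervalMonthDayNano { months: 1, days: 2, nanoseconds: 3 };
        let bytes = mdn.to_byte_slice();
        assert_eq!(bytes.len(), 16);
        assert_eq!(i32::from_le_bytes(bytes[0..4].try_into().unwrap()), 1);
        assert_eq!(i32::from_le_bytes(bytes[4..8].try_into().unwrap()), 2);
        assert_eq!(i64::from_le_bytes(bytes[8..16].try_into().unwrap()), 3);

        // 8-byte day-time layout: days | milliseconds
        let dt = IntervalDayTime { days: 4, milliseconds: 5 };
        let bytes = dt.to_byte_slice();
        assert_eq!(bytes.len(), 8);
        assert_eq!(i32::from_le_bytes(bytes[0..4].try_into().unwrap()), 4);
        assert_eq!(i32::from_le_bytes(bytes[4..8].try_into().unwrap()), 5);
    }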