diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b9f1720..81f6d9d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -130,7 +130,7 @@ jobs: run: | rustup toolchain install ${{ matrix.rust }} rustup default ${{ matrix.rust }} - - name: Run FlightSQL tests + - name: Run Functions-JSON tests run: | cargo test --features=functions-json extension_cases::functions_json test-deltalake: diff --git a/Cargo.lock b/Cargo.lock index 9a153e2..c8213f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -29,6 +29,17 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" +[[package]] +name = "ahash" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" +dependencies = [ + "getrandom", + "once_cell", + "version_check", +] + [[package]] name = "ahash" version = "0.8.11" @@ -165,12 +176,42 @@ dependencies = [ "strum 0.25.0", "strum_macros 0.25.3", "thiserror 1.0.63", - "typed-builder", + "typed-builder 0.16.2", "uuid", "xz2", "zstd 0.12.4", ] +[[package]] +name = "apache-avro" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aef82843a0ec9f8b19567445ad2421ceeb1d711514384bdd3d49fe37102ee13" +dependencies = [ + "bigdecimal", + "digest", + "libflate", + "log", + "num-bigint", + "quad-rand", + "rand", + "regex-lite", + "serde", + "serde_bytes", + "serde_json", + "strum 0.26.3", + "strum_macros 0.26.4", + "thiserror 1.0.63", + "typed-builder 0.19.1", + "uuid", +] + +[[package]] +name = "array-init" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d62b7694a562cdf5a74227903507c56ab2cc8bdd1f781ed5cb4cf9c9f810bfc" + [[package]] name = "arrayref" version = "0.3.9" @@ -225,7 +266,7 @@ version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd03279cea46569acf9295f6224fbc370c5df184b4d2ecfe97ccb131d5615a7f" dependencies = [ - "ahash", + "ahash 0.8.11", "arrow-buffer", "arrow-data", "arrow-schema", @@ -383,7 +424,7 @@ version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "745c114c8f0e8ce211c83389270de6fbe96a9088a7b32c2a041258a443fe83ff" dependencies = [ - "ahash", + "ahash 0.8.11", "arrow-array", "arrow-buffer", "arrow-data", @@ -406,7 +447,7 @@ version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e415279094ea70323c032c6e739c48ad8d80e78a09bef7117b8718ad5bf3722" dependencies = [ - "ahash", + "ahash 0.8.11", "arrow-array", "arrow-buffer", "arrow-data", @@ -465,6 +506,17 @@ dependencies = [ "zstd-safe 7.2.1", ] +[[package]] +name = "async-lock" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -593,6 +645,17 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backon" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5289ec98f68f28dd809fd601059e6aa908bb8f6108620930828283d4ee23d7" +dependencies = [ + "fastrand", + "gloo-timers", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.71" @@ -614,6 +677,26 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "bigdecimal" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f850665a0385e070b64c38d2354e6c104c8479c59868d1e48a0c13ee2c7a1c1" +dependencies = [ + "autocfg", + "libm", + "num-bigint", + "num-integer", + "num-traits", + "serde", +] + +[[package]] +name = "bimap" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" + [[package]] name = "bindgen" version = "0.69.5" @@ -692,6 +775,29 @@ dependencies = [ "generic-array", ] +[[package]] +name = "borsh" +version = "1.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2506947f73ad44e344215ccd6403ac2ae18cd8e046e581a441bf8d199f257f03" +dependencies = [ + "borsh-derive", + "cfg_aliases", +] + +[[package]] +name = "borsh-derive" +version = "1.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2593a3b8b938bd68373196c9832f516be11fa487ef4ae745eb282e6a56a7244" +dependencies = [ + "once_cell", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "brotli" version = "7.0.0" @@ -730,6 +836,28 @@ version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" +[[package]] +name = "bytecheck" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23cdc57ce23ac53c931e88a43d06d070a6fd142f2617be5855eb75efc9beb1c2" +dependencies = [ + "bytecheck_derive", + "ptr_meta", + "simdutf8", +] + +[[package]] +name = "bytecheck_derive" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3db406d29fbcd95542e92559bed4d8ad92636d1ca8b3b72ede10b4bcc010e659" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "bytemuck" version = "1.18.0" @@ -810,6 +938,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" version = "0.4.38" @@ -964,6 +1098,15 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.15.8" @@ -976,6 +1119,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + [[package]] name = "const-random" version = "0.1.18" @@ -1036,6 +1185,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32c" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47" +dependencies = [ + "rustc_version", +] + [[package]] name = "crc32fast" version = "1.4.2" @@ -1045,6 +1203,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-epoch" version = "0.9.18" @@ -1123,6 +1290,41 @@ dependencies = [ "memchr", ] +[[package]] +name = "darling" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.87", +] + +[[package]] +name = "darling_macro" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.87", +] + [[package]] name = "dary_heap" version = "0.3.6" @@ -1149,8 +1351,8 @@ version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cbba0799cf6913b456ed07a94f0f3b6e12c62a5d88b10809e2284a0f2b915c05" dependencies = [ - "ahash", - "apache-avro", + "ahash 0.8.11", + "apache-avro 0.16.0", "arrow", "arrow-array", "arrow-ipc", @@ -1223,8 +1425,8 @@ version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24953049ebbd6f8964f91f60aa3514e121b5e81e068e33b60e77815ab369b25c" dependencies = [ - "ahash", - "apache-avro", + "ahash 0.8.11", + "apache-avro 0.16.0", "arrow", "arrow-array", "arrow-buffer", @@ -1280,7 +1482,7 @@ version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8036495980e3131f706b7d33ab00b4492d73dc714e3cb74d11b50f9602a73246" dependencies = [ - "ahash", + "ahash 0.8.11", "arrow", "arrow-array", "arrow-buffer", @@ -1343,7 +1545,7 @@ version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5b8bb624597ba28ed7446df4a9bd7c7a7bde7c578b6b527da3f47371d5f6741" dependencies = [ - "ahash", + "ahash 0.8.11", "arrow", "arrow-schema", "datafusion-common", @@ -1364,7 +1566,7 @@ version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fb06208fc470bc8cf1ce2d9a1159d42db591f2c7264a8c1776b53ad8f675143" dependencies = [ - "ahash", + "ahash 0.8.11", "arrow", "datafusion-common", "datafusion-expr-common", @@ -1468,7 +1670,7 @@ version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3370357b8fc75ec38577700644e5d1b0bc78f38babab99c0b8bd26bafb3e4335" dependencies = [ - "ahash", + "ahash 0.8.11", "arrow", "arrow-array", "arrow-buffer", @@ -1496,7 +1698,7 @@ version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8b7734d94bf2fa6f6e570935b0ddddd8421179ce200065be97874e13d46a47b" dependencies = [ - "ahash", + "ahash 0.8.11", "arrow", "datafusion-common", "datafusion-expr-common", @@ -1526,7 +1728,7 @@ version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17e1fc2e2c239d14e8556f2622b19a726bf6bc6962cc00c71fc52626274bee24" dependencies = [ - "ahash", + "ahash 0.8.11", "arrow", "arrow-array", "arrow-buffer", @@ -1722,6 +1924,47 @@ dependencies = [ "z85", ] +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", + "serde", +] + +[[package]] +name = "derive_builder" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "507dfb09ea8b7fa618fcf76e953f4f5e192547945816d5358edffe39f6f94947" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.87", +] + +[[package]] +name = "derive_builder_macro" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" +dependencies = [ + "derive_builder_core", + "syn 2.0.87", +] + [[package]] name = "dft" version = "0.1.0" @@ -1742,6 +1985,8 @@ dependencies = [ "futures", "http", "http-body", + "iceberg-catalog-rest", + "iceberg-datafusion", "insta", "itertools 0.13.0", "lazy_static", @@ -1783,6 +2028,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", + "const-oid", "crypto-common", "subtle", ] @@ -1808,6 +2054,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "dlv-list" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", +] + [[package]] name = "doc-comment" version = "0.3.3" @@ -1871,6 +2126,27 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "event-listener" +version = "5.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "eyre" version = "0.6.12" @@ -1913,6 +2189,12 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flagset" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3ea1ec5f8307826a5b71094dd91fc04d4ae75d5709b20ad351c7fb4815c86ec" + [[package]] name = "flatbuffers" version = "24.3.25" @@ -2090,8 +2372,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -2106,6 +2390,18 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.4.6" @@ -2141,6 +2437,9 @@ name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash 0.7.8", +] [[package]] name = "hashbrown" @@ -2148,7 +2447,7 @@ version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ - "ahash", + "ahash 0.8.11", "allocator-api2", ] @@ -2185,6 +2484,15 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "home" version = "0.5.9" @@ -2283,6 +2591,7 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", + "webpki-roots", ] [[package]] @@ -2341,29 +2650,115 @@ dependencies = [ ] [[package]] -name = "idna" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +name = "iceberg" +version = "0.3.0" +source = "git+https://github.com/apache/iceberg-rust?rev=16f9411dd3897134a401ece97d73cd33d6790bff#16f9411dd3897134a401ece97d73cd33d6790bff" dependencies = [ - "unicode-bidi", - "unicode-normalization", + "anyhow", + "apache-avro 0.17.0", + "array-init", + "arrow-arith", + "arrow-array", + "arrow-cast", + "arrow-ord", + "arrow-schema", + "arrow-select", + "arrow-string", + "async-trait", + "bimap", + "bitvec", + "bytes", + "chrono", + "derive_builder", + "fnv", + "futures", + "itertools 0.13.0", + "moka", + "murmur3", + "once_cell", + "opendal", + "ordered-float 4.5.0", + "parquet", + "paste", + "rand", + "reqwest", + "rust_decimal", + "serde", + "serde_bytes", + "serde_derive", + "serde_json", + "serde_repr", + "serde_with", + "tokio", + "typed-builder 0.20.0", + "url", + "uuid", ] [[package]] -name = "indenter" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" - -[[package]] -name = "indexmap" -version = "1.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +name = "iceberg-catalog-rest" +version = "0.3.0" +source = "git+https://github.com/apache/iceberg-rust?rev=16f9411dd3897134a401ece97d73cd33d6790bff#16f9411dd3897134a401ece97d73cd33d6790bff" +dependencies = [ + "async-trait", + "chrono", + "http", + "iceberg", + "itertools 0.13.0", + "log", + "reqwest", + "serde", + "serde_derive", + "serde_json", + "tokio", + "typed-builder 0.20.0", + "uuid", +] + +[[package]] +name = "iceberg-datafusion" +version = "0.3.0" +source = "git+https://github.com/apache/iceberg-rust?rev=16f9411dd3897134a401ece97d73cd33d6790bff#16f9411dd3897134a401ece97d73cd33d6790bff" +dependencies = [ + "anyhow", + "async-trait", + "datafusion", + "futures", + "iceberg", + "tokio", +] + +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + +[[package]] +name = "idna" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + +[[package]] +name = "indenter" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" + +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown 0.12.3", + "serde", ] [[package]] @@ -2374,6 +2769,7 @@ checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", "hashbrown 0.15.1", + "serde", ] [[package]] @@ -2459,7 +2855,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "02e23549143ef50eddffd46ba8cd0229b0a4500aef7518cf2eb0f41c9a09d22b" dependencies = [ - "ahash", + "ahash 0.8.11", "bitvec", "lexical-parse-float 0.8.5", "num-bigint", @@ -2738,7 +3134,7 @@ version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ae428771d17306715c5091d446327d1cfdedc82185c65ba8423ab404e45bf10" dependencies = [ - "ahash", + "ahash 0.8.11", "portable-atomic", ] @@ -2826,6 +3222,36 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c9be0862c1b3f26a88803c4a49de6889c10e608b3ee9344e6ef5b45fb37ad3d1" +[[package]] +name = "moka" +version = "0.12.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32cf62eb4dd975d2dde76432fb1075c49e3ee2331cf36f1f8fd4b66550d32b6f" +dependencies = [ + "async-lock", + "async-trait", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "event-listener", + "futures-util", + "once_cell", + "parking_lot", + "quanta", + "rustc_version", + "smallvec", + "tagptr", + "thiserror 1.0.63", + "triomphe", + "uuid", +] + +[[package]] +name = "murmur3" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9252111cf132ba0929b6f8e030cac2a24b507f3a4d6db6fb2896f27b354c714b" + [[package]] name = "nom" version = "7.1.3" @@ -2874,6 +3300,7 @@ checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" dependencies = [ "num-integer", "num-traits", + "serde", ] [[package]] @@ -2885,6 +3312,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.46" @@ -2962,7 +3395,7 @@ dependencies = [ "md-5", "parking_lot", "percent-encoding", - "quick-xml", + "quick-xml 0.36.1", "rand", "reqwest", "ring", @@ -2981,6 +3414,36 @@ version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ea5043e58958ee56f3e15a90aee535795cd7dfd319846288d93c5b57d85cbe" +[[package]] +name = "opendal" +version = "0.50.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb28bb6c64e116ceaf8dd4e87099d3cfea4a58e85e62b104fef74c91afba0f44" +dependencies = [ + "anyhow", + "async-trait", + "backon", + "base64", + "bytes", + "chrono", + "crc32c", + "flagset", + "futures", + "getrandom", + "http", + "log", + "md-5", + "once_cell", + "percent-encoding", + "quick-xml 0.36.1", + "reqsign", + "reqwest", + "serde", + "serde_json", + "tokio", + "uuid", +] + [[package]] name = "openssl-probe" version = "0.1.5" @@ -3002,6 +3465,25 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-float" +version = "4.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c65ee1f9701bf938026630b455d5315f490640234259037edb259798b3bcf85e" +dependencies = [ + "num-traits", +] + +[[package]] +name = "ordered-multimap" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49203cdcae0030493bad186b28da2fa25645fa276a51b6fec8010d281e02ef79" +dependencies = [ + "dlv-list", + "hashbrown 0.14.5", +] + [[package]] name = "overload" version = "0.1.1" @@ -3014,6 +3496,12 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.3" @@ -3043,7 +3531,7 @@ version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b449890367085eb65d7d3321540abc3d7babbd179ce31df0016e90719114191" dependencies = [ - "ahash", + "ahash 0.8.11", "arrow-array", "arrow-buffer", "arrow-cast", @@ -3186,6 +3674,12 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.20" @@ -3235,6 +3729,15 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "proc-macro-crate" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecf48c7ca261d60b74ab1a7b20da18bede46776b2e55535cb958eb595c5fa7b" +dependencies = [ + "toml_edit", +] + [[package]] name = "proc-macro2" version = "1.0.86" @@ -3276,6 +3779,26 @@ dependencies = [ "prost", ] +[[package]] +name = "ptr_meta" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1" +dependencies = [ + "ptr_meta_derive", +] + +[[package]] +name = "ptr_meta_derive" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "quad-rand" version = "0.2.2" @@ -3297,6 +3820,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "quick-xml" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86e446ed58cef1bbfe847bc2fda0e2e4ea9f0e57b90c507d4781292590d72a4e" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quick-xml" version = "0.36.1" @@ -3485,6 +4018,43 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +[[package]] +name = "rend" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71fe3824f5629716b1589be05dacd749f6aa084c87e00e016714a8cdfccc997c" +dependencies = [ + "bytecheck", +] + +[[package]] +name = "reqsign" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb0075a66c8bfbf4cc8b70dca166e722e1f55a3ea9250ecbb85f4d92a5f64149" +dependencies = [ + "anyhow", + "async-trait", + "base64", + "chrono", + "form_urlencoded", + "getrandom", + "hex", + "hmac", + "home", + "http", + "log", + "percent-encoding", + "quick-xml 0.35.0", + "rand", + "reqwest", + "rust-ini", + "serde", + "serde_json", + "sha1", + "sha2", +] + [[package]] name = "reqwest" version = "0.12.7" @@ -3527,6 +4097,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", + "webpki-roots", "windows-registry", ] @@ -3545,6 +4116,35 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rkyv" +version = "0.7.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9008cd6385b9e161d8229e1f6549dd23c3d022f132a2ea37ac3a10ac4935779b" +dependencies = [ + "bitvec", + "bytecheck", + "bytes", + "hashbrown 0.12.3", + "ptr_meta", + "rend", + "rkyv_derive", + "seahash", + "tinyvec", + "uuid", +] + +[[package]] +name = "rkyv_derive" +version = "0.7.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "503d1d27590a2b0a3a4ca4c94755aa2875657196ecbf401a42eff41d7de532c0" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "rle-decode-fast" version = "1.0.3" @@ -3561,6 +4161,33 @@ dependencies = [ "byteorder", ] +[[package]] +name = "rust-ini" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e310ef0e1b6eeb79169a1171daf9abcb87a2e17c03bee2c4bb100b55c75409f" +dependencies = [ + "cfg-if", + "ordered-multimap", + "trim-in-place", +] + +[[package]] +name = "rust_decimal" +version = "1.36.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b082d80e3e3cc52b2ed634388d436fe1f4de6af5786cc2de9ba9737527bdf555" +dependencies = [ + "arrayvec", + "borsh", + "bytes", + "num-traits", + "rand", + "rkyv", + "serde", + "serde_json", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -3706,6 +4333,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "seahash" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" + [[package]] name = "security-framework" version = "2.11.1" @@ -3750,6 +4383,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_bytes" +version = "0.11.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "387cc504cb06bb40a96c8e04e951fe01854cf6bc921053c954e4a606d9675c6a" +dependencies = [ + "serde", +] + [[package]] name = "serde_derive" version = "1.0.210" @@ -3773,6 +4415,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_repr" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "serde_spanned" version = "0.6.7" @@ -3794,6 +4447,47 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e28bdad6db2b8340e449f7108f020b3b092e8583a9e3fb82713e1d4e71fe817" +dependencies = [ + "base64", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.6.0", + "serde", + "serde_derive", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d846214a9854ef724f3da161b426242d8de7c1fc7de2f89bb1efcb154dca79d" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.87", +] + +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.8" @@ -3850,6 +4544,12 @@ dependencies = [ "libc", ] +[[package]] +name = "simdutf8" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" + [[package]] name = "similar" version = "2.6.0" @@ -4043,6 +4743,12 @@ dependencies = [ "futures-core", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tap" version = "1.0.1" @@ -4126,7 +4832,38 @@ checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" dependencies = [ "byteorder", "integer-encoding", - "ordered-float", + "ordered-float 2.10.1", +] + +[[package]] +name = "time" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" +dependencies = [ + "num-conv", + "time-core", ] [[package]] @@ -4395,6 +5132,18 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "trim-in-place" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc" + +[[package]] +name = "triomphe" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" + [[package]] name = "try-lock" version = "0.2.5" @@ -4445,7 +5194,25 @@ version = "0.16.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34085c17941e36627a879208083e25d357243812c30e7d7387c3b954f30ade16" dependencies = [ - "typed-builder-macro", + "typed-builder-macro 0.16.2", +] + +[[package]] +name = "typed-builder" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06fbd5b8de54c5f7c91f6fe4cebb949be2125d7758e630bb58b1d831dbce600" +dependencies = [ + "typed-builder-macro 0.19.1", +] + +[[package]] +name = "typed-builder" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e14ed59dc8b7b26cacb2a92bad2e8b1f098806063898ab42a3bd121d7d45e75" +dependencies = [ + "typed-builder-macro 0.20.0", ] [[package]] @@ -4459,6 +5226,28 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "typed-builder-macro" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9534daa9fd3ed0bd911d462a37f172228077e7abf18c18a5f67199d959205f8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + +[[package]] +name = "typed-builder-macro" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "560b82d656506509d43abe30e0ba64c56b1953ab3d4fe7ba5902747a7a3cedd5" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "typenum" version = "1.17.0" @@ -4695,6 +5484,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.26.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d642ff16b7e79272ae451b7322067cdc17cadf68c23264be9d94a32319efe7e" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "which" version = "4.4.2" diff --git a/Cargo.toml b/Cargo.toml index d6b396b..8ff798e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,8 @@ env_logger = "0.11.5" futures = "0.3.30" http = "1" http-body = "1" +iceberg-catalog-rest = { git = "https://github.com/apache/iceberg-rust", rev = "16f9411dd3897134a401ece97d73cd33d6790bff", optional = true} +iceberg-datafusion = { git = "https://github.com/apache/iceberg-rust", rev = "16f9411dd3897134a401ece97d73cd33d6790bff", optional = true } itertools = "0.13.0" lazy_static = "1.4.0" log = "0.4.22" @@ -64,6 +66,7 @@ url = "2.5.2" [features] default = ["functions-parquet"] deltalake = ["dep:deltalake"] +iceberg = ["dep:iceberg-datafusion", "dep:iceberg-catalog-rest"] flightsql = ["dep:arrow-flight", "dep:tonic"] experimental-flightsql-server = ["flightsql"] s3 = ["object_store/aws", "url"] diff --git a/README.md b/README.md index fceebae..8a679e1 100644 --- a/README.md +++ b/README.md @@ -201,6 +201,23 @@ Register deltalake tables. For example: CREATE EXTERNAL TABLE table_name STORED AS DELTATABLE LOCATION 's3://bucket/table' ``` +##### Iceberg (`--features=iceberg`) + +Register iceberg tables. For example: + +```sql +CREATE EXTERNAL TABLE table_name STORED AS ICEBERG LOCATION 's3://bucket/table' +``` + +Register Iceberg Rest Catalog + +```toml +[[execution.iceberg.rest_catalog]] +name = "my_iceberg_catalog" +addr = "192.168.1.1:8181" +``` + + ##### Json Functions (`--features=function-json`) Adds functions from [datafusion-function-json] for querying JSON strings in DataFusion in `dft`. For example: diff --git a/rust-toolchain b/rust-toolchain new file mode 100644 index 0000000..0193dee --- /dev/null +++ b/rust-toolchain @@ -0,0 +1,2 @@ +[toolchain] +channel = "1.83.0" diff --git a/src/config.rs b/src/config.rs index adcb3f2..594260f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -176,6 +176,8 @@ pub struct ExecutionConfig { pub dedicated_executor_enabled: bool, #[serde(default = "default_dedicated_executor_threads")] pub dedicated_executor_threads: usize, + #[serde(default = "default_iceberg_config")] + pub iceberg: IcebergConfig, } fn default_ddl_path() -> Option { @@ -220,6 +222,12 @@ fn default_dedicated_executor_threads() -> usize { num_cpus::get() } +fn default_iceberg_config() -> IcebergConfig { + IcebergConfig { + rest_catalogs: Vec::new(), + } +} + impl Default for ExecutionConfig { fn default() -> Self { Self { @@ -231,10 +239,22 @@ impl Default for ExecutionConfig { flightsql_server_batch_size: default_flightsql_server_batch_size(), dedicated_executor_enabled: default_dedicated_executor_enabled(), dedicated_executor_threads: default_dedicated_executor_threads(), + iceberg: default_iceberg_config(), } } } +#[derive(Clone, Debug, Deserialize)] +pub struct RestCatalogConfig { + pub name: String, + pub addr: String, +} + +#[derive(Clone, Debug, Deserialize)] +pub struct IcebergConfig { + pub rest_catalogs: Vec, +} + #[derive(Clone, Debug, Default, Deserialize)] pub struct InteractionConfig { #[serde(default = "default_mouse")] diff --git a/src/execution/local.rs b/src/execution/local.rs index 8ee694c..331b99f 100644 --- a/src/execution/local.rs +++ b/src/execution/local.rs @@ -27,9 +27,8 @@ use futures::TryFutureExt; use log::{debug, error, info}; use crate::config::ExecutionConfig; -use crate::extensions::{enabled_extensions, DftSessionStateBuilder}; use color_eyre::eyre::{self, Result}; -use datafusion::execution::SendableRecordBatchStream; +use datafusion::execution::{SendableRecordBatchStream, SessionState}; use datafusion::physical_plan::{execute_stream, ExecutionPlan}; use datafusion::prelude::*; use datafusion::sql::parser::{DFParser, Statement}; @@ -72,43 +71,29 @@ impl std::fmt::Debug for ExecutionContext { impl ExecutionContext { /// Construct a new `ExecutionContext` with the specified configuration - pub fn try_new(config: &ExecutionConfig, app_type: AppType) -> Result { - let mut builder = DftSessionStateBuilder::new(); + pub fn try_new( + config: &ExecutionConfig, + session_state: SessionState, + app_type: AppType, + ) -> Result { let mut executor = None; - match app_type { - AppType::Cli => { - builder = builder.with_batch_size(config.cli_batch_size); - } - AppType::Tui => { - builder = builder.with_batch_size(config.tui_batch_size); - } - AppType::FlightSQLServer => { - builder = builder.with_batch_size(config.flightsql_server_batch_size); - if config.dedicated_executor_enabled { - // Ideally we would only use `enable_time` but we are still doing - // some network requests as part of planning / execution which require network - // functionality. - - let runtime_builder = tokio::runtime::Builder::new_multi_thread(); - let dedicated_executor = - DedicatedExecutor::new("cpu_runtime", config.clone(), runtime_builder); - executor = Some(dedicated_executor) - } + if let AppType::FlightSQLServer = app_type { + if config.dedicated_executor_enabled { + // Ideally we would only use `enable_time` but we are still doing + // some network requests as part of planning / execution which require network + // functionality. + + let runtime_builder = tokio::runtime::Builder::new_multi_thread(); + let dedicated_executor = + DedicatedExecutor::new("cpu_runtime", config.clone(), runtime_builder); + executor = Some(dedicated_executor) } } - let extensions = enabled_extensions(); - for extension in &extensions { - builder = extension.register(config, builder)?; - } - let state = builder.build()?; - let mut session_ctx = SessionContext::new_with_state(state); + let mut session_ctx = SessionContext::new_with_state(session_state); - // Apply any additional setup to the session context (e.g. registering - // functions) - for extension in &extensions { - extension.register_on_ctx(config, &mut session_ctx)?; - } + #[cfg(feature = "functions-json")] + datafusion_functions_json::register_all(&mut session_ctx)?; // Register Parquet Metadata Function let session_ctx = session_ctx.enable_url_table(); diff --git a/src/execution/mod.rs b/src/execution/mod.rs index c96b651..5872951 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -31,8 +31,6 @@ pub use stats::{collect_plan_io_stats, ExecutionStats}; #[cfg(feature = "flightsql")] use self::flightsql::{FlightSQLClient, FlightSQLContext}; use self::local::ExecutionContext; -use crate::config::AppConfig; -use color_eyre::eyre::Result; use datafusion::prelude::*; pub enum AppType { @@ -58,17 +56,6 @@ impl AppExecution { } } - pub fn try_new_from_config(config: AppConfig, app_type: AppType) -> Result { - let context = ExecutionContext::try_new(&config.execution, app_type)?; - #[cfg(feature = "flightsql")] - let flightsql_context = FlightSQLContext::new(config.flightsql); - Ok(Self { - context, - #[cfg(feature = "flightsql")] - flightsql_context, - }) - } - pub fn execution_ctx(&self) -> &ExecutionContext { &self.context } diff --git a/src/extensions/builder.rs b/src/extensions/builder.rs index 119301b..2359098 100644 --- a/src/extensions/builder.rs +++ b/src/extensions/builder.rs @@ -17,7 +17,9 @@ //! [`DftSessionStateBuilder`] for configuring DataFusion [`SessionState`] -use datafusion::catalog::TableProviderFactory; +use color_eyre::eyre; +use datafusion::catalog::{CatalogProvider, CatalogProviderList, TableProviderFactory}; +use datafusion::catalog_common::MemoryCatalogProviderList; use datafusion::execution::context::SessionState; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::execution::session_state::SessionStateBuilder; @@ -26,6 +28,10 @@ use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; +use crate::{config::ExecutionConfig, execution::AppType}; + +use super::{enabled_extensions, Extension}; + /// Builds a DataFusion [`SessionState`] with any necessary configuration /// /// Ideally we would use the DataFusion [`SessionStateBuilder`], but it doesn't @@ -45,8 +51,11 @@ use std::sync::Arc; /// //#[derive(Debug)] pub struct DftSessionStateBuilder { + app_type: Option, + execution_config: Option, session_config: SessionConfig, table_factories: Option>>, + catalog_providers: Option>>, runtime_env: Option>, } @@ -54,10 +63,9 @@ impl Debug for DftSessionStateBuilder { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("DftSessionStateBuilder") .field("session_config", &self.session_config) - //.field("table_factories", &self.table_factories) .field( "table_factories", - &"TODO TableFactoryDoes not implement Debug", + &"TODO TableFactory does not implement Debug", ) .field("runtime_env", &self.runtime_env) .finish() @@ -77,11 +85,24 @@ impl DftSessionStateBuilder { Self { session_config, + app_type: None, + execution_config: None, table_factories: None, + catalog_providers: None, runtime_env: None, } } + pub fn with_app_type(mut self, app_type: AppType) -> Self { + self.app_type = Some(app_type); + self + } + + pub fn with_execution_config(mut self, app_type: ExecutionConfig) -> Self { + self.execution_config = Some(app_type); + self + } + /// Set the `batch_size` on the [`SessionConfig`] pub fn with_batch_size(mut self, batch_size: usize) -> Self { self.session_config = self.session_config.with_batch_size(batch_size); @@ -89,11 +110,7 @@ impl DftSessionStateBuilder { } /// Add a table factory to the list of factories on this builder - pub fn with_table_factory( - mut self, - name: &str, - factory: Arc, - ) -> Self { + pub fn add_table_factory(&mut self, name: &str, factory: Arc) { if self.table_factories.is_none() { self.table_factories = Some(HashMap::from([(name.to_string(), factory)])); } else { @@ -102,7 +119,18 @@ impl DftSessionStateBuilder { .unwrap() .insert(name.to_string(), factory); } - self + } + + /// Add a catalog provider to the list of providers on this builder + pub fn add_catalog_provider(&mut self, name: &str, factory: Arc) { + if self.catalog_providers.is_none() { + self.catalog_providers = Some(HashMap::from([(name.to_string(), factory)])); + } else { + self.catalog_providers + .as_mut() + .unwrap() + .insert(name.to_string(), factory); + } } /// Return the current [`RuntimeEnv`], creating a default if it doesn't exist @@ -113,14 +141,67 @@ impl DftSessionStateBuilder { self.runtime_env.as_ref().unwrap() } + pub async fn register_extension( + &mut self, + config: ExecutionConfig, + extension: Arc, + ) -> color_eyre::Result<()> { + extension + .register(config, self) + .await + .map_err(|_| eyre::eyre!("E")) + } + + pub async fn with_extensions(mut self) -> color_eyre::Result { + let extensions = enabled_extensions(); + + for extension in extensions { + let execution_config = self.execution_config.clone().unwrap_or_default(); + self.register_extension(execution_config, extension).await?; + } + + Ok(self) + } + + /// Apply all enabled extensions to the `SessionContext` + pub async fn register_extensions(&mut self, config: ExecutionConfig) -> color_eyre::Result<()> { + let extensions = enabled_extensions(); + + for extension in extensions { + self.register_extension(config.clone(), extension).await?; + } + + Ok(()) + } + /// Build the [`SessionState`] from the specified configuration pub fn build(self) -> datafusion_common::Result { let Self { - session_config, + app_type, + execution_config, + mut session_config, table_factories, + catalog_providers, runtime_env, + .. } = self; + let app_type = app_type.unwrap_or(AppType::Cli); + let execution_config = execution_config.unwrap_or_default(); + + match app_type { + AppType::Cli => { + session_config = session_config.with_batch_size(execution_config.cli_batch_size); + } + AppType::Tui => { + session_config = session_config.with_batch_size(execution_config.tui_batch_size); + } + AppType::FlightSQLServer => { + session_config = + session_config.with_batch_size(execution_config.flightsql_server_batch_size); + } + } + let mut builder = SessionStateBuilder::new() .with_default_features() .with_config(session_config); @@ -132,6 +213,14 @@ impl DftSessionStateBuilder { builder = builder.with_table_factories(table_factories); } + if let Some(catalog_providers) = catalog_providers { + let catalogs_list = MemoryCatalogProviderList::new(); + for (k, v) in catalog_providers { + catalogs_list.register_catalog(k, v); + } + builder = builder.with_catalog_list(Arc::new(catalogs_list)); + } + Ok(builder.build()) } } diff --git a/src/extensions/deltalake.rs b/src/extensions/deltalake.rs index 5004e0d..2732254 100644 --- a/src/extensions/deltalake.rs +++ b/src/extensions/deltalake.rs @@ -31,12 +31,15 @@ impl DeltaLakeExtension { } } +#[async_trait::async_trait] impl Extension for DeltaLakeExtension { - fn register( + async fn register( &self, - _config: &ExecutionConfig, - builder: DftSessionStateBuilder, - ) -> datafusion_common::Result { - Ok(builder.with_table_factory("DELTATABLE", Arc::new(DeltaTableFactory {}))) + _config: ExecutionConfig, + builder: &mut DftSessionStateBuilder, + ) -> datafusion_common::Result<()> { + println!("Registering deltalake"); + builder.add_table_factory("DELTATABLE", Arc::new(DeltaTableFactory {})); + Ok(()) } } diff --git a/src/extensions/functions_json.rs b/src/extensions/functions_json.rs deleted file mode 100644 index 7925fb7..0000000 --- a/src/extensions/functions_json.rs +++ /dev/null @@ -1,50 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! [datafusion-function-json] Integration: [JsonFunctionsExtension] -//! -//! [datafusion-function-json]: https://github.com/datafusion-contrib/datafusion-functions-json - -use crate::config::ExecutionConfig; -use crate::extensions::{DftSessionStateBuilder, Extension}; -use datafusion::prelude::SessionContext; -use datafusion_common::Result; - -#[derive(Debug, Default)] -pub struct JsonFunctionsExtension {} - -impl JsonFunctionsExtension { - pub fn new() -> Self { - Self {} - } -} - -impl Extension for JsonFunctionsExtension { - fn register( - &self, - _config: &ExecutionConfig, - builder: DftSessionStateBuilder, - ) -> datafusion_common::Result { - // - Ok(builder) - } - - fn register_on_ctx(&self, _config: &ExecutionConfig, ctx: &mut SessionContext) -> Result<()> { - datafusion_functions_json::register_all(ctx)?; - Ok(()) - } -} diff --git a/src/extensions/iceberg.rs b/src/extensions/iceberg.rs new file mode 100644 index 0000000..3c08ccd --- /dev/null +++ b/src/extensions/iceberg.rs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! DeltaLake integration: [DeltaLakeExtension] + +use crate::config::ExecutionConfig; +use crate::extensions::{DftSessionStateBuilder, Extension}; +use datafusion_common::DataFusionError; +use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; +use iceberg_datafusion::{IcebergCatalogProvider, IcebergTableProviderFactory}; +use std::sync::Arc; + +#[derive(Debug, Default)] +pub struct IcebergExtension {} + +impl IcebergExtension { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait::async_trait] +impl Extension for IcebergExtension { + async fn register( + &self, + config: ExecutionConfig, + builder: &mut DftSessionStateBuilder, + ) -> datafusion_common::Result<()> { + for cfg in config.iceberg.rest_catalogs { + let rest_catalog_config = RestCatalogConfig::builder().uri(cfg.addr).build(); + let rest_catalog = RestCatalog::new(rest_catalog_config); + let catalog_provider = IcebergCatalogProvider::try_new(Arc::new(rest_catalog)) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))?; + builder.add_catalog_provider(&cfg.name, Arc::new(catalog_provider)); + } + // TODO Add Iceberg Catalog + let factory = Arc::new(IcebergTableProviderFactory {}); + builder.add_table_factory("ICEBERG", factory); + Ok(()) + } +} diff --git a/src/extensions/mod.rs b/src/extensions/mod.rs index 9e36b8d..f2d4bcc 100644 --- a/src/extensions/mod.rs +++ b/src/extensions/mod.rs @@ -20,42 +20,43 @@ use crate::config::ExecutionConfig; use datafusion::common::Result; use datafusion::prelude::SessionContext; -use std::fmt::Debug; +use std::{fmt::Debug, sync::Arc}; mod builder; #[cfg(feature = "deltalake")] mod deltalake; -#[cfg(feature = "functions-json")] -mod functions_json; +#[cfg(feature = "iceberg")] +mod iceberg; #[cfg(feature = "s3")] mod s3; pub use builder::DftSessionStateBuilder; +#[async_trait::async_trait] pub trait Extension: Debug { /// Registers this extension with the DataFusion [`SessionStateBuilder`] - fn register( + async fn register( &self, - _config: &ExecutionConfig, - _builder: DftSessionStateBuilder, - ) -> Result; + _config: ExecutionConfig, + _builder: &mut DftSessionStateBuilder, + ) -> Result<()>; - /// Registers this extension after the SessionContext has been created - /// (this is to match the historic way many extensions were registered) - /// TODO file a ticket upstream to use the builder pattern + // Registers this extension after the SessionContext has been created + // (this is to match the historic way many extensions were registered) + // TODO file a ticket upstream to use the builder pattern fn register_on_ctx(&self, _config: &ExecutionConfig, _ctx: &mut SessionContext) -> Result<()> { Ok(()) } } /// Return all extensions currently enabled -pub fn enabled_extensions() -> Vec> { +pub fn enabled_extensions() -> Vec> { vec![ #[cfg(feature = "s3")] - Box::new(s3::AwsS3Extension::new()), + Arc::new(s3::AwsS3Extension::new()), #[cfg(feature = "deltalake")] - Box::new(deltalake::DeltaLakeExtension::new()), - #[cfg(feature = "functions-json")] - Box::new(functions_json::JsonFunctionsExtension::new()), + Arc::new(deltalake::DeltaLakeExtension::new()), + #[cfg(feature = "iceberg")] + Arc::new(iceberg::IcebergExtension::new()), ] } diff --git a/src/extensions/s3.rs b/src/extensions/s3.rs index 377bd44..ed7ae7e 100644 --- a/src/extensions/s3.rs +++ b/src/extensions/s3.rs @@ -33,18 +33,19 @@ impl AwsS3Extension { } } +#[async_trait::async_trait] impl Extension for AwsS3Extension { - fn register( + async fn register( &self, - config: &ExecutionConfig, - mut builder: DftSessionStateBuilder, - ) -> datafusion_common::Result { + config: ExecutionConfig, + builder: &mut DftSessionStateBuilder, + ) -> datafusion_common::Result<()> { let Some(object_store_config) = &config.object_store else { - return Ok(builder); + return Ok(()); }; let Some(s3_configs) = &object_store_config.s3 else { - return Ok(builder); + return Ok(()); }; info!("S3 configs exists"); @@ -69,6 +70,6 @@ impl Extension for AwsS3Extension { } } - Ok(builder) + Ok(()) } } diff --git a/src/main.rs b/src/main.rs index a5f0b12..bbeb707 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,10 +22,10 @@ use dft::cli::CliApp; #[cfg(feature = "flightsql")] use dft::execution::flightsql::FlightSQLContext; use dft::execution::{local::ExecutionContext, AppExecution, AppType}; +use dft::extensions::DftSessionStateBuilder; #[cfg(feature = "experimental-flightsql-server")] use dft::server::FlightSqlApp; use dft::telemetry; -use dft::tui::state::AppState; use dft::tui::{state, App}; #[cfg(feature = "experimental-flightsql-server")] use log::info; @@ -34,8 +34,6 @@ use log::info; fn main() -> Result<()> { let cli = DftArgs::parse(); - let state = state::initialize(cli.config_path()); - // With Runtimes configured correctly the main Tokio runtime should only be used for network // IO, in which a single thread should be sufficient. // @@ -45,19 +43,30 @@ fn main() -> Result<()> { .enable_all() .build()?; - let entry_point = app_entry_point(cli, state); + let entry_point = app_entry_point(cli); runtime.block_on(entry_point) } -async fn app_entry_point(cli: DftArgs, state: AppState<'_>) -> Result<()> { +async fn app_entry_point(cli: DftArgs) -> Result<()> { + let state = state::initialize(cli.config_path()); + let session_state_builder = DftSessionStateBuilder::new() + .with_execution_config(state.config.execution.clone()) + .with_extensions() + .await?; #[cfg(feature = "experimental-flightsql-server")] if cli.serve { + // FlightSQL Server mode: start a FlightSQL server env_logger::init(); const DEFAULT_SERVER_ADDRESS: &str = "127.0.0.1:50051"; info!("Starting FlightSQL server on {}", DEFAULT_SERVER_ADDRESS); - let state = state::initialize(cli.config_path()); - let execution_ctx = - ExecutionContext::try_new(&state.config.execution, AppType::FlightSQLServer)?; + let session_state = session_state_builder + .with_app_type(AppType::FlightSQLServer) + .build()?; + let execution_ctx = ExecutionContext::try_new( + &state.config.execution, + session_state, + AppType::FlightSQLServer, + )?; if cli.run_ddl { execution_ctx.execute_ddl().await; } @@ -72,10 +81,12 @@ async fn app_entry_point(cli: DftArgs, state: AppState<'_>) -> Result<()> { app.run_app().await; return Ok(()); } - // CLI mode: executing commands from files or CLI arguments if !cli.files.is_empty() || !cli.commands.is_empty() { + // CLI mode: executing commands from files or CLI arguments env_logger::init(); - let execution_ctx = ExecutionContext::try_new(&state.config.execution, AppType::Cli)?; + let session_state = session_state_builder.with_app_type(AppType::Cli).build()?; + let execution_ctx = + ExecutionContext::try_new(&state.config.execution, session_state, AppType::Cli)?; #[allow(unused_mut)] let mut app_execution = AppExecution::new(execution_ctx); #[cfg(feature = "flightsql")] @@ -90,12 +101,13 @@ async fn app_entry_point(cli: DftArgs, state: AppState<'_>) -> Result<()> { } let app = CliApp::new(app_execution, cli.clone()); app.execute_files_or_commands().await?; - // FlightSQL Server mode: start a FlightSQL server } else { // TUI mode: running the TUI telemetry::initialize_logs()?; // use alternate logging for TUI let state = state::initialize(cli.config_path()); - let execution_ctx = ExecutionContext::try_new(&state.config.execution, AppType::Tui)?; + let session_state = session_state_builder.with_app_type(AppType::Tui).build()?; + let execution_ctx = + ExecutionContext::try_new(&state.config.execution, session_state, AppType::Tui)?; let app_execution = AppExecution::new(execution_ctx); let app = App::new(state, cli, app_execution); app.run_app().await?; diff --git a/src/tui/state/mod.rs b/src/tui/state/mod.rs index 2b94eda..bc359b6 100644 --- a/src/tui/state/mod.rs +++ b/src/tui/state/mod.rs @@ -82,7 +82,7 @@ pub fn initialize<'app>(config_path: PathBuf) -> AppState<'app> { AppState::new(config) } -impl<'app> AppState<'app> { +impl AppState<'_> { pub fn new(config: AppConfig) -> Self { let tabs = Tabs::default(); diff --git a/src/tui/state/tabs/flightsql.rs b/src/tui/state/tabs/flightsql.rs index 1b7fd85..030c99d 100644 --- a/src/tui/state/tabs/flightsql.rs +++ b/src/tui/state/tabs/flightsql.rs @@ -72,7 +72,7 @@ pub struct FlightSQLTabState<'app> { connection_status: FlightSQLConnectionStatus, } -impl<'app> FlightSQLTabState<'app> { +impl FlightSQLTabState<'_> { pub fn new(config: &AppConfig) -> Self { let empty_text = vec!["Enter a query here.".to_string()]; // TODO: Enable vim mode from config? diff --git a/src/tui/state/tabs/sql.rs b/src/tui/state/tabs/sql.rs index ed7dadb..630d9fb 100644 --- a/src/tui/state/tabs/sql.rs +++ b/src/tui/state/tabs/sql.rs @@ -74,7 +74,7 @@ pub struct SQLTabState<'app> { mode: SQLTabMode, } -impl<'app> SQLTabState<'app> { +impl SQLTabState<'_> { pub fn new(config: &AppConfig) -> Self { let empty_text = vec!["Enter a query here.".to_string()]; // TODO: Enable vim mode from config? diff --git a/tests/extension_cases/deltalake.rs b/tests/extension_cases/deltalake.rs index ef43692..b8d5659 100644 --- a/tests/extension_cases/deltalake.rs +++ b/tests/extension_cases/deltalake.rs @@ -19,7 +19,7 @@ use url::Url; use crate::extension_cases::TestExecution; -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_deltalake() { let test_exec = TestExecution::new(); @@ -27,6 +27,7 @@ async fn test_deltalake() { let path = Url::from_file_path(cwd.join("data/deltalake/simple_table")).unwrap(); let mut test_exec = test_exec + .await .with_setup(&format!( "CREATE EXTERNAL TABLE d STORED AS DELTATABLE LOCATION '{}';", path diff --git a/tests/extension_cases/functions_json.rs b/tests/extension_cases/functions_json.rs index 837abae..bf8ae1e 100644 --- a/tests/extension_cases/functions_json.rs +++ b/tests/extension_cases/functions_json.rs @@ -32,9 +32,9 @@ CREATE TABLE test_table ( "#; /// Ensure one of the functions `json_contains` function is properly registered -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_basic() { - let mut execution = TestExecution::new().with_setup(TEST_TABLE).await; + let mut execution = TestExecution::new().await.with_setup(TEST_TABLE).await; let actual = execution .run_and_format("SELECT id, json_contains(json_col, 'b') as json_contains FROM test_table") @@ -54,9 +54,9 @@ async fn test_basic() { } /// ensure the json operators like -> are properly registered -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_operators() { - let mut execution = TestExecution::new().with_setup(TEST_TABLE).await; + let mut execution = TestExecution::new().await.with_setup(TEST_TABLE).await; let actual = execution .run_and_format("SELECT id, json_col->'a' as json_col_a FROM test_table") diff --git a/tests/extension_cases/mod.rs b/tests/extension_cases/mod.rs index c91af39..9f50373 100644 --- a/tests/extension_cases/mod.rs +++ b/tests/extension_cases/mod.rs @@ -31,6 +31,7 @@ use datafusion::arrow::util::pretty::pretty_format_batches; use datafusion::sql::parser::DFParser; use datafusion_common::Result; use dft::execution::local::ExecutionContext; +use dft::extensions::DftSessionStateBuilder; use dft::{config::AppConfig, execution::AppType}; use futures::{StreamExt, TryStreamExt}; use log::debug; @@ -40,17 +41,25 @@ pub struct TestExecution { execution: ExecutionContext, } -impl Default for TestExecution { - fn default() -> Self { - Self::new() - } -} +// impl Default for TestExecution { +// fn default() -> Self { +// Self::new() +// } +// } impl TestExecution { - pub fn new() -> Self { + pub async fn new() -> Self { let config = AppConfig::default(); - let execution = ExecutionContext::try_new(&config.execution, AppType::Cli) - .expect("cannot create execution context"); + + let session_state = DftSessionStateBuilder::new() + .with_app_type(AppType::Cli) + .with_extensions() + .await + .unwrap() + .build() + .unwrap(); + let execution = + ExecutionContext::try_new(&config.execution, session_state, AppType::Cli).unwrap(); Self { execution } } diff --git a/tests/extension_cases/s3.rs b/tests/extension_cases/s3.rs index 0cce6da..989bb40 100644 --- a/tests/extension_cases/s3.rs +++ b/tests/extension_cases/s3.rs @@ -21,8 +21,8 @@ use assert_cmd::Command; use crate::{cli_cases::contains_str, config::TestConfigBuilder}; -#[test] -fn test_s3_basic() { +#[tokio::test(flavor = "multi_thread")] +async fn test_s3_basic() { let tempdir = tempfile::tempdir().unwrap(); let ddl_path = tempdir.path().join("my_ddl.sql"); let mut file = std::fs::File::create(ddl_path.clone()).unwrap(); diff --git a/tests/tui_cases/ddl.rs b/tests/tui_cases/ddl.rs index 4cdd848..aefd7a2 100644 --- a/tests/tui_cases/ddl.rs +++ b/tests/tui_cases/ddl.rs @@ -22,7 +22,7 @@ use crate::tui_cases::TestApp; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_create_table_ddl() { - let mut test_app = TestApp::new(); + let mut test_app = TestApp::new().await; let ddl = "CREATE TABLE foo AS VALUES (1);"; test_app @@ -45,7 +45,7 @@ async fn test_create_table_ddl() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_create_table_in_new_schema() { - let mut test_app = TestApp::new(); + let mut test_app = TestApp::new().await; let create_schema = "CREATE SCHEMA foo;"; let create_table = "CREATE TABLE foo.bar AS VALUES (1);"; @@ -70,7 +70,7 @@ async fn test_create_table_in_new_schema() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_create_table_in_new_catalog() { - let mut test_app = TestApp::new(); + let mut test_app = TestApp::new().await; let create_catalog = "CREATE DATABASE foo;"; let create_schema = "CREATE SCHEMA foo.bar;"; @@ -97,7 +97,7 @@ async fn test_create_table_in_new_catalog() { // Stupid test but its all ive got for now #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_create_table_ddl_error() { - let mut test_app = TestApp::new(); + let mut test_app = TestApp::new().await; test_app.handle_app_event(AppEvent::DDLError).unwrap(); diff --git a/tests/tui_cases/flightsql_pagination.rs b/tests/tui_cases/flightsql_pagination.rs index 959178a..6d2d429 100644 --- a/tests/tui_cases/flightsql_pagination.rs +++ b/tests/tui_cases/flightsql_pagination.rs @@ -44,7 +44,7 @@ async fn create_execution_results(query: &str) -> ExecutionResultsBatch { // Tests that a single page of results is displayed correctly #[tokio::test] async fn single_page() { - let mut test_app = TestApp::new(); + let mut test_app = TestApp::new().await; test_app .handle_app_event(AppEvent::FlightSQLNewExecution) @@ -86,7 +86,7 @@ fn create_values_query(num: usize) -> String { // Tests that we can paginate through multiple pages and go back to the first page #[tokio::test] async fn multiple_pages_forward_and_back() { - let mut test_app = TestApp::new(); + let mut test_app = TestApp::new().await; let query = create_values_query(101); let res1 = create_execution_results(&query).await; let event1 = AppEvent::FlightSQLExecutionResultsNextBatch(res1); @@ -151,7 +151,7 @@ async fn multiple_pages_forward_and_back() { // page #[tokio::test] async fn multiple_pages_forward_and_back_and_forward() { - let mut test_app = TestApp::new(); + let mut test_app = TestApp::new().await; let query = create_values_query(101); let res1 = create_execution_results(&query).await; let event1 = AppEvent::FlightSQLExecutionResultsNextBatch(res1); diff --git a/tests/tui_cases/mod.rs b/tests/tui_cases/mod.rs index 7aab7db..5728ddc 100644 --- a/tests/tui_cases/mod.rs +++ b/tests/tui_cases/mod.rs @@ -26,6 +26,7 @@ use datafusion::common::Result; use dft::{ args::DftArgs, execution::{local::ExecutionContext, AppExecution, AppType}, + extensions::DftSessionStateBuilder, tui::{state::initialize, App, AppEvent}, }; use tempfile::{tempdir, TempDir}; @@ -46,11 +47,23 @@ struct TestApp<'app> { impl<'app> TestApp<'app> { /// Create a new [`TestApp`] instance configured with a temporary directory - fn new() -> Self { + async fn new() -> Self { let config_path = tempdir().unwrap(); let state = initialize(config_path.path().to_path_buf()); + let session_state = DftSessionStateBuilder::new() + .with_app_type(AppType::Tui) + .with_extensions() + .await + .unwrap() + .build() + .unwrap(); let execution_ctx = - ExecutionContext::try_new(&state.config.execution, AppType::Tui).unwrap(); + ExecutionContext::try_new(&state.config.execution, session_state, AppType::Tui) + .unwrap(); + // let fut = execution_ctx.register_extensions(); + // tokio::task::block_in_place(move || { + // tokio::runtime::Handle::current().block_on(fut).unwrap() + // }); let app_execution = AppExecution::new(execution_ctx); let args = DftArgs::default(); let mut app = App::new(state, args, app_execution); diff --git a/tests/tui_cases/pagination.rs b/tests/tui_cases/pagination.rs index 58f9f82..2509582 100644 --- a/tests/tui_cases/pagination.rs +++ b/tests/tui_cases/pagination.rs @@ -41,9 +41,9 @@ fn create_execution_results(query: &str, adj: u32) -> ExecutionResultsBatch { } // Tests that a single page of results is displayed correctly -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn single_page() { - let mut test_app = TestApp::new(); + let mut test_app = TestApp::new().await; let res1 = create_execution_results("SELECT 1", 0); let event1 = AppEvent::ExecutionResultsNextBatch(res1); @@ -76,9 +76,9 @@ async fn single_page() { } // Tests that we can paginate through multiple pages and go back to the first page -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn multiple_pages_forward_and_back() { - let mut test_app = TestApp::new(); + let mut test_app = TestApp::new().await; let res1 = create_execution_results("SELECT 1", 0); let event1 = AppEvent::ExecutionResultsNextBatch(res1); @@ -153,9 +153,9 @@ async fn multiple_pages_forward_and_back() { // Tests that we can still paginate when we already have the batch because we previously viewed the // page -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn multiple_pages_forward_and_back_and_forward() { - let mut test_app = TestApp::new(); + let mut test_app = TestApp::new().await; let res1 = create_execution_results("SELECT 1", 0); let event1 = AppEvent::ExecutionResultsNextBatch(res1); diff --git a/tests/tui_cases/quit.rs b/tests/tui_cases/quit.rs index aa28201..22e5a40 100644 --- a/tests/tui_cases/quit.rs +++ b/tests/tui_cases/quit.rs @@ -19,19 +19,20 @@ use dft::args::DftArgs; use dft::execution::{local::ExecutionContext, AppExecution, AppType}; +use dft::extensions::DftSessionStateBuilder; use dft::tui::state::initialize; use dft::tui::{App, AppEvent}; use ratatui::crossterm::event; use tempfile::{tempdir, TempDir}; -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn construct_with_no_args() { - let _test_app = TestApp::new(); + let _test_app = TestApp::new().await; } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn quit_app_from_sql_tab() { - let mut test_app = TestApp::new(); + let mut test_app = TestApp::new().await; // SQL Tab let key = event::KeyEvent::new(event::KeyCode::Char('q'), event::KeyModifiers::NONE); let app_event = AppEvent::Key(key); @@ -40,9 +41,9 @@ async fn quit_app_from_sql_tab() { assert!(test_app.state().should_quit); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn quit_app_from_flightsql_tab() { - let mut test_app = TestApp::new(); + let mut test_app = TestApp::new().await; let flightsql_key = event::KeyEvent::new(event::KeyCode::Char('2'), event::KeyModifiers::NONE); let app_event = AppEvent::Key(flightsql_key); test_app.handle_app_event(app_event).unwrap(); @@ -52,9 +53,9 @@ async fn quit_app_from_flightsql_tab() { assert!(test_app.state().should_quit); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn quit_app_from_history_tab() { - let mut test_app = TestApp::new(); + let mut test_app = TestApp::new().await; let history_key = event::KeyEvent::new(event::KeyCode::Char('3'), event::KeyModifiers::NONE); let app_event = AppEvent::Key(history_key); test_app.handle_app_event(app_event).unwrap(); @@ -64,9 +65,9 @@ async fn quit_app_from_history_tab() { assert!(test_app.state().should_quit); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn quit_app_from_logs_tab() { - let mut test_app = TestApp::new(); + let mut test_app = TestApp::new().await; let logs_key = event::KeyEvent::new(event::KeyCode::Char('4'), event::KeyModifiers::NONE); let app_event = AppEvent::Key(logs_key); test_app.handle_app_event(app_event).unwrap(); @@ -76,9 +77,9 @@ async fn quit_app_from_logs_tab() { assert!(test_app.state().should_quit); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn quit_app_from_context_tab() { - let mut test_app = TestApp::new(); + let mut test_app = TestApp::new().await; let context_key = event::KeyEvent::new(event::KeyCode::Char('5'), event::KeyModifiers::NONE); let app_event = AppEvent::Key(context_key); test_app.handle_app_event(app_event).unwrap(); @@ -102,10 +103,25 @@ struct TestApp<'app> { impl<'app> TestApp<'app> { /// Create a new [`TestApp`] instance configured with a temporary directory - fn new() -> Self { + async fn new() -> Self { let config_path = tempdir().unwrap(); let state = initialize(config_path.path().to_path_buf()); - let execution = ExecutionContext::try_new(&state.config.execution, AppType::Tui).unwrap(); + + let session_state = DftSessionStateBuilder::new() + .with_app_type(AppType::Tui) + .with_extensions() + .await + .unwrap() + .build() + .unwrap(); + let execution = + ExecutionContext::try_new(&state.config.execution, session_state, AppType::Tui) + .unwrap(); + // let fut = execution.register_extensions(); + + // tokio::task::block_in_place(move || { + // tokio::runtime::Handle::current().block_on(fut).unwrap() + // }); let args = DftArgs::default(); let app_execution = AppExecution::new(execution); let app = App::new(state, args, app_execution);