WIP: Patched DataFusion 45+ with unified execution plans #30

Draft
wants to merge 24 commits into
base: alamb/test_datasource_exec_base
22a2061
fix: `List` of `FixedSizeList` coercion issue in SQL (#14468)
alan910127 Feb 5, 2025
fe8ab01
make datafusion-catalog-listing and move some implementation of listi…
logan-keede Feb 5, 2025
d5ff3e7
refactor: remove uses of `arrow_buffer` & `arrow_array` and use reexp…
Chen-Yuan-Lai Feb 5, 2025
61ab9d0
core: Support uncorrelated EXISTS (#14474)
findepi Feb 5, 2025
304488d
chore(deps): Update sqlparser to `0.54.0` (#14255)
alamb Feb 5, 2025
5239d1a
Validate and unpack function arguments tersely (#14513)
findepi Feb 5, 2025
da67917
change FileType enum into a dyn Trait so that it can be extensible
mertak-synnada Feb 6, 2025
62e23a2
bug: Fix edge cases in array_slice (#14489)
jkosh44 Feb 6, 2025
6957121
remove metadata_size_hint from required ParquetSource parameters
mertak-synnada Feb 6, 2025
55730dc
Feat: Add fetch to CoalescePartitionsExec (#14499)
mertak-synnada Feb 6, 2025
d39e441
remove FileType trait and split with_predicate logic for ParquetSource
mertak-synnada Feb 6, 2025
4c32ca2
remove predicate from initialization of ParquetSource
mertak-synnada Feb 6, 2025
91071bd
remove unnecessary imports
mertak-synnada Feb 6, 2025
e2d5a8d
deprecate ParquetExecBuilder and add doc hints
mertak-synnada Feb 6, 2025
81179ae
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Feb 6, 2025
9a32a0a
fix slt
mertak-synnada Feb 6, 2025
72059ee
fix clippy
mertak-synnada Feb 6, 2025
3e10283
fix fmt
mertak-synnada Feb 6, 2025
26abfea
Update changelog (#14460)
alamb Feb 3, 2025
399677f
fix: handle when the left side of the union has no fields (e.g. an em…
wiedld Oct 11, 2024
456899b
chore: default=true for skip_physical_aggregate_schema_check, and add…
wiedld Oct 15, 2024
b575426
(New) Test + workaround for SanityCheck plan
alamb Jul 16, 2024
2c74c70
chore: skip order calculation / exponential planning
alamb Feb 3, 2025
278b49b
fix: clippy warnings
wiedld Feb 3, 2025
4 changes: 3 additions & 1 deletion Cargo.toml
@@ -22,6 +22,7 @@ members = [
  "datafusion/common",
  "datafusion/common-runtime",
  "datafusion/catalog",
+ "datafusion/catalog-listing",
  "datafusion/core",
  "datafusion/expr",
  "datafusion/expr-common",
@@ -100,6 +101,7 @@ ctor = "0.2.9"
  dashmap = "6.0.1"
  datafusion = { path = "datafusion/core", version = "45.0.0", default-features = false }
  datafusion-catalog = { path = "datafusion/catalog", version = "45.0.0" }
+ datafusion-catalog-listing = { path = "datafusion/catalog-listing", version = "45.0.0" }
  datafusion-common = { path = "datafusion/common", version = "45.0.0", default-features = false }
  datafusion-common-runtime = { path = "datafusion/common-runtime", version = "45.0.0" }
  datafusion-doc = { path = "datafusion/doc", version = "45.0.0" }
@@ -148,7 +150,7 @@ recursive = "0.1.1"
  regex = "1.8"
  rstest = "0.24.0"
  serde_json = "1"
- sqlparser = { version = "0.53.0", features = ["visitor"] }
+ sqlparser = { version = "0.54.0", features = ["visitor"] }
  tempfile = "3"
  tokio = { version = "1.43", features = ["macros", "rt", "sync"] }
  url = "2.5.4"
34 changes: 26 additions & 8 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 4 additions & 9 deletions datafusion-examples/examples/advanced_parquet_index.rs
@@ -26,7 +26,6 @@ use std::sync::Arc;
  use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
  use arrow::datatypes::SchemaRef;
  use datafusion::catalog::Session;
- use datafusion::common::config::TableParquetOptions;
  use datafusion::common::{
  internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
  };
@@ -492,16 +491,12 @@ impl TableProvider for IndexTableProvider {
  .with_file(indexed_file);

  let file_source = Arc::new(
- ParquetSource::new(
- Arc::clone(&schema),
+ ParquetSource::default()
  // provide the predicate so the DataSourceExec can try and prune
  // row groups internally
- Some(predicate),
- None,
- TableParquetOptions::default(),
- )
- // provide the factory to create parquet reader without re-reading metadata
- .with_parquet_file_reader_factory(Arc::new(reader_factory)),
+ .with_predicate(Arc::clone(&schema), predicate)
+ // provide the factory to create parquet reader without re-reading metadata
+ .with_parquet_file_reader_factory(Arc::new(reader_factory)),
  );
  let file_scan_config = FileScanConfig::new(object_store_url, schema, file_source)
  .with_limit(limit)
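The hunk above replaces a positional constructor (`ParquetSource::new(schema, Some(predicate), None, options)`) with a default value plus chained `with_*` setters, so callers no longer pass placeholder arguments such as `None`. A minimal, std-only sketch of that pattern follows. `SketchSource`, its fields, and its setters are hypothetical stand-ins chosen for illustration; they are not DataFusion's `ParquetSource` definition, and the predicate is modelled as a plain string.

```rust
// Hypothetical stand-in type: NOT the DataFusion `ParquetSource`.
// Every optional setting starts out unset via `Default`.
#[derive(Debug, Default, Clone, PartialEq)]
struct SketchSource {
    // Assumption: the predicate is modelled as a string for illustration only.
    predicate: Option<String>,
    metadata_size_hint: Option<usize>,
}

impl SketchSource {
    /// Consumes `self`, sets the predicate, and returns the updated value
    /// so that calls can be chained.
    fn with_predicate(mut self, predicate: impl Into<String>) -> Self {
        self.predicate = Some(predicate.into());
        self
    }

    /// Consumes `self`, sets the optional size hint, and returns the updated value.
    fn with_metadata_size_hint(mut self, hint: usize) -> Self {
        self.metadata_size_hint = Some(hint);
        self
    }
}
```

With this shape, a caller that only needs a predicate writes `SketchSource::default().with_predicate("a > 1")` and leaves every other setting at its default, which mirrors how the updated examples call `ParquetSource::default().with_predicate(...)`.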
9 changes: 7 additions & 2 deletions datafusion-examples/examples/parquet_exec_visitor.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;

  use datafusion::datasource::file_format::parquet::ParquetFormat;
  use datafusion::datasource::listing::{ListingOptions, PartitionedFile};
- use datafusion::datasource::physical_plan::FileScanConfig;
+ use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
  use datafusion::error::DataFusionError;
  use datafusion::execution::context::SessionContext;
  use datafusion::physical_plan::metrics::MetricValue;
@@ -100,7 +100,12 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
  if let Some(data_source) = plan.as_any().downcast_ref::<DataSourceExec>() {
  let source = data_source.source();
  if let Some(file_config) = source.as_any().downcast_ref::<FileScanConfig>() {
- if file_config.file_source().file_type().is_parquet() {
+ if file_config
+ .file_source()
+ .as_any()
+ .downcast_ref::<ParquetSource>()
+ .is_some()
+ {
  self.file_groups = Some(file_config.file_groups.clone());

  let metrics = match data_source.metrics() {
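The hunk above drops the `file_type().is_parquet()` query and instead checks the concrete type of the file source by downcasting through `as_any()`. A minimal, std-only sketch of that check follows. The `SketchFileSource` trait and the two structs are hypothetical stand-ins for illustration; only `std::any::Any` and its `downcast_ref` method are real APIs here.

```rust
use std::any::Any;

// Hypothetical stand-in for a file-source trait: NOT the DataFusion definition.
// Exposing `as_any` lets callers recover the concrete type behind a trait object.
trait SketchFileSource {
    fn as_any(&self) -> &dyn Any;
}

// Two hypothetical concrete sources.
struct ParquetLikeSource;
struct CsvLikeSource;

impl SketchFileSource for ParquetLikeSource {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl SketchFileSource for CsvLikeSource {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Returns true only when the trait object's concrete type is `ParquetLikeSource`.
fn is_parquet_like(source: &dyn SketchFileSource) -> bool {
    source
        .as_any()
        .downcast_ref::<ParquetLikeSource>()
        .is_some()
}
```

One consequence of this design is that the trait needs no format enum or `is_parquet`-style method: a new source type can be added without touching the trait, and callers that care about one format downcast to it.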
9 changes: 2 additions & 7 deletions datafusion-examples/examples/parquet_index.rs
@@ -23,7 +23,6 @@ use arrow::datatypes::{Int32Type, SchemaRef};
  use arrow::util::pretty::pretty_format_batches;
  use async_trait::async_trait;
  use datafusion::catalog::Session;
- use datafusion::common::config::TableParquetOptions;
  use datafusion::common::{
  internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
  };
@@ -242,12 +241,8 @@ impl TableProvider for IndexTableProvider {
  let files = self.index.get_files(predicate.clone())?;

  let object_store_url = ObjectStoreUrl::parse("file://")?;
- let source = Arc::new(ParquetSource::new(
- self.schema(),
- Some(predicate),
- None,
- TableParquetOptions::default(),
- ));
+ let source =
+ Arc::new(ParquetSource::default().with_predicate(self.schema(), predicate));
  let mut file_scan_config =
  FileScanConfig::new(object_store_url, self.schema(), source)
  .with_projection(projection.cloned())
65 changes: 65 additions & 0 deletions datafusion/catalog-listing/Cargo.toml
@@ -0,0 +1,65 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "datafusion-catalog-listing"
description = "datafusion-catalog-listing"
authors.workspace = true
edition.workspace = true
homepage.workspace = true
license.workspace = true
readme.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true

[dependencies]
arrow = { workspace = true }
arrow-schema = { workspace = true }
async-compression = { version = "0.4.0", features = [
"bzip2",
"gzip",
"xz",
"zstd",
"tokio",
], optional = true }
chrono = { workspace = true }
datafusion-catalog = { workspace = true }
datafusion-common = { workspace = true, features = ["object_store"] }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
futures = { workspace = true }
glob = "0.3.0"
itertools = { workspace = true }
log = { workspace = true }
object_store = { workspace = true }
url = { workspace = true }

[dev-dependencies]
async-trait = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }

[lints]
workspace = true

[lib]
name = "datafusion_catalog_listing"
path = "src/mod.rs"
1 change: 1 addition & 0 deletions datafusion/catalog-listing/LICENSE.txt
1 change: 1 addition & 0 deletions datafusion/catalog-listing/NOTICE.txt
30 changes: 30 additions & 0 deletions datafusion/catalog-listing/README.md
@@ -0,0 +1,30 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# DataFusion catalog-listing

[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.

This crate is a submodule of DataFusion with [ListingTable], an implementation
of [TableProvider] based on files in a directory (either locally or on remote
object storage such as S3).

[df]: https://crates.io/crates/datafusion
[listingtable]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
[tableprovider]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html