Feature/single source exec #56

Open · wants to merge 31 commits into base: apache_main
Changes from all commits
31 commits
5b1714c
unify ParquetExec, AvroExec, ArrowExec, NDJsonExec, MemoryExec into o…
mertak-synnada Jan 7, 2025
d2fe022
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Jan 7, 2025
70505b3
fix license headers
mertak-synnada Jan 7, 2025
00db759
fix compile errors on documents
mertak-synnada Jan 7, 2025
cea3ecd
separate non-parquet code
mertak-synnada Jan 7, 2025
2331577
format code
mertak-synnada Jan 8, 2025
15b812f
fix typo
mertak-synnada Jan 8, 2025
f020bd2
fix imports
mertak-synnada Jan 8, 2025
a7a5bd8
fix clippy
mertak-synnada Jan 8, 2025
94a306f
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Jan 8, 2025
d69b012
add comment to the example
mertak-synnada Jan 8, 2025
f14a6d8
fix cargo docs
mertak-synnada Jan 8, 2025
8040147
change MemoryExec with MemorySourceConfig
mertak-synnada Jan 8, 2025
f540df0
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Jan 10, 2025
ddb221d
merge fixes
mertak-synnada Jan 10, 2025
069d28c
change MemoryExec to DataSourceExec
mertak-synnada Jan 10, 2025
78dfce8
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Jan 15, 2025
cb6a5ff
fix merge conflicts
mertak-synnada Jan 15, 2025
4acb2fc
apply some syntactic sugars
mertak-synnada Jan 15, 2025
ff68caa
fix imports and comment line
mertak-synnada Jan 15, 2025
7a62190
simplify some lines
mertak-synnada Jan 15, 2025
fd37183
rename source_config as file_source
mertak-synnada Jan 15, 2025
cf13028
format code
mertak-synnada Jan 15, 2025
201d8a0
format code
mertak-synnada Jan 15, 2025
7edc80f
make memory metrics default behavior
mertak-synnada Jan 15, 2025
fe25de3
remove unnecessary cfg check
mertak-synnada Jan 16, 2025
2452825
format code
mertak-synnada Jan 16, 2025
e084359
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
mertak-synnada Jan 16, 2025
a4d5da3
remove ParquetExec strings
mertak-synnada Jan 16, 2025
a6c018e
fix documents and imports
mertak-synnada Jan 16, 2025
104c428
fix imports
mertak-synnada Jan 16, 2025
20 changes: 11 additions & 9 deletions datafusion-cli/src/functions.rs
@@ -16,30 +16,32 @@
// under the License.

//! Functions that are query-able and searchable via the `\h` command

use std::fmt;
use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;

use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use async_trait::async_trait;

use datafusion::catalog::Session;
use datafusion::common::{plan_err, Column};
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::memory::MemorySourceConfig;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;
use datafusion_catalog::TableFunctionImpl;

use async_trait::async_trait;
use parquet::basic::ConvertedType;
use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::statistics::Statistics;
use std::fmt;
use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;

#[derive(Debug)]
pub enum Function {
@@ -241,11 +243,11 @@ impl TableProvider for ParquetMetadataTable {
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(MemoryExec::try_new(
Ok(MemorySourceConfig::try_new_exec(
&[vec![self.batch.clone()]],
TableProvider::schema(self),
projection.cloned(),
)?))
)?)
}
}
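
Note on the change above: call sites that previously built a `MemoryExec` directly now go through `MemorySourceConfig::try_new_exec`, which produces the unified `DataSourceExec`-backed plan. Below is a minimal sketch of a scan helper using the new call, assuming only the signature visible in this diff (partitions of record batches, a schema, and an optional projection); the helper name is illustrative.

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::memory::MemorySourceConfig;
use datafusion::physical_plan::ExecutionPlan;

/// Build a memory-backed execution plan for one pre-materialized batch.
fn memory_scan(
    batch: RecordBatch,
    schema: SchemaRef,
    projection: Option<Vec<usize>>,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Before this PR:
    //   Ok(Arc::new(MemoryExec::try_new(&[vec![batch]], schema, projection)?))
    // After: the source config constructs the DataSourceExec-based plan itself.
    Ok(MemorySourceConfig::try_new_exec(
        &[vec![batch]], // a single partition holding a single batch
        schema,
        projection,
    )?)
}
```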

102 changes: 55 additions & 47 deletions datafusion-examples/examples/advanced_parquet_index.rs
@@ -15,17 +15,22 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::catalog::Session;
use datafusion::datasource::data_source::FileSourceConfig;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::{
ParquetAccessPlan, ParquetExecBuilder,
};
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
use datafusion::datasource::physical_plan::{
parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig,
parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig, ParquetConfig,
};
use datafusion::datasource::TableProvider;
use datafusion::execution::object_store::ObjectStoreUrl;
@@ -42,22 +47,19 @@ use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use datafusion_common::config::TableParquetOptions;
use datafusion_common::{
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::utils::conjunction;
use datafusion_expr::{TableProviderFilterPushDown, TableType};
use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};

use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use object_store::ObjectStore;
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tempfile::TempDir;
use url::Url;

@@ -82,8 +84,8 @@ use url::Url;
/// Specifically, this example illustrates how to:
/// 1. Use [`ParquetFileReaderFactory`] to avoid re-reading parquet metadata on each query
/// 2. Use [`PruningPredicate`] for predicate analysis
/// 3. Pass a row group selection to [`ParquetExec`]
/// 4. Pass a row selection (within a row group) to [`ParquetExec`]
/// 3. Pass a row group selection to [`ParquetConfig`]
/// 4. Pass a row selection (within a row group) to [`ParquetConfig`]
///
/// Note this is a *VERY* low level example for people who want to build their
/// own custom indexes (e.g. for low latency queries). Most users should use
@@ -93,38 +95,38 @@ use url::Url;
///
/// # Diagram
///
/// This diagram shows how the `ParquetExec` is configured to do only a single
/// This diagram shows how the `DataSourceExec` with `ParquetConfig` is configured to do only a single
/// (range) read from a parquet file, for the data that is needed. It does
/// not read the file footer or any of the row groups that are not needed.
///
/// ```text
/// ┌───────────────────────┐ The TableProvider configures the
/// │ ┌───────────────────┐ │ ParquetExec:
/// │ ┌───────────────────┐ │ DataSourceExec:
/// │ │ │ │
/// │ └───────────────────┘ │
/// │ ┌───────────────────┐ │
/// Row │ │ │ │ 1. To read only specific Row
/// Groups │ └───────────────────┘ │ Groups (the ParquetExec tries
/// Groups │ └───────────────────┘ │ Groups (the DataSourceExec tries
/// │ ┌───────────────────┐ │ to reduce this further based
/// │ │ │ │ on metadata)
/// │ └───────────────────┘ │ ┌────────────────────┐
/// │ ┌───────────────────┐ │ │ │
/// │ │ │◀┼ ─ ─ ┐ │ ParquetExec
/// │ └───────────────────┘ │ │ (Parquet Reader) │
/// │ ... │ └ ─ ─ ─ ─│ │
/// │ ┌───────────────────┐ │ │ ╔═══════════════╗ │
/// │ │ │ │ │ ║ParquetMetadata║ │
/// │ └───────────────────┘ │ │ ╚═══════════════╝ │
/// │ ╔═══════════════════╗ │ └────────────────────┘
/// │ └───────────────────┘ │ ┌──────────────────────
/// │ ┌───────────────────┐ │ │
/// │ │ │◀┼ ─ ─ ┐ │ DataSourceExec
/// │ └───────────────────┘ │ │ (Parquet Reader)
/// │ ... │ └ ─ ─ ─ ─│
/// │ ┌───────────────────┐ │ │ ╔═══════════════╗
/// │ │ │ │ │ ║ParquetMetadata║
/// │ └───────────────────┘ │ │ ╚═══════════════╝
/// │ ╔═══════════════════╗ │ └──────────────────────
/// │ ║ Thrift metadata ║ │
/// │ ╚═══════════════════╝ │ 1. With cached ParquetMetadata, so
/// └───────────────────────┘ the ParquetExec does not re-read /
/// └───────────────────────┘ the ParquetConfig does not re-read /
/// Parquet File decode the thrift footer
///
/// ```
///
/// Within a Row Group, Column Chunks store data in DataPages. This example also
/// shows how to configure the ParquetExec to read a `RowSelection` (row ranges)
/// shows how to configure the ParquetConfig to read a `RowSelection` (row ranges)
/// which will skip unneeded data pages. This requires that the Parquet file has
/// a [Page Index].
///
@@ -134,15 +136,15 @@ use url::Url;
/// │ │ Data Page is not fetched or decoded.
/// │ ┌───────────────────┐ │ Note this requires a PageIndex
/// │ │ ┌──────────┐ │ │
/// Row │ │ │DataPage 0│ │ │ ┌────────────────────┐
/// Groups │ │ └──────────┘ │ │ │ │
/// │ │ ┌──────────┐ │ │ │ ParquetExec
/// │ │ ... │DataPage 1│ ◀┼ ┼ ─ ─ ─ │ (Parquet Reader) │
/// │ │ └──────────┘ │ │ └ ─ ─ ─ ─ ─│ │
/// │ │ ┌──────────┐ │ │ │ ╔═══════════════╗ │
/// │ │ │DataPage 2│ │ │ If only rows │ ║ParquetMetadata║ │
/// │ │ └──────────┘ │ │ from DataPage 1 │ ╚═══════════════╝ │
/// │ └───────────────────┘ │ are selected, └────────────────────┘
/// Row │ │ │DataPage 0│ │ │ ┌──────────────────────
/// Groups │ │ └──────────┘ │ │ │
/// │ │ ┌──────────┐ │ │ │ DataSourceExec
/// │ │ ... │DataPage 1│ ◀┼ ┼ ─ ─ ─ │ (Parquet Reader)
/// │ │ └──────────┘ │ │ └ ─ ─ ─ ─ ─│
/// │ │ ┌──────────┐ │ │ │ ╔═══════════════╗
/// │ │ │DataPage 2│ │ │ If only rows │ ║ParquetMetadata║
/// │ │ └──────────┘ │ │ from DataPage 1 │ ╚═══════════════╝
/// │ └───────────────────┘ │ are selected, └──────────────────────
/// │ │ only DataPage 1
/// │ ... │ is fetched and
/// │ │ decoded
@@ -210,7 +212,7 @@ async fn main() -> Result<()> {
// pages that must be decoded
//
// Note: in order to prune pages, the Page Index must be loaded and the
// ParquetExec will load it on demand if not present. To avoid a second IO
// DataSourceExec will load it on demand if not present. To avoid a second IO
// during query, this example loaded the Page Index preemptively by setting
// `ArrowReader::with_page_index` in `IndexedFile::try_new`
provider.set_use_row_selection(true);
@@ -477,7 +479,7 @@ impl TableProvider for IndexTableProvider {

let partitioned_file = indexed_file
.partitioned_file()
// provide the starting access plan to the ParquetExec by
// provide the starting access plan to the DataSourceExec by
// storing it as "extensions" on PartitionedFile
.with_extensions(Arc::new(access_plan) as _);

@@ -494,14 +496,20 @@
CachedParquetFileReaderFactory::new(Arc::clone(&self.object_store))
.with_file(indexed_file);

// Finally, put it all together into a ParquetExec
Ok(ParquetExecBuilder::new(file_scan_config)
// provide the predicate so the ParquetExec can try and prune
// row groups internally
.with_predicate(predicate)
let source_config = Arc::new(
ParquetConfig::new(
Arc::clone(&file_scan_config.file_schema),
// provide the predicate so the DataSourceExec can try and prune
// row groups internally
Some(predicate),
None,
TableParquetOptions::default(),
)
// provide the factory to create parquet reader without re-reading metadata
.with_parquet_file_reader_factory(Arc::new(reader_factory))
.build_arc())
.with_parquet_file_reader_factory(Arc::new(reader_factory)),
);
// Finally, put it all together into a DataSourceExec
Ok(FileSourceConfig::new_exec(file_scan_config, source_config))
}

/// Tell DataFusion to push filters down to the scan method
Expand Down
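
Taken together, this scan now replaces the removed `ParquetExecBuilder`: the parquet-specific options live in a `ParquetConfig`, and `FileSourceConfig::new_exec` pairs it with the generic `FileScanConfig` to produce the `DataSourceExec`. The following condensed sketch mirrors that assembly, assuming the constructor arguments shown in the diff (the third argument is left as `None` exactly as above); the helper name and parameter list are illustrative, and import paths not visible in the diff are best guesses.

```rust
use std::sync::Arc;

use datafusion::datasource::data_source::FileSourceConfig;
use datafusion::datasource::physical_plan::parquet::ParquetFileReaderFactory;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetConfig};
use datafusion::error::Result;
use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr};
use datafusion_common::config::TableParquetOptions;

/// Assemble a parquet scan as a DataSourceExec, mirroring the diff above.
fn build_parquet_scan(
    file_scan_config: FileScanConfig,
    predicate: Arc<dyn PhysicalExpr>,
    reader_factory: Arc<dyn ParquetFileReaderFactory>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let source_config = Arc::new(
        ParquetConfig::new(
            Arc::clone(&file_scan_config.file_schema),
            // predicate so the scan can prune row groups internally
            Some(predicate),
            // left as None, matching the example above
            None,
            TableParquetOptions::default(),
        )
        // reuse cached metadata instead of re-reading the parquet footer
        .with_parquet_file_reader_factory(reader_factory),
    );
    Ok(FileSourceConfig::new_exec(file_scan_config, source_config))
}
```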
43 changes: 22 additions & 21 deletions datafusion-examples/examples/csv_json_opener.rs
@@ -18,18 +18,20 @@
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::data_source::FileSource;
use datafusion::{
assert_batches_eq,
datasource::{
file_format::file_compression_type::FileCompressionType,
listing::PartitionedFile,
object_store::ObjectStoreUrl,
physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream, JsonOpener},
physical_plan::{CsvConfig, FileScanConfig, FileStream, JsonOpener},
},
error::Result,
physical_plan::metrics::ExecutionPlanMetricsSet,
test_util::aggr_test_schema,
};

use futures::StreamExt;
use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};

@@ -48,29 +50,24 @@ async fn csv_opener() -> Result<()> {
let object_store = Arc::new(LocalFileSystem::new());
let schema = aggr_test_schema();

let config = CsvConfig::new(
8192,
schema.clone(),
Some(vec![12, 0]),
true,
b',',
b'"',
None,
object_store,
Some(b'#'),
);

let opener = CsvOpener::new(Arc::new(config), FileCompressionType::UNCOMPRESSED);

let testdata = datafusion::test_util::arrow_test_data();
let path = format!("{testdata}/csv/aggregate_test_100.csv");

let path = std::path::Path::new(&path).canonicalize()?;

let scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
.with_projection(Some(vec![12, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10));
let scan_config =
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), Arc::clone(&schema))
.with_projection(Some(vec![12, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10));

let config = CsvConfig::new(true, b',', b'"')
.with_comment(Some(b'#'))
.with_schema(schema)
.with_batch_size(8192)
.with_projection(&scan_config);

let opener = config.create_file_opener(Ok(object_store), &scan_config, 0)?;

let mut result = vec![];
let mut stream =
@@ -125,8 +122,12 @@ async fn json_opener() -> Result<()> {
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.to_string(), 10));

let mut stream =
FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())?;
let mut stream = FileStream::new(
&scan_config,
0,
Arc::new(opener),
&ExecutionPlanMetricsSet::new(),
)?;
let mut result = vec![];
while let Some(batch) = stream.next().await.transpose()? {
result.push(batch);
Expand Down
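
The CSV example shows the other side of the split: `CsvConfig` drops its long positional constructor in favor of a builder, and the opener is created from the config plus the `FileScanConfig` via `create_file_opener`. A minimal sketch of just the builder portion follows, assuming the methods shown above; the three positional arguments appear to be has_header, delimiter, and quote (matching the old constructor's order), and names outside the diff are illustrative.

```rust
use arrow_schema::SchemaRef;
use datafusion::datasource::physical_plan::{CsvConfig, FileScanConfig};

/// Build a CsvConfig with the builder-style API used in the example above.
fn csv_source_config(schema: SchemaRef, scan_config: &FileScanConfig) -> CsvConfig {
    CsvConfig::new(true, b',', b'"') // has_header, delimiter, quote (inferred)
        .with_comment(Some(b'#'))    // optional comment character
        .with_schema(schema)
        .with_batch_size(8192)
        .with_projection(scan_config) // projection is taken from the scan config
}
```

The opener is then obtained with `config.create_file_opener(Ok(object_store), &scan_config, 0)?` and driven through a `FileStream`, as shown in the diff.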
31 changes: 22 additions & 9 deletions datafusion-examples/examples/parquet_exec_visitor.rs
@@ -17,11 +17,13 @@

use std::sync::Arc;

use datafusion::datasource::data_source::FileSourceConfig;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, PartitionedFile};
use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::datasource::physical_plan::ParquetConfig;
use datafusion::execution::context::SessionContext;
use datafusion::physical_plan::metrics::MetricValue;
use datafusion::physical_plan::source::DataSourceExec;
use datafusion::physical_plan::{
execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
};
@@ -95,15 +97,26 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
/// or `post_visit` (visit each node after its children/inputs)
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
// If needed match on a specific `ExecutionPlan` node type
let maybe_parquet_exec = plan.as_any().downcast_ref::<ParquetExec>();
if let Some(parquet_exec) = maybe_parquet_exec {
self.file_groups = Some(parquet_exec.base_config().file_groups.clone());
if let Some(data_source) = plan.as_any().downcast_ref::<DataSourceExec>() {
let source = data_source.source();
if let Some(file_config) = source.as_any().downcast_ref::<FileSourceConfig>()
{
if file_config
.file_source()
.as_any()
.downcast_ref::<ParquetConfig>()
.is_some()
{
self.file_groups =
Some(file_config.base_config().file_groups.clone());

let metrics = match parquet_exec.metrics() {
None => return Ok(true),
Some(metrics) => metrics,
};
self.bytes_scanned = metrics.sum_by_name("bytes_scanned");
let metrics = match data_source.metrics() {
None => return Ok(true),
Some(metrics) => metrics,
};
self.bytes_scanned = metrics.sum_by_name("bytes_scanned");
}
}
}
Ok(true)
}
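
For downstream code that used to downcast to `ParquetExec`, the visitor above shows the replacement pattern: downcast to `DataSourceExec`, then its `source()` to `FileSourceConfig`, then check whether its `file_source()` is a `ParquetConfig`; metrics now come from the `DataSourceExec` itself. Below is a small predicate helper extracted from that pattern, a sketch that reuses only the accessors visible in the diff.

```rust
use datafusion::datasource::data_source::FileSourceConfig;
use datafusion::datasource::physical_plan::ParquetConfig;
use datafusion::physical_plan::source::DataSourceExec;
use datafusion::physical_plan::ExecutionPlan;

/// Return true if `plan` is a DataSourceExec scanning parquet files.
fn is_parquet_scan(plan: &dyn ExecutionPlan) -> bool {
    if let Some(data_source) = plan.as_any().downcast_ref::<DataSourceExec>() {
        let source = data_source.source();
        if let Some(file_config) = source.as_any().downcast_ref::<FileSourceConfig>() {
            // the format-specific config sits one level deeper
            return file_config
                .file_source()
                .as_any()
                .downcast_ref::<ParquetConfig>()
                .is_some();
        }
    }
    false
}
```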