diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs
index 0ed337611389c..8b6013ad4ebd1 100644
--- a/datafusion-examples/examples/parquet_index.rs
+++ b/datafusion-examples/examples/parquet_index.rs
@@ -38,7 +38,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::*;
 use datafusion_common::config::TableParquetOptions;
 use datafusion_common::{
-    internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue, Statistics,
+    internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
 };
 use datafusion_expr::utils::conjunction;
 use datafusion_expr::{TableProviderFilterPushDown, TableType};
@@ -243,36 +243,23 @@ impl TableProvider for IndexTableProvider {
         // will not be returned.
         let files = self.index.get_files(predicate.clone())?;
 
+        // Create the scan configuration to scan the relevant files
+        let object_store_url = ObjectStoreUrl::parse("file://")?;
+        let mut base_config = FileScanConfig::new(object_store_url, self.schema())
+            .with_projection(projection.cloned())
+            .with_limit(limit);
+
         // Transform to the format needed to pass to ParquetExec
         // Create one file group per file (default to scanning them all in parallel)
-        let file_groups = files
-            .into_iter()
-            .map(|(file_name, file_size)| {
-                let path = self.dir.join(file_name);
-                let canonical_path = fs::canonicalize(path)?;
-                Ok(vec![PartitionedFile::new(
-                    canonical_path.display().to_string(),
-                    file_size,
-                )])
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        // for now, simply use ParquetExec
-        // TODO make a builder for FileScanConfig
-        let object_store_url = ObjectStoreUrl::parse("file://")?;
-        let base_config = FileScanConfig {
-            object_store_url,
-            file_schema: self.schema(),
-            file_groups,
-            statistics: Statistics::new_unknown(self.index.schema()),
-            projection: projection.cloned(),
-            limit,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        };
+        for (file_name, file_size) in files.into_iter() {
+            let path = self.dir.join(file_name);
+            let canonical_path = fs::canonicalize(path)?;
+            let partitioned_file =
+                PartitionedFile::new(canonical_path.display().to_string(), file_size);
+            base_config.add_file(partitioned_file);
+        }
 
         let metadata_size_hint = None;
 
-        let table_parquet_options = TableParquetOptions::default();
 
         // TODO make a builder for parquet exec
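For reference, the refactored example boils down to the construction pattern below. This is a minimal sketch, not part of the patch: the helper name `build_scan_config`, the file names and sizes, and the projection/limit values are all placeholders.

```rust
use std::sync::Arc;

use arrow_schema::Schema;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion_common::Result;
use datafusion_execution::object_store::ObjectStoreUrl;

/// Hypothetical helper mirroring the refactored example: build a scan config
/// with one file group per file, so the files can be scanned in parallel.
fn build_scan_config(files: Vec<(String, u64)>, schema: Arc<Schema>) -> Result<FileScanConfig> {
    let mut config = FileScanConfig::new(ObjectStoreUrl::parse("file://")?, schema)
        .with_projection(Some(vec![0])) // placeholder projection
        .with_limit(Some(10)); // placeholder limit
    for (file_name, file_size) in files {
        // each add_file call creates its own single-file group
        config.add_file(PartitionedFile::new(file_name, file_size));
    }
    Ok(config)
}
```

Compared to filling in the `FileScanConfig { .. }` struct literal directly, the builder defaults every field the caller does not care about (statistics, partition columns, ordering), which is what lets the example drop its `Statistics` import.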
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 4de7eb136f22e..7aeae273b5f88 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -64,12 +64,43 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
 /// The base configurations to provide when creating a physical plan for
 /// any given file format.
+///
+/// # Example
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow_schema::Schema;
+/// use datafusion::datasource::listing::PartitionedFile;
+/// # use datafusion::datasource::physical_plan::FileScanConfig;
+/// # use datafusion_execution::object_store::ObjectStoreUrl;
+/// # let file_schema = Arc::new(Schema::empty());
+/// // create FileScan config for reading data from file://
+/// let object_store_url = ObjectStoreUrl::local_filesystem();
+/// let mut config = FileScanConfig::new(object_store_url, file_schema)
+///     .with_limit(Some(1000))             // read only the first 1000 records
+///     .with_projection(Some(vec![2, 3])); // project columns 2 and 3
+///
+/// // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
+/// config.add_file(PartitionedFile::new("file1.parquet", 1234));
+///
+/// // Read /tmp/file2.parquet (56 bytes) and /tmp/file3.parquet (78 bytes)
+/// // in a single file group
+/// config.add_file_group(vec![
+///     PartitionedFile::new("file2.parquet", 56),
+///     PartitionedFile::new("file3.parquet", 78),
+/// ]);
+/// ```
 #[derive(Clone)]
 pub struct FileScanConfig {
     /// Object store URL, used to get an [`ObjectStore`] instance from
     /// [`RuntimeEnv::object_store`]
     ///
+    /// This `ObjectStoreUrl` should be the prefix of the absolute url for files
+    /// such as `file://` or `s3://my_bucket`. It should not include the path to
+    /// the file itself. The relevant URL prefix must be registered via
+    /// [`RuntimeEnv::register_object_store`]
+    ///
     /// [`ObjectStore`]: object_store::ObjectStore
+    /// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
     /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
     pub object_store_url: ObjectStoreUrl,
     /// Schema before `projection` is applied. It contains the all columns that may
@@ -87,6 +118,7 @@ pub struct FileScanConfig {
     /// sequentially, one after the next.
     pub file_groups: Vec<Vec<PartitionedFile>>,
     /// Estimated overall statistics of the files, taking `filters` into account.
+    /// Defaults to [`Statistics::new_unknown`].
     pub statistics: Statistics,
     /// Columns on which to project the data. Indexes that are higher than the
     /// number of columns of `file_schema` refer to `table_partition_cols`.
@@ -101,6 +133,72 @@ pub struct FileScanConfig {
 }
 
 impl FileScanConfig {
+    /// Create a new `FileScanConfig` with default settings for scanning files.
+    ///
+    /// No file groups are added by default. See [`Self::add_file`] and
+    /// [`Self::add_file_group`]
+    ///
+    /// # Parameters:
+    /// * `object_store_url`: See [`Self::object_store_url`]
+    /// * `file_schema`: See [`Self::file_schema`]
+    pub fn new(object_store_url: ObjectStoreUrl, file_schema: SchemaRef) -> Self {
+        let statistics = Statistics::new_unknown(&file_schema);
+        Self {
+            object_store_url,
+            file_schema,
+            file_groups: vec![],
+            statistics,
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            output_ordering: vec![],
+        }
+    }
+
+    /// Add a new file as a single file group
+    ///
+    /// See [Self::file_groups] for more information
+    pub fn add_file(&mut self, file: PartitionedFile) {
+        self.add_file_group(vec![file])
+    }
+
+    /// Add a new file group
+    ///
+    /// See [Self::file_groups] for more information
+    pub fn add_file_group(&mut self, file_group: Vec<PartitionedFile>) {
+        self.file_groups.push(file_group);
+    }
+
+    /// Set the statistics of the files
+    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
+        self.statistics = statistics;
+        self
+    }
+
+    /// Set the projection of the files
+    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
+        self.projection = projection;
+        self
+    }
+
+    /// Set the limit of the files
+    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
+        self.limit = limit;
+        self
+    }
+
+    /// Set the partitioning columns of the files
+    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
+        self.table_partition_cols = table_partition_cols;
+        self
+    }
+
+    /// Set the output ordering of the files
+    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
+        self.output_ordering = output_ordering;
+        self
+    }
+
     /// Project the schema and the statistics on the given column indices
     pub fn project(&self) -> (SchemaRef, Statistics, Vec<usize>) {
         if self.projection.is_none() && self.table_partition_cols.is_empty() {
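The distinction between `add_file` and `add_file_group` is the unit of parallelism: per the `file_groups` docs, each group can be read as its own partition, while files within one group are read sequentially, one after the next. A minimal sketch of that semantics (the file names and sizes are placeholders, not part of the patch):

```rust
use std::sync::Arc;

use arrow_schema::Schema;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion_execution::object_store::ObjectStoreUrl;

fn main() {
    let schema = Arc::new(Schema::empty());
    let mut config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema);

    // two single-file groups: may be scanned in parallel
    config.add_file(PartitionedFile::new("a.parquet", 100));
    config.add_file(PartitionedFile::new("b.parquet", 200));

    // one two-file group: scanned sequentially within the group
    config.add_file_group(vec![
        PartitionedFile::new("c.parquet", 300),
        PartitionedFile::new("d.parquet", 400),
    ]);

    // two groups from add_file plus one from add_file_group
    assert_eq!(config.file_groups.len(), 3);
}
```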
diff --git a/datafusion/execution/src/object_store.rs b/datafusion/execution/src/object_store.rs
index 126f83f7e238e..7697c01d63f22 100644
--- a/datafusion/execution/src/object_store.rs
+++ b/datafusion/execution/src/object_store.rs
@@ -51,7 +51,14 @@ impl ObjectStoreUrl {
         Ok(Self { url: parsed })
     }
 
-    /// An [`ObjectStoreUrl`] for the local filesystem
+    /// An [`ObjectStoreUrl`] for the local filesystem (`file://`)
+    ///
+    /// # Example
+    /// ```
+    /// # use datafusion_execution::object_store::ObjectStoreUrl;
+    /// let local_fs = ObjectStoreUrl::parse("file://").unwrap();
+    /// assert_eq!(local_fs, ObjectStoreUrl::local_filesystem())
+    /// ```
     pub fn local_filesystem() -> Self {
         Self::parse("file://").unwrap()
     }
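Since the new `FileScanConfig` docs point readers at `RuntimeEnv::register_object_store`, a rough sketch of how a URL prefix maps to a registered store may help. This is not part of the patch; the choice of `LocalFileSystem` from the `object_store` crate is an illustrative assumption.

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
use object_store::local::LocalFileSystem;

fn main() -> Result<()> {
    let runtime = RuntimeEnv::default();

    // register a store for the `file://` prefix; the URL carries only the
    // prefix, never the path to an individual file
    let url = ObjectStoreUrl::local_filesystem();
    runtime.register_object_store(url.as_ref(), Arc::new(LocalFileSystem::new()));

    // scans resolve their ObjectStoreUrl through the same lookup
    let store = runtime.object_store(&url)?;
    println!("resolved store: {store}");
    Ok(())
}
```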