From 51adfa731ec17c311de497a25d27a2f07c165bd7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 20 Jan 2023 07:16:40 -0500 Subject: [PATCH] Minor: Document how to create ListingTables --- .../core/src/datasource/listing/table.rs | 94 ++++++++++++++++--- datafusion/core/src/execution/context.rs | 15 ++- 2 files changed, 96 insertions(+), 13 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 954e7bf6b098..f595e52461c6 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -57,7 +57,7 @@ use super::PartitionedFile; use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files}; -/// Configuration for creating a 'ListingTable' +/// Configuration for creating a [`ListingTable`] #[derive(Debug, Clone)] pub struct ListingTableConfig { /// Paths on the `ObjectStore` for creating `ListingTable`. @@ -70,8 +70,10 @@ pub struct ListingTableConfig { } impl ListingTableConfig { - /// Creates new `ListingTableConfig`. - /// The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the provided `table_paths` first element. + /// Creates new [`ListingTableConfig`]. + /// + /// The [`SchemaRef`] and [`ListingOptions`] are inferred based on + /// the suffix of the provided `table_paths` first element. pub fn new(table_path: ListingTableUrl) -> Self { let table_paths = vec![table_path]; Self { @@ -81,8 +83,10 @@ impl ListingTableConfig { } } - /// Creates new `ListingTableConfig` with multiple table paths. - /// The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the provided `table_paths` first element. + /// Creates new [`ListingTableConfig`] with multiple table paths. + /// + /// The [`SchemaRef`] and [`ListingOptions`] are inferred based on + /// the suffix of the provided `table_paths` first element. 
pub fn new_with_multi_paths(table_paths: Vec) -> Self { Self { table_paths, @@ -90,7 +94,7 @@ impl ListingTableConfig { options: None, } } - /// Add `schema` to `ListingTableConfig` + /// Add `schema` to [`ListingTableConfig`] pub fn with_schema(self, schema: SchemaRef) -> Self { Self { table_paths: self.table_paths, @@ -99,7 +103,7 @@ } } - /// Add `listing_options` to `ListingTableConfig` + /// Add `listing_options` to [`ListingTableConfig`] pub fn with_listing_options(self, listing_options: ListingOptions) -> Self { Self { table_paths: self.table_paths, @@ -172,7 +176,7 @@ impl ListingTableConfig { }) } - /// Infer `SchemaRef` based on `table_path` suffix. Requires `self.options` to be set prior to using. + /// Infer the [`SchemaRef`] based on `table_path` suffix. Requires `self.options` to be set prior to using. pub async fn infer_schema(self, state: &SessionState) -> Result { match self.options { Some(options) => { @@ -198,7 +202,7 @@ } } -/// Options for creating a `ListingTable` +/// Options for creating a [`ListingTable`] #[derive(Clone, Debug)] pub struct ListingOptions { /// A suffix on which files should be filtered (leave empty to @@ -432,8 +436,71 @@ impl StatisticsCache { } } -/// An implementation of `TableProvider` that uses the object store -/// or file system listing capability to get the list of files. +/// Reads data from one or more files via an +/// [`ObjectStore`](object_store::ObjectStore). For example, from +/// local files or objects from AWS S3. Implements [`TableProvider`], +/// a DataFusion data source. +/// +/// # Features +/// +/// 1. Merges schemas if the files have compatible but not identical schemas +/// +/// 2. Hive-style partitioning support, where a path such as +/// `/files/date=1/1/2022/data.parquet` is injected as a `date` column. +/// +/// 3. 
Projection pushdown for formats that support it such as +/// Parquet +/// +/// # Example +/// +/// Here is an example of reading a directory of parquet files using a +/// [`ListingTable`]: +/// +/// ```no_run +/// # use datafusion::prelude::SessionContext; +/// # use datafusion::error::Result; +/// # use std::sync::Arc; +/// # use datafusion::datasource::{ +/// # listing::{ +/// # ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, +/// # }, +/// # file_format::parquet::ParquetFormat, +/// # }; +/// # #[tokio::main] +/// # async fn main() -> Result<()> { +/// let ctx = SessionContext::new(); +/// let session_state = ctx.state(); +/// let table_path = "/path/to/parquet"; +/// +/// // Parse the path +/// let table_path = ListingTableUrl::parse(table_path)?; +/// +/// // Create default parquet options +/// let file_format = ParquetFormat::new(); +/// let listing_options = ListingOptions::new(Arc::new(file_format)) +/// .with_file_extension(".parquet"); +/// +/// // Resolve the schema +/// let resolved_schema = listing_options +/// .infer_schema(&session_state, &table_path) +/// .await?; +/// +/// let config = ListingTableConfig::new(table_path) +/// .with_listing_options(listing_options) +/// .with_schema(resolved_schema); +/// +/// // Create a new TableProvider +/// let provider = Arc::new(ListingTable::try_new(config)?); +/// +/// // This provider can now be read as a dataframe: +/// let df = ctx.read_table(provider.clone()); +/// +/// // or registered as a named table: +/// ctx.register_table("my_table", provider); +/// +/// # Ok(()) +/// # } +/// ``` pub struct ListingTable { table_paths: Vec, /// File fields only @@ -447,13 +514,16 @@ pub struct ListingTable { } impl ListingTable { - /// Create new table that lists the FS to get the files to scan. + /// Create new [`ListingTable`] that lists the FS to get the files + /// to scan. See [`ListingTable`] for an example. 
+ /// /// Takes a `ListingTableConfig` as input which requires an `ObjectStore` and `table_path`. /// `ListingOptions` and `SchemaRef` are optional. If they are not /// provided the file type is inferred based on the file suffix. /// If the schema is provided then it must be resolved before creating the table /// and should contain the fields of the file without the table /// partitioning columns. + /// pub fn try_new(config: ListingTableConfig) -> Result { let file_schema = config .file_schema diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index ead76c8e091c..663120adb662 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -551,6 +551,9 @@ impl SessionContext { } /// Creates a [`DataFrame`] for reading an Avro data source. + /// + /// For more control such as reading multiple files, you can use + /// [`read_table`](Self::read_table) with a [`ListingTable`]. pub async fn read_avro( &self, table_path: impl AsRef, @@ -584,6 +587,9 @@ impl SessionContext { } /// Creates a [`DataFrame`] for reading an Json data source. + /// + /// For more control such as reading multiple files, you can use + /// [`read_table`](Self::read_table) with a [`ListingTable`]. pub async fn read_json( &self, table_path: impl AsRef, @@ -625,6 +631,9 @@ impl SessionContext { } /// Creates a [`DataFrame`] for reading a CSV data source. + /// + /// For more control such as reading multiple files, you can use + /// [`read_table`](Self::read_table) with a [`ListingTable`]. pub async fn read_csv( &self, table_path: impl AsRef, @@ -656,6 +665,9 @@ impl SessionContext { } /// Creates a [`DataFrame`] for reading a Parquet data source. + /// + /// For more control such as reading multiple files, you can use + /// [`read_table`](Self::read_table) with a [`ListingTable`]. 
pub async fn read_parquet( &self, table_path: impl AsRef, @@ -677,7 +689,8 @@ impl SessionContext { self.read_table(Arc::new(provider)) } - /// Creates a [`DataFrame`] for reading a custom [`TableProvider`]. + /// Creates a [`DataFrame`] for a [`TableProvider`] such as a + /// [`ListingTable`] or a custom user defined provider. pub fn read_table(&self, provider: Arc) -> Result { Ok(DataFrame::new( self.state(),