Minor: Document how to create ListingTables (#5001)
alamb authored Jan 20, 2023
1 parent e566bfc commit 65555d7
Showing 2 changed files with 96 additions and 13 deletions.
94 changes: 82 additions & 12 deletions datafusion/core/src/datasource/listing/table.rs
@@ -57,7 +57,7 @@ use super::PartitionedFile;

use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};

/// Configuration for creating a 'ListingTable'
/// Configuration for creating a [`ListingTable`]
#[derive(Debug, Clone)]
pub struct ListingTableConfig {
/// Paths on the `ObjectStore` for creating `ListingTable`.
@@ -70,8 +70,10 @@ pub struct ListingTableConfig {
}

impl ListingTableConfig {
/// Creates new `ListingTableConfig`.
/// The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the provided `table_paths` first element.
/// Creates a new [`ListingTableConfig`].
///
/// The [`SchemaRef`] and [`ListingOptions`] are inferred based on
/// the suffix of the first element of the provided `table_paths`.
pub fn new(table_path: ListingTableUrl) -> Self {
let table_paths = vec![table_path];
Self {
@@ -81,16 +83,18 @@ impl ListingTableConfig {
}
}

/// Creates new `ListingTableConfig` with multiple table paths.
/// The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the provided `table_paths` first element.
/// Creates a new [`ListingTableConfig`] with multiple table paths.
///
/// The [`SchemaRef`] and [`ListingOptions`] are inferred based on
/// the suffix of the first element of the provided `table_paths`.
pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
Self {
table_paths,
file_schema: None,
options: None,
}
}
/// Add `schema` to `ListingTableConfig`
/// Add `schema` to [`ListingTableConfig`]
pub fn with_schema(self, schema: SchemaRef) -> Self {
Self {
table_paths: self.table_paths,
@@ -99,7 +103,7 @@
}
}

/// Add `listing_options` to `ListingTableConfig`
/// Add `listing_options` to [`ListingTableConfig`]
pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
Self {
table_paths: self.table_paths,
@@ -172,7 +176,7 @@ impl ListingTableConfig {
})
}

/// Infer `SchemaRef` based on `table_path` suffix. Requires `self.options` to be set prior to using.
/// Infer the [`SchemaRef`] based on the `table_path` suffix. Requires `self.options` to be set prior to use.
pub async fn infer_schema(self, state: &SessionState) -> Result<Self> {
match self.options {
Some(options) => {
@@ -198,7 +202,7 @@
}
}

/// Options for creating a `ListingTable`
/// Options for creating a [`ListingTable`]
#[derive(Clone, Debug)]
pub struct ListingOptions {
/// A suffix on which files should be filtered (leave empty to
@@ -432,8 +436,71 @@ impl StatisticsCache {
}
}

/// An implementation of `TableProvider` that uses the object store
/// or file system listing capability to get the list of files.
/// Reads data from one or more files via an
/// [`ObjectStore`](object_store::ObjectStore), for example from
/// local files or objects stored in AWS S3. Implements
/// [`TableProvider`], a DataFusion data source.
///
/// # Features
///
/// 1. Merges schemas if the files have compatible but not identical schemas
///
/// 2. Hive-style partitioning support, where the partition value
/// encoded in a path such as `/files/date=1/1/2022/data.parquet` is
/// injected as a `date` column.
///
/// 3. Projection pushdown for formats that support it, such as
/// Parquet
///
/// # Example
///
/// Here is an example of reading a directory of parquet files using a
/// [`ListingTable`]:
///
/// ```no_run
/// # use datafusion::prelude::SessionContext;
/// # use datafusion::error::Result;
/// # use std::sync::Arc;
/// # use datafusion::datasource::{
/// # listing::{
/// # ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
/// # },
/// # file_format::parquet::ParquetFormat,
/// # };
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let session_state = ctx.state();
/// let table_path = "/path/to/parquet";
///
/// // Parse the path
/// let table_path = ListingTableUrl::parse(table_path)?;
///
/// // Create default parquet options
/// let file_format = ParquetFormat::new();
/// let listing_options = ListingOptions::new(Arc::new(file_format))
/// .with_file_extension(".parquet");
///
/// // Resolve the schema
/// let resolved_schema = listing_options
/// .infer_schema(&session_state, &table_path)
/// .await?;
///
/// let config = ListingTableConfig::new(table_path)
/// .with_listing_options(listing_options)
/// .with_schema(resolved_schema);
///
/// // Create a new TableProvider
/// let provider = Arc::new(ListingTable::try_new(config)?);
///
/// // This provider can now be read as a DataFrame:
/// let df = ctx.read_table(provider.clone());
///
/// // or registered as a named table:
/// ctx.register_table("my_table", provider);
///
/// # Ok(())
/// # }
/// ```
pub struct ListingTable {
table_paths: Vec<ListingTableUrl>,
/// File fields only
@@ -447,13 +514,16 @@ pub struct ListingTable {
}

impl ListingTable {
/// Create new table that lists the FS to get the files to scan.
/// Create a new [`ListingTable`] that lists the FS to get the files
/// to scan. See [`ListingTable`] for an example.
///
/// Takes a `ListingTableConfig` as input, which requires an `ObjectStore` and `table_path`.
/// `ListingOptions` and `SchemaRef` are optional. If they are not
/// provided the file type is inferred based on the file suffix.
/// If the schema is provided then it must be resolved before creating the table
/// and should contain the fields of the file without the table
/// partitioning columns.
///
pub fn try_new(config: ListingTableConfig) -> Result<Self> {
let file_schema = config
.file_schema
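
As a companion to the doc example added above, here is a minimal sketch (not part of the commit) showing the `new_with_multi_paths` constructor together with `ListingTableConfig::infer_schema`; the directory paths and the table name are hypothetical, and the imports mirror the doc example:

```rust
use std::sync::Arc;

use datafusion::datasource::{
    file_format::parquet::ParquetFormat,
    listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let session_state = ctx.state();

    // Two directories exposed as one logical table (hypothetical paths)
    let table_paths = vec![
        ListingTableUrl::parse("/data/2022/")?,
        ListingTableUrl::parse("/data/2023/")?,
    ];

    let listing_options = ListingOptions::new(Arc::new(ParquetFormat::new()))
        .with_file_extension(".parquet");

    // The schema is inferred from the first element of `table_paths`,
    // so the listing options must be set before calling `infer_schema`
    let config = ListingTableConfig::new_with_multi_paths(table_paths)
        .with_listing_options(listing_options)
        .infer_schema(&session_state)
        .await?;

    let provider = Arc::new(ListingTable::try_new(config)?);

    // Register under a name so it can be queried with SQL
    ctx.register_table("events", provider)?;
    let df = ctx.sql("SELECT * FROM events LIMIT 10").await?;
    df.show().await?;

    Ok(())
}
```

Because `infer_schema` consumes and returns the config, it slots naturally at the end of the builder chain.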
15 changes: 14 additions & 1 deletion datafusion/core/src/execution/context.rs
@@ -551,6 +551,9 @@ impl SessionContext {
}

/// Creates a [`DataFrame`] for reading an Avro data source.
///
/// For more control such as reading multiple files, you can use
/// [`read_table`](Self::read_table) with a [`ListingTable`].
pub async fn read_avro(
&self,
table_path: impl AsRef<str>,
@@ -584,6 +587,9 @@
}

/// Creates a [`DataFrame`] for reading a JSON data source.
///
/// For more control such as reading multiple files, you can use
/// [`read_table`](Self::read_table) with a [`ListingTable`].
pub async fn read_json(
&self,
table_path: impl AsRef<str>,
@@ -625,6 +631,9 @@
}

/// Creates a [`DataFrame`] for reading a CSV data source.
///
/// For more control such as reading multiple files, you can use
/// [`read_table`](Self::read_table) with a [`ListingTable`].
pub async fn read_csv(
&self,
table_path: impl AsRef<str>,
@@ -656,6 +665,9 @@
}

/// Creates a [`DataFrame`] for reading a Parquet data source.
///
/// For more control such as reading multiple files, you can use
/// [`read_table`](Self::read_table) with a [`ListingTable`].
pub async fn read_parquet(
&self,
table_path: impl AsRef<str>,
@@ -677,7 +689,8 @@
self.read_table(Arc::new(provider))
}

/// Creates a [`DataFrame`] for reading a custom [`TableProvider`].
/// Creates a [`DataFrame`] for a [`TableProvider`] such as a
/// [`ListingTable`] or a custom user-defined provider.
pub fn read_table(&self, provider: Arc<dyn TableProvider>) -> Result<DataFrame> {
Ok(DataFrame::new(
self.state(),
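
To make the new cross-references concrete, here is a small sketch (again not from the commit, with hypothetical paths) contrasting the single-path convenience reader with the `read_table` plus [`ListingTable`] route these docs point to, using CSV:

```rust
use std::sync::Arc;

use datafusion::datasource::{
    file_format::csv::CsvFormat,
    listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
};
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Convenience API: one CSV file, default options (hypothetical path)
    let df = ctx.read_csv("/data/a.csv", CsvReadOptions::new()).await?;
    df.show().await?;

    // More control: several directories read through one ListingTable
    let state = ctx.state();
    let options = ListingOptions::new(Arc::new(CsvFormat::default()))
        .with_file_extension(".csv");
    let config = ListingTableConfig::new_with_multi_paths(vec![
        ListingTableUrl::parse("/data/a/")?,
        ListingTableUrl::parse("/data/b/")?,
    ])
    .with_listing_options(options)
    .infer_schema(&state)
    .await?;

    let df = ctx.read_table(Arc::new(ListingTable::try_new(config)?))?;
    df.show().await?;

    Ok(())
}
```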
