Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: Document how to create ListingTables #5001

Merged
merged 1 commit into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 82 additions & 12 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use super::PartitionedFile;

use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};

/// Configuration for creating a 'ListingTable'
/// Configuration for creating a [`ListingTable`]
#[derive(Debug, Clone)]
pub struct ListingTableConfig {
/// Paths on the `ObjectStore` for creating `ListingTable`.
Expand All @@ -70,8 +70,10 @@ pub struct ListingTableConfig {
}

impl ListingTableConfig {
/// Creates new `ListingTableConfig`.
/// The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the provided `table_paths` first element.
/// Creates new [`ListingTableConfig`].
///
/// The [`SchemaRef`] and [`ListingOptions`] are inferred based on
/// the suffix of the provided `table_paths` first element.
pub fn new(table_path: ListingTableUrl) -> Self {
let table_paths = vec![table_path];
Self {
Expand All @@ -81,16 +83,18 @@ impl ListingTableConfig {
}
}

/// Creates new `ListingTableConfig` with multiple table paths.
/// The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the provided `table_paths` first element.
/// Creates new [`ListingTableConfig`] with multiple table paths.
///
/// The [`SchemaRef`] and [`ListingOptions`] are inferred based on
/// the suffix of the provided `table_paths` first element.
pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
Self {
table_paths,
file_schema: None,
options: None,
}
}
/// Add `schema` to `ListingTableConfig`
/// Add `schema` to [`ListingTableConfig`]
pub fn with_schema(self, schema: SchemaRef) -> Self {
Self {
table_paths: self.table_paths,
Expand All @@ -99,7 +103,7 @@ impl ListingTableConfig {
}
}

/// Add `listing_options` to `ListingTableConfig`
/// Add `listing_options` to [`ListingTableConfig`]
pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
Self {
table_paths: self.table_paths,
Expand Down Expand Up @@ -172,7 +176,7 @@ impl ListingTableConfig {
})
}

/// Infer `SchemaRef` based on `table_path` suffix. Requires `self.options` to be set prior to using.
/// Infer the [`SchemaRef`] based on `table_path` suffix. Requires `self.options` to be set prior to using.
pub async fn infer_schema(self, state: &SessionState) -> Result<Self> {
match self.options {
Some(options) => {
Expand All @@ -198,7 +202,7 @@ impl ListingTableConfig {
}
}

/// Options for creating a `ListingTable`
/// Options for creating a [`ListingTable`]
#[derive(Clone, Debug)]
pub struct ListingOptions {
/// A suffix on which files should be filtered (leave empty to
Expand Down Expand Up @@ -432,8 +436,71 @@ impl StatisticsCache {
}
}

/// An implementation of `TableProvider` that uses the object store
/// or file system listing capability to get the list of files.
/// Reads data from one or more files via an
/// [`ObjectStore`](object_store::ObjectStore). For example, from
/// local files or objects from AWS S3. Implements [`TableProvider`],
/// a DataFusion data source.
///
/// # Features
///
/// 1. Merges schemas if the files have compatible but not identical schemas
///
/// 2. Hive-style partitioning support, where a path such as
/// `/files/date=1/1/2022/data.parquet` is injected as a `date` column.
///
/// 3. Projection pushdown for formats that support it, such as
/// Parquet
///
/// # Example
///
/// Here is an example of reading a directory of parquet files using a
/// [`ListingTable`]:
///
/// ```no_run
/// # use datafusion::prelude::SessionContext;
/// # use datafusion::error::Result;
/// # use std::sync::Arc;
/// # use datafusion::datasource::{
/// # listing::{
/// # ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
/// # },
/// # file_format::parquet::ParquetFormat,
/// # };
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let session_state = ctx.state();
/// let table_path = "/path/to/parquet";
///
/// // Parse the path
/// let table_path = ListingTableUrl::parse(table_path)?;
///
/// // Create default parquet options
/// let file_format = ParquetFormat::new();
/// let listing_options = ListingOptions::new(Arc::new(file_format))
/// .with_file_extension(".parquet");
///
/// // Resolve the schema
/// let resolved_schema = listing_options
/// .infer_schema(&session_state, &table_path)
/// .await?;
///
/// let config = ListingTableConfig::new(table_path)
/// .with_listing_options(listing_options)
/// .with_schema(resolved_schema);
///
/// // Create a new TableProvider
/// let provider = Arc::new(ListingTable::try_new(config)?);
///
/// // This provider can now be read as a dataframe:
/// let df = ctx.read_table(provider.clone());
///
/// // or registered as a named table:
/// ctx.register_table("my_table", provider);
///
/// # Ok(())
/// # }
/// ```
pub struct ListingTable {
table_paths: Vec<ListingTableUrl>,
/// File fields only
Expand All @@ -447,13 +514,16 @@ pub struct ListingTable {
}

impl ListingTable {
/// Create new table that lists the FS to get the files to scan.
/// Create new [`ListingTable`] that lists the FS to get the files
/// to scan. See [`ListingTable`] for an example.
///
/// Takes a `ListingTableConfig` as input which requires an `ObjectStore` and `table_path`.
/// `ListingOptions` and `SchemaRef` are optional. If they are not
/// provided the file type is inferred based on the file suffix.
/// If the schema is provided then it must be resolved before creating the table
/// and should contain the fields of the file without the table
/// partitioning columns.
///
pub fn try_new(config: ListingTableConfig) -> Result<Self> {
let file_schema = config
.file_schema
Expand Down
15 changes: 14 additions & 1 deletion datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,9 @@ impl SessionContext {
}

/// Creates a [`DataFrame`] for reading an Avro data source.
///
/// For more control such as reading multiple files, you can use
/// [`read_table`](Self::read_table) with a [`ListingTable`].
pub async fn read_avro(
&self,
table_path: impl AsRef<str>,
Expand Down Expand Up @@ -584,6 +587,9 @@ impl SessionContext {
}

/// Creates a [`DataFrame`] for reading a JSON data source.
///
/// For more control such as reading multiple files, you can use
/// [`read_table`](Self::read_table) with a [`ListingTable`].
pub async fn read_json(
&self,
table_path: impl AsRef<str>,
Expand Down Expand Up @@ -625,6 +631,9 @@ impl SessionContext {
}

/// Creates a [`DataFrame`] for reading a CSV data source.
///
/// For more control such as reading multiple files, you can use
/// [`read_table`](Self::read_table) with a [`ListingTable`].
pub async fn read_csv(
&self,
table_path: impl AsRef<str>,
Expand Down Expand Up @@ -656,6 +665,9 @@ impl SessionContext {
}

/// Creates a [`DataFrame`] for reading a Parquet data source.
///
/// For more control such as reading multiple files, you can use
/// [`read_table`](Self::read_table) with a [`ListingTable`].
pub async fn read_parquet(
&self,
table_path: impl AsRef<str>,
Expand All @@ -677,7 +689,8 @@ impl SessionContext {
self.read_table(Arc::new(provider))
}

/// Creates a [`DataFrame`] for reading a custom [`TableProvider`].
/// Creates a [`DataFrame`] for a [`TableProvider`] such as a
/// [`ListingTable`] or a custom user defined provider.
pub fn read_table(&self, provider: Arc<dyn TableProvider>) -> Result<DataFrame> {
Ok(DataFrame::new(
self.state(),
Expand Down