Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow SessionContext::read_csv, etc to read multiple files #4908

Merged
merged 8 commits into from
Feb 20, 2023
101 changes: 78 additions & 23 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,43 @@ use super::options::{
AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, ReadOptions,
};

/// Conversion of one or more file paths into a vector of [`ListingTableUrl`] URLs.
///
/// Implemented for single strings and for vectors of strings, which allows methods
/// such as [`SessionContext::read_csv`] and [`SessionContext::read_avro`]
/// to take either a single file or multiple files.
pub trait DataFilePaths {
/// Parse to a vector of [`ListingTableUrl`] URLs.
///
/// Takes `self` by value. The implementations in this file return any error
/// produced by [`ListingTableUrl::parse`].
fn to_urls(self) -> Result<Vec<ListingTableUrl>>;
}

impl DataFilePaths for &str {
alamb marked this conversation as resolved.
Show resolved Hide resolved
fn to_urls(self) -> Result<Vec<ListingTableUrl>> {
Ok(vec![ListingTableUrl::parse(self)?])
}
}

impl DataFilePaths for String {
    /// Parses this owned path and returns the resulting URL as a one-element vector.
    ///
    /// Any error from [`ListingTableUrl::parse`] is returned unchanged.
    fn to_urls(self) -> Result<Vec<ListingTableUrl>> {
        ListingTableUrl::parse(self).map(|url| vec![url])
    }
}

impl DataFilePaths for &String {
    /// Parses the borrowed path and returns the resulting URL as a one-element vector.
    ///
    /// Any error from [`ListingTableUrl::parse`] is returned unchanged.
    fn to_urls(self) -> Result<Vec<ListingTableUrl>> {
        ListingTableUrl::parse(self).map(|url| vec![url])
    }
}

impl<P> DataFilePaths for Vec<P>
where
    P: AsRef<str>,
{
    /// Parses every path in order, stopping at the first one that fails to parse.
    ///
    /// An empty vector yields `Ok` with an empty list of URLs.
    fn to_urls(self) -> Result<Vec<ListingTableUrl>> {
        let mut urls = Vec::with_capacity(self.len());
        for path in self.iter() {
            // `?` returns the first parse error immediately, like collecting into `Result`.
            urls.push(ListingTableUrl::parse(path)?);
        }
        Ok(urls)
    }
}

/// SessionContext is the main interface for executing queries with DataFusion. It stands for
/// the connection between user and DataFusion/Ballista cluster.
/// The context provides the following functionality
Expand Down Expand Up @@ -627,22 +664,18 @@ impl SessionContext {
///
/// For more control such as reading multiple files, you can use
/// [`read_table`](Self::read_table) with a [`ListingTable`].
async fn _read_type<'a>(
async fn _read_type<'a, P: DataFilePaths>(
&self,
table_path: impl AsRef<str>,
table_paths: P,
options: impl ReadOptions<'a>,
) -> Result<DataFrame> {
let table_path = ListingTableUrl::parse(table_path)?;
let table_paths = table_paths.to_urls()?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

let session_config = self.copied_config();
let listing_options = options.to_listing_options(&session_config);
let resolved_schema = match options
.get_resolved_schema(&session_config, self.state(), table_path.clone())
.await
{
Ok(resolved_schema) => resolved_schema,
Err(e) => return Err(e),
};
let config = ListingTableConfig::new(table_path)
let resolved_schema = options
.get_resolved_schema(&session_config, self.state(), table_paths[0].clone())
.await?;
let config = ListingTableConfig::new_with_multi_paths(table_paths)
.with_listing_options(listing_options)
.with_schema(resolved_schema);
let provider = ListingTable::try_new(config)?;
Expand All @@ -653,24 +686,28 @@ impl SessionContext {
///
/// For more control such as reading multiple files, you can use
/// [`read_table`](Self::read_table) with a [`ListingTable`].
pub async fn read_avro(
///
/// For an example, see [`read_csv`](Self::read_csv)
pub async fn read_avro<P: DataFilePaths>(
&self,
table_path: impl AsRef<str>,
table_paths: P,
options: AvroReadOptions<'_>,
) -> Result<DataFrame> {
self._read_type(table_path, options).await
self._read_type(table_paths, options).await
}

/// Creates a [`DataFrame`] for reading a JSON data source.
///
/// For more control such as reading multiple files, you can use
/// [`read_table`](Self::read_table) with a [`ListingTable`].
pub async fn read_json(
///
/// For an example, see [`read_csv`](Self::read_csv)
pub async fn read_json<P: DataFilePaths>(
&self,
table_path: impl AsRef<str>,
table_paths: P,
options: NdJsonReadOptions<'_>,
) -> Result<DataFrame> {
self._read_type(table_path, options).await
self._read_type(table_paths, options).await
}

/// Creates an empty DataFrame.
Expand All @@ -685,24 +722,42 @@ impl SessionContext {
///
/// For more control such as reading multiple files, you can use
/// [`read_table`](Self::read_table) with a [`ListingTable`].
pub async fn read_csv(
///
/// Example usage is given below:
///
/// ```
/// use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// // You can read a single file using `read_csv`
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
/// // you can also read multiple files:
/// let df = ctx.read_csv(vec!["tests/data/example.csv", "tests/data/example.csv"], CsvReadOptions::new()).await?;
/// # Ok(())
/// # }
/// ```
pub async fn read_csv<P: DataFilePaths>(
&self,
table_path: impl AsRef<str>,
table_paths: P,
options: CsvReadOptions<'_>,
) -> Result<DataFrame> {
self._read_type(table_path, options).await
self._read_type(table_paths, options).await
}

/// Creates a [`DataFrame`] for reading a Parquet data source.
///
/// For more control such as reading multiple files, you can use
/// [`read_table`](Self::read_table) with a [`ListingTable`].
pub async fn read_parquet(
///
/// For an example, see [`read_csv`](Self::read_csv)
pub async fn read_parquet<P: DataFilePaths>(
&self,
table_path: impl AsRef<str>,
table_paths: P,
options: ParquetReadOptions<'_>,
) -> Result<DataFrame> {
self._read_type(table_path, options).await
self._read_type(table_paths, options).await
}

/// Creates a [`DataFrame`] for a [`TableProvider`] such as a
Expand Down