From cfbb14d9c0b8f4a86ed12d716a55c1dba3791e8b Mon Sep 17 00:00:00 2001 From: Sai Krishna Reddy Lakkam <86965352+saikrishna1-bidgely@users.noreply.github.com> Date: Mon, 20 Feb 2023 17:32:06 +0530 Subject: [PATCH] Allow `SessionContext::read_csv`, etc to read multiple files (#4908) * Added a traitDataFilePaths to convert strings and vector of strings to a vector of URLs. * Added docs and tests. Updated DataFilePaths to accept any vector containing AsRef trait. * Added docs to read_ methods and extended the SessionContext doc. * Ran Cargo fmt * removed CallReadTrait methods * Update read_csv example Co-authored-by: Andrew Lamb * removed addition to SessionContext example --------- Co-authored-by: Lakkam Sai Krishna Reddy Co-authored-by: Andrew Lamb --- datafusion/core/src/execution/context.rs | 101 +++++++++++++++++------ 1 file changed, 78 insertions(+), 23 deletions(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 8944b4f90503..251eaf0efec7 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -106,6 +106,43 @@ use super::options::{ AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, ReadOptions, }; +/// DataFilePaths adds a method to convert strings and vector of strings to vector of [`ListingTableUrl`] URLs. +/// This allows methods such [`SessionContext::read_csv`] and `[`SessionContext::read_avro`] +/// to take either a single file or multiple files. +pub trait DataFilePaths { + /// Parse to a vector of [`ListingTableUrl`] URLs. + fn to_urls(self) -> Result>; +} + +impl DataFilePaths for &str { + fn to_urls(self) -> Result> { + Ok(vec![ListingTableUrl::parse(self)?]) + } +} + +impl DataFilePaths for String { + fn to_urls(self) -> Result> { + Ok(vec![ListingTableUrl::parse(self)?]) + } +} + +impl DataFilePaths for &String { + fn to_urls(self) -> Result> { + Ok(vec![ListingTableUrl::parse(self)?]) + } +} + +impl

DataFilePaths for Vec

+where + P: AsRef, +{ + fn to_urls(self) -> Result> { + self.iter() + .map(ListingTableUrl::parse) + .collect::>>() + } +} + /// SessionContext is the main interface for executing queries with DataFusion. It stands for /// the connection between user and DataFusion/Ballista cluster. /// The context provides the following functionality @@ -627,22 +664,18 @@ impl SessionContext { /// /// For more control such as reading multiple files, you can use /// [`read_table`](Self::read_table) with a [`ListingTable`]. - async fn _read_type<'a>( + async fn _read_type<'a, P: DataFilePaths>( &self, - table_path: impl AsRef, + table_paths: P, options: impl ReadOptions<'a>, ) -> Result { - let table_path = ListingTableUrl::parse(table_path)?; + let table_paths = table_paths.to_urls()?; let session_config = self.copied_config(); let listing_options = options.to_listing_options(&session_config); - let resolved_schema = match options - .get_resolved_schema(&session_config, self.state(), table_path.clone()) - .await - { - Ok(resolved_schema) => resolved_schema, - Err(e) => return Err(e), - }; - let config = ListingTableConfig::new(table_path) + let resolved_schema = options + .get_resolved_schema(&session_config, self.state(), table_paths[0].clone()) + .await?; + let config = ListingTableConfig::new_with_multi_paths(table_paths) .with_listing_options(listing_options) .with_schema(resolved_schema); let provider = ListingTable::try_new(config)?; @@ -653,24 +686,28 @@ impl SessionContext { /// /// For more control such as reading multiple files, you can use /// [`read_table`](Self::read_table) with a [`ListingTable`]. - pub async fn read_avro( + /// + /// For an example, see [`read_csv`](Self::read_csv) + pub async fn read_avro( &self, - table_path: impl AsRef, + table_paths: P, options: AvroReadOptions<'_>, ) -> Result { - self._read_type(table_path, options).await + self._read_type(table_paths, options).await } /// Creates a [`DataFrame`] for reading an JSON data source. /// /// For more control such as reading multiple files, you can use /// [`read_table`](Self::read_table) with a [`ListingTable`]. - pub async fn read_json( + /// + /// For an example, see [`read_csv`](Self::read_csv) + pub async fn read_json( &self, - table_path: impl AsRef, + table_paths: P, options: NdJsonReadOptions<'_>, ) -> Result { - self._read_type(table_path, options).await + self._read_type(table_paths, options).await } /// Creates an empty DataFrame. @@ -685,24 +722,42 @@ impl SessionContext { /// /// For more control such as reading multiple files, you can use /// [`read_table`](Self::read_table) with a [`ListingTable`]. - pub async fn read_csv( + /// + /// Example usage is given below: + /// + /// ``` + /// use datafusion::prelude::*; + /// # use datafusion::error::Result; + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let ctx = SessionContext::new(); + /// // You can read a single file using `read_csv` + /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; + /// // you can also read multiple files: + /// let df = ctx.read_csv(vec!["tests/data/example.csv", "tests/data/example.csv"], CsvReadOptions::new()).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn read_csv( &self, - table_path: impl AsRef, + table_paths: P, options: CsvReadOptions<'_>, ) -> Result { - self._read_type(table_path, options).await + self._read_type(table_paths, options).await } /// Creates a [`DataFrame`] for reading a Parquet data source. /// /// For more control such as reading multiple files, you can use /// [`read_table`](Self::read_table) with a [`ListingTable`]. - pub async fn read_parquet( + /// + /// For an example, see [`read_csv`](Self::read_csv) + pub async fn read_parquet( &self, - table_path: impl AsRef, + table_paths: P, options: ParquetReadOptions<'_>, ) -> Result { - self._read_type(table_path, options).await + self._read_type(table_paths, options).await } /// Creates a [`DataFrame`] for a [`TableProvider`] such as a