diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index c1d3acab407aa..50315c216867f 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -646,48 +646,6 @@ impl SessionContext {
         ))
     }
 
-    /// Creates a [`DataFrame`] for reading a CSV data source.
-    pub async fn read_csv_with_multi_paths(
-        &self,
-        table_paths: &[impl AsRef<str>],
-        options: CsvReadOptions<'_>,
-    ) -> Result<DataFrame> {
-        let table_paths = table_paths.iter()
-            .map(ListingTableUrl::parse)
-            .collect::<Result<Vec<ListingTableUrl>>>()?;
-        let target_partitions = self.copied_config().target_partitions();
-        let listing_options = options.to_listing_options(target_partitions);
-        let resolved_schema = match (options.schema, options.infinite) {
-            (Some(s), _) => Arc::new(s.to_owned()),
-            (None, false) => {
-                listing_options
-                    .infer_schema(&self.state(), &table_paths[0])
-                    .await?
-            }
-            (None, true) => {
-                return Err(DataFusionError::Plan(
-                    "Schema inference for infinite data sources is not supported."
-                        .to_string(),
-                ))
-            }
-        };
-        let config = ListingTableConfig::new_with_multi_paths(table_paths)
-            .with_listing_options(listing_options)
-            .with_schema(resolved_schema);
-
-        let provider = ListingTable::try_new(config)?;
-        self.read_table(Arc::new(provider))
-    }
-
-    /// Creates a [`DataFrame`] for reading a CSV data source.
-    pub async fn read_csv(
-        &self,
-        table_path: impl AsRef<str>,
-        options: CsvReadOptions<'_>,
-    ) -> Result<DataFrame> {
-        return self.read_csv_with_multi_paths(&[table_path], options).await;
-    }
-
     /// Creates a [`DataFrame`] for reading a Parquet data source.
     pub async fn read_parquet_with_multi_paths(
         &self,
@@ -1079,6 +1037,65 @@ impl FunctionRegistry for SessionContext {
     }
 }
 
+#[async_trait]
+pub trait Reader<'a, T>: Sized {
+    async fn read_csv(&self, table_paths: T, options: CsvReadOptions<'_>) -> Result<DataFrame>
+    where
+        'a: 'async_trait;
+}
+
+#[async_trait]
+impl<'a> Reader<'a, &'a str> for SessionContext {
+    async fn read_csv(&self, table_path: &'a str, options: CsvReadOptions<'_>) -> Result<DataFrame> {
+        self.read_csv(vec![table_path], options).await
+    }
+}
+
+#[async_trait]
+impl<'a> Reader<'a, String> for SessionContext {
+    async fn read_csv(&self, table_path: String, options: CsvReadOptions<'_>) -> Result<DataFrame> {
+        self.read_csv(vec![table_path.as_str()], options).await
+    }
+}
+
+#[async_trait]
+impl<'a> Reader<'a, Vec<&'a str>> for SessionContext {
+    async fn read_csv(&self, table_paths: Vec<&'a str>, options: CsvReadOptions<'_>) -> Result<DataFrame> {
+        let table_paths = table_paths.iter()
+            .map(ListingTableUrl::parse)
+            .collect::<Result<Vec<ListingTableUrl>>>()?;
+        let target_partitions = self.copied_config().target_partitions();
+        let listing_options = options.to_listing_options(target_partitions);
+        let resolved_schema = match (options.schema, options.infinite) {
+            (Some(s), _) => Arc::new(s.to_owned()),
+            (None, false) => {
+                listing_options
+                    .infer_schema(&self.state(), &table_paths[0])
+                    .await?
+            }
+            (None, true) => {
+                return Err(DataFusionError::Plan(
+                    "Schema inference for infinite data sources is not supported."
+                        .to_string(),
+                ))
+            }
+        };
+        let config = ListingTableConfig::new_with_multi_paths(table_paths)
+            .with_listing_options(listing_options)
+            .with_schema(resolved_schema);
+
+        let provider = ListingTable::try_new(config)?;
+        self.read_table(Arc::new(provider))
+    }
+}
+
+#[async_trait]
+impl<'a> Reader<'a, Vec<String>> for SessionContext {
+    async fn read_csv(&self, table_paths: Vec<String>, options: CsvReadOptions<'_>) -> Result<DataFrame> {
+        let table_paths = table_paths.iter().map(|s| s.as_str()).collect::<Vec<&str>>();
+        self.read_csv(table_paths, options).await
+    }
+}
 /// A planner used to add extensions to DataFusion logical and physical plans.
 #[async_trait]
 pub trait QueryPlanner {
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 3723df9c49c4e..5772b80b16631 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -286,6 +286,7 @@ pub async fn plan_to_csv(
 mod tests {
     use super::*;
     use crate::datasource::file_format::file_type::FileType;
+    use crate::execution::context::Reader;
     use crate::physical_plan::file_format::chunked_store::ChunkedStore;
     use crate::physical_plan::file_format::partition_type_wrap;
     use crate::prelude::*;
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 8184d3f1ac45a..34027708b7ac5 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -256,6 +256,7 @@ mod tests {
     use crate::datasource::file_format::{json::JsonFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
+    use crate::execution::context::Reader;
     use crate::physical_plan::file_format::chunked_store::ChunkedStore;
     use crate::prelude::NdJsonReadOptions;
     use crate::prelude::*;
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index d14c9235b4326..52796e242c622 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -717,6 +717,7 @@ mod tests {
     use crate::datasource::file_format::test_util::scan_format;
     use crate::datasource::listing::{FileRange, PartitionedFile};
     use crate::datasource::object_store::ObjectStoreUrl;
+    use crate::execution::context::Reader;
     use crate::execution::options::CsvReadOptions;
     use crate::physical_plan::displayable;
     use crate::physical_plan::file_format::partition_type_wrap;
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 371b27ed7c296..a4ac7b5b6dcff 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1827,7 +1827,7 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
 mod tests {
     use super::*;
     use crate::datasource::MemTable;
-    use crate::execution::context::TaskContext;
+    use crate::execution::context::{TaskContext, Reader};
     use crate::execution::options::CsvReadOptions;
    use crate::execution::runtime_env::RuntimeEnv;
     use crate::physical_plan::SendableRecordBatchStream;
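
A minimal usage sketch (not part of the diff): with the `Reader` trait from this change in scope, the same `read_csv` call site accepts a single path or a list of paths, and the argument type selects the impl. This assumes the standard `datafusion` crate layout and a Tokio runtime; the file paths are hypothetical.

```rust
use datafusion::error::Result;
use datafusion::execution::context::Reader; // brings the trait's read_csv into scope
use datafusion::prelude::{CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // A &str (or String) argument selects the single-path impl.
    let df = ctx.read_csv("data/example.csv", CsvReadOptions::new()).await?;
    df.show().await?;

    // A Vec<&str> (or Vec<String>) argument reads several files as one table.
    let df_multi = ctx
        .read_csv(
            vec!["data/part-0.csv", "data/part-1.csv"],
            CsvReadOptions::new(),
        )
        .await?;
    df_multi.show().await?;
    Ok(())
}
```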