Skip to content

Commit

Permalink
Extract catalog API to separate crate
Browse files Browse the repository at this point in the history
This moves `CatalogProvider`, `TableProvider`, `SchemaProvider` to a new
`datafusion-catalog` crate.  The circular dependency between core
`SessionState` and implementations is broken up by introducing the
`CatalogSession` dyn trait.  Implementations of `TableProvider` that
reside under core currently have access to `CatalogSession` by
downcasting. This is supposed to be an intermediate step.
  • Loading branch information
findepi committed Jul 17, 2024
1 parent de0765a commit 5cd1243
Show file tree
Hide file tree
Showing 37 changed files with 186 additions and 585 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ exclude = ["datafusion-cli", "dev/depcheck"]
members = [
"datafusion/common",
"datafusion/common-runtime",
"datafusion/catalog",
"datafusion/core",
"datafusion/expr",
"datafusion/execution",
Expand Down Expand Up @@ -87,6 +88,7 @@ chrono = { version = "0.4.34", default-features = false }
ctor = "0.2.0"
dashmap = "6.0.1"
datafusion = { path = "datafusion/core", version = "40.0.0", default-features = false }
datafusion-catalog = { path = "datafusion/catalog", version = "40.0.0" }
datafusion-common = { path = "datafusion/common", version = "40.0.0", default-features = false }
datafusion-common-runtime = { path = "datafusion/common-runtime", version = "40.0.0" }
datafusion-execution = { path = "datafusion/execution", version = "40.0.0" }
Expand Down
13 changes: 13 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use std::fmt;
use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;
use datafusion::catalog_api::CatalogSession;

#[derive(Debug)]
pub enum Function {
Expand Down Expand Up @@ -234,7 +235,7 @@ impl TableProvider for ParquetMetadataTable {

async fn scan(
&self,
_state: &SessionState,
_state: &dyn CatalogSession,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
Expand Down
6 changes: 3 additions & 3 deletions datafusion-examples/examples/advanced_parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::catalog_api::CatalogSession;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::{
ParquetAccessPlan, ParquetExecBuilder,
Expand All @@ -27,7 +28,6 @@ use datafusion::datasource::physical_plan::{
parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig,
};
use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::parquet::arrow::arrow_reader::{
ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
Expand Down Expand Up @@ -271,7 +271,7 @@ impl IndexTableProvider {
/// to a single predicate like `a = 1 AND b = 2` suitable for execution
fn filters_to_predicate(
&self,
state: &SessionState,
state: &dyn CatalogSession,
filters: &[Expr],
) -> Result<Arc<dyn PhysicalExpr>> {
let df_schema = DFSchema::try_from(self.schema())?;
Expand Down Expand Up @@ -463,7 +463,7 @@ impl TableProvider for IndexTableProvider {

async fn scan(
&self,
state: &SessionState,
state: &dyn CatalogSession,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
Expand Down
6 changes: 2 additions & 4 deletions datafusion-examples/examples/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
use async_trait::async_trait;
use datafusion::{
arrow::util::pretty,
catalog::{
schema::SchemaProvider,
{CatalogProvider, CatalogProviderList},
},
catalog::CatalogProviderList,
catalog_api::{CatalogProvider, SchemaProvider},
datasource::{
file_format::{csv::CsvFormat, FileFormat},
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
Expand Down
5 changes: 3 additions & 2 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::{provider_as_source, TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::{SessionState, TaskContext};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{
project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
Expand All @@ -37,6 +37,7 @@ use datafusion_expr::LogicalPlanBuilder;
use datafusion_physical_expr::EquivalenceProperties;

use async_trait::async_trait;
use datafusion::catalog_api::CatalogSession;
use tokio::time::timeout;

/// This example demonstrates executing a simple query against a custom datasource
Expand Down Expand Up @@ -175,7 +176,7 @@ impl TableProvider for CustomDataSource {

async fn scan(
&self,
_state: &SessionState,
_state: &dyn CatalogSession,
projection: Option<&Vec<usize>>,
// filters and limit can be used here to inject some push-down operations if needed
_filters: &[Expr],
Expand Down
4 changes: 2 additions & 2 deletions datafusion-examples/examples/parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ use arrow::datatypes::Int32Type;
use arrow::util::pretty::pretty_format_batches;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion::catalog_api::CatalogSession;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{
parquet::StatisticsConverter,
{FileScanConfig, ParquetExec},
};
use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::parquet::arrow::{
arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter,
Expand Down Expand Up @@ -222,7 +222,7 @@ impl TableProvider for IndexTableProvider {

async fn scan(
&self,
state: &SessionState,
state: &dyn CatalogSession,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
Expand Down
6 changes: 3 additions & 3 deletions datafusion-examples/examples/simple_udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ use arrow::csv::ReaderBuilder;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog_api::CatalogSession;
use datafusion::datasource::function::TableFunctionImpl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::{ExecutionProps, SessionState};
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
Expand All @@ -35,7 +36,6 @@ use std::fs::File;
use std::io::Seek;
use std::path::Path;
use std::sync::Arc;

// To define your own table function, you only need to do the following 3 things:
// 1. Implement your own [`TableProvider`]
// 2. Implement your own [`TableFunctionImpl`] and return your [`TableProvider`]
Expand Down Expand Up @@ -95,7 +95,7 @@ impl TableProvider for LocalCsvTable {

async fn scan(
&self,
_state: &SessionState,
_state: &dyn CatalogSession,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ bytes = { workspace = true }
bzip2 = { version = "0.4.3", optional = true }
chrono = { workspace = true }
dashmap = { workspace = true }
datafusion-catalog = { workspace = true }
datafusion-common = { workspace = true, features = ["object_store"] }
datafusion-common-runtime = { workspace = true }
datafusion-execution = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use arrow::{
record_batch::RecordBatch,
};

use crate::catalog_api::{SchemaProvider, TableProvider};
use crate::datasource::streaming::StreamingTable;
use crate::datasource::TableProvider;
use crate::execution::context::TaskContext;
use crate::logical_expr::TableType;
use crate::physical_plan::stream::RecordBatchStreamAdapter;
Expand All @@ -40,7 +40,7 @@ use crate::{
physical_plan::streaming::PartitionStream,
};

use super::{schema::SchemaProvider, CatalogProviderList};
use super::CatalogProviderList;

pub(crate) const INFORMATION_SCHEMA: &str = "information_schema";
pub(crate) const TABLES: &str = "tables";
Expand Down
4 changes: 1 addition & 3 deletions datafusion/core/src/catalog/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::{Arc, Mutex};

use crate::catalog::schema::SchemaProvider;
use crate::datasource::provider::TableProviderFactory;
use crate::datasource::TableProvider;
use crate::catalog_api::{SchemaProvider, TableProvider, TableProviderFactory};
use crate::execution::context::SessionState;

use datafusion_common::{Constraints, DFSchema, DataFusionError, TableReference};
Expand Down
8 changes: 3 additions & 5 deletions datafusion/core/src/catalog/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
//! [`MemoryCatalogProvider`], [`MemoryCatalogProviderList`]: In-memory
//! implementations of [`CatalogProviderList`] and [`CatalogProvider`].
use crate::catalog::schema::SchemaProvider;
use crate::catalog::{CatalogProvider, CatalogProviderList};
use crate::datasource::TableProvider;
use crate::catalog::CatalogProviderList;
use crate::catalog_api::{CatalogProvider, SchemaProvider, TableProvider};
use async_trait::async_trait;
use dashmap::DashMap;
use datafusion_common::{exec_err, DataFusionError};
Expand Down Expand Up @@ -201,11 +200,10 @@ impl SchemaProvider for MemorySchemaProvider {
#[cfg(test)]
mod test {
use super::*;
use crate::catalog::schema::{MemorySchemaProvider, SchemaProvider};
use crate::catalog::memory::MemorySchemaProvider;
use crate::catalog::CatalogProvider;
use crate::datasource::empty::EmptyTable;
use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
use crate::datasource::TableProvider;
use crate::prelude::SessionContext;
use arrow_schema::Schema;
use datafusion_common::assert_batches_eq;
Expand Down
Loading

0 comments on commit 5cd1243

Please sign in to comment.