Skip to content

Commit

Permalink
Remove dependency from LogicalPlan::TableScan to ExecutionPlan (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove authored Apr 20, 2022
1 parent 8e5551f commit 817199b
Show file tree
Hide file tree
Showing 15 changed files with 192 additions and 61 deletions.
10 changes: 6 additions & 4 deletions ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ use datafusion::catalog::TableReference;
use datafusion::dataframe::DataFrame;
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::{CreateExternalTable, FileType, LogicalPlan, TableScan};
use datafusion::logical_plan::{
source_as_provider, CreateExternalTable, FileType, LogicalPlan, TableScan,
};
use datafusion::prelude::{
AvroReadOptions, CsvReadOptions, ParquetReadOptions, SessionConfig, SessionContext,
};
Expand Down Expand Up @@ -270,7 +272,7 @@ impl BallistaContext {
) -> Result<()> {
match self.read_csv(path, options).await?.to_logical_plan() {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source)
self.register_table(name, source_as_provider(&source)?)
}
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
}
Expand All @@ -284,7 +286,7 @@ impl BallistaContext {
) -> Result<()> {
match self.read_parquet(path, options).await?.to_logical_plan() {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source)
self.register_table(name, source_as_provider(&source)?)
}
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
}
Expand All @@ -298,7 +300,7 @@ impl BallistaContext {
) -> Result<()> {
match self.read_avro(path, options).await?.to_logical_plan() {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source)
self.register_table(name, source_as_provider(&source)?)
}
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
}
Expand Down
11 changes: 7 additions & 4 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ use datafusion::logical_plan::plan::{
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, Window,
};
use datafusion::logical_plan::{
Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr,
JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, Repartition, TableScan,
Values,
source_as_provider, Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, Repartition,
TableScan, Values,
};
use datafusion::prelude::SessionContext;

Expand Down Expand Up @@ -510,6 +510,7 @@ impl AsLogicalPlan for LogicalPlanNode {
projection,
..
}) => {
let source = source_as_provider(source)?;
let schema = source.schema();
let source = source.as_any();

Expand Down Expand Up @@ -982,6 +983,7 @@ mod roundtrip_tests {
use crate::serde::{AsLogicalPlan, BallistaCodec};
use async_trait::async_trait;
use core::panic;
use datafusion::logical_plan::source_as_provider;
use datafusion::{
arrow::datatypes::{DataType, Field, Schema},
datafusion_data_access::{
Expand Down Expand Up @@ -1434,7 +1436,8 @@ mod roundtrip_tests {

let round_trip_store = match round_trip {
LogicalPlan::TableScan(scan) => {
match scan.source.as_ref().as_any().downcast_ref::<ListingTable>() {
let source = source_as_provider(&scan.source)?;
match source.as_ref().as_any().downcast_ref::<ListingTable>() {
Some(listing_table) => {
format!("{:?}", listing_table.object_store())
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ use arrow::{
};
use datafusion_common::Result;

use crate::datasource::{MemTable, TableProvider, TableType};
use crate::datasource::{MemTable, TableProvider};
use crate::logical_expr::TableType;

use super::{
catalog::{CatalogList, CatalogProvider},
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use crate::arrow::datatypes::Schema;
use crate::arrow::datatypes::SchemaRef;
use crate::arrow::util::pretty;
use crate::datasource::TableProvider;
use crate::datasource::TableType;
use crate::execution::context::{SessionState, TaskContext};
use crate::logical_expr::TableType;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::{collect, collect_partitioned};
use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan};
Expand Down
29 changes: 1 addition & 28 deletions datafusion/core/src/datasource/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,40 +21,13 @@ use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion_expr::{TableProviderFilterPushDown, TableType};

use crate::arrow::datatypes::SchemaRef;
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::ExecutionPlan;

/// Indicates whether and how a filter expression can be handled by a
/// TableProvider for table scans.
///
/// All variants are field-less, so the type derives `Eq` in addition to
/// `PartialEq`; equality is a plain variant comparison.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TableProviderFilterPushDown {
    /// The expression cannot be used by the provider.
    Unsupported,
    /// The expression can be used to help minimise the data retrieved,
    /// but the provider cannot guarantee that all returned tuples
    /// satisfy the filter. The Filter plan node containing this expression
    /// will be preserved.
    Inexact,
    /// The provider guarantees that all returned data satisfies this
    /// filter expression. The Filter plan node containing this expression
    /// will be removed.
    Exact,
}

/// Indicates the type of this table for metadata/catalog purposes.
///
/// The enum is field-less and `Copy`, so it derives `Eq` alongside
/// `PartialEq`; equality is a plain variant comparison.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TableType {
    /// An ordinary physical table.
    Base,
    /// A non-materialised table that itself uses a query internally to provide data.
    View,
    /// A transient table.
    Temporary,
}

/// Source table
#[async_trait]
pub trait TableProvider: Sync + Send {
Expand Down
17 changes: 8 additions & 9 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ use arrow::datatypes::{Field, Schema, SchemaRef};
use async_trait::async_trait;
use futures::StreamExt;

use crate::datasource::{
file_format::{
avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
FileFormat,
},
get_statistics_with_limit, TableProvider,
};
use crate::logical_expr::TableProviderFilterPushDown;
use crate::{
error::{DataFusionError, Result},
logical_plan::Expr,
Expand All @@ -33,15 +41,6 @@ use crate::{
},
};

use crate::datasource::{
datasource::TableProviderFilterPushDown,
file_format::{
avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
FileFormat,
},
get_statistics_with_limit, TableProvider,
};

use super::PartitionedFile;
use datafusion_data_access::object_store::ObjectStore;

Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ pub mod object_store_registry;

use futures::Stream;

pub use self::datasource::{TableProvider, TableType};
pub use self::datasource::TableProvider;
use self::listing::PartitionedFile;
pub use self::memory::MemTable;
use crate::arrow::datatypes::{Schema, SchemaRef};
use crate::error::Result;
pub use crate::logical_expr::TableType;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
use futures::StreamExt;
Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ use std::{
use super::dfschema::ToDFSchema;
use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
use crate::logical_plan::{
columnize_expr, normalize_col, normalize_cols, rewrite_sort_cols_by_aggs, Column,
CrossJoin, DFField, DFSchema, DFSchemaRef, Limit, Partitioning, Repartition, Values,
columnize_expr, normalize_col, normalize_cols, provider_as_source,
rewrite_sort_cols_by_aggs, Column, CrossJoin, DFField, DFSchema, DFSchemaRef, Limit,
Partitioning, Repartition, Values,
};
use crate::sql::utils::group_window_expr_by_sort_keys;

Expand Down Expand Up @@ -449,7 +450,7 @@ impl LogicalPlanBuilder {

let table_scan = LogicalPlan::TableScan(TableScan {
table_name,
source: provider,
source: provider_as_source(provider),
projected_schema: Arc::new(projected_schema),
projection,
filters,
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub use expr_simplier::{ExprSimplifiable, SimplifyInfo};
pub use expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
pub use plan::{provider_as_source, source_as_provider};
pub use plan::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint, JoinType, Limit,
Expand Down
68 changes: 66 additions & 2 deletions datafusion/core/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
use super::display::{GraphvizVisitor, IndentVisitor};
use super::expr::{Column, Expr};
use super::extension::UserDefinedLogicalNode;
use crate::datasource::datasource::TableProviderFilterPushDown;
use crate::datasource::TableProvider;
use crate::error::DataFusionError;
use crate::logical_expr::TableProviderFilterPushDown;
use crate::logical_plan::dfschema::DFSchemaRef;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_expr::TableSource;
use std::any::Any;
use std::fmt::Formatter;
use std::{
collections::HashSet,
Expand Down Expand Up @@ -124,13 +126,75 @@ pub struct Window {
pub schema: DFSchemaRef,
}

/// DataFusion default table source, wrapping TableProvider
///
/// This structure adapts a `TableProvider` (physical plan trait) to the `TableSource`
/// (logical plan trait). Its `TableSource` implementation forwards `schema` and
/// `supports_filter_pushdown` to the wrapped provider, and `source_as_provider`
/// downcasts a `TableSource` back to this type to recover the provider.
pub struct DefaultTableSource {
/// The wrapped table provider that the `TableSource` methods delegate to.
pub table_provider: Arc<dyn TableProvider>,
}

impl DefaultTableSource {
/// Create a new DefaultTableSource to wrap a TableProvider
pub fn new(table_provider: Arc<dyn TableProvider>) -> Self {
Self { table_provider }
}
}

impl TableSource for DefaultTableSource {
/// Returns the table source as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any {
self
}

/// Get a reference to the schema for this table
fn schema(&self) -> SchemaRef {
self.table_provider.schema()
}

/// Tests whether the table provider can make use of a filter expression
/// to optimise data retrieval.
fn supports_filter_pushdown(
&self,
filter: &Expr,
) -> datafusion_common::Result<TableProviderFilterPushDown> {
self.table_provider.supports_filter_pushdown(filter)
}
}

/// Wraps a `TableProvider` in a `DefaultTableSource` and returns it as a
/// shared `TableSource` trait object.
pub fn provider_as_source(table_provider: Arc<dyn TableProvider>) -> Arc<dyn TableSource> {
    let wrapped = DefaultTableSource::new(table_provider);
    Arc::new(wrapped)
}

/// Recovers the `TableProvider` held by a `TableSource`.
///
/// The source is downcast to `DefaultTableSource`; on success a clone of the
/// wrapped provider handle is returned. Any other `TableSource`
/// implementation yields `DataFusionError::Internal`, so this only works for
/// sources created through `DefaultTableSource`.
pub fn source_as_provider(
    source: &Arc<dyn TableSource>,
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
    source
        .as_any()
        .downcast_ref::<DefaultTableSource>()
        .map(|wrapper| wrapper.table_provider.clone())
        .ok_or_else(|| {
            DataFusionError::Internal("TableSource was not DefaultTableSource".to_string())
        })
}

/// Produces rows from a table provider by reference or from the context
#[derive(Clone)]
pub struct TableScan {
/// The name of the table
pub table_name: String,
/// The source of the table
pub source: Arc<dyn TableProvider>,
pub source: Arc<dyn TableSource>,
/// Optional column indices to use as a projection
pub projection: Option<Vec<usize>>,
/// The schema description of the output
Expand Down
12 changes: 8 additions & 4 deletions datafusion/core/src/optimizer/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
use crate::datasource::datasource::TableProviderFilterPushDown;
use crate::execution::context::ExecutionProps;
use crate::logical_expr::TableProviderFilterPushDown;
use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection, Union};
use crate::logical_plan::{
and, col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan,
Expand Down Expand Up @@ -599,7 +599,11 @@ mod tests {
};
use crate::physical_plan::ExecutionPlan;
use crate::test::*;
use crate::{logical_plan::col, prelude::JoinType};
use crate::{
logical_plan::{col, plan::provider_as_source},
prelude::JoinType,
};

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;

Expand Down Expand Up @@ -1417,7 +1421,7 @@ mod tests {
(*test_provider.schema()).clone(),
)?),
projection: None,
source: Arc::new(test_provider),
source: provider_as_source(Arc::new(test_provider)),
limit: None,
});

Expand Down Expand Up @@ -1490,7 +1494,7 @@ mod tests {
(*test_provider.schema()).clone(),
)?),
projection: Some(vec![0]),
source: Arc::new(test_provider),
source: provider_as_source(Arc::new(test_provider)),
limit: None,
});

Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use super::{
};
use crate::execution::context::{ExecutionProps, SessionState};
use crate::logical_plan::plan::{
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, TableScan,
Window,
source_as_provider, Aggregate, EmptyRelation, Filter, Join, Projection, Sort,
SubqueryAlias, TableScan, Window,
};
use crate::logical_plan::{
unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator,
Expand Down Expand Up @@ -339,6 +339,7 @@ impl DefaultPhysicalPlanner {
limit,
..
}) => {
let source = source_as_provider(source)?;
// Remove all qualifiers from the scan as the provider
// doesn't know (nor should care) how the relation was
// referred to in the query
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/provider_filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use arrow::array::{as_primitive_array, Int32Builder, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::datasource::datasource::{TableProvider, TableProviderFilterPushDown};
use datafusion::datasource::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::{SessionContext, TaskContext};
use datafusion::logical_plan::Expr;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_plan::common::SizedRecordBatchStream;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod literal;
mod nullif;
mod operator;
mod signature;
mod table_source;
pub mod type_coercion;
mod udaf;
mod udf;
Expand All @@ -50,6 +51,7 @@ pub use literal::{lit, lit_timestamp_nano, Literal, TimestampLiteral};
pub use nullif::SUPPORTED_NULLIF_TYPES;
pub use operator::Operator;
pub use signature::{Signature, TypeSignature, Volatility};
pub use table_source::{TableProviderFilterPushDown, TableSource, TableType};
pub use udaf::AggregateUDF;
pub use udf::ScalarUDF;
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
Expand Down
Loading

0 comments on commit 817199b

Please sign in to comment.