Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

WIP: Move LogicalPlan to datafusion-expr crate #2286

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ use datafusion::catalog::TableReference;
use datafusion::dataframe::DataFrame;
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScan};
use datafusion::logical_plan::{
source_as_provider, CreateExternalTable, LogicalPlan, TableScan,
};
use datafusion::prelude::{
AvroReadOptions, CsvReadOptions, ParquetReadOptions, SessionConfig, SessionContext,
};
Expand Down Expand Up @@ -270,7 +272,7 @@ impl BallistaContext {
) -> Result<()> {
match self.read_csv(path, options).await?.to_logical_plan() {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source)
self.register_table(name, source_as_provider(&source)?)
}
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
}
Expand All @@ -284,7 +286,7 @@ impl BallistaContext {
) -> Result<()> {
match self.read_parquet(path, options).await?.to_logical_plan() {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source)
self.register_table(name, source_as_provider(&source)?)
}
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
}
Expand All @@ -298,7 +300,7 @@ impl BallistaContext {
) -> Result<()> {
match self.read_avro(path, options).await?.to_logical_plan() {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source)
self.register_table(name, source_as_provider(&source)?)
}
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
}
Expand Down
11 changes: 7 additions & 4 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ use datafusion::logical_plan::plan::{
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, Window,
};
use datafusion::logical_plan::{
Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr,
JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, Repartition, TableScan,
Values,
source_as_provider, Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, Repartition,
TableScan, Values,
};
use datafusion::prelude::SessionContext;

Expand Down Expand Up @@ -510,6 +510,7 @@ impl AsLogicalPlan for LogicalPlanNode {
projection,
..
}) => {
let source = source_as_provider(source)?;
let schema = source.schema();
let source = source.as_any();

Expand Down Expand Up @@ -982,6 +983,7 @@ mod roundtrip_tests {
use crate::serde::{AsLogicalPlan, BallistaCodec};
use async_trait::async_trait;
use core::panic;
use datafusion::logical_plan::source_as_provider;
use datafusion::{
arrow::datatypes::{DataType, Field, Schema},
datafusion_data_access::{
Expand Down Expand Up @@ -1435,7 +1437,8 @@ mod roundtrip_tests {

let round_trip_store = match round_trip {
LogicalPlan::TableScan(scan) => {
match scan.source.as_ref().as_any().downcast_ref::<ListingTable>() {
let source = source_as_provider(&scan.source)?;
match source.as_ref().as_any().downcast_ref::<ListingTable>() {
Some(listing_table) => {
format!("{:?}", listing_table.object_store())
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ use arrow::{
};
use datafusion_common::Result;

use crate::datasource::{MemTable, TableProvider, TableType};
use crate::datasource::{MemTable, TableProvider};
use crate::logical_expr::TableType;

use super::{
catalog::{CatalogList, CatalogProvider},
Expand Down
8 changes: 3 additions & 5 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

use crate::arrow::record_batch::RecordBatch;
use crate::error::Result;
use crate::logical_plan::{
col, DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, LogicalPlanBuilder,
Partitioning,
};
use crate::logical_expr::logical_plan::{JoinType, LogicalPlan, Partitioning};
use crate::logical_plan::{col, DFSchema, Expr, FunctionRegistry, LogicalPlanBuilder};
use parquet::file::properties::WriterProperties;
use std::sync::Arc;

Expand All @@ -33,8 +31,8 @@ use crate::arrow::datatypes::Schema;
use crate::arrow::datatypes::SchemaRef;
use crate::arrow::util::pretty;
use crate::datasource::TableProvider;
use crate::datasource::TableType;
use crate::execution::context::{SessionState, TaskContext};
use crate::logical_expr::TableType;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::{collect, collect_partitioned};
use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan};
Expand Down
29 changes: 1 addition & 28 deletions datafusion/core/src/datasource/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,40 +21,13 @@ use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion_expr::{TableProviderFilterPushDown, TableType};

use crate::arrow::datatypes::SchemaRef;
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::ExecutionPlan;

/// Describes whether, and how precisely, a table provider can apply a
/// filter expression itself during a table scan.
///
/// The variant decides whether the `Filter` plan node that carries the
/// expression is kept or removed, as described on each variant.
#[derive(Debug, Clone, PartialEq)]
pub enum TableProviderFilterPushDown {
/// The provider cannot make any use of the expression.
Unsupported,
/// The provider can use the expression to reduce the data it retrieves,
/// but it does not guarantee that every returned tuple satisfies the
/// filter. The Filter plan node containing this expression is
/// preserved.
Inexact,
/// The provider guarantees that all returned data satisfies the filter
/// expression. The Filter plan node containing this expression is
/// removed.
Exact,
}

/// Classifies a table for metadata/catalog purposes.
///
/// The type is `Copy`, so values can be passed around by value.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TableType {
/// An ordinary physical table.
Base,
/// A non-materialised table that uses a query internally to provide
/// its data.
View,
/// A transient table.
Temporary,
}

/// Source table
#[async_trait]
pub trait TableProvider: Sync + Send {
Expand Down
17 changes: 8 additions & 9 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ use arrow::datatypes::{Field, Schema, SchemaRef};
use async_trait::async_trait;
use futures::StreamExt;

use crate::datasource::{
file_format::{
avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
FileFormat,
},
get_statistics_with_limit, TableProvider,
};
use crate::logical_expr::TableProviderFilterPushDown;
use crate::{
error::{DataFusionError, Result},
logical_plan::Expr,
Expand All @@ -33,15 +41,6 @@ use crate::{
},
};

use crate::datasource::{
datasource::TableProviderFilterPushDown,
file_format::{
avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
FileFormat,
},
get_statistics_with_limit, TableProvider,
};

use super::PartitionedFile;
use datafusion_data_access::object_store::ObjectStore;

Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ pub mod object_store_registry;

use futures::Stream;

pub use self::datasource::{TableProvider, TableType};
pub use self::datasource::TableProvider;
use self::listing::PartitionedFile;
pub use self::memory::MemTable;
use crate::arrow::datatypes::{Schema, SchemaRef};
use crate::error::Result;
pub use crate::logical_expr::TableType;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
use futures::StreamExt;
Expand Down
15 changes: 8 additions & 7 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,19 @@ use crate::{
},
MemTable,
},
logical_plan::{PlanType, ToStringifiedPlan},
logical_expr::logical_plan::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
DropTable, Explain, FileType, LogicalPlan, PlanType, ToStringifiedPlan,
},
logical_plan::FunctionRegistry,
optimizer::eliminate_filter::EliminateFilter,
optimizer::eliminate_limit::EliminateLimit,
physical_optimizer::{
aggregate_statistics::AggregateStatistics,
hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule,
},
};

use log::{debug, trace};
use parking_lot::RwLock;
use std::string::String;
Expand All @@ -60,10 +65,6 @@ use crate::dataframe::DataFrame;
use crate::datasource::listing::ListingTableConfig;
use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
use crate::logical_plan::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
DropTable, FunctionRegistry, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
};
use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use crate::optimizer::filter_push_down::FilterPushDown;
use crate::optimizer::limit_push_down::LimitPushDown;
Expand All @@ -78,15 +79,15 @@ use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use crate::physical_optimizer::repartition::Repartition;

use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::logical_plan::plan::Explain;
use crate::logical_plan::{LogicalPlanBuilder, UNNAMED_TABLE};
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udaf::AggregateUDF;
use crate::physical_plan::udf::ScalarUDF;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::PhysicalPlanner;
use crate::sql::{
parser::{DFParser, FileType},
parser::DFParser,
planner::{ContextProvider, SqlToRel},
};
use crate::variable::{VarProvider, VarType};
Expand Down
15 changes: 10 additions & 5 deletions datafusion/core/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ use crate::datasource::{
MemTable, TableProvider,
};
use crate::error::{DataFusionError, Result};
use crate::logical_plan::expr_schema::ExprSchemable;
use crate::logical_plan::plan::{
use crate::logical_expr::logical_plan::{
Aggregate, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Window,
};
use crate::logical_plan::expr_schema::ExprSchemable;
use crate::logical_plan::plan::DefaultTableSource;
use crate::optimizer::utils;
use crate::prelude::*;
use crate::scalar::ScalarValue;
Expand All @@ -44,12 +45,16 @@ use std::{
};

use super::dfschema::ToDFSchema;
use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
use super::{exprlist_to_fields, Expr};
use crate::logical_expr::logical_plan::{
CrossJoin, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
Repartition, Values,
};
use crate::logical_plan::{
columnize_expr, normalize_col, normalize_cols, rewrite_sort_cols_by_aggs, Column,
CrossJoin, DFField, DFSchema, DFSchemaRef, Limit, Partitioning, Repartition, Values,
};
use crate::sql::utils::group_window_expr_by_sort_keys;
use datafusion_common::{DFField, DFSchema, DFSchemaRef};

/// Default name (`?table?`) given to a table that has no name of its own.
pub const UNNAMED_TABLE: &str = "?table?";
Expand Down Expand Up @@ -449,7 +454,7 @@ impl LogicalPlanBuilder {

let table_scan = LogicalPlan::TableScan(TableScan {
table_name,
source: provider,
source: Arc::new(DefaultTableSource::new(provider)),
projected_schema: Arc::new(projected_schema),
projection,
filters,
Expand Down
11 changes: 5 additions & 6 deletions datafusion/core/src/logical_plan/expr_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

//! Expression rewriter

use super::Expr;
use crate::logical_plan::plan::Aggregate;
use crate::logical_plan::DFSchema;
use crate::logical_expr::{
logical_plan::{Aggregate, LogicalPlan},
Expr,
};
use crate::logical_plan::ExprSchemable;
use crate::logical_plan::LogicalPlan;
use datafusion_common::Column;
use datafusion_common::Result;
use datafusion_common::{Column, DFSchema, Result};
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
Expand Down
12 changes: 2 additions & 10 deletions datafusion/core/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,21 @@

pub(crate) mod builder;
mod dfschema;
mod display;
mod expr;
mod expr_rewriter;
mod expr_schema;
mod expr_simplier;
mod expr_visitor;
mod extension;
mod operators;
pub mod plan;
mod registry;
pub mod window_frames;
pub use crate::logical_expr::logical_plan::LogicalPlan;
pub use builder::{
build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
};
pub use datafusion_expr::expr_fn::binary_expr;
pub use dfschema::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
pub use display::display_schema;
pub use expr::{
abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
avg, bit_length, btrim, call_fn, case, ceil, character_length, chr, coalesce, col,
Expand All @@ -60,12 +58,6 @@ pub use expr_rewriter::{
pub use expr_schema::ExprSchemable;
pub use expr_simplier::{ExprSimplifiable, SimplifyInfo};
pub use expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
pub use plan::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
CrossJoin, DropTable, EmptyRelation, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, PlanVisitor, Repartition, TableScan, Union, Values,
};
pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan};
pub use plan::{provider_as_source, source_as_provider};
pub use registry::FunctionRegistry;
Loading