Bump DataFusion version
andygrove committed Apr 11, 2021
1 parent f1f4f2b commit a96fd26
Showing 14 changed files with 66 additions and 52 deletions.
6 changes: 3 additions & 3 deletions rust/ballista/rust/Cargo.toml
@@ -25,6 +25,6 @@ members = [
     "scheduler",
 ]
 
-[profile.release]
-lto = true
-codegen-units = 1
+#[profile.release]
+#lto = true
+#codegen-units = 1
6 changes: 3 additions & 3 deletions rust/ballista/rust/benchmarks/tpch/Cargo.toml
@@ -27,9 +27,9 @@ edition = "2018"
 [dependencies]
 ballista = { path="../../client" }
 
-arrow = { git = "https://github.com/apache/arrow", rev="46161d2" }
-datafusion = { git = "https://github.com/apache/arrow", rev="46161d2" }
-parquet = { git = "https://github.com/apache/arrow", rev="46161d2" }
+arrow = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
+datafusion = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
+parquet = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
 
 
 env_logger = "0.8"
4 changes: 2 additions & 2 deletions rust/ballista/rust/client/Cargo.toml
@@ -30,5 +30,5 @@ ballista-core = { path = "../core" }
 futures = "0.3"
 log = "0.4"
 tokio = "1.0"
-arrow = { git = "https://github.com/apache/arrow", rev="46161d2" }
-datafusion = { git = "https://github.com/apache/arrow", rev="46161d2" }
+arrow = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
+datafusion = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
14 changes: 9 additions & 5 deletions rust/ballista/rust/client/src/context.rs
@@ -36,6 +36,7 @@ use ballista_core::{
 };
 
 use arrow::datatypes::Schema;
+use datafusion::catalog::TableReference;
 use datafusion::execution::context::ExecutionContext;
 use datafusion::logical_plan::{DFSchema, Expr, LogicalPlan, Partitioning};
 use datafusion::physical_plan::csv::CsvReadOptions;
@@ -148,7 +149,10 @@ impl BallistaContext {
         for (name, plan) in &state.tables {
             let plan = ctx.optimize(plan)?;
             let execution_plan = ctx.create_physical_plan(&plan)?;
-            ctx.register_table(name, Arc::new(DFTableAdapter::new(plan, execution_plan)));
+            ctx.register_table(
+                TableReference::Bare { table: name },
+                Arc::new(DFTableAdapter::new(plan, execution_plan)),
+            )?;
         }
         let df = ctx.sql(sql)?;
         Ok(BallistaDataFrame::from(self.state.clone(), df))
@@ -267,7 +271,7 @@ impl BallistaDataFrame {
         ))
     }
 
-    pub fn select(&self, expr: &[Expr]) -> Result<BallistaDataFrame> {
+    pub fn select(&self, expr: Vec<Expr>) -> Result<BallistaDataFrame> {
         Ok(Self::from(
             self.state.clone(),
             self.df.select(expr).map_err(BallistaError::from)?,
@@ -283,8 +287,8 @@
 
     pub fn aggregate(
         &self,
-        group_expr: &[Expr],
-        aggr_expr: &[Expr],
+        group_expr: Vec<Expr>,
+        aggr_expr: Vec<Expr>,
     ) -> Result<BallistaDataFrame> {
         Ok(Self::from(
             self.state.clone(),
@@ -301,7 +305,7 @@
         ))
     }
 
-    pub fn sort(&self, expr: &[Expr]) -> Result<BallistaDataFrame> {
+    pub fn sort(&self, expr: Vec<Expr>) -> Result<BallistaDataFrame> {
         Ok(Self::from(
             self.state.clone(),
             self.df.sort(expr).map_err(BallistaError::from)?,
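
Note: the changes in this file track two DataFusion API changes at the new revision. First, register_table now takes a TableReference (here the Bare variant, a table name with no schema or catalog qualifier) and returns a Result. Second, the DataFrame methods take owned Vec<Expr> values instead of &[Expr] slices. A minimal sketch of calls against the new signatures; the `df` handle and column names are hypothetical, and `col` and `max` are the usual datafusion::logical_plan helpers:

    // Expressions are now passed by value (Vec<Expr>), not borrowed as slices.
    let df = df
        .select(vec![col("state"), col("salary")])?
        .aggregate(vec![col("state")], vec![max(col("salary"))])?
        .sort(vec![col("salary")])?;
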
6 changes: 3 additions & 3 deletions rust/ballista/rust/core/Cargo.toml
@@ -38,9 +38,9 @@ sqlparser = "0.8"
 tokio = "1.0"
 tonic = "0.4"
 uuid = { version = "0.8", features = ["v4"] }
-arrow = { git = "https://github.com/apache/arrow", rev="46161d2" }
-arrow-flight = { git = "https://github.com/apache/arrow", rev="46161d2" }
-datafusion = { git = "https://github.com/apache/arrow", rev="46161d2" }
+arrow = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
+arrow-flight = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
+datafusion = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
 
 
 [dev-dependencies]
1 change: 1 addition & 0 deletions rust/ballista/rust/core/src/datasource.rs
@@ -57,6 +57,7 @@ impl TableProvider for DFTableAdapter {
         _projection: &Option<Vec<usize>>,
         _batch_size: usize,
         _filters: &[Expr],
+        _limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
         Ok(self.plan.clone())
     }
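
Note: the extra `_limit` parameter mirrors the new TableProvider::scan signature in this DataFusion revision, which lets a provider push a row-count limit down into the scan. DFTableAdapter wraps an already-planned ExecutionPlan, so it ignores the hint. A commented sketch of the method after the change, with the types exactly as in the hunk above:

    fn scan(
        &self,
        _projection: &Option<Vec<usize>>,
        _batch_size: usize,
        _filters: &[Expr],
        _limit: Option<usize>, // new: optional row-count limit a provider may honor
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // The adapter returns its pre-built plan unchanged, so projection,
        // filters, and limit are all intentionally unused here.
        Ok(self.plan.clone())
    }
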
17 changes: 8 additions & 9 deletions rust/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -52,14 +52,13 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
         match plan {
             LogicalPlanType::Projection(projection) => {
                 let input: LogicalPlan = convert_box_required!(projection.input)?;
+                let x: Vec<Expr> = projection
+                    .expr
+                    .iter()
+                    .map(|expr| expr.try_into())
+                    .collect::<Result<Vec<_>, _>>()?;
                 LogicalPlanBuilder::from(&input)
-                    .project(
-                        &projection
-                            .expr
-                            .iter()
-                            .map(|expr| expr.try_into())
-                            .collect::<Result<Vec<_>, _>>()?,
-                    )?
+                    .project(x)?
                     .build()
                     .map_err(|e| e.into())
             }
@@ -89,7 +88,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                     .map(|expr| expr.try_into())
                     .collect::<Result<Vec<_>, _>>()?;
                 LogicalPlanBuilder::from(&input)
-                    .aggregate(&group_expr, &aggr_expr)?
+                    .aggregate(group_expr, aggr_expr)?
                     .build()
                     .map_err(|e| e.into())
             }
@@ -148,7 +147,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                     .map(|expr| expr.try_into())
                     .collect::<Result<Vec<Expr>, _>>()?;
                 LogicalPlanBuilder::from(&input)
-                    .sort(&sort_expr)?
+                    .sort(sort_expr)?
                     .build()
                     .map_err(|e| e.into())
             }
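
Note: all three LogicalPlanBuilder calls in this file (project, aggregate, sort) move from slice arguments to owned vectors, so intermediate expression lists are now bound to a local and passed by value. A minimal sketch of the new builder usage, assuming an existing LogicalPlan named `input` and hypothetical column names:

    // Builder methods now consume Vec<Expr> by value.
    let plan = LogicalPlanBuilder::from(&input)
        .project(vec![col("a"), col("b")])?
        .aggregate(vec![col("a")], vec![max(col("b"))])?
        .sort(vec![col("a")])?
        .build()?;
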
10 changes: 5 additions & 5 deletions rust/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -82,7 +82,7 @@ mod roundtrip_tests {
             CsvReadOptions::new().schema(&schema).has_header(true),
             Some(vec![3, 4]),
         )
-        .and_then(|plan| plan.sort(&[col("salary")]))
+        .and_then(|plan| plan.sort(vec![col("salary")]))
         .and_then(|plan| plan.build())
         .map_err(BallistaError::DataFusionError)?,
         );
@@ -679,7 +679,7 @@ mod roundtrip_tests {
             CsvReadOptions::new().schema(&schema).has_header(true),
             Some(vec![3, 4]),
         )
-        .and_then(|plan| plan.sort(&[col("salary")]))
+        .and_then(|plan| plan.sort(vec![col("salary")]))
         .and_then(|plan| plan.explain(true))
         .and_then(|plan| plan.build())
         .map_err(BallistaError::DataFusionError)?;
@@ -689,7 +689,7 @@
             CsvReadOptions::new().schema(&schema).has_header(true),
             Some(vec![3, 4]),
         )
-        .and_then(|plan| plan.sort(&[col("salary")]))
+        .and_then(|plan| plan.sort(vec![col("salary")]))
         .and_then(|plan| plan.explain(false))
         .and_then(|plan| plan.build())
         .map_err(BallistaError::DataFusionError)?;
@@ -742,7 +742,7 @@ mod roundtrip_tests {
             CsvReadOptions::new().schema(&schema).has_header(true),
             Some(vec![3, 4]),
         )
-        .and_then(|plan| plan.sort(&[col("salary")]))
+        .and_then(|plan| plan.sort(vec![col("salary")]))
         .and_then(|plan| plan.build())
         .map_err(BallistaError::DataFusionError)?;
         roundtrip_test!(plan);
@@ -784,7 +784,7 @@ mod roundtrip_tests {
             CsvReadOptions::new().schema(&schema).has_header(true),
             Some(vec![3, 4]),
         )
-        .and_then(|plan| plan.aggregate(&[col("state")], &[max(col("salary"))]))
+        .and_then(|plan| plan.aggregate(vec![col("state")], vec![max(col("salary"))]))
         .and_then(|plan| plan.build())
         .map_err(BallistaError::DataFusionError)?;
 
10 changes: 2 additions & 8 deletions rust/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -939,10 +939,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                 })
             }
             LogicalPlan::Extension { .. } => unimplemented!(),
-            // _ => Err(BallistaError::General(format!(
-            //     "logical plan to_proto {:?}",
-            //     self
-            // ))),
+            LogicalPlan::Union { .. } => unimplemented!(),
         }
     }
 }
@@ -1161,10 +1158,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
             Expr::Wildcard => Ok(protobuf::LogicalExprNode {
                 expr_type: Some(protobuf::logical_expr_node::ExprType::Wildcard(true)),
             }),
-            // _ => Err(BallistaError::General(format!(
-            //     "logical expr to_proto {:?}",
-            //     self
-            // ))),
+            Expr::TryCast { .. } => unimplemented!(),
         }
     }
 }
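
Note: both matches drop the commented-out `_` catch-all in favor of explicit `unimplemented!()` arms for LogicalPlan::Union and Expr::TryCast. That makes the matches exhaustive, so a new variant added upstream becomes a compile-time error here rather than a silently unserializable plan. A self-contained toy illustration of the pattern (the enum and encoding are hypothetical):

    enum Node {
        Projection,
        Union,
    }

    fn node_tag(node: &Node) -> u8 {
        // No `_` arm: adding a variant to Node forces this match to be updated.
        match node {
            Node::Projection => 0,
            Node::Union => unimplemented!("Union serialization not supported yet"),
        }
    }
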
19 changes: 16 additions & 3 deletions rust/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -30,11 +30,15 @@ use crate::serde::{proto_error, protobuf};
 use crate::{convert_box_required, convert_required};
 
 use arrow::datatypes::{DataType, Schema, SchemaRef};
+use datafusion::catalog::catalog::{
+    CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
+};
 use datafusion::execution::context::{ExecutionConfig, ExecutionContextState};
 use datafusion::logical_plan::{DFSchema, Expr};
 use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
 use datafusion::physical_plan::expressions::col;
 use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
+use datafusion::physical_plan::hash_join::PartitionMode;
 use datafusion::physical_plan::merge::MergeExec;
 use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
 use datafusion::physical_plan::{
@@ -111,6 +115,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                     options,
                     Some(projection),
                     batch_size,
+                    None,
                 )?))
             }
             PhysicalPlanType::ParquetScan(scan) => {
@@ -123,6 +128,7 @@
                     None,
                     scan.batch_size as usize,
                     scan.num_partitions as usize,
+                    None,
                 )?))
             }
             PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
@@ -215,8 +221,10 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                     .collect::<Result<Vec<_>, _>>()?;
 
                 let df_planner = DefaultPhysicalPlanner::default();
+                let catalog_list =
+                    Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
                 let ctx_state = ExecutionContextState {
-                    datasources: Default::default(),
+                    catalog_list,
                     scalar_functions: Default::default(),
                     var_provider: Default::default(),
                     aggregate_functions: Default::default(),
@@ -294,7 +302,11 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                     protobuf::JoinType::Right => JoinType::Right,
                 };
                 Ok(Arc::new(HashJoinExec::try_new(
-                    left, right, &on, &join_type,
+                    left,
+                    right,
+                    &on,
+                    &join_type,
+                    PartitionMode::CollectLeft,
                 )?))
             }
             PhysicalPlanType::ShuffleReader(shuffle_reader) => {
@@ -374,8 +386,9 @@ fn compile_expr(
     schema: &Schema,
 ) -> Result<Arc<dyn PhysicalExpr>, BallistaError> {
     let df_planner = DefaultPhysicalPlanner::default();
+    let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
     let state = ExecutionContextState {
-        datasources: HashMap::new(),
+        catalog_list,
         scalar_functions: HashMap::new(),
         var_provider: HashMap::new(),
         aggregate_functions: HashMap::new(),
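
Note: both construction sites in this file reflect the ExecutionContextState change: the old `datasources` map is replaced by a `catalog_list`, satisfied here by an empty MemoryCatalogList. A minimal sketch of the new construction; the trailing `config` field is an assumption inferred from the ExecutionConfig import, since both hunks are truncated before the struct literal closes:

    use std::sync::Arc;
    use datafusion::catalog::catalog::{CatalogList, MemoryCatalogList};
    use datafusion::execution::context::{ExecutionConfig, ExecutionContextState};

    let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
    let ctx_state = ExecutionContextState {
        catalog_list,
        scalar_functions: Default::default(),
        var_provider: Default::default(),
        aggregate_functions: Default::default(),
        config: ExecutionConfig::new(), // assumed remaining field, not shown above
    };
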
2 changes: 2 additions & 0 deletions rust/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -40,6 +40,7 @@ mod roundtrip_tests {
 
     use super::super::super::error::Result;
     use super::super::protobuf;
+    use datafusion::physical_plan::hash_join::PartitionMode;
 
     fn roundtrip_test(exec_plan: Arc<dyn ExecutionPlan>) -> Result<()> {
         let proto: protobuf::PhysicalPlanNode = exec_plan.clone().try_into()?;
@@ -84,6 +85,7 @@ mod roundtrip_tests {
             Arc::new(EmptyExec::new(false, Arc::new(schema_right))),
             &[("col".to_string(), "col".to_string())],
             &JoinType::Inner,
+            PartitionMode::CollectLeft,
         )?))
     }
 
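
Note: HashJoinExec::try_new now takes an explicit PartitionMode. CollectLeft, used throughout this commit, builds the hash table from the whole left input; the enum presumably also offers a mode that partitions both sides. A sketch mirroring the updated roundtrip test, with import paths for EmptyExec and JoinType assumed (they are not shown in the hunk):

    // Join two empty single-column tables; the partition mode is now explicit.
    let schema_left = Schema::new(vec![Field::new("col", DataType::Int64, false)]);
    let schema_right = Schema::new(vec![Field::new("col", DataType::Int64, false)]);
    let join = HashJoinExec::try_new(
        Arc::new(EmptyExec::new(false, Arc::new(schema_left))),
        Arc::new(EmptyExec::new(false, Arc::new(schema_right))),
        &[("col".to_string(), "col".to_string())],
        &JoinType::Inner,
        PartitionMode::CollectLeft, // new required argument
    )?;
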
6 changes: 3 additions & 3 deletions rust/ballista/rust/executor/Cargo.toml
@@ -45,9 +45,9 @@ tokio-stream = "0.1"
 tonic = "0.4"
 uuid = { version = "0.8", features = ["v4"] }
 
-arrow = { git = "https://github.com/apache/arrow", rev="46161d2" }
-arrow-flight = { git = "https://github.com/apache/arrow", rev="46161d2" }
-datafusion = { git = "https://github.com/apache/arrow", rev="46161d2" }
+arrow = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
+arrow-flight = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
+datafusion = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
 
 [dev-dependencies]
 
4 changes: 2 additions & 2 deletions rust/ballista/rust/scheduler/Cargo.toml
@@ -47,8 +47,8 @@ sled_package = { package = "sled", version = "0.34", optional = true }
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
 tonic = "0.4"
 
-arrow = { git = "https://github.com/apache/arrow", rev="46161d2" }
-datafusion = { git = "https://github.com/apache/arrow", rev="46161d2" }
+arrow = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
+datafusion = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
 
 [dev-dependencies]
 ballista-core = { path = "../core" }
13 changes: 7 additions & 6 deletions rust/ballista/rust/scheduler/src/lib.rs
@@ -199,12 +199,13 @@ impl SchedulerGrpc for SchedulerServer {
 
         match file_type {
             FileType::Parquet => {
-                let parquet_exec = ParquetExec::try_from_path(&path, None, None, 1024, 1)
-                    .map_err(|e| {
-                        let msg = format!("Error opening parquet files: {}", e);
-                        error!("{}", msg);
-                        tonic::Status::internal(msg)
-                    })?;
+                let parquet_exec =
+                    ParquetExec::try_from_path(&path, None, None, 1024, 1, None)
+                        .map_err(|e| {
+                            let msg = format!("Error opening parquet files: {}", e);
+                            error!("{}", msg);
+                            tonic::Status::internal(msg)
+                        })?;
 
                 //TODO include statistics and any other info needed to reconstruct ParquetExec
                 Ok(Response::new(GetFileMetadataResult {
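
Note: ParquetExec::try_from_path gains a trailing Option<usize> limit argument, passed as None here (no row limit) since the scheduler only needs file metadata. A commented sketch of the updated call; the path is hypothetical, and the reading of the two earlier None arguments as projection and predicate is an assumption about this revision's signature:

    let parquet_exec = ParquetExec::try_from_path(
        "/path/to/data.parquet", // hypothetical path
        None,                    // projection (assumed)
        None,                    // predicate/filter (assumed)
        1024,                    // batch size
        1,                       // concurrency
        None,                    // limit: the new trailing parameter
    )?;
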
