Skip to content

Commit

Permalink
Stop optimizing queries twice (apache#2369)
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove authored and ovr committed Jun 29, 2022
1 parent 24265c4 commit 45bc3ee
Show file tree
Hide file tree
Showing 14 changed files with 165 additions and 81 deletions.
6 changes: 3 additions & 3 deletions ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ impl BallistaContext {
path: &str,
options: CsvReadOptions<'_>,
) -> Result<()> {
match self.read_csv(path, options).await?.to_logical_plan() {
match self.read_csv(path, options).await?.to_logical_plan()? {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source)
}
Expand All @@ -282,7 +282,7 @@ impl BallistaContext {
path: &str,
options: ParquetReadOptions<'_>,
) -> Result<()> {
match self.read_parquet(path, options).await?.to_logical_plan() {
match self.read_parquet(path, options).await?.to_logical_plan()? {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source)
}
Expand All @@ -296,7 +296,7 @@ impl BallistaContext {
path: &str,
options: AvroReadOptions<'_>,
) -> Result<()> {
match self.read_avro(path, options).await?.to_logical_plan() {
match self.read_avro(path, options).await?.to_logical_plan()? {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source)
}
Expand Down
6 changes: 3 additions & 3 deletions ballista/rust/scheduler/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ mod test {
)
.await?;

let plan = df.to_logical_plan();
let plan = df.to_logical_plan()?;
let plan = ctx.optimize(&plan)?;
let plan = ctx.create_physical_plan(&plan).await?;

Expand Down Expand Up @@ -420,7 +420,7 @@ order by
)
.await?;

let plan = df.to_logical_plan();
let plan = df.to_logical_plan()?;
let plan = ctx.optimize(&plan)?;
let plan = ctx.create_physical_plan(&plan).await?;

Expand Down Expand Up @@ -568,7 +568,7 @@ order by
)
.await?;

let plan = df.to_logical_plan();
let plan = df.to_logical_plan()?;
let plan = ctx.optimize(&plan)?;
let plan = ctx.create_physical_plan(&plan).await?;

Expand Down
11 changes: 6 additions & 5 deletions ballista/rust/scheduler/src/scheduler_server/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,14 +380,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
error!("{}", msg);
tonic::Status::internal(msg)
})?,
Query::Sql(sql) => {
let df = df_session.sql(&sql).await.map_err(|e| {
Query::Sql(sql) => df_session
.sql(&sql)
.await
.and_then(|df| df.to_logical_plan())
.map_err(|e| {
let msg = format!("Error parsing SQL: {}", e);
error!("{}", msg);
tonic::Status::internal(msg)
})?;
df.to_logical_plan()
}
})?,
};
debug!("Received plan for execution: {:?}", plan);

Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
}

// create the physical plan
let csv = csv.to_logical_plan();
let csv = csv.to_logical_plan()?;
let csv = ctx.optimize(&csv)?;
let csv = ctx.create_physical_plan(&csv).await?;

Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ name = "physical_plan"
harness = false
name = "parquet_query_sql"

[[bench]]
harness = false
name = "sql_planner"

[[bench]]
harness = false
name = "jit"
Expand Down
93 changes: 93 additions & 0 deletions datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#[macro_use]
extern crate criterion;
extern crate arrow;
extern crate datafusion;

mod data_utils;
use crate::criterion::Criterion;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::execution::context::SessionContext;
use parking_lot::Mutex;
use std::sync::Arc;
use tokio::runtime::Runtime;

/// Plan `sql` against the shared session context and block until the
/// resulting future resolves.
///
/// Panics if the runtime cannot be created or if planning fails.
///
/// Note: a new Tokio runtime is built on every call, so its construction
/// cost is part of whatever the caller measures.
fn plan(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
    let runtime = Runtime::new().unwrap();
    // The lock guard is a temporary that lives only for this statement.
    let planned = runtime.block_on(ctx.lock().sql(sql)).unwrap();
    // Keep the optimizer from discarding the planning work.
    criterion::black_box(planned);
}

/// Build the schema of a wide table: 200 `Int32` columns named
/// `<column_prefix>0` through `<column_prefix>199`, each declared nullable.
pub fn create_schema(column_prefix: &str) -> Schema {
    let mut fields = Vec::with_capacity(200);
    for idx in 0..200 {
        let column_name = format!("{}{}", column_prefix, idx);
        fields.push(Field::new(&column_name, DataType::Int32, true));
    }
    Schema::new(fields)
}

/// Create an in-memory table with no record batches whose schema comes from
/// [`create_schema`] for the given column prefix.
///
/// # Errors
///
/// Propagates any error returned by `MemTable::try_new`.
pub fn create_table_provider(column_prefix: &str) -> Result<Arc<MemTable>> {
    let wide_schema = create_schema(column_prefix);
    let table = MemTable::try_new(Arc::new(wide_schema), vec![])?;
    Ok(Arc::new(table))
}

/// Build a session context with two wide tables registered: `t1` (columns
/// prefixed `a`) and `t2` (columns prefixed `b`), wrapped for shared use.
///
/// # Errors
///
/// Returns the first error raised while creating or registering a table.
fn create_context() -> Result<Arc<Mutex<SessionContext>>> {
    let session = SessionContext::new();
    // (table name, column prefix), registered in this order.
    let tables = [("t1", "a"), ("t2", "b")];
    for &(table_name, column_prefix) in tables.iter() {
        session.register_table(table_name, create_table_provider(column_prefix)?)?;
    }
    Ok(Arc::new(Mutex::new(session)))
}

/// Register the SQL planning benchmarks with criterion.
///
/// Every benchmark plans one query against the same two-table context.
/// Panics if the context cannot be created.
fn criterion_benchmark(c: &mut Criterion) {
    let ctx = create_context().unwrap();

    // (benchmark name, query) pairs; they are registered in this order.
    let cases: [(&str, &str); 3] = [
        (
            "trivial join low numbered columns",
            "SELECT t1.a2, t2.b2 \
             FROM t1, t2 WHERE a1 = b1",
        ),
        (
            "trivial join high numbered columns",
            "SELECT t1.a99, t2.b99 \
             FROM t1, t2 WHERE a199 = b199",
        ),
        (
            "aggregate with join",
            "SELECT t1.a99, MIN(t2.b1), MAX(t2.b199), AVG(t2.b123), COUNT(t2.b73) \
             FROM t1 JOIN t2 ON t1.a199 = t2.b199 GROUP BY t1.a99",
        ),
    ];

    for &(bench_name, query) in cases.iter() {
        c.bench_function(bench_name, |b| b.iter(|| plan(ctx.clone(), query)));
    }
}

// Collect `criterion_benchmark` into a benchmark group named `benches`.
criterion_group!(benches, criterion_benchmark);
// Hand the `benches` group to criterion's entry-point macro.
criterion_main!(benches);
Loading

0 comments on commit 45bc3ee

Please sign in to comment.