Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix logical plan optimization will execute twice in SQL mode #1183

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
object_store_registry: Arc::new(ObjectStoreRegistry::new()),
has_optimized: true,
};

let fun_expr = functions::create_physical_fun(
Expand Down
5 changes: 5 additions & 0 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ impl ExecutionContext {
config,
execution_props: ExecutionProps::new(),
object_store_registry: Arc::new(ObjectStoreRegistry::new()),
has_optimized: false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This way - if we want to execute two queries in the same context, it seems only the first will be optimized?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to do this, I think it should be something on the LogicalPlan instead.

On the other hand, I don't think double optimize on different query executions (e.g. collect, show) is something very beneficial.
The slower part of optimization is collecting statistics and using it for cost based optimizations and pruning, which is not something we do in the logical optimizations.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This way - if we want to execute two queries in the same context, it seems only the first will be optimized?

How to execute two queries in the same context, could you please give an example, thanks very much @Dandandan

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to do this, I think it should be something on the LogicalPlan instead.

I agree. ExecutionContext as a global context isn't suitable to do this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What could be done to avoid optimizing a logical plan twice is adding a is_optimized or something similar to LogicalPlan instead. After optimizing we can set is_optimized to true on the logical plan of the dataframe.

Might also be good to have some numbers about how much time a typical full optimization pass costs (and / or to track some statistics) - I would expect in most cases it will be quite a bit less than say 1 ms.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might also be good to have some numbers about how much time a typical full optimization pass costs (and / or to track some statistics) - I would expect in most cases it will be quite a bit less than say 1 ms.

Maybe we can do this in the next PR.

})),
}
}
Expand Down Expand Up @@ -767,6 +768,7 @@ impl ExecutionContext {
observer(&new_plan, optimizer.as_ref());
}
debug!("Optimized logical plan:\n {:?}", new_plan);
state.has_optimized = true;
Ok(new_plan)
}
}
Expand Down Expand Up @@ -1037,6 +1039,8 @@ pub struct ExecutionContextState {
pub execution_props: ExecutionProps,
/// Object Store that are registered with the context
pub object_store_registry: Arc<ObjectStoreRegistry>,
/// If Logical Plan has optimized, it will be true
pub has_optimized: bool,
}

impl ExecutionProps {
Expand Down Expand Up @@ -1065,6 +1069,7 @@ impl ExecutionContextState {
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
object_store_registry: Arc::new(ObjectStoreRegistry::new()),
has_optimized: false,
}
}

Expand Down
6 changes: 5 additions & 1 deletion datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,12 @@ impl DataFrameImpl {
/// Create a physical plan
async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
let state = self.ctx_state.lock().unwrap().clone();
let has_optimized = state.has_optimized;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is ok to call DataFrame::create_physical_plan() twice on the same DataFrame -- with this implementation, won't the second call to create_physical_plan not be optimized?

I vaguely remember @Dandandan finding something similar in the past, but I can't find the reference

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think redundant calls to logical plan optimization will cost performance.

With this implementation, after the first optimization in https://github.com/apache/arrow-datafusion/blob/ad059a688fd8da7b360423e0d911f2f1f33dbb9f/datafusion/src/execution/context.rs#L749, the state.has_optimized will be true. So in create_physical_plan, we can avoid the redundant optimization for logical plan and directly create a physical plan.

Copy link
Member

@houqp houqp Oct 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb perhaps you were thinking about this ticket? #705

Copy link
Member

@houqp houqp Oct 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xudong963 I think what @alamb meant was the return value from ctx.optimize(&self.plan)? was not saved into self.plan. So the next time create_physical_plan is called, ctx.create_physical_plan(&plan).await will be invoked with an unoptimized plan.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@houqp I don't mean create_physical_plan will be called twice. I mean https://github.com/apache/arrow-datafusion/blob/ad059a688fd8da7b360423e0d911f2f1f33dbb9f/datafusion/src/execution/context.rs#L613 will be called twice.

Fox example

#[tokio::main]
async fn main() -> Result<()> {
    // create local execution context
    let mut ctx = ExecutionContext::new();

    let testdata = datafusion::arrow::util::test_util::parquet_test_data();

    // register parquet file with the execution context
    ctx.register_parquet(
        "alltypes_plain",
        &format!("{}/alltypes_plain.parquet", testdata),
    )
    .await?;

    // execute the query
    let df = ctx
        .sql(
            "SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \
        FROM alltypes_plain \
        WHERE id > 1 AND tinyint_col < double_col",
        )
        .await?;

    // print the results
    df.show().await?;

    Ok(())
}

ctx.sql(..) and df.show() both will call logical plan optimization.
Let me know where I misunderstand.
cc @alamb

let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
let plan = ctx.optimize(&self.plan)?;
let mut plan: LogicalPlan = self.plan.clone();
if !has_optimized {
plan = ctx.optimize(&self.plan)?;
}
ctx.create_physical_plan(&plan).await
}
}
Expand Down