Fix logical plan optimization will execute twice in SQL mode #1183
Conversation
```diff
@@ -56,8 +56,12 @@ impl DataFrameImpl {
     /// Create a physical plan
     async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
         let state = self.ctx_state.lock().unwrap().clone();
+        let has_optimized = state.has_optimized;
```
I think it is ok to call DataFrame::create_physical_plan() twice on the same DataFrame -- with this implementation, won't the second call to create_physical_plan not be optimized?
I vaguely remember @Dandandan finding something similar in the past, but I can't find the reference.
I think redundant calls to logical plan optimization cost performance. With this implementation, after the first optimization in https://github.com/apache/arrow-datafusion/blob/ad059a688fd8da7b360423e0d911f2f1f33dbb9f/datafusion/src/execution/context.rs#L749, state.has_optimized will be true. So in create_physical_plan we can skip the redundant logical plan optimization and directly create a physical plan.
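To make the intended flow concrete, here is a small standalone sketch of the mechanism being described; the types and method bodies below are simplified stand-ins for illustration only, not DataFusion's actual code:

```rust
use std::sync::{Arc, Mutex};

// Simplified stand-ins, for illustration only -- not DataFusion's real types.
#[derive(Clone, Debug)]
struct LogicalPlan(String);

#[derive(Default, Clone)]
struct ExecutionContextState {
    has_optimized: bool,
}

struct ExecutionContext {
    state: Arc<Mutex<ExecutionContextState>>,
}

struct DataFrame {
    ctx_state: Arc<Mutex<ExecutionContextState>>,
    plan: LogicalPlan,
}

// Stand-in for the logical optimizer: returns a new, optimized plan.
fn optimize(plan: &LogicalPlan) -> LogicalPlan {
    LogicalPlan(format!("optimized({})", plan.0))
}

impl ExecutionContext {
    // Analogue of ctx.sql(): optimize the plan once and record that it happened.
    fn sql(&self, plan: LogicalPlan) -> DataFrame {
        let optimized = optimize(&plan);
        self.state.lock().unwrap().has_optimized = true;
        DataFrame {
            ctx_state: Arc::clone(&self.state),
            plan: optimized,
        }
    }
}

impl DataFrame {
    // Analogue of create_physical_plan(): skip the logical optimizer
    // when ctx.sql() has already run it.
    fn create_physical_plan(&self) -> LogicalPlan {
        let state = self.ctx_state.lock().unwrap().clone();
        if state.has_optimized {
            self.plan.clone()
        } else {
            optimize(&self.plan)
        }
    }
}

fn main() {
    let ctx = ExecutionContext { state: Arc::default() };
    let df = ctx.sql(LogicalPlan("scan -> filter -> project".into()));
    // The optimizer is not run again here because has_optimized is already true.
    println!("{:?}", df.create_physical_plan());
}
```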
@xudong963 I think what @alamb meant was that the return value from ctx.optimize(&self.plan)? was not saved into self.plan. So the next time create_physical_plan is called, ctx.create_physical_plan(&plan).await will be invoked with an unoptimized plan.
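A tiny standalone illustration of that point (hypothetical names, not DataFusion's code): the optimizer returns a new plan value, and unless that value is written back into the struct, the stored plan stays unoptimized.

```rust
#[derive(Clone, Debug, PartialEq)]
struct LogicalPlan(String);

struct Frame {
    // This field is never updated after optimization.
    plan: LogicalPlan,
}

// Stand-in for the logical optimizer: produces a new plan value.
fn optimize(plan: &LogicalPlan) -> LogicalPlan {
    LogicalPlan(format!("optimized({})", plan.0))
}

fn main() {
    let frame = Frame {
        plan: LogicalPlan("scan -> filter".into()),
    };
    // The optimized plan only lives in this local binding ...
    let optimized = optimize(&frame.plan);
    // ... so the plan stored in the struct is still the unoptimized one.
    assert_ne!(optimized, frame.plan);
    println!("stored plan is still: {:?}", frame.plan);
}
```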
@houqp I don't mean create_physical_plan will be called twice. I mean https://github.com/apache/arrow-datafusion/blob/ad059a688fd8da7b360423e0d911f2f1f33dbb9f/datafusion/src/execution/context.rs#L613 will be called twice.
For example:
```rust
#[tokio::main]
async fn main() -> Result<()> {
    // create local execution context
    let mut ctx = ExecutionContext::new();

    let testdata = datafusion::arrow::util::test_util::parquet_test_data();

    // register parquet file with the execution context
    ctx.register_parquet(
        "alltypes_plain",
        &format!("{}/alltypes_plain.parquet", testdata),
    )
    .await?;

    // execute the query
    let df = ctx
        .sql(
            "SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \
             FROM alltypes_plain \
             WHERE id > 1 AND tinyint_col < double_col",
        )
        .await?;

    // print the results
    df.show().await?;

    Ok(())
}
```
Both ctx.sql(..) and df.show() will call logical plan optimization.
Let me know where I misunderstand.
cc @alamb
```diff
@@ -176,6 +176,7 @@ impl ExecutionContext {
             config,
             execution_props: ExecutionProps::new(),
             object_store_registry: Arc::new(ObjectStoreRegistry::new()),
+            has_optimized: false,
```
This way - if we want to execute two queries in the same context, it seems only the first will be optimized?
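For concreteness, "two queries in the same context" would look roughly like the following, reusing the parquet registration from the example earlier in the thread (a sketch only; the queries are placeholders, and the comments describe the concern raised above about a context-wide has_optimized flag):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    let testdata = datafusion::arrow::util::test_util::parquet_test_data();
    ctx.register_parquet(
        "alltypes_plain",
        &format!("{}/alltypes_plain.parquet", testdata),
    )
    .await?;

    // First query: ctx.sql() optimizes this plan and, per the change being
    // discussed, would set the context-wide has_optimized flag.
    let df1 = ctx.sql("SELECT int_col FROM alltypes_plain WHERE id > 1").await?;
    df1.show().await?;

    // Second query on the *same* context: has_optimized would already be true
    // here, which is the situation the comment above is worried about.
    let df2 = ctx.sql("SELECT double_col FROM alltypes_plain WHERE id > 3").await?;
    df2.show().await?;

    Ok(())
}
```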
If we want to do this, I think it should be something on the LogicalPlan instead.
On the other hand, I don't think avoiding the double optimization across different query executions (e.g. collect, show) is very beneficial. The slower part of optimization is collecting statistics and using them for cost-based optimizations and pruning, which is not something we do in the logical optimizations.
> This way - if we want to execute two queries in the same context, it seems only the first will be optimized?

How do we execute two queries in the same context? Could you please give an example? Thanks very much @Dandandan
> If we want to do this, I think it should be something on the LogicalPlan instead.

I agree. ExecutionContext, as a global context, isn't suitable for this.
What could be done to avoid optimizing a logical plan twice is adding an is_optimized flag or something similar to LogicalPlan instead. After optimizing, we can set is_optimized to true on the logical plan of the dataframe.
It might also be good to have some numbers about how much time a typical full optimization pass costs (and/or to track some statistics) - I would expect in most cases it will be quite a bit less than, say, 1 ms.
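One rough way to get such numbers (a hypothetical sketch, not code from this PR; the CSV path and query are placeholders) is to time ctx.optimize() on the logical plan of a DataFrame:

```rust
use std::time::Instant;

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    // Placeholder table; any registered CSV/parquet source would do.
    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await?;

    let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a").await?;
    let plan = df.to_logical_plan();

    // Time only the logical optimization pass. Note that, per the discussion
    // above, ctx.sql() may already have optimized this plan once, so this
    // measures an additional pass over an already-optimized plan.
    let start = Instant::now();
    let optimized = ctx.optimize(&plan)?;
    println!("logical optimization took {:?}", start.elapsed());
    println!("{:?}", optimized);

    Ok(())
}
```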
> It might also be good to have some numbers about how much time a typical full optimization pass costs (and/or to track some statistics) - I would expect in most cases it will be quite a bit less than, say, 1 ms.

Maybe we can do this in the next PR.
An argument against avoiding optimizing it twice across different query executions is this example:

```rust
let x = df.collect();
// change execution context, e.g. change enabled optimizations, parallelism, etc.
ctx.state...
// evaluate the dataframe with collect or a similar method
let y = df.collect();
```

Now, running the optimizer "again" for the new execution of the DataFrame will optimize it based on the provided configuration.
Thanks! @Dandandan. Happy weekend, happy coding! 🎉
A summary of what the PR will do.
About 1, I want to know whether we can directly add an is_optimized field to each variant, i.e. something like:

```rust
pub enum LogicalPlan {
    Projection {
        ...
        is_optimized: bool,
    },
    Filter {
        ...
        is_optimized: bool,
    },
    ...
}
```

@Dandandan @alamb @houqp Please help me check my thought, thanks!
I think it might be ok to optimize the plan twice (in other words, perhaps we can close the ticket as "working as expected"?). Do we have any examples of the double optimization causing problems (or taking overly long)? I think @Dandandan was also hinting at this point in his comment at #1183 (comment). Adding a lot of additional code to LogicalPlan (such as an is_optimized flag on every variant) doesn't seem worth it to me.
OK, no code is the best code 😄
Yes, I was hinting at that. If we are able to show otherwise (examples where logical plan optimization takes very long), then we can see if we can optimize for this case. Otherwise I agree "working as expected" should be the conclusion.
Thanks again for looking into this @xudong963.
Which issue does this PR close?
Closes #1182
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?