Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Ballista executing during plan #2428

Merged
merged 2 commits into from
May 4, 2022

Conversation

tustvold
Copy link
Contributor

@tustvold tustvold commented May 3, 2022

Which issue does this PR close?

Part of #2307

Rationale for this change

ExecutionPlan::execute is expected to defer computation to the returned SendableRecordBatchStream, this is necessary for result streaming to work correctly. Currently the Ballista ExecutionPlan instead evaluate in ExecutionPlan::execute, which is not correct.

What changes are included in this PR?

Defers plan evaluation to the SendableRecordBatchStream

Are there any user-facing changes?

Yes, I replaced ballista::core::WrappedStream with the RecordBatchStreamAdapter from DataFusion

@github-actions github-actions bot added ballista datafusion Changes in the datafusion crate labels May 3, 2022
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

let schema: Schema = self.plan.schema().as_ref().clone().into();

let mut buf: Vec<u8> = vec![];
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend reviewing this without whitespace

https://github.com/apache/arrow-datafusion/pull/2428/files?w=1

break Err(DataFusionError::Execution(msg));
}
job_status::Status::Completed(completed) => {
let streams = completed.partition_location.into_iter().map(|p| {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change, previously it would buffer up all records from all partitions and then yield them in partition order, now it will stream them potentially interleaving results from different partitions. I'm fairly certain this is fine, but want to draw attention to this

@@ -316,38 +313,3 @@ impl<T: 'static + AsLogicalPlan> QueryPlanner for BallistaQueryPlanner<T> {
}
}
}

pub struct WrappedStream {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been replaced by RecordBatchStreamAdapter which also does not require boxing or pinning the stream, making it not only easier to use, but potentially slightly faster

@andygrove
Copy link
Member

I skimmed through this and it LGTM but I have not been active in Ballista lately so would be good if perhaps @mingmwang @yahoNanJing or @yjshen could also take a look

@tustvold
Copy link
Contributor Author

tustvold commented May 3, 2022

I've backed out the "fix" for apache/datafusion-ballista#483 as it was causing test failures, and isn't on the critical path to making ExecutionPlan sync which is what I'm focused on at the moment. I also lack sufficient knowledge of ballista to effectively diagnose this, so will leave it for someone with a better grasp on what is going on

@andygrove
Copy link
Member

@tustvold I am working on fixing the Ballista integration tests and then will test with this PR and approve assuming no issues come up.

Copy link
Member

@andygrove andygrove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes LGTM and I ran the Ballista integration tests and they pass 🎉

@andygrove andygrove merged commit b7bb2cf into apache:master May 4, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
datafusion Changes in the datafusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants