Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes(executor): support abort for pipeline executor stream #5803

Merged
merged 3 commits into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions query/src/interpreters/interpreter_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,11 @@ impl CopyInterpreter {

let async_runtime = ctx.get_storage_runtime();
let executor = PipelinePullingExecutor::try_create(async_runtime, pipeline)?;
let source_stream = Box::pin(ProcessorExecutorStream::create(executor)?);
let (handler, stream) = ProcessorExecutorStream::create(executor)?;
self.ctx.add_source_abort_handle(handler);

let operations = table
.append_data(ctx.clone(), source_stream)
.append_data(ctx.clone(), Box::pin(stream))
.await?
.try_collect()
.await?;
Expand Down
5 changes: 3 additions & 2 deletions query/src/interpreters/interpreter_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ impl Interpreter for SelectInterpreter {
let async_runtime = self.ctx.get_storage_runtime();
let new_pipeline = self.create_new_pipeline()?;
let executor = PipelinePullingExecutor::try_create(async_runtime, new_pipeline)?;
let executor_stream = Box::pin(ProcessorExecutorStream::create(executor)?);
return Ok(Box::pin(self.ctx.try_create_abortable(executor_stream)?));
let (handler, stream) = ProcessorExecutorStream::create(executor)?;
self.ctx.add_source_abort_handle(handler);
return Ok(Box::pin(stream));
}
let optimized_plan = self.rewrite_plan()?;
plan_schedulers::schedule_query(&self.ctx, &optimized_plan).await
Expand Down
5 changes: 3 additions & 2 deletions query/src/interpreters/interpreter_select_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ impl Interpreter for SelectInterpreterV2 {

// Spawn root pipeline
let executor = PipelinePullingExecutor::try_create(async_runtime, root_pipeline)?;
let executor_stream = Box::pin(ProcessorExecutorStream::create(executor)?);
Ok(Box::pin(self.ctx.try_create_abortable(executor_stream)?))
let (handler, stream) = ProcessorExecutorStream::create(executor)?;
self.ctx.add_source_abort_handle(handler);
Ok(Box::pin(Box::pin(stream)))
}

async fn start(&self) -> Result<()> {
Expand Down
28 changes: 24 additions & 4 deletions query/src/interpreters/stream/processor_executor_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,55 @@
// limitations under the License.

use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use common_datablocks::DataBlock;
use common_exception::ErrorCode;
use common_exception::Result;
use futures::stream::AbortHandle;
use futures::stream::Abortable;
use futures::Stream;

use crate::pipelines::new::executor::PipelinePullingExecutor;

pub struct ProcessorExecutorStream {
is_aborted: Arc<Abortable<()>>,
executor: PipelinePullingExecutor,
}

impl ProcessorExecutorStream {
pub fn create(mut executor: PipelinePullingExecutor) -> Result<Self> {
pub fn create(mut executor: PipelinePullingExecutor) -> Result<(AbortHandle, Self)> {
let (handle, reg) = AbortHandle::new_pair();
let is_aborted = Arc::new(Abortable::new((), reg));

executor.start();
Ok(Self { executor })
Ok((handle, Self {
is_aborted,
executor,
}))
}
}

impl Stream for ProcessorExecutorStream {
type Item = Result<DataBlock>;

fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let is_aborted = self.is_aborted.clone();
let self_ = Pin::get_mut(self);
match self_.executor.pull_data() {
Ok(None) => Poll::Ready(None),
match self_
.executor
.try_pull_data(move || is_aborted.is_aborted())
{
Err(cause) => Poll::Ready(Some(Err(cause))),
Ok(Some(data)) => Poll::Ready(Some(Ok(data))),
Ok(None) => match self_.is_aborted.is_aborted() {
false => Poll::Ready(None),
true => Poll::Ready(Some(Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed",
)))),
},
}
}
}
4 changes: 4 additions & 0 deletions query/src/pipelines/new/executor/pipeline_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ impl PipelineExecutor {
Ok(())
}

pub fn is_finished(&self) -> bool {
self.global_tasks_queue.is_finished()
}

pub fn execute(self: &Arc<Self>) -> Result<()> {
let mut thread_join_handles = self.execute_threads(self.threads_num);

Expand Down
19 changes: 19 additions & 0 deletions query/src/pipelines/new/executor/pipeline_pulling_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
// limitations under the License.

use std::sync::mpsc::Receiver;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::SyncSender;
use std::sync::Arc;
use std::time::Duration;

use common_base::base::Runtime;
use common_datablocks::DataBlock;
Expand Down Expand Up @@ -117,6 +119,23 @@ impl PipelinePullingExecutor {
Err(_recv_err) => Err(ErrorCode::LogicalError("Logical error, receiver error.")),
}
}

pub fn try_pull_data<F>(&mut self, f: F) -> Result<Option<DataBlock>>
where F: Fn() -> bool {
while !f() && !self.executor.is_finished() {
return match self.receiver.recv_timeout(Duration::from_millis(100)) {
Ok(data_block) => data_block,
Err(RecvTimeoutError::Timeout) => {
continue;
}
Err(RecvTimeoutError::Disconnected) => {
Err(ErrorCode::LogicalError("Logical error, receiver error."))
}
};
}

Ok(None)
}
}

impl Drop for PipelinePullingExecutor {
Expand Down