Skip to content

Commit

Permalink
chore: Hide underlying channel in the Dispatcher (#1585)
Browse files Browse the repository at this point in the history
Hide the underlying dependency so we can have a stable API while making
changes to the underlying building blocks and implementation
  • Loading branch information
AdamGS authored Dec 6, 2024
1 parent 5b3b5d7 commit 2958870
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 12 deletions.
6 changes: 3 additions & 3 deletions vortex-io/src/dispatcher/compio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use compio::runtime::{JoinHandle as CompioJoinHandle, Runtime, RuntimeBuilder};
use futures::channel::oneshot;
use vortex_error::{vortex_bail, vortex_panic, VortexResult};

use super::Dispatch;
use super::{Dispatch, JoinHandle as VortexJoinHandle};

trait CompioSpawn {
fn spawn(self: Box<Self>) -> CompioJoinHandle<()>;
Expand Down Expand Up @@ -71,7 +71,7 @@ impl CompioDispatcher {
}

impl Dispatch for CompioDispatcher {
fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<oneshot::Receiver<R>>
fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<VortexJoinHandle<R>>
where
F: (FnOnce() -> Fut) + Send + 'static,
Fut: Future<Output = R> + 'static,
Expand All @@ -80,7 +80,7 @@ impl Dispatch for CompioDispatcher {
let (tx, rx) = oneshot::channel();
let compio_task = Box::new(CompioTask { task, result: tx });
match self.submitter.send(compio_task) {
Ok(()) => Ok(rx),
Ok(()) => Ok(VortexJoinHandle(rx)),
Err(err) => vortex_bail!("Dispatcher error spawning task: {err}"),
}
}
Expand Down
25 changes: 22 additions & 3 deletions vortex-io/src/dispatcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ mod tokio;
mod wasm;

use std::future::Future;
use std::task::Poll;

use futures::channel::oneshot;
use futures::FutureExt;
#[cfg(not(any(feature = "compio", feature = "tokio")))]
use vortex_error::vortex_panic;
use vortex_error::VortexResult;
use vortex_error::{vortex_err, VortexResult};

#[cfg(feature = "compio")]
use self::compio::*;
Expand Down Expand Up @@ -43,7 +45,7 @@ pub trait Dispatch: sealed::Sealed {
///
/// The returned `Future` will be executed to completion on a single thread,
/// thus it may be `!Send`.
fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<oneshot::Receiver<R>>
fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<JoinHandle<R>>
where
F: (FnOnce() -> Fut) + Send + 'static,
Fut: Future<Output = R> + 'static,
Expand All @@ -68,6 +70,23 @@ pub trait Dispatch: sealed::Sealed {
#[derive(Debug)]
pub struct IoDispatcher(Inner);

pub struct JoinHandle<R>(oneshot::Receiver<R>);

impl<R> Future for JoinHandle<R> {
type Output = VortexResult<R>;

fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Self::Output> {
match self.0.poll_unpin(cx) {
Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
Poll::Ready(Err(_)) => Poll::Ready(Err(vortex_err!("Task was canceled"))),
Poll::Pending => Poll::Pending,
}
}
}

#[derive(Debug)]
enum Inner {
#[cfg(feature = "tokio")]
Expand Down Expand Up @@ -97,7 +116,7 @@ impl Default for IoDispatcher {

impl Dispatch for IoDispatcher {
#[allow(unused_variables)] // If no features are enabled `task` ends up being unused
fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<oneshot::Receiver<R>>
fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<JoinHandle<R>>
where
F: (FnOnce() -> Fut) + Send + 'static,
Fut: Future<Output = R> + 'static,
Expand Down
6 changes: 3 additions & 3 deletions vortex-io/src/dispatcher/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures::channel::oneshot;
use tokio::task::{JoinHandle as TokioJoinHandle, LocalSet};
use vortex_error::{vortex_bail, vortex_panic, VortexResult};

use super::Dispatch;
use super::{Dispatch, JoinHandle as VortexJoinHandle};

trait TokioSpawn {
fn spawn(self: Box<Self>) -> TokioJoinHandle<()>;
Expand Down Expand Up @@ -84,7 +84,7 @@ where
}

impl Dispatch for TokioDispatcher {
fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<oneshot::Receiver<R>>
fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<VortexJoinHandle<R>>
where
F: (FnOnce() -> Fut) + Send + 'static,
Fut: Future<Output = R> + 'static,
Expand All @@ -95,7 +95,7 @@ impl Dispatch for TokioDispatcher {
let task = TokioTask { result: tx, task };

match self.submitter.send(Box::new(task)) {
Ok(()) => Ok(rx),
Ok(()) => Ok(VortexJoinHandle(rx)),
Err(err) => vortex_bail!("Dispatcher error spawning task: {err}"),
}
}
Expand Down
6 changes: 3 additions & 3 deletions vortex-io/src/dispatcher/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures::channel::oneshot::Receiver;
use vortex_error::{vortex_panic, VortexResult};
use wasm_bindgen_futures::wasm_bindgen::__rt::Start;

use crate::Dispatch;
use crate::{Dispatch, JoinHandle as VortexJoinHandle};

/// `Dispatch`able type that is available when running Vortex in the browser or other WASM env.
#[derive(Debug, Clone)]
Expand All @@ -20,7 +20,7 @@ impl WasmDispatcher {
}

impl Dispatch for WasmDispatcher {
fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<Receiver<R>>
fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<VortexJoinHandle<R>>
where
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = R> + 'static,
Expand All @@ -35,7 +35,7 @@ impl Dispatch for WasmDispatcher {
})
.start();

Ok(rx)
Ok(VortexJoinHandle(rx))
}

fn shutdown(self) -> VortexResult<()> {
Expand Down

0 comments on commit 2958870

Please sign in to comment.