Skip to content

Commit

Permalink
feat(node/p2p): Relax internal Send bounds (#260)
Browse files Browse the repository at this point in the history
  • Loading branch information
oblique authored Apr 3, 2024
1 parent e917b6b commit 74d75cc
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 29 deletions.
65 changes: 37 additions & 28 deletions node/src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,12 @@
use std::future::Future;
use std::pin::Pin;

use libp2p::swarm;
use tokio::select;
use tokio_util::sync::CancellationToken;

#[allow(unused_imports)]
pub(crate) use self::imp::{sleep, spawn, timeout, yield_now, Elapsed, Interval};

pub(crate) struct Executor;

impl swarm::Executor for Executor {
fn exec(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
spawn(future)
}
}

/// Spawn a cancellable task.
///
/// This will cancel the task in the highest layer and should not be used
/// if cancellation must happen in a point.
pub(crate) fn spawn_cancellable<F>(cancelation_token: CancellationToken, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
spawn(async move {
select! {
_ = cancelation_token.cancelled() => {}
_ = future => {}
}
});
}
pub(crate) use self::imp::{
sleep, spawn, spawn_cancellable, timeout, yield_now, Elapsed, Interval,
};

#[cfg(not(target_arch = "wasm32"))]
mod imp {
Expand All @@ -47,6 +23,22 @@ mod imp {
tokio::spawn(future);
}

/// Spawn a cancellable task.
///
/// This will cancel the task in the highest layer and should not be used
/// if cancellation must happen in a point.
pub(crate) fn spawn_cancellable<F>(cancelation_token: CancellationToken, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
tokio::spawn(async move {
select! {
_ = cancelation_token.cancelled() => {}
_ = future => {}
}
});
}

pub(crate) struct Interval(tokio::time::Interval);

impl Interval {
Expand Down Expand Up @@ -83,11 +75,27 @@ mod imp {

pub(crate) fn spawn<F>(future: F)
where
F: Future<Output = ()> + Send + 'static,
F: Future<Output = ()> + 'static,
{
wasm_bindgen_futures::spawn_local(future);
}

/// Spawn a cancellable task.
///
/// This will cancel the task in the highest layer and should not be used
/// if cancellation must happen in a point.
pub(crate) fn spawn_cancellable<F>(cancelation_token: CancellationToken, future: F)
where
F: Future<Output = ()> + 'static,
{
wasm_bindgen_futures::spawn_local(async move {
select! {
_ = cancelation_token.cancelled() => {}
_ = future => {}
}
});
}

pub(crate) struct Interval(SendWrapper<IntervalStream>);

impl Interval {
Expand Down Expand Up @@ -135,6 +143,7 @@ mod imp {
#[pin]
delay: SendWrapper<TimeoutFuture>,
}

impl<T> Future for Timeout<T>
where
T: Future,
Expand Down
2 changes: 1 addition & 1 deletion node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1087,7 +1087,7 @@ fn init_bitswap<B, S>(
network_id: &str,
) -> Result<beetswap::Behaviour<MAX_MH_SIZE, B>>
where
B: Blockstore,
B: Blockstore + 'static,
S: Store + 'static,
{
let protocol_prefix = format!("/celestia/{}", network_id);
Expand Down

0 comments on commit 74d75cc

Please sign in to comment.