Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(iroh): Remove flume from iroh gossip #2542

Merged
merged 2 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions iroh-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ tokio-util = { version = "0.7.8", optional = true, features = ["codec"] }
genawaiter = { version = "0.99.1", default-features = false, features = ["futures03"] }

# dispatcher dependencies (optional)
async-channel = { version = "2.3.1", optional = true }
futures-util = { version = "0.3.30", optional = true }
flume = { version = "0.11", optional = true }

[dev-dependencies]
clap = { version = "4", features = ["derive"] }
Expand All @@ -51,7 +51,7 @@ url = "2.4.0"
[features]
default = ["net", "dispatcher"]
net = ["dep:futures-lite", "dep:iroh-net", "dep:tokio", "dep:tokio-util"]
dispatcher = ["dep:flume", "dep:futures-util"]
dispatcher = ["dep:async-channel", "dep:futures-util"]

[[example]]
name = "chat"
Expand Down
15 changes: 7 additions & 8 deletions iroh-gossip/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
proto::{DeliveryScope, TopicId},
};
use bytes::Bytes;
use futures_lite::StreamExt;
use futures_util::Stream;
use iroh_base::rpc::{RpcError, RpcResult};
use iroh_net::{key::PublicKey, util::AbortingJoinHandle, NodeId};
Expand Down Expand Up @@ -106,7 +107,7 @@ struct State {
/// Type alias for a stream of gossip updates, so we don't have to repeat all the bounds.
type CommandStream = Box<dyn Stream<Item = Command> + Send + Sync + Unpin + 'static>;
/// Type alias for a sink of gossip events.
type EventSink = flume::Sender<RpcResult<Event>>;
type EventSink = async_channel::Sender<RpcResult<Event>>;

#[derive(derive_more::Debug)]
enum TopicState {
Expand Down Expand Up @@ -214,7 +215,7 @@ impl GossipDispatcher {
/// This will not wait until the sink is full, but send a `Lagged` response if the sink is almost full.
fn try_send(send: &EventSink, event: &IrohGossipEvent) -> bool {
// If the stream is disconnected, we don't need to send to it.
if send.is_disconnected() {
if send.is_closed() {
return false;
}
// Check if the send buffer is almost full, and send a lagged response if it is.
Expand All @@ -234,7 +235,6 @@ impl GossipDispatcher {
///
/// This should not fail unless the gossip instance is faulty.
async fn dispatch_loop(mut self) -> anyhow::Result<()> {
use futures_lite::stream::StreamExt;
let stream = self.gossip.clone().subscribe_all();
tokio::pin!(stream);
while let Some(item) = stream.next().await {
Expand Down Expand Up @@ -306,7 +306,6 @@ impl GossipDispatcher {
topic: TopicId,
mut updates: CommandStream,
) -> anyhow::Result<()> {
use futures_lite::stream::StreamExt;
while let Some(update) = Pin::new(&mut updates).next().await {
match update {
Command::Broadcast(msg) => {
Expand Down Expand Up @@ -404,7 +403,7 @@ impl GossipDispatcher {
let mut update_tasks = vec![];
for (updates, event_sink) in waiting {
// if the stream is disconnected, we don't need to keep it and start the update task
if event_sink.is_disconnected() {
if event_sink.is_closed() {
continue;
}
event_sinks.push(event_sink);
Expand Down Expand Up @@ -438,9 +437,9 @@ impl GossipDispatcher {
topic: TopicId,
options: SubscribeOptions,
updates: CommandStream,
) -> impl Stream<Item = RpcResult<Event>> {
) -> impl Stream<Item = RpcResult<Event>> + Unpin {
let mut inner = self.inner.lock().unwrap();
let (send, recv) = flume::bounded(options.subscription_capacity);
let (send, recv) = async_channel::bounded(options.subscription_capacity);
match inner.current_subscriptions.entry(topic) {
Entry::Vacant(entry) => {
// There is no existing subscription, so we need to start a new one.
Expand Down Expand Up @@ -490,7 +489,7 @@ impl GossipDispatcher {
}
}
}
recv.into_stream()
recv.boxed()
}
}

Expand Down
Loading