Skip to content

Commit

Permalink
update tokio-stream to 0.1.3 and use BroadcastStream (sigp#2212)
Browse files Browse the repository at this point in the history
## Issue Addressed

Resolves sigp#2189 

## Proposed Changes

use tokio's `BroadcastStream`

## Additional Info

N/A


Co-authored-by: realbigsean <[email protected]>
  • Loading branch information
realbigsean and realbigsean committed Mar 1, 2021
1 parent baef1db commit ed9b245
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 87 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion beacon_node/http_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2018"
warp = { git = "https://github.com/paulhauner/warp ", branch = "cors-wildcard" }
serde = { version = "1.0.116", features = ["derive"] }
tokio = { version = "1.1.0", features = ["macros","sync"] }
tokio-stream = "0.1.2"
tokio-stream = { version = "0.1.3", features = ["sync"] }
tokio-util = "0.6.3"
parking_lot = "0.11.0"
types = { path = "../../consensus/types" }
Expand Down
66 changes: 0 additions & 66 deletions beacon_node/http_api/src/broadcast_stream.rs

This file was deleted.

36 changes: 17 additions & 19 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
mod beacon_proposer_cache;
mod block_id;
mod broadcast_stream;
mod metrics;
mod state_id;
mod validator_inclusion;
Expand Down Expand Up @@ -36,7 +35,7 @@ use std::future::Future;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::StreamExt;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{
Attestation, AttestationDuty, AttesterSlashing, CloneConfig, CommitteeCache, Epoch, EthSpec,
Hash256, ProposerSlashing, PublicKey, PublicKeyBytes, RelativeEpoch, SignedAggregateAndProof,
Expand Down Expand Up @@ -2405,23 +2404,22 @@ pub fn serve<T: BeaconChainTypes>(
}
};

receivers.push(broadcast_stream::BroadcastStream::new(receiver).map(
|msg| {
match msg {
Ok(data) => Event::default()
.event(data.topic_name())
.json_data(data)
.map_err(|e| {
warp_utils::reject::server_sent_event_error(
format!("{:?}", e),
)
}),
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("{:?}", e),
)),
}
},
));
receivers.push(BroadcastStream::new(receiver).map(|msg| {
match msg {
Ok(data) => Event::default()
.event(data.topic_name())
.json_data(data)
.map_err(|e| {
warp_utils::reject::server_sent_event_error(format!(
"{:?}",
e
))
}),
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("{:?}", e),
)),
}
}));
}
} else {
return Err(warp_utils::reject::custom_server_error(
Expand Down
2 changes: 1 addition & 1 deletion slasher/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ slot_clock = { path = "../../common/slot_clock" }
state_processing = { path = "../../consensus/state_processing" }
task_executor = { path = "../../common/task_executor" }
tokio = { version = "1.1.0", features = ["full"] }
tokio-stream = "0.1.2"
tokio-stream = "0.1.3"
types = { path = "../../consensus/types" }

0 comments on commit ed9b245

Please sign in to comment.