Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Remove NetworkStatusSinks #8748

Merged
3 commits merged into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>

config.network.extra_sets.push(sc_finality_grandpa::grandpa_peers_set_config());

let (network, network_status_sinks, system_rpc_tx, network_starter) =
let (network, system_rpc_tx, network_starter) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
client: client.clone(),
Expand Down Expand Up @@ -199,7 +199,6 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
on_demand: None,
remote_blockchain: None,
backend,
network_status_sinks,
system_rpc_tx,
config,
telemetry: telemetry.as_mut(),
Expand Down Expand Up @@ -370,7 +369,7 @@ pub fn new_light(mut config: Configuration) -> Result<TaskManager, ServiceError>
},
)?;

let (network, network_status_sinks, system_rpc_tx, network_starter) =
let (network, system_rpc_tx, network_starter) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
client: client.clone(),
Expand Down Expand Up @@ -418,7 +417,6 @@ pub fn new_light(mut config: Configuration) -> Result<TaskManager, ServiceError>
keystore: keystore_container.sync_keystore(),
backend,
network,
network_status_sinks,
system_rpc_tx,
telemetry: telemetry.as_mut(),
})?;
Expand Down
9 changes: 3 additions & 6 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ pub struct NewFullBase {
pub task_manager: TaskManager,
pub client: Arc<FullClient>,
pub network: Arc<NetworkService<Block, <Block as BlockT>::Hash>>,
pub network_status_sinks: sc_service::NetworkStatusSinks<Block>,
pub transaction_pool: Arc<sc_transaction_pool::FullPool<Block, FullClient>>,
}

Expand Down Expand Up @@ -242,7 +241,7 @@ pub fn new_full_base(
)
);

let (network, network_status_sinks, system_rpc_tx, network_starter) =
let (network, system_rpc_tx, network_starter) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
client: client.clone(),
Expand Down Expand Up @@ -279,7 +278,6 @@ pub fn new_full_base(
task_manager: &mut task_manager,
on_demand: None,
remote_blockchain: None,
network_status_sinks: network_status_sinks.clone(),
system_rpc_tx,
telemetry: telemetry.as_mut(),
},
Expand Down Expand Up @@ -415,7 +413,6 @@ pub fn new_full_base(
task_manager,
client,
network,
network_status_sinks,
transaction_pool,
})
}
Expand Down Expand Up @@ -519,7 +516,7 @@ pub fn new_light_base(
telemetry.as_ref().map(|x| x.handle()),
)?;

let (network, network_status_sinks, system_rpc_tx, network_starter) =
let (network, system_rpc_tx, network_starter) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
client: client.clone(),
Expand Down Expand Up @@ -576,7 +573,7 @@ pub fn new_light_base(
client: client.clone(),
transaction_pool: transaction_pool.clone(),
keystore: keystore_container.sync_keystore(),
config, backend, network_status_sinks, system_rpc_tx,
config, backend, system_rpc_tx,
network: network.clone(),
task_manager: &mut task_manager,
telemetry: telemetry.as_mut(),
Expand Down
2 changes: 1 addition & 1 deletion client/informant/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
ansi_term = "0.12.1"
futures = "0.3.9"
futures-timer = "3.0.1"
log = "0.4.8"
parity-util-mem = { version = "0.9.0", default-features = false, features = ["primitive-types"] }
sc-client-api = { version = "3.0.0", path = "../api" }
sc-network = { version = "0.9.0", path = "../network" }
sp-blockchain = { version = "3.0.0", path = "../../primitives/blockchain" }
sp-runtime = { version = "3.0.0", path = "../../primitives/runtime" }
sp-utils = { version = "3.0.0", path = "../../primitives/utils" }
sp-transaction-pool = { version = "3.0.0", path = "../../primitives/transaction-pool" }
wasm-timer = "0.2"
31 changes: 19 additions & 12 deletions client/informant/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,23 @@

use ansi_term::Colour;
use futures::prelude::*;
use futures_timer::Delay;
use log::{info, trace, warn};
use parity_util_mem::MallocSizeOf;
use sc_client_api::{BlockchainEvents, UsageProvider};
use sc_network::NetworkStatus;
use sc_network::NetworkService;
use sp_blockchain::HeaderMetadata;
use sp_runtime::traits::{Block as BlockT, Header};
use sp_transaction_pool::TransactionPool;
use sp_utils::{status_sinks, mpsc::tracing_unbounded};
use std::{fmt::Display, sync::Arc, time::Duration, collections::VecDeque};

mod display;

/// Creates a stream that returns a new value every `duration`.
fn interval(duration: Duration) -> impl Stream<Item = ()> + Unpin {
futures::stream::unfold((), move |_| Delay::new(duration).map(|_| Some(((), ())))).map(drop)
}

/// The format to print telemetry output in.
#[derive(Clone, Debug)]
pub struct OutputFormat {
Expand Down Expand Up @@ -64,23 +69,25 @@ impl<T: TransactionPool> TransactionPoolAndMaybeMallogSizeOf for T {}
impl<T: TransactionPool + MallocSizeOf> TransactionPoolAndMaybeMallogSizeOf for T {}

/// Builds the informant and returns a `Future` that drives the informant.
pub fn build<B: BlockT, C>(
pub async fn build<B: BlockT, C>(
client: Arc<C>,
network_status_sinks: Arc<status_sinks::StatusSinks<NetworkStatus<B>>>,
network: Arc<NetworkService<B, <B as BlockT>::Hash>>,
pool: Arc<impl TransactionPoolAndMaybeMallogSizeOf>,
format: OutputFormat,
) -> impl futures::Future<Output = ()>
)
where
C: UsageProvider<B> + HeaderMetadata<B> + BlockchainEvents<B>,
<C as HeaderMetadata<B>>::Error: Display,
{
let mut display = display::InformantDisplay::new(format.clone());

let client_1 = client.clone();
let (network_status_sink, network_status_stream) = tracing_unbounded("mpsc_network_status");
network_status_sinks.push(Duration::from_millis(5000), network_status_sink);

let display_notifications = network_status_stream
let display_notifications = interval(Duration::from_millis(5000))
.filter_map(|_| async {
let status = network.status().await;
status.ok()
})
.for_each(move |net_status| {
let info = client_1.usage_info();
if let Some(ref usage) = info.usage {
Expand All @@ -101,10 +108,10 @@ where
future::ready(())
});

future::join(
display_notifications,
display_block_import(client),
).map(|_| ())
futures::select! {
() = display_notifications.fuse() => (),
() = display_block_import(client).fuse() => (),
};
}

fn display_block_import<B: BlockT, C>(client: Arc<C>) -> impl Future<Output = ()>
Expand Down
49 changes: 49 additions & 0 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,43 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
});
}

/// High-level network status information.
///
/// Returns an error if the `NetworkWorker` is no longer running.
pub async fn status(&self) -> Result<NetworkStatus<B>, ()> {
let (tx, rx) = oneshot::channel();

let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::NetworkStatus {
pending_response: tx,
});

match rx.await {
Ok(v) => v.map_err(|_| ()),
// The channel can only be closed if the network worker no longer exists.
Err(_) => Err(()),
}
}

/// Get network state.
///
/// **Note**: Use this only for debugging. This API is unstable. There are warnings literally
/// everywhere about this. Please don't use this function to retrieve actual information.
///
/// Returns an error if the `NetworkWorker` is no longer running.
pub async fn network_state(&self) -> Result<NetworkState, ()> {
let (tx, rx) = oneshot::channel();

let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::NetworkState {
pending_response: tx,
});

match rx.await {
Ok(v) => v.map_err(|_| ()),
// The channel can only be closed if the network worker no longer exists.
Err(_) => Err(()),
}
}

/// You may call this when new transactons are imported by the transaction pool.
///
/// All transactions will be fetched from the `TransactionPool` that was passed at
Expand Down Expand Up @@ -1307,6 +1344,12 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
connect: IfDisconnected,
},
NetworkStatus {
pending_response: oneshot::Sender<Result<NetworkStatus<B>, RequestFailure>>,
},
NetworkState {
pending_response: oneshot::Sender<Result<NetworkState, RequestFailure>>,
},
DisconnectPeer(PeerId, Cow<'static, str>),
NewBestBlockImported(B::Hash, NumberFor<B>),
}
Expand Down Expand Up @@ -1434,6 +1477,12 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
ServiceToWorkerMsg::Request { target, protocol, request, pending_response, connect } => {
this.network_service.behaviour_mut().send_request(&target, &protocol, request, pending_response, connect);
},
ServiceToWorkerMsg::NetworkStatus { pending_response } => {
let _ = pending_response.send(Ok(this.status()));
},
ServiceToWorkerMsg::NetworkState { pending_response } => {
let _ = pending_response.send(Ok(this.network_state()));
},
ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) =>
this.network_service.behaviour_mut().user_protocol_mut().disconnect_peer(&who, &protocol_name),
ServiceToWorkerMsg::NewBestBlockImported(hash, number) =>
Expand Down
14 changes: 4 additions & 10 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{
error::Error, MallocSizeOfWasm, RpcHandlers, NetworkStatusSinks,
error::Error, MallocSizeOfWasm, RpcHandlers,
start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager, SpawnTaskHandle,
metrics::MetricsService,
client::{light, Client, ClientConfig},
Expand Down Expand Up @@ -519,8 +519,6 @@ pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
pub remote_blockchain: Option<Arc<dyn RemoteBlockchain<TBl>>>,
/// A shared network instance.
pub network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
/// Sinks to propagate network status updates.
pub network_status_sinks: NetworkStatusSinks<TBl>,
/// A Sender for RPC requests.
pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
/// Telemetry instance for this node.
Expand Down Expand Up @@ -590,7 +588,6 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
rpc_extensions_builder,
remote_blockchain,
network,
network_status_sinks,
system_rpc_tx,
telemetry,
} = params;
Expand Down Expand Up @@ -654,7 +651,7 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
metrics_service.run(
client.clone(),
transaction_pool.clone(),
network_status_sinks.clone()
network.clone(),
)
);

Expand All @@ -679,7 +676,7 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
// Spawn informant task
spawn_handle.spawn("informant", sc_informant::build(
client.clone(),
network_status_sinks.status.clone(),
network.clone(),
transaction_pool.clone(),
config.informant_output_format,
));
Expand Down Expand Up @@ -865,7 +862,6 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
) -> Result<
(
Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
NetworkStatusSinks<TBl>,
TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
NetworkStarter,
),
Expand Down Expand Up @@ -959,15 +955,13 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
let network_mut = sc_network::NetworkWorker::new(network_params)?;
let network = network_mut.service().clone();
let network_status_sinks = NetworkStatusSinks::new();

let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc");

let future = build_network_future(
config.role.clone(),
network_mut,
client,
network_status_sinks.clone(),
system_rpc_rx,
has_bootnodes,
config.announce_block,
Expand Down Expand Up @@ -1010,7 +1004,7 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
future.await
});

Ok((network, network_status_sinks, system_rpc_tx, NetworkStarter(network_start_tx)))
Ok((network, system_rpc_tx, NetworkStarter(network_start_tx)))
}

/// Object used to start the network.
Expand Down
Loading