Skip to content

Commit

Permalink
Move worker RPC handlers to a separate file (MystenLabs#1032)
Browse files Browse the repository at this point in the history
  • Loading branch information
aschran authored Sep 27, 2022
1 parent a67b6fe commit 15fb102
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 72 deletions.
76 changes: 76 additions & 0 deletions narwhal/worker/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use async_trait::async_trait;
use store::Store;
use types::{
error::DagError, metered_channel::Sender, Batch, BatchDigest, PrimaryToWorker,
PrimaryWorkerMessage, WorkerMessage, WorkerToWorker,
};

/// Defines how the network receiver handles incoming workers messages.
#[derive(Clone)]
pub struct WorkerReceiverHandler {
pub tx_processor: Sender<Batch>,
pub store: Store<BatchDigest, Batch>,
}

#[async_trait]
impl WorkerToWorker for WorkerReceiverHandler {
async fn send_message(
&self,
request: anemo::Request<types::WorkerMessage>,
) -> Result<anemo::Response<()>, anemo::rpc::Status> {
let message = request.into_body();
match message {
WorkerMessage::Batch(batch) => self
.tx_processor
.send(batch)
.await
.map_err(|_| DagError::ShuttingDown),
}
.map(|_| anemo::Response::new(()))
.map_err(|e| anemo::rpc::Status::internal(e.to_string()))
}
async fn request_batches(
&self,
request: anemo::Request<types::WorkerBatchRequest>,
) -> Result<anemo::Response<types::WorkerBatchResponse>, anemo::rpc::Status> {
let message = request.into_body();
// TODO [issue #7]: Do some accounting to prevent bad actors from monopolizing our resources
// TODO: Add a limit on number of requested batches
let batches: Vec<Batch> = self
.store
.read_all(message.digests)
.await
.map_err(|e| anemo::rpc::Status::from_error(Box::new(e)))?
.into_iter()
.flatten()
.collect();
Ok(anemo::Response::new(types::WorkerBatchResponse { batches }))
}
}

/// Defines how the network receiver handles incoming primary messages.
#[derive(Clone)]
pub struct PrimaryReceiverHandler {
pub tx_synchronizer: Sender<PrimaryWorkerMessage>,
}

#[async_trait]
impl PrimaryToWorker for PrimaryReceiverHandler {
async fn send_message(
&self,
request: anemo::Request<PrimaryWorkerMessage>,
) -> Result<anemo::Response<()>, anemo::rpc::Status> {
let message = request.into_body();

self.tx_synchronizer
.send(message)
.await
.map_err(|_| DagError::ShuttingDown)
.map_err(|e| anemo::rpc::Status::internal(e.to_string()))?;

Ok(anemo::Response::new(()))
}
}
1 change: 1 addition & 0 deletions narwhal/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
)]

mod batch_maker;
mod handlers;
pub mod metrics;
mod primary_connector;
mod processor;
Expand Down
81 changes: 9 additions & 72 deletions narwhal/worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::{
batch_maker::BatchMaker, metrics::WorkerChannelMetrics, primary_connector::PrimaryConnector,
processor::Processor, quorum_waiter::QuorumWaiter, synchronizer::Synchronizer,
batch_maker::BatchMaker,
handlers::{PrimaryReceiverHandler, WorkerReceiverHandler},
metrics::WorkerChannelMetrics,
primary_connector::PrimaryConnector,
processor::Processor,
quorum_waiter::QuorumWaiter,
synchronizer::Synchronizer,
};
use anemo::{types::PeerInfo, PeerId};
use async_trait::async_trait;
Expand All @@ -21,9 +26,8 @@ use tracing::info;
use types::{
error::DagError,
metered_channel::{channel, Receiver, Sender},
Batch, BatchDigest, Empty, PrimaryToWorker, PrimaryToWorkerServer, ReconfigureNotification,
Transaction, TransactionProto, Transactions, TransactionsServer, WorkerPrimaryMessage,
WorkerToWorker, WorkerToWorkerServer,
Batch, BatchDigest, Empty, PrimaryToWorkerServer, ReconfigureNotification, Transaction,
TransactionProto, Transactions, TransactionsServer, WorkerPrimaryMessage, WorkerToWorkerServer,
};

#[cfg(test)]
Expand Down Expand Up @@ -421,70 +425,3 @@ impl Transactions for TxReceiverHandler {
Ok(Response::new(Empty {}))
}
}

/// Defines how the network receiver handles incoming workers messages.
#[derive(Clone)]
struct WorkerReceiverHandler {
tx_processor: Sender<Batch>,
store: Store<BatchDigest, Batch>,
}

#[async_trait]
impl WorkerToWorker for WorkerReceiverHandler {
async fn send_message(
&self,
request: anemo::Request<types::WorkerMessage>,
) -> Result<anemo::Response<()>, anemo::rpc::Status> {
let message = request.into_body();
match message {
WorkerMessage::Batch(batch) => self
.tx_processor
.send(batch)
.await
.map_err(|_| DagError::ShuttingDown),
}
.map(|_| anemo::Response::new(()))
.map_err(|e| anemo::rpc::Status::internal(e.to_string()))
}
async fn request_batches(
&self,
request: anemo::Request<types::WorkerBatchRequest>,
) -> Result<anemo::Response<types::WorkerBatchResponse>, anemo::rpc::Status> {
let message = request.into_body();
// TODO [issue #7]: Do some accounting to prevent bad actors from monopolizing our resources
// TODO: Add a limit on number of requested batches
let batches: Vec<Batch> = self
.store
.read_all(message.digests)
.await
.map_err(|e| anemo::rpc::Status::from_error(Box::new(e)))?
.into_iter()
.flatten()
.collect();
Ok(anemo::Response::new(types::WorkerBatchResponse { batches }))
}
}

/// Defines how the network receiver handles incoming primary messages.
#[derive(Clone)]
struct PrimaryReceiverHandler {
tx_synchronizer: Sender<PrimaryWorkerMessage>,
}

#[async_trait]
impl PrimaryToWorker for PrimaryReceiverHandler {
async fn send_message(
&self,
request: anemo::Request<PrimaryWorkerMessage>,
) -> Result<anemo::Response<()>, anemo::rpc::Status> {
let message = request.into_body();

self.tx_synchronizer
.send(message)
.await
.map_err(|_| DagError::ShuttingDown)
.map_err(|e| anemo::rpc::Status::internal(e.to_string()))?;

Ok(anemo::Response::new(()))
}
}

0 comments on commit 15fb102

Please sign in to comment.