Skip to content

Commit

Permalink
fix(rpc): Shut down the RPC server properly when Zebra shuts down (#5591
Browse files Browse the repository at this point in the history
)

* Make the queue runner task shut down when the RpcImpl is dropped

* Move RPC server startup into the spawn() tokio future

* Return a shutdown handle from the RPC spawn() method

* Shut down the RPC server properly when Zebra shuts down

* Add a changelog entry for this security fix

* Call RpcServer::shutdown() when it is dropped, and wait

* Block on RPC server shutdown when Zebra's tasks have an error
  • Loading branch information
teor2345 authored Nov 10, 2022
1 parent a815e9d commit 074733d
Show file tree
Hide file tree
Showing 7 changed files with 273 additions and 106 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,24 @@ All notable changes to Zebra are documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org).

## [Zebra 1.0.0-rc.2](https://github.com/ZcashFoundation/zebra/releases/tag/v1.0.0-rc.2) - 2022-11-TODO

Zebra's latest release continues work on mining pool RPCs, and fixes a rare RPC crash that could lead to memory corruption.

Zebra's consensus rules, node sync, and `lightwalletd` RPCs are ready for user testing and experimental use. Zebra has not been audited yet.

### Breaking Changes

This release has the following breaking changes:
- TODO: search the changelog for breaking changes

### Security

- Fix a rare crash and memory errors when Zebra's RPC server shuts down ([#5591](https://github.com/ZcashFoundation/zebra/pull/5591))

TODO: the rest of the changelog


## [Zebra 1.0.0-rc.1](https://github.com/ZcashFoundation/zebra/releases/tag/v1.0.0-rc.1) - 2022-11-02

This is the second Zebra release candidate. Zebra's consensus rules, node sync, and `lightwalletd` RPCs are ready for user testing and experimental use. Zebra has not been audited yet.
Expand Down
12 changes: 6 additions & 6 deletions zebra-rpc/src/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use hex::{FromHex, ToHex};
use indexmap::IndexMap;
use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
use tokio::{sync::broadcast::Sender, task::JoinHandle};
use tokio::{sync::broadcast, task::JoinHandle};
use tower::{buffer::Buffer, Service, ServiceExt};
use tracing::Instrument;

Expand Down Expand Up @@ -278,8 +278,8 @@ where

// Tasks
//
/// A sender component of a channel used to send transactions to the queue.
queue_sender: Sender<Option<UnminedTx>>,
/// A sender component of a channel used to send transactions to the mempool queue.
queue_sender: broadcast::Sender<UnminedTx>,
}

impl<Mempool, State, Tip> RpcImpl<Mempool, State, Tip>
Expand Down Expand Up @@ -313,7 +313,7 @@ where
<Mempool as Service<mempool::Request>>::Future: Send,
<State as Service<zebra_state::ReadRequest>>::Future: Send,
{
let runner = Queue::start();
let (runner, queue_sender) = Queue::start();

let mut app_version = app_version.to_string();

Expand All @@ -329,7 +329,7 @@ where
mempool: mempool.clone(),
state: state.clone(),
latest_chain_tip: latest_chain_tip.clone(),
queue_sender: runner.sender(),
queue_sender,
};

// run the process queue
Expand Down Expand Up @@ -517,7 +517,7 @@ where

// send transaction to the rpc queue, ignore any error.
let unmined_transaction = UnminedTx::from(raw_transaction.clone());
let _ = queue_sender.send(Some(unmined_transaction));
let _ = queue_sender.send(unmined_transaction);

let transaction_parameter = mempool::Gossip::Tx(raw_transaction.into());
let request = mempool::Request::Queue(vec![transaction_parameter]);
Expand Down
47 changes: 26 additions & 21 deletions zebra-rpc/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{collections::HashSet, sync::Arc};
use chrono::Duration;
use indexmap::IndexMap;
use tokio::{
sync::broadcast::{channel, Receiver, Sender},
sync::broadcast::{self, error::TryRecvError},
time::Instant,
};

Expand Down Expand Up @@ -55,24 +55,26 @@ pub struct Queue {
/// The runner will make the processing of the transactions in the queue.
pub struct Runner {
queue: Queue,
sender: Sender<Option<UnminedTx>>,
receiver: broadcast::Receiver<UnminedTx>,
tip_height: Height,
}

impl Queue {
/// Start a new queue
pub fn start() -> Runner {
let (sender, _receiver) = channel(CHANNEL_AND_QUEUE_CAPACITY);
pub fn start() -> (Runner, broadcast::Sender<UnminedTx>) {
let (sender, receiver) = broadcast::channel(CHANNEL_AND_QUEUE_CAPACITY);

let queue = Queue {
transactions: IndexMap::new(),
};

Runner {
let runner = Runner {
queue,
sender,
receiver,
tip_height: Height(0),
}
};

(runner, sender)
}

/// Get the transactions in the queue.
Expand Down Expand Up @@ -103,16 +105,6 @@ impl Queue {
}

impl Runner {
/// Create a new sender for this runner.
pub fn sender(&self) -> Sender<Option<UnminedTx>> {
self.sender.clone()
}

/// Create a new receiver.
pub fn receiver(&self) -> Receiver<Option<UnminedTx>> {
self.sender.subscribe()
}

/// Get the queue transactions as a `HashSet` of unmined ids.
fn transactions_as_hash_set(&self) -> HashSet<UnminedTxId> {
let transactions = self.queue.transactions();
Expand Down Expand Up @@ -157,8 +149,6 @@ impl Runner {
+ 'static,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
let mut receiver = self.sender.subscribe();

loop {
// if we don't have a chain use `NO_CHAIN_TIP_HEIGHT` to get block spacing
let tip_height = match tip.best_tip_height() {
Expand All @@ -173,8 +163,23 @@ impl Runner {
tokio::time::sleep(spacing.to_std().expect("should never be less than zero")).await;

// get transactions from the channel
while let Ok(Some(tx)) = receiver.try_recv() {
let _ = &self.queue.insert(tx.clone());
loop {
let tx = match self.receiver.try_recv() {
Ok(tx) => tx,
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Lagged(skipped_count)) => {
tracing::info!("sendrawtransaction queue was full: skipped {skipped_count} transactions");
continue;
}
Err(TryRecvError::Closed) => {
tracing::info!(
"sendrawtransaction queue was closed: is Zebra shutting down?"
);
return;
}
};

self.queue.insert(tx.clone());
}

// skip some work if stored tip height is the same as the one arriving
Expand Down
14 changes: 7 additions & 7 deletions zebra-rpc/src/queue/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ proptest! {
#[test]
fn insert_remove_to_from_queue(transaction in any::<UnminedTx>()) {
// create a queue
let mut runner = Queue::start();
let (mut runner, _sender) = Queue::start();

// insert transaction
runner.queue.insert(transaction.clone());
Expand All @@ -54,7 +54,7 @@ proptest! {
#[test]
fn queue_size_limit(transactions in any::<[UnminedTx; CHANNEL_AND_QUEUE_CAPACITY + 1]>()) {
// create a queue
let mut runner = Queue::start();
let (mut runner, _sender) = Queue::start();

// insert all transactions we have
transactions.iter().for_each(|t| runner.queue.insert(t.clone()));
Expand All @@ -68,7 +68,7 @@ proptest! {
#[test]
fn queue_order(transactions in any::<[UnminedTx; 32]>()) {
// create a queue
let mut runner = Queue::start();
let (mut runner, _sender) = Queue::start();
// fill the queue and check insertion order
for i in 0..CHANNEL_AND_QUEUE_CAPACITY {
let transaction = transactions[i].clone();
Expand Down Expand Up @@ -108,7 +108,7 @@ proptest! {
time::pause();

// create a queue
let mut runner = Queue::start();
let (mut runner, _sender) = Queue::start();

// insert a transaction to the queue
runner.queue.insert(transaction);
Expand Down Expand Up @@ -165,7 +165,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();

// create a queue
let mut runner = Queue::start();
let (mut runner, _sender) = Queue::start();

// insert a transaction to the queue
let unmined_transaction = UnminedTx::from(transaction);
Expand Down Expand Up @@ -246,7 +246,7 @@ proptest! {
let mut write_state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

// create a queue
let mut runner = Queue::start();
let (mut runner, _sender) = Queue::start();

// insert a transaction to the queue
let unmined_transaction = UnminedTx::from(&transaction);
Expand Down Expand Up @@ -320,7 +320,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();

// create a queue
let mut runner = Queue::start();
let (mut runner, _sender) = Queue::start();

// insert a transaction to the queue
let unmined_transaction = UnminedTx::from(transaction.clone());
Expand Down
Loading

0 comments on commit 074733d

Please sign in to comment.