Skip to content

Commit

Permalink
Update RedJubjubVerifier to use tower-batch-based async pattern
Browse files Browse the repository at this point in the history
Using updated redjubjub::batch::Verifier in the redjubjub#batch branch.
  • Loading branch information
dconnolly committed Jun 26, 2020
1 parent af7d2fa commit 88121d4
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 93 deletions.
56 changes: 50 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 15 additions & 6 deletions zebra-consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,27 @@ license = "MIT OR Apache-2.0"
edition = "2018"

[dependencies]
zebra-chain = { path = "../zebra-chain" }
zebra-state = { path = "../zebra-state" }
chrono = "0.4.11"
color-eyre = "0.5"
futures = "0.3.5"
futures-core = "0.3.5"
futures-util = "0.3.5"
tower = "0.3.1"
rand = "0.7"
redjubjub = { git = "https://github.com/ZcashFoundation/redjubjub.git", branch = "batch" }
tokio = { version = "0.2", features = ["time", "sync", "stream"] }
tower = "0.3"
tower-batch = { path = "../tower-batch/" }
tracing = "0.1.15"
tracing-futures = "0.2.4"
zebra-chain = { path = "../zebra-chain" }
zebra-state = { path = "../zebra-state" }

[dev-dependencies]
zebra-test = { path = "../zebra-test/" }
rand = "0.7"
spandoc = "0.1"
tokio = { version = "0.2.21", features = ["full"] }
tokio = { version = "0.2", features = ["full"] }
tracing = "0.1.15"
tracing-error = "0.1.2"
tracing-futures = "0.2"
tracing-subscriber = "0.2.6"
color-eyre = "0.5"
zebra-test = { path = "../zebra-test/" }
2 changes: 1 addition & 1 deletion zebra-consensus/src/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ mod redjubjub;
mod script;
mod transaction;

pub use self::redjubjub::{RedJubjubItem, RedJubjubVerifier};
pub use block::init;
pub use redjubjub::{BatchVerifier, Request, SingletonVerifier};
185 changes: 105 additions & 80 deletions zebra-consensus/src/verify/redjubjub.rs
Original file line number Diff line number Diff line change
@@ -1,117 +1,142 @@
use std::{
collections::HashMap,
convert::TryFrom,
future::Future,
mem,
pin::Pin,
task::{Context, Poll},
};

use rand::thread_rng;
use rand_core::{CryptoRng, RngCore};
use tokio::sync::watch::{channel, Receiver, Sender};
use tower::{Service, ServiceExt};

use zebra_chain::redjubjub::{
self, Error, PublicKey, PublicKeyBytes, SecretKey, Signature, SpendAuth,
};

type Scalar = zebra_chain::redjubjub::Randomizer;

/// A batch verification request.
///
/// This has two variants, to allow manually flushing queued verification
/// requests, even when the batching service is wrapped in other `tower` layers.
pub enum Request<'msg> {
/// Request verification of this key-sig-message tuple.
Verify(PublicKeyBytes<SpendAuth>, Signature<SpendAuth>, &'msg [u8]),
/// Flush the current batch, computing all queued verification requests.
Flush,
use redjubjub::*;
use tokio::sync::broadcast::{channel, RecvError, Sender};
use tower::Service;
use tower_batch::BatchControl;

pub struct RedJubjubVerifier<T: SigType>
where
T: SigType + std::default::Default,
{
batch: batch::Verifier<T>,
// This uses a "broadcast" channel, which is an mpmc channel. Tokio also
// provides a spmc channel, "watch", but it only keeps the latest value, so
// using it would require thinking through whether it was possible for
// results from one batch to be mixed with another.
tx: Sender<Result<(), Error>>,
}

/// Lets us manage tuples of public key bytes, signatures, and
/// messages and massage them into the appropriate
impl<'msg, M: AsRef<[u8]> + ?Sized> From<(PublicKeyBytes<SpendAuth>, Signature<SpendAuth>, &'msg M)>
for Request<'msg>
{
fn from(tup: (PublicKeyBytes<SpendAuth>, Signature<SpendAuth>, &'msg M)) -> Request<'msg> {
Request::Verify(tup.0, tup.1, tup.2.as_ref())
#[allow(clippy::new_without_default)]
impl<T: SigType> RedJubjubVerifier<T> {
pub fn new() -> Self {
let batch = batch::Verifier::<T>::new();
// XXX(hdevalence) what's a reasonable choice here?
let (tx, _) = channel(10);
Self { tx, batch }
}
}

/// Performs singleton RedJubjub signature verification.
///
/// This wraps the normal single-signature verification functions in a
/// [`Service`] implementation, allowing users to abstract over singleton and
/// batch verification.
#[derive(Default)]
pub struct SingletonVerifier;
pub type RedJubjubItem<T: SigType> = batch::Item<T>;

impl Service<Request<'_>> for SingletonVerifier {
impl<'msg, T> Service<BatchControl<RedJubjubItem<T>>> for RedJubjubVerifier<T>
where
T: SigType + std::default::Default,
{
type Response = ();
type Error = redjubjub::Error;
type Future = futures::future::Ready<Result<(), redjubjub::Error>>;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;

fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Error>> {
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request) -> Self::Future {
futures::future::ready(match req {
Request::Verify(pk_bytes, sig, msg) => {
PublicKey::<SpendAuth>::try_from(pk_bytes).and_then(|pk| pk.verify(msg, &sig))
fn call(&mut self, req: BatchControl<RedJubjubItem<T>>) -> Self::Future {
match req {
BatchControl::Item(item) => {
tracing::trace!("got item");
self.batch.queue(item);
let mut rx = self.tx.subscribe();
Box::pin(async move {
match rx.recv().await {
Ok(result) => result,
Err(RecvError::Lagged(_)) => {
tracing::warn!(
"missed channel updates for the correct signature batch!"
);
Err(Error::InvalidSignature)
}
Err(RecvError::Closed) => panic!("verifier was dropped without flushing"),
}
})
}
BatchControl::Flush => {
tracing::trace!("got flush command");
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(batch.verify(thread_rng()));
Box::pin(async { Ok(()) })
}
Request::Flush => Ok(()),
})
}
}
}

/// Performs batch RedJubjub verification.
pub struct BatchVerifier {
tx: Sender<Result<(), Error>>,
rx: Receiver<Result<(), Error>>,
/// The number of signatures per batch.
batch_size: usize,
/// The number of signatures currently queued for verification.
num_sigs: usize,
/// Signature data queued for verification.
signatures: HashMap<PublicKeyBytes<SpendAuth>, Vec<(Scalar, Signature<SpendAuth>)>>,
impl<T: SigType> Drop for RedJubjubVerifier<T> {
fn drop(&mut self) {
// We need to flush the current batch in case there are still any pending futures.
let batch = mem::replace(&mut self.batch, batch::Verifier::<T>::new());
let _ = self.tx.send(batch.verify(thread_rng()));
}
}

#[cfg(test)]
mod tests {
use super::*;

use tokio::runtime::Runtime;
use time::Duration;

use super::*;
use futures::stream::{FuturesUnordered, StreamExt};
use tower_batch::Batch;

async fn sign_and_verify<S>(svc: &mut S) -> impl std::future::Future<Output = Result<(), Error>>
async fn sign_and_verify<V>(mut verifier: V, n: usize) -> Result<(), V::Error>
where
for<'msg> S: Service<Request<'msg>, Response = (), Error = Error>,
T: SigType,
V: Service<RedJubjubItem<T>, Response = ()>,
{
let sk = SecretKey::<SpendAuth>::new(thread_rng());
let pk_bytes = PublicKey::from(&sk).into();
let rng = thread_rng();
let mut results = FuturesUnordered::new();
for i in 0..n {
let span = tracing::trace_span!("sig", i);
let sk = SigningKey::<T>::new(rng);
let vk = VerificationKey::from(&sk);
let msg = b"BatchVerifyTest";
let sig = sk.sign(rng, &msg[..]);

verifier.ready_and().await?;
results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into())))
}

while let Some(result) = results.next().await {
result?;
}

Ok(())
}

let msg = b"";
let sig = sk.sign(thread_rng(), msg);
#[tokio::test]
async fn batch_flushes_on_max_items() -> Result<()> {
use tokio::time::timeout;
zebra_test::init();

svc.ready().await.unwrap();
svc.call((pk_bytes, sig, msg).into())
// Use a very long max_latency and a short timeout to check that
// flushing is happening based on hitting max_items.
let verifier = Batch::new(RedJubjubVerifier::new(), 10, Duration::from_secs(1000));
timeout(Duration::from_secs(1), sign_and_verify(verifier, 100)).await?
}

#[test]
fn singleton_verification() {
let mut rt = Runtime::new().unwrap();
rt.block_on(async {
let mut svc = SingletonVerifier;
let fut1 = sign_and_verify(&mut svc).await;
let fut2 = sign_and_verify(&mut svc).await;
let result1 = fut1.await;
let result2 = fut2.await;
assert_eq!(result1, Ok(()));
assert_eq!(result2, Ok(()));
})
}
#[tokio::test]
async fn batch_flushes_on_max_latency() -> Result<()> {
use tokio::time::timeout;
zebra_test::init();

// TODO: add proptests to test that singleton and batch of size
// (1) both validate or fail together
// Use a very high max_items and a short timeout to check that
// flushing is happening based on hitting max_latency.
let verifier = Batch::new(RedJubjubVerifier::new(), 100, Duration::from_millis(500));
timeout(Duration::from_secs(1), sign_and_verify(verifier, 10)).await?
}
}

0 comments on commit 88121d4

Please sign in to comment.