Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into diverse_accounts
Browse files Browse the repository at this point in the history
  • Loading branch information
AArnott committed Feb 10, 2024
2 parents 92cd7b0 + 3cbc481 commit 6b191e8
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 57 deletions.
1 change: 1 addition & 0 deletions zcash_client_backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ orchard = ["dep:orchard", "zcash_keys/orchard"]
test-dependencies = [
"dep:proptest",
"orchard?/test-dependencies",
"zcash_keys/test-dependencies",
"zcash_primitives/test-dependencies",
"incrementalmerkletree/test-dependencies",
]
Expand Down
2 changes: 1 addition & 1 deletion zcash_client_backend/src/data_api/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ where
.get_sapling_nullifiers(NullifierQuery::Unspent)
.map_err(Error::Wallet)?;

let mut batch_runner = BatchRunner::<_, _, _, ()>::new(
let mut batch_runner = BatchRunner::<_, _, _, _, ()>::new(
100,
dfvks
.iter()
Expand Down
171 changes: 124 additions & 47 deletions zcash_client_backend/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,107 @@ use std::sync::{
};

use memuse::DynamicUsage;
use zcash_note_encryption::{batch, BatchDomain, Domain, ShieldedOutput, COMPACT_NOTE_SIZE};
use zcash_note_encryption::{
batch, BatchDomain, Domain, ShieldedOutput, COMPACT_NOTE_SIZE, ENC_CIPHERTEXT_SIZE,
};
use zcash_primitives::{block::BlockHash, transaction::TxId};

/// A decrypted note.
pub(crate) struct DecryptedNote<A, D: Domain> {
/// A decrypted transaction output.
pub(crate) struct DecryptedOutput<A, D: Domain, M> {
/// The tag corresponding to the incoming viewing key used to decrypt the note.
pub(crate) ivk_tag: A,
/// The recipient of the note.
pub(crate) recipient: D::Recipient,
/// The note!
pub(crate) note: D::Note,
/// The memo field, or `()` if this is a decrypted compact output.
pub(crate) memo: M,
}

impl<A, D: Domain> fmt::Debug for DecryptedNote<A, D>
impl<A, D: Domain, M> fmt::Debug for DecryptedOutput<A, D, M>
where
A: fmt::Debug,
D::IncomingViewingKey: fmt::Debug,
D::Recipient: fmt::Debug,
D::Note: fmt::Debug,
D::Memo: fmt::Debug,
M: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DecryptedNote")
f.debug_struct("DecryptedOutput")
.field("ivk_tag", &self.ivk_tag)
.field("recipient", &self.recipient)
.field("note", &self.note)
.field("memo", &self.memo)
.finish()
}
}

/// A decryptor of transaction outputs.
pub(crate) trait Decryptor<D: BatchDomain, Output> {
type Memo;

// Once we reach MSRV 1.75.0, this can return `impl Iterator`.
fn batch_decrypt<A: Clone>(
tags: &[A],
ivks: &[D::IncomingViewingKey],
outputs: &[(D, Output)],
) -> Vec<Option<DecryptedOutput<A, D, Self::Memo>>>;
}

/// A decryptor of outputs as encoded in transactions.
pub(crate) struct FullDecryptor;

impl<D: BatchDomain, Output: ShieldedOutput<D, ENC_CIPHERTEXT_SIZE>> Decryptor<D, Output>
for FullDecryptor
{
type Memo = D::Memo;

fn batch_decrypt<A: Clone>(
tags: &[A],
ivks: &[D::IncomingViewingKey],
outputs: &[(D, Output)],
) -> Vec<Option<DecryptedOutput<A, D, Self::Memo>>> {
batch::try_note_decryption(ivks, outputs)
.into_iter()
.map(|res| {
res.map(|((note, recipient, memo), ivk_idx)| DecryptedOutput {
ivk_tag: tags[ivk_idx].clone(),
recipient,
note,
memo,
})
})
.collect()
}
}

/// A decryptor of outputs as encoded in compact blocks.
pub(crate) struct CompactDecryptor;

impl<D: BatchDomain, Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>> Decryptor<D, Output>
for CompactDecryptor
{
type Memo = ();

fn batch_decrypt<A: Clone>(
tags: &[A],
ivks: &[D::IncomingViewingKey],
outputs: &[(D, Output)],
) -> Vec<Option<DecryptedOutput<A, D, Self::Memo>>> {
batch::try_compact_note_decryption(ivks, outputs)
.into_iter()
.map(|res| {
res.map(|((note, recipient), ivk_idx)| DecryptedOutput {
ivk_tag: tags[ivk_idx].clone(),
recipient,
note,
memo: (),
})
})
.collect()
}
}

/// A value correlated with an output index.
struct OutputIndex<V> {
/// The index of the output within the corresponding shielded bundle.
Expand All @@ -46,12 +117,12 @@ struct OutputIndex<V> {
value: V,
}

type OutputItem<A, D> = OutputIndex<DecryptedNote<A, D>>;
type OutputItem<A, D, M> = OutputIndex<DecryptedOutput<A, D, M>>;

/// The sender for the result of batch scanning a specific transaction output.
struct OutputReplier<A, D: Domain>(OutputIndex<channel::Sender<OutputItem<A, D>>>);
struct OutputReplier<A, D: Domain, M>(OutputIndex<channel::Sender<OutputItem<A, D, M>>>);

impl<A, D: Domain> DynamicUsage for OutputReplier<A, D> {
impl<A, D: Domain, M> DynamicUsage for OutputReplier<A, D, M> {
#[inline(always)]
fn dynamic_usage(&self) -> usize {
// We count the memory usage of items in the channel on the receiver side.
Expand All @@ -65,9 +136,9 @@ impl<A, D: Domain> DynamicUsage for OutputReplier<A, D> {
}

/// The receiver for the result of batch scanning a specific transaction.
struct BatchReceiver<A, D: Domain>(channel::Receiver<OutputItem<A, D>>);
struct BatchReceiver<A, D: Domain, M>(channel::Receiver<OutputItem<A, D, M>>);

impl<A, D: Domain> DynamicUsage for BatchReceiver<A, D> {
impl<A, D: Domain, M> DynamicUsage for BatchReceiver<A, D, M> {
fn dynamic_usage(&self) -> usize {
// We count the memory usage of items in the channel on the receiver side.
let num_items = self.0.len();
Expand All @@ -84,7 +155,7 @@ impl<A, D: Domain> DynamicUsage for BatchReceiver<A, D> {
// - Space for an item.
// - The state of the slot, stored as an AtomicUsize.
const PTR_SIZE: usize = std::mem::size_of::<usize>();
let item_size = std::mem::size_of::<OutputItem<A, D>>();
let item_size = std::mem::size_of::<OutputItem<A, D, M>>();
const ATOMIC_USIZE_SIZE: usize = std::mem::size_of::<AtomicUsize>();
let block_size = PTR_SIZE + ITEMS_PER_BLOCK * (item_size + ATOMIC_USIZE_SIZE);

Expand Down Expand Up @@ -208,7 +279,7 @@ impl<Item: Task> Task for WithUsageTask<Item> {
}

/// A batch of outputs to trial decrypt.
pub(crate) struct Batch<A, D: BatchDomain, Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>> {
pub(crate) struct Batch<A, D: BatchDomain, Output, Dec: Decryptor<D, Output>> {
tags: Vec<A>,
ivks: Vec<D::IncomingViewingKey>,
/// We currently store outputs and repliers as parallel vectors, because
Expand All @@ -219,15 +290,16 @@ pub(crate) struct Batch<A, D: BatchDomain, Output: ShieldedOutput<D, COMPACT_NOT
/// all be part of the same struct, which would also track the output index
/// (that is captured in the outer `OutputIndex` of each `OutputReplier`).
outputs: Vec<(D, Output)>,
repliers: Vec<OutputReplier<A, D>>,
repliers: Vec<OutputReplier<A, D, Dec::Memo>>,
}

impl<A, D, Output> DynamicUsage for Batch<A, D, Output>
impl<A, D, Output, Dec> DynamicUsage for Batch<A, D, Output, Dec>
where
A: DynamicUsage,
D: BatchDomain + DynamicUsage,
D::IncomingViewingKey: DynamicUsage,
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE> + DynamicUsage,
Output: DynamicUsage,
Dec: Decryptor<D, Output>,
{
fn dynamic_usage(&self) -> usize {
self.tags.dynamic_usage()
Expand All @@ -253,11 +325,11 @@ where
}
}

impl<A, D, Output> Batch<A, D, Output>
impl<A, D, Output, Dec> Batch<A, D, Output, Dec>
where
A: Clone,
D: BatchDomain,
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>,
Dec: Decryptor<D, Output>,
{
/// Constructs a new batch.
fn new(tags: Vec<A>, ivks: Vec<D::IncomingViewingKey>) -> Self {
Expand All @@ -276,15 +348,17 @@ where
}
}

impl<A, D, Output> Task for Batch<A, D, Output>
impl<A, D, Output, Dec> Task for Batch<A, D, Output, Dec>
where
A: Clone + Send + 'static,
D: BatchDomain + Send + 'static,
D::IncomingViewingKey: Send,
D::Memo: Send,
D::Note: Send,
D::Recipient: Send,
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE> + Send + 'static,
Output: Send + 'static,
Dec: Decryptor<D, Output> + 'static,
Dec::Memo: Send,
{
/// Runs the batch of trial decryptions, and reports the results.
fn run(self) {
Expand All @@ -298,20 +372,16 @@ where

assert_eq!(outputs.len(), repliers.len());

let decryption_results = batch::try_compact_note_decryption(&ivks, &outputs);
let decryption_results = Dec::batch_decrypt(&tags, &ivks, &outputs);
for (decryption_result, OutputReplier(replier)) in
decryption_results.into_iter().zip(repliers.into_iter())
{
// If `decryption_result` is `None` then we will just drop `replier`,
// indicating to the parent `BatchRunner` that this output was not for us.
if let Some(((note, recipient), ivk_idx)) = decryption_result {
if let Some(value) = decryption_result {
let result = OutputIndex {
output_index: replier.output_index,
value: DecryptedNote {
ivk_tag: tags[ivk_idx].clone(),
recipient,
note,
},
value,
};

if replier.value.send(result).is_err() {
Expand All @@ -323,15 +393,20 @@ where
}
}

impl<A, D: BatchDomain, Output: ShieldedOutput<D, COMPACT_NOTE_SIZE> + Clone> Batch<A, D, Output> {
impl<A, D, Output, Dec> Batch<A, D, Output, Dec>
where
D: BatchDomain,
Output: Clone,
Dec: Decryptor<D, Output>,
{
/// Adds the given outputs to this batch.
///
/// `replier` will be called with the result of every output.
fn add_outputs(
&mut self,
domain: impl Fn() -> D,
outputs: &[Output],
replier: channel::Sender<OutputItem<A, D>>,
replier: channel::Sender<OutputItem<A, D, Dec::Memo>>,
) {
self.outputs
.extend(outputs.iter().cloned().map(|output| (domain(), output)));
Expand Down Expand Up @@ -361,28 +436,29 @@ impl DynamicUsage for ResultKey {
}

/// Logic to run batches of trial decryptions on the global threadpool.
pub(crate) struct BatchRunner<A, D, Output, T>
pub(crate) struct BatchRunner<A, D, Output, Dec, T>
where
D: BatchDomain,
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>,
T: Tasks<Batch<A, D, Output>>,
Dec: Decryptor<D, Output>,
T: Tasks<Batch<A, D, Output, Dec>>,
{
batch_size_threshold: usize,
// The batch currently being accumulated.
acc: Batch<A, D, Output>,
acc: Batch<A, D, Output, Dec>,
// The running batches.
running_tasks: T,
// Receivers for the results of the running batches.
pending_results: HashMap<ResultKey, BatchReceiver<A, D>>,
pending_results: HashMap<ResultKey, BatchReceiver<A, D, Dec::Memo>>,
}

impl<A, D, Output, T> DynamicUsage for BatchRunner<A, D, Output, T>
impl<A, D, Output, Dec, T> DynamicUsage for BatchRunner<A, D, Output, Dec, T>
where
A: DynamicUsage,
D: BatchDomain + DynamicUsage,
D::IncomingViewingKey: DynamicUsage,
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE> + DynamicUsage,
T: Tasks<Batch<A, D, Output>> + DynamicUsage,
Output: DynamicUsage,
Dec: Decryptor<D, Output>,
T: Tasks<Batch<A, D, Output, Dec>> + DynamicUsage,
{
fn dynamic_usage(&self) -> usize {
self.acc.dynamic_usage()
Expand All @@ -408,12 +484,12 @@ where
}
}

impl<A, D, Output, T> BatchRunner<A, D, Output, T>
impl<A, D, Output, Dec, T> BatchRunner<A, D, Output, Dec, T>
where
A: Clone,
D: BatchDomain,
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>,
T: Tasks<Batch<A, D, Output>>,
Dec: Decryptor<D, Output>,
T: Tasks<Batch<A, D, Output, Dec>>,
{
/// Constructs a new batch runner for the given incoming viewing keys.
pub(crate) fn new(
Expand All @@ -430,26 +506,27 @@ where
}
}

impl<A, D, Output, T> BatchRunner<A, D, Output, T>
impl<A, D, Output, Dec, T> BatchRunner<A, D, Output, Dec, T>
where
A: Clone + Send + 'static,
D: BatchDomain + Send + 'static,
D::IncomingViewingKey: Clone + Send,
D::Memo: Send,
D::Note: Send,
D::Recipient: Send,
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE> + Clone + Send + 'static,
T: Tasks<Batch<A, D, Output>>,
Output: Clone + Send + 'static,
Dec: Decryptor<D, Output>,
T: Tasks<Batch<A, D, Output, Dec>>,
{
/// Batches the given outputs for trial decryption.
///
/// `block_tag` is the hash of the block that triggered this txid being added to the
/// batch, or the all-zeros hash to indicate that no block triggered it (i.e. it was a
/// mempool change).
///
/// If after adding the given outputs, the accumulated batch size is at least
/// `BATCH_SIZE_THRESHOLD`, `Self::flush` is called. Subsequent calls to
/// `Self::add_outputs` will be accumulated into a new batch.
/// If after adding the given outputs, the accumulated batch size is at least the size
/// threshold that was set via `Self::new`, `Self::flush` is called. Subsequent calls
/// to `Self::add_outputs` will be accumulated into a new batch.
pub(crate) fn add_outputs(
&mut self,
block_tag: BlockHash,
Expand Down Expand Up @@ -487,7 +564,7 @@ where
&mut self,
block_tag: BlockHash,
txid: TxId,
) -> HashMap<(TxId, usize), DecryptedNote<A, D>> {
) -> HashMap<(TxId, usize), DecryptedOutput<A, D, Dec::Memo>> {
self.pending_results
.remove(&ResultKey(block_tag, txid))
// We won't have a pending result if the transaction didn't have outputs of
Expand Down
Loading

0 comments on commit 6b191e8

Please sign in to comment.