feat!: add paging to utxo stream request #5302

Merged: 3 commits, Apr 12, 2023
35 changes: 25 additions & 10 deletions base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs
@@ -91,6 +91,7 @@ where B: BlockchainBackend + 'static
Ok(())
}

#[allow(clippy::too_many_lines)]
async fn start_streaming(
&self,
tx: &mut mpsc::Sender<Result<SyncUtxosByBlockResponse, RpcStatus>>,
@@ -141,7 +142,6 @@ where B: BlockchainBackend + 'static
.fetch_utxos_in_block(current_header.hash(), Some(bitmap.clone()))
.await
.rpc_status_internal_error(LOG_TARGET)?;

let utxos = utxos
.into_iter()
.enumerate()
@@ -159,15 +159,30 @@ where B: BlockchainBackend + 'static
current_header_hash.to_hex(),
);

let utxo_block_response = SyncUtxosByBlockResponse {
outputs: utxos,
height: current_header.height,
header_hash: current_header_hash.to_vec(),
mined_timestamp: current_header.timestamp.as_u64(),
};
// Ensure task stops if the peer prematurely stops their RPC session
if tx.send(Ok(utxo_block_response)).await.is_err() {
break;
for utxo_chunk in utxos.chunks(2000) {
Contributor comment on the chunk size above:
We can safely increase this number to 4000, as the 5526 UTXOs in the faucet take up approximately 4.5924 MiB; thus (4000 / 5526) x 4.5924 = 3.3242 MiB.
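As a quick check of that arithmetic, a minimal sketch: the sample figures are the reviewer's measurements, while estimated_chunk_mib and the constant names are hypothetical and not part of the diff.

// Rough per-chunk payload estimate, scaled from the reviewer's sample of
// 5526 faucet UTXOs occupying ~4.5924 MiB.
fn estimated_chunk_mib(chunk_size: usize) -> f64 {
    const SAMPLE_UTXOS: f64 = 5526.0;
    const SAMPLE_MIB: f64 = 4.5924;
    chunk_size as f64 / SAMPLE_UTXOS * SAMPLE_MIB
}

// estimated_chunk_mib(2000) ≈ 1.66 MiB; estimated_chunk_mib(4000) ≈ 3.32 MiB.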

let utxo_block_response = SyncUtxosByBlockResponse {
outputs: utxo_chunk.to_vec(),
height: current_header.height,
header_hash: current_header_hash.to_vec(),
mined_timestamp: current_header.timestamp.as_u64(),
};
// Ensure task stops if the peer prematurely stops their RPC session
if tx.send(Ok(utxo_block_response)).await.is_err() {
break;
}
}
if utxos.is_empty() {
// if its empty, we need to send an empty vec of outputs.
let utxo_block_response = SyncUtxosByBlockResponse {
outputs: utxos,
height: current_header.height,
header_hash: current_header_hash.to_vec(),
mined_timestamp: current_header.timestamp.as_u64(),
};
// Ensure task stops if the peer prematurely stops their RPC session
if tx.send(Ok(utxo_block_response)).await.is_err() {
break;
}
}

debug!(
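Taken out of the surrounding RPC plumbing, the paging pattern this file introduces looks roughly like the sketch below. BlockOutputsPage, send_block_pages and CHUNK_SIZE are simplified, hypothetical stand-ins for SyncUtxosByBlockResponse and the real streaming task, and the channel is assumed to be the tokio mpsc sender the task already uses.

use tokio::sync::mpsc;

const CHUNK_SIZE: usize = 2000;

// Hypothetical, simplified page type standing in for SyncUtxosByBlockResponse.
struct BlockOutputsPage {
    outputs: Vec<Vec<u8>>,
    height: u64,
}

// Returns false if the peer closed the RPC session and streaming should stop.
async fn send_block_pages(
    tx: &mpsc::Sender<BlockOutputsPage>,
    outputs: Vec<Vec<u8>>,
    height: u64,
) -> bool {
    if outputs.is_empty() {
        // An empty block still yields exactly one (empty) page so the receiver
        // knows the block was processed.
        return tx.send(BlockOutputsPage { outputs: Vec::new(), height }).await.is_ok();
    }
    for chunk in outputs.chunks(CHUNK_SIZE) {
        // Stop streaming if the peer dropped the RPC session.
        if tx.send(BlockOutputsPage { outputs: chunk.to_vec(), height }).await.is_err() {
            return false;
        }
    }
    true
}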
2 changes: 1 addition & 1 deletion base_layer/core/tests/base_node_rpc.rs
@@ -389,7 +389,7 @@ async fn test_sync_utxos_by_block() {
let mut streaming = service.sync_utxos_by_block(req).await.unwrap().into_inner();

let responses = convert_mpsc_to_stream(&mut streaming).collect::<Vec<_>>().await;

// dbg!(&block0);
assert_eq!(
vec![
(0, block0.header().hash().to_vec(), 0),
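Because a single block may now arrive as several responses, a consumer of this stream (this test included) effectively groups pages per block before asserting. A rough sketch of that grouping, using a hypothetical Page type and outputs_per_block helper rather than the real SyncUtxosByBlockResponse:

use std::collections::BTreeMap;

// Simplified stand-in for SyncUtxosByBlockResponse.
struct Page {
    height: u64,
    header_hash: Vec<u8>,
    outputs: Vec<Vec<u8>>,
}

// Collapse a paged stream back into one output count per (height, header hash).
fn outputs_per_block(pages: Vec<Page>) -> BTreeMap<(u64, Vec<u8>), usize> {
    let mut per_block = BTreeMap::new();
    for page in pages {
        *per_block.entry((page.height, page.header_hash)).or_insert(0) += page.outputs.len();
    }
    per_block
}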
64 changes: 42 additions & 22 deletions base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs
@@ -428,6 +428,7 @@ where
}
}

#[allow(clippy::too_many_lines)]
async fn scan_utxos(
&mut self,
client: &mut BaseNodeWalletRpcClient,
@@ -457,6 +458,8 @@ where

let mut utxo_next_await_profiling = Vec::new();
let mut scan_for_outputs_profiling = Vec::new();
let mut prev_scanned_block: Option<ScannedBlock> = None;
let mut prev_output = None;
while let Some(response) = {
let start = Instant::now();
let utxo_stream_next = utxo_stream.next().await;
@@ -478,42 +481,59 @@ where
.into_iter()
.map(|utxo| TransactionOutput::try_from(utxo).map_err(UtxoScannerError::ConversionError))
.collect::<Result<Vec<_>, _>>()?;

let first_output = Some(outputs[0].clone());
total_scanned += outputs.len();

let start = Instant::now();
let found_outputs = self.scan_for_outputs(outputs).await?;
scan_for_outputs_profiling.push(start.elapsed());

let (count, amount) = self
let (mut count, mut amount) = self
.import_utxos_to_transaction_service(found_outputs, current_height, mined_timestamp)
.await?;
let block_hash = current_header_hash.try_into()?;
self.resources.db.save_scanned_block(ScannedBlock {
if let Some(scanned_block) = prev_scanned_block {
if block_hash == scanned_block.header_hash && first_output == prev_output {
count += scanned_block.num_outputs.unwrap_or(0);
amount += scanned_block.amount.unwrap_or_else(|| 0.into())
} else {
self.resources.db.save_scanned_block(scanned_block)?;
self.resources.db.clear_scanned_blocks_before_height(
current_height.saturating_sub(SCANNED_BLOCK_CACHE_SIZE),
true,
)?;

if current_height % PROGRESS_REPORT_INTERVAL == 0 {
debug!(
target: LOG_TARGET,
"Scanned up to block {} with a current tip_height of {}", current_height, tip_height
);
self.publish_event(UtxoScannerEvent::Progress {
current_height,
tip_height,
});
}

num_recovered = num_recovered.saturating_add(count);
total_amount += amount;
}
}
prev_output = first_output;
prev_scanned_block = Some(ScannedBlock {
header_hash: block_hash,
height: current_height,
num_outputs: Some(count),
amount: Some(amount),
timestamp: Utc::now().naive_utc(),
})?;

self.resources
.db
.clear_scanned_blocks_before_height(current_height.saturating_sub(SCANNED_BLOCK_CACHE_SIZE), true)?;

if current_height % PROGRESS_REPORT_INTERVAL == 0 {
debug!(
target: LOG_TARGET,
"Scanned up to block {} with a current tip_height of {}", current_height, tip_height
);
self.publish_event(UtxoScannerEvent::Progress {
current_height,
tip_height,
});
}

num_recovered = num_recovered.saturating_add(count);
total_amount += amount;
});
}
// We need to update the last one
if let Some(scanned_block) = prev_scanned_block {
self.resources.db.clear_scanned_blocks_before_height(
scanned_block.height.saturating_sub(SCANNED_BLOCK_CACHE_SIZE),
true,
)?;
self.resources.db.save_scanned_block(scanned_block)?;
}
trace!(
target: LOG_TARGET,
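The wallet-side change boils down to: hold back the ScannedBlock for the current header until a response for a different block, or the end of the stream, arrives, summing the counts and amounts of all pages that share that header. A minimal sketch of that fold, with a hypothetical Scanned type and fold_pages function, and with the duplicate-page check on the first output omitted:

// Simplified stand-in for the wallet's ScannedBlock bookkeeping.
struct Scanned {
    header_hash: Vec<u8>,
    height: u64,
    num_outputs: u64,
    amount: u64,
}

fn fold_pages(pages: Vec<Scanned>) -> Vec<Scanned> {
    let mut completed = Vec::new();
    let mut prev: Option<Scanned> = None;
    for page in pages {
        match prev.take() {
            // Another page for the same block: merge it instead of saving twice.
            Some(mut p) if p.header_hash == page.header_hash => {
                p.num_outputs += page.num_outputs;
                p.amount += page.amount;
                prev = Some(p);
            },
            // A new block started, so the previous one is complete and can be persisted.
            Some(p) => {
                completed.push(p);
                prev = Some(page);
            },
            None => prev = Some(page),
        }
    }
    // The last block is only known to be complete once the stream ends.
    if let Some(p) = prev {
        completed.push(p);
    }
    completed
}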