From fd084355816475e6c2f1f2dd8138100210e1745a Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Tue, 11 Apr 2023 13:58:09 +0200 Subject: [PATCH 1/3] add paging to request --- .../base_node/rpc/sync_utxos_by_block_task.rs | 21 +++---- .../utxo_scanner_service/utxo_scanner_task.rs | 55 +++++++++++-------- 2 files changed, 44 insertions(+), 32 deletions(-) diff --git a/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs b/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs index b1d4b38644..9dd0bdf778 100644 --- a/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs +++ b/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs @@ -158,16 +158,17 @@ where B: BlockchainBackend + 'static current_header.height, current_header_hash.to_hex(), ); - - let utxo_block_response = SyncUtxosByBlockResponse { - outputs: utxos, - height: current_header.height, - header_hash: current_header_hash.to_vec(), - mined_timestamp: current_header.timestamp.as_u64(), - }; - // Ensure task stops if the peer prematurely stops their RPC session - if tx.send(Ok(utxo_block_response)).await.is_err() { - break; + for utxo_chunk in utxos.chunks(2000) { + let utxo_block_response = SyncUtxosByBlockResponse { + outputs: utxo_chunk.to_vec(), + height: current_header.height, + header_hash: current_header_hash.to_vec(), + mined_timestamp: current_header.timestamp.as_u64(), + }; + // Ensure task stops if the peer prematurely stops their RPC session + if tx.send(Ok(utxo_block_response)).await.is_err() { + break; + } } debug!( diff --git a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs index d60e7d3f37..b4790b56a2 100644 --- a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs +++ b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs @@ -457,6 +457,8 @@ where let mut utxo_next_await_profiling = Vec::new(); let mut scan_for_outputs_profiling = Vec::new(); + let mut prev_scanned_block: Option = None; + let mut prev_output = None; while let Some(response) = { let start = Instant::now(); let utxo_stream_next = utxo_stream.next().await; @@ -478,42 +480,51 @@ where .into_iter() .map(|utxo| TransactionOutput::try_from(utxo).map_err(UtxoScannerError::ConversionError)) .collect::, _>>()?; - + let first_output = Some(outputs[0].clone()); total_scanned += outputs.len(); let start = Instant::now(); let found_outputs = self.scan_for_outputs(outputs).await?; scan_for_outputs_profiling.push(start.elapsed()); - let (count, amount) = self + let (mut count, mut amount) = self .import_utxos_to_transaction_service(found_outputs, current_height, mined_timestamp) .await?; let block_hash = current_header_hash.try_into()?; - self.resources.db.save_scanned_block(ScannedBlock { + if let Some(scanned_block) = prev_scanned_block { + if block_hash == scanned_block.header_hash && first_output == prev_output { + count += scanned_block.num_outputs.unwrap_or(0); + amount += scanned_block.amount.unwrap_or_else(|| 0.into()) + } else { + self.resources.db.save_scanned_block(scanned_block)?; + self.resources.db.clear_scanned_blocks_before_height( + current_height.saturating_sub(SCANNED_BLOCK_CACHE_SIZE), + true, + )?; + + if current_height % PROGRESS_REPORT_INTERVAL == 0 { + debug!( + target: LOG_TARGET, + "Scanned up to block {} with a current tip_height of {}", current_height, tip_height + ); + self.publish_event(UtxoScannerEvent::Progress { + current_height, + tip_height, + }); + } + + num_recovered = num_recovered.saturating_add(count); + total_amount += amount; + } + } + prev_output = first_output; + prev_scanned_block = Some(ScannedBlock { header_hash: block_hash, height: current_height, num_outputs: Some(count), amount: Some(amount), timestamp: Utc::now().naive_utc(), - })?; - - self.resources - .db - .clear_scanned_blocks_before_height(current_height.saturating_sub(SCANNED_BLOCK_CACHE_SIZE), true)?; - - if current_height % PROGRESS_REPORT_INTERVAL == 0 { - debug!( - target: LOG_TARGET, - "Scanned up to block {} with a current tip_height of {}", current_height, tip_height - ); - self.publish_event(UtxoScannerEvent::Progress { - current_height, - tip_height, - }); - } - - num_recovered = num_recovered.saturating_add(count); - total_amount += amount; + }); } trace!( target: LOG_TARGET, From f413b897bb0700fb9515a7ae61c071db835d9c1f Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Tue, 11 Apr 2023 15:47:18 +0200 Subject: [PATCH 2/3] fix edge case --- .../src/base_node/rpc/sync_utxos_by_block_task.rs | 15 ++++++++++++++- base_layer/core/tests/base_node_rpc.rs | 2 +- .../src/utxo_scanner_service/utxo_scanner_task.rs | 8 ++++++++ 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs b/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs index 9dd0bdf778..8458422769 100644 --- a/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs +++ b/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs @@ -141,7 +141,6 @@ where B: BlockchainBackend + 'static .fetch_utxos_in_block(current_header.hash(), Some(bitmap.clone())) .await .rpc_status_internal_error(LOG_TARGET)?; - let utxos = utxos .into_iter() .enumerate() @@ -158,6 +157,7 @@ where B: BlockchainBackend + 'static current_header.height, current_header_hash.to_hex(), ); + for utxo_chunk in utxos.chunks(2000) { let utxo_block_response = SyncUtxosByBlockResponse { outputs: utxo_chunk.to_vec(), @@ -170,6 +170,19 @@ where B: BlockchainBackend + 'static break; } } + if utxos.is_empty() { + // if its empty, we need to send an empty vec of outputs. + let utxo_block_response = SyncUtxosByBlockResponse { + outputs: utxos, + height: current_header.height, + header_hash: current_header_hash.to_vec(), + mined_timestamp: current_header.timestamp.as_u64(), + }; + // Ensure task stops if the peer prematurely stops their RPC session + if tx.send(Ok(utxo_block_response)).await.is_err() { + break; + } + } debug!( target: LOG_TARGET, diff --git a/base_layer/core/tests/base_node_rpc.rs b/base_layer/core/tests/base_node_rpc.rs index 9188a4ca0a..70768f7b08 100644 --- a/base_layer/core/tests/base_node_rpc.rs +++ b/base_layer/core/tests/base_node_rpc.rs @@ -389,7 +389,7 @@ async fn test_sync_utxos_by_block() { let mut streaming = service.sync_utxos_by_block(req).await.unwrap().into_inner(); let responses = convert_mpsc_to_stream(&mut streaming).collect::>().await; - + // dbg!(&block0); assert_eq!( vec![ (0, block0.header().hash().to_vec(), 0), diff --git a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs index b4790b56a2..dfd4d4ca5e 100644 --- a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs +++ b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs @@ -526,6 +526,14 @@ where timestamp: Utc::now().naive_utc(), }); } + // We need to update the last one + if let Some(scanned_block) = prev_scanned_block { + self.resources.db.clear_scanned_blocks_before_height( + scanned_block.height.saturating_sub(SCANNED_BLOCK_CACHE_SIZE), + true, + )?; + self.resources.db.save_scanned_block(scanned_block)?; + } trace!( target: LOG_TARGET, "bulletproof rewind profile - streamed {} outputs in {} ms", From b92aa881286695ee1762fc9779a0aabdb12f80e9 Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Tue, 11 Apr 2023 17:17:04 +0200 Subject: [PATCH 3/3] clippy --- base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs | 1 + base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs b/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs index 8458422769..1ec424eb90 100644 --- a/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs +++ b/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs @@ -91,6 +91,7 @@ where B: BlockchainBackend + 'static Ok(()) } + #[allow(clippy::too_many_lines)] async fn start_streaming( &self, tx: &mut mpsc::Sender>, diff --git a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs index dfd4d4ca5e..7db42c4f45 100644 --- a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs +++ b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs @@ -428,6 +428,7 @@ where } } + #[allow(clippy::too_many_lines)] async fn scan_utxos( &mut self, client: &mut BaseNodeWalletRpcClient,