Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BlockDownloadService integration #1

Conversation

turbolay
Copy link

No description provided.

Turbolay added 5 commits January 9, 2024 23:03
# Conflicts:
#	WalletWasabi.Daemon/Global.cs
#	WalletWasabi.Tests/UnitTests/Wallet/FilterProcessor/BlockDownloadServiceTests.cs
#	WalletWasabi/Wallets/FilterProcessor/BlockDownloadService.cs
#	WalletWasabi/Wallets/WalletFilterProcessor.cs
@@ -103,6 +103,9 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
try
{
// This will store all pre processing tasks per sync type. No need for Priority here as requests are added by successive Heights.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@turbolay Is the description here WalletWasabi#12148 (comment) up to date please?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is now. Let me know it it was not enough for you to understand the logic.


FilterModel filter = await FilterIteratorsBySyncType[request.SyncType].GetAndRemoveAsync(currentHeight, cancellationToken).ConfigureAwait(false);
var matchFound = await ProcessFilterModelAsync(filter, request.SyncType, cancellationToken).ConfigureAwait(false);
// TODO: Reorgs??
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYM? This block is inside using (await ReorgLock.LockAsync(cancellationToken).ConfigureAwait(false)). Can you clarify?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currentHeight: 100
blockchainTip: 105

First iteration: heights 100, 101, ..., 105 are enqueued for pre-processing
Height 100 is Dequeued, processed. 101, ..., 105 are still in the queue

The ExecuteAsync makes a loop, so ReorgLock is released. We receive and process a ReorgNotification for blocks 105 and 104.

-> There are still the preprocessing tasks for 104 and 105 in the Queue.

So they will be cancelled by the BDS, and we could catch that in the processing. We could also catch that in the loop when acquiring ReorgLock.

It also causes a slight extra complexity if a new block is mined.

I know there is a solution, I just didn't deeply think about it.

@kiminuo
Copy link
Owner

kiminuo commented Jan 23, 2024

@turbolay Could you describe how the integration is supposed to work on the high level (for me to check that against code)? The code is quite confusing to me. I'm not sure if WalletWasabi#12148 (comment) is up to date or not.

@turbolay
Copy link
Author

It is more or less up-to-date but I will write a better explanation right now

@turbolay
Copy link
Author

turbolay commented Jan 23, 2024

Currently, when a request is received, WFP will make a loop in ExecuteAsync. This loop is simply calling ProcessFilterModel, which tests keys of the requested syncType against filter at currentHeight, if required downloads block and processes txs and then finally WPF updates the height for the syncType to currentHeight + 1.

What this PR is doing is simply to have ProcessFilterModel twice for each filter:

  • One non-blocking: Pre-processing. It's done when currentHeight is lower than the height of the filter. The keys tested against the filter here are the keys known at the time, so not necessarily the full set of keys that needs to be tested against it (because we can derive new keys on the way to reach the height of the filter). That is why we need the second processing. During this, we only test the keys, start downloading if required, and return the download task if there is one. We do not wait for the block nor process any transaction
  • Blocking: Processing. This is initiated when currentHeight reaches the height of the filter. So at this point the set of keys we have is the full one that has to be tested against this filter. We need to pass the result of the pre-processing to the function. If the preprocessing matched, we await for the block downloading task. If not, we test the keys not yet tested during pre-processing (not yet implemented in this PR but there is a TODO, currently, I'm testing ALL keys so I'm testing most of them twice). If it matches, we download and process transactions. If not, we return. Then we update currentHeight.

Now most of the complexity comes from the loop in ExecuteAsync. This loop is not so straightforward, and the main complexity comes down to the fact that it's a balance between 2 things:

  • If you don't start enough pre-processing tasks at the same time, you're at risk that the pre-processing becomes useless because you would only see benefits on wallets that have really frequent transactions.
  • If you start too many pre-processing tasks at the same time, you're also at risk of the feature being useless because too many keys are derived in the interval and therefore you miss blocks during pre-processing. Also, you need the FilterModel object from pre-processing all the way to final processing, so you have to respect MaxFilterInMemory per syncType.

Some other properties really important to understand the code:

  • When WFP receives a sync request, it means that it always needs to synchronize the syncType from currentHeight to blockchainTip.
  • WFP always tries to process the filters consecutively: currentHeight + 1, currentHeight + 2, ..., blockchainTip - 1, blockchainTip

Here's a breakdown of the code with many more comments to explain it:

uint toProcessHeight = (uint)lastHeight.Value + 1; // Height we have to PROCESS (blocking)

// How many preprocessing tasks we can initiate still respecting MaxNumberFiltersInMemory?
var availablePreProcessSlots = Math.Max(0, FilterIteratorsBySyncType[request.SyncType].MaxNumberFiltersInMemory - preProcessTasks.Count);

// Height to which we can start pre-processing. This makes use of the "consecutive heights" property. If toProcessHeight is 100, and we have 10 pre-processing slots available, it must be for heights 100, 101, 102, ..., 109. 
// So at next iteration, task 100 is dequeued from preProcessTasks, so we have 9 tasks in there and toProcessHeight will be 101. We still have 10 slots available, so we want to start tasks starting from height: toProcessHeight (101) + number of tasks (9) for 10 slots: 110, 111, ..., 109
var toPreProcessHeight = toProcessHeight + (uint)preProcessTasks[request.SyncType].Count;

for (uint i = 0; i < availablePreProcessSlots; i++)
{
	// Stop at blockchain tip
	if (toPreProcessHeight > BitcoinStore.SmartHeaderChain.TipHeight)
	{
		break;
	}

	// Get the filter from the iterator for the height to preprocess. This filter will stay in memory until the end of the blocking process. Another solution would be to re-request it but I prefer this way
	var filter = await FilterIteratorsBySyncType[request.SyncType].GetAndRemoveAsync(toPreProcessHeight, cancellationToken).ConfigureAwait(false);

	// We start the task. PreProcessFilterModel will test current keys we have, if it's not a match it will return null if it's a match it will return the task to download the block, handled by the BDS.
	preProcessTasks[request.SyncType].Enqueue(new FilterPreProcessing(filter, PreProcessFilterModel(filter, request.SyncType, cancellationToken)));

	// We could use i here instead of doing that.
	toPreProcessHeight++;
}

if (preProcessTasks[request.SyncType].Count == 0)
{
	// TODO: I believe this must be a problem?
	continue;
}

// TODO: Reorgs??

// The actual Blocking Processing. We get the result from the preprocessing using Dequeue. It works because of the consecutive height property.
var matchFound = await ProcessFilterModelAsync(preProcessTasks[request.SyncType].Dequeue(), request.SyncType, cancellationToken).ConfigureAwait(false);

@@ -26,12 +26,12 @@ public class WalletFilterProcessor : BackgroundService
/// <remarks>Guarded by <see cref="Lock"/>.</remarks>
private FilterModel? _lastProcessedFilter;

public WalletFilterProcessor(KeyManager keyManager, BitcoinStore bitcoinStore, TransactionProcessor transactionProcessor, IBlockProvider blockProvider)
public WalletFilterProcessor(KeyManager keyManager, BitcoinStore bitcoinStore, TransactionProcessor transactionProcessor, BlockDownloadService blockDownloadService)
Copy link
Owner

@kiminuo kiminuo Jan 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will merge back master after your PR is merged

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, I did this sort of in WalletWasabi#12314.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I will close all my PRs to keep only this one. I will ake final review for the service today, I believe it's ready to merge.

For the integration, we will do it in 2 times:

  • Normal integration
  • Add preprocessing

Turbolay added 2 commits January 25, 2024 23:19
# Conflicts:
#	WalletWasabi.Daemon/Global.cs
#	WalletWasabi.Tests/UnitTests/Wallet/WalletBuilder.cs
#	WalletWasabi/Wallets/FilterProcessor/BlockDownloadService.cs
#	WalletWasabi/Wallets/WalletFilterProcessor.cs
kiminuo pushed a commit that referenced this pull request Apr 10, 2024
@turbolay turbolay closed this May 2, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants