Skip to content

Commit

Permalink
Make transaction broadcasting faster. (WalletWasabi#13351)
Browse files Browse the repository at this point in the history
* Make transaction broadcasting faster.

Instead of trying to broadcast a transaction with nodes one by one, this version tries with one or more in parallel. It doesn't iterate in a loop but uses a `TaskCompletion` task to get a notification when the tx is propagated.

The number of selected nodes is one thrid of the connected nodes plus one, what makes sure to have always connected nodes (it doesn't disconnect them all). Also, this allows users to broadcast transaction immediately after the wallet starts because at that moment there use to be a very few connected nodes. Before, this was almost always impossible and the backend had to be used in most of the cases.

* Pascal case + nits

* Do not wait for all nodes to succeed

* CR suggestions

---------

Co-authored-by: Turbolay <[email protected]>
  • Loading branch information
lontivero and turbolay authored Aug 30, 2024
1 parent e8967b2 commit 1a47be1
Show file tree
Hide file tree
Showing 19 changed files with 228 additions and 225 deletions.
16 changes: 13 additions & 3 deletions WalletWasabi.Daemon/Global.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using NBitcoin;
using Nito.AsyncEx;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
Expand Down Expand Up @@ -37,7 +38,6 @@
using WalletWasabi.WebClients.ShopWare;
using WalletWasabi.Wallets.FilterProcessor;
using WalletWasabi.Models;
using WalletWasabi.WabiSabi.Models;

namespace WalletWasabi.Daemon;

Expand Down Expand Up @@ -153,7 +153,18 @@ public Global(string dataDir, string configFilePath, Config config)
Network == Network.RegTest ? null : HostedServices.Get<CpfpInfoProvider>());

WalletManager = new WalletManager(config.Network, DataDir, new WalletDirectories(Config.Network, DataDir), walletFactory);
TransactionBroadcaster = new TransactionBroadcaster(Network, BitcoinStore, HttpClientFactory, WalletManager);
var p2p = HostedServices.Get<P2pNetwork>();
var broadcasters = new List<IBroadcaster>();
if (BitcoinCoreNode?.RpcClient is not null)
{
broadcasters.Add(new RpcBroadcaster(BitcoinCoreNode.RpcClient));
}
broadcasters.AddRange([
new NetworkBroadcaster(BitcoinStore.MempoolService, p2p.Nodes),
new BackendBroadcaster(HttpClientFactory)
]);

TransactionBroadcaster = new TransactionBroadcaster(broadcasters.ToArray(), BitcoinStore.MempoolService, WalletManager);

WalletManager.WalletStateChanged += WalletManager_WalletStateChanged;
}
Expand Down Expand Up @@ -274,7 +285,6 @@ public async Task InitializeNoWalletAsync(bool initializeSleepInhibitor, Termina

Logger.LogInfo("Start synchronizing filters...");

TransactionBroadcaster.Initialize(HostedServices.Get<P2pNetwork>().Nodes, BitcoinCoreNode?.RpcClient);
CoinJoinProcessor = new CoinJoinProcessor(Network, HostedServices.Get<WasabiSynchronizer>(), WalletManager, BitcoinCoreNode?.RpcClient);

await StartRpcServerAsync(terminateService, cancel).ConfigureAwait(false);
Expand Down
3 changes: 1 addition & 2 deletions WalletWasabi.Tests/RegressionTests/BackendTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ public async Task GetUnconfirmedTxChainAsync()
var blockCount = await rpc.GetBlockCountAsync();
await setup.WaitForFiltersToBeProcessedAsync(TimeSpan.FromSeconds(120), blockCount);

TransactionBroadcaster broadcaster = new(network, bitcoinStore, httpClientFactory, walletManager);
broadcaster.Initialize(nodes, rpc);
TransactionBroadcaster broadcaster = new([new RpcBroadcaster(rpc)], bitcoinStore.MempoolService, walletManager);

#endregion Initialize

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ public async Task BuildTransactionReorgsTestAsync()

var coin = Assert.Single(wallet.Coins);
Assert.True(coin.Confirmed);
TransactionBroadcaster broadcaster = new(network, bitcoinStore, httpClientFactory, walletManager);
broadcaster.Initialize(nodes, rpc);
TransactionBroadcaster broadcaster = new([], bitcoinStore.MempoolService, walletManager);

// Send money before reorg.
PaymentIntent operations = new(scp, Money.Coins(0.011m));
Expand Down
4 changes: 1 addition & 3 deletions WalletWasabi.Tests/RegressionTests/CancelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ public async Task CancelTestsAsync()
await setup.WaitForFiltersToBeProcessedAsync(TimeSpan.FromSeconds(120), blockCount);
wallet.Password = password;

TransactionBroadcaster broadcaster = new(network, bitcoinStore, httpClientFactory, walletManager);
broadcaster.Initialize(nodes, rpc);

var waitCount = 0;
while (wallet.Coins.Sum(x => x.Amount) == Money.Zero)
{
Expand All @@ -123,6 +120,7 @@ public async Task CancelTestsAsync()

SetHighInputAnonsets(txToCancel);

TransactionBroadcaster broadcaster = new([new RpcBroadcaster(rpc)], bitcoinStore.MempoolService, walletManager);
await broadcaster.SendTransactionAsync(txToCancel.Transaction);

AssertAllAnonsets1(txToCancel);
Expand Down
3 changes: 0 additions & 3 deletions WalletWasabi.Tests/RegressionTests/MaxFeeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ public async Task CalculateMaxFeeTestAsync()
await setup.WaitForFiltersToBeProcessedAsync(TimeSpan.FromSeconds(120), blockCount);
wallet.Password = password;

TransactionBroadcaster broadcaster = new(network, bitcoinStore, httpClientFactory, walletManager);
broadcaster.Initialize(nodes, rpc);

var waitCount = 0;
while (wallet.Coins.Sum(x => x.Amount) == Money.Zero)
{
Expand Down
5 changes: 2 additions & 3 deletions WalletWasabi.Tests/RegressionTests/ReceiveSpeedupTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,6 @@ public async Task ReceiveSpeedupTestsAsync()

wallet.Password = password;

TransactionBroadcaster broadcaster = new(network, bitcoinStore, httpClientFactory, walletManager);
broadcaster.Initialize(nodes, rpc);

// Get some money.
var key = keyManager.GetNextReceiveKey("foo");
var txId = await rpc.SendToAddressAsync(key.GetP2wpkhAddress(network), Money.Coins(1m));
Expand All @@ -122,6 +119,8 @@ public async Task ReceiveSpeedupTestsAsync()

Assert.True(bitcoinStore.TransactionStore.TryGetTransaction(txId, out var txToSpeedUp));
var cpfp = await wallet.SpeedUpTransactionAsync(txToSpeedUp, null, CancellationToken.None);

TransactionBroadcaster broadcaster = new([new RpcBroadcaster(rpc)], bitcoinStore.MempoolService, walletManager);
await broadcaster.SendTransactionAsync(cpfp.Transaction);

Assert.Equal("foo", txToSpeedUp.Labels.Single());
Expand Down
5 changes: 2 additions & 3 deletions WalletWasabi.Tests/RegressionTests/SelfSpendSpeedupTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,6 @@ public async Task SelfSpendSpeedupTestsAsync()

wallet.Password = password;

TransactionBroadcaster broadcaster = new(network, bitcoinStore, httpClientFactory, walletManager);
broadcaster.Initialize(nodes, rpc);

var waitCount = 0;
while (wallet.Coins.Sum(x => x.Amount) == Money.Zero)
{
Expand All @@ -124,6 +121,8 @@ public async Task SelfSpendSpeedupTestsAsync()

Money amountToSend = wallet.Coins.Where(x => x.IsAvailable()).Sum(x => x.Amount) / 2;
var txToSpeedUp = wallet.BuildTransaction(password, new PaymentIntent(keyManager.GetNextReceiveKey("foo").GetAssumedScriptPubKey(), amountToSend, label: "bar"), FeeStrategy.SevenDaysConfirmationTargetStrategy, allowUnconfirmed: true);

TransactionBroadcaster broadcaster = new([new RpcBroadcaster(rpc)], bitcoinStore.MempoolService, walletManager);
await broadcaster.SendTransactionAsync(txToSpeedUp.Transaction);

Assert.Equal(Assert.Single(txToSpeedUp.SpentCoins).SpenderTransaction, txToSpeedUp.Transaction);
Expand Down
5 changes: 2 additions & 3 deletions WalletWasabi.Tests/RegressionTests/SendSpeedupTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,6 @@ public async Task SendSpeedupTestsAsync()
await setup.WaitForFiltersToBeProcessedAsync(TimeSpan.FromSeconds(120), blockCount);
wallet.Password = password;

TransactionBroadcaster broadcaster = new(network, bitcoinStore, httpClientFactory, walletManager);
broadcaster.Initialize(nodes, rpc);

var waitCount = 0;
while (wallet.Coins.Sum(x => x.Amount) == Money.Zero)
{
Expand All @@ -122,6 +119,8 @@ public async Task SendSpeedupTestsAsync()
Money amountToSend = wallet.Coins.Where(x => x.IsAvailable()).Sum(x => x.Amount) / 2;
var rpcAddress = await rpc.GetNewAddressAsync();
var txToSpeedUp = wallet.BuildTransaction(password, new PaymentIntent(rpcAddress, amountToSend, label: "bar"), FeeStrategy.SevenDaysConfirmationTargetStrategy, allowUnconfirmed: true);

TransactionBroadcaster broadcaster = new([new RpcBroadcaster(rpc)], bitcoinStore.MempoolService, walletManager);
await broadcaster.SendTransactionAsync(txToSpeedUp.Transaction);

Assert.Equal(Assert.Single(txToSpeedUp.SpentCoins).SpenderTransaction, txToSpeedUp.Transaction);
Expand Down
5 changes: 2 additions & 3 deletions WalletWasabi.Tests/RegressionTests/SendTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,6 @@ public async Task SendTestsAsync()
var blockCount = await rpc.GetBlockCountAsync();
await setup.WaitForFiltersToBeProcessedAsync(TimeSpan.FromSeconds(120), blockCount);

TransactionBroadcaster broadcaster = new(network, bitcoinStore, httpClientFactory, walletManager);
broadcaster.Initialize(nodes, rpc);

var waitCount = 0;
while (wallet.Coins.Sum(x => x.Amount) == Money.Zero)
{
Expand Down Expand Up @@ -137,6 +134,8 @@ public async Task SendTestsAsync()
Assert.Equal(Money.Coins(1m), res2.SpentCoins.Single().Amount);
Assert.False(res2.SpendsUnconfirmed);

TransactionBroadcaster broadcaster = new([new RpcBroadcaster(rpc)], bitcoinStore.MempoolService, walletManager);

await broadcaster.SendTransactionAsync(res2.Transaction);

Assert.Contains(res2.InnerWalletOutputs.Single(), wallet.Coins);
Expand Down
5 changes: 2 additions & 3 deletions WalletWasabi.Tests/RegressionTests/SpendUnconfirmedTxTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,6 @@ public async Task SpendUnconfirmedTxTestAsync()
Assert.Equal(tx0Id, eventArgs.NewlyReceivedCoins.Single().TransactionId);
Assert.Single(wallet.Coins);

TransactionBroadcaster broadcaster = new(network, bitcoinStore, httpClientFactory, walletManager);
broadcaster.Initialize(nodes, rpc);

using Key key2 = new();
using Key key3 = new();
var destination1 = key.PubKey.GetAddress(ScriptPubKeyType.Segwit, Network.Main);
Expand All @@ -135,6 +132,8 @@ public async Task SpendUnconfirmedTxTestAsync()
eventAwaiter = new EventAwaiter<ProcessedResult>(
h => wallet.TransactionProcessor.WalletRelevantTransactionProcessed += h,
h => wallet.TransactionProcessor.WalletRelevantTransactionProcessed -= h);

TransactionBroadcaster broadcaster = new([new RpcBroadcaster(rpc)], bitcoinStore.MempoolService, walletManager);
await broadcaster.SendTransactionAsync(tx1Res.Transaction);
eventArgs = await eventAwaiter.WaitAsync(TimeSpan.FromSeconds(21));
Assert.Equal(tx0Id, eventArgs.NewlySpentCoins.Single().TransactionId);
Expand Down
7 changes: 1 addition & 6 deletions WalletWasabi/BitcoinP2p/P2pBehavior.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,8 @@ private async Task ProcessGetDataAsync(Node node, GetDataPayload payload)

foreach (var inv in payload.Inventory.Where(inv => inv.Type.HasFlag(InventoryType.MSG_TX) || inv.Type.HasFlag(InventoryType.MSG_WTX)))
{
if (MempoolService.TryGetFromBroadcastStore(inv.Hash, out TransactionBroadcastEntry? entry)) // If we have the transaction to be broadcasted then broadcast it now.
if (MempoolService.TryGetFromBroadcastStore(inv.Hash, node.RemoteSocketEndpoint.ToString(), out TransactionBroadcastEntry? entry)) // If we have the transaction to be broadcasted then broadcast it now.
{
if (entry.NodeRemoteSocketEndpoint != node.RemoteSocketEndpoint.ToString())
{
continue; // Would be strange. It could be some kind of attack.
}

try
{
var txPayload = new TxPayload(entry.Transaction.Transaction);
Expand Down
2 changes: 1 addition & 1 deletion WalletWasabi/BitcoinP2p/TrustedP2pBehavior.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ protected override bool ProcessInventoryVector(InventoryVector inv, EndPoint rem
{
if (inv.Type.HasFlag(InventoryType.MSG_TX))
{
if (MempoolService.TryGetFromBroadcastStore(inv.Hash, out TransactionBroadcastEntry? entry)) // If we have the transaction then adjust confirmation.
if (MempoolService.TryGetFromBroadcastStore(inv.Hash, remoteSocketEndpoint.ToString(), out TransactionBroadcastEntry? entry)) // If we have the transaction then adjust confirmation.
{
if (entry.NodeRemoteSocketEndpoint == remoteSocketEndpoint.ToString())
{
Expand Down
2 changes: 1 addition & 1 deletion WalletWasabi/BitcoinP2p/UntrustedP2pBehavior.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ protected override bool ProcessInventoryVector(InventoryVector inv, EndPoint rem
{
if (inv.Type.HasFlag(InventoryType.MSG_TX))
{
if (MempoolService.TryGetFromBroadcastStore(inv.Hash, out TransactionBroadcastEntry? entry)) // If we have the transaction then adjust confirmation.
if (MempoolService.TryGetFromBroadcastStore(inv.Hash, remoteSocketEndpoint.ToString(), out TransactionBroadcastEntry? entry)) // If we have the transaction then adjust confirmation.
{
if (entry.NodeRemoteSocketEndpoint == remoteSocketEndpoint.ToString())
{
Expand Down
18 changes: 8 additions & 10 deletions WalletWasabi/Blockchain/Mempool/MempoolService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,32 +42,30 @@ public bool TryAddToBroadcastStore(SmartTransaction transaction, string nodeRemo
{
lock (BroadcastStoreLock)
{
if (BroadcastStore.Any(x => x.TransactionId == transaction.GetHash()))
if (BroadcastStore.Any(x => x.TransactionId == transaction.GetHash() && x.NodeRemoteSocketEndpoint == nodeRemoteSocketEndpoint))
{
return false;
}
else
{
var entry = new TransactionBroadcastEntry(transaction, nodeRemoteSocketEndpoint);
BroadcastStore.Add(entry);
return true;
}

var entry = new TransactionBroadcastEntry(transaction, nodeRemoteSocketEndpoint);
BroadcastStore.Add(entry);
return true;
}
}

public bool TryGetFromBroadcastStore(uint256 transactionHash, [NotNullWhen(true)] out TransactionBroadcastEntry? entry)
public bool TryGetFromBroadcastStore(uint256 transactionHash, string? nodeRemoteSocketEndpoint, [NotNullWhen(true)] out TransactionBroadcastEntry? entry)
{
lock (BroadcastStoreLock)
{
entry = BroadcastStore.FirstOrDefault(x => x.TransactionId == transactionHash);
entry = BroadcastStore.FirstOrDefault(x => x.TransactionId == transactionHash && (nodeRemoteSocketEndpoint is null || nodeRemoteSocketEndpoint == x.NodeRemoteSocketEndpoint));
return entry is not null;
}
}

public LabelsArray TryGetLabel(uint256 txid)
{
var label = LabelsArray.Empty;
if (TryGetFromBroadcastStore(txid, out var entry))
if (TryGetFromBroadcastStore(txid, null, out var entry))
{
label = entry.Transaction.Labels;
}
Expand Down
Loading

0 comments on commit 1a47be1

Please sign in to comment.