diff --git a/WalletWasabi.Daemon/Global.cs b/WalletWasabi.Daemon/Global.cs index 49b211cc8e8..9d7ee1703c8 100644 --- a/WalletWasabi.Daemon/Global.cs +++ b/WalletWasabi.Daemon/Global.cs @@ -2,6 +2,7 @@ using NBitcoin; using Nito.AsyncEx; using System; +using System.Collections.Generic; using System.IO; using System.Linq; using System.Net; @@ -37,7 +38,6 @@ using WalletWasabi.WebClients.ShopWare; using WalletWasabi.Wallets.FilterProcessor; using WalletWasabi.Models; -using WalletWasabi.WabiSabi.Models; namespace WalletWasabi.Daemon; @@ -153,7 +153,18 @@ public Global(string dataDir, string configFilePath, Config config) Network == Network.RegTest ? null : HostedServices.Get()); WalletManager = new WalletManager(config.Network, DataDir, new WalletDirectories(Config.Network, DataDir), walletFactory); - TransactionBroadcaster = new TransactionBroadcaster(Network, BitcoinStore, HttpClientFactory, WalletManager); + var p2p = HostedServices.Get(); + var broadcasters = new List(); + if (BitcoinCoreNode?.RpcClient is not null) + { + broadcasters.Add(new RpcBroadcaster(BitcoinCoreNode.RpcClient)); + } + broadcasters.AddRange([ + new NetworkBroadcaster(BitcoinStore.MempoolService, p2p.Nodes), + new BackendBroadcaster(HttpClientFactory) + ]); + + TransactionBroadcaster = new TransactionBroadcaster(broadcasters.ToArray(), BitcoinStore.MempoolService, WalletManager); WalletManager.WalletStateChanged += WalletManager_WalletStateChanged; } @@ -274,7 +285,6 @@ public async Task InitializeNoWalletAsync(bool initializeSleepInhibitor, Termina Logger.LogInfo("Start synchronizing filters..."); - TransactionBroadcaster.Initialize(HostedServices.Get().Nodes, BitcoinCoreNode?.RpcClient); CoinJoinProcessor = new CoinJoinProcessor(Network, HostedServices.Get(), WalletManager, BitcoinCoreNode?.RpcClient); await StartRpcServerAsync(terminateService, cancel).ConfigureAwait(false); diff --git a/WalletWasabi.Tests/RegressionTests/BackendTests.cs b/WalletWasabi.Tests/RegressionTests/BackendTests.cs index fc1892b9d67..73afb50fbc5 100644 --- a/WalletWasabi.Tests/RegressionTests/BackendTests.cs +++ b/WalletWasabi.Tests/RegressionTests/BackendTests.cs @@ -174,8 +174,7 @@ public async Task GetUnconfirmedTxChainAsync() var blockCount = await rpc.GetBlockCountAsync(); await setup.WaitForFiltersToBeProcessedAsync(TimeSpan.FromSeconds(120), blockCount); - TransactionBroadcaster broadcaster = new(network, bitcoinStore, httpClientFactory, walletManager); - broadcaster.Initialize(nodes, rpc); + TransactionBroadcaster broadcaster = new([new RpcBroadcaster(rpc)], bitcoinStore.MempoolService, walletManager); #endregion Initialize diff --git a/WalletWasabi.Tests/RegressionTests/BuildTransactionReorgsTest.cs b/WalletWasabi.Tests/RegressionTests/BuildTransactionReorgsTest.cs index 26b9226c192..87bee77c877 100644 --- a/WalletWasabi.Tests/RegressionTests/BuildTransactionReorgsTest.cs +++ b/WalletWasabi.Tests/RegressionTests/BuildTransactionReorgsTest.cs @@ -111,8 +111,7 @@ public async Task BuildTransactionReorgsTestAsync() var coin = Assert.Single(wallet.Coins); Assert.True(coin.Confirmed); - TransactionBroadcaster broadcaster = new(network, bitcoinStore, httpClientFactory, walletManager); - broadcaster.Initialize(nodes, rpc); + TransactionBroadcaster broadcaster = new([], bitcoinStore.MempoolService, walletManager); // Send money before reorg. PaymentIntent operations = new(scp, Money.Coins(0.011m)); diff --git a/WalletWasabi.Tests/RegressionTests/CancelTests.cs b/WalletWasabi.Tests/RegressionTests/CancelTests.cs index df7953885fc..26fa13024d5 100644 --- a/WalletWasabi.Tests/RegressionTests/CancelTests.cs +++ b/WalletWasabi.Tests/RegressionTests/CancelTests.cs @@ -101,9 +101,6 @@ public async Task CancelTestsAsync() await setup.WaitForFiltersToBeProcessedAsync(TimeSpan.FromSeconds(120), blockCount); wallet.Password = password; - TransactionBroadcaster broadcaster = new(network, bitcoinStore, httpClientFactory, walletManager); - broadcaster.Initialize(nodes, rpc); - var waitCount = 0; while (wallet.Coins.Sum(x => x.Amount) == Money.Zero) { @@ -123,6 +120,7 @@ public async Task CancelTestsAsync() SetHighInputAnonsets(txToCancel); + TransactionBroadcaster broadcaster = new([new RpcBroadcaster(rpc)], bitcoinStore.MempoolService, walletManager); await broadcaster.SendTransactionAsync(txToCancel.Transaction); AssertAllAnonsets1(txToCancel); diff --git a/WalletWasabi.Tests/RegressionTests/MaxFeeTests.cs b/WalletWasabi.Tests/RegressionTests/MaxFeeTests.cs index ea43369c096..dea1432bb88 100644 --- a/WalletWasabi.Tests/RegressionTests/MaxFeeTests.cs +++ b/WalletWasabi.Tests/RegressionTests/MaxFeeTests.cs @@ -108,9 +108,6 @@ public async Task CalculateMaxFeeTestAsync() await setup.WaitForFiltersToBeProcessedAsync(TimeSpan.FromSeconds(120), blockCount); wallet.Password = password; - TransactionBroadcaster broadcaster = new(network, bitcoinStore, httpClientFactory, walletManager); - broadcaster.Initialize(nodes, rpc); - var waitCount = 0; while (wallet.Coins.Sum(x => x.Amount) == Money.Zero) { diff --git a/WalletWasabi.Tests/RegressionTests/ReceiveSpeedupTests.cs b/WalletWasabi.Tests/RegressionTests/ReceiveSpeedupTests.cs index 45a6680b6db..ba58c507c9b 100644 --- a/WalletWasabi.Tests/RegressionTests/ReceiveSpeedupTests.cs +++ b/WalletWasabi.Tests/RegressionTests/ReceiveSpeedupTests.cs @@ -98,9 +98,6 @@ public async Task ReceiveSpeedupTestsAsync() wallet.Password = password; - TransactionBroadcaster broadcaster = new(network, bitcoinStore, httpClientFactory, walletManager); - broadcaster.Initialize(nodes, rpc); - // Get some money. var key = keyManager.GetNextReceiveKey("foo"); var txId = await rpc.SendToAddressAsync(key.GetP2wpkhAddress(network), Money.Coins(1m)); @@ -122,6 +119,8 @@ public async Task ReceiveSpeedupTestsAsync() Assert.True(bitcoinStore.TransactionStore.TryGetTransaction(txId, out var txToSpeedUp)); var cpfp = await wallet.SpeedUpTransactionAsync(txToSpeedUp, null, CancellationToken.None); + + TransactionBroadcaster broadcaster = new([new RpcBroadcaster(rpc)], bitcoinStore.MempoolService, walletManager); await broadcaster.SendTransactionAsync(cpfp.Transaction); Assert.Equal("foo", txToSpeedUp.Labels.Single()); diff --git a/WalletWasabi.Tests/RegressionTests/SelfSpendSpeedupTests.cs b/WalletWasabi.Tests/RegressionTests/SelfSpendSpeedupTests.cs index 638e9fe3c5f..c2b8570b226 100644 --- a/WalletWasabi.Tests/RegressionTests/SelfSpendSpeedupTests.cs +++ b/WalletWasabi.Tests/RegressionTests/SelfSpendSpeedupTests.cs @@ -105,9 +105,6 @@ public async Task SelfSpendSpeedupTestsAsync() wallet.Password = password; - TransactionBroadcaster broadcaster = new(network, bitcoinStore, httpClientFactory, walletManager); - broadcaster.Initialize(nodes, rpc); - var waitCount = 0; while (wallet.Coins.Sum(x => x.Amount) == Money.Zero) { @@ -124,6 +121,8 @@ public async Task SelfSpendSpeedupTestsAsync() Money amountToSend = wallet.Coins.Where(x => x.IsAvailable()).Sum(x => x.Amount) / 2; var txToSpeedUp = wallet.BuildTransaction(password, new PaymentIntent(keyManager.GetNextReceiveKey("foo").GetAssumedScriptPubKey(), amountToSend, label: "bar"), FeeStrategy.SevenDaysConfirmationTargetStrategy, allowUnconfirmed: true); + + TransactionBroadcaster broadcaster = new([new RpcBroadcaster(rpc)], bitcoinStore.MempoolService, walletManager); await broadcaster.SendTransactionAsync(txToSpeedUp.Transaction); Assert.Equal(Assert.Single(txToSpeedUp.SpentCoins).SpenderTransaction, txToSpeedUp.Transaction); diff --git a/WalletWasabi.Tests/RegressionTests/SendSpeedupTests.cs b/WalletWasabi.Tests/RegressionTests/SendSpeedupTests.cs index 29d7d3b7f0c..ab3ebeb3b90 100644 --- a/WalletWasabi.Tests/RegressionTests/SendSpeedupTests.cs +++ b/WalletWasabi.Tests/RegressionTests/SendSpeedupTests.cs @@ -103,9 +103,6 @@ public async Task SendSpeedupTestsAsync() await setup.WaitForFiltersToBeProcessedAsync(TimeSpan.FromSeconds(120), blockCount); wallet.Password = password; - TransactionBroadcaster broadcaster = new(network, bitcoinStore, httpClientFactory, walletManager); - broadcaster.Initialize(nodes, rpc); - var waitCount = 0; while (wallet.Coins.Sum(x => x.Amount) == Money.Zero) { @@ -122,6 +119,8 @@ public async Task SendSpeedupTestsAsync() Money amountToSend = wallet.Coins.Where(x => x.IsAvailable()).Sum(x => x.Amount) / 2; var rpcAddress = await rpc.GetNewAddressAsync(); var txToSpeedUp = wallet.BuildTransaction(password, new PaymentIntent(rpcAddress, amountToSend, label: "bar"), FeeStrategy.SevenDaysConfirmationTargetStrategy, allowUnconfirmed: true); + + TransactionBroadcaster broadcaster = new([new RpcBroadcaster(rpc)], bitcoinStore.MempoolService, walletManager); await broadcaster.SendTransactionAsync(txToSpeedUp.Transaction); Assert.Equal(Assert.Single(txToSpeedUp.SpentCoins).SpenderTransaction, txToSpeedUp.Transaction); diff --git a/WalletWasabi.Tests/RegressionTests/SendTests.cs b/WalletWasabi.Tests/RegressionTests/SendTests.cs index cc6e33e6ad6..ecaf8b462ef 100644 --- a/WalletWasabi.Tests/RegressionTests/SendTests.cs +++ b/WalletWasabi.Tests/RegressionTests/SendTests.cs @@ -107,9 +107,6 @@ public async Task SendTestsAsync() var blockCount = await rpc.GetBlockCountAsync(); await setup.WaitForFiltersToBeProcessedAsync(TimeSpan.FromSeconds(120), blockCount); - TransactionBroadcaster broadcaster = new(network, bitcoinStore, httpClientFactory, walletManager); - broadcaster.Initialize(nodes, rpc); - var waitCount = 0; while (wallet.Coins.Sum(x => x.Amount) == Money.Zero) { @@ -137,6 +134,8 @@ public async Task SendTestsAsync() Assert.Equal(Money.Coins(1m), res2.SpentCoins.Single().Amount); Assert.False(res2.SpendsUnconfirmed); + TransactionBroadcaster broadcaster = new([new RpcBroadcaster(rpc)], bitcoinStore.MempoolService, walletManager); + await broadcaster.SendTransactionAsync(res2.Transaction); Assert.Contains(res2.InnerWalletOutputs.Single(), wallet.Coins); diff --git a/WalletWasabi.Tests/RegressionTests/SpendUnconfirmedTxTests.cs b/WalletWasabi.Tests/RegressionTests/SpendUnconfirmedTxTests.cs index 4e0f14e3f0d..54c79240f81 100644 --- a/WalletWasabi.Tests/RegressionTests/SpendUnconfirmedTxTests.cs +++ b/WalletWasabi.Tests/RegressionTests/SpendUnconfirmedTxTests.cs @@ -114,9 +114,6 @@ public async Task SpendUnconfirmedTxTestAsync() Assert.Equal(tx0Id, eventArgs.NewlyReceivedCoins.Single().TransactionId); Assert.Single(wallet.Coins); - TransactionBroadcaster broadcaster = new(network, bitcoinStore, httpClientFactory, walletManager); - broadcaster.Initialize(nodes, rpc); - using Key key2 = new(); using Key key3 = new(); var destination1 = key.PubKey.GetAddress(ScriptPubKeyType.Segwit, Network.Main); @@ -135,6 +132,8 @@ public async Task SpendUnconfirmedTxTestAsync() eventAwaiter = new EventAwaiter( h => wallet.TransactionProcessor.WalletRelevantTransactionProcessed += h, h => wallet.TransactionProcessor.WalletRelevantTransactionProcessed -= h); + + TransactionBroadcaster broadcaster = new([new RpcBroadcaster(rpc)], bitcoinStore.MempoolService, walletManager); await broadcaster.SendTransactionAsync(tx1Res.Transaction); eventArgs = await eventAwaiter.WaitAsync(TimeSpan.FromSeconds(21)); Assert.Equal(tx0Id, eventArgs.NewlySpentCoins.Single().TransactionId); diff --git a/WalletWasabi/BitcoinP2p/P2pBehavior.cs b/WalletWasabi/BitcoinP2p/P2pBehavior.cs index 8342f5b9a15..893521db305 100644 --- a/WalletWasabi/BitcoinP2p/P2pBehavior.cs +++ b/WalletWasabi/BitcoinP2p/P2pBehavior.cs @@ -90,13 +90,8 @@ private async Task ProcessGetDataAsync(Node node, GetDataPayload payload) foreach (var inv in payload.Inventory.Where(inv => inv.Type.HasFlag(InventoryType.MSG_TX) || inv.Type.HasFlag(InventoryType.MSG_WTX))) { - if (MempoolService.TryGetFromBroadcastStore(inv.Hash, out TransactionBroadcastEntry? entry)) // If we have the transaction to be broadcasted then broadcast it now. + if (MempoolService.TryGetFromBroadcastStore(inv.Hash, node.RemoteSocketEndpoint.ToString(), out TransactionBroadcastEntry? entry)) // If we have the transaction to be broadcasted then broadcast it now. { - if (entry.NodeRemoteSocketEndpoint != node.RemoteSocketEndpoint.ToString()) - { - continue; // Would be strange. It could be some kind of attack. - } - try { var txPayload = new TxPayload(entry.Transaction.Transaction); diff --git a/WalletWasabi/BitcoinP2p/TrustedP2pBehavior.cs b/WalletWasabi/BitcoinP2p/TrustedP2pBehavior.cs index a9f60ef78a3..114574c637f 100644 --- a/WalletWasabi/BitcoinP2p/TrustedP2pBehavior.cs +++ b/WalletWasabi/BitcoinP2p/TrustedP2pBehavior.cs @@ -18,7 +18,7 @@ protected override bool ProcessInventoryVector(InventoryVector inv, EndPoint rem { if (inv.Type.HasFlag(InventoryType.MSG_TX)) { - if (MempoolService.TryGetFromBroadcastStore(inv.Hash, out TransactionBroadcastEntry? entry)) // If we have the transaction then adjust confirmation. + if (MempoolService.TryGetFromBroadcastStore(inv.Hash, remoteSocketEndpoint.ToString(), out TransactionBroadcastEntry? entry)) // If we have the transaction then adjust confirmation. { if (entry.NodeRemoteSocketEndpoint == remoteSocketEndpoint.ToString()) { diff --git a/WalletWasabi/BitcoinP2p/UntrustedP2pBehavior.cs b/WalletWasabi/BitcoinP2p/UntrustedP2pBehavior.cs index 744c916ab8e..3cd5a8b7a06 100644 --- a/WalletWasabi/BitcoinP2p/UntrustedP2pBehavior.cs +++ b/WalletWasabi/BitcoinP2p/UntrustedP2pBehavior.cs @@ -23,7 +23,7 @@ protected override bool ProcessInventoryVector(InventoryVector inv, EndPoint rem { if (inv.Type.HasFlag(InventoryType.MSG_TX)) { - if (MempoolService.TryGetFromBroadcastStore(inv.Hash, out TransactionBroadcastEntry? entry)) // If we have the transaction then adjust confirmation. + if (MempoolService.TryGetFromBroadcastStore(inv.Hash, remoteSocketEndpoint.ToString(), out TransactionBroadcastEntry? entry)) // If we have the transaction then adjust confirmation. { if (entry.NodeRemoteSocketEndpoint == remoteSocketEndpoint.ToString()) { diff --git a/WalletWasabi/Blockchain/Mempool/MempoolService.cs b/WalletWasabi/Blockchain/Mempool/MempoolService.cs index e4ce5ec3a65..5b03adfb40f 100644 --- a/WalletWasabi/Blockchain/Mempool/MempoolService.cs +++ b/WalletWasabi/Blockchain/Mempool/MempoolService.cs @@ -42,24 +42,22 @@ public bool TryAddToBroadcastStore(SmartTransaction transaction, string nodeRemo { lock (BroadcastStoreLock) { - if (BroadcastStore.Any(x => x.TransactionId == transaction.GetHash())) + if (BroadcastStore.Any(x => x.TransactionId == transaction.GetHash() && x.NodeRemoteSocketEndpoint == nodeRemoteSocketEndpoint)) { return false; } - else - { - var entry = new TransactionBroadcastEntry(transaction, nodeRemoteSocketEndpoint); - BroadcastStore.Add(entry); - return true; - } + + var entry = new TransactionBroadcastEntry(transaction, nodeRemoteSocketEndpoint); + BroadcastStore.Add(entry); + return true; } } - public bool TryGetFromBroadcastStore(uint256 transactionHash, [NotNullWhen(true)] out TransactionBroadcastEntry? entry) + public bool TryGetFromBroadcastStore(uint256 transactionHash, string? nodeRemoteSocketEndpoint, [NotNullWhen(true)] out TransactionBroadcastEntry? entry) { lock (BroadcastStoreLock) { - entry = BroadcastStore.FirstOrDefault(x => x.TransactionId == transactionHash); + entry = BroadcastStore.FirstOrDefault(x => x.TransactionId == transactionHash && (nodeRemoteSocketEndpoint is null || nodeRemoteSocketEndpoint == x.NodeRemoteSocketEndpoint)); return entry is not null; } } @@ -67,7 +65,7 @@ public bool TryGetFromBroadcastStore(uint256 transactionHash, [NotNullWhen(true) public LabelsArray TryGetLabel(uint256 txid) { var label = LabelsArray.Empty; - if (TryGetFromBroadcastStore(txid, out var entry)) + if (TryGetFromBroadcastStore(txid, null, out var entry)) { label = entry.Transaction.Labels; } diff --git a/WalletWasabi/Blockchain/TransactionBroadcasting/TransactionBroadcaster.cs b/WalletWasabi/Blockchain/TransactionBroadcasting/TransactionBroadcaster.cs index 3ba518b3411..0a6e849056c 100644 --- a/WalletWasabi/Blockchain/TransactionBroadcasting/TransactionBroadcaster.cs +++ b/WalletWasabi/Blockchain/TransactionBroadcasting/TransactionBroadcaster.cs @@ -1,212 +1,236 @@ using NBitcoin; using NBitcoin.Protocol; -using System.Collections.Generic; using System.Linq; using System.Net.Http; using System.Threading.Tasks; -using WabiSabi.Crypto.Randomness; +using NBitcoin.RPC; using WalletWasabi.BitcoinCore.Rpc; +using WalletWasabi.Blockchain.Mempool; using WalletWasabi.Blockchain.Transactions; using WalletWasabi.Extensions; +using WalletWasabi.Helpers; using WalletWasabi.Logging; using WalletWasabi.Models; -using WalletWasabi.Stores; -using WalletWasabi.Tor.Http; using WalletWasabi.Wallets; using WalletWasabi.WebClients.Wasabi; namespace WalletWasabi.Blockchain.TransactionBroadcasting; -public class TransactionBroadcaster +using BroadcastingResult = Result; + +public abstract record BroadcastError { - public TransactionBroadcaster(Network network, BitcoinStore bitcoinStore, WasabiHttpClientFactory httpClientFactory, WalletManager walletManager) - { - Network = network; - BitcoinStore = bitcoinStore; - HttpClientFactory = httpClientFactory; - WalletManager = walletManager; - } + public record SpentError : BroadcastError; + public record SpentInputError(OutPoint SpentOutpoint) : BroadcastError; + public record RpcError(string RpcErrorMessage) : BroadcastError; + public record Unknown(string Message) : BroadcastError; + public record NotEnoughP2pNodes : BroadcastError; + public record AggregatedErrors(BroadcastError[] Errors) : BroadcastError; +} - private BitcoinStore BitcoinStore { get; } - private IWasabiHttpClientFactory HttpClientFactory { get; } - private Network Network { get; } - private NodesGroup? Nodes { get; set; } - private IRPCClient? RpcClient { get; set; } - private WalletManager WalletManager { get; } - private WasabiRandom Random { get; } = SecureRandom.Instance; +public interface IBroadcaster +{ + Task BroadcastAsync(SmartTransaction tx); +} - public void Initialize(NodesGroup nodes, IRPCClient? rpcClient) +public class RpcBroadcaster(IRPCClient rpcClient) : IBroadcaster +{ + public async Task BroadcastAsync(SmartTransaction tx) { - Nodes = nodes; - RpcClient = rpcClient; + Logger.LogInfo($"Trying to broadcast transaction via RPC:{tx.GetHash()}."); + try + { + await rpcClient.SendRawTransactionAsync(tx.Transaction).ConfigureAwait(false); + return BroadcastingResult.Ok(); + } + catch (RPCException ex) + { + return BroadcastingResult.Fail(new BroadcastError.RpcError(ex.RPCCodeMessage)); + } } +} - private async Task BroadcastTransactionToNetworkNodeAsync(SmartTransaction transaction, Node node) +public class BackendBroadcaster(IWasabiHttpClientFactory wasabiHttpClientFactory) : IBroadcaster +{ + public async Task BroadcastAsync(SmartTransaction tx) { - Logger.LogInfo($"Trying to broadcast transaction with random node ({node.RemoteSocketAddress}):{transaction.GetHash()}."); - if (!BitcoinStore.MempoolService.TryAddToBroadcastStore(transaction, node.RemoteSocketEndpoint.ToString())) // So we'll reply to INV with this transaction. + Logger.LogInfo($"Trying to broadcast transaction via backend API:{tx.GetHash()}."); + try { - Logger.LogWarning($"Transaction {transaction.GetHash()} was already present in the broadcast store."); + var wasabiClient = new WasabiClient(wasabiHttpClientFactory.NewHttpClientWithCircuitPerRequest()); + await wasabiClient.BroadcastAsync(tx).ConfigureAwait(false); + return BroadcastingResult.Ok(); } - var invPayload = new InvPayload(transaction.Transaction); - - // Give 7 seconds to send the inv payload. - await node.SendMessageAsync(invPayload).WaitAsync(TimeSpan.FromSeconds(7)).ConfigureAwait(false); // ToDo: It's dangerous way to cancel. Implement proper cancellation to NBitcoin! - - if (BitcoinStore.MempoolService.TryGetFromBroadcastStore(transaction.GetHash(), out TransactionBroadcastEntry? entry)) + catch (HttpRequestException ex) { - // Give 7 seconds for serving. - var timeout = 0; - while (!entry.IsBroadcasted()) + if (RpcErrorTools.IsSpentError(ex.Message)) { - if (timeout > 7) + // If there is only one coin then that's the one that is already spent (what about full-RBF?). + if (tx.Transaction.Inputs.Count == 1) { - throw new TimeoutException("Did not serve the transaction."); + OutPoint input = tx.Transaction.Inputs[0].PrevOut; + return BroadcastingResult.Fail(new BroadcastError.SpentInputError(input)); } - await Task.Delay(1_000).ConfigureAwait(false); - timeout++; - } - node.DisconnectAsync("Thank you!"); - Logger.LogInfo($"Disconnected node: {node.RemoteSocketAddress}. Successfully broadcasted transaction: {transaction.GetHash()}."); - // Give 21 seconds for propagation. - timeout = 0; - while (entry.GetPropagationConfirmations() < 2) - { - if (timeout > 21) - { - throw new TimeoutException("Did not serve the transaction."); - } - await Task.Delay(1_000).ConfigureAwait(false); - timeout++; + return BroadcastingResult.Fail(new BroadcastError.SpentError()); } - Logger.LogInfo($"Transaction is successfully propagated: {transaction.GetHash()}."); - } - else - { - Logger.LogWarning($"Expected transaction {transaction.GetHash()} was not found in the broadcast store."); + return BroadcastingResult.Fail(new BroadcastError.Unknown(ex.Message)); } } +} - private async Task BroadcastTransactionToBackendAsync(SmartTransaction transaction) +public class NetworkBroadcaster(MempoolService mempoolService, NodesGroup nodes) : IBroadcaster +{ + public async Task BroadcastAsync(SmartTransaction tx) { - Logger.LogInfo("Broadcasting with backend..."); - IHttpClient httpClient = HttpClientFactory.NewHttpClientWithCircuitPerRequest(); - - WasabiClient client = new(httpClient); - - try + if (nodes.ConnectedNodes.Count < 2) { - await client.BroadcastAsync(transaction).ConfigureAwait(false); + return BroadcastingResult.Fail(new BroadcastError.NotEnoughP2pNodes()); } - catch (HttpRequestException ex2) when (RpcErrorTools.IsSpentError(ex2.Message)) + var connectedNodeCount = nodes.ConnectedNodes.Count; + var broadcastToNodeTasks = nodes.ConnectedNodes + .Where(n => n.IsConnected) + .OrderBy(_ => Guid.NewGuid()) + .Take(1 + connectedNodeCount / 3) + .Select(n => BroadcastCoreAsync(tx, n)) + .ToList(); + + var tasksToWaitFor = broadcastToNodeTasks.ToList(); + Task> completedTask; + do { - if (transaction.Transaction.Inputs.Count == 1) // If we tried to only spend one coin, then we can mark it as spent. If there were more coins, then we do not know. - { - OutPoint input = transaction.Transaction.Inputs.First().PrevOut; - foreach (var coin in WalletManager.CoinsByOutPoint(input)) - { - coin.SpentAccordingToBackend = true; - } - } - - // Exception message is in form: 'message:::tx1:::tx2:::etc.' where txs are encoded in HEX. - IEnumerable txs = ex2.Message.Split(":::", StringSplitOptions.RemoveEmptyEntries) - .Skip(1) // Skip the exception message. - .Select(x => new SmartTransaction(Transaction.Parse(x, Network), Height.Mempool)); - - foreach (var tx in txs) + completedTask = await Task.WhenAny(tasksToWaitFor).ConfigureAwait(false); + tasksToWaitFor.Remove(completedTask); + var result = await completedTask.ConfigureAwait(false); + if (result.IsOk) { - WalletManager.Process(tx); + return result; } + } while (completedTask.IsFaulted && tasksToWaitFor.Count > 0); - throw; - } - - BelieveTransaction(transaction); - - Logger.LogInfo($"Transaction is successfully broadcasted to backend: {transaction.GetHash()}."); + var results = await Task.WhenAll(broadcastToNodeTasks).ConfigureAwait(false); + return results.SequenceResults().ThenError(e => new BroadcastError.AggregatedErrors(e)); } - private void BelieveTransaction(SmartTransaction transaction) + private async Task BroadcastCoreAsync(SmartTransaction tx, Node node) { - if (transaction.Height == Height.Unknown) + Logger.LogInfo($"Trying to broadcast transaction with random node ({node.RemoteSocketAddress}):{tx.GetHash()}."); + var txId = tx.GetHash(); + if (!mempoolService.TryAddToBroadcastStore(tx, node.RemoteSocketEndpoint.ToString())) // So we'll reply to INV with this transaction. { - transaction.SetUnconfirmed(); + Logger.LogInfo($"Transaction {txId} was already present in the broadcast store."); } + var invPayload = new InvPayload(tx.Transaction); - BitcoinStore.MempoolService.TryAddToBroadcastStore(transaction, "N/A"); - - WalletManager.Process(transaction); - } + await node.SendMessageAsync(invPayload).WaitAsync(TimeSpan.FromSeconds(3)).ConfigureAwait(false); // ToDo: It's dangerous way to cancel. Implement proper cancellation to NBitcoin! - public async Task SendTransactionAsync(SmartTransaction transaction) - { - try + if (mempoolService.TryGetFromBroadcastStore(txId, node.RemoteSocketEndpoint.ToString(), out TransactionBroadcastEntry? entry)) { - // Broadcast to a random node. - // Wait until it arrives to at least two other nodes. - // If something's wrong, fall back broadcasting with rpc, then backend. + var broadcastTimeoutTask = Task.Delay(7000); + var broadcastFinishedTask = await Task.WhenAny([broadcastTimeoutTask, entry.BroadcastCompleted.Task]).ConfigureAwait(false); - if (Network == Network.RegTest) + if (broadcastFinishedTask == broadcastTimeoutTask) { - throw new InvalidOperationException($"Transaction broadcasting to nodes does not work in {Network.RegTest}."); + return BroadcastingResult.Fail(new BroadcastError.Unknown($"Timed out to broadcast to {node.RemoteSocketEndpoint} node")); } + node.DisconnectAsync("Thank you!"); + Logger.LogInfo($"Disconnected node: {node.RemoteSocketAddress}. Successfully broadcasted transaction: {txId}."); - if (Nodes is null) - { - throw new InvalidOperationException($"Nodes are not yet initialized."); - } + var propagationTimeoutTask = Task.Delay(7000); + var propagationFinishedTask = await Task.WhenAny([ broadcastTimeoutTask, entry.PropagationConfirmed.Task]).ConfigureAwait(false); - Node? node = Nodes.ConnectedNodes.RandomElement(Random); - while (node is null || !node.IsConnected || Nodes.ConnectedNodes.Count < 5) + if (propagationFinishedTask == propagationTimeoutTask) { - // As long as we are connected to at least 4 nodes, we can always try again. - // 3 should be enough, but make it 5 so 2 nodes could disconnect in the meantime. - if (Nodes.ConnectedNodes.Count < 5) - { - throw new InvalidOperationException("We are not connected to enough nodes."); - } - await Task.Delay(100).ConfigureAwait(false); - node = Nodes.ConnectedNodes.RandomElement(Random); + return BroadcastingResult.Fail(new BroadcastError.Unknown("Timed out to verify propagation.")); } - await BroadcastTransactionToNetworkNodeAsync(transaction, node).ConfigureAwait(false); } - catch (Exception ex) + else { - Logger.LogInfo($"Random node could not broadcast transaction. Reason: {ex.Message}."); - Logger.LogDebug(ex); + Logger.LogWarning($"Expected transaction {txId} was not found in the broadcast store."); + } - if (RpcClient is { }) - { - try + return BroadcastingResult.Ok(); + } +} + +public class TransactionBroadcaster(IBroadcaster[] broadcasters, MempoolService mempoolService, WalletManager walletManager) +{ + public async Task SendTransactionAsync(SmartTransaction tx) + { + var broadcastedSuccessfully = false; + await broadcasters + .ToAsyncEnumerable() + .SelectAwait(async x => await x.BroadcastAsync(tx).ConfigureAwait(false)) + .TakeUntil(x => x.Match(_ => true, _ => false)) + .ForEachAsync(b => b.MatchDo( + BroadcastSuccess, + BroadcastError + )) + .ConfigureAwait(false); + + if (!broadcastedSuccessfully) + { + throw new InvalidOperationException("Error while sending transaction."); + } + + return; + + void BroadcastSuccess(Unit _) + { + broadcastedSuccessfully = true; + BroadcastSuccessfully(tx); + } + } + + private void BroadcastError(BroadcastError broadcastError) + { + switch (broadcastError) + { + case BroadcastError.RpcError rpcError: + Logger.LogInfo($"Failed to broadcast transaction via RPC. Reason: {rpcError.RpcErrorMessage}."); + break; + case BroadcastError.SpentError _: + Logger.LogError("Failed to broadcast transaction. There are spent inputs."); + break; + case BroadcastError.SpentInputError spentInputError: + Logger.LogError($"Failed to broadcast transaction. Input {spentInputError.SpentOutpoint} is already spent."); + foreach (var coin in walletManager.CoinsByOutPoint(spentInputError.SpentOutpoint)) { - await BroadcastTransactionWithRpcAsync(transaction).ConfigureAwait(false); + coin.SpentAccordingToBackend = true; } - catch (Exception ex2) + break; + case BroadcastError.NotEnoughP2pNodes _: + Logger.LogInfo("Failed to broadcast transaction via peer-to-peer network: We are not connected to enough nodes."); + break; + case BroadcastError.Unknown unknown: + Logger.LogInfo($"Failed to broadcast transaction: {unknown.Message}."); + break; + case BroadcastError.AggregatedErrors aggregatedErrors: + foreach (var error in aggregatedErrors.Errors) { - Logger.LogInfo($"RPC could not broadcast transaction. Reason: {ex2.Message}."); - Logger.LogDebug(ex2); - - await BroadcastTransactionToBackendAsync(transaction).ConfigureAwait(false); + BroadcastError(error); } - } - else - { - await BroadcastTransactionToBackendAsync(transaction).ConfigureAwait(false); - } + break; + default: + throw new ArgumentOutOfRangeException(nameof(broadcastError)); } } - private async Task BroadcastTransactionWithRpcAsync(SmartTransaction transaction) + private void BroadcastSuccessfully(SmartTransaction tx) + { + BelieveTransaction(tx); + Logger.LogInfo($"Transaction is successfully broadcasted: {tx.GetHash()}."); + } + + private void BelieveTransaction(SmartTransaction transaction) { - if (RpcClient is null) + if (transaction.Height == Height.Unknown) { - throw new InvalidOperationException("Trying to broadcast on RPC but it is not initialized."); + transaction.SetUnconfirmed(); } - await RpcClient.SendRawTransactionAsync(transaction.Transaction).ConfigureAwait(false); - BelieveTransaction(transaction); - Logger.LogInfo($"Transaction is successfully broadcasted with RPC: {transaction.GetHash()}."); + mempoolService.TryAddToBroadcastStore(transaction, "N/A"); + + walletManager.Process(transaction); } } diff --git a/WalletWasabi/Blockchain/TransactionProcessing/TransactionProcessor.cs b/WalletWasabi/Blockchain/TransactionProcessing/TransactionProcessor.cs index 38321d1584b..4ca2afdc416 100644 --- a/WalletWasabi/Blockchain/TransactionProcessing/TransactionProcessor.cs +++ b/WalletWasabi/Blockchain/TransactionProcessing/TransactionProcessor.cs @@ -105,7 +105,7 @@ private ProcessedResult ProcessNoLock(SmartTransaction tx) uint256 txId = tx.GetHash(); // If we already have the transaction, then let's work on that. - if (MempoolService?.TryGetFromBroadcastStore(txId, out var foundEntry) is true) + if (MempoolService?.TryGetFromBroadcastStore(txId, null, out var foundEntry) is true) { // If we already have the transaction in the broadcast store, then let's work on that. foundEntry.Transaction.TryUpdate(tx); diff --git a/WalletWasabi/Blockchain/Transactions/TransactionBroadcastEntry.cs b/WalletWasabi/Blockchain/Transactions/TransactionBroadcastEntry.cs index eb77ecb9e34..f06c8387402 100644 --- a/WalletWasabi/Blockchain/Transactions/TransactionBroadcastEntry.cs +++ b/WalletWasabi/Blockchain/Transactions/TransactionBroadcastEntry.cs @@ -1,3 +1,4 @@ +using System.Threading.Tasks; using NBitcoin; namespace WalletWasabi.Blockchain.Transactions; @@ -6,62 +7,32 @@ public class TransactionBroadcastEntry { public TransactionBroadcastEntry(SmartTransaction transaction, string nodeRemoteSocketEndpoint) { - Lock = new object(); Transaction = transaction; TransactionId = Transaction.GetHash(); - Broadcasted = false; - PropagationConfirmations = 0; NodeRemoteSocketEndpoint = nodeRemoteSocketEndpoint; + BroadcastCompleted = new TaskCompletionSource(); + PropagationConfirmed = new TaskCompletionSource(); } public SmartTransaction Transaction { get; } public uint256 TransactionId { get; } public string NodeRemoteSocketEndpoint { get; } - private bool Broadcasted { get; set; } - private int PropagationConfirmations { get; set; } - - private object Lock { get; } + public TaskCompletionSource BroadcastCompleted { get; } + public TaskCompletionSource PropagationConfirmed { get; } public void MakeBroadcasted() { - lock (Lock) - { - Broadcasted = true; - } - } - - public bool IsBroadcasted() - { - lock (Lock) - { - return Broadcasted; - } + BroadcastCompleted.TrySetResult(); } public void ConfirmPropagationOnce() { - lock (Lock) - { - Broadcasted = true; - PropagationConfirmations++; - } + PropagationConfirmed.TrySetResult(); } public void ConfirmPropagationForGood() { - lock (Lock) - { - Broadcasted = true; - PropagationConfirmations = 21; - } - } - - public int GetPropagationConfirmations() - { - lock (Lock) - { - return PropagationConfirmations; - } + PropagationConfirmed.TrySetResult(); } } diff --git a/WalletWasabi/Extensions/LinqExtensions.cs b/WalletWasabi/Extensions/LinqExtensions.cs index 002f93c69fb..f208422ab8d 100644 --- a/WalletWasabi/Extensions/LinqExtensions.cs +++ b/WalletWasabi/Extensions/LinqExtensions.cs @@ -166,6 +166,17 @@ public static IEnumerable TakeUntil(this IEnumerable list, Func TakeUntil(this IAsyncEnumerable list, Func predicate) + { + await foreach (T el in list) + { + yield return el; + if (predicate(el)) + { + yield break; + } + } + } public static IEnumerable Singleton(this T item) { yield return item; diff --git a/WalletWasabi/Helpers/Result.cs b/WalletWasabi/Helpers/Result.cs index f3e7b9d75ca..045a78a6650 100644 --- a/WalletWasabi/Helpers/Result.cs +++ b/WalletWasabi/Helpers/Result.cs @@ -1,5 +1,6 @@ using System.Collections.Generic; using System.Linq; +using WalletWasabi.Userfacing.Bip21; namespace WalletWasabi.Helpers; @@ -54,6 +55,8 @@ public void MatchDo(Action success, Action failure) } } + public bool IsOk => _isSuccess; + public Result Then(Func f) => Match(v => Result.Ok(f(v)), e => e); @@ -91,6 +94,9 @@ public static Result Sequence(IEnumerable> results) => .Match( _ => Result.Ok(), es => Result.Fail(es)); + + public Result ThenError(Func f) => + Match(_=> Result.Ok(), e => Result.Fail(f(e))); } public static class ResultExtensions