From 9c9dacb64339c1c9454a0a8fafd820ffe2382bf6 Mon Sep 17 00:00:00 2001 From: Lucas Ontivero Date: Tue, 17 Sep 2024 03:19:43 -0400 Subject: [PATCH] More resilient broadcasting mechanism (#13381) --- WalletWasabi/BitcoinP2p/P2pBehavior.cs | 8 +- WalletWasabi/BitcoinP2p/TrustedP2pBehavior.cs | 6 +- .../BitcoinP2p/UntrustedP2pBehavior.cs | 6 +- .../Blockchain/Mempool/MempoolService.cs | 12 +-- .../TransactionBroadcaster.cs | 76 +++++++++++++------ .../TransactionProcessor.cs | 2 +- .../Transactions/TransactionBroadcastEntry.cs | 48 ++++++++---- 7 files changed, 103 insertions(+), 55 deletions(-) diff --git a/WalletWasabi/BitcoinP2p/P2pBehavior.cs b/WalletWasabi/BitcoinP2p/P2pBehavior.cs index 893521db305..3b9a80e5672 100644 --- a/WalletWasabi/BitcoinP2p/P2pBehavior.cs +++ b/WalletWasabi/BitcoinP2p/P2pBehavior.cs @@ -90,20 +90,20 @@ private async Task ProcessGetDataAsync(Node node, GetDataPayload payload) foreach (var inv in payload.Inventory.Where(inv => inv.Type.HasFlag(InventoryType.MSG_TX) || inv.Type.HasFlag(InventoryType.MSG_WTX))) { - if (MempoolService.TryGetFromBroadcastStore(inv.Hash, node.RemoteSocketEndpoint.ToString(), out TransactionBroadcastEntry? entry)) // If we have the transaction to be broadcasted then broadcast it now. + if (MempoolService.TryGetFromBroadcastStore(inv.Hash, out TransactionBroadcastEntry? entry)) // If we have the transaction to be broadcasted then broadcast it now. { try { var txPayload = new TxPayload(entry.Transaction.Transaction); if (!node.IsConnected) { - Logger.LogInfo($"Could not serve transaction. Node ({node.RemoteSocketEndpoint}) is not connected anymore: {entry.TransactionId}."); + Logger.LogDebug($"Could not serve transaction. Node ({node.RemoteSocketEndpoint}) is not connected anymore: {entry.TransactionId}."); } else { await node.SendMessageAsync(txPayload).ConfigureAwait(false); - entry.MakeBroadcasted(); - Logger.LogInfo($"Successfully served transaction to node ({node.RemoteSocketEndpoint}): {entry.TransactionId}."); + entry.BroadcastedTo(node.RemoteSocketEndpoint); + Logger.LogDebug($"Successfully served transaction to node ({node.RemoteSocketEndpoint}): {entry.TransactionId}."); } } catch (Exception ex) diff --git a/WalletWasabi/BitcoinP2p/TrustedP2pBehavior.cs b/WalletWasabi/BitcoinP2p/TrustedP2pBehavior.cs index 114574c637f..5a3f20d485b 100644 --- a/WalletWasabi/BitcoinP2p/TrustedP2pBehavior.cs +++ b/WalletWasabi/BitcoinP2p/TrustedP2pBehavior.cs @@ -18,14 +18,14 @@ protected override bool ProcessInventoryVector(InventoryVector inv, EndPoint rem { if (inv.Type.HasFlag(InventoryType.MSG_TX)) { - if (MempoolService.TryGetFromBroadcastStore(inv.Hash, remoteSocketEndpoint.ToString(), out TransactionBroadcastEntry? entry)) // If we have the transaction then adjust confirmation. + if (MempoolService.TryGetFromBroadcastStore(inv.Hash, out TransactionBroadcastEntry? entry)) // If we have the transaction then adjust confirmation. { - if (entry.NodeRemoteSocketEndpoint == remoteSocketEndpoint.ToString()) + if (entry.WasBroadcastedTo(remoteSocketEndpoint)) { return false; // Wtf, why are you trying to broadcast it back to us? } - entry.ConfirmPropagationForGood(); + entry.ConfirmPropagationForGood(remoteSocketEndpoint); } // If we already processed it continue. diff --git a/WalletWasabi/BitcoinP2p/UntrustedP2pBehavior.cs b/WalletWasabi/BitcoinP2p/UntrustedP2pBehavior.cs index 3cd5a8b7a06..106f581f006 100644 --- a/WalletWasabi/BitcoinP2p/UntrustedP2pBehavior.cs +++ b/WalletWasabi/BitcoinP2p/UntrustedP2pBehavior.cs @@ -23,14 +23,14 @@ protected override bool ProcessInventoryVector(InventoryVector inv, EndPoint rem { if (inv.Type.HasFlag(InventoryType.MSG_TX)) { - if (MempoolService.TryGetFromBroadcastStore(inv.Hash, remoteSocketEndpoint.ToString(), out TransactionBroadcastEntry? entry)) // If we have the transaction then adjust confirmation. + if (MempoolService.TryGetFromBroadcastStore(inv.Hash, out TransactionBroadcastEntry? entry)) // If we have the transaction then adjust confirmation. { - if (entry.NodeRemoteSocketEndpoint == remoteSocketEndpoint.ToString()) + if (entry.WasBroadcastedTo(remoteSocketEndpoint)) { return false; // Wtf, why are you trying to broadcast it back to us? } - entry.ConfirmPropagationOnce(); + entry.ConfirmPropagationOnce(remoteSocketEndpoint); } // If we already processed it or we're in trusted node mode, then don't ask for it. diff --git a/WalletWasabi/Blockchain/Mempool/MempoolService.cs b/WalletWasabi/Blockchain/Mempool/MempoolService.cs index 5b03adfb40f..a15f84178ab 100644 --- a/WalletWasabi/Blockchain/Mempool/MempoolService.cs +++ b/WalletWasabi/Blockchain/Mempool/MempoolService.cs @@ -38,26 +38,26 @@ public class MempoolService public bool TrustedNodeMode { get; set; } - public bool TryAddToBroadcastStore(SmartTransaction transaction, string nodeRemoteSocketEndpoint) + public bool TryAddToBroadcastStore(SmartTransaction transaction) { lock (BroadcastStoreLock) { - if (BroadcastStore.Any(x => x.TransactionId == transaction.GetHash() && x.NodeRemoteSocketEndpoint == nodeRemoteSocketEndpoint)) + if (BroadcastStore.Any(x => x.TransactionId == transaction.GetHash())) { return false; } - var entry = new TransactionBroadcastEntry(transaction, nodeRemoteSocketEndpoint); + var entry = new TransactionBroadcastEntry(transaction); BroadcastStore.Add(entry); return true; } } - public bool TryGetFromBroadcastStore(uint256 transactionHash, string? nodeRemoteSocketEndpoint, [NotNullWhen(true)] out TransactionBroadcastEntry? entry) + public bool TryGetFromBroadcastStore(uint256 transactionHash, [NotNullWhen(true)] out TransactionBroadcastEntry? entry) { lock (BroadcastStoreLock) { - entry = BroadcastStore.FirstOrDefault(x => x.TransactionId == transactionHash && (nodeRemoteSocketEndpoint is null || nodeRemoteSocketEndpoint == x.NodeRemoteSocketEndpoint)); + entry = BroadcastStore.FirstOrDefault(x => x.TransactionId == transactionHash); return entry is not null; } } @@ -65,7 +65,7 @@ public bool TryGetFromBroadcastStore(uint256 transactionHash, string? nodeRemote public LabelsArray TryGetLabel(uint256 txid) { var label = LabelsArray.Empty; - if (TryGetFromBroadcastStore(txid, null, out var entry)) + if (TryGetFromBroadcastStore(txid, out var entry)) { label = entry.Transaction.Labels; } diff --git a/WalletWasabi/Blockchain/TransactionBroadcasting/TransactionBroadcaster.cs b/WalletWasabi/Blockchain/TransactionBroadcasting/TransactionBroadcaster.cs index 0a6e849056c..6b6ff735af3 100644 --- a/WalletWasabi/Blockchain/TransactionBroadcasting/TransactionBroadcaster.cs +++ b/WalletWasabi/Blockchain/TransactionBroadcasting/TransactionBroadcaster.cs @@ -1,6 +1,7 @@ using NBitcoin; using NBitcoin.Protocol; using System.Linq; +using System.Net; using System.Net.Http; using System.Threading.Tasks; using NBitcoin.RPC; @@ -16,7 +17,17 @@ namespace WalletWasabi.Blockchain.TransactionBroadcasting; -using BroadcastingResult = Result; +using BroadcastingResult = Result; + + +public abstract record BroadcastOk +{ + public record BroadcastedByRpc : BroadcastOk; + + public record BroadcastedByNetwork(EndPoint[] Nodes) : BroadcastOk; + + public record BroadcastedByBackend() : BroadcastOk; +} public abstract record BroadcastError { @@ -41,7 +52,7 @@ public async Task BroadcastAsync(SmartTransaction tx) try { await rpcClient.SendRawTransactionAsync(tx.Transaction).ConfigureAwait(false); - return BroadcastingResult.Ok(); + return BroadcastingResult.Ok(new BroadcastOk.BroadcastedByRpc()); } catch (RPCException ex) { @@ -59,7 +70,7 @@ public async Task BroadcastAsync(SmartTransaction tx) { var wasabiClient = new WasabiClient(wasabiHttpClientFactory.NewHttpClientWithCircuitPerRequest()); await wasabiClient.BroadcastAsync(tx).ConfigureAwait(false); - return BroadcastingResult.Ok(); + return BroadcastingResult.Ok(new BroadcastOk.BroadcastedByBackend()); } catch (HttpRequestException ex) { @@ -87,16 +98,16 @@ public async Task BroadcastAsync(SmartTransaction tx) { return BroadcastingResult.Fail(new BroadcastError.NotEnoughP2pNodes()); } - var connectedNodeCount = nodes.ConnectedNodes.Count; + var connectedNodeCount = nodes.ConnectedNodes.Count(x => x.IsConnected); var broadcastToNodeTasks = nodes.ConnectedNodes .Where(n => n.IsConnected) .OrderBy(_ => Guid.NewGuid()) - .Take(1 + connectedNodeCount / 3) + .Take(2 + connectedNodeCount / 4) .Select(n => BroadcastCoreAsync(tx, n)) .ToList(); var tasksToWaitFor = broadcastToNodeTasks.ToList(); - Task> completedTask; + Task completedTask; do { completedTask = await Task.WhenAny(tasksToWaitFor).ConfigureAwait(false); @@ -109,22 +120,27 @@ public async Task BroadcastAsync(SmartTransaction tx) } while (completedTask.IsFaulted && tasksToWaitFor.Count > 0); var results = await Task.WhenAll(broadcastToNodeTasks).ConfigureAwait(false); - return results.SequenceResults().ThenError(e => new BroadcastError.AggregatedErrors(e)); + var errors = results + .Select(r => r.Match(_ => (IsError: false, Error: null)!, e => (IsError: true, Error: e))) + .Where(x => x.IsError) + .Select(x => x.Error) + .ToArray(); + return BroadcastingResult.Fail(new BroadcastError.AggregatedErrors(errors)); } private async Task BroadcastCoreAsync(SmartTransaction tx, Node node) { Logger.LogInfo($"Trying to broadcast transaction with random node ({node.RemoteSocketAddress}):{tx.GetHash()}."); var txId = tx.GetHash(); - if (!mempoolService.TryAddToBroadcastStore(tx, node.RemoteSocketEndpoint.ToString())) // So we'll reply to INV with this transaction. + if (!mempoolService.TryAddToBroadcastStore(tx)) // So we'll reply to INV with this transaction. { - Logger.LogInfo($"Transaction {txId} was already present in the broadcast store."); + Logger.LogDebug($"Transaction {txId} was already present in the broadcast store."); } var invPayload = new InvPayload(tx.Transaction); await node.SendMessageAsync(invPayload).WaitAsync(TimeSpan.FromSeconds(3)).ConfigureAwait(false); // ToDo: It's dangerous way to cancel. Implement proper cancellation to NBitcoin! - if (mempoolService.TryGetFromBroadcastStore(txId, node.RemoteSocketEndpoint.ToString(), out TransactionBroadcastEntry? entry)) + if (mempoolService.TryGetFromBroadcastStore(txId, out TransactionBroadcastEntry? entry)) { var broadcastTimeoutTask = Task.Delay(7000); var broadcastFinishedTask = await Task.WhenAny([broadcastTimeoutTask, entry.BroadcastCompleted.Task]).ConfigureAwait(false); @@ -137,19 +153,19 @@ private async Task BroadcastCoreAsync(SmartTransaction tx, N Logger.LogInfo($"Disconnected node: {node.RemoteSocketAddress}. Successfully broadcasted transaction: {txId}."); var propagationTimeoutTask = Task.Delay(7000); - var propagationFinishedTask = await Task.WhenAny([ broadcastTimeoutTask, entry.PropagationConfirmed.Task]).ConfigureAwait(false); + var propagationTask = entry.PropagationConfirmed.Task; + var propagationFinishedTask = await Task.WhenAny([ propagationTimeoutTask, propagationTask]).ConfigureAwait(false); if (propagationFinishedTask == propagationTimeoutTask) { return BroadcastingResult.Fail(new BroadcastError.Unknown("Timed out to verify propagation.")); } - } - else - { - Logger.LogWarning($"Expected transaction {txId} was not found in the broadcast store."); + + var propagators = await propagationTask.ConfigureAwait(false); + return BroadcastingResult.Ok(new BroadcastOk.BroadcastedByNetwork(propagators)); } - return BroadcastingResult.Ok(); + return BroadcastingResult.Fail(new BroadcastError.Unknown($"Expected transaction {txId} was not found in the broadcast store.")); } } @@ -175,10 +191,26 @@ await broadcasters return; - void BroadcastSuccess(Unit _) + void BroadcastSuccess(BroadcastOk ok) { broadcastedSuccessfully = true; - BroadcastSuccessfully(tx); + switch (ok) + { + case BroadcastOk.BroadcastedByBackend: + Logger.LogInfo($"Transaction is successfully broadcasted {tx.GetHash()} by backend."); + break; + case BroadcastOk.BroadcastedByRpc: + Logger.LogInfo($"Transaction is successfully broadcasted {tx.GetHash()} by local node RPC interface."); + break; + case BroadcastOk.BroadcastedByNetwork n: + foreach (var confirmedPropagators in n.Nodes) + { + Logger.LogInfo($"Transaction is successfully broadcasted {tx.GetHash()} and propagated by {confirmedPropagators}."); + } + + break; + } + BelieveTransaction(tx); } } @@ -216,12 +248,6 @@ private void BroadcastError(BroadcastError broadcastError) } } - private void BroadcastSuccessfully(SmartTransaction tx) - { - BelieveTransaction(tx); - Logger.LogInfo($"Transaction is successfully broadcasted: {tx.GetHash()}."); - } - private void BelieveTransaction(SmartTransaction transaction) { if (transaction.Height == Height.Unknown) @@ -229,7 +255,7 @@ private void BelieveTransaction(SmartTransaction transaction) transaction.SetUnconfirmed(); } - mempoolService.TryAddToBroadcastStore(transaction, "N/A"); + mempoolService.TryAddToBroadcastStore(transaction); walletManager.Process(transaction); } diff --git a/WalletWasabi/Blockchain/TransactionProcessing/TransactionProcessor.cs b/WalletWasabi/Blockchain/TransactionProcessing/TransactionProcessor.cs index 4ca2afdc416..38321d1584b 100644 --- a/WalletWasabi/Blockchain/TransactionProcessing/TransactionProcessor.cs +++ b/WalletWasabi/Blockchain/TransactionProcessing/TransactionProcessor.cs @@ -105,7 +105,7 @@ private ProcessedResult ProcessNoLock(SmartTransaction tx) uint256 txId = tx.GetHash(); // If we already have the transaction, then let's work on that. - if (MempoolService?.TryGetFromBroadcastStore(txId, null, out var foundEntry) is true) + if (MempoolService?.TryGetFromBroadcastStore(txId, out var foundEntry) is true) { // If we already have the transaction in the broadcast store, then let's work on that. foundEntry.Transaction.TryUpdate(tx); diff --git a/WalletWasabi/Blockchain/Transactions/TransactionBroadcastEntry.cs b/WalletWasabi/Blockchain/Transactions/TransactionBroadcastEntry.cs index f06c8387402..282b16ec974 100644 --- a/WalletWasabi/Blockchain/Transactions/TransactionBroadcastEntry.cs +++ b/WalletWasabi/Blockchain/Transactions/TransactionBroadcastEntry.cs @@ -1,3 +1,6 @@ +using System.Collections.Generic; +using System.Linq; +using System.Net; using System.Threading.Tasks; using NBitcoin; @@ -5,34 +8,53 @@ namespace WalletWasabi.Blockchain.Transactions; public class TransactionBroadcastEntry { - public TransactionBroadcastEntry(SmartTransaction transaction, string nodeRemoteSocketEndpoint) + public TransactionBroadcastEntry(SmartTransaction transaction) { Transaction = transaction; TransactionId = Transaction.GetHash(); - NodeRemoteSocketEndpoint = nodeRemoteSocketEndpoint; - BroadcastCompleted = new TaskCompletionSource(); - PropagationConfirmed = new TaskCompletionSource(); + BroadcastCompleted = new TaskCompletionSource(); + PropagationConfirmed = new TaskCompletionSource(); } public SmartTransaction Transaction { get; } public uint256 TransactionId { get; } - public string NodeRemoteSocketEndpoint { get; } - public TaskCompletionSource BroadcastCompleted { get; } - public TaskCompletionSource PropagationConfirmed { get; } + public TaskCompletionSource BroadcastCompleted { get; } + public TaskCompletionSource PropagationConfirmed { get; } - public void MakeBroadcasted() + private readonly HashSet _broadcastedTo = new(); + private readonly HashSet _confirmedBy = new(); + private readonly object _syncObj = new(); + + public void BroadcastedTo(EndPoint nodeEndpoint) + { + lock (_syncObj) + { + if (_broadcastedTo.Add(nodeEndpoint) && _broadcastedTo.Count > 1) + { + BroadcastCompleted.TrySetResult(_broadcastedTo.ToArray()); + } + } + } + + public bool WasBroadcastedTo(EndPoint nodeEndpoint) { - BroadcastCompleted.TrySetResult(); + return _broadcastedTo.Contains(nodeEndpoint); } - public void ConfirmPropagationOnce() + public void ConfirmPropagationOnce(EndPoint nodeEndpoint) { - PropagationConfirmed.TrySetResult(); + lock (_syncObj) + { + if (_confirmedBy.Add(nodeEndpoint) && _confirmedBy.Count > 1) + { + PropagationConfirmed.TrySetResult(_confirmedBy.ToArray()); + } + } } - public void ConfirmPropagationForGood() + public void ConfirmPropagationForGood(EndPoint nodeEndpoint) { - PropagationConfirmed.TrySetResult(); + PropagationConfirmed.TrySetResult([nodeEndpoint]); } }