Skip to content

Commit

Permalink
More resilient broadcasting mechanism (WalletWasabi#13381)
Browse files Browse the repository at this point in the history
  • Loading branch information
lontivero authored Sep 17, 2024
1 parent 86e8068 commit 9c9dacb
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 55 deletions.
8 changes: 4 additions & 4 deletions WalletWasabi/BitcoinP2p/P2pBehavior.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,20 @@ private async Task ProcessGetDataAsync(Node node, GetDataPayload payload)

foreach (var inv in payload.Inventory.Where(inv => inv.Type.HasFlag(InventoryType.MSG_TX) || inv.Type.HasFlag(InventoryType.MSG_WTX)))
{
if (MempoolService.TryGetFromBroadcastStore(inv.Hash, node.RemoteSocketEndpoint.ToString(), out TransactionBroadcastEntry? entry)) // If we have the transaction to be broadcasted then broadcast it now.
if (MempoolService.TryGetFromBroadcastStore(inv.Hash, out TransactionBroadcastEntry? entry)) // If we have the transaction to be broadcasted then broadcast it now.
{
try
{
var txPayload = new TxPayload(entry.Transaction.Transaction);
if (!node.IsConnected)
{
Logger.LogInfo($"Could not serve transaction. Node ({node.RemoteSocketEndpoint}) is not connected anymore: {entry.TransactionId}.");
Logger.LogDebug($"Could not serve transaction. Node ({node.RemoteSocketEndpoint}) is not connected anymore: {entry.TransactionId}.");
}
else
{
await node.SendMessageAsync(txPayload).ConfigureAwait(false);
entry.MakeBroadcasted();
Logger.LogInfo($"Successfully served transaction to node ({node.RemoteSocketEndpoint}): {entry.TransactionId}.");
entry.BroadcastedTo(node.RemoteSocketEndpoint);
Logger.LogDebug($"Successfully served transaction to node ({node.RemoteSocketEndpoint}): {entry.TransactionId}.");
}
}
catch (Exception ex)
Expand Down
6 changes: 3 additions & 3 deletions WalletWasabi/BitcoinP2p/TrustedP2pBehavior.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ protected override bool ProcessInventoryVector(InventoryVector inv, EndPoint rem
{
if (inv.Type.HasFlag(InventoryType.MSG_TX))
{
if (MempoolService.TryGetFromBroadcastStore(inv.Hash, remoteSocketEndpoint.ToString(), out TransactionBroadcastEntry? entry)) // If we have the transaction then adjust confirmation.
if (MempoolService.TryGetFromBroadcastStore(inv.Hash, out TransactionBroadcastEntry? entry)) // If we have the transaction then adjust confirmation.
{
if (entry.NodeRemoteSocketEndpoint == remoteSocketEndpoint.ToString())
if (entry.WasBroadcastedTo(remoteSocketEndpoint))
{
return false; // Wtf, why are you trying to broadcast it back to us?
}

entry.ConfirmPropagationForGood();
entry.ConfirmPropagationForGood(remoteSocketEndpoint);
}

// If we already processed it continue.
Expand Down
6 changes: 3 additions & 3 deletions WalletWasabi/BitcoinP2p/UntrustedP2pBehavior.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ protected override bool ProcessInventoryVector(InventoryVector inv, EndPoint rem
{
if (inv.Type.HasFlag(InventoryType.MSG_TX))
{
if (MempoolService.TryGetFromBroadcastStore(inv.Hash, remoteSocketEndpoint.ToString(), out TransactionBroadcastEntry? entry)) // If we have the transaction then adjust confirmation.
if (MempoolService.TryGetFromBroadcastStore(inv.Hash, out TransactionBroadcastEntry? entry)) // If we have the transaction then adjust confirmation.
{
if (entry.NodeRemoteSocketEndpoint == remoteSocketEndpoint.ToString())
if (entry.WasBroadcastedTo(remoteSocketEndpoint))
{
return false; // Wtf, why are you trying to broadcast it back to us?
}

entry.ConfirmPropagationOnce();
entry.ConfirmPropagationOnce(remoteSocketEndpoint);
}

// If we already processed it or we're in trusted node mode, then don't ask for it.
Expand Down
12 changes: 6 additions & 6 deletions WalletWasabi/Blockchain/Mempool/MempoolService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,34 +38,34 @@ public class MempoolService

public bool TrustedNodeMode { get; set; }

public bool TryAddToBroadcastStore(SmartTransaction transaction, string nodeRemoteSocketEndpoint)
public bool TryAddToBroadcastStore(SmartTransaction transaction)
{
lock (BroadcastStoreLock)
{
if (BroadcastStore.Any(x => x.TransactionId == transaction.GetHash() && x.NodeRemoteSocketEndpoint == nodeRemoteSocketEndpoint))
if (BroadcastStore.Any(x => x.TransactionId == transaction.GetHash()))
{
return false;
}

var entry = new TransactionBroadcastEntry(transaction, nodeRemoteSocketEndpoint);
var entry = new TransactionBroadcastEntry(transaction);
BroadcastStore.Add(entry);
return true;
}
}

public bool TryGetFromBroadcastStore(uint256 transactionHash, string? nodeRemoteSocketEndpoint, [NotNullWhen(true)] out TransactionBroadcastEntry? entry)
public bool TryGetFromBroadcastStore(uint256 transactionHash, [NotNullWhen(true)] out TransactionBroadcastEntry? entry)
{
lock (BroadcastStoreLock)
{
entry = BroadcastStore.FirstOrDefault(x => x.TransactionId == transactionHash && (nodeRemoteSocketEndpoint is null || nodeRemoteSocketEndpoint == x.NodeRemoteSocketEndpoint));
entry = BroadcastStore.FirstOrDefault(x => x.TransactionId == transactionHash);
return entry is not null;
}
}

public LabelsArray TryGetLabel(uint256 txid)
{
var label = LabelsArray.Empty;
if (TryGetFromBroadcastStore(txid, null, out var entry))
if (TryGetFromBroadcastStore(txid, out var entry))
{
label = entry.Transaction.Labels;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using NBitcoin;
using NBitcoin.Protocol;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using NBitcoin.RPC;
Expand All @@ -16,7 +17,17 @@

namespace WalletWasabi.Blockchain.TransactionBroadcasting;

using BroadcastingResult = Result<BroadcastError>;
using BroadcastingResult = Result<BroadcastOk, BroadcastError>;


public abstract record BroadcastOk
{
public record BroadcastedByRpc : BroadcastOk;

public record BroadcastedByNetwork(EndPoint[] Nodes) : BroadcastOk;

public record BroadcastedByBackend() : BroadcastOk;
}

public abstract record BroadcastError
{
Expand All @@ -41,7 +52,7 @@ public async Task<BroadcastingResult> BroadcastAsync(SmartTransaction tx)
try
{
await rpcClient.SendRawTransactionAsync(tx.Transaction).ConfigureAwait(false);
return BroadcastingResult.Ok();
return BroadcastingResult.Ok(new BroadcastOk.BroadcastedByRpc());
}
catch (RPCException ex)
{
Expand All @@ -59,7 +70,7 @@ public async Task<BroadcastingResult> BroadcastAsync(SmartTransaction tx)
{
var wasabiClient = new WasabiClient(wasabiHttpClientFactory.NewHttpClientWithCircuitPerRequest());
await wasabiClient.BroadcastAsync(tx).ConfigureAwait(false);
return BroadcastingResult.Ok();
return BroadcastingResult.Ok(new BroadcastOk.BroadcastedByBackend());
}
catch (HttpRequestException ex)
{
Expand Down Expand Up @@ -87,16 +98,16 @@ public async Task<BroadcastingResult> BroadcastAsync(SmartTransaction tx)
{
return BroadcastingResult.Fail(new BroadcastError.NotEnoughP2pNodes());
}
var connectedNodeCount = nodes.ConnectedNodes.Count;
var connectedNodeCount = nodes.ConnectedNodes.Count(x => x.IsConnected);
var broadcastToNodeTasks = nodes.ConnectedNodes
.Where(n => n.IsConnected)
.OrderBy(_ => Guid.NewGuid())
.Take(1 + connectedNodeCount / 3)
.Take(2 + connectedNodeCount / 4)
.Select(n => BroadcastCoreAsync(tx, n))
.ToList();

var tasksToWaitFor = broadcastToNodeTasks.ToList();
Task<Result<BroadcastError>> completedTask;
Task<BroadcastingResult> completedTask;
do
{
completedTask = await Task.WhenAny(tasksToWaitFor).ConfigureAwait(false);
Expand All @@ -109,22 +120,27 @@ public async Task<BroadcastingResult> BroadcastAsync(SmartTransaction tx)
} while (completedTask.IsFaulted && tasksToWaitFor.Count > 0);

var results = await Task.WhenAll(broadcastToNodeTasks).ConfigureAwait(false);
return results.SequenceResults().ThenError<BroadcastError>(e => new BroadcastError.AggregatedErrors(e));
var errors = results
.Select(r => r.Match(_ => (IsError: false, Error: null)!, e => (IsError: true, Error: e)))
.Where(x => x.IsError)
.Select(x => x.Error)
.ToArray();
return BroadcastingResult.Fail(new BroadcastError.AggregatedErrors(errors));
}

private async Task<BroadcastingResult> BroadcastCoreAsync(SmartTransaction tx, Node node)
{
Logger.LogInfo($"Trying to broadcast transaction with random node ({node.RemoteSocketAddress}):{tx.GetHash()}.");
var txId = tx.GetHash();
if (!mempoolService.TryAddToBroadcastStore(tx, node.RemoteSocketEndpoint.ToString())) // So we'll reply to INV with this transaction.
if (!mempoolService.TryAddToBroadcastStore(tx)) // So we'll reply to INV with this transaction.
{
Logger.LogInfo($"Transaction {txId} was already present in the broadcast store.");
Logger.LogDebug($"Transaction {txId} was already present in the broadcast store.");
}
var invPayload = new InvPayload(tx.Transaction);

await node.SendMessageAsync(invPayload).WaitAsync(TimeSpan.FromSeconds(3)).ConfigureAwait(false); // ToDo: It's dangerous way to cancel. Implement proper cancellation to NBitcoin!

if (mempoolService.TryGetFromBroadcastStore(txId, node.RemoteSocketEndpoint.ToString(), out TransactionBroadcastEntry? entry))
if (mempoolService.TryGetFromBroadcastStore(txId, out TransactionBroadcastEntry? entry))
{
var broadcastTimeoutTask = Task.Delay(7000);
var broadcastFinishedTask = await Task.WhenAny([broadcastTimeoutTask, entry.BroadcastCompleted.Task]).ConfigureAwait(false);
Expand All @@ -137,19 +153,19 @@ private async Task<BroadcastingResult> BroadcastCoreAsync(SmartTransaction tx, N
Logger.LogInfo($"Disconnected node: {node.RemoteSocketAddress}. Successfully broadcasted transaction: {txId}.");

var propagationTimeoutTask = Task.Delay(7000);
var propagationFinishedTask = await Task.WhenAny([ broadcastTimeoutTask, entry.PropagationConfirmed.Task]).ConfigureAwait(false);
var propagationTask = entry.PropagationConfirmed.Task;
var propagationFinishedTask = await Task.WhenAny([ propagationTimeoutTask, propagationTask]).ConfigureAwait(false);

if (propagationFinishedTask == propagationTimeoutTask)
{
return BroadcastingResult.Fail(new BroadcastError.Unknown("Timed out to verify propagation."));
}
}
else
{
Logger.LogWarning($"Expected transaction {txId} was not found in the broadcast store.");

var propagators = await propagationTask.ConfigureAwait(false);
return BroadcastingResult.Ok(new BroadcastOk.BroadcastedByNetwork(propagators));
}

return BroadcastingResult.Ok();
return BroadcastingResult.Fail(new BroadcastError.Unknown($"Expected transaction {txId} was not found in the broadcast store."));
}
}

Expand All @@ -175,10 +191,26 @@ await broadcasters

return;

void BroadcastSuccess(Unit _)
void BroadcastSuccess(BroadcastOk ok)
{
broadcastedSuccessfully = true;
BroadcastSuccessfully(tx);
switch (ok)
{
case BroadcastOk.BroadcastedByBackend:
Logger.LogInfo($"Transaction is successfully broadcasted {tx.GetHash()} by backend.");
break;
case BroadcastOk.BroadcastedByRpc:
Logger.LogInfo($"Transaction is successfully broadcasted {tx.GetHash()} by local node RPC interface.");
break;
case BroadcastOk.BroadcastedByNetwork n:
foreach (var confirmedPropagators in n.Nodes)
{
Logger.LogInfo($"Transaction is successfully broadcasted {tx.GetHash()} and propagated by {confirmedPropagators}.");
}

break;
}
BelieveTransaction(tx);
}
}

Expand Down Expand Up @@ -216,20 +248,14 @@ private void BroadcastError(BroadcastError broadcastError)
}
}

private void BroadcastSuccessfully(SmartTransaction tx)
{
BelieveTransaction(tx);
Logger.LogInfo($"Transaction is successfully broadcasted: {tx.GetHash()}.");
}

private void BelieveTransaction(SmartTransaction transaction)
{
if (transaction.Height == Height.Unknown)
{
transaction.SetUnconfirmed();
}

mempoolService.TryAddToBroadcastStore(transaction, "N/A");
mempoolService.TryAddToBroadcastStore(transaction);

walletManager.Process(transaction);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private ProcessedResult ProcessNoLock(SmartTransaction tx)
uint256 txId = tx.GetHash();

// If we already have the transaction, then let's work on that.
if (MempoolService?.TryGetFromBroadcastStore(txId, null, out var foundEntry) is true)
if (MempoolService?.TryGetFromBroadcastStore(txId, out var foundEntry) is true)
{
// If we already have the transaction in the broadcast store, then let's work on that.
foundEntry.Transaction.TryUpdate(tx);
Expand Down
48 changes: 35 additions & 13 deletions WalletWasabi/Blockchain/Transactions/TransactionBroadcastEntry.cs
Original file line number Diff line number Diff line change
@@ -1,38 +1,60 @@
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using NBitcoin;

namespace WalletWasabi.Blockchain.Transactions;

public class TransactionBroadcastEntry
{
public TransactionBroadcastEntry(SmartTransaction transaction, string nodeRemoteSocketEndpoint)
public TransactionBroadcastEntry(SmartTransaction transaction)
{
Transaction = transaction;
TransactionId = Transaction.GetHash();
NodeRemoteSocketEndpoint = nodeRemoteSocketEndpoint;
BroadcastCompleted = new TaskCompletionSource();
PropagationConfirmed = new TaskCompletionSource();
BroadcastCompleted = new TaskCompletionSource<EndPoint[]>();
PropagationConfirmed = new TaskCompletionSource<EndPoint[]>();
}

public SmartTransaction Transaction { get; }
public uint256 TransactionId { get; }
public string NodeRemoteSocketEndpoint { get; }

public TaskCompletionSource BroadcastCompleted { get; }
public TaskCompletionSource PropagationConfirmed { get; }
public TaskCompletionSource<EndPoint[]> BroadcastCompleted { get; }
public TaskCompletionSource<EndPoint[]> PropagationConfirmed { get; }

public void MakeBroadcasted()
private readonly HashSet<EndPoint> _broadcastedTo = new();
private readonly HashSet<EndPoint> _confirmedBy = new();
private readonly object _syncObj = new();

public void BroadcastedTo(EndPoint nodeEndpoint)
{
lock (_syncObj)
{
if (_broadcastedTo.Add(nodeEndpoint) && _broadcastedTo.Count > 1)
{
BroadcastCompleted.TrySetResult(_broadcastedTo.ToArray());
}
}
}

public bool WasBroadcastedTo(EndPoint nodeEndpoint)
{
BroadcastCompleted.TrySetResult();
return _broadcastedTo.Contains(nodeEndpoint);
}

public void ConfirmPropagationOnce()
public void ConfirmPropagationOnce(EndPoint nodeEndpoint)
{
PropagationConfirmed.TrySetResult();
lock (_syncObj)
{
if (_confirmedBy.Add(nodeEndpoint) && _confirmedBy.Count > 1)
{
PropagationConfirmed.TrySetResult(_confirmedBy.ToArray());
}
}
}

public void ConfirmPropagationForGood()
public void ConfirmPropagationForGood(EndPoint nodeEndpoint)
{
PropagationConfirmed.TrySetResult();
PropagationConfirmed.TrySetResult([nodeEndpoint]);
}
}

0 comments on commit 9c9dacb

Please sign in to comment.