Skip to content

Commit

Permalink
Merge branch 'master' into SendManualControl2
Browse files Browse the repository at this point in the history
  • Loading branch information
ichthus1604 committed Apr 24, 2024
2 parents 0c66b5d + f8305d2 commit 4a2dfad
Show file tree
Hide file tree
Showing 103 changed files with 2,053 additions and 869 deletions.
5 changes: 4 additions & 1 deletion WalletWasabi.Backend/Global.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ public Global(string dataDir, IRPCClient rpcClient, Config config, IHttpClientFa
P2pNode = new(config.Network, config.GetBitcoinP2pEndPoint(), new(), $"/WasabiCoordinator:{Constants.BackendMajorVersion}/");
HostedServices.Register<BlockNotifier>(() => new BlockNotifier(TimeSpan.FromSeconds(7), rpcClient, P2pNode), "Block Notifier");

EventBus = new EventBus();

// Initialize index building
var indexBuilderServiceDir = Path.Combine(DataDir, "IndexBuilderService");
var indexFilePath = Path.Combine(indexBuilderServiceDir, $"Index{RpcClient.Network}.dat");
IndexBuilderService = new(IndexType.SegwitTaproot, RpcClient, HostedServices.Get<BlockNotifier>(), indexFilePath);
IndexBuilderService = new(IndexType.SegwitTaproot, RpcClient, HostedServices.Get<BlockNotifier>(), indexFilePath, EventBus);

MempoolMirror = new MempoolMirror(TimeSpan.FromSeconds(21), RpcClient, P2pNode);
CoinJoinMempoolManager = new CoinJoinMempoolManager(CoinJoinIdStore, MempoolMirror);
Expand Down Expand Up @@ -75,6 +77,7 @@ public Global(string dataDir, IRPCClient rpcClient, Config config, IHttpClientFa
private Whitelist? WhiteList { get; set; }
public MempoolMirror MempoolMirror { get; }
public CoinJoinMempoolManager CoinJoinMempoolManager { get; private set; }
public EventBus EventBus { get; private set; }

public async Task InitializeAsync(CancellationToken cancel)
{
Expand Down
30 changes: 30 additions & 0 deletions WalletWasabi.Backend/Middlewares/Extensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System.Linq;
using System.Net.WebSockets;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;

namespace WalletWasabi.Backend.Middlewares;

public static class WebSocketManagerExtensions
{
public static IServiceCollection AddWebSocketHandlers(this IServiceCollection services)
{
services.AddSingleton<WebSocketsConnectionTracker>();

if (Assembly.GetEntryAssembly() is { } assembly)
{
foreach (var type in assembly.ExportedTypes.Where(t => !t.IsAbstract && t.IsAssignableTo(typeof(WebSocketHandlerBase))))
{
services.AddSingleton(type);
}
}
return services;
}

public static IApplicationBuilder MapWebSocketManager(this IApplicationBuilder app, PathString path, WebSocketHandlerBase handlerBase) =>
app.Map(path, _app => _app.UseMiddleware<WebSocketHandlerMiddleware>(handlerBase));
}
229 changes: 229 additions & 0 deletions WalletWasabi.Backend/Middlewares/SatoshiWebSocketHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using NBitcoin;
using WalletWasabi.Backend.Models;
using WalletWasabi.Blockchain.Analysis.FeesEstimation;
using WalletWasabi.Blockchain.BlockFilters;
using WalletWasabi.Extensions;
using WalletWasabi.Helpers;
using WalletWasabi.Logging;
using WalletWasabi.Services;
using WalletWasabi.Synchronization;

namespace WalletWasabi.Backend.Middlewares;

/// <summary>
/// SatoshiWebSocketHandler is the websocket handler than provides all information from the server that
/// can be delivered to the Satoshi identity. This are:
/// * Compact filters
/// * Rounds' state updates
/// * Mining fee updates
/// </summary>
/// <remarks>
/// This websockethandler is essentially a one-way only channel (server -> client). The only exception to that
/// is the initial handshake that is started by the client sending the best known block hash. After that, all
/// messages from the client are simply ignored.
/// </remarks>
public class SatoshiWebSocketHandler : WebSocketHandlerBase
{
private readonly IndexBuilderService _indexBuilderService;
private readonly EventBus _eventBus;
private readonly Dictionary<WebSocket, List<IDisposable>> _socketResources = new();
private readonly object _synObj = new();

public SatoshiWebSocketHandler(
WebSocketsConnectionTracker connectionTracker,
EventBus eventBus,
IndexBuilderService indexBuilderService)
: base(connectionTracker)
{
_eventBus = eventBus;
_indexBuilderService = indexBuilderService;
}

public override Task OnConnectedAsync(WebSocket socket, CancellationToken cancellationToken)
{
// Subscribe to changes in the exchange rate rates and send them immediately.
Subscribe(socket, NotifyExchangeRate);

// Subscribe to changes in the mining fee rates and send them immediately.
Subscribe(socket, NotifyFeeEstimations);

// Subscribe to changes in the rounds and send them immediately.
// _eventBus.Subscribe();

return base.OnConnectedAsync(socket, cancellationToken);
}

private void Subscribe<T>(WebSocket socket, Func<WebSocket, Action<T>> builder) where T : notnull
{
lock (_synObj)
{
var notification = _eventBus.Subscribe(builder(socket));
var resources = _socketResources.TryGetValue(socket, out var r) ? r : [];
resources.Add(notification);
_socketResources[socket] = resources;
}
}

public override Task OnDisconnectedAsync(WebSocket socket, CancellationToken cancellationToken)
{
lock (_synObj)
{
foreach (var disposable in _socketResources[socket])
{
disposable.Dispose();
}
}
return base.OnDisconnectedAsync(socket, cancellationToken);
}

/// <summary>
/// Receives the initial message from the client containing the bestknownblockhash required
/// to start sending the missing filters to the client. After that it launches the process
/// that sends the filters and other info to the client.
/// </summary>
/// <param name="socketState">The websocket connection state.</param>
/// <param name="result">The reading result.</param>
/// <param name="buffer">The buffer containing the message received from the client.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public override async Task ReceiveAsync(
WebSocketConnectionState socketState,
WebSocketReceiveResult result,
byte[] buffer,
CancellationToken cancellationToken)
{
if (!socketState.Handshaked && result.MessageType == WebSocketMessageType.Binary)
{
switch ((RequestMessage) buffer[0])
{
case RequestMessage.BestKnownBlockHash:
try
{
using var reader = new BinaryReader(new MemoryStream(buffer[1..]));
var bestKnownBlockHash = reader.ReadUInt256();
socketState.Handshaked = true;

await SendSoftwareVersionAsync(socketState.WebSocket, cancellationToken);
await SendLegalDocumentVersionAsync(socketState.WebSocket, cancellationToken);
await SendBlockHeightAsync(socketState.WebSocket, cancellationToken);
await StartSendingFiltersAsync(socketState.WebSocket, bestKnownBlockHash, cancellationToken);
}
catch (Exception e) when (e is FormatException or InvalidOperationException)
{
await SendHandshakeErrorAsync(socketState.WebSocket, cancellationToken);
}

break;
default:
await SendHandshakeErrorAsync(socketState.WebSocket, cancellationToken);
break;
}
}
}

private static Task SendHandshakeErrorAsync(WebSocket webSocket, CancellationToken cancellationToken) =>
webSocket.SendAsync(
new[] { (byte) ResponseMessage.HandshakeError },
WebSocketMessageType.Binary,
true,
cancellationToken);

private async Task StartSendingFiltersAsync(
WebSocket webSocket,
uint256 bestKnownBlockHash,
CancellationToken cancellationToken)
{
// First we send all the filters from the bestknownblockhash until the tip
await SendMissingFiltersAsync(webSocket, bestKnownBlockHash, cancellationToken);

// Subscribe to the filters creation and send filters immediately after they are create.
Subscribe(webSocket, SendFilter);
}

private Task SendBlockHeightAsync(WebSocket webSocket, CancellationToken cancellationToken)
{
var lastFilter = _indexBuilderService.GetLastFilter();
var bestBlockHeight = lastFilter.Header.Height;
var message = new BlockHeightMessage(bestBlockHeight);
return webSocket.SendAsync(message.ToByteArray(), WebSocketMessageType.Binary, true, cancellationToken);
}

private Task SendSoftwareVersionAsync(WebSocket webSocket, CancellationToken cancellationToken)
{
var clientVersion = Constants.ClientVersion;
var backendVersion = new Version(int.Parse(Constants.BackendMajorVersion), 0, 0);
var message = new VersionMessage(clientVersion, backendVersion);
return webSocket.SendAsync(message.ToByteArray(), WebSocketMessageType.Binary, true, cancellationToken);
}

private Task SendLegalDocumentVersionAsync(WebSocket webSocket, CancellationToken cancellationToken)
{
var message = new LegalDocumentVersionMessage(Constants.Ww2LegalDocumentsVersion);
return webSocket.SendAsync(message.ToByteArray(), WebSocketMessageType.Binary, true, cancellationToken);
}

// <summary>
// SendMissingFiltersAsync sends all the filters since bestknownblockhash to the client.
// </summary>
// <param name="webSocket">The websocket.</param>
// <param name="bestKnownBlockHash">The latest block id known by the client.</param>
// <param name="cancellationToken">The cancellation token.</param>
private async Task SendMissingFiltersAsync(
WebSocket webSocket,
uint256 bestKnownBlockHash,
CancellationToken cancellationToken)
{
var lastTransmittedFilter = bestKnownBlockHash;
var getFiltersChunk = GetFiltersBucketStartingFrom(lastTransmittedFilter);

while (getFiltersChunk.Any())
{
foreach (var filter in getFiltersChunk)
{
var message = new FilterMessage(filter);
await webSocket.SendAsync(message.ToByteArray(), WebSocketMessageType.Binary, true, cancellationToken);

lastTransmittedFilter = filter.Header.BlockHash;
}

getFiltersChunk = GetFiltersBucketStartingFrom(lastTransmittedFilter);
}
}

private IEnumerable<FilterModel> GetFiltersBucketStartingFrom(uint256 startingBlockHash)
{
var (_, filters) = _indexBuilderService.GetFilterLinesExcluding(startingBlockHash, 1_000, out var found);
if (!found)
{
throw new InvalidOperationException($"Filter {startingBlockHash} not found");
}

return filters;
}

private Action<AllFeeEstimate> NotifyFeeEstimations(WebSocket ws) =>
allFeeEstimate =>
{
var message = new MiningFeeRatesMessage(allFeeEstimate);
ws.SendAsync(message.ToByteArray(), WebSocketMessageType.Binary, true, CancellationToken.None);
};

private Action<ExchangeRate> NotifyExchangeRate(WebSocket ws) =>
exchangeRate =>
{
var message = new ExchangeRateMessage(exchangeRate);
ws.SendAsync(message.ToByteArray(), WebSocketMessageType.Binary, true, CancellationToken.None);
};

private Action<FilterModel> SendFilter(WebSocket ws) =>
filter =>
{
var message = new FilterMessage(filter);
ws.SendAsync(message.ToByteArray(), WebSocketMessageType.Binary, true, CancellationToken.None);
};
}
10 changes: 10 additions & 0 deletions WalletWasabi.Backend/Middlewares/WebSocketConnectionState.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Net.WebSockets;

namespace WalletWasabi.Backend.Middlewares;

public class WebSocketConnectionState(WebSocket webSocket, DateTime connectedSince)
{
public WebSocket WebSocket { get; } = webSocket;
public DateTime ConnectedSince { get; } = connectedSince;
public bool Handshaked { get; set; }
}
54 changes: 54 additions & 0 deletions WalletWasabi.Backend/Middlewares/WebSocketHandlerBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace WalletWasabi.Backend.Middlewares;

/// <summary>
/// WebSocketHandlerBase is the base class for all WebSocketHandlers.
/// WebSocketHandlers are referenced and called by the WebSocketHandlerMiddleware instance
/// </summary>
/// <param name="connectionTracker">The instance that keeps track of all websockets.</param>
public abstract class WebSocketHandlerBase(WebSocketsConnectionTracker connectionTracker)
{
/// <summary>
/// OnConnectAsync is called by the WebSocketHandlerMiddleware instance every time a new
/// websocket connection is accepted.
/// </summary>
/// <param name="socket">The websocket instance.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public virtual Task OnConnectedAsync(WebSocket socket, CancellationToken cancellationToken)
{
connectionTracker.AddSocket(socket);
return Task.CompletedTask;
}

/// <summary>
/// OnDisconnectedAsync is called by the WebSocketHandlerMiddleware instance every time
/// a websocket starts the closing handshake or simply closed the connection unilaterally.
/// </summary>
/// <param name="socket">The web socket.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public virtual Task OnDisconnectedAsync(WebSocket socket, CancellationToken cancellationToken)
{
connectionTracker.RemoveSocket(socket);
return socket.State is WebSocketState.Open
? socket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
$"Closed by the {nameof(WebSocketHandlerBase)}",
cancellationToken)
: Task.CompletedTask;
}

/// <summary>
/// Receives
/// </summary>
/// <param name="socket">The websocket.</param>
/// <param name="result">The websocket reading result.</param>
/// <param name="buffer">The buffer containing the read message</param>
/// <param name="cancellationToken">The cancellationToken.</param>
public virtual Task ReceiveAsync(WebSocket socket, WebSocketReceiveResult result, byte[] buffer, CancellationToken cancellationToken) =>
ReceiveAsync(connectionTracker.GetWebSocketConnectionState(socket), result, buffer, cancellationToken);

public abstract Task ReceiveAsync(WebSocketConnectionState socketState, WebSocketReceiveResult result, byte[] buffer, CancellationToken cancellationToken);
}
Loading

0 comments on commit 4a2dfad

Please sign in to comment.