Skip to content

Commit

Permalink
Trackers (#218)
Browse files Browse the repository at this point in the history
Fix for intermittently failing rate limiting test
Added ConnectionId to RequestDefinition to correctly handle connection and path rate limiting configuration
Added ValidateMessage method to websocket Query object to filter messages even though it is matched to the query based on the  ListenIdentifier
Added KlineTracker and TradeTracker implementation
  • Loading branch information
JKorf authored Oct 28, 2024
1 parent ed007b5 commit 9e86a08
Show file tree
Hide file tree
Showing 17 changed files with 2,386 additions and 5 deletions.
2 changes: 1 addition & 1 deletion CryptoExchange.Net.UnitTests/RestClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ public async Task EndpointRateLimiterMultipleEndpoints(string endpoint, bool exp
public async Task ApiKeyRateLimiterBasics(string key1, string key2, string endpoint1, string endpoint2, bool expectLimited)
{
var rateLimiter = new RateLimitGate("Test");
rateLimiter.AddGuard(new RateLimitGuard(RateLimitGuard.PerApiKey, new AuthenticatedEndpointFilter(true), 1, TimeSpan.FromSeconds(0.1), RateLimitWindowType.Fixed));
rateLimiter.AddGuard(new RateLimitGuard(RateLimitGuard.PerApiKey, new AuthenticatedEndpointFilter(true), 1, TimeSpan.FromSeconds(0.1), RateLimitWindowType.Sliding));
var requestDefinition1 = new RequestDefinition(endpoint1, HttpMethod.Get) { Authenticated = key1 != null };
var requestDefinition2 = new RequestDefinition(endpoint2, HttpMethod.Get) { Authenticated = key2 != null };

Expand Down
292 changes: 292 additions & 0 deletions CryptoExchange.Net/Logging/Extensions/TrackerLoggingExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
using System;
using CryptoExchange.Net.Objects;
using Microsoft.Extensions.Logging;

namespace CryptoExchange.Net.Logging.Extensions
{
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

public static class TrackerLoggingExtensions
{
private static readonly Action<ILogger, string, SyncStatus, SyncStatus, Exception?> _klineTrackerStatusChanged;
private static readonly Action<ILogger, string, Exception?> _klineTrackerStarting;
private static readonly Action<ILogger, string, string, Exception?> _klineTrackerStartFailed;
private static readonly Action<ILogger, string, Exception?> _klineTrackerStarted;
private static readonly Action<ILogger, string, Exception?> _klineTrackerStopping;
private static readonly Action<ILogger, string, Exception?> _klineTrackerStopped;
private static readonly Action<ILogger, string, DateTime, Exception?> _klineTrackerInitialDataSet;
private static readonly Action<ILogger, string, DateTime, Exception?> _klineTrackerKlineUpdated;
private static readonly Action<ILogger, string, DateTime, Exception?> _klineTrackerKlineAdded;
private static readonly Action<ILogger, string, Exception?> _klineTrackerConnectionLost;
private static readonly Action<ILogger, string, Exception?> _klineTrackerConnectionClosed;
private static readonly Action<ILogger, string, Exception?> _klineTrackerConnectionRestored;

private static readonly Action<ILogger, string, SyncStatus, SyncStatus, Exception?> _tradeTrackerStatusChanged;
private static readonly Action<ILogger, string, Exception?> _tradeTrackerStarting;
private static readonly Action<ILogger, string, string, Exception?> _tradeTrackerStartFailed;
private static readonly Action<ILogger, string, Exception?> _tradeTrackerStarted;
private static readonly Action<ILogger, string, Exception?> _tradeTrackerStopping;
private static readonly Action<ILogger, string, Exception?> _tradeTrackerStopped;
private static readonly Action<ILogger, string, int, long, Exception?> _tradeTrackerInitialDataSet;
private static readonly Action<ILogger, string, long, Exception?> _tradeTrackerPreSnapshotSkip;
private static readonly Action<ILogger, string, long, Exception?> _tradeTrackerPreSnapshotApplied;
private static readonly Action<ILogger, string, long, Exception?> _tradeTrackerTradeAdded;
private static readonly Action<ILogger, string, Exception?> _tradeTrackerConnectionLost;
private static readonly Action<ILogger, string, Exception?> _tradeTrackerConnectionClosed;
private static readonly Action<ILogger, string, Exception?> _tradeTrackerConnectionRestored;

static TrackerLoggingExtensions()
{
_klineTrackerStatusChanged = LoggerMessage.Define<string, SyncStatus, SyncStatus>(
LogLevel.Debug,
new EventId(6001, "KlineTrackerStatusChanged"),
"Kline tracker for {Symbol} status changed: {OldStatus} => {NewStatus}");

_klineTrackerStarting = LoggerMessage.Define<string>(
LogLevel.Debug,
new EventId(6002, "KlineTrackerStarting"),
"Kline tracker for {Symbol} starting");

_klineTrackerStartFailed = LoggerMessage.Define<string, string>(
LogLevel.Warning,
new EventId(6003, "KlineTrackerStartFailed"),
"Kline tracker for {Symbol} failed to start: {Error}");

_klineTrackerStarted = LoggerMessage.Define<string>(
LogLevel.Information,
new EventId(6004, "KlineTrackerStarted"),
"Kline tracker for {Symbol} started");

_klineTrackerStopping = LoggerMessage.Define<string>(
LogLevel.Debug,
new EventId(6005, "KlineTrackerStopping"),
"Kline tracker for {Symbol} stopping");

_klineTrackerStopped = LoggerMessage.Define<string>(
LogLevel.Information,
new EventId(6006, "KlineTrackerStopped"),
"Kline tracker for {Symbol} stopped");

_klineTrackerInitialDataSet = LoggerMessage.Define<string, DateTime>(
LogLevel.Debug,
new EventId(6007, "KlineTrackerInitialDataSet"),
"Kline tracker for {Symbol} initial data set, last timestamp: {LastTime}");

_klineTrackerKlineUpdated = LoggerMessage.Define<string, DateTime>(
LogLevel.Trace,
new EventId(6008, "KlineTrackerKlineUpdated"),
"Kline tracker for {Symbol} kline updated for open time: {LastTime}");

_klineTrackerKlineAdded = LoggerMessage.Define<string, DateTime>(
LogLevel.Trace,
new EventId(6009, "KlineTrackerKlineAdded"),
"Kline tracker for {Symbol} new kline for open time: {LastTime}");

_klineTrackerConnectionLost = LoggerMessage.Define<string>(
LogLevel.Warning,
new EventId(6010, "KlineTrackerConnectionLost"),
"Kline tracker for {Symbol} connection lost");

_klineTrackerConnectionClosed = LoggerMessage.Define<string>(
LogLevel.Warning,
new EventId(6011, "KlineTrackerConnectionClosed"),
"Kline tracker for {Symbol} disconnected");

_klineTrackerConnectionRestored = LoggerMessage.Define<string>(
LogLevel.Information,
new EventId(6012, "KlineTrackerConnectionRestored"),
"Kline tracker for {Symbol} successfully resynchronized");


_tradeTrackerStatusChanged = LoggerMessage.Define<string, SyncStatus, SyncStatus>(
LogLevel.Debug,
new EventId(6013, "KlineTrackerStatusChanged"),
"Trade tracker for {Symbol} status changed: {OldStatus} => {NewStatus}");

_tradeTrackerStarting = LoggerMessage.Define<string>(
LogLevel.Debug,
new EventId(6014, "KlineTrackerStarting"),
"Trade tracker for {Symbol} starting");

_tradeTrackerStartFailed = LoggerMessage.Define<string, string>(
LogLevel.Warning,
new EventId(6015, "KlineTrackerStartFailed"),
"Trade tracker for {Symbol} failed to start: {Error}");

_tradeTrackerStarted = LoggerMessage.Define<string>(
LogLevel.Information,
new EventId(6016, "KlineTrackerStarted"),
"Trade tracker for {Symbol} started");

_tradeTrackerStopping = LoggerMessage.Define<string>(
LogLevel.Debug,
new EventId(6017, "KlineTrackerStopping"),
"Trade tracker for {Symbol} stopping");

_tradeTrackerStopped = LoggerMessage.Define<string>(
LogLevel.Information,
new EventId(6018, "KlineTrackerStopped"),
"Trade tracker for {Symbol} stopped");

_tradeTrackerInitialDataSet = LoggerMessage.Define<string, int, long>(
LogLevel.Debug,
new EventId(6019, "TradeTrackerInitialDataSet"),
"Trade tracker for {Symbol} snapshot set, Count: {Count}, Last id: {LastId}");

_tradeTrackerPreSnapshotSkip = LoggerMessage.Define<string, long>(
LogLevel.Trace,
new EventId(6020, "TradeTrackerPreSnapshotSkip"),
"Trade tracker for {Symbol} skipping {Id}, already in snapshot");

_tradeTrackerPreSnapshotApplied = LoggerMessage.Define<string, long>(
LogLevel.Trace,
new EventId(6021, "TradeTrackerPreSnapshotApplied"),
"Trade tracker for {Symbol} adding {Id} from pre-snapshot");

_tradeTrackerTradeAdded = LoggerMessage.Define<string, long>(
LogLevel.Trace,
new EventId(6022, "TradeTrackerTradeAdded"),
"Trade tracker for {Symbol} adding trade {Id}");

_tradeTrackerConnectionLost = LoggerMessage.Define<string>(
LogLevel.Warning,
new EventId(6023, "TradeTrackerConnectionLost"),
"Trade tracker for {Symbol} connection lost");

_tradeTrackerConnectionClosed = LoggerMessage.Define<string>(
LogLevel.Warning,
new EventId(6024, "TradeTrackerConnectionClosed"),
"Trade tracker for {Symbol} disconnected");

_tradeTrackerConnectionRestored = LoggerMessage.Define<string>(
LogLevel.Information,
new EventId(6025, "TradeTrackerConnectionRestored"),
"Trade tracker for {Symbol} successfully resynchronized");
}

public static void KlineTrackerStatusChanged(this ILogger logger, string symbol, SyncStatus oldStatus, SyncStatus newStatus)
{
_klineTrackerStatusChanged(logger, symbol, oldStatus, newStatus, null);
}

public static void KlineTrackerStarting(this ILogger logger, string symbol)
{
_klineTrackerStarting(logger, symbol, null);
}

public static void KlineTrackerStartFailed(this ILogger logger, string symbol, string error)
{
_klineTrackerStartFailed(logger, symbol, error, null);
}

public static void KlineTrackerStarted(this ILogger logger, string symbol)
{
_klineTrackerStarted(logger, symbol, null);
}

public static void KlineTrackerStopping(this ILogger logger, string symbol)
{
_klineTrackerStopping(logger, symbol, null);
}

public static void KlineTrackerStopped(this ILogger logger, string symbol)
{
_klineTrackerStopped(logger, symbol, null);
}

public static void KlineTrackerInitialDataSet(this ILogger logger, string symbol, DateTime lastTime)
{
_klineTrackerInitialDataSet(logger, symbol, lastTime, null);
}

public static void KlineTrackerKlineUpdated(this ILogger logger, string symbol, DateTime lastTime)
{
_klineTrackerKlineUpdated(logger, symbol, lastTime, null);
}

public static void KlineTrackerKlineAdded(this ILogger logger, string symbol, DateTime lastTime)
{
_klineTrackerKlineAdded(logger, symbol, lastTime, null);
}

public static void KlineTrackerConnectionLost(this ILogger logger, string symbol)
{
_klineTrackerConnectionLost(logger, symbol, null);
}

public static void KlineTrackerConnectionClosed(this ILogger logger, string symbol)
{
_klineTrackerConnectionClosed(logger, symbol, null);
}

public static void KlineTrackerConnectionRestored(this ILogger logger, string symbol)
{
_klineTrackerConnectionRestored(logger, symbol, null);
}

public static void TradeTrackerStatusChanged(this ILogger logger, string symbol, SyncStatus oldStatus, SyncStatus newStatus)
{
_tradeTrackerStatusChanged(logger, symbol, oldStatus, newStatus, null);
}

public static void TradeTrackerStarting(this ILogger logger, string symbol)
{
_tradeTrackerStarting(logger, symbol, null);
}

public static void TradeTrackerStartFailed(this ILogger logger, string symbol, string error)
{
_tradeTrackerStartFailed(logger, symbol, error, null);
}

public static void TradeTrackerStarted(this ILogger logger, string symbol)
{
_tradeTrackerStarted(logger, symbol, null);
}

public static void TradeTrackerStopping(this ILogger logger, string symbol)
{
_tradeTrackerStopping(logger, symbol, null);
}

public static void TradeTrackerStopped(this ILogger logger, string symbol)
{
_tradeTrackerStopped(logger, symbol, null);
}

public static void TradeTrackerInitialDataSet(this ILogger logger, string symbol, int count, long lastId)
{
_tradeTrackerInitialDataSet(logger, symbol, count, lastId, null);
}

public static void TradeTrackerPreSnapshotSkip(this ILogger logger, string symbol, long lastId)
{
_tradeTrackerPreSnapshotSkip(logger, symbol, lastId, null);
}

public static void TradeTrackerPreSnapshotApplied(this ILogger logger, string symbol, long lastId)
{
_tradeTrackerPreSnapshotApplied(logger, symbol, lastId, null);
}

public static void TradeTrackerTradeAdded(this ILogger logger, string symbol, long lastId)
{
_tradeTrackerTradeAdded(logger, symbol, lastId, null);
}

public static void TradeTrackerConnectionLost(this ILogger logger, string symbol)
{
_tradeTrackerConnectionLost(logger, symbol, null);
}

public static void TradeTrackerConnectionClosed(this ILogger logger, string symbol)
{
_tradeTrackerConnectionClosed(logger, symbol, null);
}

public static void TradeTrackerConnectionRestored(this ILogger logger, string symbol)
{
_tradeTrackerConnectionRestored(logger, symbol, null);
}
}
}
27 changes: 27 additions & 0 deletions CryptoExchange.Net/Objects/Enums.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,33 @@ public enum RequestBodyFormat
Json
}

/// <summary>
/// Tracker sync status
/// </summary>
public enum SyncStatus
{
/// <summary>
/// Not connected
/// </summary>
Disconnected,
/// <summary>
/// Syncing, data connection is being made
/// </summary>
Syncing,
/// <summary>
/// The connection is active, but the full data backlog is not yet reached. For example, a tracker set to retain 10 minutes of data only has 8 minutes of data at this moment.
/// </summary>
PartiallySynced,
/// <summary>
/// Synced
/// </summary>
Synced,
/// <summary>
/// Disposed
/// </summary>
Diposed
}

/// <summary>
/// Status of the order book
/// </summary>
Expand Down
6 changes: 5 additions & 1 deletion CryptoExchange.Net/Objects/RequestDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,16 @@ public class RequestDefinition
/// </summary>
public IRateLimitGuard? LimitGuard { get; set; }


/// <summary>
/// Whether this request should never be cached
/// </summary>
public bool PreventCaching { get; set; }

/// <summary>
/// Connection id
/// </summary>
public int? ConnectionId { get; set; }

/// <summary>
/// ctor
/// </summary>
Expand Down
4 changes: 4 additions & 0 deletions CryptoExchange.Net/RateLimiting/Guards/RateLimitGuard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ public class RateLimitGuard : IRateLimitGuard
/// </summary>
public static Func<RequestDefinition, string, string?, string> PerEndpoint { get; } = new Func<RequestDefinition, string, string?, string>((def, host, key) => def.Path + def.Method);
/// <summary>
/// Apply guard per connection
/// </summary>
public static Func<RequestDefinition, string, string?, string> PerConnection { get; } = new Func<RequestDefinition, string, string?, string>((def, host, key) => def.ConnectionId.ToString());
/// <summary>
/// Apply guard per API key
/// </summary>
public static Func<RequestDefinition, string, string?, string> PerApiKey { get; } = new Func<RequestDefinition, string, string?, string>((def, host, key) => key!);
Expand Down
4 changes: 2 additions & 2 deletions CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ private async Task<CallResult> ConnectInternalAsync()
{
if (Parameters.RateLimiter != null)
{
var definition = new RequestDefinition(Id.ToString(), HttpMethod.Get);
var definition = new RequestDefinition(Uri.AbsolutePath, HttpMethod.Get) { ConnectionId = Id };
var limitResult = await Parameters.RateLimiter.ProcessAsync(_logger, Id, RateLimitItemType.Connection, definition, _baseAddress, null, 1, Parameters.RateLimitingBehaviour, _ctsSource.Token).ConfigureAwait(false);
if (!limitResult)
return new CallResult(new ClientRateLimitError("Connection limit reached"));
Expand Down Expand Up @@ -475,7 +475,7 @@ public void Dispose()
/// <returns></returns>
private async Task SendLoopAsync()
{
var requestDefinition = new RequestDefinition(Id.ToString(), HttpMethod.Get);
var requestDefinition = new RequestDefinition(Uri.AbsolutePath, HttpMethod.Get) { ConnectionId = Id };
try
{
while (true)
Expand Down
Loading

0 comments on commit 9e86a08

Please sign in to comment.