diff --git a/CryptoExchange.Net.UnitTests/RestClientTests.cs b/CryptoExchange.Net.UnitTests/RestClientTests.cs index e06535b0..2838af41 100644 --- a/CryptoExchange.Net.UnitTests/RestClientTests.cs +++ b/CryptoExchange.Net.UnitTests/RestClientTests.cs @@ -302,7 +302,7 @@ public async Task EndpointRateLimiterMultipleEndpoints(string endpoint, bool exp public async Task ApiKeyRateLimiterBasics(string key1, string key2, string endpoint1, string endpoint2, bool expectLimited) { var rateLimiter = new RateLimitGate("Test"); - rateLimiter.AddGuard(new RateLimitGuard(RateLimitGuard.PerApiKey, new AuthenticatedEndpointFilter(true), 1, TimeSpan.FromSeconds(0.1), RateLimitWindowType.Fixed)); + rateLimiter.AddGuard(new RateLimitGuard(RateLimitGuard.PerApiKey, new AuthenticatedEndpointFilter(true), 1, TimeSpan.FromSeconds(0.1), RateLimitWindowType.Sliding)); var requestDefinition1 = new RequestDefinition(endpoint1, HttpMethod.Get) { Authenticated = key1 != null }; var requestDefinition2 = new RequestDefinition(endpoint2, HttpMethod.Get) { Authenticated = key2 != null }; diff --git a/CryptoExchange.Net/Logging/Extensions/TrackerLoggingExtensions.cs b/CryptoExchange.Net/Logging/Extensions/TrackerLoggingExtensions.cs new file mode 100644 index 00000000..514db432 --- /dev/null +++ b/CryptoExchange.Net/Logging/Extensions/TrackerLoggingExtensions.cs @@ -0,0 +1,292 @@ +using System; +using CryptoExchange.Net.Objects; +using Microsoft.Extensions.Logging; + +namespace CryptoExchange.Net.Logging.Extensions +{ +#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member + + public static class TrackerLoggingExtensions + { + private static readonly Action _klineTrackerStatusChanged; + private static readonly Action _klineTrackerStarting; + private static readonly Action _klineTrackerStartFailed; + private static readonly Action _klineTrackerStarted; + private static readonly Action _klineTrackerStopping; + private static readonly Action _klineTrackerStopped; + private static readonly Action _klineTrackerInitialDataSet; + private static readonly Action _klineTrackerKlineUpdated; + private static readonly Action _klineTrackerKlineAdded; + private static readonly Action _klineTrackerConnectionLost; + private static readonly Action _klineTrackerConnectionClosed; + private static readonly Action _klineTrackerConnectionRestored; + + private static readonly Action _tradeTrackerStatusChanged; + private static readonly Action _tradeTrackerStarting; + private static readonly Action _tradeTrackerStartFailed; + private static readonly Action _tradeTrackerStarted; + private static readonly Action _tradeTrackerStopping; + private static readonly Action _tradeTrackerStopped; + private static readonly Action _tradeTrackerInitialDataSet; + private static readonly Action _tradeTrackerPreSnapshotSkip; + private static readonly Action _tradeTrackerPreSnapshotApplied; + private static readonly Action _tradeTrackerTradeAdded; + private static readonly Action _tradeTrackerConnectionLost; + private static readonly Action _tradeTrackerConnectionClosed; + private static readonly Action _tradeTrackerConnectionRestored; + + static TrackerLoggingExtensions() + { + _klineTrackerStatusChanged = LoggerMessage.Define( + LogLevel.Debug, + new EventId(6001, "KlineTrackerStatusChanged"), + "Kline tracker for {Symbol} status changed: {OldStatus} => {NewStatus}"); + + _klineTrackerStarting = LoggerMessage.Define( + LogLevel.Debug, + new EventId(6002, "KlineTrackerStarting"), + "Kline tracker for {Symbol} starting"); + + _klineTrackerStartFailed = LoggerMessage.Define( + LogLevel.Warning, + new EventId(6003, "KlineTrackerStartFailed"), + "Kline tracker for {Symbol} failed to start: {Error}"); + + _klineTrackerStarted = LoggerMessage.Define( + LogLevel.Information, + new EventId(6004, "KlineTrackerStarted"), + "Kline tracker for {Symbol} started"); + + _klineTrackerStopping = LoggerMessage.Define( + LogLevel.Debug, + new EventId(6005, "KlineTrackerStopping"), + "Kline tracker for {Symbol} stopping"); + + _klineTrackerStopped = LoggerMessage.Define( + LogLevel.Information, + new EventId(6006, "KlineTrackerStopped"), + "Kline tracker for {Symbol} stopped"); + + _klineTrackerInitialDataSet = LoggerMessage.Define( + LogLevel.Debug, + new EventId(6007, "KlineTrackerInitialDataSet"), + "Kline tracker for {Symbol} initial data set, last timestamp: {LastTime}"); + + _klineTrackerKlineUpdated = LoggerMessage.Define( + LogLevel.Trace, + new EventId(6008, "KlineTrackerKlineUpdated"), + "Kline tracker for {Symbol} kline updated for open time: {LastTime}"); + + _klineTrackerKlineAdded = LoggerMessage.Define( + LogLevel.Trace, + new EventId(6009, "KlineTrackerKlineAdded"), + "Kline tracker for {Symbol} new kline for open time: {LastTime}"); + + _klineTrackerConnectionLost = LoggerMessage.Define( + LogLevel.Warning, + new EventId(6010, "KlineTrackerConnectionLost"), + "Kline tracker for {Symbol} connection lost"); + + _klineTrackerConnectionClosed = LoggerMessage.Define( + LogLevel.Warning, + new EventId(6011, "KlineTrackerConnectionClosed"), + "Kline tracker for {Symbol} disconnected"); + + _klineTrackerConnectionRestored = LoggerMessage.Define( + LogLevel.Information, + new EventId(6012, "KlineTrackerConnectionRestored"), + "Kline tracker for {Symbol} successfully resynchronized"); + + + _tradeTrackerStatusChanged = LoggerMessage.Define( + LogLevel.Debug, + new EventId(6013, "KlineTrackerStatusChanged"), + "Trade tracker for {Symbol} status changed: {OldStatus} => {NewStatus}"); + + _tradeTrackerStarting = LoggerMessage.Define( + LogLevel.Debug, + new EventId(6014, "KlineTrackerStarting"), + "Trade tracker for {Symbol} starting"); + + _tradeTrackerStartFailed = LoggerMessage.Define( + LogLevel.Warning, + new EventId(6015, "KlineTrackerStartFailed"), + "Trade tracker for {Symbol} failed to start: {Error}"); + + _tradeTrackerStarted = LoggerMessage.Define( + LogLevel.Information, + new EventId(6016, "KlineTrackerStarted"), + "Trade tracker for {Symbol} started"); + + _tradeTrackerStopping = LoggerMessage.Define( + LogLevel.Debug, + new EventId(6017, "KlineTrackerStopping"), + "Trade tracker for {Symbol} stopping"); + + _tradeTrackerStopped = LoggerMessage.Define( + LogLevel.Information, + new EventId(6018, "KlineTrackerStopped"), + "Trade tracker for {Symbol} stopped"); + + _tradeTrackerInitialDataSet = LoggerMessage.Define( + LogLevel.Debug, + new EventId(6019, "TradeTrackerInitialDataSet"), + "Trade tracker for {Symbol} snapshot set, Count: {Count}, Last id: {LastId}"); + + _tradeTrackerPreSnapshotSkip = LoggerMessage.Define( + LogLevel.Trace, + new EventId(6020, "TradeTrackerPreSnapshotSkip"), + "Trade tracker for {Symbol} skipping {Id}, already in snapshot"); + + _tradeTrackerPreSnapshotApplied = LoggerMessage.Define( + LogLevel.Trace, + new EventId(6021, "TradeTrackerPreSnapshotApplied"), + "Trade tracker for {Symbol} adding {Id} from pre-snapshot"); + + _tradeTrackerTradeAdded = LoggerMessage.Define( + LogLevel.Trace, + new EventId(6022, "TradeTrackerTradeAdded"), + "Trade tracker for {Symbol} adding trade {Id}"); + + _tradeTrackerConnectionLost = LoggerMessage.Define( + LogLevel.Warning, + new EventId(6023, "TradeTrackerConnectionLost"), + "Trade tracker for {Symbol} connection lost"); + + _tradeTrackerConnectionClosed = LoggerMessage.Define( + LogLevel.Warning, + new EventId(6024, "TradeTrackerConnectionClosed"), + "Trade tracker for {Symbol} disconnected"); + + _tradeTrackerConnectionRestored = LoggerMessage.Define( + LogLevel.Information, + new EventId(6025, "TradeTrackerConnectionRestored"), + "Trade tracker for {Symbol} successfully resynchronized"); + } + + public static void KlineTrackerStatusChanged(this ILogger logger, string symbol, SyncStatus oldStatus, SyncStatus newStatus) + { + _klineTrackerStatusChanged(logger, symbol, oldStatus, newStatus, null); + } + + public static void KlineTrackerStarting(this ILogger logger, string symbol) + { + _klineTrackerStarting(logger, symbol, null); + } + + public static void KlineTrackerStartFailed(this ILogger logger, string symbol, string error) + { + _klineTrackerStartFailed(logger, symbol, error, null); + } + + public static void KlineTrackerStarted(this ILogger logger, string symbol) + { + _klineTrackerStarted(logger, symbol, null); + } + + public static void KlineTrackerStopping(this ILogger logger, string symbol) + { + _klineTrackerStopping(logger, symbol, null); + } + + public static void KlineTrackerStopped(this ILogger logger, string symbol) + { + _klineTrackerStopped(logger, symbol, null); + } + + public static void KlineTrackerInitialDataSet(this ILogger logger, string symbol, DateTime lastTime) + { + _klineTrackerInitialDataSet(logger, symbol, lastTime, null); + } + + public static void KlineTrackerKlineUpdated(this ILogger logger, string symbol, DateTime lastTime) + { + _klineTrackerKlineUpdated(logger, symbol, lastTime, null); + } + + public static void KlineTrackerKlineAdded(this ILogger logger, string symbol, DateTime lastTime) + { + _klineTrackerKlineAdded(logger, symbol, lastTime, null); + } + + public static void KlineTrackerConnectionLost(this ILogger logger, string symbol) + { + _klineTrackerConnectionLost(logger, symbol, null); + } + + public static void KlineTrackerConnectionClosed(this ILogger logger, string symbol) + { + _klineTrackerConnectionClosed(logger, symbol, null); + } + + public static void KlineTrackerConnectionRestored(this ILogger logger, string symbol) + { + _klineTrackerConnectionRestored(logger, symbol, null); + } + + public static void TradeTrackerStatusChanged(this ILogger logger, string symbol, SyncStatus oldStatus, SyncStatus newStatus) + { + _tradeTrackerStatusChanged(logger, symbol, oldStatus, newStatus, null); + } + + public static void TradeTrackerStarting(this ILogger logger, string symbol) + { + _tradeTrackerStarting(logger, symbol, null); + } + + public static void TradeTrackerStartFailed(this ILogger logger, string symbol, string error) + { + _tradeTrackerStartFailed(logger, symbol, error, null); + } + + public static void TradeTrackerStarted(this ILogger logger, string symbol) + { + _tradeTrackerStarted(logger, symbol, null); + } + + public static void TradeTrackerStopping(this ILogger logger, string symbol) + { + _tradeTrackerStopping(logger, symbol, null); + } + + public static void TradeTrackerStopped(this ILogger logger, string symbol) + { + _tradeTrackerStopped(logger, symbol, null); + } + + public static void TradeTrackerInitialDataSet(this ILogger logger, string symbol, int count, long lastId) + { + _tradeTrackerInitialDataSet(logger, symbol, count, lastId, null); + } + + public static void TradeTrackerPreSnapshotSkip(this ILogger logger, string symbol, long lastId) + { + _tradeTrackerPreSnapshotSkip(logger, symbol, lastId, null); + } + + public static void TradeTrackerPreSnapshotApplied(this ILogger logger, string symbol, long lastId) + { + _tradeTrackerPreSnapshotApplied(logger, symbol, lastId, null); + } + + public static void TradeTrackerTradeAdded(this ILogger logger, string symbol, long lastId) + { + _tradeTrackerTradeAdded(logger, symbol, lastId, null); + } + + public static void TradeTrackerConnectionLost(this ILogger logger, string symbol) + { + _tradeTrackerConnectionLost(logger, symbol, null); + } + + public static void TradeTrackerConnectionClosed(this ILogger logger, string symbol) + { + _tradeTrackerConnectionClosed(logger, symbol, null); + } + + public static void TradeTrackerConnectionRestored(this ILogger logger, string symbol) + { + _tradeTrackerConnectionRestored(logger, symbol, null); + } + } +} diff --git a/CryptoExchange.Net/Objects/Enums.cs b/CryptoExchange.Net/Objects/Enums.cs index 56783d80..a0f877c6 100644 --- a/CryptoExchange.Net/Objects/Enums.cs +++ b/CryptoExchange.Net/Objects/Enums.cs @@ -68,6 +68,33 @@ public enum RequestBodyFormat Json } + /// + /// Tracker sync status + /// + public enum SyncStatus + { + /// + /// Not connected + /// + Disconnected, + /// + /// Syncing, data connection is being made + /// + Syncing, + /// + /// The connection is active, but the full data backlog is not yet reached. For example, a tracker set to retain 10 minutes of data only has 8 minutes of data at this moment. + /// + PartiallySynced, + /// + /// Synced + /// + Synced, + /// + /// Disposed + /// + Diposed + } + /// /// Status of the order book /// diff --git a/CryptoExchange.Net/Objects/RequestDefinition.cs b/CryptoExchange.Net/Objects/RequestDefinition.cs index 1dd05673..e583dddd 100644 --- a/CryptoExchange.Net/Objects/RequestDefinition.cs +++ b/CryptoExchange.Net/Objects/RequestDefinition.cs @@ -58,12 +58,16 @@ public class RequestDefinition /// public IRateLimitGuard? LimitGuard { get; set; } - /// /// Whether this request should never be cached /// public bool PreventCaching { get; set; } + /// + /// Connection id + /// + public int? ConnectionId { get; set; } + /// /// ctor /// diff --git a/CryptoExchange.Net/RateLimiting/Guards/RateLimitGuard.cs b/CryptoExchange.Net/RateLimiting/Guards/RateLimitGuard.cs index 9501360e..e9e51270 100644 --- a/CryptoExchange.Net/RateLimiting/Guards/RateLimitGuard.cs +++ b/CryptoExchange.Net/RateLimiting/Guards/RateLimitGuard.cs @@ -18,6 +18,10 @@ public class RateLimitGuard : IRateLimitGuard /// public static Func PerEndpoint { get; } = new Func((def, host, key) => def.Path + def.Method); /// + /// Apply guard per connection + /// + public static Func PerConnection { get; } = new Func((def, host, key) => def.ConnectionId.ToString()); + /// /// Apply guard per API key /// public static Func PerApiKey { get; } = new Func((def, host, key) => key!); diff --git a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs index 3c0a445e..fd3fb650 100644 --- a/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs +++ b/CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs @@ -209,7 +209,7 @@ private async Task ConnectInternalAsync() { if (Parameters.RateLimiter != null) { - var definition = new RequestDefinition(Id.ToString(), HttpMethod.Get); + var definition = new RequestDefinition(Uri.AbsolutePath, HttpMethod.Get) { ConnectionId = Id }; var limitResult = await Parameters.RateLimiter.ProcessAsync(_logger, Id, RateLimitItemType.Connection, definition, _baseAddress, null, 1, Parameters.RateLimitingBehaviour, _ctsSource.Token).ConfigureAwait(false); if (!limitResult) return new CallResult(new ClientRateLimitError("Connection limit reached")); @@ -475,7 +475,7 @@ public void Dispose() /// private async Task SendLoopAsync() { - var requestDefinition = new RequestDefinition(Id.ToString(), HttpMethod.Get); + var requestDefinition = new RequestDefinition(Uri.AbsolutePath, HttpMethod.Get) { ConnectionId = Id }; try { while (true) diff --git a/CryptoExchange.Net/Sockets/Query.cs b/CryptoExchange.Net/Sockets/Query.cs index d79279dc..833abf8b 100644 --- a/CryptoExchange.Net/Sockets/Query.cs +++ b/CryptoExchange.Net/Sockets/Query.cs @@ -177,6 +177,10 @@ protected Query(object request, bool authenticated, int weight = 1) : base(reque /// public override async Task Handle(SocketConnection connection, DataEvent message) { + var typedMessage = message.As((TServerResponse)message.Data); + if (!ValidateMessage(typedMessage)) + return new CallResult(null); + CurrentResponses++; if (CurrentResponses == RequiredResponses) { @@ -186,7 +190,7 @@ public override async Task Handle(SocketConnection connection, DataE if (Result?.Success != false) // If an error result is already set don't override that - Result = HandleMessage(connection, message.As((TServerResponse)message.Data)); + Result = HandleMessage(connection, typedMessage); if (CurrentResponses == RequiredResponses) { @@ -198,6 +202,13 @@ public override async Task Handle(SocketConnection connection, DataE return Result; } + /// + /// Validate if a message is actually processable by this query + /// + /// + /// + public virtual bool ValidateMessage(DataEvent message) => true; + /// /// Handle the query response /// diff --git a/CryptoExchange.Net/Trackers/CompareValue.cs b/CryptoExchange.Net/Trackers/CompareValue.cs new file mode 100644 index 00000000..a6fe0335 --- /dev/null +++ b/CryptoExchange.Net/Trackers/CompareValue.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Trackers +{ + + /// + /// Compare value + /// + public record CompareValue + { + /// + /// The value difference + /// + public decimal? Difference { get; set; } + /// + /// The value difference percentage + /// + public decimal? PercentageDifference { get; set; } + + /// + /// ctor + /// + public CompareValue(decimal? value1, decimal? value2) + { + if (value1 == null || value2 == null) + return; + + Difference = value2 - value1; + PercentageDifference = value1.Value == 0 ? null : Math.Round(value2.Value / value1.Value * 100 - 100, 4); + } + } +} diff --git a/CryptoExchange.Net/Trackers/Klines/IKlineTracker.cs b/CryptoExchange.Net/Trackers/Klines/IKlineTracker.cs new file mode 100644 index 00000000..63cc323e --- /dev/null +++ b/CryptoExchange.Net/Trackers/Klines/IKlineTracker.cs @@ -0,0 +1,105 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.SharedApis; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.Klines +{ + /// + /// A tracker for kline data of a symbol + /// + public interface IKlineTracker + { + /// + /// The total number of klines + /// + int Count { get; } + + /// + /// Exchange name + /// + string Exchange { get; } + + /// + /// Symbol name + /// + string SymbolName { get; } + + /// + /// Symbol + /// + SharedSymbol Symbol { get; } + + /// + /// The max number of klines tracked + /// + int? Limit { get; } + + /// + /// The max age of the data tracked + /// + TimeSpan? Period { get; } + + /// + /// From which timestamp the trades are registered + /// + DateTime? SyncedFrom { get; } + + /// + /// Sync status + /// + SyncStatus Status { get; } + + /// + /// Get the last kline + /// + SharedKline? Last { get; } + + /// + /// Event for when a new kline is added + /// + event Func? OnAdded; + /// + /// Event for when a kline is removed because it's no longer within the period/limit window + /// + event Func? OnRemoved; + /// + /// Event for when a kline is updated + /// + event Func OnUpdated; + /// + /// Event for when the sync status changes + /// + event Func? OnStatusChanged; + + /// + /// Start synchronization + /// + /// + Task StartAsync(bool startWithSnapshot = true); + + /// + /// Stop synchronization + /// + /// + Task StopAsync(); + + /// + /// Get the data tracked + /// + /// Start timestamp to get the data from, defaults to tracked data start time + /// End timestamp to get the data until, defaults to current time + /// + IEnumerable GetData(DateTime? fromTimestamp = null, DateTime? toTimestamp = null); + + /// + /// Get statitistics on the klines + /// + /// Start timestamp to get the data from, defaults to tracked data start time + /// End timestamp to get the data until, defaults to current time + /// + KlinesStats GetStats(DateTime? fromTimestamp = null, DateTime? toTimestamp = null); + + } +} \ No newline at end of file diff --git a/CryptoExchange.Net/Trackers/Klines/KlineTracker.cs b/CryptoExchange.Net/Trackers/Klines/KlineTracker.cs new file mode 100644 index 00000000..9ed47b4f --- /dev/null +++ b/CryptoExchange.Net/Trackers/Klines/KlineTracker.cs @@ -0,0 +1,481 @@ +using CryptoExchange.Net.Logging.Extensions; +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.SharedApis; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.Klines +{ + /// + public class KlineTracker : IKlineTracker + { + private readonly IKlineSocketClient _socketClient; + private readonly IKlineRestClient _restClient; + private SyncStatus _status; + private bool _startWithSnapshot; + + /// + /// The internal data structure + /// + protected readonly Dictionary _data = new Dictionary(); + /// + /// The pre-snapshot queue buffering updates received before the snapshot is set and which will be applied after the snapshot was set + /// + protected readonly List _preSnapshotQueue = new List(); + /// + /// Lock for accessing _data + /// + protected readonly object _lock = new object(); + /// + /// The last time the window was applied + /// + protected DateTime _lastWindowApplied = DateTime.MinValue; + /// + /// Whether or not the data has changed since last window was applied + /// + protected bool _changed = false; + /// + /// The kline interval + /// + protected readonly SharedKlineInterval _interval; + /// + /// Whether the snapshot has been set + /// + protected bool _snapshotSet; + /// + /// Logger + /// + protected readonly ILogger _logger; + /// + /// Update subscription + /// + protected UpdateSubscription? _updateSubscription; + + /// + /// The timestamp of the first item + /// + protected DateTime? _firstTimestamp; + + /// + public SyncStatus Status + { + get => _status; + set + { + if (value == _status) + return; + + var old = _status; + _status = value; + _logger.KlineTrackerStatusChanged(SymbolName, old, value); + OnStatusChanged?.Invoke(old, _status); + } + } + + /// + public string Exchange { get; } + + /// + public string SymbolName { get; } + + /// + public SharedSymbol Symbol { get; } + + /// + public int? Limit { get; } + /// + public TimeSpan? Period { get; } + + /// + public DateTime? SyncedFrom + { + get + { + if (Period == null) + return _firstTimestamp; + + var max = DateTime.UtcNow - Period.Value; + if (_firstTimestamp > max) + return _firstTimestamp; + + return max; + } + } + + /// + public int Count + { + get + { + lock (_lock) + { + ApplyWindow(true); + return _data.Count; + } + } + } + + /// + public SharedKline? Last + { + get + { + lock (_lock) + { + ApplyWindow(true); + return _data.LastOrDefault().Value; + } + } + } + + /// + public event Func? OnAdded; + /// + public event Func? OnUpdated; + /// + public event Func? OnRemoved; + /// + public event Func? OnStatusChanged; + + /// + /// ctor + /// + public KlineTracker( + ILogger? logger, + IKlineRestClient restClient, + IKlineSocketClient socketClient, + SharedSymbol symbol, + SharedKlineInterval interval, + int? limit = null, + TimeSpan? period = null) + { + _logger = logger ?? new NullLogger(); + Symbol = symbol; + SymbolName = socketClient.FormatSymbol(symbol.BaseAsset, symbol.QuoteAsset, symbol.TradingMode, symbol.DeliverTime); + Exchange = restClient.Exchange; + Limit = limit; + Period = period; + _interval = interval; + _socketClient = socketClient; + _restClient = restClient; + } + + /// + public async Task StartAsync(bool startWithSnapshot = true) + { + if (Status != SyncStatus.Disconnected) + throw new InvalidOperationException($"Can't start syncing unless state is {SyncStatus.Disconnected}. Current state: {Status}"); + + _startWithSnapshot = startWithSnapshot; + Status = SyncStatus.Syncing; + _logger.KlineTrackerStarting(SymbolName); + + var startResult = await DoStartAsync().ConfigureAwait(false); + if (!startResult) + { + _logger.KlineTrackerStartFailed(SymbolName, startResult.Error!.ToString()); + Status = SyncStatus.Disconnected; + return new CallResult(startResult.Error!); + } + + _updateSubscription = startResult.Data; + _updateSubscription.ConnectionLost += HandleConnectionLost; + _updateSubscription.ConnectionClosed += HandleConnectionClosed; + _updateSubscription.ConnectionRestored += HandleConnectionRestored; + Status = SyncStatus.Synced; + _logger.KlineTrackerStarted(SymbolName); + return new CallResult(null); + } + + /// + public async Task StopAsync() + { + _logger.KlineTrackerStopping(SymbolName); + Status = SyncStatus.Disconnected; + await DoStopAsync().ConfigureAwait(false); + _data.Clear(); + _preSnapshotQueue.Clear(); + _logger.KlineTrackerStopped(SymbolName); + } + + /// + /// The start procedure needed for kline syncing, generally subscribing to an update stream and requesting the snapshot + /// + /// + protected virtual async Task> DoStartAsync() + { + var subResult = await _socketClient.SubscribeToKlineUpdatesAsync(new SubscribeKlineRequest(Symbol, _interval), + update => + { + AddOrUpdate(update.Data); + }).ConfigureAwait(false); + + if (!subResult) + { + Status = SyncStatus.Disconnected; + return subResult; + } + + if (!_startWithSnapshot) + return subResult; + + var startTime = Period == null ? (DateTime?)null : DateTime.UtcNow.Add(-Period.Value); + if (_restClient.GetKlinesOptions.MaxAge != null && DateTime.UtcNow.Add(-_restClient.GetKlinesOptions.MaxAge.Value) > startTime) + startTime = DateTime.UtcNow.Add(-_restClient.GetKlinesOptions.MaxAge.Value); + + var limit = Math.Min(_restClient.GetKlinesOptions.MaxRequestDataPoints ?? _restClient.GetKlinesOptions.MaxTotalDataPoints ?? 100, Limit ?? 100); + + var request = new GetKlinesRequest(Symbol, _interval, startTime, DateTime.UtcNow, limit: limit); + var data = new List(); + await foreach (var result in ExchangeHelpers.ExecutePages(_restClient.GetKlinesAsync, request).ConfigureAwait(false)) + { + if (!result) + { + _ = subResult.Data.CloseAsync(); + Status = SyncStatus.Disconnected; + return subResult.AsError(result.Error!); + } + + if (Limit != null && data.Count > Limit) + break; + + data.AddRange(result.Data); + } + + SetInitialData(data); + return subResult; + } + + /// + /// The stop procedure needed, generally stopping the update stream + /// + /// + protected virtual Task DoStopAsync() => _updateSubscription?.CloseAsync() ?? Task.CompletedTask; + + /// + public KlinesStats GetStats(DateTime? fromTimestamp = null, DateTime? toTimestamp = null) + { + var compareTime = SyncedFrom?.AddSeconds(-2); + var stats = GetStats(GetData(fromTimestamp, toTimestamp)); + stats.Complete = (fromTimestamp == null || fromTimestamp >= compareTime) && (toTimestamp == null || toTimestamp >= compareTime); + return stats; + } + + private KlinesStats GetStats(IEnumerable klines) + { + if (!klines.Any()) + return new KlinesStats(); + + return new KlinesStats + { + KlineCount = klines.Count(), + FirstOpenTime = klines.First().OpenTime, + LastOpenTime = klines.Last().OpenTime, + HighPrice = klines.Select(d => d.LowPrice).Max(), + LowPrice = klines.Select(d => d.HighPrice).Min(), + Volume = klines.Select(d => d.Volume).Sum(), + AverageVolume = Math.Round(klines.OrderByDescending(d => d.OpenTime).Skip(1).Select(d => d.Volume).DefaultIfEmpty().Average(), 8) + }; + } + + /// + public IEnumerable GetData(DateTime? since = null, DateTime? until = null) + { + lock (_lock) + { + ApplyWindow(true); + + IEnumerable result = _data.Values; + if (since != null) + result = result.Where(d => d.OpenTime >= since); + if (until != null) + result = result.Where(d => d.OpenTime <= until); + + return result.ToList(); + } + } + + /// + /// Set the initial kline data snapshot + /// + /// + protected void SetInitialData(IEnumerable data) + { + lock (_lock) + { + _data.Clear(); + + IEnumerable items = data.OrderByDescending(d => d.OpenTime); + if (Limit != null) + items = items.Take(Limit.Value); + if (Period != null) + items = items.Where(e => e.OpenTime >= DateTime.UtcNow.Add(-Period.Value)); + + foreach (var item in items.OrderBy(d => d.OpenTime)) + _data.Add(item.OpenTime, item); + + _snapshotSet = true; + + foreach (var item in _preSnapshotQueue) + { + if (_data.ContainsKey(item.OpenTime)) + continue; + + _data.Add(item.OpenTime, item); + } + + _firstTimestamp = _data.Min(v => v.Key); + ApplyWindow(false); + _logger.KlineTrackerInitialDataSet(SymbolName, _data.Last().Key); + } + } + + /// + /// Add or update a kline + /// + /// + protected void AddOrUpdate(SharedKline item) => AddOrUpdate(new[] { item }); + + /// + /// Add or update klines + /// + /// + protected void AddOrUpdate(IEnumerable items) + { + lock (_lock) + { + if (_restClient != null && _startWithSnapshot && !_snapshotSet) + { + _preSnapshotQueue.AddRange(items); + return; + } + + foreach (var item in items) + { + if (_data.TryGetValue(item.OpenTime, out var existing)) + { + _data.Remove(item.OpenTime); + _data.Add(item.OpenTime, item); + OnUpdated?.Invoke(item); + _logger.KlineTrackerKlineUpdated(SymbolName, _data.Last().Key); + } + else + { + _data.Add(item.OpenTime, item); + OnAdded?.Invoke(item); + _logger.KlineTrackerKlineAdded(SymbolName, _data.Last().Key); + } + } + + _firstTimestamp = _data.Min(x => x.Key); + _changed = true; + + SetSyncStatus(); + ApplyWindow(true); + } + } + + private void ApplyWindow(bool broadcastEvents) + { + if (!_changed && (DateTime.UtcNow - _lastWindowApplied) < TimeSpan.FromSeconds(1)) + return; + + if (Period != null) + { + var compareDate = DateTime.UtcNow.Add(-Period.Value); + for (var i = 0; i < _data.Count; i++) + { + var item = _data.ElementAt(0); + if (item.Key >= compareDate) + break; + + _data.Remove(item.Key); + if (broadcastEvents) + OnRemoved?.Invoke(item.Value); + } + } + + if (Limit != null && _data.Count > Limit.Value) + { + var toRemove = Math.Max(0, _data.Count - Limit.Value); + for (var i = 0; i < toRemove; i++) + { + var item = _data.ElementAt(0); + _data.Remove(item.Key); + if (broadcastEvents) + OnRemoved?.Invoke(item.Value); + } + } + + _lastWindowApplied = DateTime.UtcNow; + _changed = false; + } + + private void HandleConnectionLost() + { + _logger.KlineTrackerConnectionLost(SymbolName); + if (Status != SyncStatus.Disconnected) + { + Status = SyncStatus.Syncing; + _snapshotSet = false; + _firstTimestamp = null; + _preSnapshotQueue.Clear(); + } + } + + private void HandleConnectionClosed() + { + _logger.KlineTrackerConnectionClosed(SymbolName); + Status = SyncStatus.Disconnected; + _ = StopAsync(); + } + + private async void HandleConnectionRestored(TimeSpan _) + { + Status = SyncStatus.Syncing; + var success = false; + while (!success) + { + if (Status != SyncStatus.Syncing) + return; + + var resyncResult = await DoStartAsync().ConfigureAwait(false); + success = resyncResult; + } + + _logger.KlineTrackerConnectionRestored(SymbolName); + SetSyncStatus(); + } + + private void SetSyncStatus() + { + if (Status == SyncStatus.Synced) + return; + + if (Period != null) + { + if (_firstTimestamp <= DateTime.UtcNow - Period.Value) + Status = SyncStatus.Synced; + else + Status = SyncStatus.PartiallySynced; + } + + if (Limit != null) + { + if (_data.Count == Limit.Value) + Status = SyncStatus.Synced; + else + Status = SyncStatus.PartiallySynced; + } + + if (Period == null && Limit == null) + Status = SyncStatus.Synced; + } + } +} diff --git a/CryptoExchange.Net/Trackers/Klines/KlinesCompare.cs b/CryptoExchange.Net/Trackers/Klines/KlinesCompare.cs new file mode 100644 index 00000000..6e1deeac --- /dev/null +++ b/CryptoExchange.Net/Trackers/Klines/KlinesCompare.cs @@ -0,0 +1,30 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Trackers.Klines +{ + /// + /// Klines statistics comparison + /// + public record KlinesCompare + { + /// + /// Number of trades + /// + public CompareValue? LowPriceDif { get; set; } + /// + /// Number of trades + /// + public CompareValue? HighPriceDif { get; set; } + /// + /// Number of trades + /// + public CompareValue? VolumeDif { get; set; } + /// + /// Number of trades + /// + public CompareValue? AverageVolumeDif { get; set; } + + } +} diff --git a/CryptoExchange.Net/Trackers/Klines/KlinesStats.cs b/CryptoExchange.Net/Trackers/Klines/KlinesStats.cs new file mode 100644 index 00000000..2a832f11 --- /dev/null +++ b/CryptoExchange.Net/Trackers/Klines/KlinesStats.cs @@ -0,0 +1,59 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Trackers.Klines +{ + /// + /// Klines statistics + /// + public record KlinesStats + { + /// + /// Number of klines + /// + public int KlineCount { get; set; } + /// + /// The kline open time of the first entry + /// + public DateTime? FirstOpenTime { get; set; } + /// + /// The kline open time of the last entry + /// + public DateTime? LastOpenTime { get; set; } + /// + /// Lowest trade price + /// + public decimal? LowPrice { get; set; } + /// + /// Highest trade price + /// + public decimal? HighPrice { get; set; } + /// + /// Trade volume + /// + public decimal Volume { get; set; } + /// + /// Average volume per kline + /// + public decimal? AverageVolume { get; set; } + /// + /// Whether the data is complete + /// + public bool Complete { get; set; } + + /// + /// Compare 2 stat snapshots to eachother + /// + public KlinesCompare CompareTo(KlinesStats otherStats) + { + return new KlinesCompare + { + LowPriceDif = new CompareValue(LowPrice, otherStats.LowPrice), + HighPriceDif = new CompareValue(HighPrice, otherStats.HighPrice), + VolumeDif = new CompareValue(Volume, otherStats.Volume), + AverageVolumeDif = new CompareValue(AverageVolume, otherStats.AverageVolume), + }; + } + } +} diff --git a/CryptoExchange.Net/Trackers/Trades/ITradeTracker.cs b/CryptoExchange.Net/Trackers/Trades/ITradeTracker.cs new file mode 100644 index 00000000..589d5bda --- /dev/null +++ b/CryptoExchange.Net/Trackers/Trades/ITradeTracker.cs @@ -0,0 +1,100 @@ +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.SharedApis; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.Trades +{ + /// + /// A tracker for trades on a symbol + /// + public interface ITradeTracker + { + /// + /// The total number of trades + /// + int Count { get; } + + /// + /// Exchange name + /// + string Exchange { get; } + + /// + /// Symbol name + /// + string SymbolName { get; } + + /// + /// Symbol + /// + SharedSymbol Symbol { get; } + + /// + /// The max number of trades tracked + /// + int? Limit { get; } + + /// + /// The max age of the data tracked + /// + TimeSpan? Period { get; } + + /// + /// From which timestamp the trades are registered + /// + DateTime? SyncedFrom { get; } + + /// + /// The current synchronization status + /// + SyncStatus Status { get; } + + /// + /// Get the last trade + /// + SharedTrade? Last { get; } + + /// + /// Event for when a new trade is added + /// + event Func? OnAdded; + /// + /// Event for when a trade is removed because it's no longer within the period/limit window + /// + event Func? OnRemoved; + /// + /// Event for when the sync status changes + /// + event Func? OnStatusChanged; + + /// + /// Start synchronization + /// + /// + Task StartAsync(bool startWithSnapshot = true); + + /// + /// Stop synchronization + /// + /// + Task StopAsync(); + + /// + /// Get the data tracked + /// + /// Start timestamp to get the data from, defaults to tracked data start time + /// End timestamp to get the data until, defaults to current time + /// + IEnumerable GetData(DateTime? fromTimestamp = null, DateTime? toTimestamp = null); + + /// + /// Get statitistics on the trades + /// + /// Start timestamp to get the data from, defaults to tracked data start time + /// End timestamp to get the data until, defaults to current time + /// + TradesStats GetStats(DateTime? fromTimestamp = null, DateTime? toTimestamp = null); + } +} \ No newline at end of file diff --git a/CryptoExchange.Net/Trackers/Trades/TradeTracker.cs b/CryptoExchange.Net/Trackers/Trades/TradeTracker.cs new file mode 100644 index 00000000..89625668 --- /dev/null +++ b/CryptoExchange.Net/Trackers/Trades/TradeTracker.cs @@ -0,0 +1,495 @@ +using CryptoExchange.Net.Logging.Extensions; +using CryptoExchange.Net.Objects; +using CryptoExchange.Net.Objects.Sockets; +using CryptoExchange.Net.SharedApis; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading.Tasks; + +namespace CryptoExchange.Net.Trackers.Trades +{ + /// + public class TradeTracker : ITradeTracker + { + private readonly ITradeSocketClient _socketClient; + private readonly IRecentTradeRestClient? _recentRestClient; + private readonly ITradeHistoryRestClient? _historyRestClient; + private SyncStatus _status; + private long _snapshotId; + private bool _startWithSnapshot; + + /// + /// The internal data structure + /// + protected readonly List _data = new List(); + /// + /// The pre-snapshot queue buffering updates received before the snapshot is set and which will be applied after the snapshot was set + /// + protected readonly List _preSnapshotQueue = new List(); + + /// + /// The last time the window was applied + /// + protected DateTime _lastWindowApplied = DateTime.MinValue; + /// + /// Whether or not the data has changed since last window was applied + /// + protected bool _changed = false; + /// + /// Lock for accessing _data + /// + protected readonly object _lock = new object(); + /// + /// Whether the snapshot has been set + /// + protected bool _snapshotSet; + /// + /// Logger + /// + protected readonly ILogger _logger; + /// + /// Update subscription + /// + protected UpdateSubscription? _updateSubscription; + + /// + /// The timestamp of the first item + /// + protected DateTime? _firstTimestamp; + + /// + public string Exchange { get; } + + /// + public string SymbolName { get; } + + /// + public SharedSymbol Symbol { get; } + + /// + public int? Limit { get; } + /// + public TimeSpan? Period { get; } + + /// + public SyncStatus Status + { + get => _status; + set + { + if (value == _status) + return; + + var old = _status; + _status = value; + _logger.TradeTrackerStatusChanged(SymbolName, old, value); + OnStatusChanged?.Invoke(old, _status); + } + } + + /// + public int Count + { + get + { + lock (_lock) + { + ApplyWindow(true); + return _data.Count; + } + } + } + + /// + public DateTime? SyncedFrom + { + get + { + if (Period == null) + return _firstTimestamp; + + var max = DateTime.UtcNow - Period.Value; + if (_firstTimestamp > max) + return _firstTimestamp; + + return max; + } + } + + /// + public SharedTrade? Last + { + get + { + lock (_lock) + { + ApplyWindow(true); + return _data.LastOrDefault(); + } + } + } + + /// + public event Func? OnAdded; + /// + public event Func? OnRemoved; + /// + public event Func? OnStatusChanged; + + /// + /// ctor + /// + public TradeTracker( + ILogger? logger, + IRecentTradeRestClient? recentRestClient, + ITradeHistoryRestClient? historyRestClient, + ITradeSocketClient socketClient, + SharedSymbol symbol, + int? limit = null, + TimeSpan? period = null) + { + _logger = logger ?? new NullLogger(); + _recentRestClient = recentRestClient; + _historyRestClient = historyRestClient; + _socketClient = socketClient; + Exchange = socketClient.Exchange; + Symbol = symbol; + SymbolName = socketClient.FormatSymbol(symbol.BaseAsset, symbol.QuoteAsset, symbol.TradingMode, symbol.DeliverTime); + Limit = limit; + Period = period; + } + + private TradesStats GetStats(IEnumerable trades) + { + if (!trades.Any()) + return new TradesStats(); + + return new TradesStats + { + TradeCount = trades.Count(), + FirstTradeTime = trades.First().Timestamp, + LastTradeTime = trades.Last().Timestamp, + AveragePrice = Math.Round(trades.Select(d => d.Price).DefaultIfEmpty().Average(), 8), + VolumeWeightedAveragePrice = trades.Any() ? Math.Round(trades.Select(d => d.Price * d.Quantity).DefaultIfEmpty().Sum() / trades.Select(d => d.Quantity).DefaultIfEmpty().Sum(), 8) : null, + Volume = Math.Round(trades.Sum(d => d.Quantity), 8), + QuoteVolume = Math.Round(trades.Sum(d => d.Quantity * d.Price), 8), + BuySellRatio = Math.Round(trades.Where(x => x.Side == SharedOrderSide.Buy).Sum(x => x.Quantity) / trades.Sum(x => x.Quantity), 8) + }; + } + + /// + public TradesStats GetStats(DateTime? fromTimestamp = null, DateTime? toTimestamp = null) + { + var compareTime = SyncedFrom?.AddSeconds(-2); + var stats = GetStats(GetData(fromTimestamp, toTimestamp)); + stats.Complete = (fromTimestamp == null || fromTimestamp >= compareTime) && (toTimestamp == null || toTimestamp >= compareTime); + return stats; + } + + /// + public async Task StartAsync(bool startWithSnapshot = true) + { + if (Status != SyncStatus.Disconnected) + throw new InvalidOperationException($"Can't start syncing unless state is {SyncStatus.Disconnected}. Current state: {Status}"); + + _startWithSnapshot = startWithSnapshot; + Status = SyncStatus.Syncing; + _logger.TradeTrackerStarting(SymbolName); + var subResult = await DoStartAsync().ConfigureAwait(false); + if (!subResult) + { + _logger.TradeTrackerStartFailed(SymbolName, subResult.Error!.ToString()); + Status = SyncStatus.Disconnected; + return subResult; + } + + _updateSubscription = subResult.Data; + _updateSubscription.ConnectionLost += HandleConnectionLost; + _updateSubscription.ConnectionClosed += HandleConnectionClosed; + _updateSubscription.ConnectionRestored += HandleConnectionRestored; + SetSyncStatus(); + _logger.TradeTrackerStarted(SymbolName); + return new CallResult(null); + } + + /// + public async Task StopAsync() + { + _logger.TradeTrackerStopping(SymbolName); + Status = SyncStatus.Disconnected; + await DoStopAsync().ConfigureAwait(false); + _data.Clear(); + _preSnapshotQueue.Clear(); + _logger.TradeTrackerStopped(SymbolName); + } + + /// + /// The start procedure needed for trade syncing, generally subscribing to an update stream and requesting the snapshot + /// + /// + protected virtual async Task> DoStartAsync() + { + var subResult = await _socketClient.SubscribeToTradeUpdatesAsync(new SubscribeTradeRequest(Symbol), + update => + { + AddData(update.Data); + }).ConfigureAwait(false); + + if (!subResult) + { + Status = SyncStatus.Disconnected; + return subResult; + } + + if (!_startWithSnapshot) + return subResult; + + if (_historyRestClient != null) + { + var startTime = Period == null ? DateTime.UtcNow.AddMinutes(-5) : DateTime.UtcNow.Add(-Period.Value); + var request = new GetTradeHistoryRequest(Symbol, startTime, DateTime.UtcNow); + var data = new List(); + await foreach(var result in ExchangeHelpers.ExecutePages(_historyRestClient.GetTradeHistoryAsync, request).ConfigureAwait(false)) + { + if (!result) + { + _ = subResult.Data.CloseAsync(); + Status = SyncStatus.Disconnected; + return subResult.AsError(result.Error!); + } + + if (Limit != null && data.Count > Limit) + break; + + data.AddRange(result.Data); + } + + SetInitialData(data); + } + else if (_recentRestClient != null) + { + int? limit = null; + if (Limit.HasValue) + limit = Math.Min(_recentRestClient.GetRecentTradesOptions.MaxLimit, Limit.Value); + + var snapshot = await _recentRestClient.GetRecentTradesAsync(new GetRecentTradesRequest(Symbol, limit)).ConfigureAwait(false); + if (!snapshot) + { + _ = subResult.Data.CloseAsync(); + Status = SyncStatus.Disconnected; + return subResult.AsError(snapshot.Error!); + } + + SetInitialData(snapshot.Data); + } + + return subResult; + } + + /// + /// The stop procedure needed, generally stopping the update stream + /// + /// + protected virtual Task DoStopAsync() => _updateSubscription?.CloseAsync() ?? Task.CompletedTask; + + /// + public IEnumerable GetData(DateTime? since = null, DateTime? until = null) + { + lock (_lock) + { + ApplyWindow(true); + + IEnumerable result = _data; + if (since != null) + result = result.Where(d => d.Timestamp >= since); + if (until != null) + result = result.Where(d => d.Timestamp <= until); + + return result.ToList(); + } + } + + /// + /// Set the initial trade data snapshot + /// + /// + protected void SetInitialData(IEnumerable data) + { + lock (_lock) + { + _data.Clear(); + + IEnumerable items = data.OrderByDescending(d => d.Timestamp); + if (Limit != null) + items = items.Take(Limit.Value); + if (Period != null) + items = items.Where(e => e.Timestamp >= DateTime.UtcNow.Add(-Period.Value)); + + _snapshotId = data.Max(d => d.Timestamp.Ticks); + foreach (var item in items.OrderBy(d => d.Timestamp)) + _data.Add(item); + + _snapshotSet = true; + _changed = true; + + _logger.TradeTrackerInitialDataSet(SymbolName, _data.Count, _snapshotId); + + foreach (var item in _preSnapshotQueue) + { + if (_snapshotId >= item.Timestamp.Ticks) + { + _logger.TradeTrackerPreSnapshotSkip(SymbolName, item.Timestamp.Ticks); + continue; + } + + _logger.TradeTrackerPreSnapshotApplied(SymbolName, item.Timestamp.Ticks); + _data.Add(item); + } + + _firstTimestamp = _data.Min(v => v.Timestamp); + + ApplyWindow(false); + } + } + + /// + /// Add a trade + /// + /// + protected void AddData(SharedTrade item) => AddData(new[] { item }); + + /// + /// Add a list of trades + /// + /// + protected void AddData(IEnumerable items) + { + lock (_lock) + { + if ((_recentRestClient != null || _historyRestClient != null) && _startWithSnapshot && !_snapshotSet) + { + _preSnapshotQueue.AddRange(items); + return; + } + + foreach (var item in items) + { + _logger.TradeTrackerTradeAdded(SymbolName, item.Timestamp.Ticks); + _data.Add(item); + OnAdded?.Invoke(item); + } + + _firstTimestamp = _data.Min(x => x.Timestamp); + _changed = true; + SetSyncStatus(); + ApplyWindow(true); + } + } + + private void ApplyWindow(bool broadcastEvents) + { + if (!_changed && (DateTime.UtcNow - _lastWindowApplied) < TimeSpan.FromSeconds(1)) + return; + + if (Period != null) + { + var compareDate = DateTime.UtcNow.Add(-Period.Value); + for(var i = 0; i < _data.Count; i++) + { + var item = _data[0]; + if (item.Timestamp >= compareDate) + break; + + _data.Remove(item); + if (broadcastEvents) + OnRemoved?.Invoke(item); + } + } + + if (Limit != null && _data.Count > Limit.Value) + { + var toRemove = _data.Count - Limit.Value; + for (var i = 0; i < toRemove; i++) + { + var item = _data[0]; + _data.Remove(item); + if (broadcastEvents) + OnRemoved?.Invoke(item); + } + } + + _lastWindowApplied = DateTime.UtcNow; + _changed = false; + + if (Status == SyncStatus.PartiallySynced) + // Need to check if sync status should be changed even if there may not be any new data + SetSyncStatus(); + } + + + private void HandleConnectionLost() + { + _logger.TradeTrackerConnectionLost(SymbolName); + if (Status != SyncStatus.Disconnected) + { + Status = SyncStatus.Syncing; + _snapshotSet = false; + _firstTimestamp = null; + _preSnapshotQueue.Clear(); + } + } + + private void HandleConnectionClosed() + { + _logger.TradeTrackerConnectionClosed(SymbolName); + Status = SyncStatus.Disconnected; + _ = StopAsync(); + } + + private async void HandleConnectionRestored(TimeSpan _) + { + Status = SyncStatus.Syncing; + var success = false; + while (!success) + { + if (Status != SyncStatus.Syncing) + return; + + var resyncResult = await DoStartAsync().ConfigureAwait(false); + success = resyncResult; + } + + _logger.TradeTrackerConnectionRestored(SymbolName); + SetSyncStatus(); + } + + private void SetSyncStatus() + { + if (Status == SyncStatus.Synced) + return; + + if (Period != null) + { + if (_firstTimestamp <= DateTime.UtcNow - Period.Value) + Status = SyncStatus.Synced; + else + Status = SyncStatus.PartiallySynced; + } + + if (Limit != null) + { + if (_data.Count == Limit.Value) + Status = SyncStatus.Synced; + else + Status = SyncStatus.PartiallySynced; + } + + if (Period == null && Limit == null) + Status = SyncStatus.Synced; + } + } +} diff --git a/CryptoExchange.Net/Trackers/Trades/TradesCompare.cs b/CryptoExchange.Net/Trackers/Trades/TradesCompare.cs new file mode 100644 index 00000000..1d02593a --- /dev/null +++ b/CryptoExchange.Net/Trackers/Trades/TradesCompare.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Trackers.Trades +{ + /// + /// Trades statistics comparison + /// + public record TradesCompare + { + /// + /// Number of trades + /// + public CompareValue TradeCountDif { get; set; } = new CompareValue(null, null); + /// + /// Average trade price + /// + public CompareValue? AveragePriceDif { get; set; } + /// + /// Volume weighted average trade price + /// + public CompareValue? VolumeWeightedAveragePriceDif { get; set; } + /// + /// Volume of the trades + /// + public CompareValue VolumeDif { get; set; } = new CompareValue(null, null); + /// + /// Volume of the trades in quote asset + /// + public CompareValue QuoteVolumeDif { get; set; } = new CompareValue(null, null); + /// + /// The volume weighted Buy/Sell ratio. A 0.7 ratio means 70% of the trade volume was a buy. + /// + public CompareValue? BuySellRatioDif { get; set; } + } +} diff --git a/CryptoExchange.Net/Trackers/Trades/TradesStats.cs b/CryptoExchange.Net/Trackers/Trades/TradesStats.cs new file mode 100644 index 00000000..2a0b2414 --- /dev/null +++ b/CryptoExchange.Net/Trackers/Trades/TradesStats.cs @@ -0,0 +1,65 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace CryptoExchange.Net.Trackers.Trades +{ + /// + /// Trades statistics + /// + public record TradesStats + { + /// + /// Number of trades + /// + public int TradeCount { get; set; } + /// + /// Timestamp of the last trade + /// + public DateTime? FirstTradeTime { get; set; } + /// + /// Timestamp of the first trade + /// + public DateTime? LastTradeTime { get; set; } + /// + /// Average trade price + /// + public decimal? AveragePrice { get; set; } + /// + /// Volume weighted average trade price + /// + public decimal? VolumeWeightedAveragePrice { get; set; } + /// + /// Volume of the trades + /// + public decimal Volume { get; set; } + /// + /// Volume of the trades in quote asset + /// + public decimal QuoteVolume { get; set; } + /// + /// The volume weighted Buy/Sell ratio. A 0.7 ratio means 70% of the trade volume was a buy. + /// + public decimal? BuySellRatio { get; set; } + /// + /// Whether the data is complete + /// + public bool Complete { get; set; } + + /// + /// Compare 2 stat snapshots to eachother + /// + public TradesCompare CompareTo(TradesStats otherStats) + { + return new TradesCompare + { + TradeCountDif = new CompareValue(TradeCount, otherStats.TradeCount), + AveragePriceDif = new CompareValue(AveragePrice, otherStats.AveragePrice), + VolumeWeightedAveragePriceDif = new CompareValue(VolumeWeightedAveragePrice, otherStats.VolumeWeightedAveragePrice), + VolumeDif = new CompareValue(Volume, otherStats.Volume), + QuoteVolumeDif = new CompareValue(QuoteVolume, otherStats.QuoteVolume), + BuySellRatioDif = new CompareValue(BuySellRatio, otherStats.BuySellRatio), + }; + } + } +} diff --git a/docs/index.html b/docs/index.html index 73148e95..ecda4d6d 100644 --- a/docs/index.html +++ b/docs/index.html @@ -106,6 +106,7 @@