From 0d365ad28fd010689509f1971655f99407828f5e Mon Sep 17 00:00:00 2001 From: Roman Yavnikov <45608740+Romazes@users.noreply.github.com> Date: Wed, 21 Feb 2024 20:14:54 +0200 Subject: [PATCH] Initialization via Job Packet and Rename DataQueueHandler to DataProvider (#7) * feat: initialization through Job packet * rename: DataQueueHandler to DataProvider * rename: projects, solution fix: path in workflow file * fix: tryGetConfig of iex-price-plan * feat: handle "parsing to enum" excpetion * fix: additional validation to enum parsed value * feat: use global instead of Config in ValidateSubscription --- .github/workflows/build.yml | 6 +- Lean.DataSource.IEX.sln | 4 +- .../IEXDataDownloaderTests.cs | 2 +- QuantConnect.IEX.Tests/IEXDataHistoryTests.cs | 12 +- ...andlerTests.cs => IEXDataProviderTests.cs} | 115 +++++++++--- ... QuantConnect.DataSource.IEX.Tests.csproj} | 10 +- QuantConnect.IEX.Tests/TestSetup.cs | 2 +- .../Constants/IEXDataStreamChannels.cs | 2 +- QuantConnect.IEX/EnumeratorWrapper.cs | 2 +- QuantConnect.IEX/Enums/IEXPricePlan.cs | 2 +- QuantConnect.IEX/IEXDataDownloader.cs | 6 +- ...DataQueueHandler.cs => IEXDataProvider.cs} | 163 +++++++++++------- QuantConnect.IEX/IEXEventSourceCollection.cs | 4 +- ...roj => QuantConnect.DataSource.IEX.csproj} | 8 +- .../Response/DataStreamLastSale.cs | 2 +- .../Response/DataStreamTopOfBook.cs | 2 +- 16 files changed, 221 insertions(+), 121 deletions(-) rename QuantConnect.IEX.Tests/{IEXDataQueueHandlerTests.cs => IEXDataProviderTests.cs} (82%) rename QuantConnect.IEX.Tests/{QuantConnect.IEX.Tests.csproj => QuantConnect.DataSource.IEX.Tests.csproj} (77%) rename QuantConnect.IEX/{IEXDataQueueHandler.cs => IEXDataProvider.cs} (87%) rename QuantConnect.IEX/{QuantConnect.IEX.csproj => QuantConnect.DataSource.IEX.csproj} (84%) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index e361f14..54636cb 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -47,10 +47,10 @@ jobs: options: -v /home/runner/work:/__w --workdir /__w/Lean.DataSource.IEX/Lean.DataSource.IEX -e QC_JOB_USER_ID=${{ secrets.QC_JOB_USER_ID }} -e QC_API_ACCESS_TOKEN=${{ secrets.QC_API_ACCESS_TOKEN }} -e QC_JOB_ORGANIZATION_ID=${{ secrets.QC_JOB_ORGANIZATION_ID }} -e QC_IEX_CLOUD_API_KEY=${{ secrets.QC_IEX_CLOUD_API_KEY }} - name: BuildDataSource - run: dotnet build ./QuantConnect.IEX/QuantConnect.IEX.csproj /p:Configuration=Release /v:quiet /p:WarningLevel=1 + run: dotnet build ./QuantConnect.IEX/QuantConnect.DataSource.IEX.csproj /p:Configuration=Release /v:quiet /p:WarningLevel=1 - name: BuildDataSourceTests - run: dotnet build ./QuantConnect.IEX.Tests/QuantConnect.IEX.Tests.csproj /p:Configuration=Release /v:quiet /p:WarningLevel=1 + run: dotnet build ./QuantConnect.IEX.Tests/QuantConnect.DataSource.IEX.Tests.csproj /p:Configuration=Release /v:quiet /p:WarningLevel=1 - name: Run Tests - run: dotnet test ./QuantConnect.IEX.Tests/bin/Release/QuantConnect.IEX.Tests.dll \ No newline at end of file + run: dotnet test ./QuantConnect.IEX.Tests/bin/Release/QuantConnect.Lean.DataSource.IEX.Tests.dll \ No newline at end of file diff --git a/Lean.DataSource.IEX.sln b/Lean.DataSource.IEX.sln index d96b17c..84a3351 100644 --- a/Lean.DataSource.IEX.sln +++ b/Lean.DataSource.IEX.sln @@ -3,9 +3,9 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 17 VisualStudioVersion = 17.5.002.0 MinimumVisualStudioVersion = 10.0.40219.1 -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "QuantConnect.IEX.Tests", "QuantConnect.IEX.Tests\QuantConnect.IEX.Tests.csproj", "{38BD8123-9838-4984-A4D1-B8D1062197B4}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "QuantConnect.DataSource.IEX.Tests", "QuantConnect.IEX.Tests\QuantConnect.DataSource.IEX.Tests.csproj", "{38BD8123-9838-4984-A4D1-B8D1062197B4}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "QuantConnect.IEX", "QuantConnect.IEX\QuantConnect.IEX.csproj", "{6324C205-9693-4FA1-9EC1-9662E569D046}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "QuantConnect.DataSource.IEX", "QuantConnect.IEX\QuantConnect.DataSource.IEX.csproj", "{6324C205-9693-4FA1-9EC1-9662E569D046}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "QuantConnect.Tests", "..\Lean\Tests\QuantConnect.Tests.csproj", "{D6772C71-231E-44D1-9DD1-E796D393AAAB}" EndProject diff --git a/QuantConnect.IEX.Tests/IEXDataDownloaderTests.cs b/QuantConnect.IEX.Tests/IEXDataDownloaderTests.cs index aa5951c..d07e746 100644 --- a/QuantConnect.IEX.Tests/IEXDataDownloaderTests.cs +++ b/QuantConnect.IEX.Tests/IEXDataDownloaderTests.cs @@ -18,7 +18,7 @@ using NUnit.Framework; using System.Collections.Generic; -namespace QuantConnect.IEX.Tests +namespace QuantConnect.Lean.DataSource.IEX.Tests { public class IEXDataDownloaderTests { diff --git a/QuantConnect.IEX.Tests/IEXDataHistoryTests.cs b/QuantConnect.IEX.Tests/IEXDataHistoryTests.cs index d866d49..61471df 100644 --- a/QuantConnect.IEX.Tests/IEXDataHistoryTests.cs +++ b/QuantConnect.IEX.Tests/IEXDataHistoryTests.cs @@ -25,25 +25,25 @@ using QuantConnect.Data.Market; using System.Collections.Generic; -namespace QuantConnect.IEX.Tests +namespace QuantConnect.Lean.DataSource.IEX.Tests { [TestFixture] public class IEXDataHistoryTests { private static MarketHoursDatabase _marketHoursDatabase = MarketHoursDatabase.FromDataFolder(); - private IEXDataQueueHandler iexDataQueueHandler; + private IEXDataProvider iexDataProvider; [SetUp] public void SetUp() { - iexDataQueueHandler = new IEXDataQueueHandler(); + iexDataProvider = new IEXDataProvider(); } [TearDown] public void TearDown() { - iexDataQueueHandler.Dispose(); + iexDataProvider.Dispose(); } /// @@ -228,8 +228,8 @@ private Slice[] GetHistory(Symbol symbol, Resolution resolution, TickType tickTy { var requests = new[] { CreateHistoryRequest(symbol, resolution, tickType, period) }; - var slices = iexDataQueueHandler.GetHistory(requests, TimeZones.Utc).ToArray(); - Log.Trace("Data points retrieved: " + iexDataQueueHandler.DataPointCount); + var slices = iexDataProvider.GetHistory(requests, TimeZones.Utc).ToArray(); + Log.Trace("Data points retrieved: " + iexDataProvider.DataPointCount); Log.Trace("tick Type: " + tickType); return slices; } diff --git a/QuantConnect.IEX.Tests/IEXDataQueueHandlerTests.cs b/QuantConnect.IEX.Tests/IEXDataProviderTests.cs similarity index 82% rename from QuantConnect.IEX.Tests/IEXDataQueueHandlerTests.cs rename to QuantConnect.IEX.Tests/IEXDataProviderTests.cs index 6478de9..53503ba 100644 --- a/QuantConnect.IEX.Tests/IEXDataQueueHandlerTests.cs +++ b/QuantConnect.IEX.Tests/IEXDataProviderTests.cs @@ -19,6 +19,7 @@ using System.Threading; using QuantConnect.Data; using QuantConnect.Tests; +using QuantConnect.Packets; using QuantConnect.Logging; using Microsoft.CodeAnalysis; using System.Threading.Tasks; @@ -28,13 +29,16 @@ using System.Collections.Concurrent; using QuantConnect.Data.UniverseSelection; -namespace QuantConnect.IEX.Tests +namespace QuantConnect.Lean.DataSource.IEX.Tests { [TestFixture, Explicit("This tests require a iexcloud.io api key")] - public class IEXDataQueueHandlerTests + public class IEXDataProviderTests { + private readonly string _apiKey = Config.Get("iex-cloud-api-key"); + private readonly string _pricePlan = Config.Get("iex-cloud-api-key"); + private CancellationTokenSource _cancellationTokenSource; - private IEXDataQueueHandler iexDataQueueHandler; + private IEXDataProvider iexDataProvider; private static readonly string[] HardCodedSymbolsSNP = { "AAPL", "MSFT", "AMZN", "FB", "GOOGL", "GOOG", "BRK.B", "JNJ", "PG", "NVDA", "V", "JPM", "HD", "UNH", "MA", "VZ", @@ -76,15 +80,15 @@ public class IEXDataQueueHandlerTests public void SetUp() { _cancellationTokenSource = new(); - iexDataQueueHandler = new IEXDataQueueHandler(); + iexDataProvider = new IEXDataProvider(); } [TearDown] public void TearDown() { - if (iexDataQueueHandler != null) + if (iexDataProvider != null) { - iexDataQueueHandler.Dispose(); + iexDataProvider.Dispose(); } _cancellationTokenSource.Dispose(); @@ -119,7 +123,7 @@ public void IEXCouldSubscribeManyTimes(Queue configs) foreach (var config in configs) { ProcessFeed( - iexDataQueueHandler.Subscribe(config, (s, e) => { }), + iexDataProvider.Subscribe(config, (s, e) => { }), tick => { if (tick != null) @@ -155,7 +159,7 @@ public void IEXCouldSubscribeManyTimes(Queue configs) foreach (var config in configs) { - iexDataQueueHandler.Unsubscribe(config); + iexDataProvider.Unsubscribe(config); } _cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(20)); @@ -177,12 +181,12 @@ public void IEXSubscribeToSeveralThenUnSubscribeExceptOne(Queue { }), + iexDataProvider.Subscribe(config, (s, e) => { }), tick => { if (tick != null) { - Log.Debug($"{nameof(IEXDataQueueHandlerTests)}: tick: {tick}"); + Log.Debug($"{nameof(IEXDataProviderTests)}: tick: {tick}"); tempDictionary.AddOrUpdate(tick.Symbol, 1, (id, count) => count + 1); @@ -201,7 +205,7 @@ public void IEXSubscribeToSeveralThenUnSubscribeExceptOne(Queue 2; i--) { configs.TryDequeue(out var config); - iexDataQueueHandler.Unsubscribe(config); + iexDataProvider.Unsubscribe(config); tempDictionary.TryRemove(config.Symbol, out _); } @@ -216,7 +220,7 @@ public void IEXSubscribeToSeveralThenUnSubscribeExceptOne(Queue 0; i--) { configs.TryDequeue(out var config); - iexDataQueueHandler.Unsubscribe(config); + iexDataProvider.Unsubscribe(config); } Thread.Sleep(TimeSpan.FromSeconds(1)); @@ -244,7 +248,7 @@ public void IEXCouldSubscribeAndUnsubscribe() { _cancellationTokenSource.Cancel(); }; - + var configs = new[] { GetSubscriptionDataConfig(Symbol.Create("MBLY", SecurityType.Equity, Market.USA), Resolution.Second), GetSubscriptionDataConfig(Symbol.Create("USO", SecurityType.Equity, Market.USA), Resolution.Second) @@ -253,14 +257,14 @@ public void IEXCouldSubscribeAndUnsubscribe() Array.ForEach(configs, (c) => { ProcessFeed( - iexDataQueueHandler.Subscribe(c, (s, e) => { }), + iexDataProvider.Subscribe(c, (s, e) => { }), callback, throwExceptionCallback); }); Assert.IsFalse(_cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(20)), "The cancellation token was cancelled block thread."); - iexDataQueueHandler.Unsubscribe(Enumerable.First(configs, c => string.Equals(c.Symbol.Value, "MBLY"))); + iexDataProvider.Unsubscribe(Enumerable.First(configs, c => string.Equals(c.Symbol.Value, "MBLY"))); Log.Trace("Unsubscribing"); _cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(2)); @@ -279,12 +283,12 @@ public void IEXCouldSubscribeMoreThan100Symbols() { foreach (var config in GetSubscriptionDataConfigs(ticker, Resolution.Second)) { - ProcessFeed(iexDataQueueHandler.Subscribe(config, (s, e) => { }), throwExceptionCallback: () => _cancellationTokenSource.Cancel()); + ProcessFeed(iexDataProvider.Subscribe(config, (s, e) => { }), throwExceptionCallback: () => _cancellationTokenSource.Cancel()); } } Assert.IsFalse(resetEvent.WaitOne(TimeSpan.FromMinutes(2), _cancellationTokenSource.Token), "The cancellation token was cancelled block thread."); - Assert.IsTrue(iexDataQueueHandler.IsConnected); + Assert.IsTrue(iexDataProvider.IsConnected); } [Test] @@ -299,12 +303,12 @@ public void SubscribeOnALotOfSymbolsThrowArgumentExceptionExceedsAllowedLimit() { foreach (var config in GetSubscriptionDataConfigs(ticker, Resolution.Second)) { - iexDataQueueHandler.Subscribe(config, (s, e) => { }); + iexDataProvider.Subscribe(config, (s, e) => { }); } } cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(20)); }); - Assert.Less(iexDataQueueHandler.maxAllowedSymbolLimit, symbolCounter); + Assert.Less(iexDataProvider.maxAllowedSymbolLimit, symbolCounter); } [Test] @@ -315,7 +319,7 @@ public void NotSubscribeOnUniverseSymbol() foreach (var config in GetSubscriptionDataConfigs(universeSymbol, Resolution.Second)) { - Assert.IsNull(iexDataQueueHandler.Subscribe(config, (s, e) => { })); + Assert.IsNull(iexDataProvider.Subscribe(config, (s, e) => { })); } } @@ -326,7 +330,7 @@ public void NotSubscribeOnCanonicalSymbol() foreach (var config in GetSubscriptionDataConfigs(spy, Resolution.Second)) { - Assert.IsNull(iexDataQueueHandler.Subscribe(config, (s, e) => { })); + Assert.IsNull(iexDataProvider.Subscribe(config, (s, e) => { })); } } @@ -337,20 +341,21 @@ public void SubscribeWithWrongApiKeyThrowException() var isSubscribeThrowException = false; - var iexDataQueueHandler = new IEXDataQueueHandler(); + var iexDataProvider = new IEXDataProvider(); foreach (var config in GetSubscriptionDataConfigs(Symbols.SPY, Resolution.Second)) { ProcessFeed( - iexDataQueueHandler.Subscribe(config, (s, e) => { }), + iexDataProvider.Subscribe(config, (s, e) => { }), tick => { - if (tick != null) + if (tick != null) { - Log.Debug($"{nameof(IEXDataQueueHandlerTests)}: tick: {tick}"); + Log.Debug($"{nameof(IEXDataProviderTests)}: tick: {tick}"); } - }, - () => { + }, + () => + { isSubscribeThrowException = true; _cancellationTokenSource.Cancel(); }); @@ -360,6 +365,60 @@ public void SubscribeWithWrongApiKeyThrowException() Assert.IsTrue(isSubscribeThrowException); } + [Test] + public void CanInitializeUsingJobPacket() + { + Config.Set("iex-cloud-api-key", ""); + + var job = new LiveNodePacket + { + BrokerageData = new Dictionary() { + { "iex-cloud-api-key", "InvalidApiKeyThatWontBeUsed" }, + { "iex-price-plan", "Launch" } + } + }; + + using var iexDataProvider = new IEXDataProvider(); + + Assert.Zero(iexDataProvider.maxAllowedSymbolLimit); + + iexDataProvider.SetJob(job); + + Assert.Greater(iexDataProvider.maxAllowedSymbolLimit, 0); + + Config.Set("iex-cloud-api-key", _apiKey); + } + + [Test] + public void JobPacketWontOverrideCredentials() + { + Config.Set("iex-cloud-api-key", "wrong_key"); + Config.Set("iex-cloud-api-key", "Launch"); + + var job = new LiveNodePacket + { + BrokerageData = new Dictionary() { + { "iex-cloud-api-key", "InvalidApiKeyThatWontBeUsed" }, + { "iex-price-plan", "Enterprise" } + } + }; + + using var iexDataProvider = new IEXDataProvider(); + + var maxSymbolLimitBeforeSetJob = iexDataProvider.maxAllowedSymbolLimit; + + // it has initialized already + Assert.Greater(maxSymbolLimitBeforeSetJob, 0); + + iexDataProvider.SetJob(job); + + // we use Enterprise plan in job variable => we must have unlimited maxAllowedSymbolLimit, but our config keep Launch + Assert.That(maxSymbolLimitBeforeSetJob, Is.EqualTo(iexDataProvider.maxAllowedSymbolLimit)); + + Config.Set("iex-cloud-api-key", _apiKey); + Config.Set("iex-cloud-api-key", _pricePlan); + } + private void ProcessFeed(IEnumerator enumerator, Action callback = null, Action throwExceptionCallback = null) { Task.Factory.StartNew(() => @@ -385,7 +444,7 @@ private void ProcessFeed(IEnumerator enumerator, Action call }).ContinueWith(task => { if (throwExceptionCallback != null) - { + { throwExceptionCallback(); } Log.Error("The throwExceptionCallback is null."); diff --git a/QuantConnect.IEX.Tests/QuantConnect.IEX.Tests.csproj b/QuantConnect.IEX.Tests/QuantConnect.DataSource.IEX.Tests.csproj similarity index 77% rename from QuantConnect.IEX.Tests/QuantConnect.IEX.Tests.csproj rename to QuantConnect.IEX.Tests/QuantConnect.DataSource.IEX.Tests.csproj index c19ee1e..8de6415 100644 --- a/QuantConnect.IEX.Tests/QuantConnect.IEX.Tests.csproj +++ b/QuantConnect.IEX.Tests/QuantConnect.DataSource.IEX.Tests.csproj @@ -7,10 +7,10 @@ false UnitTest bin\$(Configuration)\ - QuantConnect.IEX.Tests - QuantConnect.IEX.Tests - QuantConnect.IEX.Tests - QuantConnect.IEX.Tests + QuantConnect.Lean.DataSource.IEX.Tests + QuantConnect.Lean.DataSource.IEX.Tests + QuantConnect.Lean.DataSource.IEX.Tests + QuantConnect.Lean.DataSource.IEX.Tests false @@ -26,7 +26,7 @@ - + diff --git a/QuantConnect.IEX.Tests/TestSetup.cs b/QuantConnect.IEX.Tests/TestSetup.cs index ffed87f..61e423f 100644 --- a/QuantConnect.IEX.Tests/TestSetup.cs +++ b/QuantConnect.IEX.Tests/TestSetup.cs @@ -20,7 +20,7 @@ using QuantConnect.Logging; using QuantConnect.Configuration; -namespace QuantConnect.IEX.Tests +namespace QuantConnect.Lean.DataSource.IEX.Tests { [SetUpFixture] public class TestSetup diff --git a/QuantConnect.IEX/Constants/IEXDataStreamChannels.cs b/QuantConnect.IEX/Constants/IEXDataStreamChannels.cs index c485444..2d34fd3 100644 --- a/QuantConnect.IEX/Constants/IEXDataStreamChannels.cs +++ b/QuantConnect.IEX/Constants/IEXDataStreamChannels.cs @@ -14,7 +14,7 @@ * */ -namespace QuantConnect.IEX.Constants +namespace QuantConnect.Lean.DataSource.IEX.Constants { /// /// Represents constants for accessing IEX Cloud data streams channels, specifically related to last sale and top-of-book information. diff --git a/QuantConnect.IEX/EnumeratorWrapper.cs b/QuantConnect.IEX/EnumeratorWrapper.cs index 7a6fb92..9cebb8b 100644 --- a/QuantConnect.IEX/EnumeratorWrapper.cs +++ b/QuantConnect.IEX/EnumeratorWrapper.cs @@ -16,7 +16,7 @@ using QuantConnect.Data; using System.Collections; -namespace QuantConnect.IEX +namespace QuantConnect.Lean.DataSource.IEX { /// /// The new enumerator wrapper for this subscription request diff --git a/QuantConnect.IEX/Enums/IEXPricePlan.cs b/QuantConnect.IEX/Enums/IEXPricePlan.cs index ca9d79e..1f05421 100644 --- a/QuantConnect.IEX/Enums/IEXPricePlan.cs +++ b/QuantConnect.IEX/Enums/IEXPricePlan.cs @@ -13,7 +13,7 @@ * limitations under the License. */ -namespace QuantConnect.IEX.Enums +namespace QuantConnect.Lean.DataSource.IEX.Enums { /// /// Enum representing different IEX cloud Price Plans. diff --git a/QuantConnect.IEX/IEXDataDownloader.cs b/QuantConnect.IEX/IEXDataDownloader.cs index 4b5c975..fa789d6 100644 --- a/QuantConnect.IEX/IEXDataDownloader.cs +++ b/QuantConnect.IEX/IEXDataDownloader.cs @@ -18,17 +18,17 @@ using QuantConnect.Securities; using QuantConnect.Data.Market; -namespace QuantConnect.IEX +namespace QuantConnect.Lean.DataSource.IEX { public class IEXDataDownloader : IDataDownloader, IDisposable { - private readonly IEXDataQueueHandler _handler; + private readonly IEXDataProvider _handler; private readonly MarketHoursDatabase _marketHoursDatabase; public IEXDataDownloader() { - _handler = new IEXDataQueueHandler(); + _handler = new IEXDataProvider(); _marketHoursDatabase = MarketHoursDatabase.FromDataFolder(); } diff --git a/QuantConnect.IEX/IEXDataQueueHandler.cs b/QuantConnect.IEX/IEXDataProvider.cs similarity index 87% rename from QuantConnect.IEX/IEXDataQueueHandler.cs rename to QuantConnect.IEX/IEXDataProvider.cs index 52976ac..f5a8a99 100644 --- a/QuantConnect.IEX/IEXDataQueueHandler.cs +++ b/QuantConnect.IEX/IEXDataProvider.cs @@ -25,27 +25,27 @@ using QuantConnect.Logging; using Newtonsoft.Json.Linq; using System.Globalization; -using QuantConnect.IEX.Enums; using QuantConnect.Interfaces; using QuantConnect.Securities; using QuantConnect.Data.Market; -using QuantConnect.IEX.Response; using QuantConnect.Configuration; -using QuantConnect.IEX.Constants; using System.Security.Cryptography; using System.Net.NetworkInformation; using System.Collections.Concurrent; using System.Runtime.CompilerServices; using QuantConnect.Lean.Engine.DataFeeds; +using QuantConnect.Lean.DataSource.IEX.Enums; using QuantConnect.Lean.Engine.HistoricalData; +using QuantConnect.Lean.DataSource.IEX.Response; +using QuantConnect.Lean.DataSource.IEX.Constants; -namespace QuantConnect.IEX +namespace QuantConnect.Lean.DataSource.IEX { /// /// IEX live data handler. /// See more at https://iexcloud.io/docs/api/ /// - public class IEXDataQueueHandler : SynchronizingHistoryProvider, IDataQueueHandler + public class IEXDataProvider : SynchronizingHistoryProvider, IDataQueueHandler { /// /// The base URL for the deprecated IEX Cloud API, used to retrieve adjusted and unadjusted historical data. @@ -67,7 +67,7 @@ public class IEXDataQueueHandler : SynchronizingHistoryProvider, IDataQueueHandl /// /// Represents an API key that is read-only once assigned. /// - private readonly string _apiKey = Config.Get("iex-cloud-api-key"); + private string? _apiKey; /// /// Object used as a synchronization lock for thread-safe operations. @@ -103,12 +103,12 @@ public class IEXDataQueueHandler : SynchronizingHistoryProvider, IDataQueueHandl /// /// Handle subscription/unSubscription processes /// - private readonly EventBasedDataQueueHandlerSubscriptionManager _subscriptionManager; + private EventBasedDataQueueHandlerSubscriptionManager? _subscriptionManager; /// /// Represents a RateGate instance used to control the rate of certain operations. /// - private readonly RateGate _rateGate; + private RateGate? _rateGate; private readonly CancellationTokenSource _cancellationTokenSource = new(); @@ -136,12 +136,12 @@ public class IEXDataQueueHandler : SynchronizingHistoryProvider, IDataQueueHandl /// Returns whether the data provider is connected /// True if the data provider is connected /// - public bool IsConnected => _clients.All(client => client.IsConnected); + public bool IsConnected => _clients.Count != 0 && _clients.All(client => client.IsConnected); /// /// Represents the maximum allowed symbol limit for a subscription. /// - public readonly int maxAllowedSymbolLimit; + public int maxAllowedSymbolLimit; /// /// Gets a value indicating whether an error has occurred on the client side. @@ -152,48 +152,28 @@ public class IEXDataQueueHandler : SynchronizingHistoryProvider, IDataQueueHandl private (bool, string) IsErrorClientHappen { get; set; } /// - /// Initializes a new instance of the class. + /// Flag indicates that instance was initialized through /// - public IEXDataQueueHandler() + private bool _initialized; + + /// + /// Initializes a new instance of the class. + /// + public IEXDataProvider() { - if (string.IsNullOrWhiteSpace(_apiKey)) + if (!Config.TryGetValue("iex-cloud-api-key", out var configApiKey) || string.IsNullOrEmpty(configApiKey)) { - throw new ArgumentException("Invalid or missing IEX API key. Please ensure that the API key is set and not empty."); + // If the API key is not provided, we can't do anything. + // The handler might going to be initialized using a node packet job. + return; } - if (Config.TryGetValue("iex-price-plan", "Grow", out var plan) && string.IsNullOrEmpty(plan)) + if (!Config.TryGetValue("iex-price-plan", out var plan) || string.IsNullOrEmpty(plan)) { plan = "Grow"; } - var (requestPerSecond, maximumClients) = RateLimits[Enum.Parse(plan, true)]; - _rateGate = new RateGate(requestPerSecond, Time.OneSecond); - - _subscriptionManager = new EventBasedDataQueueHandlerSubscriptionManager(); - - _subscriptionManager.SubscribeImpl += Subscribe; - _subscriptionManager.UnsubscribeImpl += Unsubscribe; - - ValidateSubscription(); - - // Set the sse-clients collection - foreach (var channelName in IEXDataStreamChannels.Subscriptions) - { - _clients.Add(new IEXEventSourceCollection( - (o, message) => ProcessJsonObject(message.Item1.Message.Data, message.Item2), - _apiKey, - channelName, - _rateGate, - (_, errorMessage) => IsErrorClientHappen = (true, errorMessage))); - } - - // Calculate the maximum allowed symbol limit based on the IEX price plan's client capacity. - // We subscribe to both quote and trade channels, so we divide by the number of subscriptions, - // then multiply by the maximum available symbols per connection to get the limit. - maxAllowedSymbolLimit = maximumClients / IEXDataStreamChannels.Subscriptions.Length * IEXDataStreamChannels.MaximumSymbolsPerConnectionLimit; - - // Initiates the stream listener task. - StreamAction(); + Initialize(configApiKey, plan); } /// @@ -229,7 +209,7 @@ private void StreamAction() client.UpdateSubscription(subscribeSymbols); } } - Log.Debug($"{nameof(IEXDataQueueHandler)}.{nameof(StreamAction)}: End"); + Log.Debug($"{nameof(IEXDataProvider)}.{nameof(StreamAction)}: End"); }, _cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); } @@ -245,7 +225,7 @@ private bool Subscribe(IEnumerable symbols, TickType _) if (_symbols.Count > maxAllowedSymbolLimit) { - throw new ArgumentException($"{nameof(IEXDataQueueHandler)}.{nameof(Subscribe)}: Symbol quantity exceeds allowed limit. Adjust amount or upgrade your pricing plan."); + throw new ArgumentException($"{nameof(IEXDataProvider)}.{nameof(Subscribe)}: Symbol quantity exceeds allowed limit. Adjust amount or upgrade your pricing plan."); } Refresh(); @@ -271,7 +251,7 @@ private void Refresh() [MethodImpl(MethodImplOptions.AggressiveInlining)] private void ProcessJsonObject(string json, string channelName) { - // Log.Debug($"{nameof(IEXDataQueueHandler)}.{nameof(ProcessJsonObject)}: Channel:{channelName}, Data:{json}"); + // Log.Debug($"{nameof(IEXDataProvider)}.{nameof(ProcessJsonObject)}: Channel:{channelName}, Data:{json}"); if (json == "[]") { @@ -366,7 +346,7 @@ public IEnumerator Subscribe(SubscriptionDataConfig dataConfig, EventH { if (!_extendedMarketHoursWarningFired) { - Log.Error($"{nameof(IEXDataQueueHandler)}.{nameof(Subscribe)}: Algorithm Subscription Error - Extended market hours not supported." + + Log.Error($"{nameof(IEXDataProvider)}.{nameof(Subscribe)}: Algorithm Subscription Error - Extended market hours not supported." + $"Subscribe during regular hours for optimal performance."); _extendedMarketHoursWarningFired = true; } @@ -377,7 +357,7 @@ public IEnumerator Subscribe(SubscriptionDataConfig dataConfig, EventH } var enumerator = _aggregator.Add(dataConfig, newDataAvailableHandler); - _subscriptionManager.Subscribe(dataConfig); + _subscriptionManager?.Subscribe(dataConfig); return new EnumeratorWrapper(enumerator, () => IsErrorClientHappen); } @@ -388,6 +368,67 @@ public IEnumerator Subscribe(SubscriptionDataConfig dataConfig, EventH /// Job we're subscribing for public void SetJob(LiveNodePacket job) { + if (_initialized) + { + return; + } + + if (!job.BrokerageData.TryGetValue("iex-cloud-api-key", out var apiKey) || string.IsNullOrEmpty(apiKey)) + { + throw new ArgumentException("Invalid or missing IEX API key. Please ensure that the API key is set and not empty."); + } + + if (!job.BrokerageData.TryGetValue("iex-price-plan", out var plan) || string.IsNullOrEmpty(plan)) + { + plan = "Grow"; + } + + Initialize(apiKey, plan); + } + + /// + /// Initializes the data queue handler and validates the product subscription + /// + /// iex api key + /// iex price plan + private void Initialize(string apiKey, string pricePlan) + { + ValidateSubscription(); + + _apiKey = apiKey; + + if (!Enum.TryParse(pricePlan, out var parsedPricePlan) || !Enum.IsDefined(typeof(IEXPricePlan), parsedPricePlan)) + { + throw new ArgumentException($"An error occurred while parsing the price plan '{pricePlan}'. Please ensure that the provided price plan is valid and supported by the system."); + } + + var (requestPerSecond, maximumClients) = RateLimits[parsedPricePlan]; + _rateGate = new RateGate(requestPerSecond, Time.OneSecond); + + _subscriptionManager = new EventBasedDataQueueHandlerSubscriptionManager(); + + _subscriptionManager.SubscribeImpl += Subscribe; + _subscriptionManager.UnsubscribeImpl += Unsubscribe; + + // Set the sse-clients collection + foreach (var channelName in IEXDataStreamChannels.Subscriptions) + { + _clients.Add(new IEXEventSourceCollection( + (o, message) => ProcessJsonObject(message.Item1.Message.Data, message.Item2), + _apiKey, + channelName, + _rateGate, + (_, errorMessage) => IsErrorClientHappen = (true, errorMessage))); + } + + // Calculate the maximum allowed symbol limit based on the IEX price plan's client capacity. + // We subscribe to both quote and trade channels, so we divide by the number of subscriptions, + // then multiply by the maximum available symbols per connection to get the limit. + maxAllowedSymbolLimit = maximumClients / IEXDataStreamChannels.Subscriptions.Length * IEXDataStreamChannels.MaximumSymbolsPerConnectionLimit; + + // Initiates the stream listener task. + StreamAction(); + _initialized = true; } /// @@ -409,7 +450,7 @@ private static bool CanSubscribe(Symbol symbol) /// Subscription config to be removed public void Unsubscribe(SubscriptionDataConfig dataConfig) { - _subscriptionManager.Unsubscribe(dataConfig); + _subscriptionManager?.Unsubscribe(dataConfig); _aggregator.Remove(dataConfig); } @@ -426,7 +467,7 @@ public void Dispose() client.Dispose(); } - Log.Trace("IEXDataQueueHandler.Dispose(): Disconnected from IEX data provider"); + Log.Trace("IEXDataProvider.Dispose(): Disconnected from IEX data provider"); } #region IHistoryProvider implementation @@ -461,7 +502,7 @@ public override IEnumerable GetHistory(IEnumerable r { if (!_invalidHistoryDataTypeWarningFired) { - Log.Error($"{nameof(IEXDataQueueHandler)}.{nameof(GetHistory)}: Not supported data type - {request.DataType.Name}. " + + Log.Error($"{nameof(IEXDataProvider)}.{nameof(GetHistory)}: Not supported data type - {request.DataType.Name}. " + "Currently available support only for historical of type - TradeBar"); _invalidHistoryDataTypeWarningFired = true; } @@ -470,13 +511,13 @@ public override IEnumerable GetHistory(IEnumerable r if (request.Symbol.SecurityType != SecurityType.Equity) { - Log.Trace($"{nameof(IEXDataQueueHandler)}.{nameof(GetHistory)}: Unsupported SecurityType '{request.Symbol.SecurityType}' for symbol '{request.Symbol}'"); + Log.Trace($"{nameof(IEXDataProvider)}.{nameof(GetHistory)}: Unsupported SecurityType '{request.Symbol.SecurityType}' for symbol '{request.Symbol}'"); return Enumerable.Empty(); } if (request.StartTimeUtc >= request.EndTimeUtc) { - Log.Error($"{nameof(IEXDataQueueHandler)}.{nameof(GetHistory)}: Error - The start date in the history request must come before the end date. No historical data will be returned."); + Log.Error($"{nameof(IEXDataProvider)}.{nameof(GetHistory)}: Error - The start date in the history request must come before the end date. No historical data will be returned."); return Enumerable.Empty(); } @@ -499,11 +540,11 @@ private IEnumerable ProcessHistoryRequests(Data.HistoryRequest request if (request.Resolution != Resolution.Daily && request.Resolution != Resolution.Minute) { - Log.Error("IEXDataQueueHandler.GetHistory(): History calls for IEX only support daily & minute resolution."); + Log.Error("IEXDataProvider.GetHistory(): History calls for IEX only support daily & minute resolution."); yield break; } - Log.Trace($"{nameof(IEXDataQueueHandler)}.{nameof(ProcessHistoryRequests)}: {request.Symbol.SecurityType}.{ticker}, Resolution: {request.Resolution}, DateTime: [{start} - {end}]."); + Log.Trace($"{nameof(IEXDataProvider)}.{nameof(ProcessHistoryRequests)}: {request.Symbol.SecurityType}.{ticker}, Resolution: {request.Resolution}, DateTime: [{start} - {end}]."); var span = end - start; var urls = new List(); @@ -639,13 +680,13 @@ private string ExecuteGetRequest(string url) { lock (_lock) { - _rateGate.WaitToProceed(); + _rateGate!.WaitToProceed(); var response = _restClient.Execute(new RestRequest(url, Method.GET)); if (response.StatusCode != HttpStatusCode.OK) { - throw new Exception($"{nameof(IEXDataQueueHandler)}.{nameof(ProcessHistoryRequests)} failed: [{(int)response.StatusCode}] {response.StatusDescription}, Content: {response.Content}, ErrorMessage: {response.ErrorMessage}"); + throw new Exception($"{nameof(IEXDataProvider)}.{nameof(ProcessHistoryRequests)} failed: [{(int)response.StatusCode}] {response.StatusDescription}, Content: {response.Content}, ErrorMessage: {response.ErrorMessage}"); } return response.Content; @@ -700,9 +741,9 @@ private static void ValidateSubscription() try { const int productId = 333; - var userId = Config.GetInt("job-user-id"); - var token = Config.Get("api-access-token"); - var organizationId = Config.Get("job-organization-id", null); + var userId = Globals.UserId; + var token = Globals.UserToken; + var organizationId = Globals.OrganizationID; // Verify we can authenticate with this user and token var api = new ApiConnection(userId, token); if (!api.Connected) @@ -817,7 +858,7 @@ private static void ValidateSubscription() } catch (Exception e) { - Log.Error($"{nameof(IEXDataQueueHandler)}.{nameof(ValidateSubscription)}: Failed during validation, shutting down. Error : {e.Message}"); + Log.Error($"{nameof(IEXDataProvider)}.{nameof(ValidateSubscription)}: Failed during validation, shutting down. Error : {e.Message}"); throw; } } diff --git a/QuantConnect.IEX/IEXEventSourceCollection.cs b/QuantConnect.IEX/IEXEventSourceCollection.cs index 0de9d47..d4e6a0b 100644 --- a/QuantConnect.IEX/IEXEventSourceCollection.cs +++ b/QuantConnect.IEX/IEXEventSourceCollection.cs @@ -16,10 +16,10 @@ using QuantConnect.Util; using QuantConnect.Logging; using LaunchDarkly.EventSource; -using QuantConnect.IEX.Constants; using System.Collections.Concurrent; +using QuantConnect.Lean.DataSource.IEX.Constants; -namespace QuantConnect.IEX +namespace QuantConnect.Lean.DataSource.IEX { /// /// Class wraps a collection of clients for getting data on SSE. diff --git a/QuantConnect.IEX/QuantConnect.IEX.csproj b/QuantConnect.IEX/QuantConnect.DataSource.IEX.csproj similarity index 84% rename from QuantConnect.IEX/QuantConnect.IEX.csproj rename to QuantConnect.IEX/QuantConnect.DataSource.IEX.csproj index 63f96b2..b6ac760 100644 --- a/QuantConnect.IEX/QuantConnect.IEX.csproj +++ b/QuantConnect.IEX/QuantConnect.DataSource.IEX.csproj @@ -4,10 +4,10 @@ Release AnyCPU net6.0 - QuantConnect.IEX - QuantConnect.IEX - QuantConnect.IEX - QuantConnect.IEX + QuantConnect.Lean.DataSource.IEX + QuantConnect.Lean.DataSource.IEX + QuantConnect.Lean.DataSource.IEX + QuantConnect.Lean.DataSource.IEX Library bin\$(Configuration) false diff --git a/QuantConnect.IEX/Response/DataStreamLastSale.cs b/QuantConnect.IEX/Response/DataStreamLastSale.cs index 7ac822c..6cc2cde 100644 --- a/QuantConnect.IEX/Response/DataStreamLastSale.cs +++ b/QuantConnect.IEX/Response/DataStreamLastSale.cs @@ -16,7 +16,7 @@ using Newtonsoft.Json; -namespace QuantConnect.IEX.Response +namespace QuantConnect.Lean.DataSource.IEX.Response { /// /// Last provides trade data for executions on IEX. It is a near real time, intraday API that provides IEX last sale price, size and time. Last is ideal for developers that need a lightweight stock quote. diff --git a/QuantConnect.IEX/Response/DataStreamTopOfBook.cs b/QuantConnect.IEX/Response/DataStreamTopOfBook.cs index fc8ee4f..d99271f 100644 --- a/QuantConnect.IEX/Response/DataStreamTopOfBook.cs +++ b/QuantConnect.IEX/Response/DataStreamTopOfBook.cs @@ -16,7 +16,7 @@ using Newtonsoft.Json; -namespace QuantConnect.IEX.Response +namespace QuantConnect.Lean.DataSource.IEX.Response { /// /// TOPS provides aggregated best-quoted bid and offer position in near real time for all securities on displayed limit order book from IEX. TOPS is ideal for developers needing both quote and trade data.