diff --git a/docs/source/quickstart.rst b/docs/source/quickstart.rst index 12a6bf3..cb86312 100644 --- a/docs/source/quickstart.rst +++ b/docs/source/quickstart.rst @@ -141,3 +141,12 @@ Assuming config as above >>> sub.addlistener(on_update) >>> ig_stream_service.ls_client.subscribe(sub) >>> ig_stream_service.disconnect() + + +Using the Streamer classes to connect with the Streaming API +------------------------------------------------------------ + +See the streamer samples: + +* `sample/sample_ticker.py `_ +* `sample/sample_ticker_rich.py `_ diff --git a/sample/sample_ticker.py b/sample/sample_ticker.py new file mode 100644 index 0000000..99bcd6b --- /dev/null +++ b/sample/sample_ticker.py @@ -0,0 +1,38 @@ +import logging +import time +from trading_ig import IGService, IGStreamService +from trading_ig.config import config +from trading_ig.streamer.manager import StreamingManager +from sample.sample_utils import crypto_epics # fx_epics, index_epics, weekend_epics + + +def main(): + logging.basicConfig(level=logging.INFO) + + ig_service = IGService( + config.username, + config.password, + config.api_key, + config.acc_type, + acc_number=config.acc_number, + ) + + ig = IGStreamService(ig_service) + ig.create_session(version="3") + sm = StreamingManager(ig) + + tickers = [] + for epic in crypto_epics: # fx_epics, index_epics, crypto_epics + sm.start_tick_subscription(epic) + tickers.append(sm.ticker(epic)) + + for idx in range(0, 10): + for ticker in tickers: + print(ticker) + time.sleep(0.5) + + sm.stop_subscriptions() + + +if __name__ == "__main__": + main() diff --git a/sample/sample_ticker_rich.py b/sample/sample_ticker_rich.py new file mode 100644 index 0000000..90e53a1 --- /dev/null +++ b/sample/sample_ticker_rich.py @@ -0,0 +1,78 @@ +import logging +import time +from trading_ig import IGService, IGStreamService +from trading_ig.config import config +from trading_ig.streamer.manager import StreamingManager +from sample.sample_utils import crypto_epics # fx_epics, index_epics, weekend_epics + +try: + from rich.table import Table + from rich.live import Live +except ImportError: + print("Rich must be installed for this sample") + + +def main(): + logging.basicConfig(level=logging.WARNING) + + ig_service = IGService( + config.username, + config.password, + config.api_key, + config.acc_type, + acc_number=config.acc_number, + ) + + ig = IGStreamService(ig_service) + ig.create_session(version="3") + sm = StreamingManager(ig) + + tickers = [] + for epic in crypto_epics: # fx_epics, index_epics, crypto_epics + sm.start_tick_subscription(epic) + tickers.append(sm.ticker(epic)) + + def generate_table(tickers) -> Table: + table = Table(title="EPIC Prices") + table.add_column("EPIC") + table.add_column("Bid", justify="right") + table.add_column("Offer", justify="right") + table.add_column("Last traded price", justify="right") + table.add_column("Last traded volume", justify="right") + table.add_column("Incremental volume", justify="right") + table.add_column("Mid open", justify="right") + table.add_column("Change since open", justify="right") + table.add_column("Daily % change", justify="right") + table.add_column("Day high", justify="right") + table.add_column("Day low", justify="right") + table.add_column("Timestamp") + + for ticker in tickers: + table.add_row( + f"{ticker.epic}", + f"{ticker.bid:.2f}", + f"{ticker.offer:.2f}", + f"{ticker.last_traded_price:.2f}", + f"{ticker.last_traded_volume}", + f"{ticker.incr_volume}", + f"{ticker.day_open_mid:.2f}", + f"{ticker.day_net_change_mid:.2f}", + f"{ticker.day_percent_change_mid:.2f}", + f"{ticker.day_high:.2f}", + f"{ticker.day_low:.2f}", + f"{ticker.timestamp.strftime('%Y-%m-%d %H:%M:%S')}", + ) + + return table + + with Live(generate_table(tickers), refresh_per_second=4) as live: + for _ in range(100): + time.sleep(0.4) + live.update(generate_table(tickers)) + + for epic in crypto_epics: + sm.stop_tick_subscription(epic) + + +if __name__ == "__main__": + main() diff --git a/trading_ig/stream.py b/trading_ig/stream.py index 9b2e648..b730e70 100644 --- a/trading_ig/stream.py +++ b/trading_ig/stream.py @@ -47,6 +47,9 @@ def create_session(self, encryption=False, version="2"): def subscribe(self, subscription: Subscription): self.ls_client.subscribe(subscription) + def unsubscribe(self, subscription: Subscription): + self.ls_client.unsubscribe(subscription) + def unsubscribe_all(self): # To avoid a RuntimeError: dictionary changed size during iteration subscriptions = self.ls_client.getSubscriptions().copy() diff --git a/trading_ig/streamer/__init__.py b/trading_ig/streamer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/trading_ig/streamer/manager.py b/trading_ig/streamer/manager.py new file mode 100644 index 0000000..8e31554 --- /dev/null +++ b/trading_ig/streamer/manager.py @@ -0,0 +1,120 @@ +import logging +from queue import Queue +from threading import Thread +import time + +from lightstreamer.client import SubscriptionListener, ItemUpdate + +from trading_ig import IGStreamService +from .ticker import Ticker +from .ticker import TickerSubscription + +logger = logging.getLogger(__name__) + + +class StreamingManager: + def __init__(self, service: IGStreamService): + self._service = service + self._subs = {} + + # setup data objects + self._tickers = {} + + # set up consumer queue + self._queue = Queue() + self._consumer_thread = Consumer(self._queue, self) + self._consumer_thread.start() + + @property + def service(self): + return self._service + + @property + def tickers(self): + return self._tickers + + def start_tick_subscription(self, epic) -> TickerSubscription: + tick_sub = TickerSubscription(epic) + tick_sub.addListener(TickerListener(self._queue)) + self.service.subscribe(tick_sub) + self._subs[epic] = tick_sub + return tick_sub + + def stop_tick_subscription(self, epic): + subscription = self._subs.pop(epic) + self.service.unsubscribe(subscription) + + def ticker(self, epic): + # we won't have a ticker until at least one update is received from server, + # let's give it a few seconds + timeout = time.time() + 3 + while True: + logger.debug("Waiting for ticker...") + if epic in self._tickers or time.time() > timeout: + break + time.sleep(0.25) + ticker = self._tickers[epic] + if not ticker: + raise Exception(f"No ticker found for {epic}, giving up") + return ticker + + def on_update(self, update): + self._queue.put(update) + + def stop_subscriptions(self): + logger.info("Unsubscribing from all") + self.service.unsubscribe_all() + self.service.disconnect() + if self._consumer_thread: + self._consumer_thread.join(timeout=5) + self._consumer_thread = None + + +class TickerListener(SubscriptionListener): + def __init__(self, queue: Queue) -> None: + self._queue = queue + + def onItemUpdate(self, update: ItemUpdate): + self._queue.put(update) + + def onSubscription(self): + logger.info("TickerListener onSubscription()") + + def onSubscriptionError(self, code, message): + logger.info(f"TickerListener onSubscriptionError(): '{code}' {message}") + + def onUnsubscription(self): + logger.info("TickerListener onUnsubscription()") + + +class Consumer(Thread): + def __init__(self, queue: Queue, manager: StreamingManager): + super().__init__(name="ConsumerThread", daemon=True) + self._queue = queue + self._manager = manager + + @property + def manager(self): + return self._manager + + def run(self): + logger.info("Consumer: Running") + while True: + item = self._queue.get() + + # deal with each different type of update + name = item.getItemName() + if name.startswith("CHART:"): + self._handle_ticker_update(item) + + logger.debug(f"Consumer thread alive. queue length: {self._queue.qsize()}") + + def _handle_ticker_update(self, item: ItemUpdate): + epic = Ticker.identifier(item.getItemName()) + + if epic not in self.manager.tickers: + ticker = Ticker(epic) + self.manager.tickers[epic] = ticker + + ticker = self.manager.tickers[epic] + ticker.populate(item.getChangedFields()) diff --git a/trading_ig/streamer/objects.py b/trading_ig/streamer/objects.py new file mode 100644 index 0000000..a8a3101 --- /dev/null +++ b/trading_ig/streamer/objects.py @@ -0,0 +1,24 @@ +from datetime import datetime + + +nan = float("nan") + + +class StreamObject: + def set_by_name(self, attr_name, values, key, type): + try: + if key in values: + setattr(self, attr_name, type(values[key])) + except TypeError: + # ignore, there will be plenty of dud values + pass + + def set_timestamp_by_name(self, attr_name, values, key): + try: + if key in values: + setattr( + self, attr_name, datetime.fromtimestamp(int(values[key]) / 1000) + ) + except TypeError: + # ignore, there will be plenty of dud values + pass diff --git a/trading_ig/streamer/ticker.py b/trading_ig/streamer/ticker.py new file mode 100644 index 0000000..44b0ad3 --- /dev/null +++ b/trading_ig/streamer/ticker.py @@ -0,0 +1,79 @@ +from dataclasses import dataclass +from datetime import datetime +from lightstreamer.client import Subscription +from .objects import nan, StreamObject + + +class TickerSubscription(Subscription): + """Represents a subscription for tick prices""" + + TICKER_FIELDS = [ + "BID", + "OFR", + "LTP", + "LTV", + "TTV", + "UTM", + "DAY_OPEN_MID", + "DAY_NET_CHG_MID", + "DAY_PERC_CHG_MID", + "DAY_HIGH", + "DAY_LOW", + ] + + def __init__(self, epic: str): + super().__init__( + mode="DISTINCT", + items=[f"CHART:{epic}:TICK"], + fields=self.TICKER_FIELDS, + ) + + def __repr__(self) -> str: + return f"TickSubscription with {len(self.item_names)} epics" + + +@dataclass +class Ticker(StreamObject): + epic: str + timestamp: datetime = None + bid: float = nan + offer: float = nan + last_traded_price: float = nan + last_traded_volume: int = 0 + incr_volume: int = 0 + day_open_mid: float = nan + day_net_change_mid: float = nan + day_percent_change_mid: float = nan + day_high: float = nan + day_low: float = nan + + def __init__(self, epic): + self.epic = epic + + def __repr__(self) -> str: + return ( + f"{self.epic}: {self.timestamp.strftime('%Y-%m-%d %H:%M:%S')} " + f"{self.bid} {self.offer} {self.last_traded_price} " + f"{self.last_traded_volume} {self.incr_volume} {self.day_open_mid} " + f"{self.day_net_change_mid} {self.day_percent_change_mid}% {self.day_high} " + f"{self.day_low}" + ) + + def populate(self, values): + # print(f"ticker populating: {values}") + self.set_timestamp_by_name("timestamp", values, "UTM") + self.set_by_name("bid", values, "BID", float) + self.set_by_name("offer", values, "OFR", float) + self.set_by_name("last_traded_price", values, "LTP", float) + self.set_by_name("last_traded_volume", values, "LTV", int) + self.set_by_name("incr_volume", values, "TTV", int) + self.set_by_name("day_open_mid", values, "DAY_OPEN_MID", float) + self.set_by_name("day_net_change_mid", values, "DAY_NET_CHG_MID", float) + self.set_by_name("day_percent_change_mid", values, "DAY_PERC_CHG_MID", float) + self.set_by_name("day_high", values, "DAY_HIGH", float) + self.set_by_name("day_low", values, "DAY_LOW", float) + + @classmethod + def identifier(cls, name): + epic = name.split(":")[1] + return epic