Skip to content

Commit

Permalink
Merge pull request #312 from bug-or-feature/streamer
Browse files Browse the repository at this point in the history
streamer
  • Loading branch information
bug-or-feature authored Nov 30, 2023
2 parents 2a8a0d2 + 96fdcee commit 83fd18c
Show file tree
Hide file tree
Showing 8 changed files with 351 additions and 0 deletions.
9 changes: 9 additions & 0 deletions docs/source/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,12 @@ Assuming config as above
>>> sub.addlistener(on_update)
>>> ig_stream_service.ls_client.subscribe(sub)
>>> ig_stream_service.disconnect()


Using the Streamer classes to connect with the Streaming API
------------------------------------------------------------

See the streamer samples:

* `sample/sample_ticker.py <https://github.com/ig-python/trading-ig/blob/master/sample/sample_ticker.py>`_
* `sample/sample_ticker_rich.py <https://github.com/ig-python/trading-ig/blob/master/sample/sample_ticker_rich.py>`_
38 changes: 38 additions & 0 deletions sample/sample_ticker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import logging
import time
from trading_ig import IGService, IGStreamService
from trading_ig.config import config
from trading_ig.streamer.manager import StreamingManager
from sample.sample_utils import crypto_epics # fx_epics, index_epics, weekend_epics


def main():
logging.basicConfig(level=logging.INFO)

ig_service = IGService(
config.username,
config.password,
config.api_key,
config.acc_type,
acc_number=config.acc_number,
)

ig = IGStreamService(ig_service)
ig.create_session(version="3")
sm = StreamingManager(ig)

tickers = []
for epic in crypto_epics: # fx_epics, index_epics, crypto_epics
sm.start_tick_subscription(epic)
tickers.append(sm.ticker(epic))

for idx in range(0, 10):
for ticker in tickers:
print(ticker)
time.sleep(0.5)

sm.stop_subscriptions()


if __name__ == "__main__":
main()
78 changes: 78 additions & 0 deletions sample/sample_ticker_rich.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import logging
import time
from trading_ig import IGService, IGStreamService
from trading_ig.config import config
from trading_ig.streamer.manager import StreamingManager
from sample.sample_utils import crypto_epics # fx_epics, index_epics, weekend_epics

try:
from rich.table import Table
from rich.live import Live
except ImportError:
print("Rich must be installed for this sample")


def main():
logging.basicConfig(level=logging.WARNING)

ig_service = IGService(
config.username,
config.password,
config.api_key,
config.acc_type,
acc_number=config.acc_number,
)

ig = IGStreamService(ig_service)
ig.create_session(version="3")
sm = StreamingManager(ig)

tickers = []
for epic in crypto_epics: # fx_epics, index_epics, crypto_epics
sm.start_tick_subscription(epic)
tickers.append(sm.ticker(epic))

def generate_table(tickers) -> Table:
table = Table(title="EPIC Prices")
table.add_column("EPIC")
table.add_column("Bid", justify="right")
table.add_column("Offer", justify="right")
table.add_column("Last traded price", justify="right")
table.add_column("Last traded volume", justify="right")
table.add_column("Incremental volume", justify="right")
table.add_column("Mid open", justify="right")
table.add_column("Change since open", justify="right")
table.add_column("Daily % change", justify="right")
table.add_column("Day high", justify="right")
table.add_column("Day low", justify="right")
table.add_column("Timestamp")

for ticker in tickers:
table.add_row(
f"{ticker.epic}",
f"{ticker.bid:.2f}",
f"{ticker.offer:.2f}",
f"{ticker.last_traded_price:.2f}",
f"{ticker.last_traded_volume}",
f"{ticker.incr_volume}",
f"{ticker.day_open_mid:.2f}",
f"{ticker.day_net_change_mid:.2f}",
f"{ticker.day_percent_change_mid:.2f}",
f"{ticker.day_high:.2f}",
f"{ticker.day_low:.2f}",
f"{ticker.timestamp.strftime('%Y-%m-%d %H:%M:%S')}",
)

return table

with Live(generate_table(tickers), refresh_per_second=4) as live:
for _ in range(100):
time.sleep(0.4)
live.update(generate_table(tickers))

for epic in crypto_epics:
sm.stop_tick_subscription(epic)


if __name__ == "__main__":
main()
3 changes: 3 additions & 0 deletions trading_ig/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ def create_session(self, encryption=False, version="2"):
def subscribe(self, subscription: Subscription):
self.ls_client.subscribe(subscription)

def unsubscribe(self, subscription: Subscription):
self.ls_client.unsubscribe(subscription)

def unsubscribe_all(self):
# To avoid a RuntimeError: dictionary changed size during iteration
subscriptions = self.ls_client.getSubscriptions().copy()
Expand Down
Empty file added trading_ig/streamer/__init__.py
Empty file.
120 changes: 120 additions & 0 deletions trading_ig/streamer/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import logging
from queue import Queue
from threading import Thread
import time

from lightstreamer.client import SubscriptionListener, ItemUpdate

from trading_ig import IGStreamService
from .ticker import Ticker
from .ticker import TickerSubscription

logger = logging.getLogger(__name__)


class StreamingManager:
def __init__(self, service: IGStreamService):
self._service = service
self._subs = {}

# setup data objects
self._tickers = {}

# set up consumer queue
self._queue = Queue()
self._consumer_thread = Consumer(self._queue, self)
self._consumer_thread.start()

@property
def service(self):
return self._service

@property
def tickers(self):
return self._tickers

def start_tick_subscription(self, epic) -> TickerSubscription:
tick_sub = TickerSubscription(epic)
tick_sub.addListener(TickerListener(self._queue))
self.service.subscribe(tick_sub)
self._subs[epic] = tick_sub
return tick_sub

def stop_tick_subscription(self, epic):
subscription = self._subs.pop(epic)
self.service.unsubscribe(subscription)

def ticker(self, epic):
# we won't have a ticker until at least one update is received from server,
# let's give it a few seconds
timeout = time.time() + 3
while True:
logger.debug("Waiting for ticker...")
if epic in self._tickers or time.time() > timeout:
break
time.sleep(0.25)
ticker = self._tickers[epic]
if not ticker:
raise Exception(f"No ticker found for {epic}, giving up")
return ticker

def on_update(self, update):
self._queue.put(update)

def stop_subscriptions(self):
logger.info("Unsubscribing from all")
self.service.unsubscribe_all()
self.service.disconnect()
if self._consumer_thread:
self._consumer_thread.join(timeout=5)
self._consumer_thread = None


class TickerListener(SubscriptionListener):
def __init__(self, queue: Queue) -> None:
self._queue = queue

def onItemUpdate(self, update: ItemUpdate):
self._queue.put(update)

def onSubscription(self):
logger.info("TickerListener onSubscription()")

def onSubscriptionError(self, code, message):
logger.info(f"TickerListener onSubscriptionError(): '{code}' {message}")

def onUnsubscription(self):
logger.info("TickerListener onUnsubscription()")


class Consumer(Thread):
def __init__(self, queue: Queue, manager: StreamingManager):
super().__init__(name="ConsumerThread", daemon=True)
self._queue = queue
self._manager = manager

@property
def manager(self):
return self._manager

def run(self):
logger.info("Consumer: Running")
while True:
item = self._queue.get()

# deal with each different type of update
name = item.getItemName()
if name.startswith("CHART:"):
self._handle_ticker_update(item)

logger.debug(f"Consumer thread alive. queue length: {self._queue.qsize()}")

def _handle_ticker_update(self, item: ItemUpdate):
epic = Ticker.identifier(item.getItemName())

if epic not in self.manager.tickers:
ticker = Ticker(epic)
self.manager.tickers[epic] = ticker

ticker = self.manager.tickers[epic]
ticker.populate(item.getChangedFields())
24 changes: 24 additions & 0 deletions trading_ig/streamer/objects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from datetime import datetime


nan = float("nan")


class StreamObject:
def set_by_name(self, attr_name, values, key, type):
try:
if key in values:
setattr(self, attr_name, type(values[key]))
except TypeError:
# ignore, there will be plenty of dud values
pass

def set_timestamp_by_name(self, attr_name, values, key):
try:
if key in values:
setattr(
self, attr_name, datetime.fromtimestamp(int(values[key]) / 1000)
)
except TypeError:
# ignore, there will be plenty of dud values
pass
79 changes: 79 additions & 0 deletions trading_ig/streamer/ticker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from dataclasses import dataclass
from datetime import datetime
from lightstreamer.client import Subscription
from .objects import nan, StreamObject


class TickerSubscription(Subscription):
"""Represents a subscription for tick prices"""

TICKER_FIELDS = [
"BID",
"OFR",
"LTP",
"LTV",
"TTV",
"UTM",
"DAY_OPEN_MID",
"DAY_NET_CHG_MID",
"DAY_PERC_CHG_MID",
"DAY_HIGH",
"DAY_LOW",
]

def __init__(self, epic: str):
super().__init__(
mode="DISTINCT",
items=[f"CHART:{epic}:TICK"],
fields=self.TICKER_FIELDS,
)

def __repr__(self) -> str:
return f"TickSubscription with {len(self.item_names)} epics"


@dataclass
class Ticker(StreamObject):
epic: str
timestamp: datetime = None
bid: float = nan
offer: float = nan
last_traded_price: float = nan
last_traded_volume: int = 0
incr_volume: int = 0
day_open_mid: float = nan
day_net_change_mid: float = nan
day_percent_change_mid: float = nan
day_high: float = nan
day_low: float = nan

def __init__(self, epic):
self.epic = epic

def __repr__(self) -> str:
return (
f"{self.epic}: {self.timestamp.strftime('%Y-%m-%d %H:%M:%S')} "
f"{self.bid} {self.offer} {self.last_traded_price} "
f"{self.last_traded_volume} {self.incr_volume} {self.day_open_mid} "
f"{self.day_net_change_mid} {self.day_percent_change_mid}% {self.day_high} "
f"{self.day_low}"
)

def populate(self, values):
# print(f"ticker populating: {values}")
self.set_timestamp_by_name("timestamp", values, "UTM")
self.set_by_name("bid", values, "BID", float)
self.set_by_name("offer", values, "OFR", float)
self.set_by_name("last_traded_price", values, "LTP", float)
self.set_by_name("last_traded_volume", values, "LTV", int)
self.set_by_name("incr_volume", values, "TTV", int)
self.set_by_name("day_open_mid", values, "DAY_OPEN_MID", float)
self.set_by_name("day_net_change_mid", values, "DAY_NET_CHG_MID", float)
self.set_by_name("day_percent_change_mid", values, "DAY_PERC_CHG_MID", float)
self.set_by_name("day_high", values, "DAY_HIGH", float)
self.set_by_name("day_low", values, "DAY_LOW", float)

@classmethod
def identifier(cls, name):
epic = name.split(":")[1]
return epic

0 comments on commit 83fd18c

Please sign in to comment.