Skip to content

Commit

Permalink
Create wsMongoTicker.py
Browse files Browse the repository at this point in the history
built from the conversation at #115
  • Loading branch information
s4w3d0ff authored Jun 28, 2017
1 parent d86e5c7 commit 2e4e517
Showing 1 changed file with 104 additions and 0 deletions.
104 changes: 104 additions & 0 deletions examples/ticker/wsMongoTicker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
import websocket # pip install websocket-client
from pymongo import MongoClient # pip install pymongo

from poloniex import Poloniex

from multiprocessing.dummy import Process as Thread
import json
import logging

logger = logging.getLogger(__name__)


class wsTicker(object):

def __init__(self, api=None):
self.api = api
if not self.api:
self.api = Poloniex(jsonNums=float)
self.db = MongoClient().poloniex['ticker']
self.db.drop()
self.ws = websocket.WebSocketApp("wss://api2.poloniex.com/",
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
self.ws.on_open = self.on_open

def __call__(self, market=None):
""" returns ticker from mongodb """
if market:
return self.db.find_one({'_id': market})
return list(self.db.find())

def on_message(self, ws, message):
message = json.loads(message)
if 'error' in message:
print(message['error'])
return

if message[0] == 1002:
if message[1] == 1:
print('Subscribed to ticker')
return

if message[1] == 0:
print('Unsubscribed to ticker')
return

data = message[2]

self.db.update_one(
{"id": float(data[0])},
{"$set": {'last': data[1],
'lowestAsk': data[2],
'highestBid': data[3],
'percentChange': data[4],
'baseVolume': data[5],
'quoteVolume': data[6],
'isFrozen': data[7],
'high24hr': data[8],
'low24hr': data[9]
}},
upsert=True)

def on_error(self, ws, error):
print(error)

def on_close(self, ws):
print("Websocket closed!")

def on_open(self, ws):
tick = self.api.returnTicker()
for market in tick:
self.db.update_one(
{'_id': market},
{'$set': tick[market]},
upsert=True)
print('Populated markets database with ticker data')
ws.send(json.dumps({'command': 'subscribe',
'channel': 1002}))

def start(self):
self.t = Thread(target=self.ws.run_forever)
self.t.daemon = True
self.t.start()
print('Thread started')

def stop(self):
self.ws.close()
self.t.join()
print('Thread joined')


if __name__ == "__main__":
import pprint
from time import sleep
# websocket.enableTrace(True)
ticker = wsTicker()
ticker.start()
for i in range(5):
sleep(10)
pprint.pprint(ticker('USDT_BTC'))
ticker.stop()

5 comments on commit 2e4e517

@hockeyadc
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@s4w3d0ff Awesome! So if you want to just leave this running constantly, storing tick data over and over into the DB, how would you go about that?

@s4w3d0ff
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can import or copy/paste, then in your script:

ticker = wsTicker() # create the ticker object
ticker.start() # start the ticker object thread (connects the websocket and stores data in mongodb)
#
# do whatever you want here, it will execute because the websocket is in a thread
# call `ticker()` for the entire ticker or `ticker('USDT_BTC')` for just a single market
#
ticker.stop() # closes the websocket and joins/completes the ticker thread

If you want you can pass the wsTicker class an instance of the Poloniex class to use for the initial tick data, in case you need to sync the with a coach that is used elsewhere in your script. 2e4e517?diff=unified#diff-dcd9cfa7721627fd467974df3cb7999bR17
2e4e517?diff=unified#diff-dcd9cfa7721627fd467974df3cb7999bR73 is where it is used, just when opening the websocket to fill the initial data in the database (because some coins have little/no activity at times).

@hockeyadc
Copy link

@hockeyadc hockeyadc commented on 2e4e517 Jun 29, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if I keep running this over and over again, it just replaces the data in the currency pair field or does it append to it?

Edit: I've been doing some research. It looks like the suggested schema to store this data should follow as so:
https://www.mongodb.com/blog/post/schema-design-for-time-series-data-in-mongodb

Interesting read!

@s4w3d0ff
Copy link
Owner Author

@s4w3d0ff s4w3d0ff commented on 2e4e517 Jun 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once the ticker is started it will overwrite the data in the database. Every time you call ticker() it pulls whatever data is saved from the database, which is continuously being updated with data from the websocket as it comes in (as long as the ticker is running and its websocket is connected).

@s4w3d0ff
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify:

When creating an instance of the wsTicker class (ticker = wsTicker()) it creates a connection to the mongod server. It creates/connects to the database called 'poloniex' and the self.db var is a reference/connection to the 'ticker' collection/table within the 'poloniex' database.

        self.db = MongoClient().poloniex['ticker']

It then clears/drops the 'ticker' collection (just in case)

        self.db.drop()

A websocket var is created and the on_message, on_error, on_close and on_open functions are passed to the websocket

        self.ws = websocket.WebSocketApp("wss://api2.poloniex.com/",
                                         on_message=self.on_message,
                                         on_error=self.on_error,
                                         on_close=self.on_close)
        self.ws.on_open = self.on_open

Then when ticker.start() is called the websocket is connected within a thread. Once the websocket has connected it will first fill the 'ticker' collection with fresh ticker data using the returnTicker command (using the market pair name for each document _id).

    def on_open(self, ws):
        tick = self.api.returnTicker()
        for market in tick:
            self.db.update_one(
                {'_id': market},
                {'$set': tick[market]},
                upsert=True)

The websocket then subscribes to the poloniex ticker channel:

        ws.send(json.dumps({'command': 'subscribe',  'channel': 1002}))

When a market 'tick' gets sent from poloniex, the ticker object will overwrite the data in the database.

            self.db.update_one(
                {"id": float(data[0])},
                {"$set": {'last': data[1],
                          'lowestAsk': data[2],
                          'highestBid': data[3],
                          'percentChange': data[4],
                          'baseVolume': data[5],
                          'quoteVolume': data[6],
                          'isFrozen': data[7],
                          'high24hr': data[8],
                          'low24hr': data[9]
                          }},
                upsert=True)

Please sign in to comment.