-
Notifications
You must be signed in to change notification settings - Fork 19
/
spot_trading_bot_template_v2.py
250 lines (210 loc) · 8.29 KB
/
spot_trading_bot_template_v2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2023 Benjamin Thomas Schwertfeger
# GitHub: https://github.com/btschwertfeger
# ruff: noqa: RUF027
"""
Module that provides a template to build a Spot trading algorithm using the
python-kraken-sdk and Kraken Spot websocket API v2.
"""
from __future__ import annotations
import asyncio
import logging
import logging.config
import os
import sys
import traceback
from typing import Any, Optional
import requests
import urllib3
from kraken.exceptions import KrakenAuthenticationError # , KrakenPermissionDeniedError
from kraken.spot import Funding, KrakenSpotWSClientV2, Market, Staking, Trade, User
logging.basicConfig(
format="%(asctime)s %(module)s,line: %(lineno)d %(levelname)8s | %(message)s",
datefmt="%Y/%m/%d %H:%M:%S",
level=logging.INFO,
)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
class TradingBot(KrakenSpotWSClientV2):
"""
Class that implements the trading strategy
* The on_message function gets all messages sent by the websocket feeds.
* Decisions can be made based on these messages
* Can place trades using the self.__trade client or self.send_message
* Do everything you want
====== P A R A M E T E R S ======
config: dict
configuration like: {
"key": "kraken-spot-key",
"secret": "kraken-spot-secret",
"pairs": ["DOT/USD", "BTC/USD"],
}
"""
def __init__(self: TradingBot, config: dict, **kwargs: Any) -> None:
super().__init__( # initialize the KrakenSpotWSClientV2
key=config["key"],
secret=config["secret"],
**kwargs,
)
self.__config: dict = config
self.__user: User = User(key=config["key"], secret=config["secret"])
self.__trade: Trade = Trade(key=config["key"], secret=config["secret"])
self.__market: Market = Market(key=config["key"], secret=config["secret"])
self.__funding: Funding = Funding(key=config["key"], secret=config["secret"])
self.__staking: Staking = Staking(key=config["key"], secret=config["secret"])
async def on_message(self: TradingBot, message: dict) -> None:
"""Receives all messages of the websocket connection(s)"""
if message.get("method") == "pong" or message.get("channel") == "heartbeat":
return
if "error" in message:
# handle exceptions/errors sent by websocket connection …
pass
logging.info(message)
# == apply your trading strategy here ==
# Call functions of `self.__trade` and other clients if conditions met …
# try:
# print(self.__trade.create_order(
# ordertype='limit',
# side='buy',
# volume=2,
# pair='XBTUSD',
# price=12000
# ))
# except KrakenPermissionDeniedError:
# # … handle exceptions
# pass
# The spot websocket client also allow sending orders via websockets
# this is way faster than using REST endpoints.
# await self.send_message(
# message={
# "method": "add_order",
# "params": {
# "limit_price": 1234.56,
# "order_type": "limit",
# "order_userref": 123456789,
# "order_qty": 1.0,
# "side": "buy",
# "symbol": "BTC/USD",
# "validate": True,
# },
# }
# )
# You can also un-/subscribe here using `self.subscribe(...)` or
# `self.unsubscribe(...)`.
#
# … more can be found in the documentation
# (https://python-kraken-sdk.readthedocs.io/en/stable/).
# Add more functions to customize the trading strategy …
def save_exit(self: TradingBot, reason: Optional[str] = "") -> None:
"""controlled shutdown of the strategy"""
logging.warning(
"Save exit triggered, reason: {reason}",
extra={"reason": reason},
)
# some ideas:
# * save the current data
# * maybe close trades
# * enable dead man's switch
sys.exit(1)
class Manager:
"""
Class to manage the trading strategy
… subscribes to desired feeds, instantiates the strategy and runs as long
as there is no error.
====== P A R A M E T E R S ======
config: dict
configuration like: {
"key" "kraken-spot-key",
"secret": "kraken-secret-key",
"pairs": ["DOT/USD", "BTC/USD"],
}
"""
def __init__(self: Manager, config: dict):
self.__config: dict = config
self.__trading_strategy: Optional[TradingBot] = None
def run(self: Manager) -> None:
"""Starts the event loop and bot"""
if not self.__check_credentials():
sys.exit(1)
try:
asyncio.run(self.__main())
except KeyboardInterrupt:
self.save_exit(reason="KeyboardInterrupt")
else:
self.save_exit(reason="Asyncio loop left")
async def __main(self: Manager) -> None:
"""
Instantiates the trading strategy and subscribes to the desired
websocket feeds. While no exception within the strategy occur run the
loop.
The variable `exception_occur` which is an attribute of the
KrakenSpotWSClientV2 can be set individually but is also being set to
`True` if the websocket connection has some fatal error. This is used to
exit the asyncio loop - but you can also apply your own reconnect rules.
"""
self.__trading_strategy = TradingBot(config=self.__config)
await self.__trading_strategy.subscribe(
params={"channel": "ticker", "symbol": self.__config["pairs"]},
)
await self.__trading_strategy.subscribe(
params={
"channel": "ohlc",
"interval": 15,
"symbol": self.__config["pairs"],
},
)
await self.__trading_strategy.subscribe(params={"channel": "executions"})
while not self.__trading_strategy.exception_occur:
try:
# check if the algorithm feels good
# maybe send a status update every day via Telegram or Mail
# …
pass
except Exception as exc:
message: str = f"Exception in main: {exc} {traceback.format_exc()}"
logging.error(message)
self.__trading_strategy.save_exit(reason=message)
await asyncio.sleep(6)
self.__trading_strategy.save_exit(
reason="Left main loop because of exception in strategy.",
)
def __check_credentials(self: Manager) -> bool:
"""Checks the user credentials and the connection to Kraken"""
try:
User(self.__config["key"], self.__config["secret"]).get_account_balance()
logging.info("Client credentials are valid.")
return True
except urllib3.exceptions.MaxRetryError:
logging.error("MaxRetryError, cannot connect.")
return False
except requests.exceptions.ConnectionError:
logging.error("ConnectionError, Kraken not available.")
return False
except KrakenAuthenticationError:
logging.error("Invalid credentials!")
return False
def save_exit(self: Manager, reason: str = "") -> None:
"""Invoke the save exit function of the trading strategy"""
print(f"Save exit triggered - {reason}")
if self.__trading_strategy is not None:
self.__trading_strategy.save_exit(reason=reason)
else:
sys.exit(1)
def main() -> None:
"""Example main - load environment variables and run the strategy."""
manager: Manager = Manager(
config={
"key": os.getenv("SPOT_API_KEY"),
"secret": os.getenv("SPOT_SECRET_KEY"),
"pairs": ["DOT/USD", "BTC/USD"],
},
)
try:
manager.run()
except Exception:
manager.save_exit(
reason=f"manageBot.run() has ended: {traceback.format_exc()}",
)
if __name__ == "__main__":
main()