-
-
Notifications
You must be signed in to change notification settings - Fork 540
/
notifier.py
138 lines (113 loc) · 5.3 KB
/
notifier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import json
import logging
from contextlib import suppress
from typing import TYPE_CHECKING, Any, Callable, Optional, Union
from gevent.lock import Semaphore
from geventwebsocket import WebSocketApplication
from geventwebsocket.exceptions import WebSocketError
from geventwebsocket.websocket import WebSocket
from rotkehlchen.logging import RotkehlchenLogsAdapter
if TYPE_CHECKING:
from rotkehlchen.api.websockets.typedefs import WSMessageType
logger = logging.getLogger(__name__)
log = RotkehlchenLogsAdapter(logger)
def _ws_send_impl(
websocket: WebSocket,
lock: Semaphore,
to_send_msg: str,
success_callback: Optional[Callable] = None,
success_callback_args: Optional[dict[str, Any]] = None,
failure_callback: Optional[Callable] = None,
failure_callback_args: Optional[dict[str, Any]] = None,
) -> None:
try:
with lock:
websocket.send(to_send_msg)
except WebSocketError as e:
log.error(f'Websocket send with message {to_send_msg} failed due to {e!s}')
if failure_callback:
failure_callback_args = {} if failure_callback_args is None else failure_callback_args # noqa: E501
failure_callback(**failure_callback_args)
return
if success_callback: # send success
success_callback_args = {} if success_callback_args is None else success_callback_args # noqa: E501
success_callback(**success_callback_args)
class RotkiNotifier:
def __init__(self) -> None:
self.subscribers: list[WebSocket] = []
self.locks: dict[WebSocket, Semaphore] = {}
def subscribe(self, websocket: WebSocket) -> None:
log.info(f'Websocket with hash id {hash(websocket)} subscribed to rotki notifier')
self.subscribers.append(websocket)
self.locks[websocket] = Semaphore()
def unsubscribe(self, websocket: WebSocket) -> None:
self.locks.pop(websocket, None)
with suppress(ValueError):
self.subscribers.remove(websocket)
log.info(f'Websocket with hash id {hash(websocket)} unsubscribed from rotki notifier') # noqa: E501
def broadcast(
self,
message_type: 'WSMessageType',
to_send_data: Union[dict[str, Any], list[Any]],
success_callback: Optional[Callable] = None,
success_callback_args: Optional[dict[str, Any]] = None,
failure_callback: Optional[Callable] = None,
failure_callback_args: Optional[dict[str, Any]] = None,
) -> None:
"""Broadcasts a websocket message
A callback to run on message success and a callback to run on message
failure can be optionally provided.
"""
message_data = {'type': str(message_type), 'data': to_send_data}
try:
message = json.dumps(message_data)
except TypeError as e:
log.error(f'Failed to broadcast websocket {message_type} message due to {e!s}')
if failure_callback is not None:
failure_callback_args = {} if failure_callback_args is None else failure_callback_args # noqa: E501
failure_callback(**failure_callback_args)
return # get out of the broadcast
to_remove_indices = set()
spawned_one_broadcast = False
for idx, websocket in enumerate(self.subscribers):
if websocket.closed is True:
to_remove_indices.add(idx)
continue
_ws_send_impl(
websocket=websocket,
lock=self.locks[websocket],
to_send_msg=message,
success_callback=success_callback,
success_callback_args=success_callback_args,
failure_callback=failure_callback,
failure_callback_args=failure_callback_args,
)
spawned_one_broadcast = True
if len(to_remove_indices) != 0: # removed closed websockets from the list
self.subscribers = [
i for j, i in enumerate(self.subscribers) if j not in to_remove_indices
]
if spawned_one_broadcast is False and failure_callback is not None:
failure_callback_args = {} if failure_callback_args is None else failure_callback_args # noqa: E501
failure_callback(**failure_callback_args)
class RotkiWSApp(WebSocketApplication):
"""The WebSocket app that's instantiated for every message as it seems from the code
Only way to pass it extra arguments is through "environ" which is why we have
a different class "RotkiNotifier" handling the bulk of the work
"""
def on_open(self, *args: Any, **kwargs: Any) -> None:
rotki_notifier: RotkiNotifier = self.ws.environ['rotki_notifier']
rotki_notifier.subscribe(self.ws)
def on_message(self, message: Optional[str], *args: Any, **kwargs: Any) -> None:
if self.ws.closed:
return
try:
self.ws.send(message, **kwargs)
except WebSocketError as e:
log.warning(
f'Got WebSocketError {e!s} for sending message {message} to a websocket',
)
def on_close(self, *args: Any, **kwargs: Any) -> None:
if self.ws.environ is not None:
rotki_notifier: RotkiNotifier = self.ws.environ['rotki_notifier']
rotki_notifier.unsubscribe(self.ws)