endorsement_processor.py
import asyncio
import datetime
import sys
from typing import List, NoReturn

from aries_cloudcontroller import AcaPyClient

from endorser.util.endorsement import accept_endorsement, should_accept_endorsement
from shared.constants import (
    GOVERNANCE_AGENT_API_KEY,
    GOVERNANCE_AGENT_URL,
    GOVERNANCE_LABEL,
)
from shared.log_config import get_logger
from shared.models.endorsement import Endorsement
from shared.models.webhook_events.payloads import CloudApiWebhookEventGeneric
from shared.services.redis_service import RedisService
from shared.util.rich_parsing import parse_json_with_error_handling

logger = get_logger(__name__)
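
# Overview: the Webhooks service writes endorsement webhook events to Redis under
# `endorsement_redis_prefix`. This processor polls for those keys, takes a
# per-event lock so only one instance handles a given event, and accepts
# applicable transaction requests on behalf of the governance agent.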


class EndorsementProcessor:
    """
    Class to process endorsement webhook events that the Webhooks service writes to `endorsement_redis_prefix`.
    """

    def __init__(self, redis_service: RedisService) -> None:
        self.redis_service = redis_service
        self._new_event_notification = asyncio.Event()
        self.endorse_prefix = self.redis_service.endorsement_redis_prefix
        self._pubsub = None  # for managing redis pubsub connection
        self._pubsub_thread = None
        self._tasks: List[asyncio.Task] = []  # To keep track of running tasks

    def start(self) -> None:
        """
        Starts the background tasks for processing endorsement events.
        """
        # self._start_notification_listener()  # disabled as it is currently unused
        self._tasks.append(
            asyncio.create_task(
                self._process_endorsement_requests(), name="Process endorsements"
            )
        )
        logger.info("Endorsement processing started.")

    async def stop(self) -> None:
        """
        Stops all background tasks gracefully.
        """
        for task in self._tasks:
            task.cancel()  # Request cancellation of the task
            try:
                await task  # Wait for the task to be cancelled
            except asyncio.CancelledError:
                pass  # Expected error upon cancellation, can be ignored
        self._tasks.clear()  # Clear the list of tasks

        if self._pubsub_thread:
            self._pubsub_thread.stop()
            logger.info("Stopped Endorsement pubsub thread")

        if self._pubsub:
            await asyncio.sleep(0.1)  # allow thread to stop before disconnecting
            self._pubsub.disconnect()
            logger.info("Disconnected Endorsement pubsub instance")

        logger.info("Endorsement processing stopped.")

    def are_tasks_running(self) -> bool:
        """
        Checks if the background tasks are still running.

        Returns:
            True if all background tasks are running, False if any task has stopped.
        """
        logger.debug("Checking if all tasks are running")

        # todo: disabling pubsub thread check as it's currently unused and disconnects periodically on test env
        pubsub_thread_running = (
            True  # self._pubsub_thread and self._pubsub_thread.is_alive()
        )
        tasks_running = self._tasks and all(not task.done() for task in self._tasks)

        if not pubsub_thread_running:
            logger.error("Pubsub thread is not running")

        if not tasks_running:
            for task in self._tasks:
                if task.done():
                    logger.error("Task `{}` is not running", task.get_name())

        all_running = tasks_running and pubsub_thread_running
        logger.debug("All tasks running: {}", all_running)
        return all_running

    def _set_notification_handler(self, msg) -> None:
        """
        Processing handler for when set notifications are received
        """
        if f"{self.endorse_prefix}:" in msg["data"].decode():
            logger.trace("Received endorse set notification: {}", msg)
            self._new_event_notification.set()

    def _start_notification_listener(self) -> None:
        """
        Listens for keyspace notifications related to endorsements and sets an event to resume processing.
        """
        self._pubsub = self.redis_service.redis.pubsub()

        # Subscribe this pubsub channel to the notification pattern (set may represent endorsement events)
        notification_pattern = "__keyevent@0__:set"
        self._pubsub.psubscribe(
            **{notification_pattern: self._set_notification_handler}
        )
        self._pubsub_thread = self._pubsub.run_in_thread(sleep_time=0.01)

        logger.info("Notification listener subscribed to redis keyspace notifications")

    async def _process_endorsement_requests(self) -> NoReturn:
        """
        Processing handler for incoming endorsement events
        """
        logger.info("Starting endorsement processor")

        exception_count = 0
        max_exception_count = 5  # break inf loop after 5 consecutive exceptions

        attempts_without_events = 0
        max_attempts_without_events = sys.maxsize  # use max int to never stop

        sleep_duration = 0.1
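
        # With `max_attempts_without_events` set to sys.maxsize, the
        # notification-wait branch below is effectively never reached, so the
        # loop simply polls Redis every `sleep_duration` seconds when no keys
        # are found. This is consistent with the notification listener being
        # disabled in start().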
        while True:
            try:
                batch_keys = self.redis_service.scan_keys(
                    match_pattern=f"{self.endorse_prefix}:*", count=10000
                )
                if batch_keys:
                    attempts_without_events = 0  # Reset the counter
                    for key in batch_keys:
                        await self._attempt_process_endorsement(key)
                else:
                    attempts_without_events += 1
                    if attempts_without_events >= max_attempts_without_events:
                        # Wait for a keyspace notification before continuing
                        logger.debug(
                            (
                                "Scan has returned no keys {} times in a row. "
                                "Waiting for keyspace notification..."
                            ),
                            max_attempts_without_events,
                        )
                        await self._new_event_notification.wait()
                        logger.info("Keyspace notification triggered")
                        self._new_event_notification.clear()  # Reset the event for the next wait
                        attempts_without_events = 0  # Reset the counter
                    else:
                        await asyncio.sleep(sleep_duration)  # prevent a busy loop

                exception_count = 0  # reset exception count after successful loop
            except Exception:  # pylint: disable=W0718
                exception_count += 1
                logger.exception(
                    "Something went wrong while processing endorsement events. Continuing..."
                )
                if exception_count >= max_exception_count:
                    raise  # exit inf loop

    async def _attempt_process_endorsement(self, event_key: str) -> None:
        """
        Attempts to process an endorsement event from Redis, ensuring that only one instance
        processes the event at a time by acquiring a lock.

        Args:
            event_key: The Redis key of the event to process.
        """
        logger.trace("Attempt process: {}", event_key)
        lock_key = f"lock:{event_key}"
        extend_lock_task = None

        lock_duration = 1  # second
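
        # The lock is short-lived; a background task (started below) keeps
        # extending it while the event is being processed, so slow processing
        # does not let another instance pick up the same event.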
        if self.redis_service.set_lock(
            key=lock_key,
            px=lock_duration * 1000,  # to milliseconds
        ):
            logger.trace("Successfully set lock for {}", event_key)

            event_json = self.redis_service.get(event_key)
            if not event_json:
                logger.warning(
                    "Tried to read an event from key {}, but event has been deleted",
                    event_key,
                )
                return

            try:
                # Start a background task to extend the lock periodically
                extend_lock_task = self.redis_service.extend_lock_task(
                    lock_key, interval=datetime.timedelta(seconds=lock_duration)
                )

                await self._process_endorsement_event(event_json)

                if self.redis_service.delete_key(event_key):
                    logger.info("Deleted processed endorsement event: {}", event_key)
                else:
                    logger.warning(
                        "Couldn't delete processed endorsement event: {}", event_key
                    )
            except Exception as e:  # pylint: disable=W0718
                # if this particular event is unprocessable, we should remove it from the inputs, to avoid deadlocking
                logger.error("Processing {} raised an exception: {}", event_key, e)
                self._handle_unprocessable_endorse_event(event_key, event_json, e)
            finally:
                # Cancel the lock extension task if it's still running
                if extend_lock_task:
                    extend_lock_task.cancel()
        else:
            logger.debug(
                "Event {} is currently being processed by another instance.", event_key
            )
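
    # Note: _attempt_process_endorsement never deletes `lock_key` explicitly;
    # once the extension task is cancelled, the lock is left to expire via the
    # TTL it was set with (`lock_duration`).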

    async def _process_endorsement_event(self, event_json: str) -> None:
        """
        Processes an individual endorsement event, evaluating if it should be accepted and then endorsing as governance.

        Args:
            event_json: The JSON string representation of the endorsement event.
        """
        event = parse_json_with_error_handling(
            CloudApiWebhookEventGeneric, event_json, logger
        )
        logger.debug("Processing endorsement event for agent `{}`", event.origin)

        # We're only interested in events from the governance agent
        if event.wallet_id != GOVERNANCE_LABEL:
            logger.warning("Endorsement request is not for governance agent.")
            return

        endorsement = Endorsement(**event.payload)

        async with AcaPyClient(
            base_url=GOVERNANCE_AGENT_URL, api_key=GOVERNANCE_AGENT_API_KEY
        ) as client:
            # Check if endorsement request is indeed applicable
            if not await should_accept_endorsement(client, endorsement):
                logger.info(  # the check above has already logged the reason as a warning
                    "Endorsement request with transaction id `{}` is not applicable for endorsement.",
                    endorsement.transaction_id,
                )
                return

            logger.info(
                "Endorsement request with transaction id `{}` is applicable for endorsement, accepting request.",
                endorsement.transaction_id,
            )
            await accept_endorsement(client, endorsement)

    def _handle_unprocessable_endorse_event(
        self, key: str, event_json: str, error: Exception
    ) -> None:
        """
        Handles an event that could not be processed successfully. The unprocessable event is persisted
        to a separate key for further investigation.

        Args:
            key: The Redis key where the problematic event was found.
            event_json: The JSON string of the event that could not be processed.
            error: The exception that occurred during event processing.
        """
        bound_logger = logger.bind(body={"key": key})
        bound_logger.warning("Handling problematic endorsement event")

        unprocessable_key = f"unprocessable:{key}"
        error_message = f"Could not process: {event_json}. Error: {error}"
        bound_logger.info(
            "Saving record of problematic event at key: {}. Error: `{}`",
            unprocessable_key,
            error_message,
        )
        self.redis_service.set(key=unprocessable_key, value=error_message)

        bound_logger.info("Deleting original problematic event")
        self.redis_service.delete_key(key=key)

        bound_logger.info("Successfully handled unprocessable event.")