Skip to content

Commit

Permalink
Fix/646 websocket error handling (#662)
Browse files Browse the repository at this point in the history
* Test workaround from GH issues
* Test worker-controller more updates
  • Loading branch information
sambles authored Jun 28, 2022
1 parent dfee8d4 commit 518266d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
2 changes: 1 addition & 1 deletion kubernetes/charts/oasis-platform/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ workerController:

# Debug option - Read the auto scaling configuration from the API on every update
# default is only to read it on for each model each time replicas is changed from 0 (first analysis running))
continueUpdateScaling: false
continueUpdateScaling: true

# Debug option - This prevents workers for a model set to FIXED_WORKERS to be scaled to 0. They will always be started.
neverShutdownFixedWorkers: false
Expand Down
21 changes: 20 additions & 1 deletion src/server/oasisapi/queues/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,25 @@
from src.server.oasisapi.queues.utils import QueueInfo


class GuardedAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer):
async def receive(self, text_data=None, bytes_data=None, **kwargs):
error_code = 4011
try:
await super().receive(text_data=text_data, bytes_data=bytes_data, **kwargs)
except Exception:
await self.disconnect({'code': error_code})
# # Or, if you need websocket_disconnect (eg. for autogroups), try this:
#
# try:
# await self.websocket_disconnect({'code': error_code})
# except StopConsumer:
# pass

# Try and close cleanly
await self.close(error_code)
raise


class ContentStatus(Enum):
ERROR = 'ERROR'
SUCCESS = 'SUCCESS'
Expand Down Expand Up @@ -95,7 +114,7 @@ def build_all_queue_status_message(analysis_filter=None, message_type='queue_sta
return build_task_status_message(status_message, message_type=message_type)


class QueueStatusConsumer(AsyncJsonWebsocketConsumer):
class QueueStatusConsumer(GuardedAsyncJsonWebsocketConsumer):
groups = ['queue_status']

async def connect(self):
Expand Down

0 comments on commit 518266d

Please sign in to comment.