From edfda6e285cab4de6076c37ca835d77605767c06 Mon Sep 17 00:00:00 2001
From: Tom Lee
Date: Sat, 27 Jan 2024 12:35:34 -0800
Subject: [PATCH] feat: improve celery stability

---
 app/templates/app/PollState.html       |  4 ++--
 app/views/views.py                     | 27 ++++++++++++++++++---------
 app/worker/app_celery.py               | 19 ++++++++++---------
 app/worker/tasks/__init__.py           |  7 ++++---
 app/worker/tasks/exporter.py           |  5 +++--
 app/worker/tasks/importers/__init__.py |  6 ++++--
 reboot/celeryconfig.py                 |  4 +++-
 7 files changed, 44 insertions(+), 28 deletions(-)

diff --git a/app/templates/app/PollState.html b/app/templates/app/PollState.html
index 3f5e0c39..98f9f33b 100644
--- a/app/templates/app/PollState.html
+++ b/app/templates/app/PollState.html
@@ -82,7 +82,7 @@ if (result.process_percent >= 0) {
           percent = Math.max(percent, result.process_percent);
           $(".bar").html(percent + "%").css("width", percent + "%");
-          if (result.process_percent === 100) {
+          if (result === "SUCCESS") {
             $("#user-count").text("complete");
           } else {
             $("#user-count").text("still processing");
           }
@@ -91,7 +91,7 @@
         return result;
       }
     }).always(function(result) {
-      if (result === "SUCCESS" || result.process_percent === 100) {
+      if (result === "SUCCESS") {
        clearInterval(refreshIntervalId);
         $(".bar").html("100%").css("width", "100%");
         $("#user-count").text("complete");
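
Note: the template now treats the plain-text body "SUCCESS" as the completion signal rather than a 100% progress value, matching the poll_state view below, which returns a JSON progress object while the task is STARTED and the bare string SUCCESS once it finishes. A minimal sketch of that contract from the client side; the /poll_state URL and the Django test Client usage are illustrative assumptions, not part of this patch:

    from django.test import Client

    client = Client()
    # task_id is assumed to come from whatever view enqueued the task
    resp = client.get("/poll_state", {"task_id": task_id})
    if resp["Content-Type"].startswith("application/json"):
        print(resp.json().get("process_percent"))  # still running: STARTED meta
    else:
        print(resp.content)  # b"SUCCESS" once the task completes
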
diff --git a/app/views/views.py b/app/views/views.py
index 60b11027..5a259c13 100644
--- a/app/views/views.py
+++ b/app/views/views.py
@@ -2,8 +2,8 @@
 import logging
 
 from celery.exceptions import TimeoutError
-from celery.result import AsyncResult
-from celery.states import FAILURE, PENDING, SUCCESS
+# from celery.result import AsyncResult
+from celery.states import FAILURE, PENDING, STARTED, SUCCESS
 from django.contrib.auth.decorators import login_required
 from django.core import serializers
 from django.http import (
@@ -22,13 +22,14 @@
 
 from app.constants.str import PERMISSION_DENIED
 from app.models import Item
-from app.worker.app_celery import ATTEMPT_LIMIT, PROGRESS
+from app.worker.app_celery import ATTEMPT_LIMIT
 from app.worker.tasks import receiptor
 from app.worker.tasks.exporter import exporter
 from app.worker.tasks.importers import historical_data_importer
+from reboot.celery import app
 
 logger = logging.getLogger(__name__)
-
+tasks_cache = {}
 
 @require_GET
 @login_required(login_url="/login")
@@ -121,14 +122,16 @@ def poll_state(request: HttpRequest):
             request=request,
             err_msg="The task_id query parameter of the request was omitted.")
 
-    task = AsyncResult(task_id)
+    task = app.AsyncResult(task_id)
     res = JsonResponse(_poll_state(PENDING, 0, 200))
+    print(f"!!! task id={task_id},state={task.state},successful={task.successful()},ready={task.ready()},failed={task.failed()}")
     if task.state == FAILURE or task.failed():
         res = JsonResponse(_poll_state(FAILURE, 0, 400))
-    elif task.state == PROGRESS:
+    elif task.state == STARTED:
         res = JsonResponse(task.result) if isinstance(
             task.result, dict) else HttpResponse(task.result)
     elif task.state == SUCCESS or task.successful() or task.ready():
+        tasks_cache[task_id] = task
         res = HttpResponse(SUCCESS)
     return res
 
@@ -142,12 +145,17 @@ def download_file(request: HttpRequest):
     task_id = request.GET.get("task_id")
     task_name = request.GET.get("task_name", "task")
     attempts = 0
-    # CloudAMQP free tier is unstable and must be circuit breakered
     while (attempts < ATTEMPT_LIMIT):
         try:
             attempts += 1
-            task = AsyncResult(task_id)
-            result = task.get(timeout=0.5 * attempts)
+            # if tasks_cache[task_id]:
+            #     task = tasks_cache[task_id]
+            #     del tasks_cache[task_id]
+            # else:
+            #     task = app.AsyncResult(task_id)
+            task = app.AsyncResult(task_id)
+            print(f"!!! task id={task_id},state={task.state},successful={task.successful()},ready={task.ready()},failed={task.failed()}")
+            result = task.get(timeout=1.0 * attempts)
             print(f"{task} {task_name} success #{attempts}: {result}")
             break
         except TimeoutError:
@@ -158,6 +166,7 @@ def download_file(request: HttpRequest):
                 err_msg="Download exceeded max attempts")
         return result
     except Exception as e:
+        print(f"!!! error {e}")
         return _error(request=request,
                       err_msg=f"Failed to download file: {e}")
 
diff --git a/app/worker/app_celery.py b/app/worker/app_celery.py
index 3204964d..d9d17070 100644
--- a/app/worker/app_celery.py
+++ b/app/worker/app_celery.py
@@ -1,17 +1,17 @@
 import traceback
 
-import celery
-from celery.states import SUCCESS, FAILURE
 from http import HTTPStatus
 
-PROGRESS = 'PROGRESS'
+from celery.states import FAILURE, STARTED, SUCCESS
+
+from reboot.celery import app
 ATTEMPT_LIMIT = 5
 
 
 def update_state(state, percent, http_status):
     print('{0!r} state: {1!r}, progress: {2!r}'.format(
-        celery.current_task.request.id, state, percent))
-    celery.current_task.update_state(state=state, meta={
+        app.current_task.request.id, state, percent))
+    app.current_task.update_state(state=state, meta={
         'state': state,
         'process_percent': percent,
         'status': http_status,
@@ -19,7 +19,7 @@ def update_state(state, percent, http_status):
 
 
 def update_percent(percent):
-    update_state(PROGRESS, percent, HTTPStatus.ACCEPTED)
+    update_state(STARTED, percent, HTTPStatus.ACCEPTED)
 
 
 def set_success():
@@ -27,7 +27,7 @@ def set_success():
 
 
 def set_failure(e):
-    celery.current_task.update_state(
+    app.current_task.update_state(
         state=FAILURE,
         meta={
             'exc_type': type(e).__name__,
@@ -38,17 +38,18 @@ def set_failure(e):
 
 
-class AppTask(celery.Task):
+class AppTask(app.Task):
     max_retries = 0
     # default_retry_delay = 10
 
     def on_success(self, retval, task_id, args, kwargs):
+        set_success()
         print('{0!r} success: {1!r}'.format(task_id, retval))
         super().on_success(retval, task_id, args, kwargs)
 
     def on_failure(self, exc, task_id, args, kwargs, einfo):
-        print('{0!r} failed: {1!r}'.format(task_id, exc))
         set_failure(exc)
+        print('{0!r} failed: {1!r}'.format(task_id, exc))
         super().on_failure(exc, task_id, args, kwargs, einfo)
 
     def on_retry(self, exc, task_id, args, kwargs, einfo):
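
Note: the custom PROGRESS state is retired in favor of Celery's built-in STARTED state, and the module now goes through the shared app object from reboot/celery.py instead of the celery.current_task proxy, so worker and web poll against the same state names. A minimal sketch of a worker task reporting progress through these helpers; the demo_import task name and the row loop are hypothetical, for illustration only:

    from app.worker.app_celery import AppTask, update_percent
    from reboot.celery import app

    @app.task(bind=True, base=AppTask)
    def demo_import(self, rows):  # hypothetical task name
        for i, row in enumerate(rows, start=1):
            ...  # process one row
            update_percent(int(100 * i / len(rows)))  # STARTED + process_percent meta
        return len(rows)  # AppTask.on_success() then records SUCCESS

Because update_percent() now writes the STARTED state, the meta dict it attaches is exactly what poll_state sees as task.result in its STARTED branch.
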
diff --git a/app/worker/tasks/__init__.py b/app/worker/tasks/__init__.py
index fb4770f6..4bdf8a12 100644
--- a/app/worker/tasks/__init__.py
+++ b/app/worker/tasks/__init__.py
@@ -1,13 +1,14 @@
 '''
 Module for tasks to be sent on task queue
 '''
-from celery import task
-
 from app.worker.app_celery import AppTask
+# from celery import task
+from reboot.celery import app
+
 from .create_receipt import Receiptor
 
 
-@task(bind=True, base=AppTask)
+@app.task(bind=True, base=AppTask)
 def receiptor(self, queryset, total_count):
     receiptor = Receiptor(queryset, total_count)
     return receiptor()
diff --git a/app/worker/tasks/exporter.py b/app/worker/tasks/exporter.py
index 9419aec7..28a39714 100644
--- a/app/worker/tasks/exporter.py
+++ b/app/worker/tasks/exporter.py
@@ -1,6 +1,6 @@
 import csv
 
-from celery import task
+# from celery import task
 from celery.utils.log import get_task_logger
 from django.core import serializers
 from django.db.models.query import QuerySet
@@ -8,9 +8,10 @@
 
 from app.constants.field_names import CURRENT_FIELDS
 from app.worker.app_celery import AppTask, update_percent
+from reboot.celery import app
 
 
-@task(bind=True, base=AppTask)
+@app.task(bind=True, base=AppTask)
 def exporter(self, file_name, qs: QuerySet = None, total_count: int = 0):
     rows = serializers.deserialize('json', qs)
     csv_exporter = CsvExporter(file_name, rows, total_count)
diff --git a/app/worker/tasks/importers/__init__.py b/app/worker/tasks/importers/__init__.py
index cf650561..dc6b7dad 100644
--- a/app/worker/tasks/importers/__init__.py
+++ b/app/worker/tasks/importers/__init__.py
@@ -1,13 +1,15 @@
 """
 Module for csv file importers to be sent to queue
 """
-from celery import task
+# from celery import task
 
 from app.worker.app_celery import AppTask
+from reboot.celery import app
+
 from .historical_data_importer import HistoricalDataImporter
 
 
-@task(bind=True, base=AppTask)
+@app.task(bind=True, base=AppTask)
 def historical_data_importer(self, csvpath):
     importer = HistoricalDataImporter(csvpath)
     importer()
diff --git a/reboot/celeryconfig.py b/reboot/celeryconfig.py
index 290d77b9..9a274bae 100644
--- a/reboot/celeryconfig.py
+++ b/reboot/celeryconfig.py
@@ -8,11 +8,13 @@
 broker_pool_limit = 1
 event_queue_expires = 60
 worker_prefetch_multiplier = 1
-worker_concurrency = 10
+worker_concurrency = 1
 accept_content = ['json', 'pickle']
 result_backend = config("REDIS_URL")
 task_serializer = 'pickle'
 result_serializer = 'pickle'
+task_track_started = True
+task_ignore_result = False
 
 # Use PROD settings if valid CLOUDAMQP_URl, else dev
 if config('CLOUDAMQP_URL', default=False):
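
Note: task_track_started = True is what makes workers report the STARTED state that poll_state now branches on, and task_ignore_result = False makes explicit that results are kept in the Redis backend so download_file can fetch them; dropping worker_concurrency from 10 to 1 presumably trades throughput for fewer broker connections, in line with the commit's stability goal on the free broker tier. A usage sketch of what these settings enable; task_id is assumed to come from the original enqueue call:

    from celery.states import STARTED, SUCCESS

    from reboot.celery import app

    task = app.AsyncResult(task_id)
    if task.state == STARTED:
        print(task.result["process_percent"])  # meta dict written by update_state()
    elif task.state == SUCCESS:
        print(task.get(timeout=1.0))  # available because results are stored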