
Commit

feat: improve celery stability
bwdmonkey committed Jan 27, 2024
1 parent d2abb22 commit edfda6e
Showing 7 changed files with 44 additions and 28 deletions.
4 changes: 2 additions & 2 deletions app/templates/app/PollState.html
@@ -82,7 +82,7 @@
if (result.process_percent >= 0) {
percent = Math.max(percent, result.process_percent);
$(".bar").html(percent + "%").css("width", percent + "%");
- if (result.process_percent === 100) {
+ if (result === "SUCCESS") {
$("#user-count").text("complete");
} else {
$("#user-count").text("still processing");
@@ -91,7 +91,7 @@
return result;
}
}).always(function(result) {
if (result === "SUCCESS" || result.process_percent === 100) {
if (result === "SUCCESS") {
clearInterval(refreshIntervalId);
$(".bar").html("100%").css("width", "100%");
$("#user-count").text("complete");
27 changes: 18 additions & 9 deletions app/views/views.py
@@ -2,8 +2,8 @@
import logging

from celery.exceptions import TimeoutError
- from celery.result import AsyncResult
- from celery.states import FAILURE, PENDING, SUCCESS
+ # from celery.result import AsyncResult
+ from celery.states import FAILURE, PENDING, STARTED, SUCCESS
from django.contrib.auth.decorators import login_required
from django.core import serializers
from django.http import (
@@ -22,13 +22,14 @@

from app.constants.str import PERMISSION_DENIED
from app.models import Item
- from app.worker.app_celery import ATTEMPT_LIMIT, PROGRESS
+ from app.worker.app_celery import ATTEMPT_LIMIT
from app.worker.tasks import receiptor
from app.worker.tasks.exporter import exporter
from app.worker.tasks.importers import historical_data_importer
from reboot.celery import app

logger = logging.getLogger(__name__)

+ tasks_cache = {}

@require_GET
@login_required(login_url="/login")
@@ -121,14 +121,16 @@ def poll_state(request: HttpRequest):
request=request,
err_msg="The task_id query parameter of the request was omitted.")

- task = AsyncResult(task_id)
+ task = app.AsyncResult(task_id)
res = JsonResponse(_poll_state(PENDING, 0, 200))
print(f"!!! task id={task_id},state={task.state},successful={task.successful()},ready={task.ready()},failed={task.failed()}")
if task.state == FAILURE or task.failed():
res = JsonResponse(_poll_state(FAILURE, 0, 400))
- elif task.state == PROGRESS:
+ elif task.state == STARTED:
res = JsonResponse(task.result) if isinstance(
task.result, dict) else HttpResponse(task.result)
elif task.state == SUCCESS or task.successful() or task.ready():
+ tasks_cache[task_id] = task
res = HttpResponse(SUCCESS)
return res

@@ -142,12 +145,17 @@ def download_file(request: HttpRequest):
task_id = request.GET.get("task_id")
task_name = request.GET.get("task_name", "task")
attempts = 0
- # CloudAMQP free tier is unstable and must be circuit breakered
while (attempts < ATTEMPT_LIMIT):
try:
attempts += 1
- task = AsyncResult(task_id)
- result = task.get(timeout=0.5 * attempts)
+ # if tasks_cache[task_id]:
+ # task = tasks_cache[task_id]
+ # del tasks_cache[task_id]
+ # else:
+ # task = app.AsyncResult(task_id)
+ task = app.AsyncResult(task_id)
+ print(f"!!! task id={task_id},state={task.state},successful={task.successful()},ready={task.ready()},failed={task.failed()}")
+ result = task.get(timeout=1.0 * attempts)
print(f"{task} {task_name} success #{attempts}: {result}")
break
except TimeoutError:
@@ -158,6 +166,7 @@
err_msg="Download exceeded max attempts")
return result
except Exception as e:
print(f"!!! error", e)
return _error(request=request, err_msg=f"Failed to download file: {e}")


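
Context for the views.py changes above (a sketch, not part of the commit): poll_state now resolves results through app.AsyncResult so the configured result backend is queried, and it branches on Celery's built-in STARTED state rather than the custom PROGRESS marker. A minimal version of that flow, assuming the reboot.celery app and the meta dict written by the update_state helper changed later in this commit; poll_task is a stand-in name and the _poll_state/_error helpers are omitted:

from celery.states import FAILURE, PENDING, STARTED, SUCCESS
from django.http import HttpResponse, JsonResponse

from reboot.celery import app  # project-level Celery app, as imported in the diff


def poll_task(request):
    # Look the task up through the app so the configured Redis result backend is used.
    task = app.AsyncResult(request.GET.get("task_id"))
    if task.state == FAILURE or task.failed():
        return JsonResponse({"state": FAILURE, "process_percent": 0}, status=400)
    if task.state == STARTED and isinstance(task.result, dict):
        return JsonResponse(task.result)  # progress meta written by update_state()
    if task.state == SUCCESS or task.ready():
        return HttpResponse(SUCCESS)  # bare "SUCCESS" string, matching the JavaScript check
    return JsonResponse({"state": PENDING, "process_percent": 0}, status=200)
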
19 changes: 10 additions & 9 deletions app/worker/app_celery.py
@@ -1,33 +1,33 @@
import traceback
- import celery
- from celery.states import SUCCESS, FAILURE
from http import HTTPStatus

- PROGRESS = 'PROGRESS'
+ from celery.states import FAILURE, STARTED, SUCCESS

+ from reboot.celery import app

ATTEMPT_LIMIT = 5


def update_state(state, percent, http_status):
print('{0!r} state: {1!r}, progress: {2!r}'.format(
- celery.current_task.request.id, state, percent))
- celery.current_task.update_state(state=state, meta={
+ app.current_task.request.id, state, percent))
+ app.current_task.update_state(state=state, meta={
'state': state,
'process_percent': percent,
'status': http_status,
})


def update_percent(percent):
- update_state(PROGRESS, percent, HTTPStatus.ACCEPTED)
+ update_state(STARTED, percent, HTTPStatus.ACCEPTED)


def set_success():
update_state(SUCCESS, 100, HTTPStatus.OK)


def set_failure(e):
- celery.current_task.update_state(
+ app.current_task.update_state(
state=FAILURE,
meta={
'exc_type': type(e).__name__,
@@ -38,17 +38,18 @@ def set_failure(e):
})


- class AppTask(celery.Task):
+ class AppTask(app.Task):
max_retries = 0
# default_retry_delay = 10

def on_success(self, retval, task_id, args, kwargs):
set_success()
print('{0!r} success: {1!r}'.format(task_id, retval))
super().on_success(retval, task_id, args, kwargs)

def on_failure(self, exc, task_id, args, kwargs, einfo):
- print('{0!r} failed: {1!r}'.format(task_id, exc))
set_failure(exc)
+ print('{0!r} failed: {1!r}'.format(task_id, exc))
super().on_failure(exc, task_id, args, kwargs, einfo)

def on_retry(self, exc, task_id, args, kwargs, einfo):
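
Reading aid for app_celery.py (illustrative, not committed code): progress is now reported under Celery's built-in STARTED state, and AppTask derives from app.Task, so every task registered with base=AppTask is bound to the reboot.celery app. A hypothetical long-running task built on these helpers could report progress like this (crunch and its loop are invented for illustration):

from app.worker.app_celery import AppTask, update_percent
from reboot.celery import app


@app.task(bind=True, base=AppTask)
def crunch(self, items):
    total = max(len(items), 1)
    for done, item in enumerate(items, start=1):
        # ... real work on `item` would happen here ...
        update_percent(int(done * 100 / total))  # shows up as STARTED with process_percent meta
    return total  # AppTask.on_success then records SUCCESS at 100%
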
7 changes: 4 additions & 3 deletions app/worker/tasks/__init__.py
@@ -1,13 +1,14 @@
'''
Module for tasks to be sent on task queue
'''
- from celery import task

from app.worker.app_celery import AppTask
+ # from celery import task
+ from reboot.celery import app

from .create_receipt import Receiptor


- @task(bind=True, base=AppTask)
+ @app.task(bind=True, base=AppTask)
def receiptor(self, queryset, total_count):
receiptor = Receiptor(queryset, total_count)
return receiptor()
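
A note on the decorator swap (background, not from the commit itself): the module-level from celery import task decorator was dropped in Celery 5, and registering through app.task keeps receiptor bound to the same broker and result backend the views use. A hypothetical call site would queue the task and hand its id to the poller:

from app.worker.tasks import receiptor


def start_receipts(queryset):
    # .delay() is the Celery shorthand for .apply_async(); start_receipts is illustrative only.
    async_result = receiptor.delay(queryset, queryset.count())
    return async_result.id  # e.g. rendered into PollState.html, which polls poll_state with it
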
5 changes: 3 additions & 2 deletions app/worker/tasks/exporter.py
@@ -1,16 +1,17 @@
import csv

- from celery import task
+ # from celery import task
from celery.utils.log import get_task_logger
from django.core import serializers
from django.db.models.query import QuerySet
from django.http import HttpResponse

from app.constants.field_names import CURRENT_FIELDS
from app.worker.app_celery import AppTask, update_percent
+ from reboot.celery import app


- @task(bind=True, base=AppTask)
+ @app.task(bind=True, base=AppTask)
def exporter(self, file_name, qs: QuerySet = None, total_count: int = 0):
rows = serializers.deserialize('json', qs)
csv_exporter = CsvExporter(file_name, rows, total_count)
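
Worth noting for exporter (an inference from the signature, not shown in the commit): the task deserializes qs with serializers.deserialize('json', qs), which implies the caller serializes the queryset before queuing it. A hypothetical dispatch consistent with that:

from django.core import serializers

from app.worker.tasks.exporter import exporter


def start_export(queryset, file_name="export.csv"):
    payload = serializers.serialize("json", queryset)  # matches deserialize('json', qs) in the task
    async_result = exporter.delay(file_name, payload, queryset.count())
    return async_result.id  # polled via poll_state, fetched via download_file
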
6 changes: 4 additions & 2 deletions app/worker/tasks/importers/__init__.py
@@ -1,13 +1,15 @@
"""
Module for csv file importers to be sent to queue
"""
- from celery import task
+ # from celery import task

from app.worker.app_celery import AppTask
+ from reboot.celery import app

from .historical_data_importer import HistoricalDataImporter


- @task(bind=True, base=AppTask)
+ @app.task(bind=True, base=AppTask)
def historical_data_importer(self, csvpath):
importer = HistoricalDataImporter(csvpath)
importer()
4 changes: 3 additions & 1 deletion reboot/celeryconfig.py
@@ -8,11 +8,13 @@
broker_pool_limit = 1
event_queue_expires = 60
worker_prefetch_multiplier = 1
- worker_concurrency = 10
+ worker_concurrency = 1
accept_content = ['json', 'pickle']
result_backend = config("REDIS_URL")
task_serializer = 'pickle'
result_serializer = 'pickle'
+ task_track_started = True
+ task_ignore_result = False

# Use PROD settings if valid CLOUDAMQP_URl, else dev
if config('CLOUDAMQP_URL', default=False):
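
How the config changes tie into the rest of the commit: task_track_started = True is what makes workers report the STARTED state at all (by default tasks go straight from PENDING to a terminal state, so the progress branch in poll_state would never fire), task_ignore_result = False keeps results in the Redis backend so AsyncResult and task.get() can find them, and lowering worker_concurrency from 10 to 1 (alongside broker_pool_limit = 1 and worker_prefetch_multiplier = 1) keeps broker connections to a minimum, which matters on the CloudAMQP free tier mentioned in the comment removed from download_file. Condensed, the settings this commit relies on read roughly as follows (config() is assumed to be python-decouple, as used elsewhere in the file):

from decouple import config  # assumption: the file's config() helper is python-decouple

broker_pool_limit = 1            # few AMQP connections for the free-tier broker
worker_prefetch_multiplier = 1   # reserve one task at a time per worker
worker_concurrency = 1           # single worker process
task_track_started = True        # emit STARTED so the poller can show progress
task_ignore_result = False       # keep results for AsyncResult / task.get()
accept_content = ['json', 'pickle']
task_serializer = 'pickle'
result_serializer = 'pickle'
result_backend = config("REDIS_URL")
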
