diff --git a/app/templates/app/PollState.html b/app/templates/app/PollState.html
index 3f5e0c39..98f9f33b 100644
--- a/app/templates/app/PollState.html
+++ b/app/templates/app/PollState.html
@@ -82,7 +82,7 @@
if (result.process_percent >= 0) {
percent = Math.max(percent, result.process_percent);
$(".bar").html(percent + "%").css("width", percent + "%");
- if (result.process_percent === 100) {
+ if (result === "SUCCESS") {
$("#user-count").text("complete");
} else {
$("#user-count").text("still processing");
@@ -91,7 +91,7 @@
return result;
}
}).always(function(result) {
- if (result === "SUCCESS" || result.process_percent === 100) {
+ if (result === "SUCCESS") {
clearInterval(refreshIntervalId);
$(".bar").html("100%").css("width", "100%");
$("#user-count").text("complete");
diff --git a/app/views/views.py b/app/views/views.py
index 60b11027..ed5b54be 100644
--- a/app/views/views.py
+++ b/app/views/views.py
@@ -2,8 +2,8 @@
import logging
from celery.exceptions import TimeoutError
-from celery.result import AsyncResult
-from celery.states import FAILURE, PENDING, SUCCESS
+# from celery.result import AsyncResult
+from celery.states import FAILURE, PENDING, STARTED, SUCCESS
from django.contrib.auth.decorators import login_required
from django.core import serializers
from django.http import (
@@ -22,13 +22,15 @@
from app.constants.str import PERMISSION_DENIED
from app.models import Item
-from app.worker.app_celery import ATTEMPT_LIMIT, PROGRESS
+from app.worker.app_celery import ATTEMPT_LIMIT
from app.worker.tasks import receiptor
from app.worker.tasks.exporter import exporter
from app.worker.tasks.importers import historical_data_importer
+from reboot.celery import app
logger = logging.getLogger(__name__)
-
+tasks_cache = {}
+results_cache = {}
@require_GET
@login_required(login_url="/login")
@@ -121,14 +123,20 @@ def poll_state(request: HttpRequest):
request=request,
err_msg="The task_id query parameter of the request was omitted.")
- task = AsyncResult(task_id)
+ task = app.AsyncResult(task_id)
res = JsonResponse(_poll_state(PENDING, 0, 200))
+ print(f"!!! task id={task_id},state={task.state},successful={task.successful()},ready={task.ready()},failed={task.failed()}")
if task.state == FAILURE or task.failed():
res = JsonResponse(_poll_state(FAILURE, 0, 400))
- elif task.state == PROGRESS:
+ elif task.state == STARTED:
res = JsonResponse(task.result) if isinstance(
task.result, dict) else HttpResponse(task.result)
elif task.state == SUCCESS or task.successful() or task.ready():
+ tasks_cache[task_id] = task
+ try:
+ results_cache[task_id] = task.get(timeout=5)
+ except Exception as e:
+ print(f"!!! error", e)
res = HttpResponse(SUCCESS)
return res
@@ -142,13 +150,22 @@ def download_file(request: HttpRequest):
task_id = request.GET.get("task_id")
task_name = request.GET.get("task_name", "task")
attempts = 0
- # CloudAMQP free tier is unstable and must be circuit breakered
+ if task_id in results_cache:
+ return results_cache[task_id]
while (attempts < ATTEMPT_LIMIT):
try:
attempts += 1
- task = AsyncResult(task_id)
- result = task.get(timeout=0.5 * attempts)
+ # if tasks_cache[task_id]:
+ # task = tasks_cache[task_id]
+ # del tasks_cache[task_id]
+ # else:
+ # task = app.AsyncResult(task_id)
+ task = tasks_cache[task_id] if task_id in tasks_cache else app.AsyncResult(task_id)
+ print(f"!!! task id={task_id},state={task.state},successful={task.successful()},ready={task.ready()},failed={task.failed()}")
+ result = task.get(timeout=1.0 * attempts)
print(f"{task} {task_name} success #{attempts}: {result}")
+ if task_id in tasks_cache:
+ del tasks_cache[task_id]
break
except TimeoutError:
print(f"{task} {task_name} failed #{attempts}")
@@ -158,6 +175,7 @@ def download_file(request: HttpRequest):
err_msg="Download exceeded max attempts")
return result
except Exception as e:
+ print(f"!!! error", e)
return _error(request=request, err_msg=f"Failed to download file: {e}")
diff --git a/app/worker/app_celery.py b/app/worker/app_celery.py
index 3204964d..d9d17070 100644
--- a/app/worker/app_celery.py
+++ b/app/worker/app_celery.py
@@ -1,17 +1,17 @@
import traceback
-import celery
-from celery.states import SUCCESS, FAILURE
from http import HTTPStatus
-PROGRESS = 'PROGRESS'
+from celery.states import FAILURE, STARTED, SUCCESS
+
+from reboot.celery import app
ATTEMPT_LIMIT = 5
def update_state(state, percent, http_status):
print('{0!r} state: {1!r}, progress: {2!r}'.format(
- celery.current_task.request.id, state, percent))
- celery.current_task.update_state(state=state, meta={
+ app.current_task.request.id, state, percent))
+ app.current_task.update_state(state=state, meta={
'state': state,
'process_percent': percent,
'status': http_status,
@@ -19,7 +19,7 @@ def update_state(state, percent, http_status):
def update_percent(percent):
- update_state(PROGRESS, percent, HTTPStatus.ACCEPTED)
+ update_state(STARTED, percent, HTTPStatus.ACCEPTED)
def set_success():
@@ -27,7 +27,7 @@ def set_success():
def set_failure(e):
- celery.current_task.update_state(
+ app.current_task.update_state(
state=FAILURE,
meta={
'exc_type': type(e).__name__,
@@ -38,17 +38,18 @@ def set_failure(e):
})
-class AppTask(celery.Task):
+class AppTask(app.Task):
max_retries = 0
# default_retry_delay = 10
def on_success(self, retval, task_id, args, kwargs):
+ set_success()
print('{0!r} success: {1!r}'.format(task_id, retval))
super().on_success(retval, task_id, args, kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
- print('{0!r} failed: {1!r}'.format(task_id, exc))
set_failure(exc)
+ print('{0!r} failed: {1!r}'.format(task_id, exc))
super().on_failure(exc, task_id, args, kwargs, einfo)
def on_retry(self, exc, task_id, args, kwargs, einfo):
diff --git a/app/worker/tasks/__init__.py b/app/worker/tasks/__init__.py
index fb4770f6..4bdf8a12 100644
--- a/app/worker/tasks/__init__.py
+++ b/app/worker/tasks/__init__.py
@@ -1,13 +1,14 @@
'''
Module for tasks to be sent on task queue
'''
-from celery import task
-
from app.worker.app_celery import AppTask
+# from celery import task
+from reboot.celery import app
+
from .create_receipt import Receiptor
-@task(bind=True, base=AppTask)
+@app.task(bind=True, base=AppTask)
def receiptor(self, queryset, total_count):
receiptor = Receiptor(queryset, total_count)
return receiptor()
diff --git a/app/worker/tasks/exporter.py b/app/worker/tasks/exporter.py
index 9419aec7..28a39714 100644
--- a/app/worker/tasks/exporter.py
+++ b/app/worker/tasks/exporter.py
@@ -1,6 +1,6 @@
import csv
-from celery import task
+# from celery import task
from celery.utils.log import get_task_logger
from django.core import serializers
from django.db.models.query import QuerySet
@@ -8,9 +8,10 @@
from app.constants.field_names import CURRENT_FIELDS
from app.worker.app_celery import AppTask, update_percent
+from reboot.celery import app
-@task(bind=True, base=AppTask)
+@app.task(bind=True, base=AppTask)
def exporter(self, file_name, qs: QuerySet = None, total_count: int = 0):
rows = serializers.deserialize('json', qs)
csv_exporter = CsvExporter(file_name, rows, total_count)
diff --git a/app/worker/tasks/importers/__init__.py b/app/worker/tasks/importers/__init__.py
index cf650561..dc6b7dad 100644
--- a/app/worker/tasks/importers/__init__.py
+++ b/app/worker/tasks/importers/__init__.py
@@ -1,13 +1,15 @@
"""
Module for csv file importers to be sent to queue
"""
-from celery import task
+# from celery import task
from app.worker.app_celery import AppTask
+from reboot.celery import app
+
from .historical_data_importer import HistoricalDataImporter
-@task(bind=True, base=AppTask)
+@app.task(bind=True, base=AppTask)
def historical_data_importer(self, csvpath):
importer = HistoricalDataImporter(csvpath)
importer()
diff --git a/reboot/celeryconfig.py b/reboot/celeryconfig.py
index 290d77b9..9a274bae 100644
--- a/reboot/celeryconfig.py
+++ b/reboot/celeryconfig.py
@@ -8,11 +8,13 @@
broker_pool_limit = 1
event_queue_expires = 60
worker_prefetch_multiplier = 1
-worker_concurrency = 10
+worker_concurrency = 1
accept_content = ['json', 'pickle']
result_backend = config("REDIS_URL")
task_serializer = 'pickle'
result_serializer = 'pickle'
+task_track_started = True
+task_ignore_result = False
# Use PROD settings if valid CLOUDAMQP_URl, else dev
if config('CLOUDAMQP_URL', default=False):