diff --git a/src/kolibri_tasks.py b/src/kolibri_tasks.py index 6e9be60d..93098d7a 100644 --- a/src/kolibri_tasks.py +++ b/src/kolibri_tasks.py @@ -1,18 +1,19 @@ -from jnius import autoclass from datetime import datetime from datetime import timedelta + from android_utils import PythonActivity +from jnius import autoclass -WorkManager = autoclass('androidx.work.WorkManager') -OneTimeWorkRequestBuilder = autoclass('androidx.work.OneTimeWorkRequest$Builder') -PeriodicWorkRequestBuilder = autoclass('androidx.work.PeriodicWorkRequest$Builder') -BackoffPolicy = autoclass('androidx.work.BackoffPolicy') +WorkManager = autoclass("androidx.work.WorkManager") +OneTimeWorkRequestBuilder = autoclass("androidx.work.OneTimeWorkRequest$Builder") +PeriodicWorkRequestBuilder = autoclass("androidx.work.PeriodicWorkRequest$Builder") +BackoffPolicy = autoclass("androidx.work.BackoffPolicy") ExistingWorkPolicy = autoclass("androidx.work.ExistingWorkPolicy") ExistingPeriodicWorkPolicy = autoclass("androidx.work.ExistingPeriodicWorkPolicy") -TimeUnit = autoclass('java.util.concurrent$TimeUnit') +TimeUnit = autoclass("java.util.concurrent$TimeUnit") -TaskWorker = autoclass('org.learningequality.Kolibri.TaskWorker') +TaskWorker = autoclass("org.learningequality.Kolibri.TaskWorker") def get_work_manager(): @@ -36,15 +37,23 @@ def queue_task( # Kolibri uses `None` for repeat to indicate a task that repeats indefinitely # in this case it is suitable for the Android PeriodicWorkRequest as that is # designed for indefinitely repeating tasks. - work_request = PeriodicWorkRequestBuilder(TaskWorker._class, interval, TimeUnit.SECONDS) + work_request = PeriodicWorkRequestBuilder( + TaskWorker._class, interval, TimeUnit.SECONDS + ) existing_work_policy = ExistingPeriodicWorkPolicy.KEEP enqueue_method = get_work_manager().enqueueUniquePeriodicWork else: work_request = OneTimeWorkRequestBuilder(TaskWorker._class) - existing_work_policy = ExistingWorkPolicy.KEEP if keep else ExistingWorkPolicy.APPEND_OR_REPLACE + existing_work_policy = ( + ExistingWorkPolicy.KEEP + if keep + else ExistingWorkPolicy.APPEND_OR_REPLACE + ) enqueue_method = get_work_manager().enqueueUniqueWork if retry_interval is not None: - work_request.setBackOffCriteria(BackoffPolicy.LINEAR, retry_interval, TimeUnit.SECONDS) + work_request.setBackOffCriteria( + BackoffPolicy.LINEAR, retry_interval, TimeUnit.SECONDS + ) if scheduled_time: delay = max(0, (scheduled_time - datetime.now()).total_seconds()) if delay: @@ -57,12 +66,45 @@ def task_updates(job, orm_job, state=None, **kwargs): from kolibri.core.tasks.job import State if state is not None and orm_job.repeat is None or orm_job.repeat > 0: - if state in {State.COMPLETED, State.CANCELED} or (state == State.FAILED and not orm_job.retry_interval): - queue_task(id=orm_job.id, priority=orm_job.priority, interval=orm_job.interval, repeat=orm_job.repeat - 1, retry_interval=orm_job.retry_interval, scheduled_time=datetime.now() + timedelta(seconds=orm_job.interval), keep=False) + if state in {State.COMPLETED, State.CANCELED} or ( + state == State.FAILED and not orm_job.retry_interval + ): + queue_task( + id=orm_job.id, + priority=orm_job.priority, + interval=orm_job.interval, + repeat=orm_job.repeat - 1, + retry_interval=orm_job.retry_interval, + scheduled_time=datetime.now() + timedelta(seconds=orm_job.interval), + keep=False, + ) + + +def execute_job(job_id): + from django.db import connection as django_connection + + from kolibri.core.tasks.storage import Storage + from kolibri.core.tasks.utils import db_connection + + connection = db_connection() + + storage = Storage( + connection, schedule_hooks=[queue_task], update_hooks=[task_updates] + ) + + job = storage.get_job(job_id) + + job.execute() + + connection.dispose() + + # Close any django connections opened here + django_connection.close() def start_default_tasks(): from kolibri.core.analytics.tasks import schedule_ping from kolibri.core.deviceadmin.tasks import schedule_vacuum + schedule_ping() schedule_vacuum() diff --git a/src/taskworker.py b/src/taskworker.py index 2eb69da1..69b6ca40 100644 --- a/src/taskworker.py +++ b/src/taskworker.py @@ -2,31 +2,13 @@ from os import environ import initialization # noqa: F401 keep this first, to ensure we're set up for other imports -from kolibri_tasks import queue_task -from kolibri_tasks import task_updates from kolibri.main import initialize +from kolibri_tasks import execute_job logging.info("Starting Kolibri task worker") initialize(skip_update=True) -job_id = environ.get('PYTHON_SERVICE_ARGUMENT', '') - -from django.db import connection as django_connection - -from kolibri.core.tasks.storage import Storage -from kolibri.core.tasks.utils import db_connection - -connection = db_connection() - -storage = Storage(connection, schedule_hooks=[queue_task], update_hooks=[task_updates]) - -job = storage.get_job(job_id) - -job.execute() - -connection.dispose() - -# Close any django connections opened here -django_connection.close() +job_id = environ.get("PYTHON_SERVICE_ARGUMENT", "") +execute_job(job_id)