From cb69f184dbeae22fe6675961fe14a27cc5c3eeea Mon Sep 17 00:00:00 2001 From: Blaine Jester Date: Tue, 9 Jan 2024 13:22:48 -0800 Subject: [PATCH] Add task reconciler class, refactor Task.java --- .python-version | 1 + .../org/learningequality/Kolibri/App.java | 14 +- .../Kolibri/ReconcileWorker.java | 19 +- .../Kolibri/TaskworkerWorker.java | 24 +- .../Kolibri/TaskworkerWorkerService.java | 2 +- .../Kolibri/sqlite/JobStorage.java | 47 ++- .../main/java/org/learningequality/Task.java | 356 +++--------------- .../org/learningequality/sqlite/Database.java | 45 ++- .../sqlite/query/FilterableQuery.java | 49 +++ .../learningequality/sqlite/query/Query.java | 10 + .../sqlite/query/SelectQuery.java | 38 +- .../sqlite/query/UpdateQuery.java | 50 +++ .../sqlite/schema/DatabaseTable.java | 22 +- .../org/learningequality/task/Builder.java | 164 ++++++++ .../org/learningequality/task/Reconciler.java | 168 +++++++++ .../org/learningequality/task/Sentinel.java | 247 ++++++++++++ .../org/learningequality/task/StateMap.java | 64 ++++ 17 files changed, 939 insertions(+), 381 deletions(-) create mode 100644 .python-version create mode 100644 python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/FilterableQuery.java create mode 100644 python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/Query.java create mode 100644 python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/UpdateQuery.java create mode 100644 python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Builder.java create mode 100644 python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Reconciler.java create mode 100644 python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Sentinel.java create mode 100644 python-for-android/dists/kolibri/src/main/java/org/learningequality/task/StateMap.java diff --git a/.python-version b/.python-version new file mode 100644 index 00000000..21af9507 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.9.13 diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/App.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/App.java index 1974979a..c6555eda 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/App.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/App.java @@ -72,13 +72,11 @@ private void createNotificationChannels() { private void reconcileTasks() { // Reconcile tasks on startup, in this main thread (blocking!) - CompletableFuture f = Task.reconcile(this, null); - f.whenCompleteAsync((result, throwable) -> { - if (throwable != null) { - Log.e("Kolibri", "Main thread task reconciliation failed", throwable); - } else { - Log.i("Kolibri", "Main thread task reconciliation completed"); - } - }); + boolean result = Task.reconcile(this, null); + if (result) { + Log.i("Kolibri", "Main thread task reconciliation completed"); + } else { + Log.d("Kolibri", "Main thread task reconciliation no-op"); + } } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/ReconcileWorker.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/ReconcileWorker.java index 3df11f2a..536641aa 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/ReconcileWorker.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/ReconcileWorker.java @@ -30,18 +30,13 @@ public ListenableFuture startWork() { SettableFuture future = SettableFuture.create(); Executor executor = getBackgroundExecutor(); - CompletableFuture f = Task.reconcile(getApplicationContext(), executor); - - f.whenCompleteAsync((result, throwable) -> { - if (throwable != null) { - Log.e(TAG, "Reconcile task failed", throwable); - future.setException(throwable); - } else { - Log.i(TAG, "Reconcile task completed: " + (result ? "success" : "failure")); - future.set(result ? Result.success() : Result.failure()); - } - }, executor); - + boolean result = Task.reconcile(getApplicationContext(), executor); + if (!result) { + Log.e(TAG, "Failed to reconcile tasks"); + future.set(Result.failure()); + return future; + } + future.set(Result.success()); return future; } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/TaskworkerWorker.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/TaskworkerWorker.java index 7e8e21ce..5c8249b0 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/TaskworkerWorker.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/TaskworkerWorker.java @@ -27,18 +27,6 @@ public TaskworkerWorker( mWorker = this; } - public static Data buildInputData(String workerArgument) { - String dataArgument = workerArgument == null ? "" : workerArgument; - Data data = new Data.Builder() - .putString(ARGUMENT_WORKER_ARGUMENT, dataArgument) - .putString(ARGUMENT_PACKAGE_NAME, "org.learningequality.Kolibri") - .putString(ARGUMENT_CLASS_NAME, - TaskworkerWorkerService.class.getName()) - .build(); - Log.v(TAG, "Request data: " + data.toString()); - return data; - } - protected void cleanup() { hideNotification(); mWorker = null; @@ -80,4 +68,16 @@ public static void clearNotification() { mWorker.hideNotification(); } } + + public static Data buildInputData(String workerArgument) { + String dataArgument = workerArgument == null ? "" : workerArgument; + Data data = new Data.Builder() + .putString(ARGUMENT_WORKER_ARGUMENT, dataArgument) + .putString(ARGUMENT_PACKAGE_NAME, "org.learningequality.Kolibri") + .putString(ARGUMENT_CLASS_NAME, + TaskworkerWorkerService.class.getName()) + .build(); + Log.v(TAG, "Request data: " + data.toString()); + return data; + } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/TaskworkerWorkerService.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/TaskworkerWorkerService.java index 8548d34d..6b227d4a 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/TaskworkerWorkerService.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/TaskworkerWorkerService.java @@ -28,7 +28,7 @@ public void onCreate() { ); // Initialize the work manager WorkManager.getInstance(getApplicationContext()); - enqueueTaskReconciliation(); +// enqueueTaskReconciliation(); super.onCreate(); // We could potentially remove this and leave the notification up to long-running workers // bound to the service diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/sqlite/JobStorage.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/sqlite/JobStorage.java index 989d0ca6..4b47a8f6 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/sqlite/JobStorage.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/sqlite/JobStorage.java @@ -1,8 +1,14 @@ package org.learningequality.Kolibri.sqlite; +import android.content.Context; +import android.database.sqlite.SQLiteDatabase; + import org.learningequality.sqlite.schema.DatabaseTable; +import org.learningequality.sqlite.Database; + +import java.io.File; -public class JobStorage { +public class JobStorage extends Database { public static final String DATABASE_NAME = "job_storage.sqlite3"; public static class Jobs implements DatabaseTable { @@ -13,13 +19,15 @@ public String getTableName() { } public static final StringColumn id = new StringColumn("id"); + public static final StringColumn func = new StringColumn("func"); + public static final IntegerColumn priority = new IntegerColumn("priority"); public static final StringColumn worker_process = new StringColumn("worker_process"); public static final StringColumn worker_thread = new StringColumn("worker_thread"); public static final StringColumn worker_extra = new StringColumn("worker_extra"); public static final StringColumn time_updated = new StringColumn("time_updated"); public static final StringColumn state = new StringColumn("state"); - public enum State implements ColumnEnum { + public enum State implements StringChoiceEnum { PENDING, QUEUED, SCHEDULED, @@ -35,5 +43,40 @@ public StringColumn getColumn() { return state; } } + + public enum Priority implements ColumnEnum { + LOW(15), + REGULAR(10), + HIGH(5) + ; + + private final int value; + + Priority(int val) { + this.value = val; + } + + public Integer getValue() { + return this.value; + } + + public IntegerColumn getColumn() { + return priority; + } + + + } + } + + protected JobStorage(String path, int flags) { + super(DATABASE_NAME, path, flags); + } + + public static JobStorage readwrite(Context context) { + File f = getDatabasePath(context, DATABASE_NAME); + + return f != null + ? new JobStorage(f.getPath(), SQLiteDatabase.OPEN_READWRITE) + : null; } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Task.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Task.java index a095a7e5..d919b7a5 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Task.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Task.java @@ -1,142 +1,52 @@ package org.learningequality; import android.content.Context; -import android.os.Bundle; import android.util.Log; import androidx.core.content.ContextCompat; -import androidx.work.BackoffPolicy; -import androidx.work.Data; import androidx.work.ExistingWorkPolicy; import androidx.work.OneTimeWorkRequest; -import androidx.work.OutOfQuotaPolicy; import androidx.work.WorkInfo; import androidx.work.WorkQuery; import androidx.work.multiprocess.RemoteWorkManager; import com.google.common.util.concurrent.ListenableFuture; -import org.learningequality.Kolibri.TaskworkerWorker; import org.learningequality.Kolibri.sqlite.JobStorage; -import org.learningequality.sqlite.Database; -import org.learningequality.sqlite.query.SelectQuery; +import org.learningequality.task.Builder; +import org.learningequality.task.StateMap; +import org.learningequality.task.Sentinel; +import org.learningequality.task.Reconciler; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; -import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import java9.util.concurrent.CompletableFuture; public class Task { public static final String TAG = "Task"; - public static final String PREFIX_TASK_ID = "kolibri_task_id:"; - - public enum StateMap { - MISSING(null), - PENDING( - JobStorage.Jobs.State.PENDING, - WorkInfo.State.ENQUEUED, - WorkInfo.State.BLOCKED - ), - QUEUED( - JobStorage.Jobs.State.QUEUED, - WorkInfo.State.ENQUEUED, - WorkInfo.State.BLOCKED - ), - SCHEDULED( - JobStorage.Jobs.State.SCHEDULED, - WorkInfo.State.ENQUEUED, - WorkInfo.State.BLOCKED - ), - SELECTED( - JobStorage.Jobs.State.SELECTED, - WorkInfo.State.ENQUEUED, - WorkInfo.State.BLOCKED, - WorkInfo.State.RUNNING - ), - RUNNING(JobStorage.Jobs.State.RUNNING, WorkInfo.State.RUNNING), - CANCELING(JobStorage.Jobs.State.CANCELING, WorkInfo.State.CANCELLED), - CANCELED(JobStorage.Jobs.State.CANCELED, WorkInfo.State.CANCELLED), - FAILED(JobStorage.Jobs.State.FAILED, WorkInfo.State.FAILED), - COMPLETED(JobStorage.Jobs.State.COMPLETED, WorkInfo.State.SUCCEEDED); - - private final JobStorage.Jobs.State jobState; - private final WorkInfo.State[] workInfoStates; - - StateMap(JobStorage.Jobs.State jobState, WorkInfo.State... workInfoStates) { - this.jobState = jobState; - this.workInfoStates = workInfoStates; - } - - public JobStorage.Jobs.State getJobState() { - return this.jobState; - } - - public WorkInfo.State[] getWorkInfoStates() { - return this.workInfoStates; - } - } - - - private static String generateTagFromId(String id) { - return PREFIX_TASK_ID + id; - } - - private static String generateTagFromJobFunc(String jobFunc) { - return "kolibri_job_type:" + jobFunc; - } public static String enqueueOnce(String id, int delay, boolean expedite, String jobFunc, boolean longRunning) { RemoteWorkManager workManager = RemoteWorkManager.getInstance(ContextUtil.getApplicationContext()); - Data data = TaskworkerWorker.buildInputData(id); - - OneTimeWorkRequest.Builder workRequestBuilder = new OneTimeWorkRequest.Builder(TaskworkerWorker.class); - - // Tasks can only be expedited if they are set with no delay. - // This does not appear to be documented, but is evident in the Android Jetpack source code. - // https://android.googlesource.com/platform/frameworks/support/+/HEAD/work/work-runtime/src/main/java/androidx/work/WorkRequest.kt#271 - if (expedite && delay == 0) { - workRequestBuilder.setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST); - } + Builder.TaskRequest builder = new Builder.TaskRequest(id); + builder.setDelay(delay) + .setExpedite(expedite) + .setJobFunc(jobFunc) + .setLongRunning(longRunning); - if (delay > 0) { - workRequestBuilder.setInitialDelay(delay, TimeUnit.SECONDS); - } - workRequestBuilder.addTag(generateTagFromId(id)); - workRequestBuilder.addTag(generateTagFromJobFunc(jobFunc)); - if (longRunning) { - workRequestBuilder.addTag(TaskworkerWorker.TAG_LONG_RUNNING); - Log.v(TAG, "Tagging work request as long running, ID: " + id); - } - workRequestBuilder.setInputData(data); - workRequestBuilder.setBackoffCriteria(BackoffPolicy.EXPONENTIAL, 5, TimeUnit.SECONDS); - OneTimeWorkRequest workRequest = workRequestBuilder.build(); + OneTimeWorkRequest workRequest = builder.build(); workManager.enqueueUniqueWork(id, ExistingWorkPolicy.APPEND_OR_REPLACE, workRequest); // return the work request ID, different from the task ID passed in return workRequest.getId().toString(); } - protected static WorkQuery buildWorkQuery(String... jobIds) { - return WorkQuery.Builder - .fromUniqueWorkNames(Arrays.asList(jobIds)) - .build(); - } - - protected static WorkQuery buildWorkQuery(UUID... requestIds) { - return WorkQuery.Builder - .fromIds(Arrays.asList(requestIds)) - .build(); - } - public static void clear(String id) { Context context = ContextUtil.getApplicationContext(); RemoteWorkManager workManager = RemoteWorkManager.getInstance(context); - WorkQuery workQuery = buildWorkQuery(id); + WorkQuery workQuery = (new Builder.TaskQuery(id)).build(); ListenableFuture> workInfosFuture = workManager.getWorkInfos(workQuery); workInfosFuture.addListener(() -> { @@ -167,234 +77,58 @@ public static void clear(String id) { }, new MainThreadExecutor()); } - public static CompletableFuture reconcile(Context context, Executor executor) { - CompletableFuture future = new CompletableFuture<>(); + public static boolean reconcile(Context context, Executor executor) { if (executor == null) { executor = ContextCompat.getMainExecutor(context); } - try (Task.Sentinel sentinel = new Task.Sentinel(context, executor)) { - CompletableFuture f = CompletableFuture.allOf( - sentinel.check(Task.StateMap.PENDING), - sentinel.check(Task.StateMap.QUEUED), - sentinel.check(Task.StateMap.SCHEDULED), - sentinel.check(Task.StateMap.SELECTED), - sentinel.check(Task.StateMap.RUNNING) - ); - - f.whenCompleteAsync((result, throwable) -> { - if (throwable != null) { - Log.w(Sentinel.TAG, "Reconciliation encountered an error"); - future.completeExceptionally(throwable); - } else { - Log.i(Sentinel.TAG, "Reconciliation completed successfully"); - future.complete(true); - } - }, executor); - } - - return future; - } - - /** - * Sentinel (as in watcher) for checking and reconciling Kolibri job status with WorkManager - */ - public static class Sentinel implements AutoCloseable { - public static String TAG = "KolibriTask.Sentinel"; - private final RemoteWorkManager workManager; - private final Database db; - private final Executor executor; - - - public Sentinel(Context context, Executor executor) { - workManager = RemoteWorkManager.getInstance(context); - db = Database.readonly(context, JobStorage.DATABASE_NAME); - this.executor = executor; - } - - /** - * Create a sentinel with the main thread executor - */ - public Sentinel(Context context) { - this(context, ContextCompat.getMainExecutor(context)); - } - - /** - * Build a query for jobs with the given status - * - * @param jobStatus The job status in the Kolibri database for which to find jobs - * @return A query for jobs with the given status, and subset of selected columns - */ - private SelectQuery buildQuery(JobStorage.Jobs.State jobStatus) { - return new SelectQuery( - JobStorage.Jobs.id, - JobStorage.Jobs.state, - JobStorage.Jobs.worker_process, - JobStorage.Jobs.worker_thread, - JobStorage.Jobs.worker_extra - ) - .from(JobStorage.Jobs.TABLE_NAME) - .where(jobStatus) - .orderBy(JobStorage.Jobs.time_updated, false); - } - - private WorkQuery buildWorkQuery(Bundle result) { - String requestId = JobStorage.Jobs.worker_extra.getValue(result); - - if (requestId == null) { - String id = JobStorage.Jobs.id.getValue(result); - Log.v(TAG, "No request ID found for job " + id); - return Task.buildWorkQuery(id); - } - - return Task.buildWorkQuery(UUID.fromString(requestId)); - } - - /** - * Check for jobs with the given status and reconcile them with WorkManager - * Defaults to flagging missing work in WorkManager - * - * @param stateRef The job status in the Kolibri database for which to find jobs - * @return A future that will complete when all jobs have been checked, with a list of jobs - */ - public CompletableFuture check(StateMap stateRef) { - return check(false, stateRef); - } - - /** - * Check for jobs with the given status and reconcile them with WorkManager - * @param ignoreMissing Whether to ignore missing work in WorkManager - * @param stateRef The job status in the Kolibri database for which to find jobs - * @return A future that will complete when all jobs have been checked, with a list of jobs - */ - public CompletableFuture check( - boolean ignoreMissing, - StateMap stateRef - ) { - SelectQuery query = buildQuery(stateRef.getJobState()); - Bundle[] jobs = query.execute(db); + boolean didReconcile = false; - if (jobs == null || jobs.length == 0) { - Log.v(TAG, "No jobs to reconcile for status " + stateRef); - return CompletableFuture.completedFuture(null); + try (JobStorage db = JobStorage.readwrite(context)) { + if (db == null) { + Log.e(Sentinel.TAG, "Failed to open job storage database"); + return false; } - return check(jobs, ignoreMissing, stateRef.getWorkInfoStates()); - } + // Reconciliation needs a lock so we can only have one running at a time + try (Reconciler reconciler = Reconciler.from(context, db)) { + // If we can't acquire the lock, then reconciliation is already running + if (!reconciler.begin()) { + return false; + } - /** - * Check for the given jobs (Bundles) and reconciles them with WorkManager - * - * @param jobs The jobs to check - * @param ignoreMissing Whether to ignore missing work in WorkManager - * @param expectedWorkStates The expected WorkManager states for the found jobs - * @return A future that will complete when all jobs have been checked, with a list of jobs - */ - public CompletableFuture check( - Bundle[] jobs, - boolean ignoreMissing, - WorkInfo.State... expectedWorkStates - ) { - final List> jobFutures = new ArrayList>(jobs.length); + Sentinel sentinel = Sentinel.from(context, db, executor); - for (Bundle job : jobs) { - CompletableFuture jobCheck = check(job, ignoreMissing, expectedWorkStates); - jobFutures.add(jobCheck); - } + // Run through all the states and check them, then process the results + for (StateMap stateRef : StateMap.forReconciliation()) { + CompletableFuture stateFuture = sentinel.check(stateRef); + Sentinel.Result[] results = null; - CompletableFuture future = CompletableFuture.allOf( - jobFutures.toArray(new CompletableFuture[0]) - ) - .thenApply((result) -> { - final List allResults = new ArrayList(jobs.length); - for (CompletableFuture jobFuture : jobFutures) { - // Add all the results from the job futures - try { - Bundle jobResult = jobFuture.get(); - if (jobResult != null) { - allResults.add(jobResult); - } - } catch (ExecutionException | InterruptedException e) { - e.printStackTrace(); - } + try { + // Wait for the results to come back + results = stateFuture.get(); + } catch (ExecutionException | InterruptedException e) { + Log.e(TAG, "Failed to check state for reconciliation " + stateRef, e); + continue; } - return allResults.toArray(new Bundle[0]); - }); - future.whenComplete((result, ex) -> { - if (future.isCancelled()) { - for (CompletableFuture jobFuture : jobFutures) { - jobFuture.cancel(true); + if (results != null && results.length >= 0) { + didReconcile = true; + reconciler.process(stateRef, results); } } - }); - return future; - } - - /** - * Check for the given job (Bundle) and reconciles i with WorkManager - * - * @param job The job to check as a `Bundle` - * @param ignoreMissing Whether to ignore the job as missing in WorkManager - * @param expectedWorkStates The expected WorkManager states for the found jobs - * @return A future that will complete when the job has been checked, with the job if it is not reconciled - */ - public CompletableFuture check( - Bundle job, - boolean ignoreMissing, - WorkInfo.State... expectedWorkStates - ) { - CompletableFuture future = new CompletableFuture<>(); - - List workStates = Arrays.asList(expectedWorkStates); - WorkQuery workQuery = buildWorkQuery(job); - ListenableFuture> workInfosFuture = workManager.getWorkInfos(workQuery); - workInfosFuture.addListener(() -> { - boolean checkFailed = false; - try { - List workInfos = workInfosFuture.get(); - - if (workInfos == null || workInfos.size() == 0) { - if (ignoreMissing) { - return; - } - - Log.w(TAG, "No work requests found for job id " + JobStorage.Jobs.id.getValue(job)); - checkFailed = true; - } else { - for (WorkInfo workInfo : workInfos) { - WorkInfo.State state = workInfo.getState(); - - if (!workStates.contains(state)) { - Log.w(TAG, "WorkInfo state " + state + " does not match expected state " + Arrays.toString(expectedWorkStates) + " for request " + workInfo.getId() + " | " + workInfo.getTags()); - checkFailed = true; - } - } - } - } - catch (ExecutionException | InterruptedException e) { - e.printStackTrace(); - } finally { - if (checkFailed) { - future.complete(job); - } else { - future.complete(null); - } - } - }, executor); + // If we get here, all the futures completed successfully + reconciler.end(); - future.whenComplete((result, ex) -> { - if (future.isCancelled()) { - workInfosFuture.cancel(true); + if (didReconcile) { + Log.i(Sentinel.TAG, "Reconciliation completed successfully"); + } else { + Log.i(Sentinel.TAG, "No reconciliation performed"); } - }); - - return future; + } } - public void close() { - db.close(); - } + return didReconcile; } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/Database.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/Database.java index 89d5530a..74ec088e 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/Database.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/Database.java @@ -8,18 +8,20 @@ import java.io.File; public class Database implements AutoCloseable { - public static final String TAG = "Kolibri.Database"; + public static final String TAG = "KolibriDatabase"; private final String name; private final String path; private final int flags; + private boolean inTransaction; private SQLiteDatabase db; - private Database(String name, String path, int flags) { + protected Database(String name, String path, int flags) { this.name = name; this.path = path; this.db = null; this.flags = flags; + this.inTransaction = false; initialize(); } @@ -43,31 +45,60 @@ protected void initialize() { return; } try { + Log.d(TAG, "Connecting to database"); this.db = SQLiteDatabase.openDatabase(this.path, null, flags); } catch (SQLiteException e) { this.db = null; } } + public void begin() { + if (!isConnected()) { + return; + } + Log.d(TAG, "Starting transaction"); + this.inTransaction = true; + this.db.beginTransaction(); + } + + public void rollback() { + if (!isConnected() || !this.inTransaction) { + return; + } + Log.d(TAG, "Rolling back transaction"); + this.inTransaction = false; + this.db.endTransaction(); + } + + public void commit() { + if (!isConnected() || !this.inTransaction) { + return; + } + Log.d(TAG, "Committing transaction"); + this.inTransaction = false; + this.db.setTransactionSuccessful(); + this.db.endTransaction(); + } + public void close() { if (isConnected()) { + Log.d(TAG, "Closing database"); + rollback(); this.db.close(); this.db = null; } } - public static Database readonly(Context context, String name) { - String path = null; + protected static File getDatabasePath(Context context, String name) { File dir = context.getExternalFilesDir(null); if (dir != null) { File f = new File(new File(dir, "KOLIBRI_DATA"), name); if (f.exists()) { - path = f.getPath(); + return f; } else { Log.v(TAG, "Database file does not exist: " + f.getPath()); } } - - return new Database(name, path, SQLiteDatabase.OPEN_READONLY); + return null; } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/FilterableQuery.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/FilterableQuery.java new file mode 100644 index 00000000..40473c01 --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/FilterableQuery.java @@ -0,0 +1,49 @@ +package org.learningequality.sqlite.query; + +import org.learningequality.sqlite.schema.DatabaseTable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * A base query that can be filtered + */ +public abstract class FilterableQuery> { + private final List whereClauses; + private final List whereParameters; + + public FilterableQuery() { + this.whereClauses = new ArrayList(); + this.whereParameters = new ArrayList(); + } + + public T where(String clause, String... parameters) { + this.whereClauses.add(clause); + this.whereParameters.addAll(Arrays.asList(parameters)); + return self(); + } + + public T where(DatabaseTable.Column column, String value) { + return where(column.getColumnName() + " = ?", value); + } + + public T where(DatabaseTable.ColumnEnum value) { + return where(value.getColumn(), value.getValue()); + } + + protected String buildSelection() { + // Currently we only support ANDing all where clauses + return String.join(" AND ", this.whereClauses); + } + + protected String[] buildSelectionArgs() { + return this.whereParameters.toArray(new String[this.whereParameters.size()]); + } + + /** + * Method to return the current instance of the query + * @return the current instance of the query + */ + protected abstract T self(); +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/Query.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/Query.java new file mode 100644 index 00000000..dfbf657a --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/Query.java @@ -0,0 +1,10 @@ +package org.learningequality.sqlite.query; + +import org.learningequality.sqlite.Database; + +/** + * A SQL query interface that defines a method to execute the query + */ +public interface Query { + T execute(Database db); +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/SelectQuery.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/SelectQuery.java index ee3c26d4..cadbeb32 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/SelectQuery.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/SelectQuery.java @@ -10,38 +10,32 @@ import java.util.Arrays; import java.util.List; -public class SelectQuery { +/** + * A query that SELECTs rows from a table + */ +public class SelectQuery extends FilterableQuery implements Query { private String tableName; private final DatabaseTable.Column[] selectColumns; - private final List whereClauses; - private final List whereParameters; private String orderBy; public SelectQuery(DatabaseTable.Column... columns) { this.selectColumns = columns.length > 0 ? columns : null; - this.whereClauses = new ArrayList(); - this.whereParameters = new ArrayList(); } - public SelectQuery from(String tableName) { - this.tableName = tableName; + /** + * Method to return the current instance of the query + * @return the current instance of the query + */ + @Override + protected SelectQuery self() { return this; } - public SelectQuery where(String clause, String... parameters) { - this.whereClauses.add(clause); - this.whereParameters.addAll(Arrays.asList(parameters)); + public SelectQuery from(String tableName) { + this.tableName = tableName; return this; } - public SelectQuery where(DatabaseTable.Column column, String value) { - return where(column.getColumnName() + " = ?", value); - } - - public SelectQuery where(DatabaseTable.ColumnEnum value) { - return where(value.getColumn(), value.getValue()); - } - public SelectQuery orderBy(DatabaseTable.Column column, boolean ascending) { this.orderBy = column.getColumnName() + (ascending ? " ASC" : " DESC"); return this; @@ -94,17 +88,13 @@ public Bundle[] execute(Database db) { return null; } - // Currently we only support ANDing all where clauses - String selection = String.join(" AND ", this.whereClauses); - String[] selectionArgs = this.whereParameters.toArray(new String[0]); - try { List results; try (Cursor cursor = db.get().query( this.tableName, this.generateSelectColumns(), - selection, - selectionArgs, + buildSelection(), + buildSelectionArgs(), null, null, this.orderBy diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/UpdateQuery.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/UpdateQuery.java new file mode 100644 index 00000000..b452b08b --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/UpdateQuery.java @@ -0,0 +1,50 @@ +package org.learningequality.sqlite.query; + +import org.learningequality.sqlite.Database; +import org.learningequality.sqlite.schema.DatabaseTable; + +import android.content.ContentValues; + +/** + * A class that represents an UPDATE SQL query + */ +public class UpdateQuery extends FilterableQuery implements Query { + private final String tableName; + private ContentValues values; + + public UpdateQuery(String tableName) { + this.tableName = tableName; + this.values = new ContentValues(); + } + + /** + * Method to return the current instance of the query + * @return the current instance of the query + */ + @Override + protected UpdateQuery self() { + return this; + } + + public UpdateQuery set(ContentValues values) { + this.values = values; + return this; + } + + public UpdateQuery set(DatabaseTable.Column column, String value) { + this.values.put(column.getColumnName(), value); + return this; + } + + public Integer execute(Database db) { + if (!db.isConnected()) { + return 0; + } + + if (this.values.size() == 0) { + throw new IllegalStateException("No values to update"); + } + + return db.get().update(this.tableName, this.values, buildSelection(), buildSelectionArgs()); + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/schema/DatabaseTable.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/schema/DatabaseTable.java index 6590b3bd..c4b98efa 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/schema/DatabaseTable.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/schema/DatabaseTable.java @@ -13,11 +13,9 @@ interface Column { } interface ColumnEnum extends Column { - T name(); + String name(); - default T getValue() { - return this.name(); - } + T getValue(); ColumnImpl getColumn(); @@ -26,6 +24,12 @@ default String getColumnName() { } } + interface StringChoiceEnum extends ColumnEnum { + default String getValue() { + return this.name(); + } + } + abstract class ColumnImpl implements Column { private final String columnName; @@ -47,4 +51,14 @@ public String getValue(Bundle bundle) { return bundle.getString(getColumnName()); } } + + class IntegerColumn extends ColumnImpl { + public IntegerColumn(String columnName) { + super(columnName); + } + + public Integer getValue(Bundle bundle) { + return bundle.getInt(getColumnName()); + } + } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Builder.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Builder.java new file mode 100644 index 00000000..ab4560fe --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Builder.java @@ -0,0 +1,164 @@ +package org.learningequality.task; + +import android.os.Bundle; + +import java.util.Arrays; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import androidx.work.OneTimeWorkRequest; +import androidx.work.OutOfQuotaPolicy; +import androidx.work.WorkInfo; +import androidx.work.WorkQuery; + +import org.learningequality.Kolibri.TaskworkerWorker; +import org.learningequality.Kolibri.sqlite.JobStorage; + + +/** + * A builder class consolidating logic for creating WorkRequests and WorkQueries + */ +public class Builder { + public static final String TAG = "KolibriTask.Builder"; + + public static final String TAG_PREFIX_TASK_ID = "kolibri_task_id:"; + public static final String TAG_PREFIX_JOB_FUNC = "kolibri_job_type:"; + public static final String TAG_EXPEDITED = "kolibri_job_expedited"; + + public static String generateTagFromId(String id) { + return TAG_PREFIX_TASK_ID + id; + } + + public static String generateTagFromJobFunc(String jobFunc) { + return TAG_PREFIX_JOB_FUNC + jobFunc; + } + + /** + * A builder class for creating WorkQueries + */ + public static class TaskQuery { + private final WorkQuery.Builder builder; + + public TaskQuery(String... jobIds) { + this.builder = WorkQuery.Builder.fromUniqueWorkNames(Arrays.asList(jobIds)); + } + + public TaskQuery(UUID... requestIds) { + this.builder = WorkQuery.Builder.fromIds(Arrays.asList(requestIds)); + } + + public WorkQuery build() { + return this.builder.build(); + } + } + + /** + * A builder class for creating WorkRequests + * Unfortunately, OneTimeWorkRequest.Builder is final so we cannot extend it. + */ + public static class TaskRequest { + private final String id; + private final OneTimeWorkRequest.Builder builder; + private int delay; + private boolean expedite; + + public TaskRequest(String id) { + this.id = id; + builder = new OneTimeWorkRequest.Builder(TaskworkerWorker.class); + builder.addTag(generateTagFromId(id)); + builder.setInputData(TaskworkerWorker.buildInputData(id)); + setDelay(0); + } + + public String getId() { + return this.id; + } + + + public TaskRequest setDelay(int delay) { + this.delay = delay; + if (delay > 0) { + builder.setInitialDelay(delay, TimeUnit.SECONDS); + } + return this; + } + + public TaskRequest setExpedite(boolean expedite) { + this.expedite = expedite; + return this; + } + + public TaskRequest setJobFunc(String jobFunc) { + this.builder.addTag(generateTagFromJobFunc(jobFunc)); + return this; + } + + public TaskRequest setLongRunning(boolean longRunning) { + if (longRunning) builder.addTag(TaskworkerWorker.TAG_LONG_RUNNING); + return this; + } + + /** + * Build a one-time WorkRequest from the TaskRequest information + * @return A OneTimeWorkRequest object + */ + public OneTimeWorkRequest build() { + // Tasks can only be expedited if they are set with no delay. + // This does not appear to be documented, but is evident in the Android Jetpack source code. + // https://android.googlesource.com/platform/frameworks/support/+/HEAD/work/work-runtime/src/main/java/androidx/work/WorkRequest.kt#271 + if (expedite && delay == 0) { + builder.setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST); + builder.addTag(TAG_EXPEDITED); + } + + return builder.build(); + } + + /** + * Creates a TaskRequest builder from a job Bundle, like that returned by JobStorage + * @param job The existing job Bundle from which to parse task information + * @return A TaskRequest builder + */ + public static TaskRequest fromJob(Bundle job) { + String id = JobStorage.Jobs.id.getValue(job); + int priority = JobStorage.Jobs.priority.getValue(job); + + TaskRequest builder = new TaskRequest(id); + return builder.setJobFunc(JobStorage.Jobs.func.getValue(job)) + .setExpedite(priority <= JobStorage.Jobs.Priority.HIGH.getValue()); + } + + /** + * Creates a TaskRequest builder from an existing WorkInfo object + * @param workInfo The existing WorkInfo from which to parse task information + * @return A TaskRequest builder + */ + public static TaskRequest fromWorkInfo(WorkInfo workInfo) { + String id = null; + String jobFunc = null; + boolean expedite = false; + boolean isLongRunning = false; + + for (String tag : workInfo.getTags()) { + if (tag.startsWith(TAG_PREFIX_TASK_ID)) { + id = tag.substring(TAG_PREFIX_TASK_ID.length()); + } else if (tag.startsWith(TAG_PREFIX_JOB_FUNC)) { + jobFunc = tag.substring(TAG_PREFIX_JOB_FUNC.length()); + } else if (tag.equals(TAG_EXPEDITED)) { + expedite = true; + } else if (tag.equals(TaskworkerWorker.TAG_LONG_RUNNING)) { + isLongRunning = true; + } + } + + if (id == null || jobFunc == null) { + throw new IllegalArgumentException("WorkInfo is missing required task info"); + } + + return (new TaskRequest(id)) + .setJobFunc(jobFunc) + .setExpedite(expedite) + .setLongRunning(isLongRunning); + } + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Reconciler.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Reconciler.java new file mode 100644 index 00000000..bc24ce24 --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Reconciler.java @@ -0,0 +1,168 @@ +package org.learningequality.task; + +import android.content.Context; +import android.util.Log; + +import androidx.work.ExistingWorkPolicy; +import androidx.work.OneTimeWorkRequest; +import androidx.work.multiprocess.RemoteWorkManager; + +import org.learningequality.Kolibri.sqlite.JobStorage; +import org.learningequality.sqlite.query.UpdateQuery; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.util.UUID; + + +public class Reconciler implements AutoCloseable { + public static final String TAG = "KolibriTask.Reconciler"; + public static final String LOCK_FILE = "kolibri_reconciler.lock"; + + private final RemoteWorkManager workManager; + private final FileChannel lockChannel; + private final JobStorage db; + private FileLock lock; + + public Reconciler(RemoteWorkManager workManager, JobStorage db, File lockFile) { + this.workManager = workManager; + this.db = db; + try { + lockChannel = new RandomAccessFile(lockFile, "rw").getChannel(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Create a new Reconciler instance from a Context + * @param context The context to use + * @return A new Reconciler instance + */ + public static Reconciler from(Context context, JobStorage db) { + File lockFile = new File(context.getFilesDir(), LOCK_FILE); + RemoteWorkManager workManager = RemoteWorkManager.getInstance(context); + return new Reconciler(workManager, db, lockFile); + } + + /** + * Attempt to acquire an exclusive lock on the lock file, which will prevent multiple + * Reconciler instances from running at the same time, including in different processes. + * Also starts a transaction on the database. + * @return True if the lock was acquired, false otherwise + */ + public boolean begin() { + // First get a lock on the lock file + try { + Log.d(TAG, "Acquiring lock"); + lock = lockChannel.tryLock(); + if (lock == null) { + Log.d(TAG, "Failed to acquire lock"); + return false; + } + } catch (IOException e) { + Log.e(TAG, "Failed to acquire lock", e); + return false; + } + + // Then start a transaction + Log.d(TAG, "Beginning transaction"); + db.begin(); + return true; + } + + /** + * Commit the database transaction and release the lock + */ + public void end() { + Log.d(TAG, "Committing transaction"); + db.commit(); + + try { + Log.d(TAG, "Releasing lock"); + if (lock != null) lock.release(); + lockChannel.close(); + } catch (Exception e) { + Log.e(TAG, "Failed to close and release lock", e); + } + } + + /** + * Close the Reconciler, rolling back the database transaction and releasing the lock + */ + public void close() { + // this may be a no-op if closing normally + db.rollback(); + end(); + } + + /** + * (Re)enqueue a WorkRequest from a Sentinel.Result + * @param result The result of a Sentinel check operation + */ + protected void enqueueFrom(Sentinel.Result result) { + // We prefer to create the builder from the WorkInfo, if it exists + Builder.TaskRequest builder = (result.isMissing()) + ? Builder.TaskRequest.fromJob(result.getJob()) + : Builder.TaskRequest.fromWorkInfo(result.getWorkInfo()); + + + if (result.isMissing()) { + // if we're missing the WorkInfo, then we can't know if it's supposed to be long running, + // because we don't track `long_running` in the DB, so we can only assume + builder.setLongRunning(true) + .setDelay(0); + } + + Log.d(TAG, "Re-enqueuing job " + builder.getId()); + OneTimeWorkRequest req = builder.build(); + // Using `REPLACE` here because we want to replace the existing request as a more + // forceful way of ensuring that the request is enqueued, since this is reconciliation + workManager.enqueueUniqueWork(builder.getId(), ExistingWorkPolicy.REPLACE, req); + if (updateRequestId(builder.getId(), req.getId()) == 0) { + Log.e(TAG, "Failed to update request ID for job " + builder.getId()); + } + } + + /** + * Update the request ID for a job in the database + * @param id The job ID + * @param requestId The new WorkManager request ID + * @return The number of rows updated + */ + protected int updateRequestId(String id, UUID requestId) { + Log.d(TAG, "Updating request ID for job " + id + " to " + requestId); + UpdateQuery q = new UpdateQuery(JobStorage.Jobs.TABLE_NAME) + .where(JobStorage.Jobs.id, id) + .set(JobStorage.Jobs.worker_extra, requestId.toString()); + return q.execute(db); + } + + /** + * Process results from Sentinel checks that found jobs in the given state didn't match + * the expected WorkManager state, or were missing + * @param stateRef The state which Kolibri thinks the job is in + * @param results The results of the Sentinel checks + */ + public void process(StateMap stateRef, Sentinel.Result[] results) { + Log.d(TAG, "Reconciling " + results.length + " jobs for state " + stateRef); + + for (Sentinel.Result result : results) { + switch (stateRef.getJobState()) { + case PENDING: + case QUEUED: + case SCHEDULED: + case SELECTED: + case RUNNING: + enqueueFrom(result); + break; + default: + Log.d(TAG, "No reconciliation for state " + stateRef.getJobState()); + break; + } + } + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Sentinel.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Sentinel.java new file mode 100644 index 00000000..701f4a49 --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Sentinel.java @@ -0,0 +1,247 @@ +package org.learningequality.task; + +import android.content.Context; +import android.os.Bundle; +import android.util.Log; +import android.util.Pair; + +import androidx.work.WorkInfo; +import androidx.work.WorkQuery; +import androidx.work.multiprocess.RemoteWorkManager; + +import com.google.common.util.concurrent.ListenableFuture; + +import org.learningequality.Kolibri.sqlite.JobStorage; +import org.learningequality.sqlite.query.SelectQuery; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; + +import java9.util.concurrent.CompletableFuture; + +/** + * Sentinel (as in watcher) for checking and reconciling Kolibri job status with WorkManager + */ +public class Sentinel { + public static String TAG = "KolibriTask.Sentinel"; + private final RemoteWorkManager workManager; + private final JobStorage db; + private final Executor executor; + + /** + * A class that holds the pair of Bundle and WorkInfo as a result of the Sentinel's + * check operations + */ + public static class Result extends Pair { + public Result(Bundle first, WorkInfo second) { + super(first, second); + } + + public boolean isMissing() { + return this.second == null; + } + + public Bundle getJob() { + return this.first; + } + + public WorkInfo getWorkInfo() { + return this.second; + } + } + + public Sentinel(RemoteWorkManager workManager, JobStorage db, Executor executor) { + this.workManager = workManager; + this.db = db; + this.executor = executor; + } + + /** + * Create a sentinel + */ + public static Sentinel from(Context context, JobStorage db, Executor executor) { + return new Sentinel( + RemoteWorkManager.getInstance(context), + db, + executor + ); + } + + /** + * Build a query for jobs with the given status + * + * @param jobStatus The job status in the Kolibri database for which to find jobs + * @return A query for jobs with the given status, and subset of selected columns + */ + private SelectQuery buildQuery(JobStorage.Jobs.State jobStatus) { + return new SelectQuery( + JobStorage.Jobs.id, + JobStorage.Jobs.priority, + JobStorage.Jobs.state, + JobStorage.Jobs.worker_process, + JobStorage.Jobs.worker_thread, + JobStorage.Jobs.worker_extra + ) + .from(JobStorage.Jobs.TABLE_NAME) + .where(jobStatus) + .orderBy(JobStorage.Jobs.time_updated, false); + } + + private WorkQuery buildWorkQuery(Bundle result) { + String requestId = JobStorage.Jobs.worker_extra.getValue(result); + final Builder.TaskQuery builder; + + if (requestId == null) { + String id = JobStorage.Jobs.id.getValue(result); + Log.v(TAG, "No request ID found for job " + id); + builder = new Builder.TaskQuery(id); + } else { + builder = new Builder.TaskQuery(UUID.fromString(requestId)); + } + + return builder.build(); + } + + /** + * Check for jobs with the given status and reconcile them with WorkManager + * Defaults to flagging missing work in WorkManager + * + * @param stateRef The job status in the Kolibri database for which to find jobs + * @return A future that will complete when all jobs have been checked, with a list of jobs + */ + public CompletableFuture check(StateMap stateRef) { + return check(false, stateRef); + } + + /** + * Check for jobs with the given status and reconcile them with WorkManager + * @param ignoreMissing Whether to ignore missing work in WorkManager + * @param stateRef The job status in the Kolibri database for which to find jobs + * @return A future that will complete when all jobs have been checked, with a list of jobs + */ + public CompletableFuture check( + boolean ignoreMissing, + StateMap stateRef + ) { + SelectQuery query = buildQuery(stateRef.getJobState()); + Bundle[] jobs = query.execute(db); + + if (jobs == null || jobs.length == 0) { + Log.v(TAG, "No jobs to reconcile for status " + stateRef); + return CompletableFuture.completedFuture(null); + } + + return check(jobs, ignoreMissing, stateRef.getWorkInfoStates()); + } + + /** + * Check for the given jobs (Bundles) and reconciles them with WorkManager + * + * @param jobs The jobs to check + * @param ignoreMissing Whether to ignore missing work in WorkManager + * @param expectedWorkStates The expected WorkManager states for the found jobs + * @return A future that will complete when all jobs have been checked, with a list of jobs + */ + public CompletableFuture check( + Bundle[] jobs, + boolean ignoreMissing, + WorkInfo.State... expectedWorkStates + ) { + final List> jobFutures = new ArrayList>(jobs.length); + + for (Bundle job : jobs) { + CompletableFuture jobCheck = check(job, ignoreMissing, expectedWorkStates); + jobFutures.add(jobCheck); + } + + CompletableFuture future = CompletableFuture.allOf( + jobFutures.toArray(new CompletableFuture[0]) + ) + .thenApply((result) -> { + final List allResults = new ArrayList(jobs.length); + for (CompletableFuture jobFuture : jobFutures) { + // Add all the results from the job futures + try { + Result jobResult = jobFuture.get(); + if (jobResult != null) { + allResults.add(jobResult); + } + } catch (ExecutionException | InterruptedException e) { + e.printStackTrace(); + } + } + return allResults.toArray(new Result[0]); + }); + + future.whenComplete((result, ex) -> { + if (future.isCancelled()) { + for (CompletableFuture jobFuture : jobFutures) { + jobFuture.cancel(true); + } + } + }); + return future; + } + + /** + * Check for the given job (Bundle) and reconciles i with WorkManager + * + * @param job The job to check as a `Bundle` + * @param ignoreMissing Whether to ignore the job as missing in WorkManager + * @param expectedWorkStates The expected WorkManager states for the found jobs + * @return A future that will complete when the job has been checked, with the job if it is not reconciled + */ + public CompletableFuture check( + Bundle job, + boolean ignoreMissing, + WorkInfo.State... expectedWorkStates + ) { + CompletableFuture future = new CompletableFuture<>(); + + List workStates = Arrays.asList(expectedWorkStates); + WorkQuery workQuery = buildWorkQuery(job); + ListenableFuture> workInfosFuture = workManager.getWorkInfos(workQuery); + + workInfosFuture.addListener(() -> { + Result res = null; + try { + List workInfos = workInfosFuture.get(); + + if (workInfos == null || workInfos.size() == 0) { + if (ignoreMissing) { + return; + } + + Log.w(TAG, "No work requests found for job id " + JobStorage.Jobs.id.getValue(job)); + res = new Result(job, null); + } else { + for (WorkInfo workInfo : workInfos) { + WorkInfo.State state = workInfo.getState(); + + if (!workStates.contains(state)) { + Log.w(TAG, "WorkInfo state " + state + " does not match expected state " + Arrays.toString(expectedWorkStates) + " for request " + workInfo.getId() + " | " + workInfo.getTags()); + res = new Result(job, workInfo); + } + } + } + } + catch (ExecutionException | InterruptedException e) { + e.printStackTrace(); + } finally { + future.complete(res); + } + }, executor); + + future.whenComplete((result, ex) -> { + if (future.isCancelled()) { + workInfosFuture.cancel(true); + } + }); + + return future; + } +} \ No newline at end of file diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/StateMap.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/StateMap.java new file mode 100644 index 00000000..c933490e --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/StateMap.java @@ -0,0 +1,64 @@ +package org.learningequality.task; + +import androidx.work.WorkInfo; + +import org.learningequality.Kolibri.sqlite.JobStorage; + +/** + * A mapping between Kolibri job states and WorkManager work states + */ +public enum StateMap { + MISSING(null), + PENDING( + JobStorage.Jobs.State.PENDING, + WorkInfo.State.ENQUEUED, + WorkInfo.State.BLOCKED + ), + QUEUED( + JobStorage.Jobs.State.QUEUED, + WorkInfo.State.ENQUEUED, + WorkInfo.State.BLOCKED + ), + SCHEDULED( + JobStorage.Jobs.State.SCHEDULED, + WorkInfo.State.ENQUEUED, + WorkInfo.State.BLOCKED + ), + SELECTED( + JobStorage.Jobs.State.SELECTED, + WorkInfo.State.ENQUEUED, + WorkInfo.State.BLOCKED, + WorkInfo.State.RUNNING + ), + RUNNING(JobStorage.Jobs.State.RUNNING, WorkInfo.State.RUNNING), + CANCELING(JobStorage.Jobs.State.CANCELING, WorkInfo.State.CANCELLED), + CANCELED(JobStorage.Jobs.State.CANCELED, WorkInfo.State.CANCELLED), + FAILED(JobStorage.Jobs.State.FAILED, WorkInfo.State.FAILED), + COMPLETED(JobStorage.Jobs.State.COMPLETED, WorkInfo.State.SUCCEEDED); + + private final JobStorage.Jobs.State jobState; + private final WorkInfo.State[] workInfoStates; + + StateMap(JobStorage.Jobs.State jobState, WorkInfo.State... workInfoStates) { + this.jobState = jobState; + this.workInfoStates = workInfoStates; + } + + public JobStorage.Jobs.State getJobState() { + return this.jobState; + } + + public WorkInfo.State[] getWorkInfoStates() { + return this.workInfoStates; + } + + public static StateMap[] forReconciliation() { + return new StateMap[] { + PENDING, + QUEUED, + SCHEDULED, + SELECTED, + RUNNING + }; + } +} \ No newline at end of file