From 980074d4d500fb02e25e621f031831b1753eb1d9 Mon Sep 17 00:00:00 2001 From: Blaine Jester Date: Fri, 12 Jan 2024 09:59:16 -0800 Subject: [PATCH] Cleanup, reformatting, fix int/long mixup, and future cancellation propogation --- .../org/learningequality/FuturesUtil.java | 4 +- .../Kolibri/ReconcileWorker.java | 48 +++++--- .../Kolibri/sqlite/JobStorage.java | 61 +++++----- .../main/java/org/learningequality/Task.java | 40 +++++-- .../sqlite/schema/DatabaseTable.java | 12 +- .../org/learningequality/task/Builder.java | 110 +++++++++--------- .../org/learningequality/task/Sentinel.java | 70 ++++++----- .../org/learningequality/task/StateMap.java | 32 ++--- 8 files changed, 212 insertions(+), 165 deletions(-) diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/FuturesUtil.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/FuturesUtil.java index 062e66dd..9e587f0a 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/FuturesUtil.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/FuturesUtil.java @@ -14,10 +14,10 @@ public class FuturesUtil { public static CompletableFuture toCompletable(ListenableFuture future, Executor executor) { CompletableFuture completableFuture = new CompletableFuture<>(); + future.addListener(() -> { try { completableFuture.complete(future.get(3, java.util.concurrent.TimeUnit.SECONDS)); - Log.d(TAG, "Future completed"); } catch (InterruptedException | ExecutionException e) { Log.d(TAG, "Future encountered exception"); completableFuture.completeExceptionally(e); @@ -26,12 +26,14 @@ public static CompletableFuture toCompletable(ListenableFuture future, completableFuture.completeExceptionally(e); } }, executor); + completableFuture.whenCompleteAsync((result, error) -> { if (completableFuture.isCancelled()) { Log.d(TAG, "Propagating cancellation to future"); future.cancel(true); } }, executor); + return completableFuture; } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/ReconcileWorker.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/ReconcileWorker.java index 443612b2..2d4f59fc 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/ReconcileWorker.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/ReconcileWorker.java @@ -12,9 +12,11 @@ import com.google.common.util.concurrent.ListenableFuture; +import org.learningequality.Task; + import java.util.concurrent.Executor; -import org.learningequality.Task; +import java9.util.concurrent.CompletableFuture; public class ReconcileWorker extends RemoteListenableWorker { public static final String TAG = "Kolibri.ReconcileWorker"; @@ -23,6 +25,14 @@ public ReconcileWorker(@NonNull Context appContext, @NonNull WorkerParameters wo super(appContext, workerParams); } + public static Data buildInputData() { + return new Data.Builder() + .putString(ARGUMENT_PACKAGE_NAME, "org.learningequality.Kolibri") + .putString(ARGUMENT_CLASS_NAME, + TaskworkerWorkerService.class.getName()) + .build(); + } + @SuppressLint("RestrictedApi") @NonNull public ListenableFuture startRemoteWork() { @@ -30,24 +40,26 @@ public ListenableFuture startRemoteWork() { SettableFuture future = SettableFuture.create(); Executor executor = getBackgroundExecutor(); - Task.reconcile(getApplicationContext(), executor) - .whenCompleteAsync((result, error) -> { - if (error != null) { - Log.e(TAG, "Failed to reconcile tasks", error); - future.set(Result.failure()); - } else { - future.set(Result.success()); - } - }, executor); + final CompletableFuture reconcile = Task.reconcile(getApplicationContext(), executor); - return future; - } + reconcile.whenCompleteAsync((result, error) -> { + if (error != null) { + Log.e(TAG, "Failed to reconcile tasks", error); + future.set(Result.failure()); + } else { + future.set(Result.success()); + } + }, executor); - public static Data buildInputData() { - return new Data.Builder() - .putString(ARGUMENT_PACKAGE_NAME, "org.learningequality.Kolibri") - .putString(ARGUMENT_CLASS_NAME, - TaskworkerWorkerService.class.getName()) - .build(); + future.addListener(() -> { + if (future.isCancelled()) { + Log.d(TAG, "Propagating cancellation to future"); + synchronized (reconcile) { + reconcile.cancel(true); + } + } + }, executor); + + return future; } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/sqlite/JobStorage.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/sqlite/JobStorage.java index 4b47a8f6..e184305f 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/sqlite/JobStorage.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/sqlite/JobStorage.java @@ -11,22 +11,33 @@ public class JobStorage extends Database { public static final String DATABASE_NAME = "job_storage.sqlite3"; - public static class Jobs implements DatabaseTable { - public static final String TABLE_NAME = "jobs"; + protected JobStorage(String path, int flags) { + super(DATABASE_NAME, path, flags); + } - public String getTableName() { - return TABLE_NAME; - } + public static JobStorage readwrite(Context context) { + File f = getDatabasePath(context, DATABASE_NAME); + + return f != null + ? new JobStorage(f.getPath(), SQLiteDatabase.OPEN_READWRITE) + : null; + } + public static class Jobs implements DatabaseTable { + public static final String TABLE_NAME = "jobs"; public static final StringColumn id = new StringColumn("id"); public static final StringColumn func = new StringColumn("func"); - public static final IntegerColumn priority = new IntegerColumn("priority"); + public static final LongColumn priority = new LongColumn("priority"); public static final StringColumn worker_process = new StringColumn("worker_process"); public static final StringColumn worker_thread = new StringColumn("worker_thread"); public static final StringColumn worker_extra = new StringColumn("worker_extra"); public static final StringColumn time_updated = new StringColumn("time_updated"); public static final StringColumn state = new StringColumn("state"); + public String getTableName() { + return TABLE_NAME; + } + public enum State implements StringChoiceEnum { PENDING, QUEUED, @@ -36,47 +47,35 @@ public enum State implements StringChoiceEnum { CANCELING, CANCELED, FAILED, - COMPLETED - ; + COMPLETED; public StringColumn getColumn() { return state; } } - public enum Priority implements ColumnEnum { - LOW(15), - REGULAR(10), - HIGH(5) - ; + public enum Priority implements ColumnEnum { + LOW(15L), + REGULAR(10L), + HIGH(5L); - private final int value; + private final Long value; - Priority(int val) { + Priority(Long val) { this.value = val; } - public Integer getValue() { + public Long getValue() { return this.value; } - public IntegerColumn getColumn() { - return priority; + public boolean isAtLeast(Long other) { + return this.value.compareTo(other) >= 0; } - + public LongColumn getColumn() { + return priority; + } } } - - protected JobStorage(String path, int flags) { - super(DATABASE_NAME, path, flags); - } - - public static JobStorage readwrite(Context context) { - File f = getDatabasePath(context, DATABASE_NAME); - - return f != null - ? new JobStorage(f.getPath(), SQLiteDatabase.OPEN_READWRITE) - : null; - } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Task.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Task.java index f9c48ca7..7f5e612c 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Task.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Task.java @@ -14,9 +14,9 @@ import org.learningequality.Kolibri.sqlite.JobStorage; import org.learningequality.task.Builder; -import org.learningequality.task.StateMap; -import org.learningequality.task.Sentinel; import org.learningequality.task.Reconciler; +import org.learningequality.task.Sentinel; +import org.learningequality.task.StateMap; import java.util.List; import java.util.concurrent.ExecutionException; @@ -78,7 +78,7 @@ public static void clear(String id) { }, new MainThreadExecutor()); } - public static CompletableFuture reconcile(Context context, Executor executor) { + public static CompletableFuture reconcile(Context context, Executor executor) { if (executor == null) { executor = ContextCompat.getMainExecutor(context); } @@ -104,6 +104,7 @@ public static CompletableFuture reconcile(Context context, Executor exe // Run through all the states and check them, then process the results for (StateMap stateRef : StateMap.forReconciliation()) { chain = chain.thenComposeAsync((_didReconcile) -> { + // Avoid checking if future is cancelled synchronized (future) { if (future.isCancelled()) { return CompletableFuture.completedFuture(_didReconcile); @@ -128,14 +129,18 @@ public static CompletableFuture reconcile(Context context, Executor exe }, executor); } - chain.orTimeout(15, java.util.concurrent.TimeUnit.SECONDS) - .whenCompleteAsync((result, error) -> { - try { - reconciler.end(); - db.close(); - } catch (Exception e) { - Log.e(TAG, "Failed cleaning up reconciliation", e); - } finally { + final CompletableFuture finalChain + = chain.orTimeout(15, java.util.concurrent.TimeUnit.SECONDS); + + finalChain.whenCompleteAsync((result, error) -> { + try { + reconciler.end(); + db.close(); + } catch (Exception e) { + Log.e(TAG, "Failed cleaning up reconciliation", e); + } finally { + synchronized (future) { + if (!future.isCancelled()) { if (error instanceof TimeoutException) { Log.e(TAG, "Timed out waiting for reconciliation chain", error); future.completeExceptionally(error); @@ -153,7 +158,18 @@ public static CompletableFuture reconcile(Context context, Executor exe future.complete(false); } } - }, executor); + } + } + }, executor); + + // Propagate cancellation to the chain + future.whenCompleteAsync((result, error) -> { + synchronized (future) { + if (future.isCancelled()) { + finalChain.cancel(true); + } + } + }, executor); return future; } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/schema/DatabaseTable.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/schema/DatabaseTable.java index c4b98efa..6f4f6316 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/schema/DatabaseTable.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/schema/DatabaseTable.java @@ -3,8 +3,8 @@ import android.os.Bundle; public interface DatabaseTable { - public static final String DATABASE_NAME = "DATABASE_NAME"; - public static final String TABLE_NAME = "TABLE_NAME"; + String DATABASE_NAME = "DATABASE_NAME"; + String TABLE_NAME = "TABLE_NAME"; String getTableName(); @@ -52,13 +52,13 @@ public String getValue(Bundle bundle) { } } - class IntegerColumn extends ColumnImpl { - public IntegerColumn(String columnName) { + class LongColumn extends ColumnImpl { + public LongColumn(String columnName) { super(columnName); } - public Integer getValue(Bundle bundle) { - return bundle.getInt(getColumnName()); + public Long getValue(Bundle bundle) { + return bundle.getLong(getColumnName()); } } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Builder.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Builder.java index 62375188..a0d413f1 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Builder.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Builder.java @@ -2,10 +2,6 @@ import android.os.Bundle; -import java.util.Arrays; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - import androidx.work.OneTimeWorkRequest; import androidx.work.OutOfQuotaPolicy; import androidx.work.WorkInfo; @@ -14,6 +10,10 @@ import org.learningequality.Kolibri.TaskworkerWorker; import org.learningequality.Kolibri.sqlite.JobStorage; +import java.util.Arrays; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + /** * A builder class consolidating logic for creating WorkRequests and WorkQueries @@ -43,10 +43,6 @@ public TaskQuery(WorkQuery.Builder builder) { this.builder = builder; } - public WorkQuery build() { - return this.builder.build(); - } - public static TaskQuery from(String... jobIds) { return new TaskQuery(WorkQuery.Builder.fromUniqueWorkNames(Arrays.asList(jobIds))); } @@ -54,6 +50,10 @@ public static TaskQuery from(String... jobIds) { public static TaskQuery from(UUID... requestIds) { return new TaskQuery(WorkQuery.Builder.fromIds(Arrays.asList(requestIds))); } + + public WorkQuery build() { + return this.builder.build(); + } } /** @@ -74,66 +74,24 @@ public TaskRequest(String id) { setDelay(0); } - public String getId() { - return this.id; - } - - - public TaskRequest setDelay(int delay) { - this.delay = delay; - if (delay > 0) { - builder.setInitialDelay(delay, TimeUnit.SECONDS); - } - return this; - } - - public TaskRequest setExpedite(boolean expedite) { - this.expedite = expedite; - return this; - } - - public TaskRequest setJobFunc(String jobFunc) { - this.builder.addTag(generateTagFromJobFunc(jobFunc)); - return this; - } - - public TaskRequest setLongRunning(boolean longRunning) { - if (longRunning) builder.addTag(TaskworkerWorker.TAG_LONG_RUNNING); - return this; - } - - /** - * Build a one-time WorkRequest from the TaskRequest information - * @return A OneTimeWorkRequest object - */ - public OneTimeWorkRequest build() { - // Tasks can only be expedited if they are set with no delay. - // This does not appear to be documented, but is evident in the Android Jetpack source code. - // https://android.googlesource.com/platform/frameworks/support/+/HEAD/work/work-runtime/src/main/java/androidx/work/WorkRequest.kt#271 - if (expedite && delay == 0) { - builder.setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST); - builder.addTag(TAG_EXPEDITED); - } - - return builder.build(); - } - /** * Creates a TaskRequest builder from a job Bundle, like that returned by JobStorage + * * @param job The existing job Bundle from which to parse task information * @return A TaskRequest builder */ public static TaskRequest fromJob(Bundle job) { String id = JobStorage.Jobs.id.getValue(job); - int priority = JobStorage.Jobs.priority.getValue(job); + Long priority = JobStorage.Jobs.priority.getValue(job); TaskRequest builder = new TaskRequest(id); return builder.setJobFunc(JobStorage.Jobs.func.getValue(job)) - .setExpedite(priority <= JobStorage.Jobs.Priority.HIGH.getValue()); + .setExpedite(JobStorage.Jobs.Priority.HIGH.isAtLeast(priority)); } /** * Creates a TaskRequest builder from an existing WorkInfo object + * * @param workInfo The existing WorkInfo from which to parse task information * @return A TaskRequest builder */ @@ -164,5 +122,49 @@ public static TaskRequest fromWorkInfo(WorkInfo workInfo) { .setExpedite(expedite) .setLongRunning(isLongRunning); } + + public String getId() { + return this.id; + } + + public TaskRequest setDelay(int delay) { + this.delay = delay; + if (delay > 0) { + builder.setInitialDelay(delay, TimeUnit.SECONDS); + } + return this; + } + + public TaskRequest setExpedite(boolean expedite) { + this.expedite = expedite; + return this; + } + + public TaskRequest setJobFunc(String jobFunc) { + this.builder.addTag(generateTagFromJobFunc(jobFunc)); + return this; + } + + public TaskRequest setLongRunning(boolean longRunning) { + if (longRunning) builder.addTag(TaskworkerWorker.TAG_LONG_RUNNING); + return this; + } + + /** + * Build a one-time WorkRequest from the TaskRequest information + * + * @return A OneTimeWorkRequest object + */ + public OneTimeWorkRequest build() { + // Tasks can only be expedited if they are set with no delay. + // This does not appear to be documented, but is evident in the Android Jetpack source code. + // https://android.googlesource.com/platform/frameworks/support/+/HEAD/work/work-runtime/src/main/java/androidx/work/WorkRequest.kt#271 + if (expedite && delay == 0) { + builder.setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST); + builder.addTag(TAG_EXPEDITED); + } + + return builder.build(); + } } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Sentinel.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Sentinel.java index 9c3f2385..538a0286 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Sentinel.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Sentinel.java @@ -30,28 +30,6 @@ public class Sentinel { private final JobStorage db; private final Executor executor; - /** - * A class that holds the pair of Bundle and WorkInfo as a result of the Sentinel's - * check operations - */ - public static class Result extends Pair { - public Result(Bundle first, WorkInfo second) { - super(first, second); - } - - public boolean isMissing() { - return this.second == null; - } - - public Bundle getJob() { - return this.first; - } - - public WorkInfo getWorkInfo() { - return this.second; - } - } - public Sentinel(RemoteWorkManager workManager, JobStorage db, Executor executor) { this.workManager = workManager; this.db = db; @@ -114,8 +92,9 @@ public CompletableFuture check(StateMap stateRef) { /** * Check for jobs with the given status and reconcile them with WorkManager + * * @param ignoreMissing Whether to ignore missing work in WorkManager - * @param stateRef The job status in the Kolibri database for which to find jobs + * @param stateRef The job status in the Kolibri database for which to find jobs * @return A future that will complete when all jobs have been checked, with a list of jobs */ public CompletableFuture check( @@ -138,8 +117,8 @@ public CompletableFuture check( /** * Check for the given jobs (Bundles) and reconciles them with WorkManager * - * @param jobs The jobs to check - * @param ignoreMissing Whether to ignore missing work in WorkManager + * @param jobs The jobs to check + * @param ignoreMissing Whether to ignore missing work in WorkManager * @param expectedWorkStates The expected WorkManager states for the found jobs * @return A future that will complete when all jobs have been checked, with a list of jobs */ @@ -174,7 +153,9 @@ public CompletableFuture check( }, executor); } - chain.whenCompleteAsync((results, ex) -> { + final CompletableFuture> finalChain = chain; + + finalChain.whenCompleteAsync((results, ex) -> { if (ex != null) { Log.e(TAG, "Failed to check jobs", ex); future.completeExceptionally(ex); @@ -187,14 +168,25 @@ public CompletableFuture check( } } }, executor); + + future.whenCompleteAsync((results, ex) -> { + synchronized (future) { + if (future.isCancelled()) { + Log.d(TAG, "Propagating cancellation to future"); + synchronized (finalChain) { + finalChain.cancel(true); + } + } + } + }, executor); return future; } /** * Check for the given job (Bundle) and reconciles it with WorkManager * - * @param job The job to check as a `Bundle` - * @param ignoreMissing Whether to ignore the job as missing in WorkManager + * @param job The job to check as a `Bundle` + * @param ignoreMissing Whether to ignore the job as missing in WorkManager * @param expectedWorkStates The expected WorkManager states for the found jobs * @return A future that will complete when the job has been checked, with the job if it is not reconciled */ @@ -234,4 +226,26 @@ public CompletableFuture check( return null; }, executor); } + + /** + * A class that holds the pair of Bundle and WorkInfo as a result of the Sentinel's + * check operations + */ + public static class Result extends Pair { + public Result(Bundle first, WorkInfo second) { + super(first, second); + } + + public boolean isMissing() { + return this.second == null; + } + + public Bundle getJob() { + return this.first; + } + + public WorkInfo getWorkInfo() { + return this.second; + } + } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/StateMap.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/StateMap.java index 69579368..38df95b8 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/StateMap.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/StateMap.java @@ -8,7 +8,6 @@ * A mapping between Kolibri job states and WorkManager work states */ public enum StateMap { - MISSING(null), PENDING( JobStorage.Jobs.State.PENDING, WorkInfo.State.ENQUEUED, @@ -33,11 +32,14 @@ public enum StateMap { WorkInfo.State.BLOCKED, WorkInfo.State.RUNNING ), - RUNNING(JobStorage.Jobs.State.RUNNING, WorkInfo.State.RUNNING, WorkInfo.State.SUCCEEDED), - CANCELING(JobStorage.Jobs.State.CANCELING, WorkInfo.State.CANCELLED), - CANCELED(JobStorage.Jobs.State.CANCELED, WorkInfo.State.CANCELLED), - FAILED(JobStorage.Jobs.State.FAILED, WorkInfo.State.FAILED), - COMPLETED(JobStorage.Jobs.State.COMPLETED, WorkInfo.State.SUCCEEDED); + // We include 'ENQUEUED' here because it is possible for a job to be re-enqueued by the + // reconciler while Kolibri thinks it's running + RUNNING( + JobStorage.Jobs.State.RUNNING, + WorkInfo.State.ENQUEUED, + WorkInfo.State.RUNNING, + WorkInfo.State.SUCCEEDED + ); private final JobStorage.Jobs.State jobState; private final WorkInfo.State[] workInfoStates; @@ -47,16 +49,8 @@ public enum StateMap { this.workInfoStates = workInfoStates; } - public JobStorage.Jobs.State getJobState() { - return this.jobState; - } - - public WorkInfo.State[] getWorkInfoStates() { - return this.workInfoStates; - } - public static StateMap[] forReconciliation() { - return new StateMap[] { + return new StateMap[]{ PENDING, QUEUED, SCHEDULED, @@ -64,4 +58,12 @@ public static StateMap[] forReconciliation() { RUNNING }; } + + public JobStorage.Jobs.State getJobState() { + return this.jobState; + } + + public WorkInfo.State[] getWorkInfoStates() { + return this.workInfoStates; + } }