Skip to content

Commit

Permalink
Cleanup, reformatting, fix int/long mixup, and future cancellation pr…
Browse files Browse the repository at this point in the history
…opogation
  • Loading branch information
bjester committed Jan 12, 2024
1 parent bf6fe20 commit 980074d
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ public class FuturesUtil {

public static <T> CompletableFuture<T> toCompletable(ListenableFuture<T> future, Executor executor) {
CompletableFuture<T> completableFuture = new CompletableFuture<>();

future.addListener(() -> {
try {
completableFuture.complete(future.get(3, java.util.concurrent.TimeUnit.SECONDS));
Log.d(TAG, "Future completed");
} catch (InterruptedException | ExecutionException e) {
Log.d(TAG, "Future encountered exception");
completableFuture.completeExceptionally(e);
Expand All @@ -26,12 +26,14 @@ public static <T> CompletableFuture<T> toCompletable(ListenableFuture<T> future,
completableFuture.completeExceptionally(e);
}
}, executor);

completableFuture.whenCompleteAsync((result, error) -> {
if (completableFuture.isCancelled()) {
Log.d(TAG, "Propagating cancellation to future");
future.cancel(true);
}
}, executor);

return completableFuture;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@

import com.google.common.util.concurrent.ListenableFuture;

import org.learningequality.Task;

import java.util.concurrent.Executor;

import org.learningequality.Task;
import java9.util.concurrent.CompletableFuture;

public class ReconcileWorker extends RemoteListenableWorker {
public static final String TAG = "Kolibri.ReconcileWorker";
Expand All @@ -23,31 +25,41 @@ public ReconcileWorker(@NonNull Context appContext, @NonNull WorkerParameters wo
super(appContext, workerParams);
}

public static Data buildInputData() {
return new Data.Builder()
.putString(ARGUMENT_PACKAGE_NAME, "org.learningequality.Kolibri")
.putString(ARGUMENT_CLASS_NAME,
TaskworkerWorkerService.class.getName())
.build();
}

@SuppressLint("RestrictedApi")
@NonNull
public ListenableFuture<Result> startRemoteWork() {
Log.i(TAG, "Starting reconcile task");
SettableFuture<Result> future = SettableFuture.create();
Executor executor = getBackgroundExecutor();

Task.reconcile(getApplicationContext(), executor)
.whenCompleteAsync((result, error) -> {
if (error != null) {
Log.e(TAG, "Failed to reconcile tasks", error);
future.set(Result.failure());
} else {
future.set(Result.success());
}
}, executor);
final CompletableFuture<Boolean> reconcile = Task.reconcile(getApplicationContext(), executor);

return future;
}
reconcile.whenCompleteAsync((result, error) -> {
if (error != null) {
Log.e(TAG, "Failed to reconcile tasks", error);
future.set(Result.failure());
} else {
future.set(Result.success());
}
}, executor);

public static Data buildInputData() {
return new Data.Builder()
.putString(ARGUMENT_PACKAGE_NAME, "org.learningequality.Kolibri")
.putString(ARGUMENT_CLASS_NAME,
TaskworkerWorkerService.class.getName())
.build();
future.addListener(() -> {
if (future.isCancelled()) {
Log.d(TAG, "Propagating cancellation to future");
synchronized (reconcile) {
reconcile.cancel(true);
}
}
}, executor);

return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,33 @@
public class JobStorage extends Database {
public static final String DATABASE_NAME = "job_storage.sqlite3";

public static class Jobs implements DatabaseTable {
public static final String TABLE_NAME = "jobs";
protected JobStorage(String path, int flags) {
super(DATABASE_NAME, path, flags);
}

public String getTableName() {
return TABLE_NAME;
}
public static JobStorage readwrite(Context context) {
File f = getDatabasePath(context, DATABASE_NAME);

return f != null
? new JobStorage(f.getPath(), SQLiteDatabase.OPEN_READWRITE)
: null;
}

public static class Jobs implements DatabaseTable {
public static final String TABLE_NAME = "jobs";
public static final StringColumn id = new StringColumn("id");
public static final StringColumn func = new StringColumn("func");
public static final IntegerColumn priority = new IntegerColumn("priority");
public static final LongColumn priority = new LongColumn("priority");
public static final StringColumn worker_process = new StringColumn("worker_process");
public static final StringColumn worker_thread = new StringColumn("worker_thread");
public static final StringColumn worker_extra = new StringColumn("worker_extra");
public static final StringColumn time_updated = new StringColumn("time_updated");
public static final StringColumn state = new StringColumn("state");

public String getTableName() {
return TABLE_NAME;
}

public enum State implements StringChoiceEnum {
PENDING,
QUEUED,
Expand All @@ -36,47 +47,35 @@ public enum State implements StringChoiceEnum {
CANCELING,
CANCELED,
FAILED,
COMPLETED
;
COMPLETED;

public StringColumn getColumn() {
return state;
}
}

public enum Priority implements ColumnEnum<Integer> {
LOW(15),
REGULAR(10),
HIGH(5)
;
public enum Priority implements ColumnEnum<Long> {
LOW(15L),
REGULAR(10L),
HIGH(5L);

private final int value;
private final Long value;

Priority(int val) {
Priority(Long val) {
this.value = val;
}

public Integer getValue() {
public Long getValue() {
return this.value;
}

public IntegerColumn getColumn() {
return priority;
public boolean isAtLeast(Long other) {
return this.value.compareTo(other) >= 0;
}


public LongColumn getColumn() {
return priority;
}
}
}

protected JobStorage(String path, int flags) {
super(DATABASE_NAME, path, flags);
}

public static JobStorage readwrite(Context context) {
File f = getDatabasePath(context, DATABASE_NAME);

return f != null
? new JobStorage(f.getPath(), SQLiteDatabase.OPEN_READWRITE)
: null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

import org.learningequality.Kolibri.sqlite.JobStorage;
import org.learningequality.task.Builder;
import org.learningequality.task.StateMap;
import org.learningequality.task.Sentinel;
import org.learningequality.task.Reconciler;
import org.learningequality.task.Sentinel;
import org.learningequality.task.StateMap;

import java.util.List;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -78,7 +78,7 @@ public static void clear(String id) {
}, new MainThreadExecutor());
}

public static CompletableFuture<Boolean> reconcile(Context context, Executor executor) {
public static CompletableFuture<Boolean> reconcile(Context context, Executor executor) {
if (executor == null) {
executor = ContextCompat.getMainExecutor(context);
}
Expand All @@ -104,6 +104,7 @@ public static CompletableFuture<Boolean> reconcile(Context context, Executor exe
// Run through all the states and check them, then process the results
for (StateMap stateRef : StateMap.forReconciliation()) {
chain = chain.thenComposeAsync((_didReconcile) -> {
// Avoid checking if future is cancelled
synchronized (future) {
if (future.isCancelled()) {
return CompletableFuture.completedFuture(_didReconcile);
Expand All @@ -128,14 +129,18 @@ public static CompletableFuture<Boolean> reconcile(Context context, Executor exe
}, executor);
}

chain.orTimeout(15, java.util.concurrent.TimeUnit.SECONDS)
.whenCompleteAsync((result, error) -> {
try {
reconciler.end();
db.close();
} catch (Exception e) {
Log.e(TAG, "Failed cleaning up reconciliation", e);
} finally {
final CompletableFuture<AtomicBoolean> finalChain
= chain.orTimeout(15, java.util.concurrent.TimeUnit.SECONDS);

finalChain.whenCompleteAsync((result, error) -> {
try {
reconciler.end();
db.close();
} catch (Exception e) {
Log.e(TAG, "Failed cleaning up reconciliation", e);
} finally {
synchronized (future) {
if (!future.isCancelled()) {
if (error instanceof TimeoutException) {
Log.e(TAG, "Timed out waiting for reconciliation chain", error);
future.completeExceptionally(error);
Expand All @@ -153,7 +158,18 @@ public static CompletableFuture<Boolean> reconcile(Context context, Executor exe
future.complete(false);
}
}
}, executor);
}
}
}, executor);

// Propagate cancellation to the chain
future.whenCompleteAsync((result, error) -> {
synchronized (future) {
if (future.isCancelled()) {
finalChain.cancel(true);
}
}
}, executor);

return future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import android.os.Bundle;

public interface DatabaseTable {
public static final String DATABASE_NAME = "DATABASE_NAME";
public static final String TABLE_NAME = "TABLE_NAME";
String DATABASE_NAME = "DATABASE_NAME";
String TABLE_NAME = "TABLE_NAME";

String getTableName();

Expand Down Expand Up @@ -52,13 +52,13 @@ public String getValue(Bundle bundle) {
}
}

class IntegerColumn extends ColumnImpl<Integer> {
public IntegerColumn(String columnName) {
class LongColumn extends ColumnImpl<Long> {
public LongColumn(String columnName) {
super(columnName);
}

public Integer getValue(Bundle bundle) {
return bundle.getInt(getColumnName());
public Long getValue(Bundle bundle) {
return bundle.getLong(getColumnName());
}
}
}
Loading

0 comments on commit 980074d

Please sign in to comment.