Skip to content

Commit

Permalink
Restructure future handling
Browse files Browse the repository at this point in the history
  • Loading branch information
bjester committed Jan 11, 2024
1 parent fe0feb6 commit 4cba9e0
Show file tree
Hide file tree
Showing 10 changed files with 295 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import com.google.common.util.concurrent.ListenableFuture;

import java.util.concurrent.RunnableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

abstract public class PythonWorker extends RemoteListenableWorker {
Expand All @@ -27,6 +27,8 @@ abstract public class PythonWorker extends RemoteListenableWorker {

public static final int MAX_WORKER_RETRIES = 3;

public static final boolean DO_RETRY = false;

// Python environment variables
private String androidPrivate;
private String androidArgument;
Expand Down Expand Up @@ -115,37 +117,31 @@ public ListenableFuture<Result> startRemoteWork() {
// This is somewhat similar to what the plain `Worker` class does, except that we
// use `submit` instead of `execute` so we can propagate cancellation
// See https://android.googlesource.com/platform/frameworks/support/+/60ae0eec2a32396c22ad92502cde952c80d514a0/work/workmanager/src/main/java/androidx/work/Worker.java
RunnableFuture<?> threadFuture = (RunnableFuture<?>)executor.submit(new Runnable() {
@Override
public void run() {
try {
Result r = doWork();
future.set(r);
} catch (Exception e) {
if (getRunAttemptCount() > MAX_WORKER_RETRIES) {
Log.e(TAG, id + " Exception in remote python work", e);
future.setException(e);
} else {
Log.w(TAG, id + " Exception in remote python work, scheduling retry", e);
future.set(Result.retry());
}
} finally {
cleanup();
Future<?> threadFuture = executor.submit(() -> {
try {
Result r = doWork();
future.set(r);
} catch (Exception e) {
if (!DO_RETRY || getRunAttemptCount() > MAX_WORKER_RETRIES) {
Log.e(TAG, id + " Exception in remote python work", e);
future.setException(e);
} else {
Log.w(TAG, id + " Exception in remote python work, scheduling retry", e);
future.set(Result.retry());
}
} finally {
cleanup();
}
});

// If `RunnableFuture` was a `ListenableFuture` we could simply use `future.setFuture` to
// propagate the result and cancellation, but instead add listener to propagate
// cancellation to python thread, using the task executor which should invoke this in the
// main thread (where this was originally called from)
future.addListener(new Runnable() {
@Override
public void run() {
if (future.isCancelled()) {
Log.i(TAG, "Interrupting python thread");
threadFuture.cancel(true);
}
future.addListener(() -> {
if (future.isCancelled()) {
Log.i(TAG, "Interrupting python thread");
threadFuture.cancel(true);
}
}, getTaskExecutor().getMainThreadExecutor());
return future;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.learningequality;

import android.util.Log;

import com.google.common.util.concurrent.ListenableFuture;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

import java9.util.concurrent.CompletableFuture;

public class FuturesUtil {
public static final String TAG = "Kolibri.FuturesUtil";

public static <T> CompletableFuture<T> toCompletable(ListenableFuture<T> future, Executor executor) {
CompletableFuture<T> completableFuture = new CompletableFuture<>();
future.addListener(() -> {
try {
completableFuture.complete(future.get(3, java.util.concurrent.TimeUnit.SECONDS));
Log.d(TAG, "Future completed");
} catch (InterruptedException | ExecutionException e) {
Log.d(TAG, "Future encountered exception");
completableFuture.completeExceptionally(e);
} catch (java.util.concurrent.TimeoutException e) {
Log.d(TAG, "Future timed out");
completableFuture.completeExceptionally(e);
}
}, executor);
completableFuture.whenCompleteAsync((result, error) -> {
if (completableFuture.isCancelled()) {
Log.d(TAG, "Propagating cancellation to future");
future.cancel(true);
}
}, executor);
return completableFuture;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,22 @@
import android.app.Application;
import android.content.Context;
import android.os.Build;
import android.util.Log;

import androidx.annotation.NonNull;
import androidx.core.app.NotificationManagerCompat;
import androidx.core.app.NotificationChannelCompat;
import androidx.work.Configuration;
import androidx.work.PeriodicWorkRequest;
import androidx.work.WorkManager;

import org.learningequality.NotificationRef;
import org.learningequality.Task;

import java.util.concurrent.Executors;

import java9.util.concurrent.CompletableFuture;

public class App extends Application implements Configuration.Provider {
@Override
public void onCreate() {
super.onCreate();
NotificationRef.initialize(this);
createNotificationChannels();
reconcileTasks();
}

@NonNull
Expand All @@ -39,7 +32,7 @@ public Configuration getWorkManagerConfiguration() {
return new Configuration.Builder()
.setDefaultProcessName(processName)
.setMinimumLoggingLevel(android.util.Log.DEBUG)
.setExecutor(Executors.newFixedThreadPool(6))
.setExecutor(Executors.newFixedThreadPool(12))
.build();
}

Expand Down Expand Up @@ -69,14 +62,4 @@ private void createNotificationChannels() {
notificationManager.createNotificationChannel(taskChannel);
}
}

private void reconcileTasks() {
// Reconcile tasks on startup, in this main thread (blocking!)
boolean result = Task.reconcile(this, null);
if (result) {
Log.i("Kolibri", "Main thread task reconciliation completed");
} else {
Log.d("Kolibri", "Main thread task reconciliation no-op");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@
import android.util.Log;

import androidx.annotation.NonNull;
import androidx.work.ListenableWorker;
import androidx.work.Data;
import androidx.work.WorkerParameters;
import androidx.work.impl.utils.futures.SettableFuture;
import androidx.work.multiprocess.RemoteListenableWorker;

import com.google.common.util.concurrent.ListenableFuture;

import java9.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import org.learningequality.Task;

public class ReconcileWorker extends ListenableWorker {
public class ReconcileWorker extends RemoteListenableWorker {
public static final String TAG = "Kolibri.ReconcileWorker";

public ReconcileWorker(@NonNull Context appContext, @NonNull WorkerParameters workerParams) {
Expand All @@ -25,18 +25,29 @@ public ReconcileWorker(@NonNull Context appContext, @NonNull WorkerParameters wo

@SuppressLint("RestrictedApi")
@NonNull
public ListenableFuture<Result> startWork() {
public ListenableFuture<Result> startRemoteWork() {
Log.i(TAG, "Starting reconcile task");
SettableFuture<Result> future = SettableFuture.create();
Executor executor = getBackgroundExecutor();

boolean result = Task.reconcile(getApplicationContext(), executor);
if (!result) {
Log.e(TAG, "Failed to reconcile tasks");
future.set(Result.failure());
return future;
}
future.set(Result.success());
Task.reconcile(getApplicationContext(), executor)
.whenCompleteAsync((result, error) -> {
if (error != null) {
Log.e(TAG, "Failed to reconcile tasks", error);
future.set(Result.failure());
} else {
future.set(Result.success());
}
}, executor);

return future;
}

public static Data buildInputData() {
return new Data.Builder()
.putString(ARGUMENT_PACKAGE_NAME, "org.learningequality.Kolibri")
.putString(ARGUMENT_CLASS_NAME,
TaskworkerWorkerService.class.getName())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import android.content.Context;
import android.util.Log;

import androidx.work.ExistingPeriodicWorkPolicy;
import androidx.work.PeriodicWorkRequest;
import androidx.work.multiprocess.RemoteWorkerService;
import androidx.work.WorkManager;
Expand All @@ -28,7 +29,7 @@ public void onCreate() {
);
// Initialize the work manager
WorkManager.getInstance(getApplicationContext());
// enqueueTaskReconciliation();
enqueueTaskReconciliation();
super.onCreate();
// We could potentially remove this and leave the notification up to long-running workers
// bound to the service
Expand All @@ -48,16 +49,18 @@ public NotificationRef getNotificationRef() {

private void enqueueTaskReconciliation() {
WorkManager workManager = WorkManager.getInstance(this);
PeriodicWorkRequest reconcileRequest = new PeriodicWorkRequest.Builder(

PeriodicWorkRequest.Builder builder = new PeriodicWorkRequest.Builder(
ReconcileWorker.class,
60,
30,
java.util.concurrent.TimeUnit.MINUTES
).build();
);
builder.setInputData(ReconcileWorker.buildInputData());

workManager.enqueueUniquePeriodicWork(
"task_reconciliation",
androidx.work.ExistingPeriodicWorkPolicy.KEEP,
reconcileRequest
ExistingPeriodicWorkPolicy.REPLACE,
builder.build()
);
}
}
Loading

0 comments on commit 4cba9e0

Please sign in to comment.