diff --git a/.python-version b/.python-version new file mode 100644 index 00000000..21af9507 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.9.13 diff --git a/python-for-android/dists/kolibri/build.gradle b/python-for-android/dists/kolibri/build.gradle index 7ea82455..b89c0104 100644 --- a/python-for-android/dists/kolibri/build.gradle +++ b/python-for-android/dists/kolibri/build.gradle @@ -109,4 +109,5 @@ dependencies { implementation 'androidx.concurrent:concurrent-futures:1.1.0' implementation 'androidx.work:work-runtime:2.7.1' implementation 'androidx.work:work-multiprocess:2.7.1' + implementation 'net.sourceforge.streamsupport:java9-concurrent-backport:2.0.5' } diff --git a/python-for-android/dists/kolibri/src/main/AndroidManifest.xml b/python-for-android/dists/kolibri/src/main/AndroidManifest.xml index a4e6b4e1..e1a9694f 100644 --- a/python-for-android/dists/kolibri/src/main/AndroidManifest.xml +++ b/python-for-android/dists/kolibri/src/main/AndroidManifest.xml @@ -1,48 +1,56 @@ - + - android:xlargeScreens="true" - - /> - + - + + - + + - - - android:launchMode="singleTask" - android:windowSoftInputMode="adjustResize" - > + + + @@ -50,23 +58,66 @@ + + + - - - - - + - - - - + + + + + + + + + + + + + + + diff --git a/python-for-android/dists/kolibri/src/main/java/org/kivy/android/PythonActivity.java b/python-for-android/dists/kolibri/src/main/java/org/kivy/android/PythonActivity.java index c6f60aba..ce5461e5 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/kivy/android/PythonActivity.java +++ b/python-for-android/dists/kolibri/src/main/java/org/kivy/android/PythonActivity.java @@ -1,8 +1,5 @@ package org.kivy.android; -import android.app.NotificationChannel; -import android.app.NotificationManager; -import android.os.Build; import android.os.SystemClock; import java.io.InputStream; @@ -42,11 +39,10 @@ import android.webkit.CookieManager; import android.net.Uri; -import androidx.core.app.NotificationManagerCompat; - import org.learningequality.Kolibri.R; import org.renpy.android.ResourceManager; + public class PythonActivity extends Activity { // This activity is modified from a mixture of the SDLActivity and // PythonActivity in the SDL2 bootstrap, but removing all the SDL2 @@ -248,8 +244,7 @@ public void onDestroy() { } public void loadLibraries() { - PythonUtil.loadLibraries( - new File(getApplicationInfo().nativeLibraryDir)); + PythonLoader.doLoad(this); } public static void loadUrl(String url) { diff --git a/python-for-android/dists/kolibri/src/main/java/org/kivy/android/PythonContext.java b/python-for-android/dists/kolibri/src/main/java/org/kivy/android/PythonContext.java new file mode 100644 index 00000000..6ed4cb2f --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/kivy/android/PythonContext.java @@ -0,0 +1,33 @@ +package org.kivy.android; + +import android.content.Context; + +public class PythonContext { + public static PythonContext mInstance; + + private final Context context; + + private PythonContext(Context context) { + this.context = context; + } + + public static PythonContext getInstance(Context context) { + if (mInstance == null) { + synchronized (PythonContext.class) { + if (mInstance == null) { + mInstance = new PythonContext( + context.getApplicationContext() + ); + } + } + } + return PythonContext.mInstance; + } + + public static Context get() { + if (PythonContext.mInstance == null) { + return null; + } + return PythonContext.mInstance.context; + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/kivy/android/PythonLoader.java b/python-for-android/dists/kolibri/src/main/java/org/kivy/android/PythonLoader.java new file mode 100644 index 00000000..578ce567 --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/kivy/android/PythonLoader.java @@ -0,0 +1,44 @@ +package org.kivy.android; + +import android.content.Context; + +import java.io.File; +import java.util.concurrent.atomic.AtomicBoolean; + +public class PythonLoader { + protected static PythonLoader mInstance; + + private final File src; + private final AtomicBoolean isLoaded = new AtomicBoolean(false); + + private PythonLoader(File src) { + this.src = src; + } + + public void load() { + synchronized (isLoaded) { + if (isLoaded.get()) { + return; + } + PythonUtil.loadLibraries(src); + isLoaded.set(true); + } + } + + public static PythonLoader getInstance(Context context) { + if (mInstance == null) { + synchronized (PythonLoader.class) { + if (mInstance == null) { + mInstance = new PythonLoader( + new File(context.getApplicationInfo().nativeLibraryDir) + ); + } + } + } + return PythonLoader.mInstance; + } + + public static void doLoad(Context context) { + PythonLoader.getInstance(context).load(); + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/kivy/android/PythonWorker.java b/python-for-android/dists/kolibri/src/main/java/org/kivy/android/PythonWorker.java index 303492f3..9ea02666 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/kivy/android/PythonWorker.java +++ b/python-for-android/dists/kolibri/src/main/java/org/kivy/android/PythonWorker.java @@ -1,174 +1,73 @@ package org.kivy.android; -import android.annotation.SuppressLint; import android.content.Context; +import android.os.Process; import android.util.Log; import androidx.annotation.NonNull; -import androidx.concurrent.futures.CallbackToFutureAdapter; -import androidx.work.ForegroundInfo; -import androidx.work.WorkerParameters; -import androidx.work.impl.utils.futures.SettableFuture; -import androidx.work.multiprocess.RemoteListenableWorker; -import com.google.common.util.concurrent.ListenableFuture; - -import java.util.concurrent.RunnableFuture; -import java.util.concurrent.ThreadPoolExecutor; - -abstract public class PythonWorker extends RemoteListenableWorker { - private static final String TAG = "PythonWorker"; - - // WorkRequest data key for python worker argument - public static final String ARGUMENT_WORKER_ARGUMENT = "PYTHON_WORKER_ARGUMENT"; - - public static final String TAG_LONG_RUNNING = "worker_long_running"; - - public static final int MAX_WORKER_RETRIES = 3; +import java.io.File; +/** + * Ideally this would be called `PythonWorkerImpl` but the name is used in the native code. + */ +public class PythonWorker { + private static final String TAG = "PythonWorkerImpl"; // Python environment variables - private String androidPrivate; - private String androidArgument; - private String pythonName; - private String pythonHome; - private String pythonPath; - private String workerEntrypoint; - - public static PythonWorker mWorker = null; - - public PythonWorker( - @NonNull Context context, - @NonNull WorkerParameters params) { - super(context, params); + private final String pythonName; + private final String workerEntrypoint; + private final String androidPrivate; + private final String androidArgument; + private final String pythonHome; + private final String pythonPath; + + public PythonWorker(@NonNull Context context, String pythonName, String workerEntrypoint) { + PythonLoader.doLoad(context); + this.pythonName = pythonName; + this.workerEntrypoint = workerEntrypoint; String appRoot = PythonUtil.getAppRoot(context); - - PythonWorker.mWorker = this; - androidPrivate = appRoot; androidArgument = appRoot; pythonHome = appRoot; pythonPath = appRoot + ":" + appRoot + "/lib"; } - public void setPythonName(String value) { - pythonName = value; - } - - public void setWorkerEntrypoint(String value) { - workerEntrypoint = value; - } - - public boolean isLongRunning() { - return getTags().contains(TAG_LONG_RUNNING); - } - - protected String getArgument() { - String dataArg = getInputData().getString(ARGUMENT_WORKER_ARGUMENT); - final String serviceArg; - if (dataArg != null) { - serviceArg = dataArg; - } else { - serviceArg = ""; - } - return serviceArg; - } + // Native part + public static native int nativeStart( + String androidPrivate, String androidArgument, + String workerEntrypoint, String pythonName, + String pythonHome, String pythonPath, + String pythonServiceArgument + ); - protected Result doWork() { - String id = getId().toString(); - String arg = getArgument(); + public static native int tearDownPython(); + public boolean execute(String id, String arg) { Log.d(TAG, id + " Running with python worker argument: " + arg); - int res = nativeStart( - androidPrivate, androidArgument, - workerEntrypoint, pythonName, - pythonHome, pythonPath, - arg + String serializedArg = String.join( + ",", + id, + arg, + Integer.toString(Process.myPid()), + Long.toString(Thread.currentThread().getId()) ); - Log.d(TAG, id + " Finished remote python work: " + res); - - if (res == 0) { - return Result.success(); - } - - return Result.failure(); - } - - @SuppressLint("RestrictedApi") - @NonNull - @Override - public ListenableFuture startRemoteWork() { - SettableFuture future = SettableFuture.create(); - String id = getId().toString(); - if (isLongRunning()) { - Log.d(TAG, id + " Enabling foreground service for long running task"); - setForegroundAsync(getForegroundInfo()); + int res; + try { + res = nativeStart( + androidPrivate, androidArgument, + workerEntrypoint, pythonName, + pythonHome, pythonPath, + serializedArg + ); + Log.d(TAG, id + " Finished executing python work: " + res); + } catch (Exception e) { + Log.e(TAG, "Error executing python work", e); + return false; } - // See executor defined in configuration - ThreadPoolExecutor executor = (ThreadPoolExecutor) getBackgroundExecutor(); - // This is somewhat similar to what the plain `Worker` class does, except that we - // use `submit` instead of `execute` so we can propagate cancellation - // See https://android.googlesource.com/platform/frameworks/support/+/60ae0eec2a32396c22ad92502cde952c80d514a0/work/workmanager/src/main/java/androidx/work/Worker.java - RunnableFuture threadFuture = (RunnableFuture)executor.submit(new Runnable() { - @Override - public void run() { - try { - Result r = doWork(); - future.set(r); - } catch (Exception e) { - if (getRunAttemptCount() > MAX_WORKER_RETRIES) { - Log.e(TAG, id + " Exception in remote python work", e); - future.setException(e); - } else { - Log.w(TAG, id + " Exception in remote python work, scheduling retry", e); - future.set(Result.retry()); - } - } finally { - cleanup(); - } - } - }); - - // If `RunnableFuture` was a `ListenableFuture` we could simply use `future.setFuture` to - // propagate the result and cancellation, but instead add listener to propagate - // cancellation to python thread, using the task executor which should invoke this in the - // main thread (where this was originally called from) - future.addListener(new Runnable() { - @Override - public void run() { - if (future.isCancelled()) { - Log.i(TAG, "Interrupting python thread"); - threadFuture.cancel(true); - } - } - }, getTaskExecutor().getMainThreadExecutor()); - return future; + return res == 0; } - - // Native part - public static native int nativeStart( - String androidPrivate, String androidArgument, - String workerEntrypoint, String pythonName, - String pythonHome, String pythonPath, - String pythonServiceArgument - ); - - public void onStopped() { - cleanup(); - super.onStopped(); - mWorker = null; - } - protected void cleanup() {} - - abstract public ForegroundInfo getForegroundInfo(); - - @Override - public ListenableFuture getForegroundInfoAsync() { - return CallbackToFutureAdapter.getFuture((CallbackToFutureAdapter.Resolver) completer -> completer.set(getForegroundInfo())); - } - - public static native int tearDownPython(); } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/ContextUtil.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/ContextUtil.java index 50a6227d..037ff58c 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/ContextUtil.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/ContextUtil.java @@ -3,20 +3,15 @@ import android.content.Context; import org.kivy.android.PythonActivity; -import org.kivy.android.PythonService; -import org.kivy.android.PythonWorker; -import org.learningequality.NotificationRef; +import org.learningequality.Kolibri.WorkerService; public class ContextUtil { public static Context getApplicationContext() { - if (isActivityContext()) { - return PythonActivity.mActivity.getApplicationContext(); - } if (isServiceContext()) { - return PythonService.mService.getApplicationContext(); + return WorkerService.mService.getApplicationContext(); } - if (isWorkerContext()) { - return PythonWorker.mWorker.getApplicationContext(); + if (isActivityContext()) { + return PythonActivity.mActivity.getApplicationContext(); } return null; } @@ -26,10 +21,6 @@ public static boolean isActivityContext() { } public static boolean isServiceContext() { - return PythonService.mService != null; - } - - public static boolean isWorkerContext() { - return PythonWorker.mWorker != null; + return WorkerService.mService != null; } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/FuturesUtil.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/FuturesUtil.java new file mode 100644 index 00000000..9e587f0a --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/FuturesUtil.java @@ -0,0 +1,39 @@ +package org.learningequality; + +import android.util.Log; + +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; + +import java9.util.concurrent.CompletableFuture; + +public class FuturesUtil { + public static final String TAG = "Kolibri.FuturesUtil"; + + public static CompletableFuture toCompletable(ListenableFuture future, Executor executor) { + CompletableFuture completableFuture = new CompletableFuture<>(); + + future.addListener(() -> { + try { + completableFuture.complete(future.get(3, java.util.concurrent.TimeUnit.SECONDS)); + } catch (InterruptedException | ExecutionException e) { + Log.d(TAG, "Future encountered exception"); + completableFuture.completeExceptionally(e); + } catch (java.util.concurrent.TimeoutException e) { + Log.d(TAG, "Future timed out"); + completableFuture.completeExceptionally(e); + } + }, executor); + + completableFuture.whenCompleteAsync((result, error) -> { + if (completableFuture.isCancelled()) { + Log.d(TAG, "Propagating cancellation to future"); + future.cancel(true); + } + }, executor); + + return completableFuture; + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/App.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/App.java index cf310969..266cb93c 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/App.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/App.java @@ -1,23 +1,35 @@ package org.learningequality.Kolibri; +import android.app.Activity; import android.app.Application; import android.content.Context; import android.os.Build; +import android.os.Bundle; import androidx.annotation.NonNull; -import androidx.core.app.NotificationManagerCompat; import androidx.core.app.NotificationChannelCompat; +import androidx.core.app.NotificationManagerCompat; import androidx.work.Configuration; -import org.learningequality.NotificationRef; +import org.kivy.android.PythonContext; +import org.learningequality.notification.NotificationRef; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + public class App extends Application implements Configuration.Provider { + protected final AtomicInteger activeActivities = new AtomicInteger(0); + @Override public void onCreate() { super.onCreate(); + // Initialize Python context + PythonContext.getInstance(this); createNotificationChannels(); + // Register activity lifecycle callbacks + registerActivityLifecycleCallbacks(new KolibriActivityLifecycleCallbacks()); + WorkController.getInstance(this).wake(); } @NonNull @@ -26,15 +38,13 @@ public Configuration getWorkManagerConfiguration() { String processName = getApplicationContext().getPackageName(); processName += getApplicationContext().getString(R.string.task_worker_process); - - // Using the same quantity of worker threads as Kolibri's python side: // https://github.com/learningequality/kolibri/blob/release-v0.16.x/kolibri/utils/options.py#L683 return new Configuration.Builder() .setDefaultProcessName(processName) - .setMinimumLoggingLevel(android.util.Log.DEBUG) - .setExecutor(Executors.newFixedThreadPool(6)) - .build(); + .setMinimumLoggingLevel(android.util.Log.DEBUG) + .setExecutor(Executors.newFixedThreadPool(6)) + .build(); } private void createNotificationChannels() { @@ -63,4 +73,63 @@ private void createNotificationChannels() { notificationManager.createNotificationChannel(taskChannel); } } + + protected int incrementActiveActivities() { + synchronized (activeActivities) { + return activeActivities.incrementAndGet(); + } + } + + protected int decrementActiveActivities() { + synchronized (activeActivities) { + // Prevent decrementing below 0 + if (activeActivities.get() == 0) { + return 0; + } + return activeActivities.decrementAndGet(); + } + } + + public class KolibriActivityLifecycleCallbacks implements ActivityLifecycleCallbacks { + @Override + public void onActivityCreated(@NonNull Activity activity, Bundle savedInstanceState) { /* no-op */ } + + @Override + public void onActivityStarted(@NonNull Activity activity) { + incrementActiveActivities(); + WorkController.getInstance(getApplicationContext()).wake(); + } + + @Override + public void onActivityResumed(@NonNull Activity activity) { + incrementActiveActivities(); + WorkController.getInstance(getApplicationContext()).wake(); + } + + @Override + public void onActivityPaused(@NonNull Activity activity) { + if (decrementActiveActivities() == 0) { + WorkController.getInstance(getApplicationContext()).sleep(); + } + } + + @Override + public void onActivityStopped(@NonNull Activity activity) { /* no-op */ } + + @Override + public void onActivityPostStopped(@NonNull Activity activity) { + // using postStopped in case another activity is started + if (decrementActiveActivities() == 0) { + WorkController.getInstance(getApplicationContext()).sleep(); + } + } + + @Override + public void onActivitySaveInstanceState( + @NonNull Activity activity, @NonNull Bundle outState + ) { /* no-op */ } + + @Override + public void onActivityDestroyed(@NonNull Activity activity) { /* no-op */ } + } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/BackgroundWorker.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/BackgroundWorker.java new file mode 100644 index 00000000..ffc2897d --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/BackgroundWorker.java @@ -0,0 +1,48 @@ +package org.learningequality.Kolibri; + + +import android.content.Context; +import android.util.Log; + +import androidx.annotation.NonNull; +import androidx.work.WorkerParameters; + +import org.learningequality.task.Worker; +import org.kivy.android.PythonWorker; + +/** + * Background worker that runs a Python task in a background thread. This will likely be run by the + * SystemJobService. + */ +final public class BackgroundWorker extends androidx.work.Worker implements Worker { + private static final String TAG = "Kolibri.BackgroundWorker"; + private final PythonWorker workerImpl; + + public BackgroundWorker( + @NonNull Context context, @NonNull WorkerParameters workerParams + ) { + super(context, workerParams); + workerImpl = new PythonWorker(context, "TaskWorker", "taskworker.py"); + } + + /** + * Parent worker class will call this method on a background thread automatically. + */ + @Override + @NonNull + public Result doWork() { + Log.d(TAG, "Running background task " + getId()); + final String id = getId().toString(); + final String arg = getArgument(); + Result r = workerImpl.execute(id, arg) ? Result.success() : Result.failure(); + hideNotification(); + return r; + } + + @Override + public void onStopped() { + Log.d(TAG, "Stopping background remote task " + getId()); + super.onStopped(); + hideNotification(); + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/ForegroundWorker.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/ForegroundWorker.java new file mode 100644 index 00000000..a7c2c3f5 --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/ForegroundWorker.java @@ -0,0 +1,107 @@ +package org.learningequality.Kolibri; + +import android.annotation.SuppressLint; +import android.content.pm.ServiceInfo; +import android.util.Log; + +import androidx.annotation.NonNull; +import androidx.concurrent.futures.CallbackToFutureAdapter; +import androidx.work.ForegroundInfo; +import androidx.work.impl.utils.futures.SettableFuture; +import androidx.work.multiprocess.RemoteListenableWorker; + +import com.google.common.util.concurrent.ListenableFuture; + +import org.learningequality.notification.Builder; +import org.learningequality.notification.NotificationRef; +import org.learningequality.task.Worker; +import org.kivy.android.PythonWorker; + +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; + +final public class ForegroundWorker extends RemoteListenableWorker implements Worker { + private static final String TAG = "Kolibri.ForegroundWorker"; + private final PythonWorker workerImpl; + + public ForegroundWorker( + @NonNull android.content.Context context, + @NonNull androidx.work.WorkerParameters workerParams + ) { + super(context, workerParams); + workerImpl = new PythonWorker(context, "TaskWorker", "taskworker.py"); + } + + @SuppressLint("RestrictedApi") + @Override + @NonNull + public ListenableFuture startRemoteWork() { + Log.d(TAG, "Running foreground remote task " + getId()); + final SettableFuture future = SettableFuture.create(); + final String id = getId().toString(); + final String arg = getArgument(); + + // See executor defined in configuration + final ThreadPoolExecutor executor = (ThreadPoolExecutor) getBackgroundExecutor(); + // This is somewhat similar to what the plain `Worker` class does, except that we + // use `submit` instead of `execute` so we can propagate cancellation + // See https://android.googlesource.com/platform/frameworks/support/+/60ae0eec2a32396c22ad92502cde952c80d514a0/work/workmanager/src/main/java/androidx/work/Worker.java + final Future threadFuture = executor.submit(() -> { + try { + Result r = workerImpl.execute(id, arg) ? Result.success() : Result.failure(); + future.set(r); + } catch (Exception e) { + Log.e(TAG, "Exception in remote python work for " + id, e); + future.setException(e); + } + }); + + // If `RunnableFuture` was a `ListenableFuture` we could simply use `future.setFuture` to + // propagate the result and cancellation, but instead add listener to propagate + // cancellation to python thread, using the task executor which should invoke this in the + // main thread (where this was originally called from) + future.addListener(() -> { + synchronized (future) { + if (future.isCancelled()) { + Log.i(TAG, "Interrupting python thread"); + synchronized (threadFuture) { + threadFuture.cancel(true); + } + } + + if (future.isDone()) { + hideNotification(); + } + } + }, getTaskExecutor().getMainThreadExecutor()); + return future; + } + + @Override + public void onStopped() { + Log.d(TAG, "Stopping foreground remote task " + getId()); + super.onStopped(); + hideNotification(); + } + + public ForegroundInfo getForegroundInfo() { + NotificationRef ref = WorkerService.buildNotificationRef(); + Builder builder = new Builder(getApplicationContext(), ref); + // If API level is at least 29 + if (android.os.Build.VERSION.SDK_INT >= android.os.Build.VERSION_CODES.Q) { + return new ForegroundInfo( + ref.getId(), + builder.build(), + ServiceInfo.FOREGROUND_SERVICE_TYPE_MANIFEST + ); + } + + return new ForegroundInfo(ref.getId(), builder.build()); + } + + @Override + @NonNull + public ListenableFuture getForegroundInfoAsync() { + return CallbackToFutureAdapter.getFuture(completer -> completer.set(getForegroundInfo())); + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/TaskworkerWorker.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/TaskworkerWorker.java deleted file mode 100644 index 7e8e21ce..00000000 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/TaskworkerWorker.java +++ /dev/null @@ -1,83 +0,0 @@ -package org.learningequality.Kolibri; - -import android.content.Context; -import android.util.Log; - -import androidx.annotation.NonNull; -import androidx.work.Data; -import androidx.work.ForegroundInfo; -import androidx.work.WorkerParameters; - -import org.kivy.android.PythonWorker; -import org.learningequality.NotificationBuilder; -import org.learningequality.Notifier; -import org.learningequality.NotificationRef; - -public class TaskworkerWorker extends PythonWorker implements Notifier { - private static final String TAG = "TaskworkerWorker"; - - public static TaskworkerWorker mWorker = null; - - public TaskworkerWorker( - @NonNull Context context, - @NonNull WorkerParameters params) { - super(context, params); - setPythonName("TaskWorker"); - setWorkerEntrypoint("taskworker.py"); - mWorker = this; - } - - public static Data buildInputData(String workerArgument) { - String dataArgument = workerArgument == null ? "" : workerArgument; - Data data = new Data.Builder() - .putString(ARGUMENT_WORKER_ARGUMENT, dataArgument) - .putString(ARGUMENT_PACKAGE_NAME, "org.learningequality.Kolibri") - .putString(ARGUMENT_CLASS_NAME, - TaskworkerWorkerService.class.getName()) - .build(); - Log.v(TAG, "Request data: " + data.toString()); - return data; - } - - protected void cleanup() { - hideNotification(); - mWorker = null; - } - - @Override - public ForegroundInfo getForegroundInfo() { - NotificationRef ref; - // If we are running in the service, use the service notification ref - if (TaskworkerWorkerService.mService != null) { - ref = TaskworkerWorkerService.mService.getNotificationRef(); - } else { - ref = getNotificationRef(); - Log.w(TAG, "No service found, using worker notification for foreground"); - } - - NotificationBuilder builder = new NotificationBuilder(getApplicationContext(), ref); - - return new ForegroundInfo(ref.getId(), builder.build()); - } - - public NotificationRef getNotificationRef() { - // Use worker request ID as notification tag - String tag = getId().toString(); - return new NotificationRef(NotificationRef.REF_CHANNEL_DEFAULT, tag); - } - - public static void updateProgress(String notificationTitle, String notificationText, int progress, int total) { - if (mWorker != null) { - // We could also update progress on the worker here, if we need info about it on - // the Android side - // @see setProgressAsync - mWorker.sendNotification(notificationTitle, notificationText, progress, total); - } - } - - public static void clearNotification() { - if (mWorker != null) { - mWorker.hideNotification(); - } - } -} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/TaskworkerWorkerService.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/TaskworkerWorkerService.java deleted file mode 100644 index cd649f43..00000000 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/TaskworkerWorkerService.java +++ /dev/null @@ -1,46 +0,0 @@ -package org.learningequality.Kolibri; - -import android.content.Context; -import android.util.Log; - -import androidx.work.multiprocess.RemoteWorkerService; -import androidx.work.WorkManager; - -import java.io.File; - -import org.kivy.android.PythonUtil; -import org.learningequality.NotificationRef; -import org.learningequality.Notifier; - -public class TaskworkerWorkerService extends RemoteWorkerService implements Notifier { - private static final String TAG = "TaskworkerWorkerService"; - - public static TaskworkerWorkerService mService = null; - - @Override - public void onCreate() { - mService = this; - Context context = getApplicationContext(); - Log.v(TAG, "Initializing task worker service"); - PythonUtil.loadLibraries( - new File(context.getApplicationInfo().nativeLibraryDir) - ); - // Initialize the work manager - WorkManager.getInstance(getApplicationContext()); - super.onCreate(); - // We could potentially remove this and leave the notification up to long-running workers - // bound to the service - sendNotification(); - } - - @Override - public void onDestroy() { - hideNotification(); - super.onDestroy(); - mService = null; - } - - public NotificationRef getNotificationRef() { - return new NotificationRef(NotificationRef.REF_CHANNEL_SERVICE); - } -} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/WorkController.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/WorkController.java new file mode 100644 index 00000000..a7f6216c --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/WorkController.java @@ -0,0 +1,182 @@ +package org.learningequality.Kolibri; + +import android.content.ComponentName; +import android.content.Context; +import android.content.Intent; +import android.content.ServiceConnection; +import android.os.IBinder; +import android.os.Message; +import android.os.Messenger; +import android.os.RemoteException; +import android.util.Log; + +import java9.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This class is responsible for managing the work controller service. As a singleton, it can be + * used to wake, sleep, and stop the service. + */ +public class WorkController { + public static final String TAG = "Kolibri.WorkController"; + protected static WorkController mInstance; + + private final Context context; + private Connection connection; + private Messenger messenger; + private CompletableFuture connected; + private final AtomicBoolean isConnected = new AtomicBoolean(false); + + private WorkController(Context context) { + this.context = context; + this.connected = new CompletableFuture<>(); + } + + public static WorkController getInstance(Context context) { + // The double checking is a common convention for singletons when synchronizing. + if (mInstance == null) { + synchronized (WorkController.class) { + if (mInstance == null) { + mInstance = new WorkController(context); + } + } + } + return mInstance; + } + + public void wake() { + synchronized (isConnected) { + // If we're already connected, then it's awake + if (isConnected.get()) { + return; + } + } + dispatch(buildMessage(WorkControllerService.Action.WAKE)); + // Always do a reconcile when waking up, and we were previously asleep + dispatch(buildMessage(WorkControllerService.Action.RECONCILE)); + } + + public void sleep() { + synchronized (isConnected) { + // If we're not already connected, then it's asleep + if (!isConnected.get()) { + return; + } + } + dispatch(buildMessage(WorkControllerService.Action.SLEEP)); + } + + public void stop() { + synchronized (isConnected) { + // If we're not already connected, then it's asleep + if (!isConnected.get()) { + return; + } + } + dispatch(buildMessage(WorkControllerService.Action.STOP)); + } + + public void reconcile() { + synchronized (isConnected) { + // If we're not already connected, then it's asleep + if (!isConnected.get()) { + return; + } + } + dispatch(buildMessage(WorkControllerService.Action.RECONCILE)); + } + + public void destroy() { + if (connection != null) { + context.unbindService(connection); + connection = null; + messenger = null; + isConnected.set(false); + } + mInstance = null; + } + + protected Message buildMessage(WorkControllerService.Action action) { + return Message.obtain(null, action.getId(), 0, 0); + } + + protected void dispatch(Message message) { + dispatch(message, 0); + } + + protected void dispatch(Message message, int attempts) { + if (connection == null) { + connection = new Connection(); + } + + synchronized (isConnected) { + // If we're already connected, then it's awake + if (!isConnected.get()) { + if (connected.isDone()) { + connected = new CompletableFuture<>(); + } + // Binding allows us to monitor the connection state + context.bindService( + new Intent(context, WorkControllerService.class), + connection, + Context.BIND_AUTO_CREATE + ); + } + } + + connected.thenApply((messenger) -> { + try { + messenger.send(message); + } catch (RemoteException e) { + // If the remote process has died, then we need to rebind + synchronized (isConnected) { + isConnected.set(false); + messenger = null; + } + if (attempts < 3) { + dispatch(message, attempts + 1); + } + } catch (Exception e) { + Log.e(TAG, "Failed to send message " + message, e); + } + return messenger; + }); + } + + public class Connection implements ServiceConnection { + public void onServiceConnected(ComponentName name, IBinder service) { + Log.d(TAG, "Connected to work manager service"); + synchronized (isConnected) { + isConnected.set(true); + messenger = new Messenger(service); + connected.complete(messenger); + } + } + + public void onServiceDisconnected(ComponentName name) { + Log.d(TAG, "Disconnected from work controller service"); + synchronized (isConnected) { + isConnected.set(false); + messenger = null; + } + } + + @Override + public void onBindingDied(ComponentName name) { + Log.d(TAG, "Disconnected from work controller service"); + synchronized (isConnected) { + isConnected.set(false); + messenger = null; + } + } + + @Override + public void onNullBinding(ComponentName name) { + Log.d(TAG, "Disconnected from work controller service"); + synchronized (isConnected) { + isConnected.set(false); + messenger = null; + } + } + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/WorkControllerService.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/WorkControllerService.java new file mode 100644 index 00000000..08e9b068 --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/WorkControllerService.java @@ -0,0 +1,313 @@ +package org.learningequality.Kolibri; + +import android.app.Service; +import android.content.Context; +import android.content.Intent; +import android.content.ServiceConnection; +import android.os.Handler; +import android.os.IBinder; +import android.os.Message; +import android.os.Messenger; +import android.util.Log; + +import androidx.annotation.Nullable; +import androidx.work.multiprocess.RemoteWorkManagerService; + +import org.learningequality.Task; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import java9.util.concurrent.CompletableFuture; + +/** + * This service is responsible for starting the remote work manager service and + * initializing the work manager in the task worker process. + */ +public class WorkControllerService extends Service { + public static final String TAG = "Kolibri.WorkControllerService"; + public static final int ACTION_WAKE = 1; + public static final int ACTION_SLEEP = 2; + public static final int ACTION_STOP = 3; + public static final int ACTION_RECONCILE = 4; + protected static final AtomicReference state = new AtomicReference<>(State.SLEEPING); + protected static final AtomicInteger taskCount = new AtomicInteger(0); + protected final AtomicBoolean isConnected = new AtomicBoolean(false); + protected final AtomicBoolean shouldReconcile = new AtomicBoolean(true); + protected Intent workManagerIntent; + protected ExecutorService executor; + protected CompletableFuture futureChain; + protected WorkControllerHandler messageHandler; + protected Messenger messenger; + private ServiceConnection connection; + + @Override + public void onCreate() { + Log.v(TAG, "Initializing work controller service"); + + synchronized (state) { + state.set(State.AWAKE); + } + + workManagerIntent = new Intent(this, RemoteWorkManagerService.class); + connection = new WorkManagerConnection(); + executor = Executors.newFixedThreadPool(3); + futureChain = CompletableFuture.completedFuture(null); + messageHandler = new WorkControllerHandler(); + messenger = new Messenger(messageHandler); + } + + @Override + public void onDestroy() { + Log.d(TAG, "Destroying work controller service"); + synchronized (state) { + state.set(State.STOPPED); + } + unbindService(connection); + executor.shutdown(); + futureChain.cancel(true); + + executor = null; + futureChain = null; + workManagerIntent = null; + messageHandler = null; + messenger = null; + connection = null; + } + + protected void onWake() { + Log.d(TAG, "Waking up work controller service"); + synchronized (state) { + if (state.get() != State.AWAKE_LOW_MEMORY) { + state.set(State.AWAKE); + } + } + + synchronized (isConnected) { + if (isConnected.get()) { + // Already connected, no need to bind again + return; + } + } + + startTask(new WorkTask("wake_work_manager") { + @Override + public CompletableFuture run() { + // Wakey wakey remote work manager service + Log.d(TAG, "Binding to work manager service"); + bindService(workManagerIntent, connection, Context.BIND_AUTO_CREATE); + return null; + } + }); + } + + protected void onReconcile() { + synchronized (shouldReconcile) { + if (!shouldReconcile.get()) { + Log.d(TAG, "Skipping enqueue of task reconciliation"); + return; + } + shouldReconcile.set(false); + } + + Log.d(TAG, "Enqueuing task reconciliation"); + startTask(new WorkTask("reconciliation") { + @Override + public CompletableFuture run() { + return Task.reconcile(getApplicationContext(), executor).thenApply((r) -> null); + } + }); + } + + protected void onSleep() { + Log.d(TAG, "Sleeping work controller service"); + synchronized (state) { + state.set(State.SLEEPING); + } + synchronized (taskCount) { + if (taskCount.get() == 0) { + Log.d(TAG, "Stopping service due to no more tasks"); + stopSelf(); + } else { + Log.d(TAG, "Waiting for " + taskCount.get() + " tasks to complete"); + } + } + synchronized (shouldReconcile) { + shouldReconcile.set(true); + } + } + + protected void onStop() { + Log.d(TAG, "Stopping work controller service"); + // should eventually call `onDestroy` and that will set the stopped state + synchronized (state) { + state.set(State.STOPPED); + } + stopSelf(); + } + + protected void startTask(WorkTask task) { + futureChain = futureChain.thenCompose((nothing) -> { + try { + Log.d(TAG, "Running task: " + task.getName()); + CompletableFuture f = task.run(); + if (f != null) { + return f; + } + } catch (Exception e) { + Log.e(TAG, "Failed running task: " + task.getName(), e); + return CompletableFuture.completedFuture(null); + } + return CompletableFuture.completedFuture(null); + }).thenApply((nothing) -> { + Log.d(TAG, "Task completed: " + task.getName()); + boolean hasNoMoreTasks = false; + synchronized (taskCount) { + if (taskCount.decrementAndGet() == 0) { + Log.d(TAG, "Checking state for stopping service"); + hasNoMoreTasks = true; + } + } + if (hasNoMoreTasks) { + synchronized (state) { + if (state.get() != State.AWAKE) { + Log.d(TAG, "Stopping service due to no more tasks"); + stopSelf(); + } + } + } + return null; + }); + } + + @Override + public void onLowMemory() { + Log.d(TAG, "Alerted of low memory"); + synchronized (state) { + state.set(State.AWAKE_LOW_MEMORY); + } + } + + @Override + public void onTrimMemory(int level) { + Log.d(TAG, "Trimming memory, stopping service"); + synchronized (state) { + state.set(State.AWAKE_LOW_MEMORY); + } + } + + @Nullable + @Override + public IBinder onBind(Intent intent) { + Log.d(TAG, "Producing binding to work controller service"); + return messenger.getBinder(); + } + + public enum State { + AWAKE, + AWAKE_LOW_MEMORY, + SLEEPING, + STOPPED, + } + + public enum Action { + WAKE(ACTION_WAKE), + SLEEP(ACTION_SLEEP), + STOP(ACTION_STOP), + RECONCILE(ACTION_RECONCILE), + ; + + public final int id; + + Action(int id) { + this.id = id; + } + + public int getId() { + return id; + } + } + + abstract static class WorkTask { + protected final String name; + public WorkTask(String name) { + this.name = name; + } + public String getName() { + return name; + } + abstract public CompletableFuture run(); + } + + class WorkManagerConnection implements ServiceConnection { + @Override + public void onServiceConnected(android.content.ComponentName name, IBinder service) { + Log.d(TAG, "WorkManager service connected"); + synchronized (isConnected) { + isConnected.set(true); + } + } + + @Override + public void onServiceDisconnected(android.content.ComponentName name) { + Log.d(TAG, "WorkManager service disconnected"); + synchronized (isConnected) { + isConnected.set(false); + } + } + + @Override + public void onBindingDied(android.content.ComponentName name) { + Log.d(TAG, "WorkManager service binding died"); + synchronized (isConnected) { + isConnected.set(false); + } + } + + @Override + public void onNullBinding(android.content.ComponentName name) { + // WorkManager service should produce a binding normally + Log.d(TAG, "WorkManager service gave null binding"); + synchronized (isConnected) { + isConnected.set(false); + } + } + } + + class WorkControllerHandler extends Handler { + public WorkControllerHandler() { + super(); + } + + @Override + public void handleMessage(Message msg) { + Log.d(TAG, "Received message " + msg.what); + synchronized (taskCount) { + taskCount.incrementAndGet(); + } + switch (msg.what) { + case ACTION_WAKE: + onWake(); + break; + case ACTION_RECONCILE: + onReconcile(); + break; + case ACTION_SLEEP: + onSleep(); + break; + case ACTION_STOP: + onStop(); + break; + default: + Log.e(TAG, "Unknown action " + msg.what); + synchronized (taskCount) { + taskCount.decrementAndGet(); + } + break; + } + } + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/WorkerService.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/WorkerService.java new file mode 100644 index 00000000..68a4bb5a --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/WorkerService.java @@ -0,0 +1,43 @@ +package org.learningequality.Kolibri; + +import android.content.Intent; +import android.os.IBinder; +import android.util.Log; + +import androidx.annotation.NonNull; +import androidx.work.multiprocess.RemoteWorkerService; + +import org.learningequality.notification.NotificationRef; +import org.learningequality.notification.Notifier; + +/** + * Dedicated service for running tasks in the foreground via RemoteListenableWorker. + */ +public class WorkerService extends RemoteWorkerService implements Notifier { + private static final String TAG = "Kolibri.ForegroundWorkerService"; + + public static WorkerService mService = null; + + @Override + public void onCreate() { + Log.d(TAG, "Initializing foreground worker service"); + super.onCreate(); + mService = this; + } + + @Override + public void onDestroy() { + Log.d(TAG, "Destroying foreground worker service"); + hideNotification(); + super.onDestroy(); + mService = null; + } + + public NotificationRef getNotificationRef() { + return buildNotificationRef(); + } + + public static NotificationRef buildNotificationRef() { + return new NotificationRef(NotificationRef.REF_CHANNEL_SERVICE); + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/sqlite/JobStorage.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/sqlite/JobStorage.java new file mode 100644 index 00000000..e184305f --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/sqlite/JobStorage.java @@ -0,0 +1,81 @@ +package org.learningequality.Kolibri.sqlite; + +import android.content.Context; +import android.database.sqlite.SQLiteDatabase; + +import org.learningequality.sqlite.schema.DatabaseTable; +import org.learningequality.sqlite.Database; + +import java.io.File; + +public class JobStorage extends Database { + public static final String DATABASE_NAME = "job_storage.sqlite3"; + + protected JobStorage(String path, int flags) { + super(DATABASE_NAME, path, flags); + } + + public static JobStorage readwrite(Context context) { + File f = getDatabasePath(context, DATABASE_NAME); + + return f != null + ? new JobStorage(f.getPath(), SQLiteDatabase.OPEN_READWRITE) + : null; + } + + public static class Jobs implements DatabaseTable { + public static final String TABLE_NAME = "jobs"; + public static final StringColumn id = new StringColumn("id"); + public static final StringColumn func = new StringColumn("func"); + public static final LongColumn priority = new LongColumn("priority"); + public static final StringColumn worker_process = new StringColumn("worker_process"); + public static final StringColumn worker_thread = new StringColumn("worker_thread"); + public static final StringColumn worker_extra = new StringColumn("worker_extra"); + public static final StringColumn time_updated = new StringColumn("time_updated"); + public static final StringColumn state = new StringColumn("state"); + + public String getTableName() { + return TABLE_NAME; + } + + public enum State implements StringChoiceEnum { + PENDING, + QUEUED, + SCHEDULED, + SELECTED, + RUNNING, + CANCELING, + CANCELED, + FAILED, + COMPLETED; + + public StringColumn getColumn() { + return state; + } + } + + public enum Priority implements ColumnEnum { + LOW(15L), + REGULAR(10L), + HIGH(5L); + + private final Long value; + + Priority(Long val) { + this.value = val; + } + + public Long getValue() { + return this.value; + } + + public boolean isAtLeast(Long other) { + return this.value.compareTo(other) >= 0; + } + + public LongColumn getColumn() { + return priority; + } + } + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/task/Builder.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/task/Builder.java new file mode 100644 index 00000000..3547ff5c --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/task/Builder.java @@ -0,0 +1,206 @@ +package org.learningequality.Kolibri.task; + +import android.os.Bundle; +import android.util.Log; + +import androidx.work.Data; +import androidx.work.ListenableWorker; +import androidx.work.OneTimeWorkRequest; +import androidx.work.OutOfQuotaPolicy; +import androidx.work.WorkInfo; +import androidx.work.WorkQuery; +import androidx.work.multiprocess.RemoteListenableWorker; + +import org.learningequality.Kolibri.BackgroundWorker; +import org.learningequality.Kolibri.ForegroundWorker; +import org.learningequality.Kolibri.WorkerService; +import org.learningequality.Kolibri.sqlite.JobStorage; +import org.learningequality.task.Worker; + +import java.util.Arrays; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + + +/** + * A builder class consolidating logic for creating WorkRequests and WorkQueries + */ +public class Builder { + public static final String TAG = "Kolibri.TaskBuilder"; + + public static final String TAG_PREFIX_TASK_ID = "kolibri_task_id:"; + public static final String TAG_PREFIX_JOB_FUNC = "kolibri_job_type:"; + public static final String TAG_EXPEDITED = "kolibri_job_expedited"; + public static final String TAG_LONG_RUNNING = "kolibri_job_long_running"; + + public static String generateTagFromId(String id) { + return TAG_PREFIX_TASK_ID + id; + } + + public static String generateTagFromJobFunc(String jobFunc) { + return TAG_PREFIX_JOB_FUNC + jobFunc; + } + + /** + * A builder class for creating WorkQueries + */ + public static class TaskQuery { + private final WorkQuery.Builder builder; + + public TaskQuery(WorkQuery.Builder builder) { + this.builder = builder; + } + + public static TaskQuery from(String... jobIds) { + return new TaskQuery(WorkQuery.Builder.fromUniqueWorkNames(Arrays.asList(jobIds))); + } + + public static TaskQuery from(UUID... requestIds) { + return new TaskQuery(WorkQuery.Builder.fromIds(Arrays.asList(requestIds))); + } + + public WorkQuery build() { + return this.builder.build(); + } + } + + /** + * A builder class for creating WorkRequests + * Unfortunately, OneTimeWorkRequest.Builder is final so we cannot extend it. + */ + public static class TaskRequest { + private final String id; + private String jobFunc; + private boolean longRunning; + private int delay; + private boolean expedite; + + public TaskRequest(String id) { + this.id = id; + setDelay(0); + } + + /** + * Creates a TaskRequest builder from a job Bundle, like that returned by JobStorage + * + * @param job The existing job Bundle from which to parse task information + * @return A TaskRequest builder + */ + public static TaskRequest fromJob(Bundle job) { + String id = JobStorage.Jobs.id.getValue(job); + Long priority = JobStorage.Jobs.priority.getValue(job); + + TaskRequest builder = new TaskRequest(id); + return builder.setJobFunc(JobStorage.Jobs.func.getValue(job)) + .setExpedite(JobStorage.Jobs.Priority.HIGH.isAtLeast(priority)); + } + + /** + * Creates a TaskRequest builder from an existing WorkInfo object + * + * @param workInfo The existing WorkInfo from which to parse task information + * @return A TaskRequest builder + */ + public static TaskRequest fromWorkInfo(WorkInfo workInfo) { + String id = null; + String jobFunc = null; + boolean expedite = false; + boolean isLongRunning = false; + + for (String tag : workInfo.getTags()) { + if (tag.startsWith(TAG_PREFIX_TASK_ID)) { + id = tag.substring(TAG_PREFIX_TASK_ID.length()); + } else if (tag.startsWith(TAG_PREFIX_JOB_FUNC)) { + jobFunc = tag.substring(TAG_PREFIX_JOB_FUNC.length()); + } else if (tag.equals(TAG_EXPEDITED)) { + expedite = true; + } else if (tag.equals(TAG_LONG_RUNNING)) { + isLongRunning = true; + } + } + + if (id == null || jobFunc == null) { + throw new IllegalArgumentException("WorkInfo is missing required task info"); + } + + return (new TaskRequest(id)) + .setJobFunc(jobFunc) + .setExpedite(expedite) + .setLongRunning(isLongRunning); + } + + public String getId() { + return this.id; + } + + public TaskRequest setDelay(int delay) { + this.delay = delay; + return this; + } + + public TaskRequest setExpedite(boolean expedite) { + this.expedite = expedite; + return this; + } + + public TaskRequest setJobFunc(String jobFunc) { + this.jobFunc = jobFunc; + return this; + } + + public TaskRequest setLongRunning(boolean longRunning) { + this.longRunning = longRunning; + return this; + } + + private Class getWorkerClass() { + return longRunning || expedite ? ForegroundWorker.class : BackgroundWorker.class; + } + + private Data buildInputData() { + String dataArgument = id == null ? "" : id; + Data.Builder builder = new Data.Builder() + .putString(Worker.ARGUMENT_WORKER_ARGUMENT, dataArgument); + + if (longRunning || expedite) { + builder.putString( + RemoteListenableWorker.ARGUMENT_PACKAGE_NAME, "org.learningequality.Kolibri" + ) + .putString( + RemoteListenableWorker.ARGUMENT_CLASS_NAME, + WorkerService.class.getName() + ); + } + Data data = builder.build(); + Log.v(TAG, "Worker request data: " + data.toString()); + return data; + } + + /** + * Build a one-time WorkRequest from the TaskRequest information + * + * @return A OneTimeWorkRequest object + */ + public OneTimeWorkRequest build() { + OneTimeWorkRequest.Builder builder = new OneTimeWorkRequest.Builder(getWorkerClass()); + builder.addTag(generateTagFromId(id)); + builder.addTag(generateTagFromJobFunc(jobFunc)); + if (longRunning) { + builder.addTag(TAG_LONG_RUNNING); + } + builder.setInputData(buildInputData()); + if (delay > 0) { + builder.setInitialDelay(delay, TimeUnit.SECONDS); + } + // Tasks can only be expedited if they are set with no delay. + // This does not appear to be documented, but is evident in the Android Jetpack source code. + // https://android.googlesource.com/platform/frameworks/support/+/HEAD/work/work-runtime/src/main/java/androidx/work/WorkRequest.kt#271 + if (expedite && delay == 0) { + builder.setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST); + builder.addTag(TAG_EXPEDITED); + } + + return builder.build(); + } + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/task/Reconciler.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/task/Reconciler.java new file mode 100644 index 00000000..6b826762 --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/task/Reconciler.java @@ -0,0 +1,208 @@ +package org.learningequality.Kolibri.task; + +import android.content.Context; +import android.util.Log; + +import androidx.work.ExistingWorkPolicy; +import androidx.work.OneTimeWorkRequest; +import androidx.work.multiprocess.RemoteWorkManager; + +import org.learningequality.FuturesUtil; +import org.learningequality.Kolibri.sqlite.JobStorage; +import org.learningequality.sqlite.query.UpdateQuery; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executor; + +import java9.util.concurrent.CompletableFuture; + + +public class Reconciler implements AutoCloseable { + public static final String TAG = "Kolibri.TaskReconciler"; + public static final String LOCK_FILE = "kolibri_reconciler.lock"; + + private final RemoteWorkManager workManager; + private final LockChannel lockChannel; + private final JobStorage db; + private final Executor executor; + private FileLock lock; + + protected static class LockChannel { + private static LockChannel mInstance; + private final FileChannel channel; + + public LockChannel(File lockFile) { + try { + channel = new RandomAccessFile(lockFile, "rw").getChannel(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public FileLock tryLock() { + try { + return this.channel.tryLock(); + } catch (IOException e) { + Log.e(TAG, "Failed to acquire lock", e); + return null; + } + } + + public static LockChannel getInstance(Context context) { + if (mInstance == null) { + File lockFile = new File(context.getFilesDir(), LOCK_FILE); + mInstance = new LockChannel(lockFile); + } + return mInstance; + } + } + + + public Reconciler(RemoteWorkManager workManager, JobStorage db, LockChannel lockChannel, Executor executor) { + this.workManager = workManager; + this.db = db; + this.lockChannel = lockChannel; + this.executor = executor; + + } + + /** + * Create a new Reconciler instance from a Context + * @param context The context to use + * @return A new Reconciler instance + */ + public static Reconciler from(Context context, JobStorage db, Executor executor) { + RemoteWorkManager workManager = RemoteWorkManager.getInstance(context); + return new Reconciler(workManager, db, LockChannel.getInstance(context), executor); + } + + /** + * Attempt to acquire an exclusive lock on the lock file, which will prevent multiple + * Reconciler instances from running at the same time, including in different processes. + * Also starts a transaction on the database. + * @return True if the lock was acquired, false otherwise + */ + public boolean begin() { + // First get a lock on the lock file + Log.d(TAG, "Acquiring lock"); + lock = lockChannel.tryLock(); + if (lock == null) { + Log.d(TAG, "Failed to acquire lock"); + return false; + } + + // Then start a transaction + Log.d(TAG, "Beginning transaction"); +// db.begin(); + return true; + } + + /** + * Commit the database transaction and release the lock + */ + public void end() { + Log.d(TAG, "Committing transaction"); +// db.commit(); + + try { + Log.d(TAG, "Releasing lock"); + if (lock != null) lock.release(); + } catch (Exception e) { + Log.e(TAG, "Failed to close and release lock", e); + } + } + + /** + * Close the Reconciler, rolling back the database transaction and releasing the lock + */ + public void close() { + // this may be a no-op if closing normally + db.rollback(); + end(); + } + + /** + * (Re)enqueue a WorkRequest from a Sentinel.Result + * @param result The result of a Sentinel check operation + */ + protected CompletableFuture enqueueFrom(Sentinel.Result result) { + // We prefer to create the builder from the WorkInfo, if it exists + Builder.TaskRequest builder = (result.isMissing()) + ? Builder.TaskRequest.fromJob(result.getJob()) + : Builder.TaskRequest.fromWorkInfo(result.getWorkInfo()); + + + if (result.isMissing()) { + // if we're missing the WorkInfo, then we can't know if it's supposed to be long running, + // because we don't track `long_running` in the DB, so we can only assume + builder.setLongRunning(true) + .setDelay(0); + } + + Log.d(TAG, "Re-enqueuing job " + builder.getId()); + OneTimeWorkRequest req = builder.build(); + + // Using `REPLACE` here because we want to replace the existing request as a more + // forceful way of ensuring that the request is enqueued, since this is reconciliation + CompletableFuture future = FuturesUtil.toCompletable( + workManager.enqueueUniqueWork(builder.getId(), ExistingWorkPolicy.REPLACE, req), executor + ); + + // Update the request ID in the database + if (updateRequestId(builder.getId(), req.getId()) == 0) { + Log.e(TAG, "Failed to update request ID for job " + builder.getId()); + } + + return future; + } + + /** + * Update the request ID for a job in the database + * @param id The job ID + * @param requestId The new WorkManager request ID + * @return The number of rows updated + */ + protected int updateRequestId(String id, UUID requestId) { + Log.d(TAG, "Updating request ID for job " + id + " to " + requestId); + UpdateQuery q = new UpdateQuery(JobStorage.Jobs.TABLE_NAME) + .where(JobStorage.Jobs.id, id) + .set(JobStorage.Jobs.worker_extra, requestId.toString()); + return q.execute(db); + } + + /** + * Process results from Sentinel checks that found jobs in the given state didn't match + * the expected WorkManager state, or were missing + * @param stateRef The state which Kolibri thinks the job is in + * @param results The results of the Sentinel checks + */ + public CompletableFuture process(StateMap stateRef, Sentinel.Result[] results) { + Log.d(TAG, "Reconciling " + results.length + " jobs for state " + stateRef); + List> futures = new ArrayList>(); + + for (Sentinel.Result result : results) { + switch (stateRef.getJobState()) { + case PENDING: + case QUEUED: + case SCHEDULED: + case SELECTED: + case RUNNING: + futures.add(enqueueFrom(result)); + break; + default: + Log.d(TAG, "No reconciliation for state " + stateRef.getJobState()); + break; + } + } + + // Wait for all the job enqueues to finish + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/task/Sentinel.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/task/Sentinel.java new file mode 100644 index 00000000..7ef49363 --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/task/Sentinel.java @@ -0,0 +1,251 @@ +package org.learningequality.Kolibri.task; + +import android.content.Context; +import android.os.Bundle; +import android.util.Log; +import android.util.Pair; + +import androidx.work.WorkInfo; +import androidx.work.WorkQuery; +import androidx.work.multiprocess.RemoteWorkManager; + +import org.learningequality.FuturesUtil; +import org.learningequality.Kolibri.sqlite.JobStorage; +import org.learningequality.sqlite.query.SelectQuery; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executor; + +import java9.util.concurrent.CompletableFuture; + +/** + * Sentinel (as in watcher) for checking and reconciling Kolibri job status with WorkManager + */ +public class Sentinel { + public static String TAG = "Kolibri.TaskSentinel"; + private final RemoteWorkManager workManager; + private final JobStorage db; + private final Executor executor; + + public Sentinel(RemoteWorkManager workManager, JobStorage db, Executor executor) { + this.workManager = workManager; + this.db = db; + this.executor = executor; + } + + /** + * Create a sentinel + */ + public static Sentinel from(Context context, JobStorage db, Executor executor) { + return new Sentinel( + RemoteWorkManager.getInstance(context), + db, + executor + ); + } + + /** + * Build a query for jobs with the given status + * + * @param jobStatus The job status in the Kolibri database for which to find jobs + * @return A query for jobs with the given status, and subset of selected columns + */ + private SelectQuery buildQuery(JobStorage.Jobs.State jobStatus) { + return new SelectQuery( + JobStorage.Jobs.id, + JobStorage.Jobs.priority, + JobStorage.Jobs.state, + JobStorage.Jobs.worker_process, + JobStorage.Jobs.worker_thread, + JobStorage.Jobs.worker_extra + ) + .from(JobStorage.Jobs.TABLE_NAME) + .where(jobStatus) + .orderBy(JobStorage.Jobs.time_updated, false); + } + + private WorkQuery buildWorkQuery(Bundle result) { + String requestId = JobStorage.Jobs.worker_extra.getValue(result); + + if (requestId == null) { + String id = JobStorage.Jobs.id.getValue(result); + Log.v(TAG, "No request ID found for job " + id); + return Builder.TaskQuery.from(id).build(); + } + + return Builder.TaskQuery.from(UUID.fromString(requestId)).build(); + } + + /** + * Check for jobs with the given status and reconcile them with WorkManager + * Defaults to flagging missing work in WorkManager + * + * @param stateRef The job status in the Kolibri database for which to find jobs + * @return A future that will complete when all jobs have been checked, with a list of jobs + */ + public CompletableFuture check(StateMap stateRef) { + return check(false, stateRef); + } + + /** + * Check for jobs with the given status and reconcile them with WorkManager + * + * @param ignoreMissing Whether to ignore missing work in WorkManager + * @param stateRef The job status in the Kolibri database for which to find jobs + * @return A future that will complete when all jobs have been checked, with a list of jobs + */ + public CompletableFuture check( + boolean ignoreMissing, + StateMap stateRef + ) { + Log.d(TAG, "Checking for jobs in state " + stateRef.getJobState()); + SelectQuery query = buildQuery(stateRef.getJobState()); + Bundle[] jobs = query.execute(db); + + if (jobs == null || jobs.length == 0) { + Log.v(TAG, "No jobs to reconcile for status " + stateRef); + return CompletableFuture.completedFuture(null); + } + + Log.d(TAG, "Cross-referencing " + jobs.length + " jobs with work manager"); + return check(jobs, ignoreMissing, stateRef.getWorkInfoStates()); + } + + /** + * Check for the given jobs (Bundles) and reconciles them with WorkManager + * + * @param jobs The jobs to check + * @param ignoreMissing Whether to ignore missing work in WorkManager + * @param expectedWorkStates The expected WorkManager states for the found jobs + * @return A future that will complete when all jobs have been checked, with a list of jobs + */ + public CompletableFuture check( + Bundle[] jobs, + boolean ignoreMissing, + WorkInfo.State... expectedWorkStates + ) { + final CompletableFuture future = new CompletableFuture<>(); + final List allResults = new ArrayList(jobs.length); + CompletableFuture> chain = CompletableFuture.completedFuture(allResults); + + for (Bundle job : jobs) { + chain = chain.thenComposeAsync((results) -> { + synchronized (future) { + if (future.isCancelled()) { + return CompletableFuture.completedFuture(results); + } + } + + return check(job, ignoreMissing, expectedWorkStates) + .exceptionally((ex) -> { + Log.e(TAG, "Failed to check job '" + JobStorage.Jobs.id.getValue(job) + "'", ex); + return null; + }) + .thenApply((result) -> { + if (result != null) { + results.add(result); + } + return results; + }); + }, executor); + } + + final CompletableFuture> finalChain = chain; + + finalChain.whenCompleteAsync((results, ex) -> { + if (ex != null) { + Log.e(TAG, "Failed to check jobs", ex); + future.completeExceptionally(ex); + return; + } + + synchronized (future) { + if (!future.isCancelled()) { + future.complete(results.toArray(new Result[0])); + } + } + }, executor); + + future.whenCompleteAsync((results, ex) -> { + synchronized (future) { + if (future.isCancelled()) { + Log.d(TAG, "Propagating cancellation to future"); + synchronized (finalChain) { + finalChain.cancel(true); + } + } + } + }, executor); + return future; + } + + /** + * Check for the given job (Bundle) and reconciles it with WorkManager + * + * @param job The job to check as a `Bundle` + * @param ignoreMissing Whether to ignore the job as missing in WorkManager + * @param expectedWorkStates The expected WorkManager states for the found jobs + * @return A future that will complete when the job has been checked, with the job if it is not reconciled + */ + public CompletableFuture check( + Bundle job, + boolean ignoreMissing, + WorkInfo.State... expectedWorkStates + ) { + final String jobId = JobStorage.Jobs.id.getValue(job); + Log.d(TAG, "Cross-referencing job '" + jobId + "' with work manager"); + + List workStates = Arrays.asList(expectedWorkStates); + WorkQuery workQuery = buildWorkQuery(job); + + return FuturesUtil.toCompletable(workManager.getWorkInfos(workQuery), executor) + .thenApplyAsync((workInfos) -> { + Log.d(TAG, "Completed cross-reference of job '" + jobId + "'"); + + if (workInfos == null || workInfos.size() == 0) { + if (ignoreMissing) { + return null; + } + + Log.w(TAG, "No work requests found for job id '" + jobId + "'"); + return new Result(job, null); + } + + for (WorkInfo workInfo : workInfos) { + WorkInfo.State state = workInfo.getState(); + + if (!workStates.contains(state)) { + Log.w(TAG, "WorkInfo state " + state + " does not match expected state " + Arrays.toString(expectedWorkStates) + " for request " + workInfo.getId() + " | " + workInfo.getTags()); + return new Result(job, workInfo); + } + } + + return null; + }, executor); + } + + /** + * A class that holds the pair of Bundle and WorkInfo as a result of the Sentinel's + * check operations + */ + public static class Result extends Pair { + public Result(Bundle first, WorkInfo second) { + super(first, second); + } + + public boolean isMissing() { + return this.second == null; + } + + public Bundle getJob() { + return this.first; + } + + public WorkInfo getWorkInfo() { + return this.second; + } + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/task/StateMap.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/task/StateMap.java new file mode 100644 index 00000000..7d50e835 --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/task/StateMap.java @@ -0,0 +1,69 @@ +package org.learningequality.Kolibri.task; + +import androidx.work.WorkInfo; + +import org.learningequality.Kolibri.sqlite.JobStorage; + +/** + * A mapping between Kolibri job states and WorkManager work states + */ +public enum StateMap { + PENDING( + JobStorage.Jobs.State.PENDING, + WorkInfo.State.ENQUEUED, + WorkInfo.State.BLOCKED, + WorkInfo.State.RUNNING + ), + QUEUED( + JobStorage.Jobs.State.QUEUED, + WorkInfo.State.ENQUEUED, + WorkInfo.State.BLOCKED, + WorkInfo.State.RUNNING + ), + SCHEDULED( + JobStorage.Jobs.State.SCHEDULED, + WorkInfo.State.ENQUEUED, + WorkInfo.State.BLOCKED, + WorkInfo.State.RUNNING + ), + SELECTED( + JobStorage.Jobs.State.SELECTED, + WorkInfo.State.ENQUEUED, + WorkInfo.State.BLOCKED, + WorkInfo.State.RUNNING + ), + // We include 'ENQUEUED' here because it is possible for a job to be re-enqueued by the + // reconciler while Kolibri thinks it's running + RUNNING( + JobStorage.Jobs.State.RUNNING, + WorkInfo.State.ENQUEUED, + WorkInfo.State.RUNNING, + WorkInfo.State.SUCCEEDED + ); + + private final JobStorage.Jobs.State jobState; + private final WorkInfo.State[] workInfoStates; + + StateMap(JobStorage.Jobs.State jobState, WorkInfo.State... workInfoStates) { + this.jobState = jobState; + this.workInfoStates = workInfoStates; + } + + public static StateMap[] forReconciliation() { + return new StateMap[]{ + PENDING, + QUEUED, + SCHEDULED, + SELECTED, + RUNNING + }; + } + + public JobStorage.Jobs.State getJobState() { + return this.jobState; + } + + public WorkInfo.State[] getWorkInfoStates() { + return this.workInfoStates; + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Task.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Task.java index d4844d67..7ed83bf3 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Task.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Task.java @@ -3,76 +3,54 @@ import android.content.Context; import android.util.Log; -import androidx.work.BackoffPolicy; -import androidx.work.Data; -import androidx.work.ExistingPeriodicWorkPolicy; +import androidx.core.content.ContextCompat; import androidx.work.ExistingWorkPolicy; import androidx.work.OneTimeWorkRequest; -import androidx.work.OutOfQuotaPolicy; -import androidx.work.PeriodicWorkRequest; import androidx.work.WorkInfo; -import androidx.work.WorkManager; import androidx.work.WorkQuery; import androidx.work.multiprocess.RemoteWorkManager; import com.google.common.util.concurrent.ListenableFuture; -import org.learningequality.Kolibri.TaskworkerWorker; +import org.learningequality.Kolibri.sqlite.JobStorage; +import org.learningequality.Kolibri.task.Builder; +import org.learningequality.Kolibri.task.Reconciler; +import org.learningequality.Kolibri.task.Sentinel; +import org.learningequality.Kolibri.task.StateMap; +import org.learningequality.notification.Manager; +import org.learningequality.notification.NotificationRef; +import org.learningequality.task.Worker; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java9.util.concurrent.CompletableFuture; -public class Task { - public static final String TAG = "Task"; - - private static String generateTagFromId(String id) { - return "kolibri_task_id:" + id; - } - private static String generateTagFromJobFunc(String jobFunc) { - return "kolibri_job_type:" + jobFunc; - } +public class Task { + public static final String TAG = "Kolibri.Task"; - public static void enqueueOnce(String id, int delay, boolean expedite, String jobFunc, boolean longRunning) { + public static String enqueueOnce(String id, int delay, boolean expedite, String jobFunc, boolean longRunning) { RemoteWorkManager workManager = RemoteWorkManager.getInstance(ContextUtil.getApplicationContext()); - Data data = TaskworkerWorker.buildInputData(id); + Builder.TaskRequest builder = new Builder.TaskRequest(id); + builder.setDelay(delay) + .setExpedite(expedite) + .setJobFunc(jobFunc) + .setLongRunning(longRunning); - OneTimeWorkRequest.Builder workRequestBuilder = new OneTimeWorkRequest.Builder(TaskworkerWorker.class); - - // Tasks can only be expedited if they are set with no delay. - // This does not appear to be documented, but is evident in the Android Jetpack source code. - // https://android.googlesource.com/platform/frameworks/support/+/HEAD/work/work-runtime/src/main/java/androidx/work/WorkRequest.kt#271 - if (expedite && delay == 0) { - workRequestBuilder.setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST); - } - - if (delay > 0) { - workRequestBuilder.setInitialDelay(delay, TimeUnit.SECONDS); - } - workRequestBuilder.addTag(generateTagFromId(id)); - workRequestBuilder.addTag(generateTagFromJobFunc(jobFunc)); - if (longRunning) { - workRequestBuilder.addTag(TaskworkerWorker.TAG_LONG_RUNNING); - Log.v(TAG, "Tagging work request as long running, ID: " + id); - } - workRequestBuilder.setInputData(data); - workRequestBuilder.setBackoffCriteria(BackoffPolicy.EXPONENTIAL, 5, TimeUnit.SECONDS); - OneTimeWorkRequest workRequest = workRequestBuilder.build(); + OneTimeWorkRequest workRequest = builder.build(); workManager.enqueueUniqueWork(id, ExistingWorkPolicy.APPEND_OR_REPLACE, workRequest); + // return the work request ID, different from the task ID passed in + return workRequest.getId().toString(); } public static void clear(String id) { Context context = ContextUtil.getApplicationContext(); - List tags = new ArrayList(); - String tag = generateTagFromId(id); - tags.add(tag); RemoteWorkManager workManager = RemoteWorkManager.getInstance(context); - WorkQuery workQuery = WorkQuery.Builder - .fromTags(tags) - .build(); + WorkQuery workQuery = Builder.TaskQuery.from(id).build(); ListenableFuture> workInfosFuture = workManager.getWorkInfos(workQuery); workInfosFuture.addListener(() -> { @@ -94,7 +72,7 @@ public static void clear(String id) { } if (anyInfo && clearable) { // If the tasks are marked as completed we - workManager.cancelAllWorkByTag(tag); + workManager.cancelUniqueWork(id); } } } catch (ExecutionException | InterruptedException e) { @@ -102,4 +80,115 @@ public static void clear(String id) { } }, new MainThreadExecutor()); } + + public static CompletableFuture reconcile(Context context, Executor executor) { + if (executor == null) { + executor = ContextCompat.getMainExecutor(context); + } + + final AtomicBoolean didReconcile = new AtomicBoolean(false); + final JobStorage db = JobStorage.readwrite(context); + final Reconciler reconciler = Reconciler.from(context, db, executor); + + if (db == null) { + Log.e(Sentinel.TAG, "Failed to open job storage database"); + return CompletableFuture.completedFuture(false); + } + + // If we can't acquire the lock, then reconciliation is already running + if (!reconciler.begin()) { + return CompletableFuture.completedFuture(false); + } + + final Sentinel sentinel = Sentinel.from(context, db, executor); + final CompletableFuture future = new CompletableFuture<>(); + CompletableFuture chain = CompletableFuture.completedFuture(didReconcile); + + // Run through all the states and check them, then process the results + for (StateMap stateRef : StateMap.forReconciliation()) { + chain = chain.thenComposeAsync((_didReconcile) -> { + // Avoid checking if future is cancelled + synchronized (future) { + if (future.isCancelled()) { + return CompletableFuture.completedFuture(_didReconcile); + } + } + + Log.i(TAG, "Requesting sentinel check state " + stateRef); + return sentinel.check(stateRef) + .exceptionally((e) -> { + Log.e(TAG, "Failed to check state for reconciliation " + stateRef, e); + return null; + }) + .thenCompose((results) -> { + if (results != null && results.length > 0) { + Log.d(TAG, "Received results for sentinel checking " + stateRef); + _didReconcile.set(true); + return reconciler.process(stateRef, results) + .thenApply((r) -> _didReconcile); + } + return CompletableFuture.completedFuture(_didReconcile); + }); + }, executor); + } + + final CompletableFuture finalChain + = chain.orTimeout(15, java.util.concurrent.TimeUnit.SECONDS); + + finalChain.whenCompleteAsync((result, error) -> { + try { + reconciler.end(); + db.close(); + } catch (Exception e) { + Log.e(TAG, "Failed cleaning up reconciliation", e); + } finally { + synchronized (future) { + if (!future.isCancelled()) { + if (error instanceof TimeoutException) { + Log.e(TAG, "Timed out waiting for reconciliation chain", error); + future.completeExceptionally(error); + } else if (error != null) { + Log.e(TAG, "Failed during reconciliation chain", error); + future.completeExceptionally(error); + } else if (result != null) { + if (result.get()) { + Log.i(TAG, "Reconciliation completed successfully"); + } else { + Log.i(TAG, "No reconciliation performed"); + } + future.complete(result.get()); + } else { + future.complete(false); + } + } + } + } + }, executor); + + // Propagate cancellation to the chain + future.whenCompleteAsync((result, error) -> { + synchronized (future) { + if (future.isCancelled()) { + finalChain.cancel(true); + } + } + }, executor); + + return future; + } + + /** + * @param id The task request ID + * @param notificationTitle The notification title + * @param notificationText The notification text + * @param progress The task progress + * @param total The total of completed task progress + */ + public static void updateProgress( + String id, String notificationTitle, String notificationText, int progress, int total + ) { + NotificationRef ref = Worker.buildNotificationRef(id); + Manager manager = new Manager(ContextUtil.getApplicationContext(), ref); + manager.send(notificationTitle, notificationText, progress, total); + } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/NotificationBuilder.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/notification/Builder.java similarity index 80% rename from python-for-android/dists/kolibri/src/main/java/org/learningequality/NotificationBuilder.java rename to python-for-android/dists/kolibri/src/main/java/org/learningequality/notification/Builder.java index 105a14de..3bcf81ca 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/NotificationBuilder.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/notification/Builder.java @@ -1,4 +1,4 @@ -package org.learningequality; +package org.learningequality.notification; import android.app.PendingIntent; import android.content.Context; @@ -9,12 +9,17 @@ import org.learningequality.Kolibri.R; -public class NotificationBuilder extends NotificationCompat.Builder { - public NotificationBuilder(Context context, String channelId) { +public class Builder extends NotificationCompat.Builder { + public Builder(Context context, String channelId) { super(context, channelId); setSmallIcon(R.drawable.ic_stat_kolibri_notification); setPriority(NotificationCompat.PRIORITY_LOW); - setColor(context.getColor(R.color.primary)); + try { + setColor(context.getColor(R.color.primary)); + } catch (NullPointerException e) { + // This seems to happen on Android 7 + // when this method is invoked from Python. + } setSilent(true); // Default title @@ -46,15 +51,15 @@ public NotificationBuilder(Context context, String channelId) { } } - public NotificationBuilder(Context context, int channelRef) { + public Builder(Context context, int channelRef) { this(context, NotificationRef.getChannelId(context, channelRef)); } - public NotificationBuilder(Context context, NotificationRef ref) { + public Builder(Context context, NotificationRef ref) { this(context, ref.getChannelRef()); } - public NotificationBuilder(Context context) { + public Builder(Context context) { this(context, NotificationRef.REF_CHANNEL_DEFAULT); } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Notifier.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/notification/Manager.java similarity index 52% rename from python-for-android/dists/kolibri/src/main/java/org/learningequality/Notifier.java rename to python-for-android/dists/kolibri/src/main/java/org/learningequality/notification/Manager.java index cd914c89..c5866d8c 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Notifier.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/notification/Manager.java @@ -1,31 +1,27 @@ -package org.learningequality; +package org.learningequality.notification; import android.content.Context; import androidx.core.app.NotificationManagerCompat; -public interface Notifier { - Context getApplicationContext(); +public class Manager { + private final Context context; + private final NotificationRef ref; - default NotificationRef getNotificationRef() { - return null; + public Manager(Context context, NotificationRef ref) { + this.context = context; + this.ref = ref; } - default void sendNotification() { - sendNotification(null, null, -1, -1); + public void send() { + send(null, null, -1, -1); } - default NotificationBuilder getNotificationBuilder(NotificationRef ref) { - return new NotificationBuilder(getApplicationContext(), ref); - } - - default void sendNotification(String notificationTitle, String notificationText, int notificationProgress, int notificationTotal) { - NotificationRef ref = getNotificationRef(); + public void send(String notificationTitle, String notificationText, int notificationProgress, int notificationTotal) { if (ref == null) { return; } - Context context = getApplicationContext(); - NotificationBuilder builder = getNotificationBuilder(ref); + Builder builder = new Builder(context, ref); if (notificationTitle != null) { builder.setContentTitle(notificationTitle); } @@ -39,12 +35,10 @@ default void sendNotification(String notificationTitle, String notificationText, notificationManager.notify(ref.getTag(), ref.getId(), builder.build()); } - default void hideNotification() { - NotificationRef ref = getNotificationRef(); + public void hide() { if (ref == null) { return; } - Context context = getApplicationContext(); NotificationManagerCompat notificationManager = NotificationManagerCompat.from(context); notificationManager.cancel(ref.getTag(), ref.getId()); } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/NotificationRef.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/notification/NotificationRef.java similarity index 96% rename from python-for-android/dists/kolibri/src/main/java/org/learningequality/NotificationRef.java rename to python-for-android/dists/kolibri/src/main/java/org/learningequality/notification/NotificationRef.java index 6e183042..08aa1242 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/NotificationRef.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/notification/NotificationRef.java @@ -1,4 +1,4 @@ -package org.learningequality; +package org.learningequality.notification; import android.content.Context; diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/notification/Notifier.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/notification/Notifier.java new file mode 100644 index 00000000..b04ee2f6 --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/notification/Notifier.java @@ -0,0 +1,34 @@ +package org.learningequality.notification; + +import android.content.Context; + + +public interface Notifier { + Context getApplicationContext(); + + default NotificationRef getNotificationRef() { + return null; + } + + default void sendNotification() { + sendNotification(null, null, -1, -1); + } + + default Manager getNotificationManager(NotificationRef ref) { + return new Manager(getApplicationContext(), ref); + } + + default void sendNotification( + String notificationTitle, + String notificationText, + int notificationProgress, + int notificationTotal + ) { + getNotificationManager(getNotificationRef()) + .send(notificationTitle, notificationText, notificationProgress, notificationTotal); + } + + default void hideNotification() { + getNotificationManager(getNotificationRef()).hide(); + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/Database.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/Database.java new file mode 100644 index 00000000..74ec088e --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/Database.java @@ -0,0 +1,104 @@ +package org.learningequality.sqlite; + +import android.content.Context; +import android.database.sqlite.SQLiteDatabase; +import android.database.sqlite.SQLiteException; +import android.util.Log; + +import java.io.File; + +public class Database implements AutoCloseable { + public static final String TAG = "KolibriDatabase"; + + private final String name; + private final String path; + private final int flags; + private boolean inTransaction; + private SQLiteDatabase db; + + protected Database(String name, String path, int flags) { + this.name = name; + this.path = path; + this.db = null; + this.flags = flags; + this.inTransaction = false; + initialize(); + } + + public boolean isConnected() { + return this.path != null && this.db != null; + } + + public SQLiteDatabase get() { + if (!isConnected()) { + throw new IllegalStateException("Database is not connected"); + } + return this.db; + } + + public String getName() { + return this.name; + } + + protected void initialize() { + if (this.path == null) { + return; + } + try { + Log.d(TAG, "Connecting to database"); + this.db = SQLiteDatabase.openDatabase(this.path, null, flags); + } catch (SQLiteException e) { + this.db = null; + } + } + + public void begin() { + if (!isConnected()) { + return; + } + Log.d(TAG, "Starting transaction"); + this.inTransaction = true; + this.db.beginTransaction(); + } + + public void rollback() { + if (!isConnected() || !this.inTransaction) { + return; + } + Log.d(TAG, "Rolling back transaction"); + this.inTransaction = false; + this.db.endTransaction(); + } + + public void commit() { + if (!isConnected() || !this.inTransaction) { + return; + } + Log.d(TAG, "Committing transaction"); + this.inTransaction = false; + this.db.setTransactionSuccessful(); + this.db.endTransaction(); + } + + public void close() { + if (isConnected()) { + Log.d(TAG, "Closing database"); + rollback(); + this.db.close(); + this.db = null; + } + } + + protected static File getDatabasePath(Context context, String name) { + File dir = context.getExternalFilesDir(null); + if (dir != null) { + File f = new File(new File(dir, "KOLIBRI_DATA"), name); + if (f.exists()) { + return f; + } else { + Log.v(TAG, "Database file does not exist: " + f.getPath()); + } + } + return null; + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/FilterableQuery.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/FilterableQuery.java new file mode 100644 index 00000000..40473c01 --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/FilterableQuery.java @@ -0,0 +1,49 @@ +package org.learningequality.sqlite.query; + +import org.learningequality.sqlite.schema.DatabaseTable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * A base query that can be filtered + */ +public abstract class FilterableQuery> { + private final List whereClauses; + private final List whereParameters; + + public FilterableQuery() { + this.whereClauses = new ArrayList(); + this.whereParameters = new ArrayList(); + } + + public T where(String clause, String... parameters) { + this.whereClauses.add(clause); + this.whereParameters.addAll(Arrays.asList(parameters)); + return self(); + } + + public T where(DatabaseTable.Column column, String value) { + return where(column.getColumnName() + " = ?", value); + } + + public T where(DatabaseTable.ColumnEnum value) { + return where(value.getColumn(), value.getValue()); + } + + protected String buildSelection() { + // Currently we only support ANDing all where clauses + return String.join(" AND ", this.whereClauses); + } + + protected String[] buildSelectionArgs() { + return this.whereParameters.toArray(new String[this.whereParameters.size()]); + } + + /** + * Method to return the current instance of the query + * @return the current instance of the query + */ + protected abstract T self(); +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/Query.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/Query.java new file mode 100644 index 00000000..dfbf657a --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/Query.java @@ -0,0 +1,10 @@ +package org.learningequality.sqlite.query; + +import org.learningequality.sqlite.Database; + +/** + * A SQL query interface that defines a method to execute the query + */ +public interface Query { + T execute(Database db); +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/SelectQuery.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/SelectQuery.java new file mode 100644 index 00000000..cadbeb32 --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/SelectQuery.java @@ -0,0 +1,113 @@ +package org.learningequality.sqlite.query; + +import android.database.Cursor; +import android.os.Bundle; + +import org.learningequality.sqlite.Database; +import org.learningequality.sqlite.schema.DatabaseTable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * A query that SELECTs rows from a table + */ +public class SelectQuery extends FilterableQuery implements Query { + private String tableName; + private final DatabaseTable.Column[] selectColumns; + private String orderBy; + + public SelectQuery(DatabaseTable.Column... columns) { + this.selectColumns = columns.length > 0 ? columns : null; + } + + /** + * Method to return the current instance of the query + * @return the current instance of the query + */ + @Override + protected SelectQuery self() { + return this; + } + + public SelectQuery from(String tableName) { + this.tableName = tableName; + return this; + } + + public SelectQuery orderBy(DatabaseTable.Column column, boolean ascending) { + this.orderBy = column.getColumnName() + (ascending ? " ASC" : " DESC"); + return this; + } + + protected Bundle buildBundle(Database db, Cursor cursor) { + Bundle b = new Bundle(cursor.getColumnCount() + 2); + b.putString(DatabaseTable.DATABASE_NAME, db.getName()); + b.putString(DatabaseTable.TABLE_NAME, this.tableName); + + for (int i = 0; i < cursor.getColumnCount(); i++) { + String columnName = cursor.getColumnName(i); + switch (cursor.getType(i)) { + case Cursor.FIELD_TYPE_NULL: + b.putString(columnName, null); + break; + case Cursor.FIELD_TYPE_INTEGER: + b.putLong(columnName, cursor.getLong(i)); + break; + case Cursor.FIELD_TYPE_FLOAT: + b.putDouble(columnName, cursor.getDouble(i)); + break; + case Cursor.FIELD_TYPE_STRING: + b.putString(columnName, cursor.getString(i)); + break; + case Cursor.FIELD_TYPE_BLOB: + b.putByteArray(columnName, cursor.getBlob(i)); + break; + } + } + return b; + } + + protected String[] generateSelectColumns() { + if (this.selectColumns == null) { + return null; + } + + // This can be simpler with Java 8 streams + List selectColumns = new ArrayList(); + for (DatabaseTable.Column column : this.selectColumns) { + selectColumns.add(column.getColumnName()); + } + + return selectColumns.toArray(new String[0]); + } + + public Bundle[] execute(Database db) { + if (!db.isConnected()) { + return null; + } + + try { + List results; + try (Cursor cursor = db.get().query( + this.tableName, + this.generateSelectColumns(), + buildSelection(), + buildSelectionArgs(), + null, + null, + this.orderBy + )) { + results = new ArrayList(); + while (cursor.moveToNext()) { + results.add(buildBundle(db, cursor)); + } + } + return results.toArray(new Bundle[0]); + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/UpdateQuery.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/UpdateQuery.java new file mode 100644 index 00000000..b452b08b --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/UpdateQuery.java @@ -0,0 +1,50 @@ +package org.learningequality.sqlite.query; + +import org.learningequality.sqlite.Database; +import org.learningequality.sqlite.schema.DatabaseTable; + +import android.content.ContentValues; + +/** + * A class that represents an UPDATE SQL query + */ +public class UpdateQuery extends FilterableQuery implements Query { + private final String tableName; + private ContentValues values; + + public UpdateQuery(String tableName) { + this.tableName = tableName; + this.values = new ContentValues(); + } + + /** + * Method to return the current instance of the query + * @return the current instance of the query + */ + @Override + protected UpdateQuery self() { + return this; + } + + public UpdateQuery set(ContentValues values) { + this.values = values; + return this; + } + + public UpdateQuery set(DatabaseTable.Column column, String value) { + this.values.put(column.getColumnName(), value); + return this; + } + + public Integer execute(Database db) { + if (!db.isConnected()) { + return 0; + } + + if (this.values.size() == 0) { + throw new IllegalStateException("No values to update"); + } + + return db.get().update(this.tableName, this.values, buildSelection(), buildSelectionArgs()); + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/schema/DatabaseTable.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/schema/DatabaseTable.java new file mode 100644 index 00000000..6f4f6316 --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/schema/DatabaseTable.java @@ -0,0 +1,64 @@ +package org.learningequality.sqlite.schema; + +import android.os.Bundle; + +public interface DatabaseTable { + String DATABASE_NAME = "DATABASE_NAME"; + String TABLE_NAME = "TABLE_NAME"; + + String getTableName(); + + interface Column { + String getColumnName(); + } + + interface ColumnEnum extends Column { + String name(); + + T getValue(); + + ColumnImpl getColumn(); + + default String getColumnName() { + return getColumn().getColumnName(); + } + } + + interface StringChoiceEnum extends ColumnEnum { + default String getValue() { + return this.name(); + } + } + + abstract class ColumnImpl implements Column { + private final String columnName; + + public ColumnImpl(String columnName) { + this.columnName = columnName; + } + + public String getColumnName() { + return this.columnName; + } + } + + class StringColumn extends ColumnImpl { + public StringColumn(String columnName) { + super(columnName); + } + + public String getValue(Bundle bundle) { + return bundle.getString(getColumnName()); + } + } + + class LongColumn extends ColumnImpl { + public LongColumn(String columnName) { + super(columnName); + } + + public Long getValue(Bundle bundle) { + return bundle.getLong(getColumnName()); + } + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Worker.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Worker.java new file mode 100644 index 00000000..6eb49209 --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/task/Worker.java @@ -0,0 +1,41 @@ +package org.learningequality.task; + +import androidx.work.Data; + +import org.learningequality.notification.Notifier; +import org.learningequality.notification.NotificationRef; + +import java.util.UUID; + +public interface Worker extends Notifier { + String TAG = "Kolibri.TaskWorker"; + String ARGUMENT_WORKER_ARGUMENT = "PYTHON_WORKER_ARGUMENT"; + + UUID getId(); + + Data getInputData(); + + default String getArgument() { + String dataArg = getInputData().getString(ARGUMENT_WORKER_ARGUMENT); + final String serviceArg; + if (dataArg != null) { + serviceArg = dataArg; + } else { + serviceArg = ""; + } + return serviceArg; + } + + default NotificationRef getNotificationRef() { + // Use worker request ID as notification tag + return buildNotificationRef(getId()); + } + + static NotificationRef buildNotificationRef(UUID id) { + return buildNotificationRef(id.toString()); + } + + static NotificationRef buildNotificationRef(String id) { + return new NotificationRef(NotificationRef.REF_CHANNEL_DEFAULT, id); + } +} diff --git a/src/android_app_plugin/kolibri_plugin.py b/src/android_app_plugin/kolibri_plugin.py index e14bc68a..7a136f57 100644 --- a/src/android_app_plugin/kolibri_plugin.py +++ b/src/android_app_plugin/kolibri_plugin.py @@ -9,7 +9,6 @@ Locale = autoclass("java.util.Locale") Task = autoclass("org.learningequality.Task") -TaskWorker = autoclass("org.learningequality.Kolibri.TaskworkerWorker") logger = logging.getLogger(__name__) @@ -48,13 +47,14 @@ def schedule( job.func, orm_job.id, delay, high_priority ) ) - Task.enqueueOnce( + request_id = Task.enqueueOnce( orm_job.id, delay, high_priority, job.func, job.long_running, ) + job.update_worker_info(extra=request_id) def update(self, job, orm_job, state=None, **kwargs): currentLocale = Locale.getDefault().toLanguageTag() @@ -68,7 +68,8 @@ def update(self, job, orm_job, state=None, **kwargs): else: progress = -1 total_progress = -1 - TaskWorker.updateProgress( + Task.updateProgress( + orm_job.worker_extra, status.title, status.text, progress, diff --git a/src/android_utils.py b/src/android_utils.py index db8a99f6..94353949 100644 --- a/src/android_utils.py +++ b/src/android_utils.py @@ -49,15 +49,8 @@ def get_version_name(): @cache def get_context(): - if is_service_context(): - PythonService = autoclass("org.kivy.android.PythonService") - return PythonService.mService.getApplicationContext() - elif is_taskworker_context(): - PythonWorker = autoclass("org.kivy.android.PythonWorker") - return PythonWorker.mWorker.getApplicationContext() - else: - PythonActivity = autoclass("org.kivy.android.PythonActivity") - return PythonActivity.mActivity.getApplicationContext() + PythonContext = autoclass("org.kivy.android.PythonContext") + return PythonContext.get() @cache diff --git a/src/taskworker.py b/src/taskworker.py index 9811ec06..f6302770 100644 --- a/src/taskworker.py +++ b/src/taskworker.py @@ -9,12 +9,30 @@ logger = logging.getLogger(__name__) -def main(job_id): - logger.info("Starting Kolibri task worker, for job {}".format(job_id)) +def main(job_request): + request_id, job_id, process_id, thread_id = job_request.split(",") + logger.info( + "Starting Kolibri task worker, for job {} and request {}".format( + job_id, request_id + ) + ) # Import this after we have initialized Kolibri from kolibri.core.tasks.worker import execute_job # noqa: E402 - execute_job(job_id) - - logger.info("Ending Kolibri task worker, for job {}".format(job_id)) + try: + execute_job( + str(job_id), + worker_process=str(process_id), + worker_thread=str(thread_id), + worker_extra=str(request_id), + ) + except Exception as e: + logger.exception("Error occurred executing job", exc_info=e) + raise e + + logger.info( + "Ending Kolibri task worker, for job {} and request {}".format( + job_id, request_id + ) + )