diff --git a/python-for-android/dists/kolibri/src/main/AndroidManifest.xml b/python-for-android/dists/kolibri/src/main/AndroidManifest.xml index 98b8c0f6..e1a9694f 100644 --- a/python-for-android/dists/kolibri/src/main/AndroidManifest.xml +++ b/python-for-android/dists/kolibri/src/main/AndroidManifest.xml @@ -22,6 +22,12 @@ + + + + + + + + - startRemoteWork() { final String id = getId().toString(); final String arg = getArgument(); - Log.d(TAG, "Enabling foreground service for long running task for " + id); - setForegroundAsync(getForegroundInfo()); - // See executor defined in configuration final ThreadPoolExecutor executor = (ThreadPoolExecutor) getBackgroundExecutor(); // This is somewhat similar to what the plain `Worker` class does, except that we @@ -66,7 +64,9 @@ public ListenableFuture startRemoteWork() { synchronized (future) { if (future.isCancelled()) { Log.i(TAG, "Interrupting python thread"); - threadFuture.cancel(true); + synchronized (threadFuture) { + threadFuture.cancel(true); + } } } }, getTaskExecutor().getMainThreadExecutor()); @@ -81,17 +81,17 @@ public void onStopped() { } public ForegroundInfo getForegroundInfo() { - NotificationRef ref = getNotificationRef(); - // If we are running in the service, use the service notification ref - synchronized (WorkerService.class) { - if (WorkerService.mService != null) { - ref = WorkerService.mService.getNotificationRef(); - } else { - Log.w(TAG, "No service found, using worker notification for foreground"); - } + NotificationRef ref = WorkerService.buildNotificationRef(); + Builder builder = new Builder(getApplicationContext(), ref); + // If API level is at least 29 + if (android.os.Build.VERSION.SDK_INT >= android.os.Build.VERSION_CODES.Q) { + return new ForegroundInfo( + ref.getId(), + builder.build(), + ServiceInfo.FOREGROUND_SERVICE_TYPE_MANIFEST + ); } - Builder builder = new Builder(getApplicationContext(), ref); return new ForegroundInfo(ref.getId(), builder.build()); } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/WorkControllerService.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/WorkControllerService.java index 2c0d755d..0fbd8a64 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/WorkControllerService.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/WorkControllerService.java @@ -36,9 +36,11 @@ public class WorkControllerService extends Service { protected static final AtomicReference state = new AtomicReference<>(State.SLEEPING); protected static final AtomicInteger taskCount = new AtomicInteger(0); protected final AtomicBoolean isConnected = new AtomicBoolean(false); + protected final AtomicBoolean shouldReconcile = new AtomicBoolean(true); protected Intent workManagerIntent; protected ExecutorService executor; protected CompletableFuture futureChain; + protected WorkControllerHandler messageHandler; protected Messenger messenger; private ServiceConnection connection; @@ -52,9 +54,10 @@ public void onCreate() { workManagerIntent = new Intent(this, RemoteWorkManagerService.class); connection = new WorkManagerConnection(); - executor = Executors.newFixedThreadPool(2); + executor = Executors.newFixedThreadPool(3); futureChain = CompletableFuture.completedFuture(null); - messenger = new Messenger(new WorkControllerHandler()); + messageHandler = new WorkControllerHandler(); + messenger = new Messenger(messageHandler); } @Override @@ -70,6 +73,7 @@ public void onDestroy() { executor = null; futureChain = null; workManagerIntent = null; + messageHandler = null; messenger = null; connection = null; } @@ -89,27 +93,32 @@ protected void onWake() { } } - startTask(() -> { - Log.d(TAG, "Binding to work manager service"); - // Wakey wakey remote work manager service - bindService(workManagerIntent, connection, Context.BIND_AUTO_CREATE); - return null; + startTask(new WorkTask("wake_work_manager") { + @Override + public CompletableFuture run() { + // Wakey wakey remote work manager service + Log.d(TAG, "Binding to work manager service"); + bindService(workManagerIntent, connection, Context.BIND_AUTO_CREATE); + return null; + } }); } protected void onReconcile() { + synchronized (shouldReconcile) { + if (!shouldReconcile.get()) { + Log.d(TAG, "Skipping enqueue of task reconciliation"); + return; + } + shouldReconcile.set(false); + } + Log.d(TAG, "Enqueuing task reconciliation"); - startTask(() -> { - Log.d(TAG, "Running task reconciliation"); - return Task.reconcile(getApplicationContext(), executor) - .thenApply((r) -> { - if (r) { - Log.d(TAG, "Reconciliation task completed"); - } else { - Log.d(TAG, "Reconciliation task failed"); - } - return null; - }); + startTask(new WorkTask("reconciliation") { + @Override + public CompletableFuture run() { + return Task.reconcile(getApplicationContext(), executor).thenApply((r) -> null); + } }); } @@ -126,40 +135,51 @@ protected void onSleep() { Log.d(TAG, "Waiting for " + taskCount.get() + " tasks to complete"); } } + synchronized (shouldReconcile) { + shouldReconcile.set(true); + } } protected void onStop() { Log.d(TAG, "Stopping work controller service"); // should eventually call `onDestroy` and that will set the stopped state + synchronized (state) { + state.set(State.STOPPED); + } stopSelf(); } protected void startTask(WorkTask task) { - futureChain = futureChain.thenComposeAsync((nothing) -> { + futureChain = futureChain.thenCompose((nothing) -> { try { - Log.d(TAG, "Running task"); + Log.d(TAG, "Running task: " + task.getName()); CompletableFuture f = task.run(); if (f != null) { return f; } } catch (Exception e) { - Log.e(TAG, "Failed running task", e); + Log.e(TAG, "Failed running task: " + task.getName(), e); + return CompletableFuture.failedFuture(e); } finally { - Log.d(TAG, "Task completed"); + Log.d(TAG, "Task completed: " + task.getName()); + boolean hasNoMoreTasks = false; synchronized (taskCount) { if (taskCount.decrementAndGet() == 0) { Log.d(TAG, "Checking state for stopping service"); - synchronized (state) { - if (state.get() != State.AWAKE) { - Log.d(TAG, "Stopping service due to no more tasks"); - stopSelf(); - } + hasNoMoreTasks = true; + } + } + if (hasNoMoreTasks) { + synchronized (state) { + if (state.get() != State.AWAKE) { + Log.d(TAG, "Stopping service due to no more tasks"); + stopSelf(); } } } } return CompletableFuture.completedFuture(null); - }, executor); + }); } @Override @@ -181,13 +201,14 @@ public void onTrimMemory(int level) { @Nullable @Override public IBinder onBind(Intent intent) { + Log.d(TAG, "Producing binding to work controller service"); return messenger.getBinder(); } public enum State { - SLEEPING, AWAKE, AWAKE_LOW_MEMORY, + SLEEPING, STOPPED, } @@ -209,8 +230,15 @@ public int getId() { } } - interface WorkTask { - CompletableFuture run(); + abstract static class WorkTask { + protected final String name; + public WorkTask(String name) { + this.name = name; + } + public String getName() { + return name; + } + abstract public CompletableFuture run(); } class WorkManagerConnection implements ServiceConnection { @@ -241,7 +269,7 @@ public void onBindingDied(android.content.ComponentName name) { @Override public void onNullBinding(android.content.ComponentName name) { // WorkManager service should produce a binding normally - Log.d(TAG, "WorkManager service null binding"); + Log.d(TAG, "WorkManager service gave null binding"); synchronized (isConnected) { isConnected.set(false); } @@ -249,6 +277,10 @@ public void onNullBinding(android.content.ComponentName name) { } class WorkControllerHandler extends Handler { + public WorkControllerHandler() { + super(); + } + @Override public void handleMessage(Message msg) { Log.d(TAG, "Received message " + msg.what); diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/WorkerService.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/WorkerService.java index 589dbd71..4654c91e 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/WorkerService.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/WorkerService.java @@ -1,7 +1,10 @@ package org.learningequality.Kolibri; +import android.content.Intent; +import android.os.IBinder; import android.util.Log; +import androidx.annotation.NonNull; import androidx.work.multiprocess.RemoteWorkerService; import org.learningequality.notification.NotificationRef; @@ -34,6 +37,10 @@ public void onDestroy() { } public NotificationRef getNotificationRef() { + return buildNotificationRef(); + } + + public static NotificationRef buildNotificationRef() { return new NotificationRef(NotificationRef.REF_CHANNEL_SERVICE); } } diff --git a/src/taskworker.py b/src/taskworker.py index 0eeb114d..751ef31d 100644 --- a/src/taskworker.py +++ b/src/taskworker.py @@ -11,15 +11,18 @@ def main(job_request): request_id, job_id, process_id, thread_id = job_request.split(",") + logger.debug("Job request: {}".format(job_request)) logger.info( "Starting Kolibri task worker, for job {} and request {}".format( job_id, request_id ) ) - + # Import this after we have initialized Kolibri + logger.debug("Importing executor for job request: {}".format(job_request)) from kolibri.core.tasks.worker import execute_job # noqa: E402 + logger.debug("Executing job request: {}".format(job_request)) execute_job( job_id, worker_process=str(process_id),