From e1f8fb511b77aa452b3061332482c6905f310b5a Mon Sep 17 00:00:00 2001 From: Blaine Jester Date: Thu, 7 Dec 2023 07:18:32 -0800 Subject: [PATCH] Access kolibri's job storage database to reconcile tasks --- python-for-android/dists/kolibri/build.gradle | 1 + .../java/org/kivy/android/PythonWorker.java | 5 +- .../org/learningequality/ContextUtil.java | 19 +- .../org/learningequality/Kolibri/App.java | 21 +- .../Kolibri/ReconcileWorker.java | 47 +++ .../Kolibri/TaskworkerWorkerService.java | 17 + .../Kolibri/sqlite/JobStorage.java | 39 +++ .../main/java/org/learningequality/Task.java | 319 +++++++++++++++++- .../org/learningequality/sqlite/Database.java | 73 ++++ .../sqlite/query/SelectQuery.java | 123 +++++++ .../sqlite/schema/DatabaseTable.java | 50 +++ src/android_app_plugin/kolibri_plugin.py | 3 +- src/taskworker.py | 14 +- 13 files changed, 701 insertions(+), 30 deletions(-) create mode 100644 python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/ReconcileWorker.java create mode 100644 python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/sqlite/JobStorage.java create mode 100644 python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/Database.java create mode 100644 python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/SelectQuery.java create mode 100644 python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/schema/DatabaseTable.java diff --git a/python-for-android/dists/kolibri/build.gradle b/python-for-android/dists/kolibri/build.gradle index 7ea82455..b89c0104 100644 --- a/python-for-android/dists/kolibri/build.gradle +++ b/python-for-android/dists/kolibri/build.gradle @@ -109,4 +109,5 @@ dependencies { implementation 'androidx.concurrent:concurrent-futures:1.1.0' implementation 'androidx.work:work-runtime:2.7.1' implementation 'androidx.work:work-multiprocess:2.7.1' + implementation 'net.sourceforge.streamsupport:java9-concurrent-backport:2.0.5' } diff --git a/python-for-android/dists/kolibri/src/main/java/org/kivy/android/PythonWorker.java b/python-for-android/dists/kolibri/src/main/java/org/kivy/android/PythonWorker.java index 303492f3..166aecdb 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/kivy/android/PythonWorker.java +++ b/python-for-android/dists/kolibri/src/main/java/org/kivy/android/PythonWorker.java @@ -3,6 +3,7 @@ import android.annotation.SuppressLint; import android.content.Context; import android.util.Log; +import android.os.Process; import androidx.annotation.NonNull; import androidx.concurrent.futures.CallbackToFutureAdapter; @@ -80,11 +81,13 @@ protected Result doWork() { Log.d(TAG, id + " Running with python worker argument: " + arg); + String serializedArg = String.join(",", id, arg, Integer.toString(Process.myPid()), Long.toString(Thread.currentThread().getId())); + int res = nativeStart( androidPrivate, androidArgument, workerEntrypoint, pythonName, pythonHome, pythonPath, - arg + serializedArg ); Log.d(TAG, id + " Finished remote python work: " + res); diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/ContextUtil.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/ContextUtil.java index 50a6227d..ae960930 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/ContextUtil.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/ContextUtil.java @@ -3,20 +3,19 @@ import android.content.Context; import org.kivy.android.PythonActivity; -import org.kivy.android.PythonService; -import org.kivy.android.PythonWorker; -import org.learningequality.NotificationRef; +import org.learningequality.Kolibri.TaskworkerWorker; +import org.learningequality.Kolibri.TaskworkerWorkerService; public class ContextUtil { public static Context getApplicationContext() { - if (isActivityContext()) { - return PythonActivity.mActivity.getApplicationContext(); + if (isWorkerContext()) { + return TaskworkerWorker.mWorker.getApplicationContext(); } if (isServiceContext()) { - return PythonService.mService.getApplicationContext(); + return TaskworkerWorkerService.mService.getApplicationContext(); } - if (isWorkerContext()) { - return PythonWorker.mWorker.getApplicationContext(); + if (isActivityContext()) { + return PythonActivity.mActivity.getApplicationContext(); } return null; } @@ -26,10 +25,10 @@ public static boolean isActivityContext() { } public static boolean isServiceContext() { - return PythonService.mService != null; + return TaskworkerWorkerService.mService != null; } public static boolean isWorkerContext() { - return PythonWorker.mWorker != null; + return TaskworkerWorker.mWorker != null; } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/App.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/App.java index 7bacceec..1974979a 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/App.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/App.java @@ -3,22 +3,29 @@ import android.app.Application; import android.content.Context; import android.os.Build; +import android.util.Log; import androidx.annotation.NonNull; import androidx.core.app.NotificationManagerCompat; import androidx.core.app.NotificationChannelCompat; import androidx.work.Configuration; +import androidx.work.PeriodicWorkRequest; +import androidx.work.WorkManager; import org.learningequality.NotificationRef; +import org.learningequality.Task; import java.util.concurrent.Executors; +import java9.util.concurrent.CompletableFuture; + public class App extends Application implements Configuration.Provider { @Override public void onCreate() { super.onCreate(); NotificationRef.initialize(this); createNotificationChannels(); + reconcileTasks(); } @NonNull @@ -27,8 +34,6 @@ public Configuration getWorkManagerConfiguration() { String processName = getApplicationContext().getPackageName(); processName += getApplicationContext().getString(R.string.task_worker_process); - - // Using the same quantity of worker threads as Kolibri's python side: // https://github.com/learningequality/kolibri/blob/release-v0.16.x/kolibri/utils/options.py#L683 return new Configuration.Builder() @@ -64,4 +69,16 @@ private void createNotificationChannels() { notificationManager.createNotificationChannel(taskChannel); } } + + private void reconcileTasks() { + // Reconcile tasks on startup, in this main thread (blocking!) + CompletableFuture f = Task.reconcile(this, null); + f.whenCompleteAsync((result, throwable) -> { + if (throwable != null) { + Log.e("Kolibri", "Main thread task reconciliation failed", throwable); + } else { + Log.i("Kolibri", "Main thread task reconciliation completed"); + } + }); + } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/ReconcileWorker.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/ReconcileWorker.java new file mode 100644 index 00000000..3df11f2a --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/ReconcileWorker.java @@ -0,0 +1,47 @@ +package org.learningequality.Kolibri; + +import android.annotation.SuppressLint; +import android.content.Context; +import android.util.Log; + +import androidx.annotation.NonNull; +import androidx.work.ListenableWorker; +import androidx.work.WorkerParameters; +import androidx.work.impl.utils.futures.SettableFuture; + +import com.google.common.util.concurrent.ListenableFuture; + +import java9.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import org.learningequality.Task; + +public class ReconcileWorker extends ListenableWorker { + public static final String TAG = "Kolibri.ReconcileWorker"; + + public ReconcileWorker(@NonNull Context appContext, @NonNull WorkerParameters workerParams) { + super(appContext, workerParams); + } + + @SuppressLint("RestrictedApi") + @NonNull + public ListenableFuture startWork() { + Log.i(TAG, "Starting reconcile task"); + SettableFuture future = SettableFuture.create(); + Executor executor = getBackgroundExecutor(); + + CompletableFuture f = Task.reconcile(getApplicationContext(), executor); + + f.whenCompleteAsync((result, throwable) -> { + if (throwable != null) { + Log.e(TAG, "Reconcile task failed", throwable); + future.setException(throwable); + } else { + Log.i(TAG, "Reconcile task completed: " + (result ? "success" : "failure")); + future.set(result ? Result.success() : Result.failure()); + } + }, executor); + + return future; + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/TaskworkerWorkerService.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/TaskworkerWorkerService.java index cd649f43..8548d34d 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/TaskworkerWorkerService.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/TaskworkerWorkerService.java @@ -3,6 +3,7 @@ import android.content.Context; import android.util.Log; +import androidx.work.PeriodicWorkRequest; import androidx.work.multiprocess.RemoteWorkerService; import androidx.work.WorkManager; @@ -27,6 +28,7 @@ public void onCreate() { ); // Initialize the work manager WorkManager.getInstance(getApplicationContext()); + enqueueTaskReconciliation(); super.onCreate(); // We could potentially remove this and leave the notification up to long-running workers // bound to the service @@ -43,4 +45,19 @@ public void onDestroy() { public NotificationRef getNotificationRef() { return new NotificationRef(NotificationRef.REF_CHANNEL_SERVICE); } + + private void enqueueTaskReconciliation() { + WorkManager workManager = WorkManager.getInstance(this); + PeriodicWorkRequest reconcileRequest = new PeriodicWorkRequest.Builder( + ReconcileWorker.class, + 60, + java.util.concurrent.TimeUnit.MINUTES + ).build(); + + workManager.enqueueUniquePeriodicWork( + "task_reconciliation", + androidx.work.ExistingPeriodicWorkPolicy.KEEP, + reconcileRequest + ); + } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/sqlite/JobStorage.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/sqlite/JobStorage.java new file mode 100644 index 00000000..989d0ca6 --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Kolibri/sqlite/JobStorage.java @@ -0,0 +1,39 @@ +package org.learningequality.Kolibri.sqlite; + +import org.learningequality.sqlite.schema.DatabaseTable; + +public class JobStorage { + public static final String DATABASE_NAME = "job_storage.sqlite3"; + + public static class Jobs implements DatabaseTable { + public static final String TABLE_NAME = "jobs"; + + public String getTableName() { + return TABLE_NAME; + } + + public static final StringColumn id = new StringColumn("id"); + public static final StringColumn worker_process = new StringColumn("worker_process"); + public static final StringColumn worker_thread = new StringColumn("worker_thread"); + public static final StringColumn worker_extra = new StringColumn("worker_extra"); + public static final StringColumn time_updated = new StringColumn("time_updated"); + public static final StringColumn state = new StringColumn("state"); + + public enum State implements ColumnEnum { + PENDING, + QUEUED, + SCHEDULED, + SELECTED, + RUNNING, + CANCELING, + CANCELED, + FAILED, + COMPLETED + ; + + public StringColumn getColumn() { + return state; + } + } + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Task.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Task.java index d4844d67..a095a7e5 100644 --- a/python-for-android/dists/kolibri/src/main/java/org/learningequality/Task.java +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/Task.java @@ -1,42 +1,97 @@ package org.learningequality; import android.content.Context; +import android.os.Bundle; import android.util.Log; +import androidx.core.content.ContextCompat; import androidx.work.BackoffPolicy; import androidx.work.Data; -import androidx.work.ExistingPeriodicWorkPolicy; import androidx.work.ExistingWorkPolicy; import androidx.work.OneTimeWorkRequest; import androidx.work.OutOfQuotaPolicy; -import androidx.work.PeriodicWorkRequest; import androidx.work.WorkInfo; -import androidx.work.WorkManager; import androidx.work.WorkQuery; import androidx.work.multiprocess.RemoteWorkManager; import com.google.common.util.concurrent.ListenableFuture; import org.learningequality.Kolibri.TaskworkerWorker; +import org.learningequality.Kolibri.sqlite.JobStorage; +import org.learningequality.sqlite.Database; +import org.learningequality.sqlite.query.SelectQuery; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java9.util.concurrent.CompletableFuture; + public class Task { public static final String TAG = "Task"; + public static final String PREFIX_TASK_ID = "kolibri_task_id:"; + + public enum StateMap { + MISSING(null), + PENDING( + JobStorage.Jobs.State.PENDING, + WorkInfo.State.ENQUEUED, + WorkInfo.State.BLOCKED + ), + QUEUED( + JobStorage.Jobs.State.QUEUED, + WorkInfo.State.ENQUEUED, + WorkInfo.State.BLOCKED + ), + SCHEDULED( + JobStorage.Jobs.State.SCHEDULED, + WorkInfo.State.ENQUEUED, + WorkInfo.State.BLOCKED + ), + SELECTED( + JobStorage.Jobs.State.SELECTED, + WorkInfo.State.ENQUEUED, + WorkInfo.State.BLOCKED, + WorkInfo.State.RUNNING + ), + RUNNING(JobStorage.Jobs.State.RUNNING, WorkInfo.State.RUNNING), + CANCELING(JobStorage.Jobs.State.CANCELING, WorkInfo.State.CANCELLED), + CANCELED(JobStorage.Jobs.State.CANCELED, WorkInfo.State.CANCELLED), + FAILED(JobStorage.Jobs.State.FAILED, WorkInfo.State.FAILED), + COMPLETED(JobStorage.Jobs.State.COMPLETED, WorkInfo.State.SUCCEEDED); + + private final JobStorage.Jobs.State jobState; + private final WorkInfo.State[] workInfoStates; + + StateMap(JobStorage.Jobs.State jobState, WorkInfo.State... workInfoStates) { + this.jobState = jobState; + this.workInfoStates = workInfoStates; + } + + public JobStorage.Jobs.State getJobState() { + return this.jobState; + } + + public WorkInfo.State[] getWorkInfoStates() { + return this.workInfoStates; + } + } + private static String generateTagFromId(String id) { - return "kolibri_task_id:" + id; + return PREFIX_TASK_ID + id; } private static String generateTagFromJobFunc(String jobFunc) { return "kolibri_job_type:" + jobFunc; } - public static void enqueueOnce(String id, int delay, boolean expedite, String jobFunc, boolean longRunning) { + public static String enqueueOnce(String id, int delay, boolean expedite, String jobFunc, boolean longRunning) { RemoteWorkManager workManager = RemoteWorkManager.getInstance(ContextUtil.getApplicationContext()); Data data = TaskworkerWorker.buildInputData(id); @@ -62,17 +117,26 @@ public static void enqueueOnce(String id, int delay, boolean expedite, String jo workRequestBuilder.setBackoffCriteria(BackoffPolicy.EXPONENTIAL, 5, TimeUnit.SECONDS); OneTimeWorkRequest workRequest = workRequestBuilder.build(); workManager.enqueueUniqueWork(id, ExistingWorkPolicy.APPEND_OR_REPLACE, workRequest); + // return the work request ID, different from the task ID passed in + return workRequest.getId().toString(); + } + + protected static WorkQuery buildWorkQuery(String... jobIds) { + return WorkQuery.Builder + .fromUniqueWorkNames(Arrays.asList(jobIds)) + .build(); + } + + protected static WorkQuery buildWorkQuery(UUID... requestIds) { + return WorkQuery.Builder + .fromIds(Arrays.asList(requestIds)) + .build(); } public static void clear(String id) { Context context = ContextUtil.getApplicationContext(); - List tags = new ArrayList(); - String tag = generateTagFromId(id); - tags.add(tag); RemoteWorkManager workManager = RemoteWorkManager.getInstance(context); - WorkQuery workQuery = WorkQuery.Builder - .fromTags(tags) - .build(); + WorkQuery workQuery = buildWorkQuery(id); ListenableFuture> workInfosFuture = workManager.getWorkInfos(workQuery); workInfosFuture.addListener(() -> { @@ -94,7 +158,7 @@ public static void clear(String id) { } if (anyInfo && clearable) { // If the tasks are marked as completed we - workManager.cancelAllWorkByTag(tag); + workManager.cancelUniqueWork(id); } } } catch (ExecutionException | InterruptedException e) { @@ -102,4 +166,235 @@ public static void clear(String id) { } }, new MainThreadExecutor()); } + + public static CompletableFuture reconcile(Context context, Executor executor) { + CompletableFuture future = new CompletableFuture<>(); + if (executor == null) { + executor = ContextCompat.getMainExecutor(context); + } + + try (Task.Sentinel sentinel = new Task.Sentinel(context, executor)) { + CompletableFuture f = CompletableFuture.allOf( + sentinel.check(Task.StateMap.PENDING), + sentinel.check(Task.StateMap.QUEUED), + sentinel.check(Task.StateMap.SCHEDULED), + sentinel.check(Task.StateMap.SELECTED), + sentinel.check(Task.StateMap.RUNNING) + ); + + f.whenCompleteAsync((result, throwable) -> { + if (throwable != null) { + Log.w(Sentinel.TAG, "Reconciliation encountered an error"); + future.completeExceptionally(throwable); + } else { + Log.i(Sentinel.TAG, "Reconciliation completed successfully"); + future.complete(true); + } + }, executor); + } + + return future; + } + + /** + * Sentinel (as in watcher) for checking and reconciling Kolibri job status with WorkManager + */ + public static class Sentinel implements AutoCloseable { + public static String TAG = "KolibriTask.Sentinel"; + private final RemoteWorkManager workManager; + private final Database db; + private final Executor executor; + + + public Sentinel(Context context, Executor executor) { + workManager = RemoteWorkManager.getInstance(context); + db = Database.readonly(context, JobStorage.DATABASE_NAME); + this.executor = executor; + } + + /** + * Create a sentinel with the main thread executor + */ + public Sentinel(Context context) { + this(context, ContextCompat.getMainExecutor(context)); + } + + /** + * Build a query for jobs with the given status + * + * @param jobStatus The job status in the Kolibri database for which to find jobs + * @return A query for jobs with the given status, and subset of selected columns + */ + private SelectQuery buildQuery(JobStorage.Jobs.State jobStatus) { + return new SelectQuery( + JobStorage.Jobs.id, + JobStorage.Jobs.state, + JobStorage.Jobs.worker_process, + JobStorage.Jobs.worker_thread, + JobStorage.Jobs.worker_extra + ) + .from(JobStorage.Jobs.TABLE_NAME) + .where(jobStatus) + .orderBy(JobStorage.Jobs.time_updated, false); + } + + private WorkQuery buildWorkQuery(Bundle result) { + String requestId = JobStorage.Jobs.worker_extra.getValue(result); + + if (requestId == null) { + String id = JobStorage.Jobs.id.getValue(result); + Log.v(TAG, "No request ID found for job " + id); + return Task.buildWorkQuery(id); + } + + return Task.buildWorkQuery(UUID.fromString(requestId)); + } + + /** + * Check for jobs with the given status and reconcile them with WorkManager + * Defaults to flagging missing work in WorkManager + * + * @param stateRef The job status in the Kolibri database for which to find jobs + * @return A future that will complete when all jobs have been checked, with a list of jobs + */ + public CompletableFuture check(StateMap stateRef) { + return check(false, stateRef); + } + + /** + * Check for jobs with the given status and reconcile them with WorkManager + * @param ignoreMissing Whether to ignore missing work in WorkManager + * @param stateRef The job status in the Kolibri database for which to find jobs + * @return A future that will complete when all jobs have been checked, with a list of jobs + */ + public CompletableFuture check( + boolean ignoreMissing, + StateMap stateRef + ) { + SelectQuery query = buildQuery(stateRef.getJobState()); + Bundle[] jobs = query.execute(db); + + if (jobs == null || jobs.length == 0) { + Log.v(TAG, "No jobs to reconcile for status " + stateRef); + return CompletableFuture.completedFuture(null); + } + + return check(jobs, ignoreMissing, stateRef.getWorkInfoStates()); + } + + /** + * Check for the given jobs (Bundles) and reconciles them with WorkManager + * + * @param jobs The jobs to check + * @param ignoreMissing Whether to ignore missing work in WorkManager + * @param expectedWorkStates The expected WorkManager states for the found jobs + * @return A future that will complete when all jobs have been checked, with a list of jobs + */ + public CompletableFuture check( + Bundle[] jobs, + boolean ignoreMissing, + WorkInfo.State... expectedWorkStates + ) { + final List> jobFutures = new ArrayList>(jobs.length); + + for (Bundle job : jobs) { + CompletableFuture jobCheck = check(job, ignoreMissing, expectedWorkStates); + jobFutures.add(jobCheck); + } + + CompletableFuture future = CompletableFuture.allOf( + jobFutures.toArray(new CompletableFuture[0]) + ) + .thenApply((result) -> { + final List allResults = new ArrayList(jobs.length); + for (CompletableFuture jobFuture : jobFutures) { + // Add all the results from the job futures + try { + Bundle jobResult = jobFuture.get(); + if (jobResult != null) { + allResults.add(jobResult); + } + } catch (ExecutionException | InterruptedException e) { + e.printStackTrace(); + } + } + return allResults.toArray(new Bundle[0]); + }); + + future.whenComplete((result, ex) -> { + if (future.isCancelled()) { + for (CompletableFuture jobFuture : jobFutures) { + jobFuture.cancel(true); + } + } + }); + return future; + } + + /** + * Check for the given job (Bundle) and reconciles i with WorkManager + * + * @param job The job to check as a `Bundle` + * @param ignoreMissing Whether to ignore the job as missing in WorkManager + * @param expectedWorkStates The expected WorkManager states for the found jobs + * @return A future that will complete when the job has been checked, with the job if it is not reconciled + */ + public CompletableFuture check( + Bundle job, + boolean ignoreMissing, + WorkInfo.State... expectedWorkStates + ) { + CompletableFuture future = new CompletableFuture<>(); + + List workStates = Arrays.asList(expectedWorkStates); + WorkQuery workQuery = buildWorkQuery(job); + ListenableFuture> workInfosFuture = workManager.getWorkInfos(workQuery); + + workInfosFuture.addListener(() -> { + boolean checkFailed = false; + try { + List workInfos = workInfosFuture.get(); + + if (workInfos == null || workInfos.size() == 0) { + if (ignoreMissing) { + return; + } + + Log.w(TAG, "No work requests found for job id " + JobStorage.Jobs.id.getValue(job)); + checkFailed = true; + } else { + for (WorkInfo workInfo : workInfos) { + WorkInfo.State state = workInfo.getState(); + + if (!workStates.contains(state)) { + Log.w(TAG, "WorkInfo state " + state + " does not match expected state " + Arrays.toString(expectedWorkStates) + " for request " + workInfo.getId() + " | " + workInfo.getTags()); + checkFailed = true; + } + } + } + } + catch (ExecutionException | InterruptedException e) { + e.printStackTrace(); + } finally { + if (checkFailed) { + future.complete(job); + } else { + future.complete(null); + } + } + }, executor); + + future.whenComplete((result, ex) -> { + if (future.isCancelled()) { + workInfosFuture.cancel(true); + } + }); + + return future; + } + + public void close() { + db.close(); + } + } } diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/Database.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/Database.java new file mode 100644 index 00000000..89d5530a --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/Database.java @@ -0,0 +1,73 @@ +package org.learningequality.sqlite; + +import android.content.Context; +import android.database.sqlite.SQLiteDatabase; +import android.database.sqlite.SQLiteException; +import android.util.Log; + +import java.io.File; + +public class Database implements AutoCloseable { + public static final String TAG = "Kolibri.Database"; + + private final String name; + private final String path; + private final int flags; + private SQLiteDatabase db; + + private Database(String name, String path, int flags) { + this.name = name; + this.path = path; + this.db = null; + this.flags = flags; + initialize(); + } + + public boolean isConnected() { + return this.path != null && this.db != null; + } + + public SQLiteDatabase get() { + if (!isConnected()) { + throw new IllegalStateException("Database is not connected"); + } + return this.db; + } + + public String getName() { + return this.name; + } + + protected void initialize() { + if (this.path == null) { + return; + } + try { + this.db = SQLiteDatabase.openDatabase(this.path, null, flags); + } catch (SQLiteException e) { + this.db = null; + } + } + + public void close() { + if (isConnected()) { + this.db.close(); + this.db = null; + } + } + + public static Database readonly(Context context, String name) { + String path = null; + File dir = context.getExternalFilesDir(null); + if (dir != null) { + File f = new File(new File(dir, "KOLIBRI_DATA"), name); + if (f.exists()) { + path = f.getPath(); + } else { + Log.v(TAG, "Database file does not exist: " + f.getPath()); + } + } + + return new Database(name, path, SQLiteDatabase.OPEN_READONLY); + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/SelectQuery.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/SelectQuery.java new file mode 100644 index 00000000..ee3c26d4 --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/query/SelectQuery.java @@ -0,0 +1,123 @@ +package org.learningequality.sqlite.query; + +import android.database.Cursor; +import android.os.Bundle; + +import org.learningequality.sqlite.Database; +import org.learningequality.sqlite.schema.DatabaseTable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class SelectQuery { + private String tableName; + private final DatabaseTable.Column[] selectColumns; + private final List whereClauses; + private final List whereParameters; + private String orderBy; + + public SelectQuery(DatabaseTable.Column... columns) { + this.selectColumns = columns.length > 0 ? columns : null; + this.whereClauses = new ArrayList(); + this.whereParameters = new ArrayList(); + } + + public SelectQuery from(String tableName) { + this.tableName = tableName; + return this; + } + + public SelectQuery where(String clause, String... parameters) { + this.whereClauses.add(clause); + this.whereParameters.addAll(Arrays.asList(parameters)); + return this; + } + + public SelectQuery where(DatabaseTable.Column column, String value) { + return where(column.getColumnName() + " = ?", value); + } + + public SelectQuery where(DatabaseTable.ColumnEnum value) { + return where(value.getColumn(), value.getValue()); + } + + public SelectQuery orderBy(DatabaseTable.Column column, boolean ascending) { + this.orderBy = column.getColumnName() + (ascending ? " ASC" : " DESC"); + return this; + } + + protected Bundle buildBundle(Database db, Cursor cursor) { + Bundle b = new Bundle(cursor.getColumnCount() + 2); + b.putString(DatabaseTable.DATABASE_NAME, db.getName()); + b.putString(DatabaseTable.TABLE_NAME, this.tableName); + + for (int i = 0; i < cursor.getColumnCount(); i++) { + String columnName = cursor.getColumnName(i); + switch (cursor.getType(i)) { + case Cursor.FIELD_TYPE_NULL: + b.putString(columnName, null); + break; + case Cursor.FIELD_TYPE_INTEGER: + b.putLong(columnName, cursor.getLong(i)); + break; + case Cursor.FIELD_TYPE_FLOAT: + b.putDouble(columnName, cursor.getDouble(i)); + break; + case Cursor.FIELD_TYPE_STRING: + b.putString(columnName, cursor.getString(i)); + break; + case Cursor.FIELD_TYPE_BLOB: + b.putByteArray(columnName, cursor.getBlob(i)); + break; + } + } + return b; + } + + protected String[] generateSelectColumns() { + if (this.selectColumns == null) { + return null; + } + + // This can be simpler with Java 8 streams + List selectColumns = new ArrayList(); + for (DatabaseTable.Column column : this.selectColumns) { + selectColumns.add(column.getColumnName()); + } + + return selectColumns.toArray(new String[0]); + } + + public Bundle[] execute(Database db) { + if (!db.isConnected()) { + return null; + } + + // Currently we only support ANDing all where clauses + String selection = String.join(" AND ", this.whereClauses); + String[] selectionArgs = this.whereParameters.toArray(new String[0]); + + try { + List results; + try (Cursor cursor = db.get().query( + this.tableName, + this.generateSelectColumns(), + selection, + selectionArgs, + null, + null, + this.orderBy + )) { + results = new ArrayList(); + while (cursor.moveToNext()) { + results.add(buildBundle(db, cursor)); + } + } + return results.toArray(new Bundle[0]); + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } +} diff --git a/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/schema/DatabaseTable.java b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/schema/DatabaseTable.java new file mode 100644 index 00000000..6590b3bd --- /dev/null +++ b/python-for-android/dists/kolibri/src/main/java/org/learningequality/sqlite/schema/DatabaseTable.java @@ -0,0 +1,50 @@ +package org.learningequality.sqlite.schema; + +import android.os.Bundle; + +public interface DatabaseTable { + public static final String DATABASE_NAME = "DATABASE_NAME"; + public static final String TABLE_NAME = "TABLE_NAME"; + + String getTableName(); + + interface Column { + String getColumnName(); + } + + interface ColumnEnum extends Column { + T name(); + + default T getValue() { + return this.name(); + } + + ColumnImpl getColumn(); + + default String getColumnName() { + return getColumn().getColumnName(); + } + } + + abstract class ColumnImpl implements Column { + private final String columnName; + + public ColumnImpl(String columnName) { + this.columnName = columnName; + } + + public String getColumnName() { + return this.columnName; + } + } + + class StringColumn extends ColumnImpl { + public StringColumn(String columnName) { + super(columnName); + } + + public String getValue(Bundle bundle) { + return bundle.getString(getColumnName()); + } + } +} diff --git a/src/android_app_plugin/kolibri_plugin.py b/src/android_app_plugin/kolibri_plugin.py index e14bc68a..1d2715db 100644 --- a/src/android_app_plugin/kolibri_plugin.py +++ b/src/android_app_plugin/kolibri_plugin.py @@ -48,13 +48,14 @@ def schedule( job.func, orm_job.id, delay, high_priority ) ) - Task.enqueueOnce( + request_id = Task.enqueueOnce( orm_job.id, delay, high_priority, job.func, job.long_running, ) + job.update_worker_info(extra=request_id) def update(self, job, orm_job, state=None, **kwargs): currentLocale = Locale.getDefault().toLanguageTag() diff --git a/src/taskworker.py b/src/taskworker.py index 9811ec06..51e04f85 100644 --- a/src/taskworker.py +++ b/src/taskworker.py @@ -9,12 +9,18 @@ logger = logging.getLogger(__name__) -def main(job_id): - logger.info("Starting Kolibri task worker, for job {}".format(job_id)) +def main(job_request): + request_id, job_id, process_id, thread_id = job_request.split(',') + logger.info("Starting Kolibri task worker, for job {} and request {}".format(job_id, request_id)) # Import this after we have initialized Kolibri from kolibri.core.tasks.worker import execute_job # noqa: E402 - execute_job(job_id) + execute_job( + job_id, + worker_process=str(process_id), + worker_thread=str(thread_id), + worker_extra=str(request_id) + ) - logger.info("Ending Kolibri task worker, for job {}".format(job_id)) + logger.info("Ending Kolibri task worker, for job {} and request {}".format(job_id, request_id))