Skip to content

Commit

Permalink
Transactional saving of volume annotations (#7264)
Browse files Browse the repository at this point in the history
* create volume transactions according to debounced pushqueue.push; increase efficiency of compressing-worker by increasing payload size

* make PushQueue.push more robust by avoiding concurrent execution of it (by implementing createDebouncedAbortableCallable

* Revert "Revert "temporarily disable most CI checks""

This reverts commit d69a7cf.

* don't use AsyncTaskQueue in pushqueue anymore

* remove AsyncTaskQueue implementation + specs

* implement small AsyncFifoResolver to prevent theoretical race condition

* ensure that the save saga consumes N items from the save queue where N is the size of the queue at the time the auto-save-interval kicked in

* fix tests

* fix accidentally skipped tests; improve linting rule to avoid this; fix broken segment group test

* harden error handling in PushQueue

* move some lib modules into libs/async

* warn user when pushqueue is starving

* Apply suggestions from code review

* clean up a bit

* tune batch count constants for volume tracings; also show downloading buckets in save button tooltip

* fix race condition in AsyncFifoResolver

* fix incorrect dtype in comment

* update changelog

* improve comment

* rename pendingQueue to pendingBuckets

* fix incorrect method name
  • Loading branch information
philippotto authored Aug 21, 2023
1 parent 2485549 commit 58f5703
Show file tree
Hide file tree
Showing 38 changed files with 688 additions and 462 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released

### Changed
- Small messages during annotating (e.g. “finished undo”, “applying mapping…”) are now click-through so they do not block users from selecting tools. [7239](https://github.com/scalableminds/webknossos/pull/7239)
- Annotating volume data uses a transaction-based mechanism now. As a result, WK is more robust against partial saves (i.e., due to a crashing tab). [#7264](https://github.com/scalableminds/webknossos/pull/7264)
- Improved speed of saving volume data. [#7264](https://github.com/scalableminds/webknossos/pull/7264)
- Improved progress indicator when saving volume data. [#7264](https://github.com/scalableminds/webknossos/pull/7264)
- The order of color layers can now also be manipulated in additive blend mode (see [#7188](https://github.com/scalableminds/webknossos/pull/7188)). [#7289](https://github.com/scalableminds/webknossos/pull/7289)
- OpenID Connect authorization now fetches the server’s public key automatically. The config keys `singleSignOn.openIdConnect.publicKey` and `singleSignOn.openIdConnect.publicKeyAlgorithm` are now unused. [7267](https://github.com/scalableminds/webknossos/pull/7267)

Expand Down
35 changes: 35 additions & 0 deletions frontend/javascripts/libs/async/async_fifo_resolver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* This class can be used to await promises
* in the order they were passed to
* orderedWaitFor.
*
* This enables scheduling of asynchronous work
* concurrently while ensuring that the results
* are processed in the order they were requested
* (instead of the order in which they finished).
*
* Example:
* const resolver = new AsyncFifoResolver();
* const promise1Done = resolver.orderedWaitFor(promise1);
* const promise2Done = resolver.orderedWaitFor(promise2);
*
* Even if promise2 resolves before promise1, promise2Done
* will resolve *after* promise1Done.
*/

export class AsyncFifoResolver<T> {
queue: Promise<T>[];
constructor() {
this.queue = [];
}

async orderedWaitFor(promise: Promise<T>): Promise<T> {
this.queue.push(promise);
const promiseCountToAwait = this.queue.length;
const retVals = await Promise.all(this.queue);
// Note that this.queue can have changed during the await.
// Find the index of the promise and trim the queue accordingly.
this.queue = this.queue.slice(this.queue.indexOf(promise) + 1);
return retVals[promiseCountToAwait - 1];
}
}
95 changes: 95 additions & 0 deletions frontend/javascripts/libs/async/debounced_abortable_saga.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { call, type Saga } from "oxalis/model/sagas/effect-generators";
import { buffers, Channel, channel, runSaga } from "redux-saga";
import { delay, race, take } from "redux-saga/effects";

/*
* This function takes a saga and a debounce threshold
* and returns a function F that will trigger the given saga
* in a debounced manner.
* In contrast to a normal debouncing mechanism, the saga
* will be cancelled if F is called while the saga is running.
* Note that this means that concurrent executions of the saga
* are impossible that way (by design).
*
* Also note that the performance of this debouncing mechanism
* is slower than a standard _.debounce. Also see
* debounced_abortable_saga.spec.ts for a small benchmark.
*/
export function createDebouncedAbortableCallable<T, C>(
fn: (param1: T) => Saga<void>,
debounceThreshold: number,
context: C,
) {
// The communication with the saga is done via a channel.
// That way, we can expose a normal function that
// does the triggering by filling the channel.

// Only the most recent invocation should survive.
// Therefore, create a sliding buffer with size 1.
const buffer = buffers.sliding<T>(1);
const triggerChannel: Channel<T> = channel<T>(buffer);

const _task = runSaga(
{},
debouncedAbortableSagaRunner,
debounceThreshold,
triggerChannel,
// @ts-expect-error TS thinks fn doesnt match, but it does.
fn,
context,
);

return (msg: T) => {
triggerChannel.put(msg);
};
}

export function createDebouncedAbortableParameterlessCallable<C>(
fn: () => Saga<void>,
debounceThreshold: number,
context: C,
) {
const wrappedFn = createDebouncedAbortableCallable(fn, debounceThreshold, context);
const dummyParameter = {};
return () => {
wrappedFn(dummyParameter);
};
}

function* debouncedAbortableSagaRunner<T, C>(
debounceThreshold: number,
triggerChannel: Channel<T>,
abortableFn: (param: T) => Saga<void>,
context: C,
): Saga<void> {
while (true) {
// Wait for a trigger-call by consuming
// the channel.
let msg = yield take(triggerChannel);

// Repeatedly try to execute abortableFn (each try
// might be cancelled due to new initiation-requests)
while (true) {
const { debounced, latestMessage } = yield race({
debounced: delay(debounceThreshold),
latestMessage: take(triggerChannel),
});

if (latestMessage) {
msg = latestMessage;
}

if (debounced) {
const { abortingMessage } = yield race({
finished: call([context, abortableFn], msg),
abortingMessage: take(triggerChannel),
});
if (abortingMessage) {
msg = abortingMessage;
} else {
break;
}
}
}
}
}
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Deferred from "libs/deferred";
import Deferred from "libs/async/deferred";
type Task<T> = () => Promise<T>;
export const SKIPPED_TASK_REASON = "Skipped task";
/*
Expand All @@ -11,6 +11,10 @@ export const SKIPPED_TASK_REASON = "Skipped task";
* LatestTaskExecutor instance.
*
* See the corresponding spec for examples.
*
* If you need the same behavior plus cancellation of running
* tasks, take a look at the saga-based `createDebouncedAbortableCallable`
* utility.
*/

export default class LatestTaskExecutor<T> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Saga, Task } from "oxalis/model/sagas/effect-generators";
import { join, call, fork } from "typed-redux-saga";
import type { Saga } from "oxalis/model/sagas/effect-generators";
import { join, call, fork, FixedTask } from "typed-redux-saga";

/*
Given an array of async tasks, processTaskWithPool
Expand All @@ -10,12 +10,11 @@ export default function* processTaskWithPool(
tasks: Array<() => Saga<void>>,
poolSize: number,
): Saga<void> {
const startedTasks: Array<Task<void>> = [];
const startedTasks: Array<FixedTask<void>> = [];
let isFinalResolveScheduled = false;
let error: Error | null = null;

// @ts-expect-error ts-migrate(7006) FIXME: Parameter 'fn' implicitly has an 'any' type.
function* forkSafely(fn): Saga<void> {
function* forkSafely(fn: () => Saga<void>): Saga<void> {
// Errors from forked tasks cannot be caught, see https://redux-saga.js.org/docs/advanced/ForkModel/#error-propagation
// However, the task pool should not abort if a single task fails.
// Therefore, use this wrapper to safely execute all tasks and possibly rethrow the last error in the end.
Expand All @@ -32,17 +31,15 @@ export default function* processTaskWithPool(
isFinalResolveScheduled = true;
// All tasks were kicked off, which is why all tasks can be
// awaited now together.
// @ts-expect-error ts-migrate(2769) FIXME: No overload matches this call.
yield* join(startedTasks);
if (error != null) throw error;
}

return;
}

const task = tasks.shift();
const task = tasks.shift() as () => Saga<void>;
const newTask = yield* fork(forkSafely, task);
// @ts-expect-error ts-migrate(2345) FIXME: Argument of type 'FixedTask<void>' is not assignab... Remove this comment to see the full error message
startedTasks.push(newTask);
// If that task is done, process a new one (that way,
// the pool size stays constant until the queue is almost empty.)
Expand Down
143 changes: 0 additions & 143 deletions frontend/javascripts/libs/async_task_queue.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import _ from "lodash";
export default class WorkerPool<P, R> {
export default class WebWorkerPool<P, R> {
// This class can be used to instantiate multiple web workers
// which are then used for computation in a simple round-robin manner.
//
// Example:
// const compressionPool = new WorkerPool(
// () => createWorker(ByteArrayToLz4Base64Worker),
// const compressionPool = new WebWorkerPool(
// () => createWorker(ByteArraysToLz4Base64Worker),
// COMPRESSION_WORKER_COUNT,
// );
// const promise1 = compressionPool.submit(data1);
Expand Down
Loading

0 comments on commit 58f5703

Please sign in to comment.