
Transactional saving of volume annotations #7264

Merged · 24 commits · Aug 21, 2023
Changes from 13 commits

Commits
f33455e
create volume transactions according to debounced pushqueue.push; inc…
philippotto Aug 14, 2023
1e135bf
Merge branch 'master' of github.com:scalableminds/webknossos into vol…
philippotto Aug 14, 2023
0c444ab
make PushQueue.push more robust by avoiding concurrent execution of i…
philippotto Aug 14, 2023
d29e86b
Revert "Revert "temporarily disable most CI checks""
philippotto Jun 27, 2023
da39e1d
don't use AsyncTaskQueue in pushqueue anymore
philippotto Aug 15, 2023
9ca1834
remove AsyncTaskQueue implementation + specs
philippotto Aug 15, 2023
a596752
implement small AsyncFifoResolver to prevent theoretical race condition
philippotto Aug 15, 2023
bd5d355
ensure that the save saga consumes N items from the save queue where …
philippotto Aug 15, 2023
b0254a4
fix tests
philippotto Aug 15, 2023
8a5d387
fix accidentally skipped tests; improve linting rule to avoid this; f…
philippotto Aug 15, 2023
a2574b8
harden error handling in PushQueue
philippotto Aug 15, 2023
e954202
move some lib modules into libs/async
philippotto Aug 15, 2023
db611cc
warn user when pushqueue is starving
philippotto Aug 15, 2023
e3228df
Apply suggestions from code review
philippotto Aug 15, 2023
31df268
clean up a bit
philippotto Aug 15, 2023
1e90b3d
tune batch count constants for volume tracings; also show downloading…
philippotto Aug 16, 2023
9111358
fix race condition in AsyncFifoResolver
philippotto Aug 16, 2023
5dceb0c
fix incorrect dtype in comment
philippotto Aug 16, 2023
e1f4a53
update changelog
philippotto Aug 16, 2023
3799b22
improve comment
philippotto Aug 16, 2023
48b19c4
rename pendingQueue to pendingBuckets
philippotto Aug 16, 2023
f4b6c89
fix incorrect method name
philippotto Aug 16, 2023
5a0e630
Merge branch 'master' of github.com:scalableminds/webknossos into vol…
philippotto Aug 17, 2023
7205d46
Merge branch 'master' of github.com:scalableminds/webknossos into vol…
philippotto Aug 21, 2023
34 changes: 34 additions & 0 deletions frontend/javascripts/libs/async/async_fifo_resolver.ts
@@ -0,0 +1,34 @@
/*
* This class can be used to await promises
* in the order they were passed to
* orderedWaitFor.
*
* This enables scheduling of asynchronous work
* concurrently while ensuring that the results
* are processed in the order they were requested
* (instead of the order in which they finished).
*
* Example:
* const resolver = new AsyncFifoResolver();
* const promise1Done = resolver.orderedWaitFor(promise1);
* const promise2Done = resolver.orderedWaitFor(promise2);
*
* Even if promise2 resolves before promise1, promise2Done
* will resolve *after* promise1Done.
*/

export class AsyncFifoResolver<T> {
queue: Promise<T>[];
constructor() {
this.queue = [];
}

async orderedWaitFor(promise: Promise<T>): Promise<T> {
this.queue.push(promise);
const promiseCount = this.queue.length;
const retVals = await Promise.all(this.queue);
// Trim awaited promises
this.queue = this.queue.slice(promiseCount);
return retVals[promiseCount - 1];
}
}
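To make the ordering guarantee concrete, here is the class from the diff above together with a small self-contained demo; the promise names and timings are invented for illustration:

```typescript
class AsyncFifoResolver<T> {
  queue: Promise<T>[] = [];

  async orderedWaitFor(promise: Promise<T>): Promise<T> {
    this.queue.push(promise);
    const promiseCount = this.queue.length;
    const retVals = await Promise.all(this.queue);
    // Trim the promises that have been awaited by now.
    this.queue = this.queue.slice(promiseCount);
    return retVals[promiseCount - 1];
  }
}

const resolver = new AsyncFifoResolver<string>();
const order: string[] = [];

// `slow` finishes last but was requested first,
// so it must be processed first.
const slow = new Promise<string>((res) => setTimeout(() => res("slow"), 20));
const fast = new Promise<string>((res) => setTimeout(() => res("fast"), 5));

const demoDone = Promise.all([
  resolver.orderedWaitFor(slow).then((v) => order.push(v)),
  resolver.orderedWaitFor(fast).then((v) => order.push(v)),
]);
```

Even though `fast` settles 15 ms earlier, `order` ends up as `["slow", "fast"]`: the second `orderedWaitFor` awaits the whole queue, so it cannot complete before the first.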
95 changes: 95 additions & 0 deletions frontend/javascripts/libs/async/debounced_abortable_saga.ts
@@ -0,0 +1,95 @@
import { call, type Saga } from "oxalis/model/sagas/effect-generators";
import { buffers, Channel, channel, runSaga } from "redux-saga";
import { delay, race, take } from "redux-saga/effects";

/*
* This function takes a saga and a debounce threshold
* and returns a function F that will trigger the given saga
* in a debounced manner.
In contrast to a normal debouncing mechanism, the saga
will be cancelled if F is called while the saga is running.
Note that this means concurrent executions of the saga
are impossible (by design).

Also note that this debouncing mechanism is slower than
a standard _.debounce. See debounced_abortable_saga.spec.ts
for a small benchmark.
*/
export function createDebouncedAbortableCallable<T, C>(
fn: (param1: T) => Saga<void>,
debounceThreshold: number,
context: C,
) {
// The communication with the saga is done via a channel.
// That way, we can expose a normal function that
// does the triggering by filling the channel.

// Only the most recent invocation should survive.
// Therefore, create a sliding buffer with size 1.
const buffer = buffers.sliding<T>(1);
const triggerChannel: Channel<T> = channel<T>(buffer);

const _task = runSaga(
{},
debouncedAbortableSagaRunner,
debounceThreshold,
triggerChannel,
// @ts-expect-error TS thinks fn doesn't match, but it does.
fn,
context,
);

return (msg: T) => {
triggerChannel.put(msg);
};
}

export function createDebouncedAbortableParameterlessCallable<C>(
fn: () => Saga<void>,
debounceThreshold: number,
context: C,
) {
const wrappedFn = createDebouncedAbortableCallable(fn, debounceThreshold, context);
const dummyParameter = {};
return () => {
wrappedFn(dummyParameter);
};
}

function* debouncedAbortableSagaRunner<T, C>(
debounceThreshold: number,
triggerChannel: Channel<T>,
abortableFn: (param: T) => Saga<void>,
context: C,
): Saga<void> {
while (true) {
// Wait for a trigger-call by consuming
// the channel.
let msg = yield take(triggerChannel);

// Repeatedly try to execute abortableFn (each try
// might be cancelled due to new initiation-requests)
while (true) {
const { debounced, latestMessage } = yield race({
debounced: delay(debounceThreshold),
latestMessage: take(triggerChannel),
});

if (latestMessage) {
msg = latestMessage;
}

if (debounced) {
const { abortingMessage } = yield race({
finished: call([context, abortableFn], msg),
abortingMessage: take(triggerChannel),
});
if (abortingMessage) {
msg = abortingMessage;
} else {
break;
}
}
}
}
}
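`createDebouncedAbortableCallable` relies on redux-saga's cancellation semantics. The same trigger/abort contract can be sketched without sagas using `AbortController`; the function name, threshold, and demo below are illustrative and not part of the PR:

```typescript
// Sketch of the same contract: each trigger resets the debounce timer
// and aborts a run that already started.
function createDebouncedAbortable<T>(
  fn: (arg: T, signal: AbortSignal) => Promise<void>,
  debounceThreshold: number,
): (arg: T) => void {
  let timer: ReturnType<typeof setTimeout> | null = null;
  let controller: AbortController | null = null;

  return (arg: T) => {
    // Debounce: drop a run that has not started yet ...
    if (timer != null) clearTimeout(timer);
    // ... and abort a run that is already in progress.
    controller?.abort();
    timer = setTimeout(() => {
      controller = new AbortController();
      // Aborted runs may reject; that is expected here.
      fn(arg, controller.signal).catch(() => {});
    }, debounceThreshold);
  };
}

const runs: string[] = [];
const trigger = createDebouncedAbortable(async (msg: string, _signal) => {
  runs.push(msg);
}, 10);

trigger("a");
trigger("b"); // cancels the pending "a" run; only "b" executes
```

Unlike the saga version, this sketch requires `fn` to cooperate by checking (or passing on) the `AbortSignal`; redux-saga cancels blocked effects automatically, which is why the PR uses a saga.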
@@ -1,4 +1,4 @@
import Deferred from "libs/deferred";
import Deferred from "libs/async/deferred";
type Task<T> = () => Promise<T>;
export const SKIPPED_TASK_REASON = "Skipped task";
/*
@@ -11,6 +11,10 @@ export const SKIPPED_TASK_REASON = "Skipped task";
* LatestTaskExecutor instance.
*
* See the corresponding spec for examples.
*
* If you need the same behavior plus cancellation of running
* tasks, take a look at the saga-based `createDebouncedAbortableCallable`
* utility.
*/

export default class LatestTaskExecutor<T> {
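For context, the executor's contract (as described by the comment in the diff) can be sketched with plain promises; this is an illustrative re-implementation based on an assumed reading of that contract, not the class from the repo:

```typescript
const SKIPPED_TASK_REASON = "Skipped task";

// Assumed semantics: while a task runs, at most one newer task waits in
// line; scheduling yet another task skips (rejects) the waiting one.
class LatestTaskExecutorSketch<T> {
  private running = false;
  private pending: {
    task: () => Promise<T>;
    resolve: (value: T) => void;
    reject: (error: Error) => void;
  } | null = null;

  schedule(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      // A newer task replaces a task that is still waiting in line.
      if (this.pending != null) {
        this.pending.reject(new Error(SKIPPED_TASK_REASON));
      }
      this.pending = { task, resolve, reject };
      if (!this.running) {
        void this.drain();
      }
    });
  }

  private async drain(): Promise<void> {
    this.running = true;
    while (this.pending != null) {
      const { task, resolve, reject } = this.pending;
      this.pending = null;
      try {
        resolve(await task());
      } catch (error) {
        reject(error as Error);
      }
    }
    this.running = false;
  }
}
```

Note that, as the added comment says, a running task is never cancelled here; for cancellation of in-flight work the saga-based `createDebouncedAbortableCallable` is the right tool.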
@@ -1,5 +1,5 @@
import type { Saga, Task } from "oxalis/model/sagas/effect-generators";
import { join, call, fork } from "typed-redux-saga";
import type { Saga } from "oxalis/model/sagas/effect-generators";
import { join, call, fork, FixedTask } from "typed-redux-saga";

/*
Given an array of async tasks, processTaskWithPool
@@ -10,12 +10,11 @@ export default function* processTaskWithPool(
tasks: Array<() => Saga<void>>,
poolSize: number,
): Saga<void> {
const startedTasks: Array<Task<void>> = [];
const startedTasks: Array<FixedTask<void>> = [];
let isFinalResolveScheduled = false;
let error: Error | null = null;

// @ts-expect-error ts-migrate(7006) FIXME: Parameter 'fn' implicitly has an 'any' type.
function* forkSafely(fn): Saga<void> {
function* forkSafely(fn: () => Saga<void>): Saga<void> {
// Errors from forked tasks cannot be caught, see https://redux-saga.js.org/docs/advanced/ForkModel/#error-propagation
// However, the task pool should not abort if a single task fails.
// Therefore, use this wrapper to safely execute all tasks and possibly rethrow the last error in the end.
@@ -32,17 +31,15 @@ export default function* processTaskWithPool(
isFinalResolveScheduled = true;
// All tasks were kicked off, which is why all tasks can be
// awaited now together.
// @ts-expect-error ts-migrate(2769) FIXME: No overload matches this call.
yield* join(startedTasks);
if (error != null) throw error;
}

return;
}

const task = tasks.shift();
const task = tasks.shift() as () => Saga<void>;
const newTask = yield* fork(forkSafely, task);
// @ts-expect-error ts-migrate(2345) FIXME: Argument of type 'FixedTask<void>' is not assignab... Remove this comment to see the full error message
startedTasks.push(newTask);
// If that task is done, process a new one (that way,
// the pool size stays constant until the queue is almost empty.)
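The pool pattern this diff cleans up (typed `FixedTask`, no more `ts-expect-error`) can be sketched with plain promises. `processTasksWithPool` below is an illustrative stand-in, not the saga implementation:

```typescript
// Start `poolSize` workers; each pulls the next task until the queue is
// empty, so the pool size stays constant until the queue is almost drained.
async function processTasksWithPool(
  tasks: Array<() => Promise<void>>,
  poolSize: number,
): Promise<void> {
  let nextIndex = 0;
  let lastError: Error | null = null;

  async function worker(): Promise<void> {
    while (nextIndex < tasks.length) {
      const task = tasks[nextIndex++];
      try {
        await task();
      } catch (error) {
        // A single failing task should not abort the pool; remember the
        // error and rethrow it once all tasks have finished.
        lastError = error as Error;
      }
    }
  }

  await Promise.all(Array.from({ length: poolSize }, () => worker()));
  if (lastError != null) throw lastError;
}
```

The saga version needs the `forkSafely` wrapper for the same reason the `try/catch` sits inside `worker` here: a failure in one forked task must not tear down its siblings.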
143 changes: 0 additions & 143 deletions frontend/javascripts/libs/async_task_queue.ts

This file was deleted.

@@ -1,11 +1,11 @@
import _ from "lodash";
export default class WorkerPool<P, R> {
export default class WebWorkerPool<P, R> {
// This class can be used to instantiate multiple web workers
// which are then used for computation in a simple round-robin manner.
//
// Example:
// const compressionPool = new WorkerPool(
// () => createWorker(ByteArrayToLz4Base64Worker),
// const compressionPool = new WebWorkerPool(
// () => createWorker(ByteArraysToLz4Base64Worker),
// COMPRESSION_WORKER_COUNT,
// );
// const promise1 = compressionPool.submit(data1);
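The renamed class dispatches work to its workers round-robin. A minimal dependency-free sketch of that dispatch strategy (the `SubmitWorker` interface is an assumption for illustration; no real web workers are involved):

```typescript
// Anything with a promise-returning `submit` can stand in for a worker here.
interface SubmitWorker<P, R> {
  submit(param: P): Promise<R>;
}

class RoundRobinPool<P, R> {
  private workers: SubmitWorker<P, R>[];
  private cursor = 0;

  constructor(createWorker: () => SubmitWorker<P, R>, count: number) {
    this.workers = Array.from({ length: count }, createWorker);
  }

  submit(param: P): Promise<R> {
    // Simple round-robin: hand the next submission to the next worker.
    const worker = this.workers[this.cursor];
    this.cursor = (this.cursor + 1) % this.workers.length;
    return worker.submit(param);
  }
}
```

Round-robin keeps the dispatcher trivial; it balances load well when submissions are roughly uniform in cost, which is a reasonable fit for fixed-size bucket compression batches.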
16 changes: 11 additions & 5 deletions frontend/javascripts/oxalis/model.ts
@@ -1,5 +1,4 @@
import _ from "lodash";
import { COMPRESSING_BATCH_SIZE } from "oxalis/model/bucket_data_handling/pushqueue";
import type { Vector3 } from "oxalis/constants";
import type { Versions } from "oxalis/view/version_view";
import { getActiveSegmentationTracingLayer } from "oxalis/model/accessors/volumetracing_accessor";
@@ -286,16 +285,23 @@ export class OxalisModel {
return storeStateSaved && pushQueuesSaved;
}

getLongestPushQueueWaitTime() {
return (
_.max(
Utils.values(this.dataLayers).map((layer) => layer.pushQueue.getTransactionWaitTime()),
) || 0
);
}

getPushQueueStats() {
const compressingBucketCount = _.sum(
Utils.values(this.dataLayers).map(
(dataLayer) =>
dataLayer.pushQueue.compressionTaskQueue.tasks.length * COMPRESSING_BATCH_SIZE,
Utils.values(this.dataLayers).map((dataLayer) =>
dataLayer.pushQueue.getCompressingBucketCount(),
),
);

const waitingForCompressionBucketCount = _.sum(
Utils.values(this.dataLayers).map((dataLayer) => dataLayer.pushQueue.pendingQueue.size),
Utils.values(this.dataLayers).map((dataLayer) => dataLayer.pushQueue.getPendingQueueSize()),
);

return {
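A note on the `|| 0` in `getLongestPushQueueWaitTime`: lodash's `_.max` returns `undefined` for an empty collection, so the fallback keeps the result numeric when no data layers exist. A dependency-free equivalent of that aggregation:

```typescript
// _.max([]) is undefined, hence the `|| 0` fallback in the diff above.
// Without lodash, the same guard looks like this:
function longestWaitTime(waitTimes: number[]): number {
  return waitTimes.length > 0 ? Math.max(...waitTimes) : 0;
}
```

(`Math.max()` with no arguments would return `-Infinity`, so the explicit length check is needed here, too.)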