From f33455e245af7d6b77977965714e7df59f907a61 Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Mon, 14 Aug 2023 15:23:24 +0200 Subject: [PATCH 01/21] create volume transactions according to debounced pushqueue.push; increase efficiency of compressing-worker by increasing payload size --- frontend/javascripts/libs/task_pool.ts | 13 +++---- frontend/javascripts/libs/worker_pool.ts | 2 +- .../model/bucket_data_handling/data_cube.ts | 1 + .../model/bucket_data_handling/pushqueue.ts | 35 ++++++++--------- .../bucket_data_handling/wkstore_adapter.ts | 38 ++++++++++++++----- .../oxalis/model/sagas/save_saga.ts | 1 + .../oxalis/model/sagas/save_saga_constants.ts | 4 +- .../volume_annotation_sampling.ts | 2 +- .../oxalis/view/action-bar/save_button.tsx | 10 ++--- ...ts => byte_arrays_to_lz4_base64.worker.ts} | 13 +++++-- .../async_debounced_reentrantable.spec.ts | 20 ++++++++++ .../binary/layers/wkstore_adapter.spec.ts | 6 +-- 12 files changed, 92 insertions(+), 53 deletions(-) rename frontend/javascripts/oxalis/workers/{byte_array_to_lz4_base64.worker.ts => byte_arrays_to_lz4_base64.worker.ts} (57%) create mode 100644 frontend/javascripts/test/libs/async_debounced_reentrantable.spec.ts diff --git a/frontend/javascripts/libs/task_pool.ts b/frontend/javascripts/libs/task_pool.ts index ce2a8947b50..c188092dc5b 100644 --- a/frontend/javascripts/libs/task_pool.ts +++ b/frontend/javascripts/libs/task_pool.ts @@ -1,5 +1,5 @@ -import type { Saga, Task } from "oxalis/model/sagas/effect-generators"; -import { join, call, fork } from "typed-redux-saga"; +import type { Saga } from "oxalis/model/sagas/effect-generators"; +import { join, call, fork, FixedTask } from "typed-redux-saga"; /* Given an array of async tasks, processTaskWithPool @@ -10,12 +10,11 @@ export default function* processTaskWithPool( tasks: Array<() => Saga>, poolSize: number, ): Saga { - const startedTasks: Array> = []; + const startedTasks: Array> = []; let isFinalResolveScheduled = false; let error: Error | null = null; - // @ts-expect-error ts-migrate(7006) FIXME: Parameter 'fn' implicitly has an 'any' type. - function* forkSafely(fn): Saga { + function* forkSafely(fn: () => Saga): Saga { // Errors from forked tasks cannot be caught, see https://redux-saga.js.org/docs/advanced/ForkModel/#error-propagation // However, the task pool should not abort if a single task fails. // Therefore, use this wrapper to safely execute all tasks and possibly rethrow the last error in the end. @@ -32,7 +31,6 @@ export default function* processTaskWithPool( isFinalResolveScheduled = true; // All tasks were kicked off, which is why all tasks can be // awaited now together. - // @ts-expect-error ts-migrate(2769) FIXME: No overload matches this call. yield* join(startedTasks); if (error != null) throw error; } @@ -40,9 +38,8 @@ export default function* processTaskWithPool( return; } - const task = tasks.shift(); + const task = tasks.shift() as () => Saga; const newTask = yield* fork(forkSafely, task); - // @ts-expect-error ts-migrate(2345) FIXME: Argument of type 'FixedTask' is not assignab... Remove this comment to see the full error message startedTasks.push(newTask); // If that task is done, process a new one (that way, // the pool size stays constant until the queue is almost empty.) diff --git a/frontend/javascripts/libs/worker_pool.ts b/frontend/javascripts/libs/worker_pool.ts index 7a07172aa7f..65cb3b58769 100644 --- a/frontend/javascripts/libs/worker_pool.ts +++ b/frontend/javascripts/libs/worker_pool.ts @@ -5,7 +5,7 @@ export default class WorkerPool { // // Example: // const compressionPool = new WorkerPool( - // () => createWorker(ByteArrayToLz4Base64Worker), + // () => createWorker(ByteArraysToLz4Base64Worker), // COMPRESSION_WORKER_COUNT, // ); // const promise1 = compressionPool.submit(data1); diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/data_cube.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/data_cube.ts index 23f63a30d81..ad7c0071826 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/data_cube.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/data_cube.ts @@ -679,6 +679,7 @@ class DataCube { } triggerPushQueue() { + console.log("triggerPushQueue"); this.pushQueue.push(); } diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts index fe0fbf64e30..d68d43d31d5 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts @@ -5,17 +5,16 @@ import { sendToStore } from "oxalis/model/bucket_data_handling/wkstore_adapter"; import AsyncTaskQueue from "libs/async_task_queue"; import type DataCube from "oxalis/model/bucket_data_handling/data_cube"; import Toast from "libs/toast"; +import { sleep } from "libs/utils"; export const COMPRESSING_BATCH_SIZE = 32; -// Only process the PushQueue after there was no user interaction -// for PUSH_DEBOUNCE_TIME milliseconds... +// Only process the PushQueue after there was no user interaction (or bucket modification due to +// downsampling) for PUSH_DEBOUNCE_TIME milliseconds... const PUSH_DEBOUNCE_TIME = 1000; // ...unless a timeout of PUSH_DEBOUNCE_MAX_WAIT_TIME milliseconds // is exceeded. Then, initiate a push. const PUSH_DEBOUNCE_MAX_WAIT_TIME = 30000; class PushQueue { - // @ts-expect-error ts-migrate(2564) FIXME: Property 'dataSetName' has no initializer and is n... Remove this comment to see the full error message - dataSetName: string; cube: DataCube; compressionTaskQueue: AsyncTaskQueue; sendData: boolean; @@ -65,7 +64,6 @@ class PushQueue { this.pendingQueue.add(bucket); bucket.dirtyCount++; } - this.push(); } @@ -78,26 +76,27 @@ class PushQueue { } pushImpl = async () => { + console.log("pushImpl start"); await this.cube.temporalBucketManager.getAllLoadedPromise(); + // Ensure that no earlier pushImpl calls are active + await this.compressionTaskQueue.join(); + if (!this.sendData) { return; } - while (this.pendingQueue.size) { - let batchSize = Math.min(COMPRESSING_BATCH_SIZE, this.pendingQueue.size); - const batch: DataBucket[] = []; + console.log("this.pendingQueue.size", this.pendingQueue.size); - for (const bucket of this.pendingQueue) { - if (batchSize <= 0) break; - this.pendingQueue.delete(bucket); - batch.push(bucket); - batchSize--; - } + // Flush pendingQueue. Note that it's important to do this synchronously. + // If other actors could add to queue concurrently, the front-end could + // send an inconsistent state for a transaction. + const batch: DataBucket[] = Array.from(this.pendingQueue); + this.pendingQueue = new Set(); - // fire and forget - this.compressionTaskQueue.scheduleTask(() => this.pushBatch(batch)); - } + // fire and forget + this.compressionTaskQueue.scheduleTask(() => this.pushBatch(batch)); + // } try { // wait here @@ -105,6 +104,7 @@ class PushQueue { } catch (_error) { alert("We've encountered a permanent issue while saving. Please try to reload the page."); } + console.log("pushImpl end"); }; push = _.debounce(this.pushImpl, PUSH_DEBOUNCE_TIME, { @@ -112,6 +112,7 @@ class PushQueue { }); pushBatch(batch: Array): Promise { + // The batch will be put into one transaction. return sendToStore(batch, this.cube.layerName); } } diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts index c2c24028316..32cf90257ed 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts @@ -13,7 +13,7 @@ import { parseAsMaybe } from "libs/utils"; import { pushSaveQueueTransaction } from "oxalis/model/actions/save_actions"; import type { UpdateAction } from "oxalis/model/sagas/update_actions"; import { updateBucket } from "oxalis/model/sagas/update_actions"; -import ByteArrayToLz4Base64Worker from "oxalis/workers/byte_array_to_lz4_base64.worker"; +import ByteArraysToLz4Base64Worker from "oxalis/workers/byte_arrays_to_lz4_base64.worker"; import DecodeFourBitWorker from "oxalis/workers/decode_four_bit.worker"; import ErrorHandling from "libs/error_handling"; import Request from "libs/request"; @@ -25,13 +25,20 @@ import constants, { MappingStatusEnum } from "oxalis/constants"; import window from "libs/window"; import { getGlobalDataConnectionInfo } from "../data_connection_info"; import { ResolutionInfo } from "../helpers/resolution_info"; +import _ from "lodash"; const decodeFourBit = createWorker(DecodeFourBitWorker); + +// For 64-bit buckets with 32^3 voxel, a COMPRESSION_BATCH_SIZE of +// 128 corresponds to 16.8 MB that are sent to a webworker in one +// go. +const COMPRESSION_BATCH_SIZE = 128; const COMPRESSION_WORKER_COUNT = 2; const compressionPool = new WorkerPool( - () => createWorker(ByteArrayToLz4Base64Worker), + () => createWorker(ByteArraysToLz4Base64Worker), COMPRESSION_WORKER_COUNT, ); + export const REQUEST_TIMEOUT = 60000; export type SendBucketInfo = { position: Vector3; @@ -239,14 +246,25 @@ function sliceBufferIntoPieces( } export async function sendToStore(batch: Array, tracingId: string): Promise { - const items: Array = await Promise.all( - batch.map(async (bucket): Promise => { - const data = bucket.getCopyOfData(); - const bucketInfo = createSendBucketInfo(bucket.zoomedAddress, bucket.cube.resolutionInfo); - const byteArray = new Uint8Array(data.buffer, data.byteOffset, data.byteLength); - const compressedBase64 = await compressionPool.submit(byteArray); - return updateBucket(bucketInfo, compressedBase64); - }), + const items: Array = _.flatten( + await Promise.all( + _.chunk(batch, COMPRESSION_BATCH_SIZE).map(async (batchSubset): Promise => { + const byteArrays = []; + for (const bucket of batchSubset) { + const data = bucket.getCopyOfData(); + const byteArray = new Uint8Array(data.buffer, data.byteOffset, data.byteLength); + byteArrays.push(byteArray); + } + + const compressedBase64Strings = await compressionPool.submit(byteArrays); + return compressedBase64Strings.map((compressedBase64, index) => { + const bucket = batchSubset[index]; + const bucketInfo = createSendBucketInfo(bucket.zoomedAddress, bucket.cube.resolutionInfo); + return updateBucket(bucketInfo, compressedBase64); + }); + }), + ), ); + Store.dispatch(pushSaveQueueTransaction(items, "volume", tracingId)); } diff --git a/frontend/javascripts/oxalis/model/sagas/save_saga.ts b/frontend/javascripts/oxalis/model/sagas/save_saga.ts index 89d8ce97107..a9feebf516f 100644 --- a/frontend/javascripts/oxalis/model/sagas/save_saga.ts +++ b/frontend/javascripts/oxalis/model/sagas/save_saga.ts @@ -154,6 +154,7 @@ let didShowFailedSimultaneousTracingError = false; export function* sendRequestToServer(saveQueueType: SaveQueueType, tracingId: string): Saga { const fullSaveQueue = yield* select((state) => selectQueue(state, saveQueueType, tracingId)); const saveQueue = sliceAppropriateBatchCount(fullSaveQueue); + console.log("saving", saveQueue.length, "items out of", fullSaveQueue.length); let compactedSaveQueue = compactSaveQueue(saveQueue); const { version, type } = yield* select((state) => selectTracing(state, saveQueueType, tracingId), diff --git a/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts b/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts index 233d31fed01..30e90a68ef1 100644 --- a/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts +++ b/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts @@ -11,5 +11,5 @@ export const UNDO_HISTORY_SIZE = 20; export const SETTINGS_RETRY_DELAY = 15 * 1000; export const SETTINGS_MAX_RETRY_COUNT = 20; // 20 * 15s == 5m -export const maximumActionCountPerBatch = 5000; -export const maximumActionCountPerSave = 15000; +export const maximumActionCountPerBatch = 20; +export const maximumActionCountPerSave = 60; diff --git a/frontend/javascripts/oxalis/model/volumetracing/volume_annotation_sampling.ts b/frontend/javascripts/oxalis/model/volumetracing/volume_annotation_sampling.ts index 3a7a6bc8343..5c4cec475f3 100644 --- a/frontend/javascripts/oxalis/model/volumetracing/volume_annotation_sampling.ts +++ b/frontend/javascripts/oxalis/model/volumetracing/volume_annotation_sampling.ts @@ -80,7 +80,7 @@ function upsampleVoxelMap( ]); if (currentGoalBucket.type === "null") { - console.warn(warnAboutCouldNotCreate([...currentGoalBucketAddress, targetZoomStep])); + warnAboutCouldNotCreate([...currentGoalBucketAddress, targetZoomStep]); continue; } diff --git a/frontend/javascripts/oxalis/view/action-bar/save_button.tsx b/frontend/javascripts/oxalis/view/action-bar/save_button.tsx index 9a6cdb83017..54e3bf047c2 100644 --- a/frontend/javascripts/oxalis/view/action-bar/save_button.tsx +++ b/frontend/javascripts/oxalis/view/action-bar/save_button.tsx @@ -1,7 +1,7 @@ import { connect } from "react-redux"; import React from "react"; import _ from "lodash"; -import Store from "oxalis/store"; +import Store, { SaveState } from "oxalis/store"; import type { OxalisState, IsBusyInfo } from "oxalis/store"; import { isBusy } from "oxalis/model/accessors/save_accessor"; import ButtonComponent from "oxalis/view/components/button_component"; @@ -17,7 +17,7 @@ import { import ErrorHandling from "libs/error_handling"; import * as Utils from "libs/utils"; type OwnProps = { - onClick: (arg0: React.SyntheticEvent) => Promise; + onClick: (arg0: React.MouseEvent) => Promise; className?: string; }; type StateProps = { @@ -113,7 +113,6 @@ class SaveButton extends React.PureComponent { this.state.saveInfo.waitingForCompressionBucketCount + this.state.saveInfo.compressingBucketCount; return ( - // @ts-expect-error ts-migrate(2769) FIXME: No overload matches this call. { } } -// @ts-expect-error ts-migrate(7006) FIXME: Parameter 'saveQueue' implicitly has an 'any' type... Remove this comment to see the full error message -function getOldestUnsavedTimestamp(saveQueue): number | null | undefined { +function getOldestUnsavedTimestamp(saveQueue: SaveState["queue"]): number | null | undefined { let oldestUnsavedTimestamp; if (saveQueue.skeleton.length > 0) { @@ -168,9 +166,7 @@ function getOldestUnsavedTimestamp(saveQueue): number | null | undefined { } for (const volumeQueue of Utils.values(saveQueue.volumes)) { - // @ts-expect-error ts-migrate(2571) FIXME: Object is of type 'unknown'. if (volumeQueue.length > 0) { - // @ts-expect-error ts-migrate(2571) FIXME: Object is of type 'unknown'. const oldestVolumeTimestamp = volumeQueue[0].timestamp; oldestUnsavedTimestamp = Math.min( oldestUnsavedTimestamp != null ? oldestUnsavedTimestamp : Infinity, diff --git a/frontend/javascripts/oxalis/workers/byte_array_to_lz4_base64.worker.ts b/frontend/javascripts/oxalis/workers/byte_arrays_to_lz4_base64.worker.ts similarity index 57% rename from frontend/javascripts/oxalis/workers/byte_array_to_lz4_base64.worker.ts rename to frontend/javascripts/oxalis/workers/byte_arrays_to_lz4_base64.worker.ts index 618cb91049d..1cb2b323ea3 100644 --- a/frontend/javascripts/oxalis/workers/byte_array_to_lz4_base64.worker.ts +++ b/frontend/javascripts/oxalis/workers/byte_arrays_to_lz4_base64.worker.ts @@ -10,8 +10,13 @@ function compressLz4Block(data: Uint8Array): Uint8Array { return newCompressed.slice(4); } -export function byteArrayToLz4Base64(byteArray: Uint8Array): string { - const compressed = compressLz4Block(byteArray); - return Base64.fromByteArray(compressed); +export function byteArraysToLz4Base64(byteArrays: Uint8Array[]): string[] { + const base64Strings: string[] = []; + for (const byteArray of byteArrays) { + const compressed = compressLz4Block(byteArray); + base64Strings.push(Base64.fromByteArray(compressed)); + } + + return base64Strings; } -export default expose(byteArrayToLz4Base64); +export default expose(byteArraysToLz4Base64); diff --git a/frontend/javascripts/test/libs/async_debounced_reentrantable.spec.ts b/frontend/javascripts/test/libs/async_debounced_reentrantable.spec.ts new file mode 100644 index 00000000000..7f67a58f347 --- /dev/null +++ b/frontend/javascripts/test/libs/async_debounced_reentrantable.spec.ts @@ -0,0 +1,20 @@ +import { call, type Saga } from "oxalis/model/sagas/effect-generators"; +import { runSaga } from "redux-saga"; +import { sleep } from "libs/utils"; +import test from "ava"; + +export default function* reentrable(): Saga { + console.log("start"); + yield* call(sleep, 500); + console.log("after sleep"); +} + +test.serial("processTaskWithPool should run a simple task", async (t) => { + t.plan(1); + const protocol: number[] = []; + + const task = runSaga({}, reentrable); + task.cancel(); + await task.toPromise(); + t.is(protocol.length, 0); +}); diff --git a/frontend/javascripts/test/model/binary/layers/wkstore_adapter.spec.ts b/frontend/javascripts/test/model/binary/layers/wkstore_adapter.spec.ts index 0ea4ab70260..6e188adaecc 100644 --- a/frontend/javascripts/test/model/binary/layers/wkstore_adapter.spec.ts +++ b/frontend/javascripts/test/model/binary/layers/wkstore_adapter.spec.ts @@ -1,7 +1,7 @@ import _ from "lodash"; import "test/model/binary/layers/wkstore_adapter.mock.js"; import { getBitDepth } from "oxalis/model/accessors/dataset_accessor"; -import { byteArrayToLz4Base64 } from "oxalis/workers/byte_array_to_lz4_base64.worker"; +import { byteArraysToLz4Base64 } from "oxalis/workers/byte_arrays_to_lz4_base64.worker"; import datasetServerObject from "test/fixtures/dataset_server_object"; import mockRequire from "mock-require"; import sinon from "sinon"; @@ -246,7 +246,7 @@ test.serial("sendToStore: Request Handling should send the correct request param position: [0, 0, 0], mag: [1, 1, 1], cubeSize: 32, - base64Data: byteArrayToLz4Base64(data), + base64Data: byteArraysToLz4Base64([data])[0], }, }, { @@ -255,7 +255,7 @@ test.serial("sendToStore: Request Handling should send the correct request param position: [64, 64, 64], mag: [2, 2, 2], cubeSize: 32, - base64Data: byteArrayToLz4Base64(data), + base64Data: byteArraysToLz4Base64([data])[0], }, }, ], From 0c444ab36437c78aa1e6ea51631275287b2bd04b Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Mon, 14 Aug 2023 17:42:34 +0200 Subject: [PATCH 02/21] make PushQueue.push more robust by avoiding concurrent execution of it (by implementing createDebouncedAbortableCallable --- .../libs/debounced_abortable_saga.ts | 95 +++++++++++++++++++ .../model/bucket_data_handling/pushqueue.ts | 53 +++++------ .../temporal_bucket_manager.ts | 4 +- .../oxalis/model/sagas/save_saga.ts | 1 + .../oxalis/view/action-bar/save_button.tsx | 2 +- .../async_debounced_reentrantable.spec.ts | 20 ---- .../libs/debounced_abortable_saga.spec.ts | 87 +++++++++++++++++ 7 files changed, 212 insertions(+), 50 deletions(-) create mode 100644 frontend/javascripts/libs/debounced_abortable_saga.ts delete mode 100644 frontend/javascripts/test/libs/async_debounced_reentrantable.spec.ts create mode 100644 frontend/javascripts/test/libs/debounced_abortable_saga.spec.ts diff --git a/frontend/javascripts/libs/debounced_abortable_saga.ts b/frontend/javascripts/libs/debounced_abortable_saga.ts new file mode 100644 index 00000000000..ad12d56019d --- /dev/null +++ b/frontend/javascripts/libs/debounced_abortable_saga.ts @@ -0,0 +1,95 @@ +import { call, type Saga } from "oxalis/model/sagas/effect-generators"; +import { buffers, Channel, channel, runSaga } from "redux-saga"; +import { delay, race, take } from "redux-saga/effects"; + +/* + * This function takes a saga and a debounce threshold + * and returns a function F that will trigger the given saga + * in a debounced manner. + * In contrast to a normal debouncing mechanism, the saga + * will be cancelled if F is called while the saga is running. + * Note that this means that concurrent executions of the saga + * are impossible that way (by design). + * + * Also note that the performance of this debouncing mechanism + * slower than a standard _.debounce. Also see + * debounced_abortable_saga.spec.ts for a small benchmark. + */ +export function createDebouncedAbortableCallable( + fn: (param1: T) => Saga, + debounceThreshold: number, + context: C, +) { + // The communication with the saga is done via a channel. + // That way, we can expose a normal function that + // does the triggering by filling the channel. + + // Only the most recent invocation should survive. + // Therefore, create a sliding buffer with size 1. + const buffer = buffers.sliding(1); + const triggerChannel: Channel = channel(buffer); + + const _task = runSaga( + {}, + debouncedAbortableSagaRunner, + debounceThreshold, + triggerChannel, + // @ts-expect-error TS thinks fn doesnt match, but it does. + fn, + context, + ); + + return (msg: T) => { + triggerChannel.put(msg); + }; +} + +export function createDebouncedAbortableParameterlessCallable( + fn: () => Saga, + debounceThreshold: number, + context: C, +) { + const wrappedFn = createDebouncedAbortableCallable(fn, debounceThreshold, context); + const dummyParameter = {}; + return () => { + wrappedFn(dummyParameter); + }; +} + +function* debouncedAbortableSagaRunner( + debounceThreshold: number, + triggerChannel: Channel, + abortableFn: (param: T) => Saga, + context: C, +): Saga { + while (true) { + // Wait for a trigger-call by consuming + // the channel. + let msg = yield take(triggerChannel); + + // Repeatedly try to execute abortableFn (each try + // might be cancelled due to new initiation-requests) + while (true) { + const { debounced, latestMessage } = yield race({ + debounced: delay(debounceThreshold), + latestMessage: take(triggerChannel), + }); + + if (latestMessage) { + msg = latestMessage; + } + + if (debounced) { + const { abortingMessage } = yield race({ + finished: call([context, abortableFn], msg), + abortingMessage: take(triggerChannel), + }); + if (abortingMessage) { + msg = abortingMessage; + } else { + break; + } + } + } + } +} diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts index d68d43d31d5..f34f99978e8 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts @@ -5,14 +5,16 @@ import { sendToStore } from "oxalis/model/bucket_data_handling/wkstore_adapter"; import AsyncTaskQueue from "libs/async_task_queue"; import type DataCube from "oxalis/model/bucket_data_handling/data_cube"; import Toast from "libs/toast"; -import { sleep } from "libs/utils"; +import { createDebouncedAbortableParameterlessCallable } from "libs/debounced_abortable_saga"; +import { call } from "redux-saga/effects"; export const COMPRESSING_BATCH_SIZE = 32; // Only process the PushQueue after there was no user interaction (or bucket modification due to // downsampling) for PUSH_DEBOUNCE_TIME milliseconds... const PUSH_DEBOUNCE_TIME = 1000; // ...unless a timeout of PUSH_DEBOUNCE_MAX_WAIT_TIME milliseconds // is exceeded. Then, initiate a push. -const PUSH_DEBOUNCE_MAX_WAIT_TIME = 30000; +// todo: reactivate? +const _PUSH_DEBOUNCE_MAX_WAIT_TIME = 30000; class PushQueue { cube: DataCube; @@ -75,41 +77,38 @@ class PushQueue { this.pendingQueue.forEach((e) => console.log(e)); } - pushImpl = async () => { - console.log("pushImpl start"); - await this.cube.temporalBucketManager.getAllLoadedPromise(); - - // Ensure that no earlier pushImpl calls are active - await this.compressionTaskQueue.join(); - - if (!this.sendData) { - return; - } + pushImpl = function* (this: PushQueue) { + try { + console.log("pushImpl start"); + yield call(this.cube.temporalBucketManager.getAllLoadedPromise); - console.log("this.pendingQueue.size", this.pendingQueue.size); + if (!this.sendData) { + return; + } - // Flush pendingQueue. Note that it's important to do this synchronously. - // If other actors could add to queue concurrently, the front-end could - // send an inconsistent state for a transaction. - const batch: DataBucket[] = Array.from(this.pendingQueue); - this.pendingQueue = new Set(); + console.log("this.pendingQueue.size", this.pendingQueue.size); - // fire and forget - this.compressionTaskQueue.scheduleTask(() => this.pushBatch(batch)); - // } + // Flush pendingQueue. Note that it's important to do this synchronously. + // If other actors could add to queue concurrently, the front-end could + // send an inconsistent state for a transaction. + const batch: DataBucket[] = Array.from(this.pendingQueue); + this.pendingQueue = new Set(); - try { - // wait here - await this.compressionTaskQueue.join(); + // fire and forget + this.pushBatch(batch); } catch (_error) { + // todo: somewhere else? alert("We've encountered a permanent issue while saving. Please try to reload the page."); } console.log("pushImpl end"); }; - push = _.debounce(this.pushImpl, PUSH_DEBOUNCE_TIME, { - maxWait: PUSH_DEBOUNCE_MAX_WAIT_TIME, - }); + // push = _.debounce(this.pushImpl, PUSH_DEBOUNCE_TIME, { + // maxWait: PUSH_DEBOUNCE_MAX_WAIT_TIME, + // }); + + // todo: prevent user from brushing for eternity? + push = createDebouncedAbortableParameterlessCallable(this.pushImpl, PUSH_DEBOUNCE_TIME, this); pushBatch(batch: Array): Promise { // The batch will be put into one transaction. diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/temporal_bucket_manager.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/temporal_bucket_manager.ts index a4c94fafca7..5de198998e1 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/temporal_bucket_manager.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/temporal_bucket_manager.ts @@ -51,9 +51,9 @@ class TemporalBucketManager { return loadedPromise; } - async getAllLoadedPromise(): Promise { + getAllLoadedPromise = async () => { await Promise.all(this.loadedPromises); - } + }; } export default TemporalBucketManager; diff --git a/frontend/javascripts/oxalis/model/sagas/save_saga.ts b/frontend/javascripts/oxalis/model/sagas/save_saga.ts index a9feebf516f..e65f2fe836f 100644 --- a/frontend/javascripts/oxalis/model/sagas/save_saga.ts +++ b/frontend/javascripts/oxalis/model/sagas/save_saga.ts @@ -163,6 +163,7 @@ export function* sendRequestToServer(saveQueueType: SaveQueueType, tracingId: st compactedSaveQueue = addVersionNumbers(compactedSaveQueue, version); let retryCount = 0; + // This while-loop only exists for the purpose of a retry-mechanism while (true) { let exceptionDuringMarkBucketsAsNotDirty = false; diff --git a/frontend/javascripts/oxalis/view/action-bar/save_button.tsx b/frontend/javascripts/oxalis/view/action-bar/save_button.tsx index 54e3bf047c2..2f0e41b0340 100644 --- a/frontend/javascripts/oxalis/view/action-bar/save_button.tsx +++ b/frontend/javascripts/oxalis/view/action-bar/save_button.tsx @@ -120,7 +120,7 @@ class SaveButton extends React.PureComponent { icon={this.getSaveButtonIcon()} className={this.props.className} style={{ - background: showUnsavedWarning ? "var(--ant-error)" : null, + background: showUnsavedWarning ? "var(--ant-error)" : undefined, }} > { - console.log("start"); - yield* call(sleep, 500); - console.log("after sleep"); -} - -test.serial("processTaskWithPool should run a simple task", async (t) => { - t.plan(1); - const protocol: number[] = []; - - const task = runSaga({}, reentrable); - task.cancel(); - await task.toPromise(); - t.is(protocol.length, 0); -}); diff --git a/frontend/javascripts/test/libs/debounced_abortable_saga.spec.ts b/frontend/javascripts/test/libs/debounced_abortable_saga.spec.ts new file mode 100644 index 00000000000..d3af11d5f4c --- /dev/null +++ b/frontend/javascripts/test/libs/debounced_abortable_saga.spec.ts @@ -0,0 +1,87 @@ +import { type Saga } from "oxalis/model/sagas/effect-generators"; +import { sleep } from "libs/utils"; +import test from "ava"; +import _ from "lodash"; +import { createDebouncedAbortableCallable } from "libs/debounced_abortable_saga"; + +const createAbortableFnWithProtocol = () => { + const protocol: string[] = []; + function* abortableFn(msg: { id: number }): Saga { + protocol.push(`await-${msg.id}`); + yield sleep(1000); + protocol.push(`run-${msg.id}`); + } + + return { abortableFn, protocol }; +}; + +const DEBOUNCE_THRESHOLD = 100; +test.serial("Test simplest case", async (t) => { + t.plan(1); + const { abortableFn, protocol } = createAbortableFnWithProtocol(); + const fn = createDebouncedAbortableCallable(abortableFn, DEBOUNCE_THRESHOLD, this); + + fn({ id: 1 }); + + await sleep(2000); + t.deepEqual(protocol, ["await-1", "run-1"]); +}); + +test.serial("Rapid calls where the last one should win", async (t) => { + t.plan(1); + const { abortableFn, protocol } = createAbortableFnWithProtocol(); + const fn = createDebouncedAbortableCallable(abortableFn, DEBOUNCE_THRESHOLD, this); + + fn({ id: 1 }); + fn({ id: 2 }); + fn({ id: 3 }); + + await sleep(2000); + t.deepEqual(protocol, ["await-3", "run-3"]); +}); + +test.serial("Rapid calls with small breaks", async (t) => { + t.plan(1); + const { abortableFn, protocol } = createAbortableFnWithProtocol(); + const fn = createDebouncedAbortableCallable(abortableFn, DEBOUNCE_THRESHOLD, this); + + // Rapid calls with small breaks in-between. + // The small breaks are long enough to satisfy the debounding, + // but not long enough to let the awaiting through. + fn({ id: 1 }); + fn({ id: 2 }); + fn({ id: 3 }); + await sleep(150); + fn({ id: 4 }); + fn({ id: 5 }); + await sleep(150); + fn({ id: 6 }); + fn({ id: 7 }); + + await sleep(2000); + t.deepEqual(protocol, ["await-3", "await-5", "await-7", "run-7"]); +}); + +test.serial.skip("High volume calls", async () => { + // This is not a unit test, but rather a small speed test + // to make a note that the classic _.debounce is way faster. + // For 1000 invocations, _.debounce is roughly 10x faster. + // However, this probably not a bottleneck right now. + + const lodashDebounced = _.debounce((_obj: Object) => {}); + + const { abortableFn } = createAbortableFnWithProtocol(); + const fn = createDebouncedAbortableCallable(abortableFn, DEBOUNCE_THRESHOLD, this); + + console.time("Benchmarking debounce stuff"); + for (let i = 0; i < 1000; i++) { + fn({ id: i }); + } + console.timeEnd("Benchmarking debounce stuff"); + + console.time("Benchmarking lodash debounce stuff"); + for (let i = 0; i < 1000; i++) { + lodashDebounced({ id: i }); + } + console.timeEnd("Benchmarking lodash debounce stuff"); +}); From d29e86ba7c4b8bad042371aeb38f603e904b6013 Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Tue, 27 Jun 2023 16:24:36 +0200 Subject: [PATCH 03/21] Revert "Revert "temporarily disable most CI checks"" This reverts commit d69a7cf89dfd84a6269af26044389255ec923f35. --- .circleci/not-on-master.sh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.circleci/not-on-master.sh b/.circleci/not-on-master.sh index 581393ebead..e3078cdb9ce 100755 --- a/.circleci/not-on-master.sh +++ b/.circleci/not-on-master.sh @@ -1,8 +1,8 @@ #!/usr/bin/env bash set -Eeuo pipefail -if [ "${CIRCLE_BRANCH}" == "master" ]; then +# if [ "${CIRCLE_BRANCH}" == "master" ]; then echo "Skipping this step on master..." -else - exec "$@" -fi +# else +# exec "$@" +# fi From da39e1d33749f919db4c206fe7c7a52d73923e41 Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Tue, 15 Aug 2023 10:09:45 +0200 Subject: [PATCH 04/21] don't use AsyncTaskQueue in pushqueue anymore --- .../libs/debounced_abortable_saga.ts | 2 +- .../model/bucket_data_handling/pushqueue.ts | 93 +++++++++---------- .../bucket_data_handling/wkstore_adapter.ts | 2 +- .../test/libs/async_task_queue.spec.ts | 1 + .../libs/debounced_abortable_saga.spec.ts | 8 +- 5 files changed, 49 insertions(+), 57 deletions(-) diff --git a/frontend/javascripts/libs/debounced_abortable_saga.ts b/frontend/javascripts/libs/debounced_abortable_saga.ts index ad12d56019d..63feaa1f8ba 100644 --- a/frontend/javascripts/libs/debounced_abortable_saga.ts +++ b/frontend/javascripts/libs/debounced_abortable_saga.ts @@ -12,7 +12,7 @@ import { delay, race, take } from "redux-saga/effects"; * are impossible that way (by design). * * Also note that the performance of this debouncing mechanism - * slower than a standard _.debounce. Also see + * is slower than a standard _.debounce. Also see * debounced_abortable_saga.spec.ts for a small benchmark. */ export function createDebouncedAbortableCallable( diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts index f34f99978e8..be5609d0293 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts @@ -1,10 +1,8 @@ import _ from "lodash"; import type { DataBucket } from "oxalis/model/bucket_data_handling/bucket"; -import { alert, document } from "libs/window"; +import { alert } from "libs/window"; import { sendToStore } from "oxalis/model/bucket_data_handling/wkstore_adapter"; -import AsyncTaskQueue from "libs/async_task_queue"; import type DataCube from "oxalis/model/bucket_data_handling/data_cube"; -import Toast from "libs/toast"; import { createDebouncedAbortableParameterlessCallable } from "libs/debounced_abortable_saga"; import { call } from "redux-saga/effects"; export const COMPRESSING_BATCH_SIZE = 32; @@ -18,46 +16,31 @@ const _PUSH_DEBOUNCE_MAX_WAIT_TIME = 30000; class PushQueue { cube: DataCube; - compressionTaskQueue: AsyncTaskQueue; - sendData: boolean; - // The pendingQueue contains all buckets which are marked as - // "should be snapshotted and saved". That queue is processed - // in a debounced manner and sent to the `compressionTaskQueue`. - // The `compressionTaskQueue` compresses the bucket data and - // sends it to the save queue. - pendingQueue: Set; - - constructor(cube: DataCube, sendData: boolean = true) { + + // The pendingQueue contains all buckets that should be: + // - snapshotted, + // - put into one transaction and then + // - saved + // That queue is flushed in a debounced manner so that the time of the + // snapshot should be suitable for a transaction (since neither WK nor the + // user edited the buckets in a certain time window). + private pendingQueue: Set; + + // Everytime the pendingQueue is flushed, its content is put into a transaction. + // That transaction is compressed asynchronously before it is sent to the store. + // During that compression, the transaction is counted as pending. + private pendingTransactionCount: number = 0; + + constructor(cube: DataCube) { this.cube = cube; - this.compressionTaskQueue = new AsyncTaskQueue(Infinity); - this.sendData = sendData; this.pendingQueue = new Set(); - const autoSaveFailureMessage = "Auto-Save failed!"; - this.compressionTaskQueue.on("failure", () => { - console.error("PushQueue failure"); - - if (document.body != null) { - document.body.classList.add("save-error"); - } - - Toast.error(autoSaveFailureMessage, { - sticky: true, - }); - }); - this.compressionTaskQueue.on("success", () => { - if (document.body != null) { - document.body.classList.remove("save-error"); - } - - Toast.close(autoSaveFailureMessage); - }); } stateSaved(): boolean { return ( this.pendingQueue.size === 0 && this.cube.temporalBucketManager.getCount() === 0 && - !this.compressionTaskQueue.isBusy() + this.pendingTransactionCount == 0 ); } @@ -80,29 +63,35 @@ class PushQueue { pushImpl = function* (this: PushQueue) { try { console.log("pushImpl start"); + // Wait until there are no temporal buckets, anymore, so that + // all buckets can be snapshotted and saved to the server. yield call(this.cube.temporalBucketManager.getAllLoadedPromise); - if (!this.sendData) { - return; - } - - console.log("this.pendingQueue.size", this.pendingQueue.size); - - // Flush pendingQueue. Note that it's important to do this synchronously. - // If other actors could add to queue concurrently, the front-end could - // send an inconsistent state for a transaction. - const batch: DataBucket[] = Array.from(this.pendingQueue); - this.pendingQueue = new Set(); - - // fire and forget - this.pushBatch(batch); + // It is important that flushAndSnapshot does not use a generator + // mechanism, because it could get cancelled due to + // createDebouncedAbortableParameterlessCallable otherwise. + this.flushAndSnapshot(); } catch (_error) { + // Error Recovery! // todo: somewhere else? alert("We've encountered a permanent issue while saving. Please try to reload the page."); } console.log("pushImpl end"); }; + private flushAndSnapshot() { + console.log("this.pendingQueue.size", this.pendingQueue.size); + + // Flush pendingQueue. Note that it's important to do this synchronously. + // If other actors could add to queue concurrently, the front-end could + // send an inconsistent state for a transaction. + const batch: DataBucket[] = Array.from(this.pendingQueue); + this.pendingQueue = new Set(); + + // Fire and forget + this.pushTransaction(batch); + } + // push = _.debounce(this.pushImpl, PUSH_DEBOUNCE_TIME, { // maxWait: PUSH_DEBOUNCE_MAX_WAIT_TIME, // }); @@ -110,9 +99,11 @@ class PushQueue { // todo: prevent user from brushing for eternity? push = createDebouncedAbortableParameterlessCallable(this.pushImpl, PUSH_DEBOUNCE_TIME, this); - pushBatch(batch: Array): Promise { + async pushTransaction(batch: Array): Promise { // The batch will be put into one transaction. - return sendToStore(batch, this.cube.layerName); + this.pendingTransactionCount++; + await sendToStore(batch, this.cube.layerName); + this.pendingTransactionCount--; } } diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts index 32cf90257ed..6e25bf0638c 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts @@ -29,7 +29,7 @@ import _ from "lodash"; const decodeFourBit = createWorker(DecodeFourBitWorker); -// For 64-bit buckets with 32^3 voxel, a COMPRESSION_BATCH_SIZE of +// For 64-bit buckets with 32^3 voxels, a COMPRESSION_BATCH_SIZE of // 128 corresponds to 16.8 MB that are sent to a webworker in one // go. const COMPRESSION_BATCH_SIZE = 128; diff --git a/frontend/javascripts/test/libs/async_task_queue.spec.ts b/frontend/javascripts/test/libs/async_task_queue.spec.ts index b98adaaf3f9..8a4c5fa52f8 100644 --- a/frontend/javascripts/test/libs/async_task_queue.spec.ts +++ b/frontend/javascripts/test/libs/async_task_queue.spec.ts @@ -4,6 +4,7 @@ import Deferred from "libs/deferred"; import * as Utils from "libs/utils"; import sinon from "sinon"; import test from "ava"; + test("AsyncTaskQueue should run a task (1/2)", async (t) => { t.plan(1); const queue = new AsyncTaskQueue(); diff --git a/frontend/javascripts/test/libs/debounced_abortable_saga.spec.ts b/frontend/javascripts/test/libs/debounced_abortable_saga.spec.ts index d3af11d5f4c..45b9894e303 100644 --- a/frontend/javascripts/test/libs/debounced_abortable_saga.spec.ts +++ b/frontend/javascripts/test/libs/debounced_abortable_saga.spec.ts @@ -16,7 +16,7 @@ const createAbortableFnWithProtocol = () => { }; const DEBOUNCE_THRESHOLD = 100; -test.serial("Test simplest case", async (t) => { +test("Test simplest case", async (t) => { t.plan(1); const { abortableFn, protocol } = createAbortableFnWithProtocol(); const fn = createDebouncedAbortableCallable(abortableFn, DEBOUNCE_THRESHOLD, this); @@ -27,7 +27,7 @@ test.serial("Test simplest case", async (t) => { t.deepEqual(protocol, ["await-1", "run-1"]); }); -test.serial("Rapid calls where the last one should win", async (t) => { +test("Rapid calls where the last one should win", async (t) => { t.plan(1); const { abortableFn, protocol } = createAbortableFnWithProtocol(); const fn = createDebouncedAbortableCallable(abortableFn, DEBOUNCE_THRESHOLD, this); @@ -40,7 +40,7 @@ test.serial("Rapid calls where the last one should win", async (t) => { t.deepEqual(protocol, ["await-3", "run-3"]); }); -test.serial("Rapid calls with small breaks", async (t) => { +test("Rapid calls with small breaks", async (t) => { t.plan(1); const { abortableFn, protocol } = createAbortableFnWithProtocol(); const fn = createDebouncedAbortableCallable(abortableFn, DEBOUNCE_THRESHOLD, this); @@ -62,7 +62,7 @@ test.serial("Rapid calls with small breaks", async (t) => { t.deepEqual(protocol, ["await-3", "await-5", "await-7", "run-7"]); }); -test.serial.skip("High volume calls", async () => { +test.skip("High volume calls", async () => { // This is not a unit test, but rather a small speed test // to make a note that the classic _.debounce is way faster. // For 1000 invocations, _.debounce is roughly 10x faster. From 9ca1834ddfb1fe1380d28c09b66e61b99404ee8e Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Tue, 15 Aug 2023 10:14:15 +0200 Subject: [PATCH 05/21] remove AsyncTaskQueue implementation + specs --- frontend/javascripts/libs/async_task_queue.ts | 143 ------------------ .../javascripts/libs/latest_task_executor.ts | 4 + .../test/libs/async_task_queue.spec.ts | 125 --------------- .../test/libs/latest_task_executor.spec.ts | 4 +- 4 files changed, 6 insertions(+), 270 deletions(-) delete mode 100644 frontend/javascripts/libs/async_task_queue.ts delete mode 100644 frontend/javascripts/test/libs/async_task_queue.spec.ts diff --git a/frontend/javascripts/libs/async_task_queue.ts b/frontend/javascripts/libs/async_task_queue.ts deleted file mode 100644 index 204956d1d6f..00000000000 --- a/frontend/javascripts/libs/async_task_queue.ts +++ /dev/null @@ -1,143 +0,0 @@ -/* eslint-disable no-await-in-loop */ -import { createNanoEvents, Emitter } from "nanoevents"; -import _ from "lodash"; -import Deferred from "libs/deferred"; -import * as Utils from "libs/utils"; -type AsyncTask = () => Promise; - -class AsyncTaskQueue { - // Executes asynchronous tasks in order. - // - // Each action is executed after the previous action - // is finished. Any output of the previous action is - // passed to the current action. - maxRetry: number; - retryTimeMs: number; - failureEventThreshold: number; - tasks: Array = []; - deferreds: Map> = new Map(); - doneDeferred: Deferred = new Deferred(); - retryCount: number = 0; - running: boolean = false; - failed: boolean = false; - emitter: Emitter; - - constructor(maxRetry: number = 3, retryTimeMs: number = 1000, failureEventThreshold: number = 3) { - this.emitter = createNanoEvents(); - - this.maxRetry = maxRetry; - this.retryTimeMs = retryTimeMs; - this.failureEventThreshold = failureEventThreshold; - } - - isBusy(): boolean { - return this.running || this.tasks.length !== 0; - } - - scheduleTask(task: AsyncTask): Promise { - this.tasks.push(task); - const deferred = new Deferred(); - // @ts-expect-error ts-migrate(2345) FIXME: Argument of type 'Deferred' is n... Remove this comment to see the full error message - this.deferreds.set(task, deferred); - - if (this.failed) { - this.restart(); - } - - if (!this.running) { - this.executeNext(); - } - - // @ts-expect-error ts-migrate(2322) FIXME: Type 'Promise' is not assignable to type ... Remove this comment to see the full error message - return deferred.promise(); - } - - scheduleTasks(tasks: Array): Promise { - return Promise.all(tasks.map((task) => this.scheduleTask(task))); - } - - async restart(): Promise { - // To restart the pipeline after it failed. - // Returns a new Promise for the first item. - if (this.failed && this.tasks.length > 0) { - this.failed = false; - this.retryCount = 0; - this.running = false; - // Reinsert first action - await this.executeNext(); - } - } - - signalResolve(task: AsyncTask, obj: any): void { - const deferred = this.deferreds.get(task); - this.deferreds.delete(task); - - if (deferred != null) { - deferred.resolve(obj); - } - } - - signalReject(task: AsyncTask, error: any): void { - const deferred = this.deferreds.get(task); - this.deferreds.delete(task); - - if (deferred != null) { - deferred.reject(error); - } - } - - join(): Promise { - if (this.isBusy()) { - return this.doneDeferred.promise(); - } else { - return Promise.resolve(); - } - } - - async executeNext(): Promise { - this.running = true; - - while (this.tasks.length > 0) { - const currentTask = this.tasks.shift(); - - try { - // @ts-expect-error ts-migrate(2722) FIXME: Cannot invoke an object which is possibly 'undefin... Remove this comment to see the full error message - const response = await currentTask(); - // @ts-expect-error ts-migrate(2345) FIXME: Argument of type 'AsyncTask | undefined' is not as... Remove this comment to see the full error message - this.signalResolve(currentTask, response); - this.emitter.emit("success"); - } catch (error) { - this.retryCount++; - // @ts-expect-error ts-migrate(2345) FIXME: Argument of type 'AsyncTask | undefined' is not as... Remove this comment to see the full error message - this.tasks.unshift(currentTask); - - if (this.retryCount > this.failureEventThreshold) { - console.error("AsyncTaskQueue failed with error", error); - this.emitter.emit("failure", this.retryCount); - } - - if (this.retryCount >= this.maxRetry) { - this.failed = true; - // @ts-expect-error ts-migrate(2345) FIXME: Argument of type 'AsyncTask | undefined' is not as... Remove this comment to see the full error message - this.signalReject(currentTask, error); - this.running = false; - this.doneDeferred.reject(error); - this.doneDeferred = new Deferred(); - return; - } else { - await Utils.sleep(this.retryTimeMs); - } - } - } - - this.running = false; - this.doneDeferred.resolve(); - this.doneDeferred = new Deferred(); - } - - on(event: string | number, cb: (...args: any) => void) { - this.emitter.on(event, cb); - } -} - -export default AsyncTaskQueue; diff --git a/frontend/javascripts/libs/latest_task_executor.ts b/frontend/javascripts/libs/latest_task_executor.ts index dd660cf9672..4486c7ec27c 100644 --- a/frontend/javascripts/libs/latest_task_executor.ts +++ b/frontend/javascripts/libs/latest_task_executor.ts @@ -11,6 +11,10 @@ export const SKIPPED_TASK_REASON = "Skipped task"; * LatestTaskExecutor instance. * * See the corresponding spec for examples. + * + * If you need the same behavior plus cancellation of running + * tasks, take a look at the saga-based `createDebouncedAbortableCallable` + * utility. */ export default class LatestTaskExecutor { diff --git a/frontend/javascripts/test/libs/async_task_queue.spec.ts b/frontend/javascripts/test/libs/async_task_queue.spec.ts deleted file mode 100644 index 8a4c5fa52f8..00000000000 --- a/frontend/javascripts/test/libs/async_task_queue.spec.ts +++ /dev/null @@ -1,125 +0,0 @@ -// @ts-nocheck -import AsyncTaskQueue from "libs/async_task_queue"; -import Deferred from "libs/deferred"; -import * as Utils from "libs/utils"; -import sinon from "sinon"; -import test from "ava"; - -test("AsyncTaskQueue should run a task (1/2)", async (t) => { - t.plan(1); - const queue = new AsyncTaskQueue(); - const task = new Deferred(); - queue.scheduleTask(task.task()); - await Utils.sleep(1); - t.is(queue.isBusy(), true); - task.resolve(); - await queue.join(); -}); -test("AsyncTaskQueue should run a task (2/2)", async (t) => { - t.plan(2); - const queue = new AsyncTaskQueue(); - const task = new Deferred(); - const result = "foo"; - const handle = queue.scheduleTask(task.task()); - await Utils.sleep(1); - t.is(queue.isBusy(), true); - task.resolve(result); - t.deepEqual(await handle, result); -}); -test("AsyncTaskQueue should fail the queue on a failed task", async (t) => { - t.plan(4); - const result = new Error("foo"); - const queue = new AsyncTaskQueue(0); - const task = new Deferred(); - const catcherBox = { - do: () => {}, - }; - sinon.spy(catcherBox, "do"); - const handle = queue.scheduleTask(task.task()); - handle.catch(catcherBox.do); - await Utils.sleep(1); - t.is(queue.isBusy(), true); - task.reject(result); - - try { - await queue.join(); - } catch (err) { - t.true(catcherBox.do.calledWith(result)); - t.is(err, result); - t.is(queue.failed, true); - } -}); -test("AsyncTaskQueue should seralize task execution", async (t) => { - t.plan(1); - const queue = new AsyncTaskQueue(0); - const task1 = new Deferred(); - const task2 = new Deferred(); - const taskLog = []; - const handle1 = queue.scheduleTask(task1.task()); - handle1.then(() => taskLog.push(1)); - const handle2 = queue.scheduleTask(task2.task()); - handle2.then(() => taskLog.push(2)); - await Utils.sleep(1); - task2.resolve(); - task1.resolve(); - await queue.join(); - t.deepEqual(taskLog, [1, 2]); -}); -test.serial("AsyncTaskQueue should retry failed tasks (1/2)", async (t) => { - t.plan(3); - const result = new Error("foo"); - const queue = new AsyncTaskQueue(3, 1); - const deferredBox = { - value: new Deferred(), - }; - - const task = () => deferredBox.value.promise(); - - const catcherBox = { - do: () => {}, - }; - sinon.spy(catcherBox, "do"); - const handle = queue.scheduleTask(task); - handle.catch(catcherBox.do); - - for (let i = 0; i < 3; i++) { - // eslint-disable-next-line no-await-in-loop - await Utils.sleep(5); - deferredBox.value.reject(result); - deferredBox.value = new Deferred(); - } - - try { - await queue.join(); - } catch (err) { - t.true(catcherBox.do.calledWith(result)); - t.is(err, result); - t.is(queue.failed, true); - } -}); -test.serial("AsyncTaskQueue should retry failed tasks (2/2)", async (t) => { - t.plan(1); - const result = new Error("foo"); - const queue = new AsyncTaskQueue(3, 1); - const deferredBox = { - value: new Deferred(), - }; - - const task = () => deferredBox.value.promise(); - - const handle = queue.scheduleTask(task); - - for (let i = 0; i < 2; i++) { - // eslint-disable-next-line no-await-in-loop - await Utils.sleep(5); - deferredBox.value.reject(result); - deferredBox.value = new Deferred(); - } - - await Utils.sleep(5); - deferredBox.value.resolve(); - await handle; - await queue.join(); - // ensure that we get to this point - t.pass(); -}); diff --git a/frontend/javascripts/test/libs/latest_task_executor.spec.ts b/frontend/javascripts/test/libs/latest_task_executor.spec.ts index 1226062a05d..437efff38ac 100644 --- a/frontend/javascripts/test/libs/latest_task_executor.spec.ts +++ b/frontend/javascripts/test/libs/latest_task_executor.spec.ts @@ -1,13 +1,13 @@ import Deferred from "libs/deferred"; import test from "ava"; import LatestTaskExecutor, { SKIPPED_TASK_REASON } from "libs/latest_task_executor"; -// @ts-expect-error ts-migrate(2769) FIXME: No overload matches this call. + test("LatestTaskExecutor: One task", async (t) => { const executor = new LatestTaskExecutor(); const deferred1 = new Deferred(); const scheduledPromise = executor.schedule(() => deferred1.promise()); deferred1.resolve(true); - return scheduledPromise.then((result) => { + await scheduledPromise.then((result) => { t.true(result); }); }); From a5967526dfb190255e1113681a8dd060c9a94494 Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Tue, 15 Aug 2023 12:18:08 +0200 Subject: [PATCH 06/21] implement small AsyncFifoResolver to prevent theoretical race condition --- .../javascripts/libs/async_fifo_resolver.ts | 34 +++++++++ .../model/bucket_data_handling/pushqueue.ts | 41 ++++++++-- .../bucket_data_handling/wkstore_adapter.ts | 11 ++- .../test/libs/async_fifo_resolver.spec.ts | 76 +++++++++++++++++++ .../libs/debounced_abortable_saga.spec.ts | 6 +- 5 files changed, 151 insertions(+), 17 deletions(-) create mode 100644 frontend/javascripts/libs/async_fifo_resolver.ts create mode 100644 frontend/javascripts/test/libs/async_fifo_resolver.spec.ts diff --git a/frontend/javascripts/libs/async_fifo_resolver.ts b/frontend/javascripts/libs/async_fifo_resolver.ts new file mode 100644 index 00000000000..75fe7ab5a41 --- /dev/null +++ b/frontend/javascripts/libs/async_fifo_resolver.ts @@ -0,0 +1,34 @@ +/* + * This class can be used to await promises + * in the order they were passed to + * orderedWaitFor. + * + * This enables scheduling of asynchronous work + * concurrently while ensuring that the results + * are processed in the order they were requested + * (instead of the order in which they finished). + * + * Example: + * const resolver = new AsyncFifoResolver(); + * const promise1Done = resolver.orderedWaitFor(promise1); + * const promise2Done = resolver.orderedWaitFor(promise2); + * + * Even if promise2 resolves before promise1, promise2Done + * will resolve *after* promise1Done. + */ + +export class AsyncFifoResolver { + queue: Promise[]; + constructor() { + this.queue = []; + } + + async orderedWaitFor(promise: Promise): Promise { + this.queue.push(promise); + const promiseCount = this.queue.length; + const retVals = await Promise.all(this.queue); + // Trim awaited promises + this.queue = this.queue.slice(promiseCount); + return retVals[promiseCount - 1]; + } +} diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts index be5609d0293..6f5a1fc6d60 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts @@ -1,10 +1,15 @@ import _ from "lodash"; import type { DataBucket } from "oxalis/model/bucket_data_handling/bucket"; import { alert } from "libs/window"; -import { sendToStore } from "oxalis/model/bucket_data_handling/wkstore_adapter"; +import { createCompressedUpdateBucketActions } from "oxalis/model/bucket_data_handling/wkstore_adapter"; import type DataCube from "oxalis/model/bucket_data_handling/data_cube"; import { createDebouncedAbortableParameterlessCallable } from "libs/debounced_abortable_saga"; import { call } from "redux-saga/effects"; +import { Store } from "oxalis/singletons"; +import { pushSaveQueueTransaction } from "../actions/save_actions"; +import { UpdateAction } from "../sagas/update_actions"; +import { AsyncFifoResolver } from "libs/async_fifo_resolver"; + export const COMPRESSING_BATCH_SIZE = 32; // Only process the PushQueue after there was no user interaction (or bucket modification due to // downsampling) for PUSH_DEBOUNCE_TIME milliseconds... @@ -17,6 +22,8 @@ const _PUSH_DEBOUNCE_MAX_WAIT_TIME = 30000; class PushQueue { cube: DataCube; + private fifoResolver = new AsyncFifoResolver(); + // The pendingQueue contains all buckets that should be: // - snapshotted, // - put into one transaction and then @@ -40,7 +47,7 @@ class PushQueue { return ( this.pendingQueue.size === 0 && this.cube.temporalBucketManager.getCount() === 0 && - this.pendingTransactionCount == 0 + this.pendingTransactionCount === 0 ); } @@ -80,29 +87,47 @@ class PushQueue { }; private flushAndSnapshot() { - console.log("this.pendingQueue.size", this.pendingQueue.size); - // Flush pendingQueue. Note that it's important to do this synchronously. // If other actors could add to queue concurrently, the front-end could // send an inconsistent state for a transaction. + console.log("Flush pending queue with size:", this.pendingQueue.size); const batch: DataBucket[] = Array.from(this.pendingQueue); this.pendingQueue = new Set(); - // Fire and forget + // Fire and forget. The correct transaction ordering is ensured + // within pushTransaction. this.pushTransaction(batch); } + // todo: prevent user from brushing for eternity? // push = _.debounce(this.pushImpl, PUSH_DEBOUNCE_TIME, { // maxWait: PUSH_DEBOUNCE_MAX_WAIT_TIME, // }); - // todo: prevent user from brushing for eternity? push = createDebouncedAbortableParameterlessCallable(this.pushImpl, PUSH_DEBOUNCE_TIME, this); async pushTransaction(batch: Array): Promise { - // The batch will be put into one transaction. + /* + * Create a transaction from the batch and push it into the save queue. + */ this.pendingTransactionCount++; - await sendToStore(batch, this.cube.layerName); + + // Start the compression job. Note that an older invocation of + // createCompressedUpdateBucketActions might still be running. + // We can still *start* a new compression job, but we want to ensure + // that the jobs are processed in the order they were initiated. + // This is done using orderedWaitFor. + // Addendum: + // In practice, this won't matter much since compression jobs + // are processed by a pool of webworkers in fifo-order, anyway. + // However, there is a theoretical chance of a race condition, + // since the fifo-ordering is only ensured for starting the webworker + // and not for receiving the return values. + const items = await this.fifoResolver.orderedWaitFor( + createCompressedUpdateBucketActions(batch), + ); + Store.dispatch(pushSaveQueueTransaction(items, "volume", this.cube.layerName)); + this.pendingTransactionCount--; } } diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts index 6e25bf0638c..269e502b730 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts @@ -10,7 +10,6 @@ import { } from "oxalis/model/accessors/dataset_accessor"; import { getVolumeTracingById } from "oxalis/model/accessors/volumetracing_accessor"; import { parseAsMaybe } from "libs/utils"; -import { pushSaveQueueTransaction } from "oxalis/model/actions/save_actions"; import type { UpdateAction } from "oxalis/model/sagas/update_actions"; import { updateBucket } from "oxalis/model/sagas/update_actions"; import ByteArraysToLz4Base64Worker from "oxalis/workers/byte_arrays_to_lz4_base64.worker"; @@ -245,10 +244,12 @@ function sliceBufferIntoPieces( return bucketBuffers; } -export async function sendToStore(batch: Array, tracingId: string): Promise { - const items: Array = _.flatten( +export async function createCompressedUpdateBucketActions( + batch: Array, +): Promise { + return _.flatten( await Promise.all( - _.chunk(batch, COMPRESSION_BATCH_SIZE).map(async (batchSubset): Promise => { + _.chunk(batch, COMPRESSION_BATCH_SIZE).map(async (batchSubset) => { const byteArrays = []; for (const bucket of batchSubset) { const data = bucket.getCopyOfData(); @@ -265,6 +266,4 @@ export async function sendToStore(batch: Array, tracingId: string): }), ), ); - - Store.dispatch(pushSaveQueueTransaction(items, "volume", tracingId)); } diff --git a/frontend/javascripts/test/libs/async_fifo_resolver.spec.ts b/frontend/javascripts/test/libs/async_fifo_resolver.spec.ts new file mode 100644 index 00000000000..49eab827add --- /dev/null +++ b/frontend/javascripts/test/libs/async_fifo_resolver.spec.ts @@ -0,0 +1,76 @@ +import { sleep } from "libs/utils"; +import test from "ava"; +import _ from "lodash"; +import { AsyncFifoResolver } from "libs/async_fifo_resolver"; + +const createSubmitterFnWithProtocol = () => { + const resolver = new AsyncFifoResolver(); + const protocol: string[] = []; + + async function submitter(id: number, duration: number) { + protocol.push(`started-${id}`); + await resolver.orderedWaitFor( + sleep(duration).then(() => protocol.push(`sleep-finished-${id}`)), + ); + protocol.push(`finished-${id}`); + } + + return { submitter, resolver, protocol }; +}; + +test("AsyncFifoResolver: Test simplest case", async (t) => { + t.plan(2); + + const { submitter, resolver, protocol } = createSubmitterFnWithProtocol(); + submitter(1, 10); + submitter(2, 10); + + // Wait until everything is done + await resolver.orderedWaitFor(Promise.resolve()); + + t.deepEqual(protocol, [ + "started-1", + "started-2", + "sleep-finished-1", + "finished-1", + "sleep-finished-2", + "finished-2", + ]); + t.is(resolver.queue.length, 0); +}); + +test("AsyncFifoResolver: Test out-of-order sleeps should still finish in order", async (t) => { + t.plan(2); + + const { submitter, resolver, protocol } = createSubmitterFnWithProtocol(); + submitter(1, 50); + submitter(2, 10); + + // Wait until everything is done + await resolver.orderedWaitFor(Promise.resolve()); + + t.deepEqual(protocol, [ + "started-1", + "started-2", + "sleep-finished-2", + "sleep-finished-1", + "finished-1", + "finished-2", + ]); + t.is(resolver.queue.length, 0); +}); + +test("AsyncFifoResolver: New submits shouldn't block old ones.", async (t) => { + t.plan(2); + + const { submitter, resolver, protocol } = createSubmitterFnWithProtocol(); + // The first submitter should finish through and should not be blocked + // by the second one. + submitter(1, 50); + submitter(2, 1000); + + await sleep(50); + + t.deepEqual(protocol, ["started-1", "started-2", "sleep-finished-1", "finished-1"]); + t.is(resolver.queue.length, 1); +}); diff --git a/frontend/javascripts/test/libs/debounced_abortable_saga.spec.ts b/frontend/javascripts/test/libs/debounced_abortable_saga.spec.ts index 45b9894e303..941880b9ea8 100644 --- a/frontend/javascripts/test/libs/debounced_abortable_saga.spec.ts +++ b/frontend/javascripts/test/libs/debounced_abortable_saga.spec.ts @@ -16,7 +16,7 @@ const createAbortableFnWithProtocol = () => { }; const DEBOUNCE_THRESHOLD = 100; -test("Test simplest case", async (t) => { +test("DebouncedAbortableSaga: Test simplest case", async (t) => { t.plan(1); const { abortableFn, protocol } = createAbortableFnWithProtocol(); const fn = createDebouncedAbortableCallable(abortableFn, DEBOUNCE_THRESHOLD, this); @@ -27,7 +27,7 @@ test("Test simplest case", async (t) => { t.deepEqual(protocol, ["await-1", "run-1"]); }); -test("Rapid calls where the last one should win", async (t) => { +test("DebouncedAbortableSaga: Rapid calls where the last one should win", async (t) => { t.plan(1); const { abortableFn, protocol } = createAbortableFnWithProtocol(); const fn = createDebouncedAbortableCallable(abortableFn, DEBOUNCE_THRESHOLD, this); @@ -40,7 +40,7 @@ test("Rapid calls where the last one should win", async (t) => { t.deepEqual(protocol, ["await-3", "run-3"]); }); -test("Rapid calls with small breaks", async (t) => { +test("DebouncedAbortableSaga: Rapid calls with small breaks", async (t) => { t.plan(1); const { abortableFn, protocol } = createAbortableFnWithProtocol(); const fn = createDebouncedAbortableCallable(abortableFn, DEBOUNCE_THRESHOLD, this); From bd5d355831a7f80bf3a13ac5ba4ea840d2a713b9 Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Tue, 15 Aug 2023 15:49:49 +0200 Subject: [PATCH 07/21] ensure that the save saga consumes N items from the save queue where N is the size of the queue at the time the auto-save-interval kicked in --- frontend/javascripts/oxalis/model.ts | 8 +-- .../model/bucket_data_handling/pushqueue.ts | 14 ++++- .../oxalis/model/sagas/save_saga.ts | 59 ++++++++++++------- .../oxalis/view/action-bar/save_button.tsx | 7 ++- 4 files changed, 60 insertions(+), 28 deletions(-) diff --git a/frontend/javascripts/oxalis/model.ts b/frontend/javascripts/oxalis/model.ts index a6e4ba7302a..0de69687273 100644 --- a/frontend/javascripts/oxalis/model.ts +++ b/frontend/javascripts/oxalis/model.ts @@ -1,5 +1,4 @@ import _ from "lodash"; -import { COMPRESSING_BATCH_SIZE } from "oxalis/model/bucket_data_handling/pushqueue"; import type { Vector3 } from "oxalis/constants"; import type { Versions } from "oxalis/view/version_view"; import { getActiveSegmentationTracingLayer } from "oxalis/model/accessors/volumetracing_accessor"; @@ -288,14 +287,13 @@ export class OxalisModel { getPushQueueStats() { const compressingBucketCount = _.sum( - Utils.values(this.dataLayers).map( - (dataLayer) => - dataLayer.pushQueue.compressionTaskQueue.tasks.length * COMPRESSING_BATCH_SIZE, + Utils.values(this.dataLayers).map((dataLayer) => + dataLayer.pushQueue.getCompressingBucketCount(), ), ); const waitingForCompressionBucketCount = _.sum( - Utils.values(this.dataLayers).map((dataLayer) => dataLayer.pushQueue.pendingQueue.size), + Utils.values(this.dataLayers).map((dataLayer) => dataLayer.pushQueue.getPendingQueueSize()), ); return { diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts index 6f5a1fc6d60..3f824044140 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts @@ -10,7 +10,6 @@ import { pushSaveQueueTransaction } from "../actions/save_actions"; import { UpdateAction } from "../sagas/update_actions"; import { AsyncFifoResolver } from "libs/async_fifo_resolver"; -export const COMPRESSING_BATCH_SIZE = 32; // Only process the PushQueue after there was no user interaction (or bucket modification due to // downsampling) for PUSH_DEBOUNCE_TIME milliseconds... const PUSH_DEBOUNCE_TIME = 1000; @@ -38,6 +37,9 @@ class PushQueue { // During that compression, the transaction is counted as pending. private pendingTransactionCount: number = 0; + // Store the number of buckets that are currently being compressed. + private compressingBucketCount: number = 0; + constructor(cube: DataCube) { this.cube = cube; this.pendingQueue = new Set(); @@ -59,6 +61,14 @@ class PushQueue { this.push(); } + getPendingQueueSize(): number { + return this.pendingQueue.size; + } + + getCompressingBucketCount(): number { + return this.compressingBucketCount; + } + clear(): void { this.pendingQueue.clear(); } @@ -111,6 +121,7 @@ class PushQueue { * Create a transaction from the batch and push it into the save queue. */ this.pendingTransactionCount++; + this.compressingBucketCount += batch.length; // Start the compression job. Note that an older invocation of // createCompressedUpdateBucketActions might still be running. @@ -129,6 +140,7 @@ class PushQueue { Store.dispatch(pushSaveQueueTransaction(items, "volume", this.cube.layerName)); this.pendingTransactionCount--; + this.compressingBucketCount -= batch.length; } } diff --git a/frontend/javascripts/oxalis/model/sagas/save_saga.ts b/frontend/javascripts/oxalis/model/sagas/save_saga.ts index e65f2fe836f..c31444481e5 100644 --- a/frontend/javascripts/oxalis/model/sagas/save_saga.ts +++ b/frontend/javascripts/oxalis/model/sagas/save_saga.ts @@ -88,27 +88,37 @@ export function* pushSaveQueueAsync(saveQueueType: SaveQueueType, tracingId: str }); yield* put(setSaveBusyAction(true, saveQueueType)); - if (forcePush) { - while (true) { - // Send batches to the server until the save queue is empty. - saveQueue = yield* select((state) => selectQueue(state, saveQueueType, tracingId)); - - if (saveQueue.length > 0) { - yield* call(sendRequestToServer, saveQueueType, tracingId); - } else { - break; - } - } - } else { + // Send (parts) of the save queue to the server. + // There are two main cases: + // 1) forcePush is true + // The user explicitly requested to save an annotation. + // In this case, batches are sent to the server until the save + // queue is empty. Note that the save queue might be added to + // while saving is in progress. Still, the save queue will be + // drained until it is empty. If the user hits save and continuously + // annotates further, a high number of save-requests might be sent. + // 2) forcePush is false + // The auto-save interval was reached at time T. The following code + // will determine how many items are in the save queue at this time T. + // Exactly that many items will be sent to the server. + // New items that might be added to the save queue during saving, will + // ignored (they will be picked up in the next iteration of this loop). + const itemCountToSave = forcePush + ? Infinity + : yield* select((state) => selectQueue(state, saveQueueType, tracingId).length); + let savedItemCount = 0; + while (savedItemCount < itemCountToSave) { + // Saving the tracing automatically (via timeout) only saves the current state. + // It does not require to reach an empty saveQueue. This is especially + // important when the auto-saving happens during continuous movements. + // Always draining the save queue completely would mean that save + // requests are sent as long as the user moves. saveQueue = yield* select((state) => selectQueue(state, saveQueueType, tracingId)); if (saveQueue.length > 0) { - // Saving the tracing automatically (via timeout) only saves the current state. - // It does not require to reach an empty saveQueue. This is especially - // important when the auto-saving happens during continuous movements. - // Always draining the save queue completely would mean that save - // requests are sent as long as the user moves. - yield* call(sendRequestToServer, saveQueueType, tracingId); + savedItemCount += yield* call(sendRequestToServer, saveQueueType, tracingId); + } else { + break; } } @@ -151,7 +161,16 @@ function getRetryWaitTime(retryCount: number) { // at any time, because the browser page is reloaded after the message is shown, anyway. let didShowFailedSimultaneousTracingError = false; -export function* sendRequestToServer(saveQueueType: SaveQueueType, tracingId: string): Saga { +export function* sendRequestToServer( + saveQueueType: SaveQueueType, + tracingId: string, +): Saga { + /* + * Saves a reasonably-sized part of the save queue (that corresponds to the + * tracingId) to the server (plus retry-mechanism). + * The saga returns the number of save queue items that were saved. + */ + const fullSaveQueue = yield* select((state) => selectQueue(state, saveQueueType, tracingId)); const saveQueue = sliceAppropriateBatchCount(fullSaveQueue); console.log("saving", saveQueue.length, "items out of", fullSaveQueue.length); @@ -207,7 +226,7 @@ export function* sendRequestToServer(saveQueueType: SaveQueueType, tracingId: st } yield* call(toggleErrorHighlighting, false); - return; + return saveQueue.length; } catch (error) { if (exceptionDuringMarkBucketsAsNotDirty) { throw error; diff --git a/frontend/javascripts/oxalis/view/action-bar/save_button.tsx b/frontend/javascripts/oxalis/view/action-bar/save_button.tsx index 2f0e41b0340..274edc2538b 100644 --- a/frontend/javascripts/oxalis/view/action-bar/save_button.tsx +++ b/frontend/javascripts/oxalis/view/action-bar/save_button.tsx @@ -72,9 +72,12 @@ class SaveButton extends React.PureComponent { _forceUpdate = () => { const isStateSaved = Model.stateSaved(); const oldestUnsavedTimestamp = getOldestUnsavedTimestamp(Store.getState().save.queue); + + // todo: get oldest unsaved timestamp from push queue. + // + const unsavedDuration = - // @ts-expect-error ts-migrate(2362) FIXME: The left-hand side of an arithmetic operation must... Remove this comment to see the full error message - oldestUnsavedTimestamp != null ? new Date() - oldestUnsavedTimestamp : 0; + oldestUnsavedTimestamp != null ? Date.now() - oldestUnsavedTimestamp : 0; const showUnsavedWarning = unsavedDuration > UNSAVED_WARNING_THRESHOLD; if (showUnsavedWarning) { From b0254a49bc8a09a9163c2266b56aff1d0bda25f4 Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Tue, 15 Aug 2023 17:24:17 +0200 Subject: [PATCH 08/21] fix tests --- .circleci/not-on-master.sh | 8 ++++---- .../model/bucket_data_handling/pushqueue.ts | 2 +- .../oxalis/model/sagas/save_saga.ts | 10 ++++------ .../oxalis/model/sagas/save_saga_constants.ts | 4 ++-- .../binary/layers/wkstore_adapter.spec.ts | 10 ++++++++-- .../javascripts/test/sagas/save_saga.spec.ts | 19 ++++++++++++------- 6 files changed, 31 insertions(+), 22 deletions(-) diff --git a/.circleci/not-on-master.sh b/.circleci/not-on-master.sh index e3078cdb9ce..581393ebead 100755 --- a/.circleci/not-on-master.sh +++ b/.circleci/not-on-master.sh @@ -1,8 +1,8 @@ #!/usr/bin/env bash set -Eeuo pipefail -# if [ "${CIRCLE_BRANCH}" == "master" ]; then +if [ "${CIRCLE_BRANCH}" == "master" ]; then echo "Skipping this step on master..." -# else -# exec "$@" -# fi +else + exec "$@" +fi diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts index 3f824044140..a2f87ed1f5c 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts @@ -5,7 +5,7 @@ import { createCompressedUpdateBucketActions } from "oxalis/model/bucket_data_ha import type DataCube from "oxalis/model/bucket_data_handling/data_cube"; import { createDebouncedAbortableParameterlessCallable } from "libs/debounced_abortable_saga"; import { call } from "redux-saga/effects"; -import { Store } from "oxalis/singletons"; +import Store from "oxalis/store"; import { pushSaveQueueTransaction } from "../actions/save_actions"; import { UpdateAction } from "../sagas/update_actions"; import { AsyncFifoResolver } from "libs/async_fifo_resolver"; diff --git a/frontend/javascripts/oxalis/model/sagas/save_saga.ts b/frontend/javascripts/oxalis/model/sagas/save_saga.ts index c31444481e5..c48379509c9 100644 --- a/frontend/javascripts/oxalis/model/sagas/save_saga.ts +++ b/frontend/javascripts/oxalis/model/sagas/save_saga.ts @@ -73,7 +73,7 @@ export function* pushSaveQueueAsync(saveQueueType: SaveQueueType, tracingId: str if (saveQueue.length === 0) { if (loopCounter % 100 === 0) { - // See https://github.com/scalableminds/webknossos/pull/6076 for an explanation + // See https://github.com/scalableminds/webknossos/pull/6076 (or 82e16e1) for an explanation // of this delay call. yield* delay(0); } @@ -103,16 +103,14 @@ export function* pushSaveQueueAsync(saveQueueType: SaveQueueType, tracingId: str // Exactly that many items will be sent to the server. // New items that might be added to the save queue during saving, will // ignored (they will be picked up in the next iteration of this loop). + // Otherwise, the risk of a high number of save-requests (see case 1) + // would be present here, too (note the risk would be greater, because the + // user didn't use the save button which is usually accompanied a small pause). const itemCountToSave = forcePush ? Infinity : yield* select((state) => selectQueue(state, saveQueueType, tracingId).length); let savedItemCount = 0; while (savedItemCount < itemCountToSave) { - // Saving the tracing automatically (via timeout) only saves the current state. - // It does not require to reach an empty saveQueue. This is especially - // important when the auto-saving happens during continuous movements. - // Always draining the save queue completely would mean that save - // requests are sent as long as the user moves. saveQueue = yield* select((state) => selectQueue(state, saveQueueType, tracingId)); if (saveQueue.length > 0) { diff --git a/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts b/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts index 30e90a68ef1..53abc5b0b43 100644 --- a/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts +++ b/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts @@ -11,5 +11,5 @@ export const UNDO_HISTORY_SIZE = 20; export const SETTINGS_RETRY_DELAY = 15 * 1000; export const SETTINGS_MAX_RETRY_COUNT = 20; // 20 * 15s == 5m -export const maximumActionCountPerBatch = 20; -export const maximumActionCountPerSave = 60; +export const maximumActionCountPerBatch = 5000; //20; // 5000 +export const maximumActionCountPerSave = 15000; // 60; // 15000 diff --git a/frontend/javascripts/test/model/binary/layers/wkstore_adapter.spec.ts b/frontend/javascripts/test/model/binary/layers/wkstore_adapter.spec.ts index 6e188adaecc..badbdfbe361 100644 --- a/frontend/javascripts/test/model/binary/layers/wkstore_adapter.spec.ts +++ b/frontend/javascripts/test/model/binary/layers/wkstore_adapter.spec.ts @@ -57,7 +57,10 @@ const StoreMock = { mockRequire("libs/request", RequestMock); mockRequire("oxalis/store", StoreMock); const { DataBucket } = mockRequire.reRequire("oxalis/model/bucket_data_handling/bucket"); -const { requestWithFallback, sendToStore } = mockRequire.reRequire( + +const PushQueue = mockRequire.reRequire("oxalis/model/bucket_data_handling/pushqueue").default; + +const { requestWithFallback } = mockRequire.reRequire( "oxalis/model/bucket_data_handling/wkstore_adapter", ); const tokenResponse = { @@ -263,7 +266,10 @@ test.serial("sendToStore: Request Handling should send the correct request param saveQueueType: "volume", tracingId, }; - return sendToStore(batch, tracingId).then(() => { + + const pushQueue = new PushQueue({ ...mockedCube, layerName: tracingId }); + + return pushQueue.pushTransaction(batch).then(() => { t.is(StoreMock.dispatch.callCount, 1); const [saveQueueItems] = StoreMock.dispatch.getCall(0).args; t.deepEqual(saveQueueItems, expectedSaveQueueItems); diff --git a/frontend/javascripts/test/sagas/save_saga.spec.ts b/frontend/javascripts/test/sagas/save_saga.spec.ts index a655e266277..8999696c951 100644 --- a/frontend/javascripts/test/sagas/save_saga.spec.ts +++ b/frontend/javascripts/test/sagas/save_saga.spec.ts @@ -89,18 +89,23 @@ test("SaveSaga should send update actions", (t) => { expectValueDeepEqual(t, saga.next([]), take("PUSH_SAVE_QUEUE_TRANSACTION")); saga.next(); // race - saga.next({ - forcePush: SaveActions.saveNowAction(), - }); - saga.next(); // select state + expectValueDeepEqual( + t, + saga.next({ + forcePush: SaveActions.saveNowAction(), + }), + put(setSaveBusyAction(true, TRACING_TYPE)), + ); + + saga.next(); // advance to next select state expectValueDeepEqual(t, saga.next(saveQueue), call(sendRequestToServer, TRACING_TYPE, tracingId)); - saga.next(); // select state + saga.next(saveQueue.length); // select state expectValueDeepEqual(t, saga.next([]), put(setSaveBusyAction(false, TRACING_TYPE))); + // Test that loop repeats saga.next(); // select state - expectValueDeepEqual(t, saga.next([]), take("PUSH_SAVE_QUEUE_TRANSACTION")); }); test("SaveSaga should send request to server", (t) => { @@ -214,7 +219,7 @@ test("SaveSaga should send update actions right away and try to reach a state wh saga.next(saveQueue); // call sendRequestToServer - saga.next(); // select state + saga.next(1); // advance to select state expectValueDeepEqual(t, saga.next([]), put(setSaveBusyAction(false, TRACING_TYPE))); }); From 8a5d3877167f237704d46e05cd788be91b48e946 Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Tue, 15 Aug 2023 17:32:39 +0200 Subject: [PATCH 09/21] fix accidentally skipped tests; improve linting rule to avoid this; fix broken segment group test --- .../volumetracing/volumetracing_saga_integration.spec.ts | 5 +++-- tools/assert-no-test-only.sh | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/frontend/javascripts/test/sagas/volumetracing/volumetracing_saga_integration.spec.ts b/frontend/javascripts/test/sagas/volumetracing/volumetracing_saga_integration.spec.ts index 68c483cdfa1..3e2f9052d99 100644 --- a/frontend/javascripts/test/sagas/volumetracing/volumetracing_saga_integration.spec.ts +++ b/frontend/javascripts/test/sagas/volumetracing/volumetracing_saga_integration.spec.ts @@ -953,7 +953,8 @@ test.serial("Undo for deleting segment group (with recursion)", async (t) => { const stateRestored = Store.getState(); const tracingRestored = stateRestored.tracing.volumes[0]; - t.is(tracingRestored.segmentGroups.length, 2); + t.is(tracingRestored.segmentGroups.length, 1); + t.is(tracingRestored.segmentGroups[0]?.children.length || 0, 1); t.is(tracingRestored.segments.size(), 4); t.is(tracingRestored.segments.get(1).groupId, 1); @@ -962,7 +963,7 @@ test.serial("Undo for deleting segment group (with recursion)", async (t) => { t.is(tracingRestored.segments.get(4).groupId, 2); }); -test.serial.only("Undo for deleting segment group (bug repro)", async (t) => { +test.serial("Undo for deleting segment group (bug repro)", async (t) => { const volumeTracingLayerName = t.context.api.data.getVolumeTracingLayerIds()[0]; const position = [1, 2, 3] as Vector3; Store.dispatch(clickSegmentAction(1, position)); diff --git a/tools/assert-no-test-only.sh b/tools/assert-no-test-only.sh index 4d726fbea90..0ee1f606a9b 100755 --- a/tools/assert-no-test-only.sh +++ b/tools/assert-no-test-only.sh @@ -1,4 +1,5 @@ #!/bin/bash echo "Checking for test.only() in test files." ! grep -r "test\.only(" frontend/javascripts/test || echo "Found test files with test.only() which disables other tests. Please remove the only modifier." +! grep -r "test\.serial\.only(" frontend/javascripts/test || echo "Found test files with test.only() which disables other tests. Please remove the only modifier." echo "Done" From a2574b86d0a380797a5cfdfb253100a0edfc32de Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Tue, 15 Aug 2023 17:55:52 +0200 Subject: [PATCH 10/21] harden error handling in PushQueue --- .../oxalis/model/actions/actions.ts | 11 +++- .../model/bucket_data_handling/pushqueue.ts | 65 +++++++++++-------- .../oxalis/model/sagas/root_saga.ts | 13 +++- 3 files changed, 60 insertions(+), 29 deletions(-) diff --git a/frontend/javascripts/oxalis/model/actions/actions.ts b/frontend/javascripts/oxalis/model/actions/actions.ts index a6bef847adc..7fcea4d403c 100644 --- a/frontend/javascripts/oxalis/model/actions/actions.ts +++ b/frontend/javascripts/oxalis/model/actions/actions.ts @@ -14,6 +14,8 @@ import type { ConnectomeAction } from "oxalis/model/actions/connectome_actions"; import { ProofreadAction } from "oxalis/model/actions/proofread_actions"; import { OrganizationAction } from "oxalis/model/actions/organization_actions"; +export type EscalateErrorAction = ReturnType; + export type Action = | SkeletonTracingAction | VolumeTracingAction @@ -32,7 +34,8 @@ export type Action = | OrganizationAction | ReturnType | ReturnType - | ReturnType; + | ReturnType + | EscalateErrorAction; export const wkReadyAction = () => ({ @@ -48,3 +51,9 @@ export const restartSagaAction = () => ({ type: "RESTART_SAGA", } as const); + +export const escalateErrorAction = (error: unknown) => + ({ + type: "ESCALATE_ERROR", + error, + } as const); diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts index a2f87ed1f5c..70cd8b1b67c 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts @@ -9,6 +9,7 @@ import Store from "oxalis/store"; import { pushSaveQueueTransaction } from "../actions/save_actions"; import { UpdateAction } from "../sagas/update_actions"; import { AsyncFifoResolver } from "libs/async_fifo_resolver"; +import { escalateErrorAction } from "../actions/actions"; // Only process the PushQueue after there was no user interaction (or bucket modification due to // downsampling) for PUSH_DEBOUNCE_TIME milliseconds... @@ -21,8 +22,6 @@ const _PUSH_DEBOUNCE_MAX_WAIT_TIME = 30000; class PushQueue { cube: DataCube; - private fifoResolver = new AsyncFifoResolver(); - // The pendingQueue contains all buckets that should be: // - snapshotted, // - put into one transaction and then @@ -40,6 +39,10 @@ class PushQueue { // Store the number of buckets that are currently being compressed. private compressingBucketCount: number = 0; + // Helper to ensure the Store's save queue is filled in the correct + // order. + private fifoResolver = new AsyncFifoResolver(); + constructor(cube: DataCube) { this.cube = cube; this.pendingQueue = new Set(); @@ -88,10 +91,13 @@ class PushQueue { // mechanism, because it could get cancelled due to // createDebouncedAbortableParameterlessCallable otherwise. this.flushAndSnapshot(); - } catch (_error) { - // Error Recovery! - // todo: somewhere else? - alert("We've encountered a permanent issue while saving. Please try to reload the page."); + } catch (error) { + // The above code is critical for saving volume data. Because the + // code is invoked asynchronously, there won't be a default error + // handling in case it crashes due to a bug. + // Therefore, escalate the error manually so that the sagas will crash + // (notifying the user and stopping further potentially undefined behavior). + Store.dispatch(escalateErrorAction(error)); } console.log("pushImpl end"); }; @@ -120,27 +126,32 @@ class PushQueue { /* * Create a transaction from the batch and push it into the save queue. */ - this.pendingTransactionCount++; - this.compressingBucketCount += batch.length; - - // Start the compression job. Note that an older invocation of - // createCompressedUpdateBucketActions might still be running. - // We can still *start* a new compression job, but we want to ensure - // that the jobs are processed in the order they were initiated. - // This is done using orderedWaitFor. - // Addendum: - // In practice, this won't matter much since compression jobs - // are processed by a pool of webworkers in fifo-order, anyway. - // However, there is a theoretical chance of a race condition, - // since the fifo-ordering is only ensured for starting the webworker - // and not for receiving the return values. - const items = await this.fifoResolver.orderedWaitFor( - createCompressedUpdateBucketActions(batch), - ); - Store.dispatch(pushSaveQueueTransaction(items, "volume", this.cube.layerName)); - - this.pendingTransactionCount--; - this.compressingBucketCount -= batch.length; + try { + this.pendingTransactionCount++; + this.compressingBucketCount += batch.length; + + // Start the compression job. Note that an older invocation of + // createCompressedUpdateBucketActions might still be running. + // We can still *start* a new compression job, but we want to ensure + // that the jobs are processed in the order they were initiated. + // This is done using orderedWaitFor. + // Addendum: + // In practice, this won't matter much since compression jobs + // are processed by a pool of webworkers in fifo-order, anyway. + // However, there is a theoretical chance of a race condition, + // since the fifo-ordering is only ensured for starting the webworker + // and not for receiving the return values. + const items = await this.fifoResolver.orderedWaitFor( + createCompressedUpdateBucketActions(batch), + ); + Store.dispatch(pushSaveQueueTransaction(items, "volume", this.cube.layerName)); + + this.pendingTransactionCount--; + this.compressingBucketCount -= batch.length; + } catch (error) { + // See other usage of escalateErrorAction for a detailed explanation. + Store.dispatch(escalateErrorAction(error)); + } } } diff --git a/frontend/javascripts/oxalis/model/sagas/root_saga.ts b/frontend/javascripts/oxalis/model/sagas/root_saga.ts index 6b0cef8ed36..bf0b715305a 100644 --- a/frontend/javascripts/oxalis/model/sagas/root_saga.ts +++ b/frontend/javascripts/oxalis/model/sagas/root_saga.ts @@ -1,5 +1,5 @@ import type { Saga } from "oxalis/model/sagas/effect-generators"; -import { all, call, cancel, fork, take } from "typed-redux-saga"; +import { all, call, cancel, fork, take, takeEvery } from "typed-redux-saga"; import { alert } from "libs/window"; import VolumetracingSagas from "oxalis/model/sagas/volumetracing_saga"; import SaveSagas, { toggleErrorHighlighting } from "oxalis/model/sagas/save_saga"; @@ -19,6 +19,7 @@ import MappingSaga from "oxalis/model/sagas/mapping_saga"; import ProofreadSaga from "oxalis/model/sagas/proofread_saga"; import { listenForWkReady } from "oxalis/model/sagas/wk_ready_saga"; import { warnIfEmailIsUnverified } from "./user_saga"; +import { EscalateErrorAction } from "../actions/actions"; let rootSagaCrashed = false; export default function* rootSaga(): Saga { @@ -33,6 +34,15 @@ export function hasRootSagaCrashed() { return rootSagaCrashed; } +function* listenToErrorEscalation() { + // Make the saga deliberately crash when this action has been + // dispatched. This should be used if an error was thrown in a + // critical place, which should stop further saga saving. + yield* takeEvery("ESCALATE_ERROR", (action: EscalateErrorAction) => { + throw action.error; + }); +} + function* restartableSaga(): Saga { try { yield* all([ @@ -55,6 +65,7 @@ function* restartableSaga(): Saga { ...VolumetracingSagas.map((saga) => call(saga)), call(watchZ1Downsampling), call(warnIfEmailIsUnverified), + call(listenToErrorEscalation), ]); } catch (err) { rootSagaCrashed = true; From e954202651c3696e899d8639a8b499657647f757 Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Tue, 15 Aug 2023 18:02:34 +0200 Subject: [PATCH 11/21] move some lib modules into libs/async --- frontend/javascripts/libs/{ => async}/async_fifo_resolver.ts | 0 .../javascripts/libs/{ => async}/debounced_abortable_saga.ts | 0 frontend/javascripts/libs/{ => async}/deferred.ts | 0 .../javascripts/libs/{ => async}/latest_task_executor.ts | 2 +- frontend/javascripts/libs/{ => async}/task_pool.ts | 0 .../javascripts/libs/{worker_pool.ts => webworker_pool.ts} | 4 ++-- .../javascripts/oxalis/model/actions/annotation_actions.ts | 2 +- frontend/javascripts/oxalis/model/actions/save_actions.ts | 2 +- .../javascripts/oxalis/model/actions/settings_actions.ts | 2 +- .../oxalis/model/actions/volumetracing_actions.ts | 2 +- .../model/bucket_data_handling/layer_rendering_manager.ts | 2 +- .../oxalis/model/bucket_data_handling/pushqueue.ts | 5 +++-- .../oxalis/model/bucket_data_handling/wkstore_adapter.ts | 4 ++-- frontend/javascripts/oxalis/model/sagas/isosurface_saga.ts | 4 ++-- frontend/javascripts/oxalis/throttled_store.ts | 2 +- frontend/javascripts/test/libs/async_fifo_resolver.spec.ts | 2 +- .../javascripts/test/libs/debounced_abortable_saga.spec.ts | 2 +- frontend/javascripts/test/libs/deferred.spec.ts | 2 +- frontend/javascripts/test/libs/latest_task_executor.spec.ts | 4 ++-- frontend/javascripts/test/libs/task_pool.spec.ts | 2 +- 20 files changed, 22 insertions(+), 21 deletions(-) rename frontend/javascripts/libs/{ => async}/async_fifo_resolver.ts (100%) rename frontend/javascripts/libs/{ => async}/debounced_abortable_saga.ts (100%) rename frontend/javascripts/libs/{ => async}/deferred.ts (100%) rename frontend/javascripts/libs/{ => async}/latest_task_executor.ts (98%) rename frontend/javascripts/libs/{ => async}/task_pool.ts (100%) rename frontend/javascripts/libs/{worker_pool.ts => webworker_pool.ts} (89%) diff --git a/frontend/javascripts/libs/async_fifo_resolver.ts b/frontend/javascripts/libs/async/async_fifo_resolver.ts similarity index 100% rename from frontend/javascripts/libs/async_fifo_resolver.ts rename to frontend/javascripts/libs/async/async_fifo_resolver.ts diff --git a/frontend/javascripts/libs/debounced_abortable_saga.ts b/frontend/javascripts/libs/async/debounced_abortable_saga.ts similarity index 100% rename from frontend/javascripts/libs/debounced_abortable_saga.ts rename to frontend/javascripts/libs/async/debounced_abortable_saga.ts diff --git a/frontend/javascripts/libs/deferred.ts b/frontend/javascripts/libs/async/deferred.ts similarity index 100% rename from frontend/javascripts/libs/deferred.ts rename to frontend/javascripts/libs/async/deferred.ts diff --git a/frontend/javascripts/libs/latest_task_executor.ts b/frontend/javascripts/libs/async/latest_task_executor.ts similarity index 98% rename from frontend/javascripts/libs/latest_task_executor.ts rename to frontend/javascripts/libs/async/latest_task_executor.ts index 4486c7ec27c..d9c7c668b87 100644 --- a/frontend/javascripts/libs/latest_task_executor.ts +++ b/frontend/javascripts/libs/async/latest_task_executor.ts @@ -1,4 +1,4 @@ -import Deferred from "libs/deferred"; +import Deferred from "libs/async/deferred"; type Task = () => Promise; export const SKIPPED_TASK_REASON = "Skipped task"; /* diff --git a/frontend/javascripts/libs/task_pool.ts b/frontend/javascripts/libs/async/task_pool.ts similarity index 100% rename from frontend/javascripts/libs/task_pool.ts rename to frontend/javascripts/libs/async/task_pool.ts diff --git a/frontend/javascripts/libs/worker_pool.ts b/frontend/javascripts/libs/webworker_pool.ts similarity index 89% rename from frontend/javascripts/libs/worker_pool.ts rename to frontend/javascripts/libs/webworker_pool.ts index 65cb3b58769..5803e6c3354 100644 --- a/frontend/javascripts/libs/worker_pool.ts +++ b/frontend/javascripts/libs/webworker_pool.ts @@ -1,10 +1,10 @@ import _ from "lodash"; -export default class WorkerPool { +export default class WebWorkerPool { // This class can be used to instantiate multiple web workers // which are then used for computation in a simple round-robin manner. // // Example: - // const compressionPool = new WorkerPool( + // const compressionPool = new WebWorkerPool( // () => createWorker(ByteArraysToLz4Base64Worker), // COMPRESSION_WORKER_COUNT, // ); diff --git a/frontend/javascripts/oxalis/model/actions/annotation_actions.ts b/frontend/javascripts/oxalis/model/actions/annotation_actions.ts index 97d6970948a..3756d3fcdc0 100644 --- a/frontend/javascripts/oxalis/model/actions/annotation_actions.ts +++ b/frontend/javascripts/oxalis/model/actions/annotation_actions.ts @@ -16,7 +16,7 @@ import type { import type { Vector3 } from "oxalis/constants"; import _ from "lodash"; import { Dispatch } from "redux"; -import Deferred from "libs/deferred"; +import Deferred from "libs/async/deferred"; type InitializeAnnotationAction = ReturnType; type SetAnnotationNameAction = ReturnType; diff --git a/frontend/javascripts/oxalis/model/actions/save_actions.ts b/frontend/javascripts/oxalis/model/actions/save_actions.ts index 650a14eea18..d732d22b130 100644 --- a/frontend/javascripts/oxalis/model/actions/save_actions.ts +++ b/frontend/javascripts/oxalis/model/actions/save_actions.ts @@ -2,7 +2,7 @@ import type { Dispatch } from "redux"; import type { UpdateAction } from "oxalis/model/sagas/update_actions"; import { getUid } from "libs/uid_generator"; import Date from "libs/date"; -import Deferred from "libs/deferred"; +import Deferred from "libs/async/deferred"; export type SaveQueueType = "skeleton" | "volume" | "mapping"; export type PushSaveQueueTransaction = ReturnType; diff --git a/frontend/javascripts/oxalis/model/actions/settings_actions.ts b/frontend/javascripts/oxalis/model/actions/settings_actions.ts index 06723c1288f..24b85b1e342 100644 --- a/frontend/javascripts/oxalis/model/actions/settings_actions.ts +++ b/frontend/javascripts/oxalis/model/actions/settings_actions.ts @@ -7,7 +7,7 @@ import type { Mapping, MappingType, } from "oxalis/store"; -import Deferred from "libs/deferred"; +import Deferred from "libs/async/deferred"; import { APIHistogramData } from "types/api_flow_types"; export type UpdateUserSettingAction = ReturnType; diff --git a/frontend/javascripts/oxalis/model/actions/volumetracing_actions.ts b/frontend/javascripts/oxalis/model/actions/volumetracing_actions.ts index cc5b9e5055d..2acc1d7bab9 100644 --- a/frontend/javascripts/oxalis/model/actions/volumetracing_actions.ts +++ b/frontend/javascripts/oxalis/model/actions/volumetracing_actions.ts @@ -2,7 +2,7 @@ import type { ServerEditableMapping, ServerVolumeTracing } from "types/api_flow_ import type { Vector2, Vector3, Vector4, OrthoView, ContourMode } from "oxalis/constants"; import type { BucketDataArray } from "oxalis/model/bucket_data_handling/bucket"; import type { Segment, SegmentGroup, SegmentMap } from "oxalis/store"; -import Deferred from "libs/deferred"; +import Deferred from "libs/async/deferred"; import type { Dispatch } from "redux"; import { AllUserBoundingBoxActions } from "oxalis/model/actions/annotation_actions"; import { QuickSelectGeometry } from "oxalis/geometries/helper_geometries"; diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/layer_rendering_manager.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/layer_rendering_manager.ts index 5fa24d16836..89102666e21 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/layer_rendering_manager.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/layer_rendering_manager.ts @@ -16,7 +16,7 @@ import { } from "oxalis/model/accessors/dataset_accessor"; import AsyncBucketPickerWorker from "oxalis/workers/async_bucket_picker.worker"; import type DataCube from "oxalis/model/bucket_data_handling/data_cube"; -import LatestTaskExecutor, { SKIPPED_TASK_REASON } from "libs/latest_task_executor"; +import LatestTaskExecutor, { SKIPPED_TASK_REASON } from "libs/async/latest_task_executor"; import type PullQueue from "oxalis/model/bucket_data_handling/pullqueue"; import Store, { PlaneRects, SegmentMap } from "oxalis/store"; import TextureBucketManager from "oxalis/model/bucket_data_handling/texture_bucket_manager"; diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts index 70cd8b1b67c..d8b8dddc002 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts @@ -3,12 +3,12 @@ import type { DataBucket } from "oxalis/model/bucket_data_handling/bucket"; import { alert } from "libs/window"; import { createCompressedUpdateBucketActions } from "oxalis/model/bucket_data_handling/wkstore_adapter"; import type DataCube from "oxalis/model/bucket_data_handling/data_cube"; -import { createDebouncedAbortableParameterlessCallable } from "libs/debounced_abortable_saga"; +import { createDebouncedAbortableParameterlessCallable } from "libs/async/debounced_abortable_saga"; import { call } from "redux-saga/effects"; import Store from "oxalis/store"; import { pushSaveQueueTransaction } from "../actions/save_actions"; import { UpdateAction } from "../sagas/update_actions"; -import { AsyncFifoResolver } from "libs/async_fifo_resolver"; +import { AsyncFifoResolver } from "libs/async/async_fifo_resolver"; import { escalateErrorAction } from "../actions/actions"; // Only process the PushQueue after there was no user interaction (or bucket modification due to @@ -82,6 +82,7 @@ class PushQueue { pushImpl = function* (this: PushQueue) { try { + console.log("throw in pushimpl"); console.log("pushImpl start"); // Wait until there are no temporal buckets, anymore, so that // all buckets can be snapshotted and saved to the server. diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts index 269e502b730..0e2e806c720 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts @@ -18,7 +18,7 @@ import ErrorHandling from "libs/error_handling"; import Request from "libs/request"; import type { DataLayerType, VolumeTracing } from "oxalis/store"; import Store from "oxalis/store"; -import WorkerPool from "libs/worker_pool"; +import WebworkerPool from "libs/webworker_pool"; import type { Vector3, Vector4 } from "oxalis/constants"; import constants, { MappingStatusEnum } from "oxalis/constants"; import window from "libs/window"; @@ -33,7 +33,7 @@ const decodeFourBit = createWorker(DecodeFourBitWorker); // go. const COMPRESSION_BATCH_SIZE = 128; const COMPRESSION_WORKER_COUNT = 2; -const compressionPool = new WorkerPool( +const compressionPool = new WebworkerPool( () => createWorker(ByteArraysToLz4Base64Worker), COMPRESSION_WORKER_COUNT, ); diff --git a/frontend/javascripts/oxalis/model/sagas/isosurface_saga.ts b/frontend/javascripts/oxalis/model/sagas/isosurface_saga.ts index 25a461f3bbd..d0b153ebe73 100644 --- a/frontend/javascripts/oxalis/model/sagas/isosurface_saga.ts +++ b/frontend/javascripts/oxalis/model/sagas/isosurface_saga.ts @@ -5,7 +5,7 @@ import { chunkDynamically, sleep } from "libs/utils"; import ErrorHandling from "libs/error_handling"; import type { APIDataset, APIMeshFile, APISegmentationLayer } from "types/api_flow_types"; import { mergeBufferGeometries, mergeVertices } from "libs/BufferGeometryUtils"; -import Deferred from "libs/deferred"; +import Deferred from "libs/async/deferred"; import Store from "oxalis/store"; import { @@ -70,7 +70,7 @@ import { saveNowAction } from "oxalis/model/actions/save_actions"; import Toast from "libs/toast"; import { getDracoLoader } from "libs/draco"; import messages from "messages"; -import processTaskWithPool from "libs/task_pool"; +import processTaskWithPool from "libs/async/task_pool"; import { getBaseSegmentationName } from "oxalis/view/right-border-tabs/segments_tab/segments_view_helper"; import { RemoveSegmentAction, UpdateSegmentAction } from "../actions/volumetracing_actions"; import { ResolutionInfo } from "../helpers/resolution_info"; diff --git a/frontend/javascripts/oxalis/throttled_store.ts b/frontend/javascripts/oxalis/throttled_store.ts index 64bd969706d..5e6241f3f3c 100644 --- a/frontend/javascripts/oxalis/throttled_store.ts +++ b/frontend/javascripts/oxalis/throttled_store.ts @@ -1,6 +1,6 @@ /* eslint no-await-in-loop: 0 */ import type { Store as StoreType } from "redux"; -import Deferred from "libs/deferred"; +import Deferred from "libs/async/deferred"; import type { OxalisState } from "oxalis/store"; import Store from "oxalis/store"; import * as Utils from "libs/utils"; diff --git a/frontend/javascripts/test/libs/async_fifo_resolver.spec.ts b/frontend/javascripts/test/libs/async_fifo_resolver.spec.ts index 49eab827add..3f05f614353 100644 --- a/frontend/javascripts/test/libs/async_fifo_resolver.spec.ts +++ b/frontend/javascripts/test/libs/async_fifo_resolver.spec.ts @@ -1,7 +1,7 @@ import { sleep } from "libs/utils"; import test from "ava"; import _ from "lodash"; -import { AsyncFifoResolver } from "libs/async_fifo_resolver"; +import { AsyncFifoResolver } from "libs/async/async_fifo_resolver"; const createSubmitterFnWithProtocol = () => { const resolver = new AsyncFifoResolver(); diff --git a/frontend/javascripts/test/libs/debounced_abortable_saga.spec.ts b/frontend/javascripts/test/libs/debounced_abortable_saga.spec.ts index 941880b9ea8..2aa8330ce44 100644 --- a/frontend/javascripts/test/libs/debounced_abortable_saga.spec.ts +++ b/frontend/javascripts/test/libs/debounced_abortable_saga.spec.ts @@ -2,7 +2,7 @@ import { type Saga } from "oxalis/model/sagas/effect-generators"; import { sleep } from "libs/utils"; import test from "ava"; import _ from "lodash"; -import { createDebouncedAbortableCallable } from "libs/debounced_abortable_saga"; +import { createDebouncedAbortableCallable } from "libs/async/debounced_abortable_saga"; const createAbortableFnWithProtocol = () => { const protocol: string[] = []; diff --git a/frontend/javascripts/test/libs/deferred.spec.ts b/frontend/javascripts/test/libs/deferred.spec.ts index 3e32c1432aa..0089c676826 100644 --- a/frontend/javascripts/test/libs/deferred.spec.ts +++ b/frontend/javascripts/test/libs/deferred.spec.ts @@ -1,5 +1,5 @@ // @ts-nocheck -import Deferred from "libs/deferred"; +import Deferred from "libs/async/deferred"; import runAsync from "test/helpers/run-async"; import test from "ava"; diff --git a/frontend/javascripts/test/libs/latest_task_executor.spec.ts b/frontend/javascripts/test/libs/latest_task_executor.spec.ts index 437efff38ac..2e56c4c8d46 100644 --- a/frontend/javascripts/test/libs/latest_task_executor.spec.ts +++ b/frontend/javascripts/test/libs/latest_task_executor.spec.ts @@ -1,6 +1,6 @@ -import Deferred from "libs/deferred"; +import Deferred from "libs/async/deferred"; import test from "ava"; -import LatestTaskExecutor, { SKIPPED_TASK_REASON } from "libs/latest_task_executor"; +import LatestTaskExecutor, { SKIPPED_TASK_REASON } from "libs/async/latest_task_executor"; test("LatestTaskExecutor: One task", async (t) => { const executor = new LatestTaskExecutor(); diff --git a/frontend/javascripts/test/libs/task_pool.spec.ts b/frontend/javascripts/test/libs/task_pool.spec.ts index d4243dcb4b7..3c9aedbcf6a 100644 --- a/frontend/javascripts/test/libs/task_pool.spec.ts +++ b/frontend/javascripts/test/libs/task_pool.spec.ts @@ -1,6 +1,6 @@ import { call, type Saga } from "oxalis/model/sagas/effect-generators"; import { runSaga } from "redux-saga"; -import processTaskWithPool from "libs/task_pool"; +import processTaskWithPool from "libs/async/task_pool"; import * as Utils from "libs/utils"; import test from "ava"; From db611cc05621af22d4bace5be05fb28dbc6c2294 Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Tue, 15 Aug 2023 18:32:38 +0200 Subject: [PATCH 12/21] warn user when pushqueue is starving --- frontend/javascripts/oxalis/model.ts | 8 +++++ .../model/bucket_data_handling/pushqueue.ts | 35 +++++++++++++------ .../oxalis/view/action-bar/save_button.tsx | 9 +++-- 3 files changed, 36 insertions(+), 16 deletions(-) diff --git a/frontend/javascripts/oxalis/model.ts b/frontend/javascripts/oxalis/model.ts index 0de69687273..fd7465d41a8 100644 --- a/frontend/javascripts/oxalis/model.ts +++ b/frontend/javascripts/oxalis/model.ts @@ -285,6 +285,14 @@ export class OxalisModel { return storeStateSaved && pushQueuesSaved; } + getLongestPushQueueWaitTime() { + return ( + _.max( + Utils.values(this.dataLayers).map((layer) => layer.pushQueue.getTransactionWaitTime()), + ) || 0 + ); + } + getPushQueueStats() { const compressingBucketCount = _.sum( Utils.values(this.dataLayers).map((dataLayer) => diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts index d8b8dddc002..b5dda168d2a 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts @@ -1,6 +1,5 @@ import _ from "lodash"; import type { DataBucket } from "oxalis/model/bucket_data_handling/bucket"; -import { alert } from "libs/window"; import { createCompressedUpdateBucketActions } from "oxalis/model/bucket_data_handling/wkstore_adapter"; import type DataCube from "oxalis/model/bucket_data_handling/data_cube"; import { createDebouncedAbortableParameterlessCallable } from "libs/async/debounced_abortable_saga"; @@ -12,12 +11,10 @@ import { AsyncFifoResolver } from "libs/async/async_fifo_resolver"; import { escalateErrorAction } from "../actions/actions"; // Only process the PushQueue after there was no user interaction (or bucket modification due to -// downsampling) for PUSH_DEBOUNCE_TIME milliseconds... +// downsampling) for PUSH_DEBOUNCE_TIME milliseconds. +// PushQueue.getTransactionWaitTime is used to avoid debounce-related starvation that can happen +// when the user constantly annotates. const PUSH_DEBOUNCE_TIME = 1000; -// ...unless a timeout of PUSH_DEBOUNCE_MAX_WAIT_TIME milliseconds -// is exceeded. Then, initiate a push. -// todo: reactivate? -const _PUSH_DEBOUNCE_MAX_WAIT_TIME = 30000; class PushQueue { cube: DataCube; @@ -43,6 +40,11 @@ class PushQueue { // order. private fifoResolver = new AsyncFifoResolver(); + // If the timestamp is defined, it encodes when the first bucket + // was added to the PushQueue that will be part of the next (to be created) + // transaction. + private waitTimeStartTimeStamp: number | null = null; + constructor(cube: DataCube) { this.cube = cube; this.pendingQueue = new Set(); @@ -57,6 +59,9 @@ class PushQueue { } insert(bucket: DataBucket): void { + if (this.waitTimeStartTimeStamp == null) { + this.waitTimeStartTimeStamp = Date.now(); + } if (!this.pendingQueue.has(bucket)) { this.pendingQueue.add(bucket); bucket.dirtyCount++; @@ -72,6 +77,18 @@ class PushQueue { return this.compressingBucketCount; } + getTransactionWaitTime(): number { + // Return how long we are waiting for the transaction flush + // (waiting time depends on the user activity and on the + // time it takes to download buckets). + if (this.waitTimeStartTimeStamp == null) { + // No pending buckets exist. There's no wait time. + return 0; + } + + return Date.now() - this.waitTimeStartTimeStamp; + } + clear(): void { this.pendingQueue.clear(); } @@ -104,6 +121,7 @@ class PushQueue { }; private flushAndSnapshot() { + this.waitTimeStartTimeStamp = null; // Flush pendingQueue. Note that it's important to do this synchronously. // If other actors could add to queue concurrently, the front-end could // send an inconsistent state for a transaction. @@ -116,11 +134,6 @@ class PushQueue { this.pushTransaction(batch); } - // todo: prevent user from brushing for eternity? - // push = _.debounce(this.pushImpl, PUSH_DEBOUNCE_TIME, { - // maxWait: PUSH_DEBOUNCE_MAX_WAIT_TIME, - // }); - push = createDebouncedAbortableParameterlessCallable(this.pushImpl, PUSH_DEBOUNCE_TIME, this); async pushTransaction(batch: Array): Promise { diff --git a/frontend/javascripts/oxalis/view/action-bar/save_button.tsx b/frontend/javascripts/oxalis/view/action-bar/save_button.tsx index 274edc2538b..0490aafd1ae 100644 --- a/frontend/javascripts/oxalis/view/action-bar/save_button.tsx +++ b/frontend/javascripts/oxalis/view/action-bar/save_button.tsx @@ -73,11 +73,10 @@ class SaveButton extends React.PureComponent { const isStateSaved = Model.stateSaved(); const oldestUnsavedTimestamp = getOldestUnsavedTimestamp(Store.getState().save.queue); - // todo: get oldest unsaved timestamp from push queue. - // - - const unsavedDuration = - oldestUnsavedTimestamp != null ? Date.now() - oldestUnsavedTimestamp : 0; + const unsavedDuration = Math.max( + oldestUnsavedTimestamp != null ? Date.now() - oldestUnsavedTimestamp : 0, + Model.getLongestPushQueueWaitTime(), + ); const showUnsavedWarning = unsavedDuration > UNSAVED_WARNING_THRESHOLD; if (showUnsavedWarning) { From e3228df7715ee4ba66b58950caf113c775f8ed44 Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Tue, 15 Aug 2023 18:49:43 +0200 Subject: [PATCH 13/21] Apply suggestions from code review --- .../oxalis/model/bucket_data_handling/data_cube.ts | 1 - .../oxalis/model/bucket_data_handling/pushqueue.ts | 4 ---- frontend/javascripts/oxalis/model/sagas/save_saga.ts | 1 - .../javascripts/oxalis/model/sagas/save_saga_constants.ts | 4 ++-- 4 files changed, 2 insertions(+), 8 deletions(-) diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/data_cube.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/data_cube.ts index ad7c0071826..23f63a30d81 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/data_cube.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/data_cube.ts @@ -679,7 +679,6 @@ class DataCube { } triggerPushQueue() { - console.log("triggerPushQueue"); this.pushQueue.push(); } diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts index b5dda168d2a..ec4780480d5 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts @@ -99,8 +99,6 @@ class PushQueue { pushImpl = function* (this: PushQueue) { try { - console.log("throw in pushimpl"); - console.log("pushImpl start"); // Wait until there are no temporal buckets, anymore, so that // all buckets can be snapshotted and saved to the server. yield call(this.cube.temporalBucketManager.getAllLoadedPromise); @@ -117,7 +115,6 @@ class PushQueue { // (notifying the user and stopping further potentially undefined behavior). Store.dispatch(escalateErrorAction(error)); } - console.log("pushImpl end"); }; private flushAndSnapshot() { @@ -125,7 +122,6 @@ class PushQueue { // Flush pendingQueue. Note that it's important to do this synchronously. // If other actors could add to queue concurrently, the front-end could // send an inconsistent state for a transaction. - console.log("Flush pending queue with size:", this.pendingQueue.size); const batch: DataBucket[] = Array.from(this.pendingQueue); this.pendingQueue = new Set(); diff --git a/frontend/javascripts/oxalis/model/sagas/save_saga.ts b/frontend/javascripts/oxalis/model/sagas/save_saga.ts index c48379509c9..e89d075c02b 100644 --- a/frontend/javascripts/oxalis/model/sagas/save_saga.ts +++ b/frontend/javascripts/oxalis/model/sagas/save_saga.ts @@ -171,7 +171,6 @@ export function* sendRequestToServer( const fullSaveQueue = yield* select((state) => selectQueue(state, saveQueueType, tracingId)); const saveQueue = sliceAppropriateBatchCount(fullSaveQueue); - console.log("saving", saveQueue.length, "items out of", fullSaveQueue.length); let compactedSaveQueue = compactSaveQueue(saveQueue); const { version, type } = yield* select((state) => selectTracing(state, saveQueueType, tracingId), diff --git a/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts b/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts index 53abc5b0b43..233d31fed01 100644 --- a/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts +++ b/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts @@ -11,5 +11,5 @@ export const UNDO_HISTORY_SIZE = 20; export const SETTINGS_RETRY_DELAY = 15 * 1000; export const SETTINGS_MAX_RETRY_COUNT = 20; // 20 * 15s == 5m -export const maximumActionCountPerBatch = 5000; //20; // 5000 -export const maximumActionCountPerSave = 15000; // 60; // 15000 +export const maximumActionCountPerBatch = 5000; +export const maximumActionCountPerSave = 15000; From 31df268397d5f5d739107555077c291eb8147e72 Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Tue, 15 Aug 2023 18:50:07 +0200 Subject: [PATCH 14/21] clean up a bit --- .../oxalis/model/bucket_data_handling/pushqueue.ts | 12 +++++------- .../model/bucket_data_handling/wkstore_adapter.ts | 8 +++----- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts index ec4780480d5..cae666d0a4d 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts @@ -30,10 +30,7 @@ class PushQueue { // Everytime the pendingQueue is flushed, its content is put into a transaction. // That transaction is compressed asynchronously before it is sent to the store. - // During that compression, the transaction is counted as pending. - private pendingTransactionCount: number = 0; - - // Store the number of buckets that are currently being compressed. + // Buckets that are currently being compressed, are counted in this property. private compressingBucketCount: number = 0; // Helper to ensure the Store's save queue is filled in the correct @@ -54,7 +51,7 @@ class PushQueue { return ( this.pendingQueue.size === 0 && this.cube.temporalBucketManager.getCount() === 0 && - this.pendingTransactionCount === 0 + this.compressingBucketCount === 0 ); } @@ -101,6 +98,9 @@ class PushQueue { try { // Wait until there are no temporal buckets, anymore, so that // all buckets can be snapshotted and saved to the server. + // If PushQueue.push() is called while we are waiting here, + // this generator is aborted and the debounce-time begins + // again. yield call(this.cube.temporalBucketManager.getAllLoadedPromise); // It is important that flushAndSnapshot does not use a generator @@ -137,7 +137,6 @@ class PushQueue { * Create a transaction from the batch and push it into the save queue. */ try { - this.pendingTransactionCount++; this.compressingBucketCount += batch.length; // Start the compression job. Note that an older invocation of @@ -156,7 +155,6 @@ class PushQueue { ); Store.dispatch(pushSaveQueueTransaction(items, "volume", this.cube.layerName)); - this.pendingTransactionCount--; this.compressingBucketCount -= batch.length; } catch (error) { // See other usage of escalateErrorAction for a detailed explanation. diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts index 0e2e806c720..13d808c681f 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts @@ -250,12 +250,10 @@ export async function createCompressedUpdateBucketActions( return _.flatten( await Promise.all( _.chunk(batch, COMPRESSION_BATCH_SIZE).map(async (batchSubset) => { - const byteArrays = []; - for (const bucket of batchSubset) { + const byteArrays = batchSubset.map((bucket) => { const data = bucket.getCopyOfData(); - const byteArray = new Uint8Array(data.buffer, data.byteOffset, data.byteLength); - byteArrays.push(byteArray); - } + return new Uint8Array(data.buffer, data.byteOffset, data.byteLength); + }); const compressedBase64Strings = await compressionPool.submit(byteArrays); return compressedBase64Strings.map((compressedBase64, index) => { From 1e90b3d7d81e87042a39ed6a6e207d2e71a03ed2 Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Wed, 16 Aug 2023 12:01:41 +0200 Subject: [PATCH 15/21] tune batch count constants for volume tracings; also show downloading buckets in save button tooltip --- frontend/javascripts/oxalis/model.ts | 7 +++++++ .../oxalis/model/reducers/save_reducer.ts | 7 +++++-- .../oxalis/model/sagas/save_saga.ts | 13 +++++++----- .../oxalis/model/sagas/save_saga_constants.ts | 13 ++++++++++-- .../oxalis/view/action-bar/save_button.tsx | 20 +++++++++++++++++-- .../test/sagas/saga_integration.spec.ts | 11 ++++++---- 6 files changed, 56 insertions(+), 15 deletions(-) diff --git a/frontend/javascripts/oxalis/model.ts b/frontend/javascripts/oxalis/model.ts index fd7465d41a8..78d44b6cdb3 100644 --- a/frontend/javascripts/oxalis/model.ts +++ b/frontend/javascripts/oxalis/model.ts @@ -304,9 +304,16 @@ export class OxalisModel { Utils.values(this.dataLayers).map((dataLayer) => dataLayer.pushQueue.getPendingQueueSize()), ); + const outstandingBucketDownloadCount = _.sum( + Utils.values(this.dataLayers).map((dataLayer) => + dataLayer.cube.temporalBucketManager.getCount(), + ), + ); + return { compressingBucketCount, waitingForCompressionBucketCount, + outstandingBucketDownloadCount, }; } diff --git a/frontend/javascripts/oxalis/model/reducers/save_reducer.ts b/frontend/javascripts/oxalis/model/reducers/save_reducer.ts index 07dafbac2f9..92c2a9473a5 100644 --- a/frontend/javascripts/oxalis/model/reducers/save_reducer.ts +++ b/frontend/javascripts/oxalis/model/reducers/save_reducer.ts @@ -10,7 +10,7 @@ import type { } from "oxalis/model/actions/save_actions"; import { getActionLog } from "oxalis/model/helpers/action_logger_middleware"; import { getStats } from "oxalis/model/accessors/skeletontracing_accessor"; -import { maximumActionCountPerBatch } from "oxalis/model/sagas/save_saga_constants"; +import { MAXIMUM_ACTION_COUNT_PER_BATCH } from "oxalis/model/sagas/save_saga_constants"; import { selectQueue } from "oxalis/model/accessors/save_accessor"; import { updateKey2 } from "oxalis/model/helpers/deep_update"; import { @@ -123,7 +123,10 @@ function SaveReducer(state: OxalisState, action: Action): OxalisState { throw new Error("Tried to save something even though user is not logged in."); } - const updateActionChunks = _.chunk(items, maximumActionCountPerBatch); + const updateActionChunks = _.chunk( + items, + MAXIMUM_ACTION_COUNT_PER_BATCH[action.saveQueueType], + ); const transactionGroupCount = updateActionChunks.length; const actionLogInfo = JSON.stringify(getActionLog().slice(-10)); diff --git a/frontend/javascripts/oxalis/model/sagas/save_saga.ts b/frontend/javascripts/oxalis/model/sagas/save_saga.ts index e89d075c02b..c46a4e79289 100644 --- a/frontend/javascripts/oxalis/model/sagas/save_saga.ts +++ b/frontend/javascripts/oxalis/model/sagas/save_saga.ts @@ -36,7 +36,7 @@ import { globalPositionToBucketPosition } from "oxalis/model/helpers/position_co import type { Saga } from "oxalis/model/sagas/effect-generators"; import { select } from "oxalis/model/sagas/effect-generators"; import { - maximumActionCountPerSave, + MAXIMUM_ACTION_COUNT_PER_SAVE, MAX_SAVE_RETRY_WAITING_TIME, PUSH_THROTTLE_TIME, SAVE_RETRY_WAITING_TIME, @@ -131,15 +131,18 @@ export function sendRequestWithToken( } // This function returns the first n batches of the provided array, so that the count of -// all actions in these n batches does not exceed maximumActionCountPerSave -function sliceAppropriateBatchCount(batches: Array): Array { +// all actions in these n batches does not exceed MAXIMUM_ACTION_COUNT_PER_SAVE +function sliceAppropriateBatchCount( + batches: Array, + saveQueueType: SaveQueueType, +): Array { const slicedBatches = []; let actionCount = 0; for (const batch of batches) { const newActionCount = actionCount + batch.actions.length; - if (newActionCount <= maximumActionCountPerSave) { + if (newActionCount <= MAXIMUM_ACTION_COUNT_PER_SAVE[saveQueueType]) { actionCount = newActionCount; slicedBatches.push(batch); } else { @@ -170,7 +173,7 @@ export function* sendRequestToServer( */ const fullSaveQueue = yield* select((state) => selectQueue(state, saveQueueType, tracingId)); - const saveQueue = sliceAppropriateBatchCount(fullSaveQueue); + const saveQueue = sliceAppropriateBatchCount(fullSaveQueue, saveQueueType); let compactedSaveQueue = compactSaveQueue(saveQueue); const { version, type } = yield* select((state) => selectTracing(state, saveQueueType, tracingId), diff --git a/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts b/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts index 233d31fed01..28d90432790 100644 --- a/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts +++ b/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts @@ -11,5 +11,14 @@ export const UNDO_HISTORY_SIZE = 20; export const SETTINGS_RETRY_DELAY = 15 * 1000; export const SETTINGS_MAX_RETRY_COUNT = 20; // 20 * 15s == 5m -export const maximumActionCountPerBatch = 5000; -export const maximumActionCountPerSave = 15000; +export const MAXIMUM_ACTION_COUNT_PER_BATCH = { + skeleton: 5000, + volume: 1000, // Since volume saving is slower, use a lower value here. + mapping: 5000, +} as const; + +export const MAXIMUM_ACTION_COUNT_PER_SAVE = { + skeleton: 15000, + volume: 3000, + mapping: 15000, +} as const; diff --git a/frontend/javascripts/oxalis/view/action-bar/save_button.tsx b/frontend/javascripts/oxalis/view/action-bar/save_button.tsx index 0490aafd1ae..d59ba52de7b 100644 --- a/frontend/javascripts/oxalis/view/action-bar/save_button.tsx +++ b/frontend/javascripts/oxalis/view/action-bar/save_button.tsx @@ -29,6 +29,7 @@ type State = { isStateSaved: boolean; showUnsavedWarning: boolean; saveInfo: { + outstandingBucketDownloadCount: number; compressingBucketCount: number; waitingForCompressionBucketCount: number; }; @@ -55,6 +56,7 @@ class SaveButton extends React.PureComponent { isStateSaved: false, showUnsavedWarning: false, saveInfo: { + outstandingBucketDownloadCount: 0, compressingBucketCount: 0, waitingForCompressionBucketCount: 0, }, @@ -83,11 +85,16 @@ class SaveButton extends React.PureComponent { reportUnsavedDurationThresholdExceeded(); } - const { compressingBucketCount, waitingForCompressionBucketCount } = Model.getPushQueueStats(); + const { + compressingBucketCount, + waitingForCompressionBucketCount, + outstandingBucketDownloadCount, + } = Model.getPushQueueStats(); this.setState({ isStateSaved, showUnsavedWarning, saveInfo: { + outstandingBucketDownloadCount, compressingBucketCount, waitingForCompressionBucketCount, }, @@ -111,6 +118,8 @@ class SaveButton extends React.PureComponent { render() { const { progressFraction } = this.props; const { showUnsavedWarning } = this.state; + const { outstandingBucketDownloadCount } = this.state.saveInfo; + const totalBucketsToCompress = this.state.saveInfo.waitingForCompressionBucketCount + this.state.saveInfo.compressingBucketCount; @@ -127,7 +136,14 @@ class SaveButton extends React.PureComponent { > 0 + // Downloading the buckets often takes longer and the progress + // is visible (as the count will decrease continually). + // If lots of buckets need compression, this can also take a bit. + // Don't show both labels at the same time, because the compression + // usually can only start after the download is finished. + outstandingBucketDownloadCount > 0 + ? `${outstandingBucketDownloadCount} items remaining to download...` + : totalBucketsToCompress > 0 ? `${totalBucketsToCompress} items remaining to compress...` : null } diff --git a/frontend/javascripts/test/sagas/saga_integration.spec.ts b/frontend/javascripts/test/sagas/saga_integration.spec.ts index 006f8ee8aab..38ee04bc297 100644 --- a/frontend/javascripts/test/sagas/saga_integration.spec.ts +++ b/frontend/javascripts/test/sagas/saga_integration.spec.ts @@ -5,7 +5,7 @@ import "test/sagas/saga_integration.mock.js"; import { __setupOxalis, TIMESTAMP } from "test/helpers/apiHelpers"; import { createSaveQueueFromUpdateActions } from "test/helpers/saveHelpers"; import { enforceSkeletonTracing, getStats } from "oxalis/model/accessors/skeletontracing_accessor"; -import { maximumActionCountPerBatch } from "oxalis/model/sagas/save_saga_constants"; +import { MAXIMUM_ACTION_COUNT_PER_BATCH } from "oxalis/model/sagas/save_saga_constants"; import { restartSagaAction, wkReadyAction } from "oxalis/model/actions/actions"; import Store from "oxalis/store"; import * as Utils from "libs/utils"; @@ -82,7 +82,10 @@ test.serial("Save actions should not be chunked below the chunk limit (1/3)", (t const trees = generateDummyTrees(1000, 1); Store.dispatch(addTreesAndGroupsAction(createTreeMapFromTreeArray(trees), [])); t.is(Store.getState().save.queue.skeleton.length, 1); - t.true(Store.getState().save.queue.skeleton[0].actions.length < maximumActionCountPerBatch); + t.true( + Store.getState().save.queue.skeleton[0].actions.length < + MAXIMUM_ACTION_COUNT_PER_BATCH.skeleton, + ); }); test.serial("Save actions should be chunked above the chunk limit (2/3)", (t) => { Store.dispatch(discardSaveQueuesAction()); @@ -91,7 +94,7 @@ test.serial("Save actions should be chunked above the chunk limit (2/3)", (t) => Store.dispatch(addTreesAndGroupsAction(createTreeMapFromTreeArray(trees), [])); const state = Store.getState(); t.true(state.save.queue.skeleton.length > 1); - t.is(state.save.queue.skeleton[0].actions.length, maximumActionCountPerBatch); + t.is(state.save.queue.skeleton[0].actions.length, MAXIMUM_ACTION_COUNT_PER_BATCH.skeleton); }); test.serial("Save actions should be chunked after compacting (3/3)", (t) => { const nodeCount = 20000; @@ -107,6 +110,6 @@ test.serial("Save actions should be chunked after compacting (3/3)", (t) => { const { skeleton: skeletonSaveQueue } = Store.getState().save.queue; // There should only be one chunk t.is(skeletonSaveQueue.length, 1); - t.true(skeletonSaveQueue[0].actions.length < maximumActionCountPerBatch); + t.true(skeletonSaveQueue[0].actions.length < MAXIMUM_ACTION_COUNT_PER_BATCH.skeleton); t.is(skeletonSaveQueue[0].actions[1].name, "moveTreeComponent"); }); From 9111358b6da62ff4862aa78a91fb2b6d7876301b Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Wed, 16 Aug 2023 13:52:23 +0200 Subject: [PATCH 16/21] fix race condition in AsyncFifoResolver --- .../libs/async/async_fifo_resolver.ts | 9 ++--- .../test/libs/async_fifo_resolver.spec.ts | 33 +++++++++++++++++++ 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/frontend/javascripts/libs/async/async_fifo_resolver.ts b/frontend/javascripts/libs/async/async_fifo_resolver.ts index 75fe7ab5a41..f0e0e52d2cc 100644 --- a/frontend/javascripts/libs/async/async_fifo_resolver.ts +++ b/frontend/javascripts/libs/async/async_fifo_resolver.ts @@ -25,10 +25,11 @@ export class AsyncFifoResolver { async orderedWaitFor(promise: Promise): Promise { this.queue.push(promise); - const promiseCount = this.queue.length; + const promiseCountToAwait = this.queue.length; const retVals = await Promise.all(this.queue); - // Trim awaited promises - this.queue = this.queue.slice(promiseCount); - return retVals[promiseCount - 1]; + // Note that this.queue can have changed during the await. + // Find the index of the promise and trim the queue accordingly. + this.queue = this.queue.slice(this.queue.indexOf(promise) + 1); + return retVals[promiseCountToAwait - 1]; } } diff --git a/frontend/javascripts/test/libs/async_fifo_resolver.spec.ts b/frontend/javascripts/test/libs/async_fifo_resolver.spec.ts index 3f05f614353..53db8fd27e0 100644 --- a/frontend/javascripts/test/libs/async_fifo_resolver.spec.ts +++ b/frontend/javascripts/test/libs/async_fifo_resolver.spec.ts @@ -74,3 +74,36 @@ test("AsyncFifoResolver: New submits shouldn't block old ones.", async (t) => { t.deepEqual(protocol, ["started-1", "started-2", "sleep-finished-1", "finished-1"]); t.is(resolver.queue.length, 1); }); + +test("AsyncFifoResolver: Trimming of queue should work despite race condition potential.", async (t) => { + t.plan(3); + + const { submitter, resolver, protocol } = createSubmitterFnWithProtocol(); + + submitter(1, 100); + const promise = submitter(2, 100); + t.is(resolver.queue.length, 2); + submitter(3, 1000); + + await promise; + submitter(4, 1); + + // Wait until everything is done + await resolver.orderedWaitFor(Promise.resolve()); + + t.deepEqual(protocol, [ + "started-1", + "started-2", + "started-3", + "sleep-finished-1", + "finished-1", + "sleep-finished-2", + "finished-2", + "started-4", + "sleep-finished-4", + "sleep-finished-3", + "finished-3", + "finished-4", + ]); + t.is(resolver.queue.length, 0); +}); From 5dceb0c2438206bea14471deff0a986d49187d62 Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Wed, 16 Aug 2023 15:52:39 +0200 Subject: [PATCH 17/21] fix incorrect dtype in comment --- .../oxalis/model/bucket_data_handling/wkstore_adapter.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts index 13d808c681f..e31ebbe4c26 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts @@ -28,7 +28,7 @@ import _ from "lodash"; const decodeFourBit = createWorker(DecodeFourBitWorker); -// For 64-bit buckets with 32^3 voxels, a COMPRESSION_BATCH_SIZE of +// For 32-bit buckets with 32^3 voxels, a COMPRESSION_BATCH_SIZE of // 128 corresponds to 16.8 MB that are sent to a webworker in one // go. const COMPRESSION_BATCH_SIZE = 128; From e1f4a534a95b14d1261e85fbea91007b2420bd57 Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Wed, 16 Aug 2023 17:00:18 +0200 Subject: [PATCH 18/21] update changelog --- CHANGELOG.unreleased.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.unreleased.md b/CHANGELOG.unreleased.md index 7af4f9ccc7a..b67603295a1 100644 --- a/CHANGELOG.unreleased.md +++ b/CHANGELOG.unreleased.md @@ -20,6 +20,9 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released ### Changed - Small messages during annotating (e.g. “finished undo”, “applying mapping…”) are now click-through so they do not block users from selecting tools. [7239](https://github.com/scalableminds/webknossos/pull/7239) +- Annotating volume data uses a transaction-based mechanism now. As a result, WK is more robust against partial saves (i.e., due to a crashing tab). [#7264](https://github.com/scalableminds/webknossos/pull/7264) +- Improved speed of saving volume data. [#7264](https://github.com/scalableminds/webknossos/pull/7264) +- Improved progress indicator when saving volume data. [#7264](https://github.com/scalableminds/webknossos/pull/7264) ### Fixed - Fixed that is was possible to have larger active segment ids that supported by the data type of the segmentation layer which caused the segmentation ids to overflow. [#7240](https://github.com/scalableminds/webknossos/pull/7240) From 3799b22c81af14f3c06eac095510ad87eaa99623 Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Wed, 16 Aug 2023 17:06:20 +0200 Subject: [PATCH 19/21] improve comment --- .../oxalis/model/bucket_data_handling/pushqueue.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts index cae666d0a4d..52ba3d3b16c 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts @@ -120,8 +120,8 @@ class PushQueue { private flushAndSnapshot() { this.waitTimeStartTimeStamp = null; // Flush pendingQueue. Note that it's important to do this synchronously. - // If other actors could add to queue concurrently, the front-end could - // send an inconsistent state for a transaction. + // Otherwise, other actors might add to the queue concurrently during the flush, + // causing an inconsistent state for a transaction. const batch: DataBucket[] = Array.from(this.pendingQueue); this.pendingQueue = new Set(); From 48b19c4ed0f8ed4a625eec32dc30a9ee63e5f0f1 Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Wed, 16 Aug 2023 17:09:52 +0200 Subject: [PATCH 20/21] rename pendingQueue to pendingBuckets --- .../model/bucket_data_handling/pushqueue.ts | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts index 52ba3d3b16c..ba4eeb9fd5d 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts @@ -19,16 +19,16 @@ const PUSH_DEBOUNCE_TIME = 1000; class PushQueue { cube: DataCube; - // The pendingQueue contains all buckets that should be: + // The pendingBuckets contains all buckets that should be: // - snapshotted, // - put into one transaction and then // - saved - // That queue is flushed in a debounced manner so that the time of the + // That set is flushed in a debounced manner so that the time of the // snapshot should be suitable for a transaction (since neither WK nor the // user edited the buckets in a certain time window). - private pendingQueue: Set; + private pendingBuckets: Set; - // Everytime the pendingQueue is flushed, its content is put into a transaction. + // Everytime the pendingBuckets is flushed, its content is put into a transaction. // That transaction is compressed asynchronously before it is sent to the store. // Buckets that are currently being compressed, are counted in this property. private compressingBucketCount: number = 0; @@ -44,12 +44,12 @@ class PushQueue { constructor(cube: DataCube) { this.cube = cube; - this.pendingQueue = new Set(); + this.pendingBuckets = new Set(); } stateSaved(): boolean { return ( - this.pendingQueue.size === 0 && + this.pendingBuckets.size === 0 && this.cube.temporalBucketManager.getCount() === 0 && this.compressingBucketCount === 0 ); @@ -59,15 +59,15 @@ class PushQueue { if (this.waitTimeStartTimeStamp == null) { this.waitTimeStartTimeStamp = Date.now(); } - if (!this.pendingQueue.has(bucket)) { - this.pendingQueue.add(bucket); + if (!this.pendingBuckets.has(bucket)) { + this.pendingBuckets.add(bucket); bucket.dirtyCount++; } this.push(); } - getPendingQueueSize(): number { - return this.pendingQueue.size; + getPendingBucketsSize(): number { + return this.pendingBuckets.size; } getCompressingBucketCount(): number { @@ -87,11 +87,11 @@ class PushQueue { } clear(): void { - this.pendingQueue.clear(); + this.pendingBuckets.clear(); } print(): void { - this.pendingQueue.forEach((e) => console.log(e)); + this.pendingBuckets.forEach((e) => console.log(e)); } pushImpl = function* (this: PushQueue) { @@ -119,11 +119,11 @@ class PushQueue { private flushAndSnapshot() { this.waitTimeStartTimeStamp = null; - // Flush pendingQueue. Note that it's important to do this synchronously. - // Otherwise, other actors might add to the queue concurrently during the flush, + // Flush pendingBuckets. Note that it's important to do this synchronously. + // Otherwise, other actors might add to pendingBuckets concurrently during the flush, // causing an inconsistent state for a transaction. - const batch: DataBucket[] = Array.from(this.pendingQueue); - this.pendingQueue = new Set(); + const batch: DataBucket[] = Array.from(this.pendingBuckets); + this.pendingBuckets = new Set(); // Fire and forget. The correct transaction ordering is ensured // within pushTransaction. From f4b6c89569cf9e4324dfd95b2b6203a72f479835 Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Wed, 16 Aug 2023 17:18:06 +0200 Subject: [PATCH 21/21] fix incorrect method name --- frontend/javascripts/oxalis/model.ts | 2 +- .../javascripts/oxalis/model/bucket_data_handling/pushqueue.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/frontend/javascripts/oxalis/model.ts b/frontend/javascripts/oxalis/model.ts index 78d44b6cdb3..eb80727e193 100644 --- a/frontend/javascripts/oxalis/model.ts +++ b/frontend/javascripts/oxalis/model.ts @@ -301,7 +301,7 @@ export class OxalisModel { ); const waitingForCompressionBucketCount = _.sum( - Utils.values(this.dataLayers).map((dataLayer) => dataLayer.pushQueue.getPendingQueueSize()), + Utils.values(this.dataLayers).map((dataLayer) => dataLayer.pushQueue.getPendingBucketCount()), ); const outstandingBucketDownloadCount = _.sum( diff --git a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts index ba4eeb9fd5d..549d4293858 100644 --- a/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts +++ b/frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts @@ -66,7 +66,7 @@ class PushQueue { this.push(); } - getPendingBucketsSize(): number { + getPendingBucketCount(): number { return this.pendingBuckets.size; }