Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Silently ignore duplicate tracing save requests #3767

Merged
merged 18 commits into from
Feb 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.md).
- The modals for a new task description and recommended task settings are no longer shown in read-only tracings. [#3724](https://github.com/scalableminds/webknossos/pull/3724)
- Fixed a rendering bug when opening a task that only allowed flight/oblique mode tracing. [#3783](https://github.com/scalableminds/webknossos/pull/3783)
- Fixed a bug where some NMLs caused the webKnossos tab to freeze during NML upload. [#3758](https://github.com/scalableminds/webknossos/pull/3758)
- Fixed a bug where some skeleton save requests were wrongly rejected if they were sent more than once. [#3767](https://github.com/scalableminds/webknossos/pull/3767)


## [19.02.0](https://github.com/scalableminds/webknossos/releases/tag/19.02.0) - 2019-02-04
Expand Down
9 changes: 9 additions & 0 deletions frontend/javascripts/libs/uid_generator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// @flow

export function getUid(): string {
return Math.random()
.toString(36)
.substr(2, 10);
}

export default {};
3 changes: 2 additions & 1 deletion frontend/javascripts/oxalis/model/reducers/save_reducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ function SaveReducer(state: OxalisState, action: Action): OxalisState {
[action.tracingType]: {
$push: [
{
// Placeholder, the version number will be updated before sending to the server
// Placeholder, the version number and requestId will be updated before sending to the server
version: -1,
requestId: "",
timestamp: Date.now(),
actions: items,
stats,
Expand Down
73 changes: 42 additions & 31 deletions frontend/javascripts/oxalis/model/sagas/save_saga.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import Request, { type RequestOptionsWithData } from "libs/request";
import Toast from "libs/toast";
import messages from "messages";
import window, { alert, document, location } from "libs/window";
import { getUid } from "libs/uid_generator";

import { enforceSkeletonTracing } from "../accessors/skeletontracing_accessor";

Expand Down Expand Up @@ -162,10 +163,7 @@ function getRetryWaitTime(retryCount: number) {
return Math.min(2 ** retryCount * SAVE_RETRY_WAITING_TIME, MAX_SAVE_RETRY_WAITING_TIME);
}

export function* sendRequestToServer(
tracingType: "skeleton" | "volume",
retryCount: number = 0,
): Saga<void> {
export function* sendRequestToServer(tracingType: "skeleton" | "volume"): Saga<void> {
const fullSaveQueue = yield* select(state => state.save.queue[tracingType]);
const saveQueue = sliceAppropriateBatchCount(fullSaveQueue);

Expand All @@ -176,35 +174,41 @@ export function* sendRequestToServer(
const tracingStoreUrl = yield* select(state => state.tracing.tracingStore.url);
compactedSaveQueue = addVersionNumbers(compactedSaveQueue, version);

try {
yield* call(
sendRequestWithToken,
`${tracingStoreUrl}/tracings/${type}/${tracingId}/update?token=`,
{
method: "POST",
headers: { "X-Date": `${Date.now()}` },
data: compactedSaveQueue,
compress: true,
},
);
yield* put(setVersionNumberAction(version + compactedSaveQueue.length, tracingType));
yield* put(setLastSaveTimestampAction(tracingType));
yield* put(shiftSaveQueueAction(saveQueue.length, tracingType));
yield* call(toggleErrorHighlighting, false);
} catch (error) {
yield* call(toggleErrorHighlighting, true);
if (error.status === 409) {
// HTTP Code 409 'conflict' for dirty state
window.onbeforeunload = null;
yield* call(alert, messages["save.failed_simultaneous_tracing"]);
location.reload();
compactedSaveQueue = addRequestIds(compactedSaveQueue, getUid());

let retryCount = 0;
while (true) {
try {
yield* call(
sendRequestWithToken,
`${tracingStoreUrl}/tracings/${type}/${tracingId}/update?token=`,
{
method: "POST",
headers: { "X-Date": `${Date.now()}` },
data: compactedSaveQueue,
compress: true,
},
);
yield* put(setVersionNumberAction(version + compactedSaveQueue.length, tracingType));
yield* put(setLastSaveTimestampAction(tracingType));
yield* put(shiftSaveQueueAction(saveQueue.length, tracingType));
yield* call(toggleErrorHighlighting, false);
return;
} catch (error) {
yield* call(toggleErrorHighlighting, true);
if (error.status === 409) {
// HTTP Code 409 'conflict' for dirty state
window.onbeforeunload = null;
yield* call(alert, messages["save.failed_simultaneous_tracing"]);
location.reload();
return;
}
yield* race({
timeout: _call(delay, getRetryWaitTime(retryCount)),
forcePush: _take("SAVE_NOW"),
});
retryCount++;
}
yield* race({
timeout: _call(delay, getRetryWaitTime(retryCount)),
forcePush: _take("SAVE_NOW"),
});
yield* call(sendRequestToServer, tracingType, retryCount + 1);
}
}

Expand All @@ -226,6 +230,13 @@ export function addVersionNumbers(
return updateActionsBatches.map(batch => Object.assign({}, batch, { version: ++lastVersion }));
}

export function addRequestIds(
updateActionsBatches: Array<SaveQueueEntry>,
requestId: string,
): Array<SaveQueueEntry> {
return updateActionsBatches.map(batch => Object.assign({}, batch, { requestId }));
}

function removeUnrelevantUpdateActions(updateActions: Array<UpdateAction>) {
// This functions removes update actions that should not be sent to the server.
return updateActions.filter(ua => ua.name !== "toggleTree");
Expand Down
1 change: 1 addition & 0 deletions frontend/javascripts/oxalis/store.js
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ export type SaveQueueEntry = {
actions: Array<UpdateAction>,
stats: ?SkeletonTracingStats,
info: string,
requestId: string,
};

export type ProgressInfo = {
Expand Down
1 change: 1 addition & 0 deletions frontend/javascripts/test/helpers/saveHelpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { UpdateAction } from "oxalis/model/sagas/update_actions";
export function createSaveQueueFromUpdateActions(updateActions, timestamp, stats = null) {
return updateActions.map(ua => ({
version: -1,
requestId: "",
timestamp,
stats,
actions: [].concat(ua),
Expand Down
45 changes: 31 additions & 14 deletions frontend/javascripts/test/sagas/save_saga.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ const DateMock = {
};
mockRequire("libs/date", DateMock);

const REQUEST_ID = "sc5na1wy1r";
const UidMock = {
getUid: () => REQUEST_ID,
};
mockRequire("libs/uid_generator", UidMock);

mockRequire("oxalis/model/sagas/root_saga", function*() {
yield;
});
Expand All @@ -32,6 +38,7 @@ const {
sendRequestToServer,
toggleErrorHighlighting,
addVersionNumbers,
addRequestIds,
sendRequestWithToken,
} = mockRequire.reRequire("oxalis/model/sagas/save_saga");

Expand Down Expand Up @@ -120,7 +127,10 @@ test("SaveSaga should send request to server", t => {
saga.next();
saga.next(saveQueue);
saga.next({ version: LAST_VERSION, type: TRACING_TYPE, tracingId: "1234567890" });
const saveQueueWithVersions = addVersionNumbers(saveQueue, LAST_VERSION);
const saveQueueWithVersions = addRequestIds(
addVersionNumbers(saveQueue, LAST_VERSION),
REQUEST_ID,
);
expectValueDeepEqual(
t,
saga.next(TRACINGSTORE_URL),
Expand All @@ -138,28 +148,32 @@ test("SaveSaga should retry update actions", t => {
[UpdateActions.createEdge(1, 0, 1), UpdateActions.createEdge(1, 1, 2)],
TIMESTAMP,
);

const saga = sendRequestToServer(TRACING_TYPE);
saga.next();
saga.next(saveQueue);
saga.next({ version: LAST_VERSION, type: TRACING_TYPE, tracingId: "1234567890" });
const saveQueueWithVersions = addVersionNumbers(saveQueue, LAST_VERSION);
expectValueDeepEqual(
t,
saga.next(TRACINGSTORE_URL),
call(sendRequestWithToken, `${TRACINGSTORE_URL}/tracings/skeleton/1234567890/update?token=`, {
const saveQueueWithVersions = addRequestIds(
addVersionNumbers(saveQueue, LAST_VERSION),
REQUEST_ID,
);
const requestWithTokenCall = call(
sendRequestWithToken,
`${TRACINGSTORE_URL}/tracings/skeleton/1234567890/update?token=`,
{
method: "POST",
headers: { "X-Date": `${TIMESTAMP}` },
data: saveQueueWithVersions,
compress: true,
}),
},
);

const saga = sendRequestToServer(TRACING_TYPE);
saga.next();
saga.next(saveQueue);
saga.next({ version: LAST_VERSION, type: TRACING_TYPE, tracingId: "1234567890" });
expectValueDeepEqual(t, saga.next(TRACINGSTORE_URL), requestWithTokenCall);

expectValueDeepEqual(t, saga.throw("Timeout"), call(toggleErrorHighlighting, true));
// wait for retry
saga.next();
// should retry
expectValueDeepEqual(t, saga.next(), call(sendRequestToServer, TRACING_TYPE, 1));
expectValueDeepEqual(t, saga.next(), requestWithTokenCall);
});

test("SaveSaga should escalate on permanent client error update actions", t => {
Expand All @@ -172,7 +186,10 @@ test("SaveSaga should escalate on permanent client error update actions", t => {
saga.next();
saga.next(saveQueue);
saga.next({ version: LAST_VERSION, type: TRACING_TYPE, tracingId: "1234567890" });
const saveQueueWithVersions = addVersionNumbers(saveQueue, LAST_VERSION);
const saveQueueWithVersions = addRequestIds(
addVersionNumbers(saveQueue, LAST_VERSION),
REQUEST_ID,
);
expectValueDeepEqual(
t,
saga.next(TRACINGSTORE_URL),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ class TemporaryStore[K, V] @Inject()(system: ActorSystem) {
map.get(id)
}

def contains(id: K) =
map.synchronized(
map.contains(id)
)

def findAll =
map.synchronized {
map.values.toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.scalableminds.webknossos.tracingstore.tracings.TracingSelector
import com.scalableminds.webknossos.tracingstore.tracings.skeleton._
import com.scalableminds.util.tools.JsonHelper.boxFormat
import com.scalableminds.util.tools.JsonHelper.optionFormat
import com.scalableminds.webknossos.datastore.storage.TemporaryStore
import play.api.i18n.Messages
import play.api.libs.json.Json
import play.api.mvc.PlayBodyParsers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import com.scalableminds.webknossos.tracingstore.{TracingStoreAccessTokenService
import com.scalableminds.webknossos.tracingstore.tracings.{TracingSelector, TracingService, UpdateAction, UpdateActionGroup}
import com.scalableminds.util.tools.JsonHelper.boxFormat
import com.scalableminds.util.tools.JsonHelper.optionFormat
import com.scalableminds.webknossos.datastore.storage.TemporaryStore
import net.liftweb.common.Failure
import play.api.i18n.Messages
import scala.concurrent.duration._
import play.api.libs.json.{Json, Reads}
import play.api.mvc.PlayBodyParsers
import scalapb.{GeneratedMessage, GeneratedMessageCompanion, Message}
Expand Down Expand Up @@ -99,6 +101,7 @@ trait TracingController[T <: GeneratedMessage with Message[T],
}
}


def update(tracingId: String) = Action.async(validateJson[List[UpdateActionGroup[T]]]) { implicit request =>
log {
accessTokenService.validateAccess(UserAccessRequest.writeTracing(tracingId)) {
Expand All @@ -110,11 +113,18 @@ trait TracingController[T <: GeneratedMessage with Message[T],
val userToken = request.getQueryString("token")
webKnossosServer.reportTracingUpdates(tracingId, timestamps, latestStatistics, userToken).flatMap { _ =>
updateGroups.foldLeft(currentVersion) { (previousVersion, updateGroup) =>
previousVersion.flatMap { version =>
if (version + 1 == updateGroup.version || freezeVersions) {
tracingService.handleUpdateGroup(tracingId, updateGroup, version).map(_ => if (freezeVersions) version else updateGroup.version)
previousVersion.flatMap { prevVersion =>
if (prevVersion + 1 == updateGroup.version || freezeVersions) {
tracingService.handleUpdateGroup(tracingId, updateGroup, prevVersion)
.map(_ => Fox.successful(tracingService.saveToHandledGroupCache(tracingId, updateGroup.version, updateGroup.requestId)))
.map(_ => if (freezeVersions) prevVersion else updateGroup.version)
} else {
Failure(s"Incorrect version. Expected: ${version + 1}; Got: ${updateGroup.version}") ~> CONFLICT
if ( updateGroup.requestId.exists(requestId => tracingService.handledGroupCacheContains(requestId, tracingId, updateGroup.version))) {
//this update group was received and successfully saved in a previous request. silently ignore this duplicate request
Fox.successful(if (freezeVersions) prevVersion else updateGroup.version)
} else {
Failure(s"Incorrect version. Expected: ${prevVersion + 1}; Got: ${updateGroup.version}") ~> CONFLICT
}
}
}
}
Expand All @@ -123,4 +133,5 @@ trait TracingController[T <: GeneratedMessage with Message[T],
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.scalableminds.webknossos.tracingstore.tracings._
import com.scalableminds.webknossos.tracingstore.tracings.volume.VolumeTracingService
import com.scalableminds.util.tools.JsonHelper.boxFormat
import com.scalableminds.util.tools.JsonHelper.optionFormat
import com.scalableminds.webknossos.datastore.storage.TemporaryStore
import play.api.i18n.Messages
import play.api.libs.iteratee.Enumerator
import play.api.libs.iteratee.streams.IterateeStreams
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.scalableminds.webknossos.tracingstore.tracings
import java.util.UUID

import com.scalableminds.util.tools.{Fox, FoxImplicits}
import com.scalableminds.webknossos.datastore.storage.TemporaryStore
import scalapb.{GeneratedMessage, GeneratedMessageCompanion, Message}
import com.typesafe.scalalogging.LazyLogging
import play.api.libs.json.Reads
Expand All @@ -12,12 +13,16 @@ import scala.concurrent.duration._

trait TracingService[T <: GeneratedMessage with Message[T]] extends KeyValueStoreImplicits with FoxImplicits with LazyLogging {

val handledGroupCacheExpiry: FiniteDuration = 5 minutes

def tracingType: TracingType.Value

def tracingStore: FossilDBClient

def temporaryTracingStore: TemporaryTracingStore[T]

val handledGroupCache: TemporaryStore[(String, String, Long), Unit]

implicit def tracingCompanion: GeneratedMessageCompanion[T]

implicit val updateActionReads: Reads[UpdateAction[T]]
Expand Down Expand Up @@ -66,4 +71,13 @@ trait TracingService[T <: GeneratedMessage with Message[T]] extends KeyValueStor
tracingStore.put(id, version, tracing).map(_ => id)
}
}

def saveToHandledGroupCache(tracingId: String, version: Long, requestIdOpt: Option[String]): Unit = {
requestIdOpt.foreach { requestId =>
handledGroupCache.insert((requestId, tracingId, version), (), Some(handledGroupCacheExpiry))
}
}

def handledGroupCacheContains(requestId: String, tracingId: String, version: Long) =
handledGroupCache.contains(requestId, tracingId, version)
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ case class UpdateActionGroup[T <: GeneratedMessage with Message[T]](
timestamp: Long,
actions: List[UpdateAction[T]],
stats: Option[JsObject],
info: Option[String])
info: Option[String],
requestId: Option[String]
)

object UpdateActionGroup {

Expand All @@ -41,8 +43,9 @@ object UpdateActionGroup {
actions <- json.validate((JsPath \ "actions").read[List[UpdateAction[T]]])
stats <- json.validate((JsPath \ "stats").readNullable[JsObject])
info <- json.validate((JsPath \ "info").readNullable[String])
id <- json.validate((JsPath \ "requestId").readNullable[String])
} yield {
UpdateActionGroup[T](version, timestamp, actions, stats, info)
UpdateActionGroup[T](version, timestamp, actions, stats, info, id)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.scalableminds.webknossos.tracingstore.tracings.skeleton
import com.google.inject.Inject
import com.scalableminds.util.geometry.BoundingBox
import com.scalableminds.util.tools.{Fox, FoxImplicits, TextUtils}
import com.scalableminds.webknossos.datastore.storage.TemporaryStore
import com.scalableminds.webknossos.tracingstore.SkeletonTracing.SkeletonTracing
import com.scalableminds.webknossos.tracingstore.tracings.UpdateAction.SkeletonUpdateAction
import com.scalableminds.webknossos.tracingstore.tracings._
Expand All @@ -13,7 +14,8 @@ import play.api.libs.json.{JsObject, Json, Writes}
import scala.concurrent.ExecutionContext

class SkeletonTracingService @Inject()(tracingDataStore: TracingDataStore,
val temporaryTracingStore: TemporaryTracingStore[SkeletonTracing])
val temporaryTracingStore: TemporaryTracingStore[SkeletonTracing],
val handledGroupCache: TemporaryStore[(String, String, Long), Unit])
(implicit ec: ExecutionContext)
extends TracingService[SkeletonTracing]
with KeyValueStoreImplicits
Expand Down
Loading