From d8644d2a907e65cce66e504597cb7841f9e927da Mon Sep 17 00:00:00 2001 From: Florian M Date: Tue, 12 Sep 2023 11:00:15 +0200 Subject: [PATCH 1/6] fix transaction management bug --- .../controllers/TracingController.scala | 43 +++++++++++++++++-- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala b/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala index 77bc1dcdae4..d025060fda1 100644 --- a/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala +++ b/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala @@ -1,7 +1,7 @@ package com.scalableminds.webknossos.tracingstore.controllers import com.scalableminds.util.time.Instant -import com.scalableminds.util.tools.Fox +import com.scalableminds.util.tools.{Fox, JsonHelper} import com.scalableminds.util.tools.JsonHelper.{boxFormat, optionFormat} import com.scalableminds.webknossos.datastore.controllers.Controller import com.scalableminds.webknossos.datastore.services.UserAccessRequest @@ -19,7 +19,7 @@ import com.scalableminds.webknossos.tracingstore.{ } import net.liftweb.common.{Empty, Failure, Full} import play.api.i18n.Messages -import play.api.libs.json.{Format, Json} +import play.api.libs.json.{Format, JsArray, Json} import play.api.mvc.{Action, AnyContent, PlayBodyParsers} import scalapb.{GeneratedMessage, GeneratedMessageCompanion} @@ -128,6 +128,8 @@ trait TracingController[T <: GeneratedMessage, Ts <: GeneratedMessage] extends C accessTokenService.validateAccess(UserAccessRequest.writeTracing(tracingId), urlOrHeaderToken(token, request)) { val updateGroups = request.body if (updateGroups.forall(_.transactionGroupCount == 1)) { + logger.info( + s"Received ${updateGroups.length} groups (all single-group-transactions). Committing directly.") commitUpdates(tracingId, updateGroups, urlOrHeaderToken(token, request)).map(_ => Ok) } else { updateGroups @@ -152,6 +154,8 @@ trait TracingController[T <: GeneratedMessage, Ts <: GeneratedMessage] extends C userToken: Option[String]): Fox[Long] = for { previousCommittedVersion: Long <- previousVersionFox + _ = logger.info( + s"Received a group for v${updateGroup.version}, tid ${updateGroup.transactionId} tidx ${updateGroup.transactionGroupIndex} tcnt ${updateGroup.transactionGroupCount}") result <- if (previousCommittedVersion + 1 == updateGroup.version) { if (updateGroup.transactionGroupCount == updateGroup.transactionGroupIndex + 1) { // Received the last group of this transaction @@ -187,10 +191,43 @@ trait TracingController[T <: GeneratedMessage, Ts <: GeneratedMessage] extends C _ <- bool2Fox( previousActionGroupsToCommit .exists(_.transactionGroupIndex == 0) || updateGroup.transactionGroupCount == 1) ?~> s"Trying to commit a transaction without a group that has transactionGroupIndex 0." - commitResult <- commitUpdates(tracingId, previousActionGroupsToCommit :+ updateGroup, userToken) + _ = logger.info( + s"Comitting ${previousActionGroupsToCommit.length} prev actions, and one final with version ${updateGroup.version}, merged into one") + concatenatedGroup <- concatenateUpdateGroupsOfTransaction(previousActionGroupsToCommit, updateGroup) + commitResult <- commitUpdates(tracingId, List(concatenatedGroup), userToken) _ <- tracingService.removeAllUncommittedFor(tracingId, updateGroup.transactionId) } yield commitResult + private def concatenateUpdateGroupsOfTransaction(previousActionGroups: List[UpdateActionGroup[T]], + lastActionGroup: UpdateActionGroup[T]): Fox[UpdateActionGroup[T]] = { + val actionsToCommit = previousActionGroups :+ lastActionGroup + val concatenatedInfoFox: Fox[Option[String]] = if (actionsToCommit.length == 1) { + Fox.successful(lastActionGroup.info) + } else { + val infoStrings: List[String] = actionsToCommit.flatMap(_.info) + for { + infoArrays <- Fox.serialCombined(infoStrings)(infoString => + JsonHelper.parseAndValidateJson[JsArray](infoString)) + concatInfoArray = if (infoArrays.isEmpty) None else Some(infoArrays.reduce(_ ++ _)) + } yield concatInfoArray.map(_.toString) + } + + for { + infoOpt <- concatenatedInfoFox + } yield + UpdateActionGroup[T]( + version = lastActionGroup.version, + timestamp = lastActionGroup.timestamp, + authorId = lastActionGroup.authorId, + actions = actionsToCommit.flatMap(_.actions), + stats = lastActionGroup.stats, + info = infoOpt, + transactionId = f"${lastActionGroup.transactionId}-concatenated", + transactionGroupCount = 1, + transactionGroupIndex = 0, + ) + } + // Perform version check and commit the passed updates private def commitUpdates(tracingId: String, updateGroups: List[UpdateActionGroup[T]], From ce972d1dceb6735bdf9feacd3d26c1e382466cc8 Mon Sep 17 00:00:00 2001 From: Florian M Date: Tue, 12 Sep 2023 11:16:14 +0200 Subject: [PATCH 2/6] cleanup --- CHANGELOG.unreleased.md | 1 + .../controllers/TracingController.scala | 43 ++++++++----------- 2 files changed, 20 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.unreleased.md b/CHANGELOG.unreleased.md index eb53418f197..55950e1dc0d 100644 --- a/CHANGELOG.unreleased.md +++ b/CHANGELOG.unreleased.md @@ -28,6 +28,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released - Fixed reading sharded Zarr 3 data from the local file system. [#7321](https://github.com/scalableminds/webknossos/pull/7321) - Fixed no-bucket data zipfile when downloading volume annotations. [#7323](https://github.com/scalableminds/webknossos/pull/7323) - Fixed too tight assertions when saving annotations, leading to failed save requests. [#7326](https://github.com/scalableminds/webknossos/pull/7326) +- Fixed a bug when saving large amounts of skeleton annotation data at once. [#7329](https://github.com/scalableminds/webknossos/pull/7329) ### Removed diff --git a/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala b/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala index d025060fda1..64370fb0ecc 100644 --- a/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala +++ b/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala @@ -199,35 +199,30 @@ trait TracingController[T <: GeneratedMessage, Ts <: GeneratedMessage] extends C } yield commitResult private def concatenateUpdateGroupsOfTransaction(previousActionGroups: List[UpdateActionGroup[T]], - lastActionGroup: UpdateActionGroup[T]): Fox[UpdateActionGroup[T]] = { - val actionsToCommit = previousActionGroups :+ lastActionGroup - val concatenatedInfoFox: Fox[Option[String]] = if (actionsToCommit.length == 1) { - Fox.successful(lastActionGroup.info) - } else { - val infoStrings: List[String] = actionsToCommit.flatMap(_.info) + lastActionGroup: UpdateActionGroup[T]): Fox[UpdateActionGroup[T]] = + if (previousActionGroups.isEmpty) Fox.successful(lastActionGroup) + else { for { + _ <- Fox.successful(()) + allActionGroups = previousActionGroups :+ lastActionGroup + infoStrings = allActionGroups.flatMap(_.info) infoArrays <- Fox.serialCombined(infoStrings)(infoString => JsonHelper.parseAndValidateJson[JsArray](infoString)) - concatInfoArray = if (infoArrays.isEmpty) None else Some(infoArrays.reduce(_ ++ _)) - } yield concatInfoArray.map(_.toString) + concatInfoArray: Option[JsArray] = if (infoArrays.isEmpty) None else Some(infoArrays.reduce(_ ++ _)) + } yield + UpdateActionGroup[T]( + version = lastActionGroup.version, + timestamp = lastActionGroup.timestamp, + authorId = lastActionGroup.authorId, + actions = allActionGroups.flatMap(_.actions), + stats = lastActionGroup.stats, + info = concatInfoArray.map(_.toString), + transactionId = f"${lastActionGroup.transactionId}-concatenated", + transactionGroupCount = 1, + transactionGroupIndex = 0, + ) } - for { - infoOpt <- concatenatedInfoFox - } yield - UpdateActionGroup[T]( - version = lastActionGroup.version, - timestamp = lastActionGroup.timestamp, - authorId = lastActionGroup.authorId, - actions = actionsToCommit.flatMap(_.actions), - stats = lastActionGroup.stats, - info = infoOpt, - transactionId = f"${lastActionGroup.transactionId}-concatenated", - transactionGroupCount = 1, - transactionGroupIndex = 0, - ) - } - // Perform version check and commit the passed updates private def commitUpdates(tracingId: String, updateGroups: List[UpdateActionGroup[T]], From 06eda2fe5faf6d6d12f28121f5b30d8ba662e21b Mon Sep 17 00:00:00 2001 From: Florian M Date: Tue, 12 Sep 2023 11:28:26 +0200 Subject: [PATCH 3/6] info does not need concatenating --- .../controllers/TracingController.scala | 41 +++++++------------ 1 file changed, 15 insertions(+), 26 deletions(-) diff --git a/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala b/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala index 64370fb0ecc..fecf050ba30 100644 --- a/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala +++ b/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala @@ -128,8 +128,6 @@ trait TracingController[T <: GeneratedMessage, Ts <: GeneratedMessage] extends C accessTokenService.validateAccess(UserAccessRequest.writeTracing(tracingId), urlOrHeaderToken(token, request)) { val updateGroups = request.body if (updateGroups.forall(_.transactionGroupCount == 1)) { - logger.info( - s"Received ${updateGroups.length} groups (all single-group-transactions). Committing directly.") commitUpdates(tracingId, updateGroups, urlOrHeaderToken(token, request)).map(_ => Ok) } else { updateGroups @@ -191,36 +189,27 @@ trait TracingController[T <: GeneratedMessage, Ts <: GeneratedMessage] extends C _ <- bool2Fox( previousActionGroupsToCommit .exists(_.transactionGroupIndex == 0) || updateGroup.transactionGroupCount == 1) ?~> s"Trying to commit a transaction without a group that has transactionGroupIndex 0." - _ = logger.info( - s"Comitting ${previousActionGroupsToCommit.length} prev actions, and one final with version ${updateGroup.version}, merged into one") - concatenatedGroup <- concatenateUpdateGroupsOfTransaction(previousActionGroupsToCommit, updateGroup) + concatenatedGroup = concatenateUpdateGroupsOfTransaction(previousActionGroupsToCommit, updateGroup) commitResult <- commitUpdates(tracingId, List(concatenatedGroup), userToken) _ <- tracingService.removeAllUncommittedFor(tracingId, updateGroup.transactionId) } yield commitResult private def concatenateUpdateGroupsOfTransaction(previousActionGroups: List[UpdateActionGroup[T]], - lastActionGroup: UpdateActionGroup[T]): Fox[UpdateActionGroup[T]] = - if (previousActionGroups.isEmpty) Fox.successful(lastActionGroup) + lastActionGroup: UpdateActionGroup[T]): UpdateActionGroup[T] = + if (previousActionGroups.isEmpty) lastActionGroup else { - for { - _ <- Fox.successful(()) - allActionGroups = previousActionGroups :+ lastActionGroup - infoStrings = allActionGroups.flatMap(_.info) - infoArrays <- Fox.serialCombined(infoStrings)(infoString => - JsonHelper.parseAndValidateJson[JsArray](infoString)) - concatInfoArray: Option[JsArray] = if (infoArrays.isEmpty) None else Some(infoArrays.reduce(_ ++ _)) - } yield - UpdateActionGroup[T]( - version = lastActionGroup.version, - timestamp = lastActionGroup.timestamp, - authorId = lastActionGroup.authorId, - actions = allActionGroups.flatMap(_.actions), - stats = lastActionGroup.stats, - info = concatInfoArray.map(_.toString), - transactionId = f"${lastActionGroup.transactionId}-concatenated", - transactionGroupCount = 1, - transactionGroupIndex = 0, - ) + val allActionGroups = previousActionGroups :+ lastActionGroup + UpdateActionGroup[T]( + version = lastActionGroup.version, + timestamp = lastActionGroup.timestamp, + authorId = lastActionGroup.authorId, + actions = allActionGroups.flatMap(_.actions), + stats = lastActionGroup.stats, // the latest stats do count + info = lastActionGroup.info, // frontend sets this identically for all groups of transaction + transactionId = f"${lastActionGroup.transactionId}-concatenated", + transactionGroupCount = 1, + transactionGroupIndex = 0, + ) } // Perform version check and commit the passed updates From 7bad3ca0597c066797f928a7cab6051bf42d560a Mon Sep 17 00:00:00 2001 From: Florian M Date: Tue, 12 Sep 2023 11:30:18 +0200 Subject: [PATCH 4/6] cleanup --- .../tracingstore/controllers/TracingController.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala b/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala index fecf050ba30..70243ea357e 100644 --- a/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala +++ b/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala @@ -1,7 +1,7 @@ package com.scalableminds.webknossos.tracingstore.controllers import com.scalableminds.util.time.Instant -import com.scalableminds.util.tools.{Fox, JsonHelper} +import com.scalableminds.util.tools.Fox import com.scalableminds.util.tools.JsonHelper.{boxFormat, optionFormat} import com.scalableminds.webknossos.datastore.controllers.Controller import com.scalableminds.webknossos.datastore.services.UserAccessRequest @@ -19,7 +19,7 @@ import com.scalableminds.webknossos.tracingstore.{ } import net.liftweb.common.{Empty, Failure, Full} import play.api.i18n.Messages -import play.api.libs.json.{Format, JsArray, Json} +import play.api.libs.json.{Format, Json} import play.api.mvc.{Action, AnyContent, PlayBodyParsers} import scalapb.{GeneratedMessage, GeneratedMessageCompanion} @@ -152,8 +152,6 @@ trait TracingController[T <: GeneratedMessage, Ts <: GeneratedMessage] extends C userToken: Option[String]): Fox[Long] = for { previousCommittedVersion: Long <- previousVersionFox - _ = logger.info( - s"Received a group for v${updateGroup.version}, tid ${updateGroup.transactionId} tidx ${updateGroup.transactionGroupIndex} tcnt ${updateGroup.transactionGroupCount}") result <- if (previousCommittedVersion + 1 == updateGroup.version) { if (updateGroup.transactionGroupCount == updateGroup.transactionGroupIndex + 1) { // Received the last group of this transaction From 1d0bf5ec86ea6053a264ae3564c7029631efe3f5 Mon Sep 17 00:00:00 2001 From: Florian M Date: Tue, 12 Sep 2023 11:43:19 +0200 Subject: [PATCH 5/6] cleaner version check --- .../tracingstore/controllers/TracingController.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala b/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala index 70243ea357e..cf28ab9e0f5 100644 --- a/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala +++ b/webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/controllers/TracingController.scala @@ -226,8 +226,7 @@ trait TracingController[T <: GeneratedMessage, Ts <: GeneratedMessage] extends C remoteWebKnossosClient.reportTracingUpdates(report).flatMap { _ => updateGroups.foldLeft(currentCommittedVersion) { (previousVersion, updateGroup) => previousVersion.flatMap { prevVersion: Long => - val versionIncrement = if (updateGroup.transactionGroupIndex == 0) 1 else 0 // version increment happens at the start of each transaction - if (prevVersion + versionIncrement == updateGroup.version) { + if (prevVersion + 1 == updateGroup.version) { tracingService .handleUpdateGroup(tracingId, updateGroup, prevVersion, userToken) .flatMap( From a0abef19d1f8d307c4894692617a7d990f1c0fdd Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Tue, 12 Sep 2023 11:58:32 +0200 Subject: [PATCH 6/6] disable transactions for editable mappings in front-end --- .../javascripts/oxalis/model/sagas/save_saga_constants.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts b/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts index 28d90432790..3633a96e587 100644 --- a/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts +++ b/frontend/javascripts/oxalis/model/sagas/save_saga_constants.ts @@ -14,11 +14,11 @@ export const SETTINGS_MAX_RETRY_COUNT = 20; // 20 * 15s == 5m export const MAXIMUM_ACTION_COUNT_PER_BATCH = { skeleton: 5000, volume: 1000, // Since volume saving is slower, use a lower value here. - mapping: 5000, + mapping: Infinity, // The back-end does not accept transactions for mappings. } as const; export const MAXIMUM_ACTION_COUNT_PER_SAVE = { skeleton: 15000, volume: 3000, - mapping: 15000, + mapping: Infinity, // The back-end does not accept transactions for mappings. } as const;