Skip to content

Commit

Permalink
Annotation Locking Mechanism (#6819)
Browse files Browse the repository at this point in the history
* add route to test-and-acquire annotation mutex

* compact writes

* move mutex storage to postgres

* naming

* POST

* WIP: log time for annotation mutex request

* move to request logging trait

* lint

* Adding frontent requesting mutex for annotations

* WIP: add tests for frontend mutex acquire saga

* finish writing tests for frontend mutex acquire saga

* clean up

* add feedback

* add version assertions to sql evolutions

* apply feedback and fix annotation_saga tests

* Apply suggestions from code review

Co-authored-by: Philipp Otto <[email protected]>

* apply pr feedback
 - fix logic bug
 - always disable updating when the mutex acquiring results in an exception
 - add test covering case where the mutex can be acquired after some requests

* add additional visual indications when annotation is locked

* don't show annotation version mismatch while not having the annotations mutex

---------

Co-authored-by: Michael Büßemeyer <[email protected]>
Co-authored-by: MichaelBuessemeyer <[email protected]>
Co-authored-by: Philipp Otto <[email protected]>
  • Loading branch information
4 people authored Feb 23, 2023
1 parent fa47f73 commit a6075d3
Show file tree
Hide file tree
Showing 30 changed files with 826 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released
### Added
- Remote datasets can now also be streamed from Google Cloud Storage URIs (`gs://`). [#6775](https://github.com/scalableminds/webknossos/pull/6775)
- Remote volume datasets in the neuroglancer precomputed format can now be viewed in WEBKNOSSOS. [#6716](https://github.com/scalableminds/webknossos/pull/6716)
- If an annotation that others are allowed to edit is opened, it will now be automatically locked. This prevents conflicts when multiple users try to edit it at the same time. [#6819](https://github.com/scalableminds/webknossos/pull/6819)
- Added new mesh-related menu items to the context menu when a mesh is hovered in the 3d viewport. [#](https://github.com/scalableminds/webknossos/pull/6813)
- Highlight 'organization owner' in Admin>User page. [#6832](https://github.com/scalableminds/webknossos/pull/6832)
- Added functions to get and set segment colors to the frontend API (`api.data.{getSegmentColor,setSegmentColor}`). [#6853](https://github.com/scalableminds/webknossos/pull/6853)
Expand Down
3 changes: 2 additions & 1 deletion app/WebKnossosModule.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import com.google.inject.AbstractModule
import controllers.InitialDataService
import models.analytics.AnalyticsSessionService
import models.annotation.AnnotationStore
import models.annotation.{AnnotationMutexService, AnnotationStore}
import models.binary.DataSetService
import models.job.{JobService, WorkerLivenessService}
import models.storage.UsedStorageService
Expand All @@ -26,6 +26,7 @@ class WebKnossosModule extends AbstractModule {
bind(classOf[UserDataSetConfigurationDAO]).asEagerSingleton()
bind(classOf[UserCache]).asEagerSingleton()
bind(classOf[AnnotationStore]).asEagerSingleton()
bind(classOf[AnnotationMutexService]).asEagerSingleton()
bind(classOf[DataSetService]).asEagerSingleton()
bind(classOf[TimeSpanService]).asEagerSingleton()
bind(classOf[TempFileService]).asEagerSingleton()
Expand Down
23 changes: 22 additions & 1 deletion app/controllers/AnnotationController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import models.team.{TeamDAO, TeamService}
import models.user.time._
import models.user.{User, UserDAO, UserService}
import oxalis.mail.{MailchimpClient, MailchimpTag}
import oxalis.security.{URLSharing, WkEnv}
import oxalis.security.{URLSharing, UserAwareRequestLogging, WkEnv}
import oxalis.telemetry.SlackNotificationService
import play.api.i18n.{Messages, MessagesProvider}
import play.api.libs.json.Json.WithDefaultValues
import play.api.libs.json._
Expand Down Expand Up @@ -54,6 +55,7 @@ class AnnotationController @Inject()(
dataSetDAO: DataSetDAO,
dataSetService: DataSetService,
annotationService: AnnotationService,
annotationMutexService: AnnotationMutexService,
userService: UserService,
teamService: TeamService,
projectDAO: ProjectDAO,
Expand All @@ -64,10 +66,12 @@ class AnnotationController @Inject()(
provider: AnnotationInformationProvider,
annotationRestrictionDefaults: AnnotationRestrictionDefaults,
analyticsService: AnalyticsService,
slackNotificationService: SlackNotificationService,
mailchimpClient: MailchimpClient,
conf: WkConf,
sil: Silhouette[WkEnv])(implicit ec: ExecutionContext, bodyParsers: PlayBodyParsers)
extends Controller
with UserAwareRequestLogging
with FoxImplicits {

implicit val timeout: Timeout = Timeout(5 seconds)
Expand Down Expand Up @@ -559,6 +563,7 @@ class AnnotationController @Inject()(
sil.SecuredAction.async { implicit request =>
for {
annotation <- provider.provideAnnotation(typ, id, request.identity)
_ <- bool2Fox(annotation.typ == AnnotationType.Explorational || annotation.typ == AnnotationType.Task) ?~> "annotation.othersMayEdit.onlyExplorationalOrTask"
_ <- bool2Fox(annotation._user == request.identity._id) ?~> "notAllowed" ~> FORBIDDEN
_ <- annotationDAO.updateOthersMayEdit(annotation._id, othersMayEdit)
} yield Ok(Json.toJson(othersMayEdit))
Expand Down Expand Up @@ -601,4 +606,20 @@ class AnnotationController @Inject()(
}
} yield annotationLayer.copy(tracingId = newTracingId)

@ApiOperation(hidden = true, value = "")
def tryAcquiringAnnotationMutex(id: String): Action[AnyContent] =
sil.SecuredAction.async { implicit request =>
logTime(slackNotificationService.noticeSlowRequest, durationThreshold = 1 second) {
for {
idValidated <- ObjectId.fromString(id)
annotation <- provider.provideAnnotation(id, request.identity) ~> NOT_FOUND
_ <- bool2Fox(annotation.othersMayEdit) ?~> "notAllowed" ~> FORBIDDEN
restrictions <- provider.restrictionsFor(AnnotationIdentifier(annotation.typ, idValidated)) ?~> "restrictions.notFound" ~> NOT_FOUND
_ <- restrictions.allowUpdate(request.identity) ?~> "notAllowed" ~> FORBIDDEN
mutexResult <- annotationMutexService.tryAcquiringAnnotationMutex(annotation._id, request.identity._id) ?~> "annotation.mutex.failed"
resultJson <- annotationMutexService.publicWrites(mutexResult)
} yield Ok(resultJson)
}
}

}
119 changes: 119 additions & 0 deletions app/models/annotation/AnnotationMutexService.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package models.annotation

import akka.actor.ActorSystem
import com.scalableminds.util.accesscontext.GlobalAccessContext
import com.scalableminds.util.time.Instant
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.helpers.IntervalScheduler
import com.scalableminds.webknossos.schema.Tables.AnnotationMutexesRow
import com.typesafe.scalalogging.LazyLogging
import models.user.{UserDAO, UserService}
import net.liftweb.common.Full
import play.api.inject.ApplicationLifecycle
import play.api.libs.json.{JsObject, Json}
import utils.{ObjectId, WkConf}
import utils.sql.{SimpleSQLDAO, SqlClient}

import javax.inject.Inject
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.{DurationInt, FiniteDuration}

case class AnnotationMutex(annotationId: ObjectId, userId: ObjectId, expiry: Instant)

case class MutexResult(canEdit: Boolean, blockedByUser: Option[ObjectId])

class AnnotationMutexService @Inject()(val lifecycle: ApplicationLifecycle,
val system: ActorSystem,
wkConf: WkConf,
userDAO: UserDAO,
userService: UserService,
annotationMutexDAO: AnnotationMutexDAO)
extends IntervalScheduler
with LazyLogging {

override protected def tickerInterval: FiniteDuration = 1 hour

override protected def tick(): Unit = {
logger.info("Cleaning up expired annotation mutexes...")
annotationMutexDAO.deleteExpired()
()
}

private val defaultExpiryTime = wkConf.WebKnossos.Annotation.Mutex.expiryTime

def tryAcquiringAnnotationMutex(annotationId: ObjectId, userId: ObjectId)(
implicit ec: ExecutionContext): Fox[MutexResult] =
this.synchronized {
for {
mutexBox <- annotationMutexDAO.findOne(annotationId).futureBox
result <- mutexBox match {
case Full(mutex) =>
if (mutex.userId == userId)
refresh(mutex)
else
Fox.successful(MutexResult(canEdit = false, blockedByUser = Some(mutex.userId)))
case _ =>
acquire(annotationId, userId)
}
} yield result
}

private def acquire(annotationId: ObjectId, userId: ObjectId): Fox[MutexResult] =
for {
_ <- annotationMutexDAO.upsertOne(AnnotationMutex(annotationId, userId, Instant.in(defaultExpiryTime)))
} yield MutexResult(canEdit = true, None)

private def refresh(mutex: AnnotationMutex): Fox[MutexResult] =
for {
_ <- annotationMutexDAO.upsertOne(mutex.copy(expiry = Instant.in(defaultExpiryTime)))
} yield MutexResult(canEdit = true, None)

def publicWrites(mutexResult: MutexResult)(implicit ec: ExecutionContext): Fox[JsObject] =
for {
userOpt <- Fox.runOptional(mutexResult.blockedByUser)(user => userDAO.findOne(user)(GlobalAccessContext))
userJsonOpt <- Fox.runOptional(userOpt)(user => userService.compactWrites(user))
} yield
Json.obj(
"canEdit" -> mutexResult.canEdit,
"blockedByUser" -> userJsonOpt
)

}

class AnnotationMutexDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext)
extends SimpleSQLDAO(sqlClient) {

private def parse(r: AnnotationMutexesRow): AnnotationMutex =
AnnotationMutex(
ObjectId(r._Annotation),
ObjectId(r._User),
Instant.fromSql(r.expiry)
)

def findOne(annotationId: ObjectId): Fox[AnnotationMutex] =
for {
rows <- run(q"""SELECT _annotation, _user, expiry
FROM webknossos.annotation_mutexes
WHERE _annotation = $annotationId
AND expiry > NOW()""".as[AnnotationMutexesRow])
first <- rows.headOption
parsed = parse(first)
} yield parsed

def upsertOne(annotationMutex: AnnotationMutex): Fox[Unit] =
for {
_ <- run(q"""INSERT INTO webknossos.annotation_mutexes(_annotation, _user, expiry)
VALUES(${annotationMutex.annotationId}, ${annotationMutex.userId}, ${annotationMutex.expiry})
ON CONFLICT (_annotation)
DO UPDATE SET
_user = ${annotationMutex.userId},
expiry = ${annotationMutex.expiry}
""".asUpdate)
} yield ()

def deleteExpired(): Fox[Unit] =
for {
_ <- run(q"DELETE FROM webknossos.annotation_mutexes WHERE expiry < NOW()".asUpdate)
} yield ()

}
5 changes: 2 additions & 3 deletions app/models/annotation/AnnotationService.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package models.annotation

import java.io.{BufferedOutputStream, File, FileOutputStream}
import akka.actor.ActorSystem
import akka.stream.Materializer
import com.scalableminds.util.accesscontext.{AuthorizedAccessContext, DBAccessContext, GlobalAccessContext}
Expand Down Expand Up @@ -37,8 +36,6 @@ import com.scalableminds.webknossos.tracingstore.tracings.volume.{
}
import com.typesafe.scalalogging.LazyLogging
import controllers.AnnotationLayerParameters

import javax.inject.Inject
import models.annotation.AnnotationState._
import models.annotation.AnnotationType.AnnotationType
import models.annotation.handler.SavedTracingInformationHandler
Expand All @@ -57,6 +54,8 @@ import play.api.libs.iteratee.Enumerator
import play.api.libs.json.{JsNull, JsObject, JsValue, Json}
import utils.ObjectId

import java.io.{BufferedOutputStream, File, FileOutputStream}
import javax.inject.Inject
import scala.concurrent.{ExecutionContext, Future}

case class DownloadAnnotation(skeletonTracingIdOpt: Option[String],
Expand Down
6 changes: 6 additions & 0 deletions app/oxalis/telemetry/SlackNotificationService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,10 @@ class SlackNotificationService @Inject()(rpc: RPC, config: WkConf) extends LazyL
title = "Task creation with base",
msg = s"$numberOfTasks tasks with BaseAnnotation for TaskTypes ${taskType.mkString(", ")} have been created"
)

def noticeSlowRequest(msg: String): Unit =
slackClient.info(
title = "Slow request",
msg = msg
)
}
6 changes: 6 additions & 0 deletions app/utils/WkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ class WkConf @Inject()(configuration: Configuration) extends ConfigReader with L
val maxOpenPerUser: Int = get[Int]("webKnossos.tasks.maxOpenPerUser")
}

object Annotation {
object Mutex {
val expiryTime: FiniteDuration = get[FiniteDuration]("webKnossos.annotation.mutex.expiryTime")
}
}

object Cache {
object User {
val timeout: FiniteDuration = get[FiniteDuration]("webKnossos.cache.user.timeout")
Expand Down
1 change: 1 addition & 0 deletions conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ webKnossos {
cache {
user.timeout = 3 minutes
}
annotation.mutex.expiryTime = 2 minutes
fetchUsedStorage {
rescanInterval = 24 hours # do not scan organizations whose last scan is more recent than this
tickerInterval = 10 minutes # scan some organizations at each tick
Expand Down
18 changes: 18 additions & 0 deletions conf/evolutions/100-annotation-mutexes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
START TRANSACTION;

do $$ begin ASSERT (select schemaVersion from webknossos.releaseInformation) = 99, 'Previous schema version mismatch'; end; $$ LANGUAGE plpgsql;

CREATE TABLE webknossos.annotation_mutexes(
_annotation CHAR(24) PRIMARY KEY,
_user CHAR(24) NOT NULL,
expiry TIMESTAMP NOT NULL
);

ALTER TABLE webknossos.annotation_mutexes
ADD CONSTRAINT annotation_ref FOREIGN KEY(_annotation) REFERENCES webknossos.annotations(_id) ON DELETE CASCADE DEFERRABLE,
ADD CONSTRAINT user_ref FOREIGN KEY(_user) REFERENCES webknossos.users(_id) ON DELETE CASCADE DEFERRABLE;

UPDATE webknossos.releaseInformation
SET schemaVersion = 100;

COMMIT TRANSACTION;
10 changes: 10 additions & 0 deletions conf/evolutions/reversions/100-annotation-mutexes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
START TRANSACTION;

do $$ begin ASSERT (select schemaVersion from webknossos.releaseInformation) = 100, 'Previous schema version mismatch'; end; $$ LANGUAGE plpgsql;

DROP TABLE webknossos.annotation_mutexes;

UPDATE webknossos.releaseInformation
SET schemaVersion = 99;

COMMIT TRANSACTION;
1 change: 1 addition & 0 deletions conf/webknossos.latest.routes
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ PATCH /annotations/:id/deleteAnnotationLayer
DELETE /annotations/:id controllers.AnnotationController.cancelWithoutType(id: String)
POST /annotations/:id/merge/:mergedTyp/:mergedId controllers.AnnotationController.mergeWithoutType(id: String, mergedTyp: String, mergedId: String)
GET /annotations/:id/download controllers.AnnotationIOController.downloadWithoutType(id: String, skeletonVersion: Option[Long], volumeVersion: Option[Long], skipVolumeData: Option[Boolean])
POST /annotations/:id/acquireMutex controllers.AnnotationController.tryAcquiringAnnotationMutex(id: String)

GET /annotations/:typ/:id/info controllers.AnnotationController.info(typ: String, id: String, timestamp: Long)
PATCH /annotations/:typ/:id/makeHybrid controllers.AnnotationController.makeHybrid(typ: String, id: String, fallbackLayerName: Option[String])
Expand Down
1 change: 1 addition & 0 deletions docs/sharing.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ To change the visibility of an annotation, follow these steps:
Additionally, you can control whether other users, who can see your annotation, may also edit your annotation.
Use this setting to enable collaborative work within your annotation.
However, note that you should coordinate the collaboration because parallel changes to an annotation are not supported.
To avoid possible conflicts in such cases the annotation will be locked to a single user at a time. In case the annotation is locked by someone else WEBKNOSSOS will tell you the name of the person currently editing so you can coordinate with this person.

### Link Sharing
Annotations can be shared via a link. People, who obtain the link, must have access to the annotation according to the permissions above to view the annotation.
Expand Down
13 changes: 13 additions & 0 deletions frontend/javascripts/admin/admin_rest_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import type {
VoxelyticsWorkflowListing,
APIPricingPlanStatus,
VoxelyticsLogLine,
APIUserCompact,
APIDatasetCompact,
} from "types/api_flow_types";
import { APIAnnotationTypeEnum } from "types/api_flow_types";
Expand Down Expand Up @@ -860,6 +861,18 @@ export async function getTracingsForAnnotation(
return fullAnnotationLayers;
}

export async function acquireAnnotationMutex(
annotationId: string,
): Promise<{ canEdit: boolean; blockedByUser: APIUserCompact | undefined | null }> {
const { canEdit, blockedByUser } = await Request.receiveJSON(
`/api/annotations/${annotationId}/acquireMutex`,
{
method: "POST",
},
);
return { canEdit, blockedByUser };
}

function extractVersion(
versions: Versions,
tracingId: string,
Expand Down
7 changes: 7 additions & 0 deletions frontend/javascripts/messages.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,13 @@ instead. Only enable this option if you understand its effect. All layers will n
"annotation.undoFinish.confirm": "Are you sure you want to reopen your old task?",
"annotation.undoFinish.content":
"If you reopen your old annotation, the current annotation will not be finished or cancelled. Instead, it will remain open and you can find it in the dashboard to continue annotating.",
"annotation.acquiringMutexFailed": _.template(
"This annotation is currently being edited by <%- userName %>. To avoid conflicts, you can only view it. If you want to edit it, please ask <%- userName %> to finish their work first.",
),
"annotation.acquiringMutexFailed.noUser":
"This annotation is currently being edited by someone else. To avoid conflicts, you can only view it at the moment.",
"annotation.acquiringMutexSucceeded":
"This annotation is not being edited anymore and available for editing. Reload the page to see its newest version and to edit it.",
"task.bulk_create_invalid":
"Can not parse task specification. It includes at least one invalid task.",
"task.recommended_configuration": "The author of this task suggests to use these settings:",
Expand Down
Loading

0 comments on commit a6075d3

Please sign in to comment.