Skip to content

Commit

Permalink
Analytics: mark annotation updates that are view-only (#5330)
Browse files Browse the repository at this point in the history
  • Loading branch information
fm3 authored Mar 24, 2021
1 parent e8752ac commit 982c55d
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 95 deletions.
68 changes: 36 additions & 32 deletions app/controllers/WKTracingStoreController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,61 @@ package controllers

import com.scalableminds.util.accesscontext.{DBAccessContext, GlobalAccessContext}
import com.scalableminds.util.tools.{Fox, FoxImplicits}
import com.scalableminds.webknossos.tracingstore.TracingUpdatesReport
import javax.inject.Inject
import models.analytics.{AnalyticsService, UpdateAnnotationEvent}
import models.analytics.{AnalyticsService, UpdateAnnotationEvent, UpdateAnnotationViewOnlyEvent}
import models.annotation.AnnotationState._
import models.annotation.{Annotation, AnnotationDAO, TracingStoreService}
import models.binary.{DataSetDAO, DataSetService}
import models.organization.OrganizationDAO
import models.user.time.TimeSpanService
import oxalis.security.{WebknossosBearerTokenAuthenticatorService, WkSilhouetteEnvironment}
import play.api.i18n.Messages
import play.api.libs.json.{JsObject, JsValue, Json}
import play.api.mvc.{Action, AnyContent}
import play.api.libs.json.Json
import play.api.mvc.{Action, AnyContent, PlayBodyParsers}

import scala.concurrent.ExecutionContext

class WKTracingStoreController @Inject()(tracingStoreService: TracingStoreService,
wkSilhouetteEnvironment: WkSilhouetteEnvironment,
timeSpanService: TimeSpanService,
dataSetService: DataSetService,
organizationDAO: OrganizationDAO,
analyticsService: AnalyticsService,
dataSetDAO: DataSetDAO,
annotationDAO: AnnotationDAO)(implicit ec: ExecutionContext)
class WKTracingStoreController @Inject()(
tracingStoreService: TracingStoreService,
wkSilhouetteEnvironment: WkSilhouetteEnvironment,
timeSpanService: TimeSpanService,
dataSetService: DataSetService,
organizationDAO: OrganizationDAO,
analyticsService: AnalyticsService,
dataSetDAO: DataSetDAO,
annotationDAO: AnnotationDAO)(implicit ec: ExecutionContext, playBodyParsers: PlayBodyParsers)
extends Controller
with FoxImplicits {

val bearerTokenService: WebknossosBearerTokenAuthenticatorService =
wkSilhouetteEnvironment.combinedAuthenticatorService.tokenAuthenticatorService

def handleTracingUpdateReport(name: String): Action[JsValue] = Action.async(parse.json) { implicit request =>
tracingStoreService.validateAccess(name) { _ =>
for {
tracingId <- (request.body \ "tracingId").asOpt[String].toFox
annotation <- annotationDAO.findOneByTracingId(tracingId)(GlobalAccessContext)
_ <- ensureAnnotationNotFinished(annotation)
timestamps <- (request.body \ "timestamps").asOpt[List[Long]].toFox
statisticsOpt = (request.body \ "statistics").asOpt[JsObject]
userTokenOpt = (request.body \ "userToken").asOpt[String]
_ <- statisticsOpt match {
case Some(statistics) => annotationDAO.updateStatistics(annotation._id, statistics)(GlobalAccessContext)
case None => Fox.successful(())
}
_ <- annotationDAO.updateModified(annotation._id, System.currentTimeMillis)(GlobalAccessContext)
userBox <- bearerTokenService.userForTokenOpt(userTokenOpt)(GlobalAccessContext).futureBox
_ <- Fox.runOptional(userBox)(user =>
timeSpanService.logUserInteraction(timestamps, user, annotation)(GlobalAccessContext))
_ = userBox.map(user => analyticsService.track(UpdateAnnotationEvent(user, annotation)))
} yield {
Ok
def handleTracingUpdateReport(name: String): Action[TracingUpdatesReport] =
Action.async(validateJson[TracingUpdatesReport]) { implicit request =>
tracingStoreService.validateAccess(name) { _ =>
val report = request.body
for {
annotation <- annotationDAO.findOneByTracingId(report.tracingId)(GlobalAccessContext)
_ <- ensureAnnotationNotFinished(annotation)
_ <- Fox.runOptional(report.statistics) { statistics =>
annotationDAO.updateStatistics(annotation._id, statistics)(GlobalAccessContext)
}
_ <- annotationDAO.updateModified(annotation._id, System.currentTimeMillis)(GlobalAccessContext)
userBox <- bearerTokenService.userForTokenOpt(report.userToken)(GlobalAccessContext).futureBox
_ <- Fox.runOptional(userBox)(user =>
timeSpanService.logUserInteraction(report.timestamps, user, annotation)(GlobalAccessContext))
_ = userBox.map { user =>
if (report.significantChangesCount > 0) {
analyticsService.track(UpdateAnnotationEvent(user, annotation, report.significantChangesCount))
}
if (report.viewChangesCount > 0) {
analyticsService.track(UpdateAnnotationViewOnlyEvent(user, annotation, report.viewChangesCount))
}
}
} yield Ok
}
}
}

private def ensureAnnotationNotFinished(annotation: Annotation) =
if (annotation.state == Finished) Fox.failure("annotation already finshed")
Expand Down
12 changes: 10 additions & 2 deletions app/models/analytics/AnalyticsService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,19 @@ case class DownloadAnnotationEvent(user: User, annotationId: String, annotationT
Fox.successful(Json.obj("annotation_id" -> annotationId, "annotation_type" -> annotationType))
}

case class UpdateAnnotationEvent(user: User, annotation: Annotation)(implicit ec: ExecutionContext)
case class UpdateAnnotationEvent(user: User, annotation: Annotation, changesCount: Int)(implicit ec: ExecutionContext)
extends AnalyticsEvent {
def eventType: String = "update_annotation"
def eventProperties(analyticsLookUpService: AnalyticsLookUpService): Fox[JsObject] =
Fox.successful(Json.obj("annotation_id" -> annotation._id.id))
Fox.successful(Json.obj("annotation_id" -> annotation._id.id, "changes_count" -> changesCount))
}

case class UpdateAnnotationViewOnlyEvent(user: User, annotation: Annotation, changesCount: Int)(
implicit ec: ExecutionContext)
extends AnalyticsEvent {
def eventType: String = "update_annotation_view_only"
def eventProperties(analyticsLookUpService: AnalyticsLookUpService): Fox[JsObject] =
Fox.successful(Json.obj("annotation_id" -> annotation._id.id, "changes_count" -> changesCount))
}

case class OpenDatasetEvent(user: User, dataSet: DataSet)(implicit ec: ExecutionContext) extends AnalyticsEvent {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,19 @@ import com.scalableminds.webknossos.datastore.services.{
import com.typesafe.scalalogging.LazyLogging
import play.api.cache.SyncCacheApi
import play.api.inject.ApplicationLifecycle
import play.api.libs.json.{JsObject, Json}
import play.api.libs.json.{JsObject, Json, OFormat}
import play.api.libs.ws.WSResponse

case class TracingUpdatesReport(tracingId: String,
timestamps: List[Long],
statistics: Option[JsObject],
significantChangesCount: Int,
viewChangesCount: Int,
userToken: Option[String])
object TracingUpdatesReport {
implicit val jsonFormat: OFormat[TracingUpdatesReport] = Json.format[TracingUpdatesReport]
}

class TracingStoreWkRpcClient @Inject()(
rpc: RPC,
config: TracingStoreConfig,
Expand All @@ -28,17 +38,10 @@ class TracingStoreWkRpcClient @Inject()(

private val webKnossosUrl: String = config.Tracingstore.WebKnossos.uri

def reportTracingUpdates(tracingId: String,
timestamps: List[Long],
statistics: Option[JsObject],
userToken: Option[String]): Fox[WSResponse] =
def reportTracingUpdates(tracingUpdatesReport: TracingUpdatesReport): Fox[WSResponse] =
rpc(s"$webKnossosUrl/api/tracingstores/$tracingStoreName/handleTracingUpdateReport")
.addQueryString("key" -> tracingStoreKey)
.post(
Json.obj("timestamps" -> timestamps,
"statistics" -> statistics,
"tracingId" -> tracingId,
"userToken" -> userToken))
.post(Json.toJson(tracingUpdatesReport))

def reportIsosurfaceRequest(userToken: Option[String]): Fox[WSResponse] =
rpc(s"$webKnossosUrl/api/tracingstores/$tracingStoreName/reportIsosurfaceRequest")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ import com.scalableminds.webknossos.tracingstore.tracings.{
UpdateAction,
UpdateActionGroup
}
import com.scalableminds.webknossos.tracingstore.{TracingStoreAccessTokenService, TracingStoreWkRpcClient}
import com.scalableminds.webknossos.tracingstore.{
TracingStoreAccessTokenService,
TracingStoreWkRpcClient,
TracingUpdatesReport
}
import play.api.i18n.Messages
import play.api.libs.json.{Format, Json}
import play.api.mvc.PlayBodyParsers
import play.api.mvc.{Action, AnyContent, PlayBodyParsers}
import scalapb.{GeneratedMessage, GeneratedMessageCompanion, Message}

import scala.concurrent.ExecutionContext
Expand Down Expand Up @@ -47,7 +51,7 @@ trait TracingController[T <: GeneratedMessage with Message[T], Ts <: GeneratedMe

implicit val bodyParsers: PlayBodyParsers

def save = Action.async(validateProto[T]) { implicit request =>
def save: Action[T] = Action.async(validateProto[T]) { implicit request =>
log {
logTime(slackNotificationService.noticeSlowRequest) {
accessTokenService.validateAccess(UserAccessRequest.webknossos) {
Expand All @@ -62,7 +66,7 @@ trait TracingController[T <: GeneratedMessage with Message[T], Ts <: GeneratedMe
}
}

def saveMultiple = Action.async(validateProto[Ts]) { implicit request =>
def saveMultiple: Action[Ts] = Action.async(validateProto[Ts]) { implicit request =>
log {
logTime(slackNotificationService.noticeSlowRequest) {
accessTokenService.validateAccess(UserAccessRequest.webknossos) {
Expand All @@ -80,7 +84,7 @@ trait TracingController[T <: GeneratedMessage with Message[T], Ts <: GeneratedMe
}
}

def get(tracingId: String, version: Option[Long]) = Action.async { implicit request =>
def get(tracingId: String, version: Option[Long]): Action[AnyContent] = Action.async { implicit request =>
log {
accessTokenService.validateAccess(UserAccessRequest.readTracing(tracingId)) {
AllowRemoteOrigin {
Expand All @@ -94,41 +98,43 @@ trait TracingController[T <: GeneratedMessage with Message[T], Ts <: GeneratedMe
}
}

def getMultiple = Action.async(validateJson[List[Option[TracingSelector]]]) { implicit request =>
log {
accessTokenService.validateAccess(UserAccessRequest.webknossos) {
AllowRemoteOrigin {
for {
tracings <- tracingService.findMultiple(request.body, applyUpdates = true)
} yield {
Ok(tracings.toByteArray).as("application/x-protobuf")
def getMultiple: Action[List[Option[TracingSelector]]] = Action.async(validateJson[List[Option[TracingSelector]]]) {
implicit request =>
log {
accessTokenService.validateAccess(UserAccessRequest.webknossos) {
AllowRemoteOrigin {
for {
tracings <- tracingService.findMultiple(request.body, applyUpdates = true)
} yield {
Ok(tracings.toByteArray).as("application/x-protobuf")
}
}
}
}
}
}

def update(tracingId: String) = Action.async(validateJson[List[UpdateActionGroup[T]]]) { implicit request =>
log {
logTime(slackNotificationService.noticeSlowRequest) {
accessTokenService.validateAccess(UserAccessRequest.writeTracing(tracingId)) {
AllowRemoteOrigin {
val updateGroups = request.body
val userToken = request.getQueryString("token")
if (updateGroups.forall(_.transactionGroupCount.getOrElse(1) == 1)) {
commitUpdates(tracingId, updateGroups, userToken).map(_ => Ok)
} else {
updateGroups
.foldLeft(tracingService.currentVersion(tracingId)) { (currentCommittedVersionFox, updateGroup) =>
handleUpdateGroupForTransaction(tracingId, currentCommittedVersionFox, updateGroup, userToken)
}
.map(_ => Ok)
def update(tracingId: String): Action[List[UpdateActionGroup[T]]] =
Action.async(validateJson[List[UpdateActionGroup[T]]]) { implicit request =>
log {
logTime(slackNotificationService.noticeSlowRequest) {
accessTokenService.validateAccess(UserAccessRequest.writeTracing(tracingId)) {
AllowRemoteOrigin {
val updateGroups = request.body
val userToken = request.getQueryString("token")
if (updateGroups.forall(_.transactionGroupCount.getOrElse(1) == 1)) {
commitUpdates(tracingId, updateGroups, userToken).map(_ => Ok)
} else {
updateGroups
.foldLeft(tracingService.currentVersion(tracingId)) { (currentCommittedVersionFox, updateGroup) =>
handleUpdateGroupForTransaction(tracingId, currentCommittedVersionFox, updateGroup, userToken)
}
.map(_ => Ok)
}
}
}
}
}
}
}

val transactionBatchExpiry: FiniteDuration = 20 minutes

Expand Down Expand Up @@ -179,10 +185,16 @@ trait TracingController[T <: GeneratedMessage with Message[T], Ts <: GeneratedMe
private def commitUpdates(tracingId: String,
updateGroups: List[UpdateActionGroup[T]],
userToken: Option[String]): Fox[Long] = {
val timestamps = updateGroups.map(_.timestamp)
val latestStatistics = updateGroups.flatMap(_.stats).lastOption
val currentVersion = tracingService.currentVersion(tracingId)
webKnossosServer.reportTracingUpdates(tracingId, timestamps, latestStatistics, userToken).flatMap { _ =>
val report = TracingUpdatesReport(
tracingId,
timestamps = updateGroups.map(_.timestamp),
statistics = updateGroups.flatMap(_.stats).lastOption,
significantChangesCount = updateGroups.map(_.significantChangesCount).sum,
viewChangesCount = updateGroups.map(_.viewChangesCount).sum,
userToken
)
webKnossosServer.reportTracingUpdates(report).flatMap { _ =>
updateGroups.foldLeft(currentVersion) { (previousVersion, updateGroup) =>
previousVersion.flatMap { prevVersion: Long =>
if (prevVersion + 1 == updateGroup.version) {
Expand Down Expand Up @@ -212,25 +224,27 @@ trait TracingController[T <: GeneratedMessage with Message[T], Ts <: GeneratedMe
}
}

def mergedFromIds(persist: Boolean) = Action.async(validateJson[List[Option[TracingSelector]]]) { implicit request =>
log {
accessTokenService.validateAccess(UserAccessRequest.webknossos) {
AllowRemoteOrigin {
for {
tracings <- tracingService.findMultiple(request.body, applyUpdates = true) ?~> Messages("tracing.notFound")
newId = tracingService.generateTracingId
mergedTracing = tracingService.merge(tracings.flatten)
_ <- tracingService.save(mergedTracing, Some(newId), version = 0, toCache = !persist)
_ <- tracingService.mergeVolumeData(request.body.flatten,
tracings.flatten,
newId,
mergedTracing,
toCache = !persist)
} yield {
Ok(Json.toJson(newId))
def mergedFromIds(persist: Boolean): Action[List[Option[TracingSelector]]] =
Action.async(validateJson[List[Option[TracingSelector]]]) { implicit request =>
log {
accessTokenService.validateAccess(UserAccessRequest.webknossos) {
AllowRemoteOrigin {
for {
tracings <- tracingService.findMultiple(request.body, applyUpdates = true) ?~> Messages(
"tracing.notFound")
newId = tracingService.generateTracingId
mergedTracing = tracingService.merge(tracings.flatten)
_ <- tracingService.save(mergedTracing, Some(newId), version = 0, toCache = !persist)
_ <- tracingService.mergeVolumeData(request.body.flatten,
tracings.flatten,
newId,
mergedTracing,
toCache = !persist)
} yield {
Ok(Json.toJson(newId))
}
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ trait UpdateAction[T <: GeneratedMessage with Message[T]] {
def addInfo(info: Option[String]): UpdateAction[T] = this

def transformToCompact: UpdateAction[T] = this

// For analytics we wan to know how many changes are view only (e.g. move camera, toggle tree visibility)
// Overridden in subclasses
def isViewOnlyChange: Boolean = false
}

object UpdateAction {
Expand All @@ -32,7 +36,10 @@ case class UpdateActionGroup[T <: GeneratedMessage with Message[T]](
transactionId: Option[String],
transactionGroupCount: Option[Int],
transactionGroupIndex: Option[Int]
)
) {
def significantChangesCount: Int = actions.count(!_.isViewOnlyChange)
def viewChangesCount: Int = actions.count(_.isViewOnlyChange)
}

object UpdateActionGroup {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ case class UpdateTracingSkeletonAction(activeNode: Option[Int],
override def addTimestamp(timestamp: Long): UpdateAction[SkeletonTracing] =
this.copy(actionTimestamp = Some(timestamp))
override def addInfo(info: Option[String]): UpdateAction[SkeletonTracing] = this.copy(info = info)
override def isViewOnlyChange: Boolean = true
}

case class RevertToVersionAction(sourceVersion: Long, actionTimestamp: Option[Long] = None, info: Option[String] = None)
Expand Down Expand Up @@ -329,6 +330,7 @@ case class UpdateTreeVisibility(treeId: Int,
override def addTimestamp(timestamp: Long): UpdateAction[SkeletonTracing] =
this.copy(actionTimestamp = Some(timestamp))
override def addInfo(info: Option[String]): UpdateAction[SkeletonTracing] = this.copy(info = info)
override def isViewOnlyChange: Boolean = true
}

case class UpdateTreeGroupVisibility(treeGroupId: Option[Int],
Expand Down Expand Up @@ -362,6 +364,7 @@ case class UpdateTreeGroupVisibility(treeGroupId: Option[Int],
override def addTimestamp(timestamp: Long): UpdateAction[SkeletonTracing] =
this.copy(actionTimestamp = Some(timestamp))
override def addInfo(info: Option[String]): UpdateAction[SkeletonTracing] = this.copy(info = info)
override def isViewOnlyChange: Boolean = true
}

case class UpdateUserBoundingBoxes(boundingBoxes: List[NamedBoundingBox],
Expand Down Expand Up @@ -396,6 +399,7 @@ case class UpdateUserBoundingBoxVisibility(boundingBoxId: Option[Int],
override def addTimestamp(timestamp: Long): UpdateAction[SkeletonTracing] =
this.copy(actionTimestamp = Some(timestamp))
override def addInfo(info: Option[String]): UpdateAction[SkeletonTracing] = this.copy(info = info)
override def isViewOnlyChange: Boolean = true
}

case class UpdateTdCamera(actionTimestamp: Option[Long] = None, info: Option[String] = None)
Expand All @@ -406,6 +410,7 @@ case class UpdateTdCamera(actionTimestamp: Option[Long] = None, info: Option[Str
override def addTimestamp(timestamp: Long): UpdateAction[SkeletonTracing] =
this.copy(actionTimestamp = Some(timestamp))
override def addInfo(info: Option[String]): UpdateAction[SkeletonTracing] = this.copy(info = info)
override def isViewOnlyChange: Boolean = true
}

object CreateTreeSkeletonAction {
Expand Down
Loading

0 comments on commit 982c55d

Please sign in to comment.