Skip to content

Commit

Permalink
Introduce data vault as storage backend abstraction (#6899)
Browse files Browse the repository at this point in the history
  • Loading branch information
frcroth authored Mar 20, 2023
1 parent a5dca07 commit 9338e80
Show file tree
Hide file tree
Showing 56 changed files with 689 additions and 3,740 deletions.
8 changes: 4 additions & 4 deletions app/models/binary/credential/CredentialService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package models.binary.credential
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.storage.{
FileSystemCredential,
FileSystemsHolder,
DataVaultsHolder,
GoogleServiceAccountCredential,
HttpBasicAuthCredential,
S3AccessKeyCredential
Expand All @@ -24,21 +24,21 @@ class CredentialService @Inject()(credentialDAO: CredentialDAO) {
userId: ObjectId,
organizationId: ObjectId): Option[FileSystemCredential] =
uri.getScheme match {
case FileSystemsHolder.schemeHttps | FileSystemsHolder.schemeHttp =>
case DataVaultsHolder.schemeHttps | DataVaultsHolder.schemeHttp =>
credentialIdentifier.map(
username =>
HttpBasicAuthCredential(uri.toString,
username,
credentialSecret.getOrElse(""),
userId.toString,
organizationId.toString))
case FileSystemsHolder.schemeS3 =>
case DataVaultsHolder.schemeS3 =>
(credentialIdentifier, credentialSecret) match {
case (Some(keyId), Some(secretKey)) =>
Some(S3AccessKeyCredential(uri.toString, keyId, secretKey, userId.toString, organizationId.toString))
case _ => None
}
case FileSystemsHolder.schemeGS =>
case DataVaultsHolder.schemeGS =>
for {
secret <- credentialSecret
secretJson <- tryo(Json.parse(secret)).toOption
Expand Down
5 changes: 2 additions & 3 deletions app/models/binary/explore/ExploreRemoteLayerService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import com.scalableminds.webknossos.datastore.dataformats.zarr._
import com.scalableminds.webknossos.datastore.datareaders.n5.N5Header
import com.scalableminds.webknossos.datastore.datareaders.zarr._
import com.scalableminds.webknossos.datastore.models.datasource._
import com.scalableminds.webknossos.datastore.storage.{FileSystemsHolder, RemoteSourceDescriptor}
import com.scalableminds.webknossos.datastore.storage.{DataVaultsHolder, RemoteSourceDescriptor}
import com.typesafe.scalalogging.LazyLogging
import models.binary.credential.CredentialService
import models.user.User
Expand Down Expand Up @@ -163,8 +163,7 @@ class ExploreRemoteLayerService @Inject()(credentialService: CredentialService)
requestingUser._organization)
remoteSource = RemoteSourceDescriptor(uri, credentialOpt)
credentialId <- Fox.runOptional(credentialOpt)(c => credentialService.insertOne(c)) ?~> "remoteFileSystem.credential.insert.failed"
fileSystem <- FileSystemsHolder.getOrCreate(remoteSource) ?~> "remoteFileSystem.setup.failed"
remotePath <- tryo(fileSystem.getPath(FileSystemsHolder.pathFromUri(remoteSource.uri))) ?~> "remoteFileSystem.getPath.failed"
remotePath <- DataVaultsHolder.getVaultPath(remoteSource) ?~> "remoteFileSystem.setup.failed"
layersWithVoxelSizes <- exploreRemoteLayersForRemotePath(
remotePath,
credentialId.map(_.toString),
Expand Down
6 changes: 5 additions & 1 deletion app/models/binary/explore/RemoteLayerExplorer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.scalableminds.util.io.ZipIO
import com.scalableminds.util.tools.{Fox, FoxImplicits, JsonHelper}
import com.scalableminds.webknossos.datastore.dataformats.MagLocator
import com.scalableminds.webknossos.datastore.models.datasource.{DataLayer, ElementClass}
import com.scalableminds.webknossos.datastore.datavault.VaultPath
import net.liftweb.util.Helpers.tryo
import play.api.libs.json.Reads

Expand All @@ -25,7 +26,10 @@ trait RemoteLayerExplorer extends FoxImplicits {

/** Reads the file at the given path and parses its contents as JSON of type T.
  *
  * Remote paths (VaultPath) are read through their data vault; local paths are
  * read from disk and gunzipped if compressed.
  * NOTE(review): unlike the local branch, the vault branch does not attempt a
  * gunzip — confirm remote files are expected to be stored uncompressed.
  */
protected def parseJsonFromPath[T: Reads](path: Path): Fox[T] =
  for {
    rawBytes <- path match {
      case vaultPath: VaultPath =>
        vaultPath.readBytes() ?~> "dataSet.explore.failed.readFile"
      case localPath =>
        tryo(ZipIO.tryGunzip(Files.readAllBytes(localPath))) ?~> "dataSet.explore.failed.readFile"
    }
    asString <- tryo(new String(rawBytes, StandardCharsets.UTF_8)).toFox ?~> "dataSet.explore.failed.readFile"
    validated <- JsonHelper.parseAndValidateJson[T](asString)
  } yield validated
Expand Down
122 changes: 122 additions & 0 deletions test/backend/DataVaultTestSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package backend

import org.scalatestplus.play.PlaySpec

import java.net.URI
import com.scalableminds.webknossos.datastore.datavault.{DataVault, GoogleCloudDataVault, HttpsDataVault, VaultPath}
import com.scalableminds.webknossos.datastore.storage.RemoteSourceDescriptor

import scala.collection.immutable.NumericRange

/** Tests for the DataVault storage abstraction.
  *
  * NOTE(review): the HTTP and Google Cloud cases read real objects from the
  * public neuroglancer-fafb-data bucket, so they presumably need network
  * access to pass — confirm CI allows outbound requests.
  */
class DataVaultTestSuite extends PlaySpec {

  // Expected first 10 bytes of the fetched object: JPEG SOI marker (FF D8)
  // followed by the start of the JFIF APP0 segment.
  private val expectedJpegPrefix = Array(-1, -40, -1, -32, 0, 16, 74, 70, 73, 70)

  "Data vault" when {
    "using Range requests" when {
      val byteRange: NumericRange[Long] = Range.Long(0, 1024, 1)
      val dataKey = "32_32_40/15360-15424_8384-8448_3520-3584" // when accessed via range request, the response body is 1024 bytes long, otherwise 124.8 KB

      "with HTTP Vault" should {
        "return correct response" in {
          val bucketUri = new URI("http://storage.googleapis.com/")
          val rootPath = new VaultPath(bucketUri, HttpsDataVault.create(RemoteSourceDescriptor(bucketUri, None)))
          val data =
            (rootPath / s"neuroglancer-fafb-data/fafb_v14/fafb_v14_orig/$dataKey").readBytes(Some(byteRange)).get

          assert(data.length == byteRange.length)
          assert(data.take(10).sameElements(expectedJpegPrefix))
        }
      }

      "with Google Cloud Storage Vault" should {
        "return correct response" in {
          val bucketUri = new URI("gs://neuroglancer-fafb-data/fafb_v14/fafb_v14_orig")
          val rootPath = new VaultPath(bucketUri, GoogleCloudDataVault.create(RemoteSourceDescriptor(bucketUri, None)))
          val data = (rootPath / dataKey).readBytes(Some(byteRange)).get

          assert(data.length == byteRange.length)
          assert(data.take(10).sameElements(expectedJpegPrefix))
        }
      }
    }

    "using regular requests" when {
      val dataKey = "32_32_40/15360-15424_8384-8448_3520-3584"
      val dataLength = 127808 // full object size in bytes when no Range header is sent

      "with HTTP Vault" should {
        "return correct response" in {
          val bucketUri = new URI("http://storage.googleapis.com/")
          val rootPath = new VaultPath(bucketUri, HttpsDataVault.create(RemoteSourceDescriptor(bucketUri, None)))
          val data = (rootPath / s"neuroglancer-fafb-data/fafb_v14/fafb_v14_orig/$dataKey").readBytes().get

          assert(data.length == dataLength)
          assert(data.take(10).sameElements(expectedJpegPrefix))
        }
      }

      "with Google Cloud Storage Vault" should {
        "return correct response" in {
          val bucketUri = new URI("gs://neuroglancer-fafb-data/fafb_v14/fafb_v14_orig")
          val rootPath = new VaultPath(bucketUri, GoogleCloudDataVault.create(RemoteSourceDescriptor(bucketUri, None)))
          val data = (rootPath / dataKey).readBytes().get

          assert(data.length == dataLength)
          assert(data.take(10).sameElements(expectedJpegPrefix))
        }
      }
    }

    "using vault path" when {
      // Minimal stub vault: readBytes is deliberately unimplemented (???),
      // as these cases exercise only URI path resolution.
      class MockDataVault extends DataVault {
        override def readBytes(path: VaultPath, range: Option[NumericRange[Long]]): Array[Byte] = ???
      }

      "Uri has no trailing slash" should {
        val baseUri = new URI("protocol://host/a/b")
        val basePath = new VaultPath(baseUri, new MockDataVault)

        "resolve child" in {
          val child = basePath / "c"
          assert(child.toUri.toString == s"${baseUri.toString}/c")
        }

        "get parent" in {
          assert((basePath / "..").toString == "protocol://host/a/")
        }

        "get directory" in {
          assert((basePath / ".").toString == s"${baseUri.toString}/")
        }

        "handle sequential parameters" in {
          assert((basePath / "c" / "d" / "e").toString == "protocol://host/a/b/c/d/e")
        }

        "resolve relative to host with starting slash in parameter" in {
          assert((basePath / "/x").toString == "protocol://host/x")
        }

        "resolving path respects trailing slash" in {
          assert((basePath / "x/").toString == "protocol://host/a/b/x/")
          assert((basePath / "x").toString == "protocol://host/a/b/x")
        }
      }

      "Uri has trailing slash" should {
        val slashUri = new URI("protocol://host/a/b/")
        val slashPath = new VaultPath(slashUri, new MockDataVault)

        "resolve child" in {
          val child = slashPath / "c"
          assert(child.toUri.toString == s"${slashUri.toString}c")
        }

        "get parent" in {
          assert((slashPath / "..").toString == "protocol://host/a/")
        }

        "get directory" in {
          assert((slashPath / ".").toString == s"${slashUri.toString}")
        }
      }
    }
  }
}
47 changes: 0 additions & 47 deletions test/backend/RangeRequestTestSuite.scala

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.scalableminds.webknossos.datastore.dataformats

import com.scalableminds.util.tools.{Fox, FoxImplicits}
import com.scalableminds.webknossos.datastore.datavault.{FileSystemVaultPath, VaultPath}
import com.scalableminds.webknossos.datastore.models.BucketPosition
import com.scalableminds.webknossos.datastore.models.requests.DataReadInstruction
import com.scalableminds.webknossos.datastore.storage.{DataCubeCache, FileSystemService}
import com.typesafe.scalalogging.LazyLogging
import net.liftweb.common.Empty

import java.nio.file.Path
import scala.concurrent.ExecutionContext

trait BucketProvider extends FoxImplicits with LazyLogging {
Expand Down Expand Up @@ -45,12 +45,13 @@ trait BucketProvider extends FoxImplicits with LazyLogging {
Iterator.empty

protected def localPathFrom(readInstruction: DataReadInstruction, relativeMagPath: String)(
implicit ec: ExecutionContext): Fox[Path] = {
val magPath = readInstruction.baseDir
.resolve(readInstruction.dataSource.id.team)
.resolve(readInstruction.dataSource.id.name)
.resolve(readInstruction.dataLayer.name)
.resolve(relativeMagPath)
implicit ec: ExecutionContext): Fox[VaultPath] = {
val magPath = FileSystemVaultPath.fromPath(
readInstruction.baseDir
.resolve(readInstruction.dataSource.id.team)
.resolve(readInstruction.dataSource.id.name)
.resolve(readInstruction.dataLayer.name)
.resolve(relativeMagPath))
if (magPath.toFile.exists()) {
Fox.successful(magPath)
} else Fox.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.scalableminds.webknossos.datastore.dataformats
import com.scalableminds.util.geometry.Vec3Int
import com.scalableminds.webknossos.datastore.datareaders.AxisOrder
import com.scalableminds.webknossos.datastore.models.datasource.ResolutionFormatHelper
import com.scalableminds.webknossos.datastore.storage.{FileSystemsHolder, LegacyFileSystemCredential}
import com.scalableminds.webknossos.datastore.storage.{DataVaultsHolder, LegacyFileSystemCredential}
import play.api.libs.json.{Json, OFormat}

import java.net.URI
Expand All @@ -17,7 +17,7 @@ case class MagLocator(mag: Vec3Int,

lazy val pathWithFallback: String = path.getOrElse(mag.toMagLiteral(allowScalar = true))
lazy val uri: URI = new URI(pathWithFallback)
lazy val isRemote: Boolean = FileSystemsHolder.isSupportedRemoteScheme(uri.getScheme)
lazy val isRemote: Boolean = DataVaultsHolder.isSupportedRemoteScheme(uri.getScheme)
}

object MagLocator extends ResolutionFormatHelper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import com.scalableminds.util.requestlogging.RateLimitedErrorLogging
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.dataformats.{BucketProvider, DataCubeHandle, MagLocator}
import com.scalableminds.webknossos.datastore.datareaders.n5.N5Array
import com.scalableminds.webknossos.datastore.datavault.VaultPath
import com.scalableminds.webknossos.datastore.models.BucketPosition
import com.scalableminds.webknossos.datastore.models.requests.DataReadInstruction
import com.scalableminds.webknossos.datastore.storage.FileSystemService
import com.typesafe.scalalogging.LazyLogging
import net.liftweb.common.{Empty, Failure, Full}
import net.liftweb.util.Helpers.tryo

import java.nio.file.Path
import scala.concurrent.ExecutionContext

class N5CubeHandle(n5Array: N5Array) extends DataCubeHandle with LazyLogging with RateLimitedErrorLogging {
Expand Down Expand Up @@ -45,7 +45,7 @@ class N5BucketProvider(layer: N5Layer, val fileSystemServiceOpt: Option[FileSyst
fileSystemServiceOpt match {
case Some(fileSystemService: FileSystemService) =>
for {
magPath: Path <- if (n5Mag.isRemote) {
magPath: VaultPath <- if (n5Mag.isRemote) {
fileSystemService.remotePathFor(n5Mag)
} else localPathFrom(readInstruction, n5Mag.pathWithFallback)
cubeHandle <- tryo(onError = e => logError(e))(N5Array.open(magPath, n5Mag.axisOrder, n5Mag.channelIndex))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import com.scalableminds.util.requestlogging.RateLimitedErrorLogging
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.dataformats.{BucketProvider, DataCubeHandle, MagLocator}
import com.scalableminds.webknossos.datastore.datareaders.precomputed.PrecomputedArray
import com.scalableminds.webknossos.datastore.datavault.VaultPath
import com.scalableminds.webknossos.datastore.models.BucketPosition
import com.scalableminds.webknossos.datastore.models.requests.DataReadInstruction
import com.scalableminds.webknossos.datastore.storage.FileSystemService
import com.typesafe.scalalogging.LazyLogging
import net.liftweb.common.{Empty, Failure, Full}
import net.liftweb.util.Helpers.tryo

import java.nio.file.Path
import scala.concurrent.ExecutionContext

class PrecomputedCubeHandle(precomputedArray: PrecomputedArray)
Expand Down Expand Up @@ -48,7 +48,7 @@ class PrecomputedBucketProvider(layer: PrecomputedLayer, val fileSystemServiceOp
fileSystemServiceOpt match {
case Some(fileSystemService: FileSystemService) =>
for {
magPath: Path <- if (precomputedMag.isRemote) {
magPath: VaultPath <- if (precomputedMag.isRemote) {
fileSystemService.remotePathFor(precomputedMag)
} else localPathFrom(readInstruction, precomputedMag.pathWithFallback)
cubeHandle <- tryo(onError = e => logError(e))(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import com.scalableminds.util.requestlogging.RateLimitedErrorLogging
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.dataformats.{BucketProvider, DataCubeHandle, MagLocator}
import com.scalableminds.webknossos.datastore.datareaders.zarr.ZarrArray
import com.scalableminds.webknossos.datastore.datavault.VaultPath
import com.scalableminds.webknossos.datastore.models.BucketPosition
import com.scalableminds.webknossos.datastore.models.requests.DataReadInstruction
import com.scalableminds.webknossos.datastore.storage.FileSystemService
import com.typesafe.scalalogging.LazyLogging
import net.liftweb.common.{Empty, Failure, Full}
import net.liftweb.util.Helpers.tryo

import java.nio.file.Path
import scala.concurrent.ExecutionContext

class ZarrCubeHandle(zarrArray: ZarrArray) extends DataCubeHandle with LazyLogging with RateLimitedErrorLogging {
Expand Down Expand Up @@ -45,7 +45,7 @@ class ZarrBucketProvider(layer: ZarrLayer, val fileSystemServiceOpt: Option[File
fileSystemServiceOpt match {
case Some(fileSystemService: FileSystemService) =>
for {
magPath: Path <- if (zarrMag.isRemote) {
magPath: VaultPath <- if (zarrMag.isRemote) {
fileSystemService.remotePathFor(zarrMag)
} else localPathFrom(readInstruction, zarrMag.pathWithFallback)
cubeHandle <- tryo(onError = e => logError(e))(
Expand Down
Loading

0 comments on commit 9338e80

Please sign in to comment.