Introduce data vault as storage backend abstraction #6899

Merged
merged 32 commits into master from new-filesystem on Mar 20, 2023
Changes from 24 commits
Commits (32)
6a42651
Implement https remote file system
frcroth Mar 2, 2023
d660449
Implement gcs remote file system
frcroth Mar 6, 2023
88aed0c
Implement s3 remote file system
frcroth Mar 6, 2023
f091c89
Merge branch 'master' into new-filesystem
frcroth Mar 6, 2023
9e66e9b
Rename to DataVault
frcroth Mar 6, 2023
1b01bd9
Add file system data vault for local datasets
frcroth Mar 6, 2023
9941e02
Remove S3FileSystem, clean up FileSystemsHolder
frcroth Mar 6, 2023
ccec35a
Fix and expand test for data vault
frcroth Mar 7, 2023
8f47066
Remove FileSystemStore
frcroth Mar 7, 2023
327ab4a
Remove HttpsFileSystem
frcroth Mar 7, 2023
a948287
Fix some issues with GoogleCloudDataVault
frcroth Mar 7, 2023
2206a8f
Support legacy credentials for https, support range with tryget
frcroth Mar 7, 2023
699441e
Rename FileSystemsHolder, use Long Range
frcroth Mar 8, 2023
6118033
Remove S3Utilities
frcroth Mar 8, 2023
e0fc3d0
Flatmap instead of map
frcroth Mar 14, 2023
88d92b5
Simplify s3 credentials passing
frcroth Mar 14, 2023
201448c
Change signature to readBytes
frcroth Mar 14, 2023
2fb78c8
Remove credential in vault path
frcroth Mar 14, 2023
9ba4b89
Remove key argument in readBytes, use underlying path instead
frcroth Mar 14, 2023
32d3b8c
Adapt interface for FileSystemVaultPath
frcroth Mar 14, 2023
2aaf5db
Fix s3 data vault with new keyless interface
frcroth Mar 14, 2023
ecd9a16
Cache vaults and not paths
frcroth Mar 14, 2023
5352bb6
Merge branch 'master' into new-filesystem
frcroth Mar 14, 2023
ce31bd4
Fix DataVaultTestSuite
frcroth Mar 14, 2023
c236008
Fix GCS range reading when not starting at 0
frcroth Mar 20, 2023
943ce67
Use slash for vault paths in a better way
frcroth Mar 20, 2023
912ab90
Add test to clarify behavior with starting slash
frcroth Mar 20, 2023
51d2d6d
Merge branch 'master' into new-filesystem
frcroth Mar 20, 2023
3e71046
Add another test to describe behavior
frcroth Mar 20, 2023
512e664
Fix handling of s3 short style uri
frcroth Mar 20, 2023
5dd3172
Remove comment relating to old usage
frcroth Mar 20, 2023
c73dcf2
Merge branch 'master' into new-filesystem
frcroth Mar 20, 2023
8 changes: 4 additions & 4 deletions app/models/binary/credential/CredentialService.scala
@@ -3,7 +3,7 @@ package models.binary.credential
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.storage.{
FileSystemCredential,
FileSystemsHolder,
DataVaultsHolder,
GoogleServiceAccountCredential,
HttpBasicAuthCredential,
S3AccessKeyCredential
@@ -24,21 +24,21 @@ class CredentialService @Inject()(credentialDAO: CredentialDAO) {
userId: ObjectId,
organizationId: ObjectId): Option[FileSystemCredential] =
uri.getScheme match {
case FileSystemsHolder.schemeHttps | FileSystemsHolder.schemeHttp =>
case DataVaultsHolder.schemeHttps | DataVaultsHolder.schemeHttp =>
credentialIdentifier.map(
username =>
HttpBasicAuthCredential(uri.toString,
username,
credentialSecret.getOrElse(""),
userId.toString,
organizationId.toString))
case FileSystemsHolder.schemeS3 =>
case DataVaultsHolder.schemeS3 =>
(credentialIdentifier, credentialSecret) match {
case (Some(keyId), Some(secretKey)) =>
Some(S3AccessKeyCredential(uri.toString, keyId, secretKey, userId.toString, organizationId.toString))
case _ => None
}
case FileSystemsHolder.schemeGS =>
case DataVaultsHolder.schemeGS =>
for {
secret <- credentialSecret
secretJson <- tryo(Json.parse(secret)).toOption
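
A note on the scheme constants used above: only their names appear in this diff. A minimal sketch of what the renamed DataVaultsHolder presumably exposes, where the constant values are assumptions based on the standard URI schemes:

object DataVaultsHolder {
  // Values assumed from the standard URI schemes; only the names are visible in this diff.
  val schemeHttp: String = "http"
  val schemeHttps: String = "https"
  val schemeS3: String = "s3"
  val schemeGS: String = "gs"

  // Membership of this list is also an assumption, inferred from MagLocator.isRemote below.
  def isSupportedRemoteScheme(scheme: String): Boolean =
    List(schemeHttps, schemeHttp, schemeS3, schemeGS).contains(scheme)
}
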
5 changes: 2 additions & 3 deletions app/models/binary/explore/ExploreRemoteLayerService.scala
@@ -11,7 +11,7 @@ import com.scalableminds.webknossos.datastore.dataformats.zarr._
import com.scalableminds.webknossos.datastore.datareaders.n5.N5Header
import com.scalableminds.webknossos.datastore.datareaders.zarr._
import com.scalableminds.webknossos.datastore.models.datasource._
import com.scalableminds.webknossos.datastore.storage.{FileSystemsHolder, RemoteSourceDescriptor}
import com.scalableminds.webknossos.datastore.storage.{DataVaultsHolder, RemoteSourceDescriptor}
import com.typesafe.scalalogging.LazyLogging
import models.binary.credential.CredentialService
import models.user.User
@@ -163,8 +163,7 @@ class ExploreRemoteLayerService @Inject()(credentialService: CredentialService)
requestingUser._organization)
remoteSource = RemoteSourceDescriptor(uri, credentialOpt)
credentialId <- Fox.runOptional(credentialOpt)(c => credentialService.insertOne(c)) ?~> "remoteFileSystem.credential.insert.failed"
fileSystem <- FileSystemsHolder.getOrCreate(remoteSource) ?~> "remoteFileSystem.setup.failed"
remotePath <- tryo(fileSystem.getPath(FileSystemsHolder.pathFromUri(remoteSource.uri))) ?~> "remoteFileSystem.getPath.failed"
remotePath <- DataVaultsHolder.getVaultPath(remoteSource) ?~> "remoteFileSystem.setup.failed"
layersWithVoxelSizes <- exploreRemoteLayersForRemotePath(
remotePath,
credentialId.map(_.toString),
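
The previous two-step lookup (get or create a file system, then derive a path from the URI) collapses into a single getVaultPath call. A minimal usage sketch, assuming the Fox combinators shown above; the ".zarray" key is a hypothetical example:

for {
  vaultPath <- DataVaultsHolder.getVaultPath(remoteSource) ?~> "remoteFileSystem.setup.failed"
  // Resolve a child key and read it through the vault; ".zarray" is a hypothetical key.
  headerBytes <- (vaultPath / ".zarray").readBytes() ?~> "dataSet.explore.failed.readFile"
} yield headerBytes
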
1 change: 1 addition & 0 deletions app/models/binary/explore/PrecomputedExplorer.scala
@@ -19,6 +19,7 @@ class PrecomputedExplorer extends RemoteLayerExplorer {

override def explore(remotePath: Path, credentialId: Option[String]): Fox[List[(PrecomputedLayer, Vec3Double)]] =
for {
_ <- Fox.bool2Fox(remotePath.toUri.toString.last == '/') ?~> "Remote dataset explore path must be a directory (indicated by trailing slash)"
infoPath <- Fox.successful(remotePath.resolve(PrecomputedHeader.FILENAME_INFO))
precomputedHeader <- parseJsonFromPath[PrecomputedHeader](infoPath) ?~> s"Failed to read neuroglancer precomputed metadata at $infoPath"
layerAndVoxelSize <- layerFromPrecomputedHeader(precomputedHeader, remotePath, credentialId)
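
The new guard rejects file-style URIs before any remote request is made. Hedged illustration with hypothetical values:

// "https://example.com/dataset/" passes the guard (directory-style, trailing slash)
// "https://example.com/dataset"  fails with the error message above
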
6 changes: 5 additions & 1 deletion app/models/binary/explore/RemoteLayerExplorer.scala
@@ -5,6 +5,7 @@ import com.scalableminds.util.io.ZipIO
import com.scalableminds.util.tools.{Fox, FoxImplicits, JsonHelper}
import com.scalableminds.webknossos.datastore.dataformats.MagLocator
import com.scalableminds.webknossos.datastore.models.datasource.{DataLayer, ElementClass}
import com.scalableminds.webknossos.datastore.datavault.VaultPath
import net.liftweb.util.Helpers.tryo
import play.api.libs.json.Reads

@@ -25,7 +26,10 @@ trait RemoteLayerExplorer extends FoxImplicits {

protected def parseJsonFromPath[T: Reads](path: Path): Fox[T] =
for {
fileBytes <- tryo(ZipIO.tryGunzip(Files.readAllBytes(path))) ?~> "dataSet.explore.failed.readFile"
fileBytes <- path match {
case path: VaultPath => path.readBytes() ?~> "dataSet.explore.failed.readFile"
case _ => tryo(ZipIO.tryGunzip(Files.readAllBytes(path))) ?~> "dataSet.explore.failed.readFile"
}
fileAsString <- tryo(new String(fileBytes, StandardCharsets.UTF_8)).toFox ?~> "dataSet.explore.failed.readFile"
parsed <- JsonHelper.parseAndValidateJson[T](fileAsString)
} yield parsed
68 changes: 68 additions & 0 deletions test/backend/DataVaultTestSuite.scala
@@ -0,0 +1,68 @@
package backend

import org.scalatestplus.play.PlaySpec

import java.net.URI
import com.scalableminds.webknossos.datastore.datavault.{GoogleCloudDataVault, HttpsDataVault, VaultPath}
import com.scalableminds.webknossos.datastore.storage.RemoteSourceDescriptor

import scala.collection.immutable.NumericRange

class DataVaultTestSuite extends PlaySpec {

"Data vault" when {
"using Range requests" when {
val range: NumericRange[Long] = Range.Long(0, 1024, 1)
val dataKey = "32_32_40/15360-15424_8384-8448_3520-3584" // when accessed via range request, the response body is 1024 bytes long, otherwise 124.8 KB

"with HTTP Vault" should {
"return correct response" in {
val uri = new URI("http://storage.googleapis.com/")
val vaultPath = new VaultPath(uri, HttpsDataVault.create(RemoteSourceDescriptor(uri, None)))
val bytes =
(vaultPath / s"/neuroglancer-fafb-data/fafb_v14/fafb_v14_orig/$dataKey").readBytes(Some(range)).get

assert(bytes.length == range.length)
assert(bytes.take(10).sameElements(Array(-1, -40, -1, -32, 0, 16, 74, 70, 73, 70)))
}
}

"with Google Cloud Storage Vault" should {
"return correct response" in {
val uri = new URI("gs://neuroglancer-fafb-data/fafb_v14/fafb_v14_orig")
val vaultPath = new VaultPath(uri, GoogleCloudDataVault.create(RemoteSourceDescriptor(uri, None)))
val bytes = (vaultPath / s"fafb_v14_orig/$dataKey").readBytes(Some(range)).get

assert(bytes.length == range.length)
assert(bytes.take(10).sameElements(Array(-1, -40, -1, -32, 0, 16, 74, 70, 73, 70)))
}
}
}
"using regular requests" when {
val dataKey = "32_32_40/15360-15424_8384-8448_3520-3584"
val dataLength = 127808

"with HTTP Vault" should {
"return correct response" in {
val uri = new URI("http://storage.googleapis.com/")
val vaultPath = new VaultPath(uri, HttpsDataVault.create(RemoteSourceDescriptor(uri, None)))
val bytes = (vaultPath / s"/neuroglancer-fafb-data/fafb_v14/fafb_v14_orig/$dataKey").readBytes().get

assert(bytes.length == dataLength)
assert(bytes.take(10).sameElements(Array(-1, -40, -1, -32, 0, 16, 74, 70, 73, 70)))
}
}

"with Google Cloud Storage Vault" should {
"return correct response" in {
val uri = new URI("gs://neuroglancer-fafb-data/fafb_v14/fafb_v14_orig")
val vaultPath = new VaultPath(uri, GoogleCloudDataVault.create(RemoteSourceDescriptor(uri, None)))
val bytes = (vaultPath / s"fafb_v14_orig/$dataKey").readBytes().get

assert(bytes.length == dataLength)
assert(bytes.take(10).sameElements(Array(-1, -40, -1, -32, 0, 16, 74, 70, 73, 70)))
}
}
}
}
}
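
Taken together, the tests pin down the surface of the new abstraction. Signatures inferred from usage (sketch only; the actual declarations may differ):

// new VaultPath(uri: java.net.URI, dataVault: DataVault)
// def /(key: String): VaultPath                              // child-path resolution
// def readBytes(range: Option[NumericRange[Long]] = None)    // Box-like result, hence .get in the tests
// HttpsDataVault.create(remoteSource: RemoteSourceDescriptor): DataVault
// GoogleCloudDataVault.create(remoteSource: RemoteSourceDescriptor): DataVault
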
47 changes: 0 additions & 47 deletions test/backend/RangeRequestTestSuite.scala

This file was deleted.

@@ -1,13 +1,13 @@
package com.scalableminds.webknossos.datastore.dataformats

import com.scalableminds.util.tools.{Fox, FoxImplicits}
import com.scalableminds.webknossos.datastore.datavault.{FileSystemVaultPath, VaultPath}
import com.scalableminds.webknossos.datastore.models.BucketPosition
import com.scalableminds.webknossos.datastore.models.requests.DataReadInstruction
import com.scalableminds.webknossos.datastore.storage.{DataCubeCache, FileSystemService}
import com.typesafe.scalalogging.LazyLogging
import net.liftweb.common.Empty

import java.nio.file.Path
import scala.concurrent.ExecutionContext

trait BucketProvider extends FoxImplicits with LazyLogging {
@@ -45,12 +45,13 @@ trait BucketProvider extends FoxImplicits with LazyLogging {
Iterator.empty

protected def localPathFrom(readInstruction: DataReadInstruction, relativeMagPath: String)(
implicit ec: ExecutionContext): Fox[Path] = {
val magPath = readInstruction.baseDir
.resolve(readInstruction.dataSource.id.team)
.resolve(readInstruction.dataSource.id.name)
.resolve(readInstruction.dataLayer.name)
.resolve(relativeMagPath)
implicit ec: ExecutionContext): Fox[VaultPath] = {
val magPath = FileSystemVaultPath.fromPath(
readInstruction.baseDir
.resolve(readInstruction.dataSource.id.team)
.resolve(readInstruction.dataSource.id.name)
.resolve(readInstruction.dataLayer.name)
.resolve(relativeMagPath))
if (magPath.toFile.exists()) {
Fox.successful(magPath)
} else Fox.empty
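
Wrapping the resolved local directory in a FileSystemVaultPath lets local datasets travel through the same VaultPath interface as remote ones, so callers no longer branch on java.nio.file.Path versus remote paths. A minimal sketch, assuming FileSystemVaultPath delegates to the wrapped path (as the toFile.exists() call above suggests); the dataset directory below is hypothetical:

import java.nio.file.Paths
// Wrap a hypothetical local mag directory so it can be read like any remote vault path.
val localMag: VaultPath =
  FileSystemVaultPath.fromPath(Paths.get("/data/binaryData/sample_organization/sample_dataset/color/1"))
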
@@ -3,7 +3,7 @@ package com.scalableminds.webknossos.datastore.dataformats
import com.scalableminds.util.geometry.Vec3Int
import com.scalableminds.webknossos.datastore.datareaders.AxisOrder
import com.scalableminds.webknossos.datastore.models.datasource.ResolutionFormatHelper
import com.scalableminds.webknossos.datastore.storage.{FileSystemsHolder, LegacyFileSystemCredential}
import com.scalableminds.webknossos.datastore.storage.{DataVaultsHolder, LegacyFileSystemCredential}
import play.api.libs.json.{Json, OFormat}

import java.net.URI
@@ -17,7 +17,7 @@ case class MagLocator(mag: Vec3Int,

lazy val pathWithFallback: String = path.getOrElse(mag.toMagLiteral(allowScalar = true))
lazy val uri: URI = new URI(pathWithFallback)
lazy val isRemote: Boolean = FileSystemsHolder.isSupportedRemoteScheme(uri.getScheme)
lazy val isRemote: Boolean = DataVaultsHolder.isSupportedRemoteScheme(uri.getScheme)
}

object MagLocator extends ResolutionFormatHelper {
@@ -5,14 +5,14 @@ import com.scalableminds.util.requestlogging.RateLimitedErrorLogging
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.dataformats.{BucketProvider, DataCubeHandle, MagLocator}
import com.scalableminds.webknossos.datastore.datareaders.n5.N5Array
import com.scalableminds.webknossos.datastore.datavault.VaultPath
import com.scalableminds.webknossos.datastore.models.BucketPosition
import com.scalableminds.webknossos.datastore.models.requests.DataReadInstruction
import com.scalableminds.webknossos.datastore.storage.FileSystemService
import com.typesafe.scalalogging.LazyLogging
import net.liftweb.common.{Empty, Failure, Full}
import net.liftweb.util.Helpers.tryo

import java.nio.file.Path
import scala.concurrent.ExecutionContext

class N5CubeHandle(n5Array: N5Array) extends DataCubeHandle with LazyLogging with RateLimitedErrorLogging {
@@ -45,7 +45,7 @@ class N5BucketProvider(layer: N5Layer, val fileSystemServiceOpt: Option[FileSyst
fileSystemServiceOpt match {
case Some(fileSystemService: FileSystemService) =>
for {
magPath: Path <- if (n5Mag.isRemote) {
magPath: VaultPath <- if (n5Mag.isRemote) {
fileSystemService.remotePathFor(n5Mag)
} else localPathFrom(readInstruction, n5Mag.pathWithFallback)
cubeHandle <- tryo(onError = e => logError(e))(N5Array.open(magPath, n5Mag.axisOrder, n5Mag.channelIndex))
@@ -5,14 +5,14 @@ import com.scalableminds.util.requestlogging.RateLimitedErrorLogging
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.dataformats.{BucketProvider, DataCubeHandle, MagLocator}
import com.scalableminds.webknossos.datastore.datareaders.precomputed.PrecomputedArray
import com.scalableminds.webknossos.datastore.datavault.VaultPath
import com.scalableminds.webknossos.datastore.models.BucketPosition
import com.scalableminds.webknossos.datastore.models.requests.DataReadInstruction
import com.scalableminds.webknossos.datastore.storage.FileSystemService
import com.typesafe.scalalogging.LazyLogging
import net.liftweb.common.{Empty, Failure, Full}
import net.liftweb.util.Helpers.tryo

import java.nio.file.Path
import scala.concurrent.ExecutionContext

class PrecomputedCubeHandle(precomputedArray: PrecomputedArray)
@@ -48,7 +48,7 @@ class PrecomputedBucketProvider(layer: PrecomputedLayer, val fileSystemServiceOp
fileSystemServiceOpt match {
case Some(fileSystemService: FileSystemService) =>
for {
magPath: Path <- if (precomputedMag.isRemote) {
magPath: VaultPath <- if (precomputedMag.isRemote) {
fileSystemService.remotePathFor(precomputedMag)
} else localPathFrom(readInstruction, precomputedMag.pathWithFallback)
cubeHandle <- tryo(onError = e => logError(e))(
@@ -5,14 +5,14 @@ import com.scalableminds.util.requestlogging.RateLimitedErrorLogging
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.dataformats.{BucketProvider, DataCubeHandle, MagLocator}
import com.scalableminds.webknossos.datastore.datareaders.zarr.ZarrArray
import com.scalableminds.webknossos.datastore.datavault.VaultPath
import com.scalableminds.webknossos.datastore.models.BucketPosition
import com.scalableminds.webknossos.datastore.models.requests.DataReadInstruction
import com.scalableminds.webknossos.datastore.storage.FileSystemService
import com.typesafe.scalalogging.LazyLogging
import net.liftweb.common.{Empty, Failure, Full}
import net.liftweb.util.Helpers.tryo

import java.nio.file.Path
import scala.concurrent.ExecutionContext

class ZarrCubeHandle(zarrArray: ZarrArray) extends DataCubeHandle with LazyLogging with RateLimitedErrorLogging {
@@ -45,7 +45,7 @@ class ZarrBucketProvider(layer: ZarrLayer, val fileSystemServiceOpt: Option[File
fileSystemServiceOpt match {
case Some(fileSystemService: FileSystemService) =>
for {
magPath: Path <- if (zarrMag.isRemote) {
magPath: VaultPath <- if (zarrMag.isRemote) {
fileSystemService.remotePathFor(zarrMag)
} else localPathFrom(readInstruction, zarrMag.pathWithFallback)
cubeHandle <- tryo(onError = e => logError(e))(
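
The N5, Precomputed, and Zarr providers now share one resolution pattern. A condensed sketch of the common shape, with names as in the diffs above (`mag` stands in for n5Mag/precomputedMag/zarrMag):

// Remote mags resolve through the vault service, local ones through FileSystemVaultPath;
// both branches now yield the same VaultPath type.
val magPath: Fox[VaultPath] =
  if (mag.isRemote) fileSystemService.remotePathFor(mag)
  else localPathFrom(readInstruction, mag.pathWithFallback)
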
@@ -1,5 +1,6 @@
package com.scalableminds.webknossos.datastore.datareaders

import com.scalableminds.webknossos.datastore.datavault.VaultPath
import com.typesafe.scalalogging.LazyLogging
import ucar.ma2.{Array => MultiArray, DataType => MADataType}

@@ -9,8 +10,8 @@ import scala.concurrent.Future
import scala.util.Using

object ChunkReader {
def create(store: FileSystemStore, header: DatasetHeader): ChunkReader =
new ChunkReader(header, store, createChunkTyper(header))
def create(vaultPath: VaultPath, header: DatasetHeader): ChunkReader =
new ChunkReader(header, vaultPath, createChunkTyper(header))

def createChunkTyper(header: DatasetHeader): ChunkTyper =
header.resolvedDataType match {
@@ -23,7 +24,7 @@ object ChunkReader {
}
}

class ChunkReader(val header: DatasetHeader, val store: FileSystemStore, val chunkTyper: ChunkTyper) {
class ChunkReader(val header: DatasetHeader, val vaultPath: VaultPath, val chunkTyper: ChunkTyper) {
lazy val chunkSize: Int = header.chunkSize.toList.product

@throws[IOException]
@@ -36,7 +37,7 @@ class ChunkReader(val header: DatasetHeader, val store: FileSystemStore, val chu
// and chunk shape (optional, only for data formats where each chunk reports its own shape, e.g. N5)
protected def readChunkBytesAndShape(path: String): Option[(Array[Byte], Option[Array[Int]])] =
Using.Manager { use =>
store.readBytes(path).map { bytes =>
(vaultPath / path).readBytes().map { bytes =>
val is = use(new ByteArrayInputStream(bytes))
val os = use(new ByteArrayOutputStream())
header.compressorImpl.uncompress(is, os)
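
Chunk access now goes through path resolution instead of a keyed store: the former store.readBytes(key) becomes child-path resolution plus a read. A minimal sketch; "0.0.0" is a hypothetical Zarr-style chunk key:

// readBytes() returns an Option-like container, as the .map above shows.
val compressedChunk = (vaultPath / "0.0.0").readBytes()
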
@@ -6,6 +6,7 @@ import com.scalableminds.util.geometry.Vec3Int
import com.scalableminds.util.tools.Fox
import com.scalableminds.util.tools.Fox.option2Fox
import com.scalableminds.webknossos.datastore.datareaders.zarr.BytesConverter
import com.scalableminds.webknossos.datastore.datavault.VaultPath
import com.typesafe.scalalogging.LazyLogging
import ucar.ma2.{InvalidRangeException, Array => MultiArray}

@@ -15,14 +16,14 @@ import java.util
import scala.concurrent.{ExecutionContext, Future}

class DatasetArray(relativePath: DatasetPath,
store: FileSystemStore,
vaultPath: VaultPath,
header: DatasetHeader,
axisOrder: AxisOrder,
channelIndex: Option[Int])
extends LazyLogging {

protected val chunkReader: ChunkReader =
ChunkReader.create(store, header)
ChunkReader.create(vaultPath, header)

// cache currently limited to 1 GB per array
private lazy val chunkContentsCache: Cache[String, MultiArray] = {
@@ -126,7 +127,7 @@ class DatasetArray(relativePath: DatasetPath,

override def toString: String =
s"${getClass.getCanonicalName} {'/${relativePath.storeKey}' axisOrder=$axisOrder shape=${header.datasetShape.mkString(
",")} chunks=${header.chunkSize.mkString(",")} dtype=${header.dataType} fillValue=${header.fillValueNumber}, ${header.compressorImpl}, byteOrder=${header.byteOrder}, store=${store.getClass.getSimpleName}}"
",")} chunks=${header.chunkSize.mkString(",")} dtype=${header.dataType} fillValue=${header.fillValueNumber}, ${header.compressorImpl}, byteOrder=${header.byteOrder}, vault=${vaultPath.getName}}"

}
