Skip to content

Commit

Permalink
Enable S3 compliant data vaults using https and http (#8167)
Browse files Browse the repository at this point in the history
  • Loading branch information
frcroth authored Nov 4, 2024
1 parent 159f868 commit 96c9d7f
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 53 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released
- Users without edit permissions to a dataset can no longer delete sharing tokens via the API. [#8083](https://github.com/scalableminds/webknossos/issues/8083)
- Fixed downloading task annotations of teams you are not in, when accessing directly via URI. [#8155](https://github.com/scalableminds/webknossos/pull/8155)
- Deleting a bounding box is now possible independently of a visible segmentation layer. [#8164](https://github.com/scalableminds/webknossos/pull/8164)
- S3-compliant object storages can now be accessed via HTTPS. [#8167](https://github.com/scalableminds/webknossos/pull/8167)

### Removed

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
"ts-coverage": "typescript-coverage-report",
"find-cyclic-dependencies": "yarn run dpdm -T --tree false --warning false --extensions .ts,.tsx frontend/javascripts/main.tsx",
"check-cyclic-dependencies": "node ./tools/check-cyclic-dependencies.js",
"startf": "yarn rm-fossil-lock; yarn kill-listeners; yarn start",
"startf": "yarn rm-fossil-lock; yarn kill-listeners; rm -r webknossos-jni/target; yarn start",
"beautify-front": "yarn fix-frontend && yarn typecheck",
"beautify": "yarn format-backend && yarn beautify-front"
},
Expand Down
87 changes: 50 additions & 37 deletions test/backend/DataVaultTestSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,14 @@ class DataVaultTestSuite extends PlaySpec {
"using S3 data vault" should {
"return correct response" in {
val uri = new URI("s3://janelia-cosem-datasets/jrc_hela-3/jrc_hela-3.n5/em/fibsem-uint16/")
val vaultPath = new VaultPath(uri, S3DataVault.create(RemoteSourceDescriptor(uri, None)))
val bytes =
(vaultPath / "s0/5/5/5").readBytes(Some(range))(globalExecutionContext).get(handleFoxJustification)
assert(bytes.length == range.length)
assert(bytes.take(10).sameElements(Array(0, 0, 0, 3, 0, 0, 0, 64, 0, 0)))
WsTestClient.withClient { ws =>
val vaultPath =
new VaultPath(uri, S3DataVault.create(RemoteSourceDescriptor(uri, None), ws)(globalExecutionContext))
val bytes =
(vaultPath / "s0/5/5/5").readBytes(Some(range))(globalExecutionContext).get(handleFoxJustification)
assert(bytes.length == range.length)
assert(bytes.take(10).sameElements(Array(0, 0, 0, 3, 0, 0, 0, 64, 0, 0)))
}
}
}
}
Expand Down Expand Up @@ -135,59 +138,69 @@ class DataVaultTestSuite extends PlaySpec {
"using s3 data vault" should {
"return correctly decoded brotli-compressed data" in {
val uri = new URI("s3://open-neurodata/bock11/image/4_4_40")
val vaultPath = new VaultPath(uri, S3DataVault.create(RemoteSourceDescriptor(uri, None)))
val bytes =
(vaultPath / "33792-34304_29696-30208_3216-3232")
.readBytes()(globalExecutionContext)
.get(handleFoxJustification)
assert(bytes.take(10).sameElements(Array(-87, -95, -85, -94, -101, 124, 115, 100, 113, 111)))
WsTestClient.withClient { ws =>
val vaultPath =
new VaultPath(uri, S3DataVault.create(RemoteSourceDescriptor(uri, None), ws)(globalExecutionContext))
val bytes =
(vaultPath / "33792-34304_29696-30208_3216-3232")
.readBytes()(globalExecutionContext)
.get(handleFoxJustification)
assert(bytes.take(10).sameElements(Array(-87, -95, -85, -94, -101, 124, 115, 100, 113, 111)))
}
}

"return empty box" when {
"requesting a non-existent bucket" in {
val uri = new URI(s"s3://non-existent-bucket${UUID.randomUUID}/non-existent-object")
val s3DataVault = S3DataVault.create(RemoteSourceDescriptor(uri, None))
val vaultPath = new VaultPath(uri, s3DataVault)
val result = vaultPath.readBytes()(globalExecutionContext).await(handleFoxJustification)
assertBoxEmpty(result)
WsTestClient.withClient { ws =>
val s3DataVault = S3DataVault.create(RemoteSourceDescriptor(uri, None), ws)(globalExecutionContext)
val vaultPath = new VaultPath(uri, s3DataVault)
val result = vaultPath.readBytes()(globalExecutionContext).await(handleFoxJustification)
assertBoxEmpty(result)
}
}
}

"return empty box" when {
"requesting a non-existent object in existent bucket" in {
val uri = new URI(s"s3://open-neurodata/non-existent-object${UUID.randomUUID}")
val s3DataVault = S3DataVault.create(RemoteSourceDescriptor(uri, None))
val vaultPath = new VaultPath(uri, s3DataVault)
val result = vaultPath.readBytes()(globalExecutionContext).await(handleFoxJustification)
assertBoxEmpty(result)
WsTestClient.withClient { ws =>
val s3DataVault = S3DataVault.create(RemoteSourceDescriptor(uri, None), ws)(globalExecutionContext)
val vaultPath = new VaultPath(uri, s3DataVault)
val result = vaultPath.readBytes()(globalExecutionContext).await(handleFoxJustification)
assertBoxEmpty(result)
}
}
}
}
}

"using directory list requests" when {
val uri = new URI("s3://janelia-cosem-datasets/jrc_hela-3/jrc_hela-3.n5/em/fibsem-uint16/")
val vaultPath = new VaultPath(uri, S3DataVault.create(RemoteSourceDescriptor(uri, None)))

"using s3 data vault" should {
"list available directories" in {
val result = vaultPath.listDirectory(maxItems = 3)(globalExecutionContext).get(handleFoxJustification)
assert(result.length == 3)
assert(
result.exists(
_.toUri == new URI("s3://janelia-cosem-datasets/jrc_hela-3/jrc_hela-3.n5/em/fibsem-uint16/s0/")))
}
WsTestClient.withClient { ws =>
val vaultPath =
new VaultPath(uri, S3DataVault.create(RemoteSourceDescriptor(uri, None), ws)(globalExecutionContext))

"using s3 data vault" should {
"list available directories" in {
val result = vaultPath.listDirectory(maxItems = 3)(globalExecutionContext).get(handleFoxJustification)
assert(result.length == 3)
assert(
result.exists(
_.toUri == new URI("s3://janelia-cosem-datasets/jrc_hela-3/jrc_hela-3.n5/em/fibsem-uint16/s0/")))
}

"return failure" when {
"requesting directory listing on non-existent bucket" in {
val uri = new URI(f"s3://non-existent-bucket${UUID.randomUUID}/non-existent-object/")
val s3DataVault = S3DataVault.create(RemoteSourceDescriptor(uri, None))
val vaultPath = new VaultPath(uri, s3DataVault)
val result = vaultPath.listDirectory(maxItems = 5)(globalExecutionContext).await(handleFoxJustification)
assertBoxFailure(result)
"return failure" when {
"requesting directory listing on non-existent bucket" in {
val uri = new URI(f"s3://non-existent-bucket${UUID.randomUUID}/non-existent-object/")
val s3DataVault = S3DataVault.create(RemoteSourceDescriptor(uri, None), ws)(globalExecutionContext)
val vaultPath = new VaultPath(uri, s3DataVault)
val result = vaultPath.listDirectory(maxItems = 5)(globalExecutionContext).await(handleFoxJustification)
assertBoxFailure(result)
}
}
}

}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.scalableminds.webknossos.datastore.datavault

import com.scalableminds.util.tools.Fox
import com.scalableminds.util.tools.Fox.box2Fox
import com.scalableminds.util.tools.Fox.{box2Fox, future2Fox}
import com.scalableminds.webknossos.datastore.storage.{
LegacyDataVaultCredential,
RemoteSourceDescriptor,
Expand All @@ -10,6 +10,7 @@ import com.scalableminds.webknossos.datastore.storage.{
import net.liftweb.common.Box.tryo
import net.liftweb.common.{Box, Empty, Full, Failure => BoxFailure}
import org.apache.commons.lang3.builder.HashCodeBuilder
import play.api.libs.ws.WSClient
import software.amazon.awssdk.auth.credentials.{
AnonymousCredentialsProvider,
AwsBasicCredentials,
Expand Down Expand Up @@ -41,14 +42,18 @@ import scala.jdk.FutureConverters._
import scala.jdk.OptionConverters.RichOptional
import scala.util.{Failure => TryFailure, Success => TrySuccess}

class S3DataVault(s3AccessKeyCredential: Option[S3AccessKeyCredential], uri: URI) extends DataVault {
class S3DataVault(s3AccessKeyCredential: Option[S3AccessKeyCredential],
uri: URI,
ws: WSClient,
implicit val ec: ExecutionContext)
extends DataVault {
// Bucket name taken from the vault URI. Evaluated on first access; throws if no bucket can be parsed.
private lazy val bucketName =
  S3DataVault
    .hostBucketFromUri(uri)
    .getOrElse(throw new Exception(s"Could not parse S3 bucket for ${uri.toString}"))

private lazy val client: S3AsyncClient =
S3DataVault.getAmazonS3Client(s3AccessKeyCredential, uri)
private lazy val clientFox: Fox[S3AsyncClient] =
S3DataVault.getAmazonS3Client(s3AccessKeyCredential, uri, ws)

/** Builds a GetObjectRequest for `key` in `bucketName`, limited to the bytes described by `range`. */
private def getRangeRequest(bucketName: String, key: String, range: NumericRange[Long]): GetObjectRequest = {
  // range.end is treated as exclusive here, so the last requested byte is range.end - 1.
  val byteRangeHeader = s"bytes=${range.start}-${range.end - 1}"
  GetObjectRequest
    .builder()
    .bucket(bucketName)
    .key(key)
    .range(byteRangeHeader)
    .build()
}
Expand All @@ -64,6 +69,7 @@ class S3DataVault(s3AccessKeyCredential: Option[S3AccessKeyCredential], uri: URI
val responseTransformer: AsyncResponseTransformer[GetObjectResponse, ResponseBytes[GetObjectResponse]] =
AsyncResponseTransformer.toBytes
for {
client <- clientFox
responseBytesObject: ResponseBytes[GetObjectResponse] <- notFoundToEmpty(
client.getObject(request, responseTransformer).asScala)
encoding = responseBytesObject.response().contentEncoding()
Expand Down Expand Up @@ -122,6 +128,7 @@ class S3DataVault(s3AccessKeyCredential: Option[S3AccessKeyCredential], uri: URI
val listObjectsRequest =
ListObjectsV2Request.builder().bucket(bucketName).prefix(keyPrefix).delimiter("/").maxKeys(maxKeys).build()
for {
client <- clientFox
objectListing: ListObjectsV2Response <- notFoundToFailure(client.listObjectsV2(listObjectsRequest).asScala)
s3SubPrefixes: List[CommonPrefix] = objectListing.commonPrefixes().asScala.take(maxItems).toList
} yield s3SubPrefixes.map(_.prefix())
Expand All @@ -140,13 +147,14 @@ class S3DataVault(s3AccessKeyCredential: Option[S3AccessKeyCredential], uri: URI
}

object S3DataVault {
def create(remoteSourceDescriptor: RemoteSourceDescriptor): S3DataVault = {
def create(remoteSourceDescriptor: RemoteSourceDescriptor, ws: WSClient)(
implicit ec: ExecutionContext): S3DataVault = {
val credential = remoteSourceDescriptor.credential.flatMap {
case f: S3AccessKeyCredential => Some(f)
case f: LegacyDataVaultCredential => Some(f.toS3AccessKey)
case _ => None
}
new S3DataVault(credential, remoteSourceDescriptor.uri)
new S3DataVault(credential, remoteSourceDescriptor.uri, ws, ec)
}

private def hostBucketFromUri(uri: URI): Option[String] = {
Expand Down Expand Up @@ -201,16 +209,34 @@ object S3DataVault {
/** True if the URI is path-style with a host outside `.amazonaws.com`, or if its host is `localhost`. */
private def isNonAmazonHost(uri: URI): Boolean = {
  val isPathStyleOutsideAmazon = isPathStyle(uri) && !uri.getHost.endsWith(".amazonaws.com")
  isPathStyleOutsideAmazon || uri.getHost == "localhost"
}

private def getAmazonS3Client(credentialOpt: Option[S3AccessKeyCredential], uri: URI): S3AsyncClient = {
/**
  * Probes whether the endpoint behind `uri` can be reached via HTTPS and selects the protocol accordingly.
  *
  * A GET request is sent to `https://<authority>`. If the request future succeeds, "https" is chosen
  * (the response itself is not inspected). If it fails for any reason, "http" is used as the fallback.
  *
  * @param uri the data vault URI; only its authority is used to build the probe URL
  * @param ws  the client used to issue the probe request
  * @return a Fox containing either "https" or "http"
  */
private def determineProtocol(uri: URI, ws: WSClient)(implicit ec: ExecutionContext): Fox[String] = {
  // Absent URI components have to be passed as null to this Java constructor: empty strings for
  // query and fragment would still emit their delimiters, yielding "https://host?#".
  val httpsProbeUri = new URI("https", uri.getAuthority, null, null, null)
  // If the endpoint supports HTTPS, use it. Otherwise (any failure of the probe), use HTTP.
  // NOTE(review): no explicit request timeout is set, so an unresponsive endpoint delays the
  // decision until the WSClient's configured timeout elapses; consider bounding it.
  ws.url(httpsProbeUri.toString)
    .get()
    .transform(probeResult => TrySuccess(if (probeResult.isSuccess) "https" else "http"))
    .toFox
}

private def getAmazonS3Client(credentialOpt: Option[S3AccessKeyCredential], uri: URI, ws: WSClient)(
implicit ec: ExecutionContext): Fox[S3AsyncClient] = {
val basic =
S3AsyncClient.builder().credentialsProvider(getCredentialsProvider(credentialOpt)).crossRegionAccessEnabled(true)
if (isNonAmazonHost(uri))
basic
.forcePathStyle(true)
.endpointOverride(new URI(s"http://${uri.getAuthority}"))
.region(AwsHostNameUtils.parseSigningRegion(uri.getAuthority, "s3").toScala.getOrElse(Region.US_EAST_1))
.build()
else basic.region(Region.US_EAST_1).build()
if (isNonAmazonHost(uri)) {
for {
protocol <- determineProtocol(uri, ws)
} yield
basic
.forcePathStyle(true)
.endpointOverride(new URI(s"${protocol}://${uri.getAuthority}"))
.region(AwsHostNameUtils.parseSigningRegion(uri.getAuthority, "s3").toScala.getOrElse(Region.US_EAST_1))
.build()
} else Fox.successful(basic.region(Region.US_EAST_1).build())
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class DataVaultService @Inject()(ws: WSClient) extends LazyLogging {
val fs: DataVault = if (scheme == DataVaultService.schemeGS) {
GoogleCloudDataVault.create(remoteSource)
} else if (scheme == DataVaultService.schemeS3) {
S3DataVault.create(remoteSource)
S3DataVault.create(remoteSource, ws)
} else if (scheme == DataVaultService.schemeHttps || scheme == DataVaultService.schemeHttp) {
HttpsDataVault.create(remoteSource, ws)
} else if (scheme == DataVaultService.schemeFile) {
Expand Down

0 comments on commit 96c9d7f

Please sign in to comment.