Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WX-1153 Azure blob read md5 from metadata for large files #7204

Merged
merged 22 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
359ef4d
Initial commit
aednichols Aug 11, 2023
0b9aca0
Consistently use `Try` for IO
aednichols Aug 11, 2023
cad86dc
Whitespace disabled from working on WDL parser...
aednichols Aug 11, 2023
ade9145
Fix sneaky `None.get` disguised as `Map.get()`
aednichols Aug 12, 2023
39024c2
Add test, fix thing test found
aednichols Aug 14, 2023
108f60e
Merge remote-tracking branch 'origin/develop' into aen_wx_1153
aednichols Aug 14, 2023
c4fbfb1
Try populating Azure creds for unit tests
aednichols Aug 15, 2023
ac4bd02
See what happens with other tests
aednichols Aug 15, 2023
cf96a7c
You work locally, will you work in CI?
aednichols Aug 15, 2023
d348428
Don't hide the exception when tests fail
aednichols Aug 15, 2023
c5d991e
Can we, uh, see the error maybe?
aednichols Aug 16, 2023
b62c9de
Do you pass in CI now?
aednichols Aug 17, 2023
c4e3ece
Do you pass in CI now?
aednichols Aug 17, 2023
4c9a642
Merge remote-tracking branch 'origin/develop' into aen_wx_1153
aednichols Aug 17, 2023
b4872d9
Cleanup
aednichols Aug 17, 2023
c36e3e5
Add PR suggested test
aednichols Aug 17, 2023
2fe419c
Unignore other tests
jgainerdewar Aug 17, 2023
23217a2
Enhance `FileSystemAPI` to allow injection
aednichols Aug 17, 2023
35ec356
Merge branch 'aen_wx_1153' of github.com:broadinstitute/cromwell into…
jgainerdewar Aug 17, 2023
f845d8b
Merge branch 'aen_wx_1153' of github.com:broadinstitute/cromwell into…
jgainerdewar Aug 17, 2023
d17274f
Re-add test I accidentally deleted
jgainerdewar Aug 17, 2023
ba60ef8
Merge branch 'develop' into aen_wx_1153
aednichols Aug 17, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/cromwell_unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ jobs:

#Invoke SBT to run all unit tests for Cromwell.
- name: Run tests
env:
AZURE_CLIENT_ID: ${{ secrets.VAULT_AZURE_CENTAUR_CLIENT_ID }}
AZURE_CLIENT_SECRET: ${{ secrets.VAULT_AZURE_CENTAUR_CLIENT_SECRET }}
AZURE_TENANT_ID: ${{ secrets.VAULT_AZURE_CENTAUR_TENANT_ID }}
run: |
set -e
sbt "test"
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
package cromwell.filesystems.blob

import com.azure.core.credential.AzureSasCredential
import com.azure.storage.blob.nio.AzureFileSystem
import com.azure.storage.blob.nio.{AzureFileSystem, AzureFileSystemProvider}
import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues}
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import common.validation.Validation._
import cromwell.cloudsupport.azure.{AzureCredentials, AzureUtils}

import java.net.URI
import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems}
import java.nio.file.spi.FileSystemProvider
import java.nio.file.{FileSystem, FileSystemNotFoundException}
import java.time.temporal.ChronoUnit
import java.time.{Duration, Instant, OffsetDateTime}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

// We encapsulate this functionality here so that we can easily mock it out, to allow for testing without
// actually connecting to Blob storage.
case class FileSystemAPI() {
def getFileSystem(uri: URI): Try[FileSystem] = Try(FileSystems.getFileSystem(uri))
def newFileSystem(uri: URI, config: Map[String, Object]): FileSystem = FileSystems.newFileSystem(uri, config.asJava)
case class FileSystemAPI(private val provider: FileSystemProvider = new AzureFileSystemProvider()) {
def getFileSystem(uri: URI): Try[FileSystem] = Try(provider.getFileSystem(uri))
def newFileSystem(uri: URI, config: Map[String, Object]): FileSystem = provider.newFileSystem(uri, config.asJava)
def closeFileSystem(uri: URI): Option[Unit] = getFileSystem(uri).toOption.map(_.close)
}
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import cromwell.filesystems.blob.BlobPathBuilder._

import java.net.{MalformedURLException, URI}
import java.nio.file.{Files, LinkOption}
import scala.jdk.CollectionConverters._
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -78,6 +79,20 @@ object BlobPath {
// format the library expects
// 2) If the path looks like <container>:<path>, strip off the <container>: to leave the absolute path inside the container.
// Matches a blob URL whose scheme separator is a single slash ("https:/<host-prefix>.blob.core.windows.net/...")
// and captures, in order: the host prefix, the container name, and the path inside the container.
// The dots are escaped so that only a literal '.' is accepted in the host name.
private val brokenPathRegex = """https:/([a-z0-9]+)\.blob\.core\.windows\.net/([-a-zA-Z0-9]+)/(.*)""".r

// Blob files larger than 5 GB upload in parallel parts [0][1] and do not get a native `CONTENT-MD5` property.
// Instead, some uploaders such as TES [2] calculate the md5 themselves and store it under this key in metadata.
// They do this for all files they touch, regardless of size, and the root/metadata property is authoritative
// over the native one.
//
// N.B. most, if not virtually all, large files in the wild will NOT have this key populated, because they were
// not created by TES or its associated upload utility [3].
//
// [0] https://learn.microsoft.com/en-us/azure/storage/blobs/scalability-targets
// [1] https://learn.microsoft.com/en-us/rest/api/storageservices/version-2019-12-12
// [2] https://github.com/microsoft/ga4gh-tes/blob/03feb746bb961b72fa91266a56db845e3b31be27/src/Tes.Runner/Transfer/BlobBlockApiHttpUtils.cs#L25
// [3] https://github.com/microsoft/ga4gh-tes/blob/main/src/Tes.RunnerCLI/scripts/roothash.sh
private val largeBlobFileMetadataKey = "md5_4mib_hashlist_root_hash"

def cleanedNioPathString(nioString: String): String = {
val pathStr = nioString match {
case brokenPathRegex(_, containerName, pathInContainer) =>
Expand Down Expand Up @@ -116,16 +131,33 @@ case class BlobPath private[blob](pathString: String, endpoint: EndpointURL, con
/**
  * Reads the attributes of this blob via `Files.readAttributes` on `nioPath`.
  *
  * This is a `def`, so every invocation performs a fresh read. Any non-fatal exception
  * thrown by the read is captured in the returned `Try` instead of propagating.
  */
def blobFileAttributes: Try[AzureBlobFileAttributes] = Try {
  Files.readAttributes(nioPath, classOf[AzureBlobFileAttributes])
}

/**
  * Reads this blob's metadata as an immutable Scala map.
  *
  * @return `Failure` if the attributes could not be read, `Success(None)` if the attributes
  *         carry no metadata map, otherwise `Success(Some(map))`.
  */
def blobFileMetadata: Try[Option[Map[String, String]]] =
  for {
    attributes <- blobFileAttributes
  } yield {
    // `metadata()` has a documented `null` case, hence the `Option` wrapper
    val maybeJavaMetadata = Option(attributes.metadata())
    maybeJavaMetadata.map(javaMetadata => javaMetadata.asScala.toMap)
  }

/**
  * Looks up the md5 of this blob.
  *
  * Two sources are consulted on a single read of the blob's attributes:
  *  - the metadata entry stored under `BlobPath.largeBlobFileMetadataKey`, returned as-is;
  *  - the native Content-MD5 header bytes, rendered as a lowercase hex string.
  *
  * When both are present the metadata value wins. Per Microsoft 2023-08-15 the root/metadata
  * algorithm emits different values than the native algorithm, and we should always choose
  * metadata for consistency with larger files that only have that one. (Both being present
  * will happen for all <5 GB files uploaded by TES.)
  *
  * @return `Failure` if the attributes (or the HTTP headers on them) cannot be read,
  *         `Success(None)` if neither source has a usable md5, otherwise `Success(Some(md5))`.
  */
def md5HexString: Try[Option[String]] = {
  // Convert the bytes to a hex-encoded string. Note that the value
  // is rendered in base64 in the Azure web portal.
  def hexString(bytes: Array[Byte]): String = bytes.map("%02x".format(_)).mkString

  // Read the attributes exactly once and derive both candidates from that one snapshot.
  // `blobFileAttributes` is a `def` that re-reads on every call, so going through
  // `blobFileMetadata` here would cost a second read and could observe a different blob state.
  blobFileAttributes.map { attrs: AzureBlobFileAttributes =>
    // Evaluated first so that a failure while reading the headers still fails the whole `Try`.
    // A missing or empty header array means there is no native md5.
    val headerMd5: Option[String] =
      Option(attrs.blobHttpHeaders().getContentMd5).filter(_.nonEmpty).map(hexString)

    // Best effort: a failure while reading metadata falls back to the header value.
    // `metadata()` has a documented `null` case, and `java.util.Map.get` yields `null`
    // for an absent key, so both results are wrapped in `Option`.
    val metadataMd5: Option[String] =
      Try(attrs.metadata()).toOption
        .flatMap(Option(_))
        .flatMap(metadata => Option(metadata.get(BlobPath.largeBlobFileMetadataKey)))

    metadataMd5.orElse(headerMd5)
  }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,46 +89,80 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers with MockSugar {
)
}

//// The below tests are IGNORED because they depend on Azure auth information being present in the environment ////
// The following tests use the `centaurtesting` account injected into CI. They depend on access to the
// container specified below. You may need to log in to az cli locally to get them to pass.
private val subscriptionId: SubscriptionId = SubscriptionId(UUID.fromString("62b22893-6bc1-46d9-8a90-806bb3cce3c9"))
private val endpoint: EndpointURL = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage")
private val store: BlobContainerName = BlobContainerName("inputs")
private val endpoint: EndpointURL = BlobPathBuilderSpec.buildEndpoint("centaurtesting")
private val container: BlobContainerName = BlobContainerName("test-blob")

def makeBlobPathBuilder(blobEndpoint: EndpointURL, container: BlobContainerName): BlobPathBuilder = {
def makeBlobPathBuilder(blobEndpoint: EndpointURL,
container: BlobContainerName): BlobPathBuilder = {
val blobTokenGenerator = NativeBlobSasTokenGenerator(container, blobEndpoint, Some(subscriptionId))
val fsm = new BlobFileSystemManager(container, blobEndpoint, 10, blobTokenGenerator)
new BlobPathBuilder(store, endpoint)(fsm)
new BlobPathBuilder(container, blobEndpoint)(fsm)
}

ignore should "resolve an absolute path string correctly to a path" in {
val builder = makeBlobPathBuilder(endpoint, store)
val rootString = s"${endpoint.value}/${store.value}/cromwell-execution"
// Resolves `/testRead.txt` in the test container and checks the md5 reported for it.
it should "read md5 from small files <5g" in {
  val pathBuilder = makeBlobPathBuilder(endpoint, container)
  val blobUrl = s"${endpoint.value}/$container/testRead.txt"
  // `Try.get` on purpose: a failed build or attribute read rethrows the underlying exception.
  val smallBlob: BlobPath = pathBuilder.build(blobUrl).get
  val md5: Option[String] = smallBlob.md5HexString.get
  md5 should equal(Option("31ae06882d06a20e01ba1ac961ce576c"))
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add test cases for the two types of missing hashes?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that the tests are working... with pleasure!

// Resolves `/Rocky-9.2-aarch64-dvd.iso` in the test container and checks the md5 reported for it.
it should "read md5 from large files >5g" in {
  val builder = makeBlobPathBuilder(endpoint, container)
  val evalPath = "/Rocky-9.2-aarch64-dvd.iso"
  val testString = endpoint.value + "/" + container + evalPath
  val blobPath1: BlobPath = (builder build testString).get
  // Call `get` on the `Try` itself: on failure this rethrows the real exception, whereas
  // `.toOption.get` would replace it with an uninformative `None.get`.
  blobPath1.md5HexString.get should equal(Some("13cb09331d2d12c0f476f81c672a4319"))
}

// Resolves `/redundant_md5_test.txt` in the test container and checks which md5 is reported for it.
it should "choose the root/metadata md5 over the native md5 for files that have both" in {
  val builder = makeBlobPathBuilder(endpoint, container)
  val evalPath = "/redundant_md5_test.txt"
  val testString = endpoint.value + "/" + container + evalPath
  val blobPath1: BlobPath = (builder build testString).get
  // Call `get` on the `Try` itself: on failure this rethrows the real exception, whereas
  // `.toOption.get` would replace it with an uninformative `None.get`.
  blobPath1.md5HexString.get should equal(Some("021c7cc715ec82292bb9b925f9ca44d3"))
}

// Resolves `/no_md5_test.txt` in the test container and expects a successful lookup with no md5.
it should "gracefully return `None` when neither hash is found" in {
  val pathBuilder = makeBlobPathBuilder(endpoint, container)
  val blobUrl = s"${endpoint.value}/$container/no_md5_test.txt"
  // `Try.get` on purpose: a failed build or attribute read rethrows the underlying exception.
  val hashlessBlob: BlobPath = pathBuilder.build(blobUrl).get
  val md5: Option[String] = hashlessBlob.md5HexString.get
  md5 should equal(None)
}

it should "resolve an absolute path string correctly to a path" in {
val builder = makeBlobPathBuilder(endpoint, container)
val rootString = s"${endpoint.value}/${container.value}/cromwell-execution"
val blobRoot: BlobPath = builder build rootString getOrElse fail()
blobRoot.toAbsolutePath.pathAsString should equal ("https://coaexternalstorage.blob.core.windows.net/inputs/cromwell-execution")
val otherFile = blobRoot.resolve("https://coaexternalstorage.blob.core.windows.net/inputs/cromwell-execution/test/inputFile.txt")
otherFile.toAbsolutePath.pathAsString should equal ("https://coaexternalstorage.blob.core.windows.net/inputs/cromwell-execution/test/inputFile.txt")
blobRoot.toAbsolutePath.pathAsString should equal ("https://centaurtesting.blob.core.windows.net/test-blob/cromwell-execution")
val otherFile = blobRoot.resolve("https://centaurtesting.blob.core.windows.net/test-blob/cromwell-execution/test/inputFile.txt")
otherFile.toAbsolutePath.pathAsString should equal ("https://centaurtesting.blob.core.windows.net/test-blob/cromwell-execution/test/inputFile.txt")
}

ignore should "build a blob path from a test string and read a file" in {
val builder = makeBlobPathBuilder(endpoint, store)
it should "build a blob path from a test string and read a file" in {
val builder = makeBlobPathBuilder(endpoint, container)
val endpointHost = BlobPathBuilder.parseURI(endpoint.value).map(_.getHost).getOrElse(fail("Could not parse URI"))
val evalPath = "/test/inputFile.txt"
val testString = endpoint.value + "/" + store + evalPath
val testString = endpoint.value + "/" + container + evalPath
val blobPath: BlobPath = builder build testString getOrElse fail()

blobPath.container should equal(store)
blobPath.container should equal(container)
blobPath.endpoint should equal(endpoint)
blobPath.pathAsString should equal(testString)
blobPath.pathWithoutScheme should equal(endpointHost + "/" + store + evalPath)
blobPath.pathWithoutScheme should equal(endpointHost + "/" + container + evalPath)
val is = blobPath.newInputStream()
val fileText = (is.readAllBytes.map(_.toChar)).mkString
fileText should include ("This is my test file!!!! Did it work?")
}

ignore should "build duplicate blob paths in the same filesystem" in {
val builder = makeBlobPathBuilder(endpoint, store)
it should "build duplicate blob paths in the same filesystem" in {
val builder = makeBlobPathBuilder(endpoint, container)
val evalPath = "/test/inputFile.txt"
val testString = endpoint.value + "/" + store + evalPath
val testString = endpoint.value + "/" + container + evalPath
val blobPath1: BlobPath = builder build testString getOrElse fail()
blobPath1.nioPath.getFileSystem.close()
val blobPath2: BlobPath = builder build testString getOrElse fail()
Expand All @@ -138,20 +172,20 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers with MockSugar {
fileText should include ("This is my test file!!!! Did it work?")
}

ignore should "resolve a path without duplicating container name" in {
val builder = makeBlobPathBuilder(endpoint, store)
val rootString = s"${endpoint.value}/${store.value}/cromwell-execution"
it should "resolve a path without duplicating container name" in {
val builder = makeBlobPathBuilder(endpoint, container)
val rootString = s"${endpoint.value}/${container.value}/cromwell-execution"
val blobRoot: BlobPath = builder build rootString getOrElse fail()
blobRoot.toAbsolutePath.pathAsString should equal ("https://coaexternalstorage.blob.core.windows.net/inputs/cromwell-execution")
blobRoot.toAbsolutePath.pathAsString should equal ("https://centaurtesting.blob.core.windows.net/test-blob/cromwell-execution")
val otherFile = blobRoot.resolve("test/inputFile.txt")
otherFile.toAbsolutePath.pathAsString should equal ("https://coaexternalstorage.blob.core.windows.net/inputs/cromwell-execution/test/inputFile.txt")
otherFile.toAbsolutePath.pathAsString should equal ("https://centaurtesting.blob.core.windows.net/test-blob/cromwell-execution/test/inputFile.txt")
}

ignore should "correctly remove a prefix from the blob path" in {
val builder = makeBlobPathBuilder(endpoint, store)
val rootString = s"${endpoint.value}/${store.value}/cromwell-execution/"
val execDirString = s"${endpoint.value}/${store.value}/cromwell-execution/abc123/myworkflow/task1/def4356/execution/"
val fileString = s"${endpoint.value}/${store.value}/cromwell-execution/abc123/myworkflow/task1/def4356/execution/stdout"
it should "correctly remove a prefix from the blob path" in {
val builder = makeBlobPathBuilder(endpoint, container)
val rootString = s"${endpoint.value}/${container.value}/cromwell-execution/"
val execDirString = s"${endpoint.value}/${container.value}/cromwell-execution/abc123/myworkflow/task1/def4356/execution/"
val fileString = s"${endpoint.value}/${container.value}/cromwell-execution/abc123/myworkflow/task1/def4356/execution/stdout"
val blobRoot: BlobPath = builder build rootString getOrElse fail()
val execDir: BlobPath = builder build execDirString getOrElse fail()
val blobFile: BlobPath = builder build fileString getOrElse fail()
Expand All @@ -160,10 +194,10 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers with MockSugar {
blobFile.pathStringWithoutPrefix(blobFile) should equal ("")
}

ignore should "not change a path if it doesn't start with a prefix" in {
val builder = makeBlobPathBuilder(endpoint, store)
val otherRootString = s"${endpoint.value}/${store.value}/foobar/"
val fileString = s"${endpoint.value}/${store.value}/cromwell-execution/abc123/myworkflow/task1/def4356/execution/stdout"
it should "not change a path if it doesn't start with a prefix" in {
val builder = makeBlobPathBuilder(endpoint, container)
val otherRootString = s"${endpoint.value}/${container.value}/foobar/"
val fileString = s"${endpoint.value}/${container.value}/cromwell-execution/abc123/myworkflow/task1/def4356/execution/stdout"
val otherBlobRoot: BlobPath = builder build otherRootString getOrElse fail()
val blobFile: BlobPath = builder build fileString getOrElse fail()
blobFile.pathStringWithoutPrefix(otherBlobRoot) should equal ("/cromwell-execution/abc123/myworkflow/task1/def4356/execution/stdout")
Expand Down