Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WX-1566 Fix Morgan's call cache file hash CPU thrash Cromwell crash #7419

Merged
merged 6 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 3 additions & 53 deletions engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
import akka.stream.scaladsl.Flow
import cats.effect._

import scala.util.Try
import cloud.nio.spi.{ChecksumFailure, ChecksumResult, ChecksumSkipped, ChecksumSuccess, FileHash, HashType}
import cloud.nio.spi.{ChecksumFailure, ChecksumResult, ChecksumSkipped, ChecksumSuccess, FileHash}
import com.typesafe.config.Config
import common.util.IORetry
import cromwell.core.io._
Expand All @@ -15,10 +14,7 @@
import cromwell.engine.io.{IoAttempts, IoCommandContext, IoCommandStalenessBackpressuring}
import cromwell.filesystems.blob.BlobPath
import cromwell.filesystems.drs.DrsPath
import cromwell.filesystems.gcs.GcsPath
import cromwell.filesystems.http.HttpPath
import cromwell.filesystems.s3.S3Path
import cromwell.util.TryWithResource._
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ValueReader

Expand Down Expand Up @@ -134,7 +130,7 @@

def readFileAndChecksum: IO[String] =
for {
fileHash <- getStoredHash(command.file)
fileHash <- NioHashing.getStoredHash(command.file)

Check warning on line 133 in engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala

View check run for this annotation

Codecov / codecov/patch

engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala#L133

Added line #L133 was not covered by tests
uncheckedValue <- readFile
checksumResult <- fileHash match {
case Some(hash) => checkHash(uncheckedValue, hash)
Expand Down Expand Up @@ -177,31 +173,7 @@
}

private def hash(hash: IoHashCommand): IO[String] =
// If there is no hash accessible from the file storage system,
// we'll read the file and generate the hash ourselves.
getStoredHash(hash.file)
.flatMap {
case Some(storedHash) => IO.pure(storedHash)
case None => generateMd5FileHashForPath(hash.file)
}
.map(_.hash)

private def getStoredHash(file: Path): IO[Option[FileHash]] =
file match {
case gcsPath: GcsPath => getFileHashForGcsPath(gcsPath).map(Option(_))
case blobPath: BlobPath => getFileHashForBlobPath(blobPath)
case drsPath: DrsPath =>
IO {
// We assume all DRS files have a stored hash; this will throw
// if the file does not.
drsPath.getFileHash
}.map(Option(_))
case s3Path: S3Path =>
IO {
Option(FileHash(HashType.S3Etag, s3Path.eTag))
}
case _ => IO.pure(None)
}
NioHashing.hash(hash.file)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The NioFlow class is supposed to handle all 10 file methods but it was getting to be mostly hashing, hence the split-out.

It also wasn't really clear what hashing methods you were actually supposed to call, and which ones existed solely for use by other hashing-related methods. Now the interface is clear, you can either hash or getStoredHash.


private def touch(touch: IoTouchCommand) = IO {
touch.file.touch()
Expand All @@ -222,28 +194,6 @@
}

private def createDirectories(path: Path) = path.parent.createDirectories()

/**
* Lazy evaluation of a Try in a delayed IO. This avoids accidentally eagerly evaluating the Try.
*
* IMPORTANT: Use this instead of IO.fromTry to make sure the Try will be reevaluated if the
* IoCommand is retried.
*/
private def delayedIoFromTry[A](t: => Try[A]): IO[A] = IO[A](t.get)

private def getFileHashForGcsPath(gcsPath: GcsPath): IO[FileHash] = delayedIoFromTry {
gcsPath.objectBlobId.map(id => FileHash(HashType.GcsCrc32c, gcsPath.cloudStorage.get(id).getCrc32c))
}

private def getFileHashForBlobPath(blobPath: BlobPath): IO[Option[FileHash]] = delayedIoFromTry {
blobPath.md5HexString.map(md5 => md5.map(FileHash(HashType.Md5, _)))
}

private def generateMd5FileHashForPath(path: Path): IO[FileHash] = delayedIoFromTry {
tryWithResource(() => path.newInputStream) { inputStream =>
FileHash(HashType.Md5, org.apache.commons.codec.digest.DigestUtils.md5Hex(inputStream))
}
}
}

object NioFlow {
Expand Down
92 changes: 92 additions & 0 deletions engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package cromwell.engine.io.nio

import cats.effect.IO
import cloud.nio.spi.{FileHash, HashType}
import common.util.StringUtil.EnhancedString
import cromwell.core.path.Path
import cromwell.filesystems.blob.BlobPath
import cromwell.filesystems.drs.DrsPath
import cromwell.filesystems.gcs.GcsPath
import cromwell.filesystems.http.HttpPath
import cromwell.filesystems.s3.S3Path
import cromwell.util.TryWithResource.tryWithResource

import scala.util.Try

object NioHashing {

def hash(file: Path): IO[String] =
// If there is no hash accessible from the file storage system,
// we'll read the file and generate the hash ourselves if we can.
getStoredHash(file)
.flatMap {
case Some(storedHash) => IO.pure(storedHash)

Check warning on line 23 in engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala

View check run for this annotation

Codecov / codecov/patch

engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala#L23

Added line #L23 was not covered by tests
case None =>
if (canHashLocally(file))
generateMd5FileHashForPath(file)
else
IO.raiseError(
new Exception(

Check warning on line 29 in engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala

View check run for this annotation

Codecov / codecov/patch

engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala#L28-L29

Added lines #L28 - L29 were not covered by tests
s"File of type ${file.getClass.getSimpleName} requires an associated hash, not present for ${file.pathAsString.maskSensitiveUri}"
)
)
}
.map(_.hash)

def getStoredHash(file: Path): IO[Option[FileHash]] =
file match {
case gcsPath: GcsPath => getFileHashForGcsPath(gcsPath).map(Option(_))
case blobPath: BlobPath => getFileHashForBlobPath(blobPath)

Check warning on line 39 in engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala

View check run for this annotation

Codecov / codecov/patch

engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala#L38-L39

Added lines #L38 - L39 were not covered by tests
case drsPath: DrsPath =>
IO {
// We assume all DRS files have a stored hash; this will throw
// if the file does not.
drsPath.getFileHash
}.map(Option(_))

Check warning on line 45 in engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala

View check run for this annotation

Codecov / codecov/patch

engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala#L44-L45

Added lines #L44 - L45 were not covered by tests
case s3Path: S3Path =>
IO {
Option(FileHash(HashType.S3Etag, s3Path.eTag))

Check warning on line 48 in engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala

View check run for this annotation

Codecov / codecov/patch

engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala#L47-L48

Added lines #L47 - L48 were not covered by tests
}
case _ => IO.pure(None)
}

/**
* In some scenarios like SFS it is appropriate for Cromwell to hash files using its own CPU power.
*
* In cloud scenarios, we don't want this because the files are huge, downloading them is slow & expensive,
* and the extreme CPU usage destabilizes the instance (WX-1566). For more context, see also comments
* on `cromwell.filesystems.blob.BlobPath#largeBlobFileMetadataKey()`.
*
* Cromwell is fundamentally supposed to be a job scheduler, and heavy computation should take place elsewhere.
*
* @param file The path to consider for local hashing
*/
private def canHashLocally(file: Path) =
file match {
case _: HttpPath => false
case _: BlobPath => false

Check warning on line 67 in engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala

View check run for this annotation

Codecov / codecov/patch

engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala#L66-L67

Added lines #L66 - L67 were not covered by tests
case _ => true
}

private def generateMd5FileHashForPath(path: Path): IO[FileHash] = delayedIoFromTry {
tryWithResource(() => path.newInputStream) { inputStream =>
FileHash(HashType.Md5, org.apache.commons.codec.digest.DigestUtils.md5Hex(inputStream))
}
}

private def getFileHashForGcsPath(gcsPath: GcsPath): IO[FileHash] = delayedIoFromTry {
gcsPath.objectBlobId.map(id => FileHash(HashType.GcsCrc32c, gcsPath.cloudStorage.get(id).getCrc32c))

Check warning on line 78 in engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala

View check run for this annotation

Codecov / codecov/patch

engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala#L77-L78

Added lines #L77 - L78 were not covered by tests
}

private def getFileHashForBlobPath(blobPath: BlobPath): IO[Option[FileHash]] = delayedIoFromTry {
blobPath.md5HexString.map(md5 => md5.map(FileHash(HashType.Md5, _)))

Check warning on line 82 in engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala

View check run for this annotation

Codecov / codecov/patch

engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala#L81-L82

Added lines #L81 - L82 were not covered by tests
}

/**
* Lazy evaluation of a Try in a delayed IO. This avoids accidentally eagerly evaluating the Try.
*
* IMPORTANT: Use this instead of IO.fromTry to make sure the Try will be reevaluated if the
* IoCommand is retried.
*/
private def delayedIoFromTry[A](t: => Try[A]): IO[A] = IO[A](t.get)
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ object BlobPath {
// They do this for all files they touch, regardless of size, and the root/metadata property is authoritative over native.
//
// N.B. most if not virtually all large files in the wild will NOT have this key populated because they were not created
// by TES or its associated upload utility [4].
// by TES or its associated upload utility [3].
//
// [0] https://learn.microsoft.com/en-us/azure/storage/blobs/scalability-targets
// [1] https://learn.microsoft.com/en-us/rest/api/storageservices/version-2019-12-12
// [2] https://github.com/microsoft/ga4gh-tes/blob/03feb746bb961b72fa91266a56db845e3b31be27/src/Tes.Runner/Transfer/BlobBlockApiHttpUtils.cs#L25
// [4] https://github.com/microsoft/ga4gh-tes/blob/main/src/Tes.RunnerCLI/scripts/roothash.sh
// [3] https://github.com/microsoft/ga4gh-tes/blob/main/src/Tes.RunnerCLI/scripts/roothash.sh
private val largeBlobFileMetadataKey = "md5_4mib_hashlist_root_hash"

def cleanedNioPathString(nioString: String): String = {
Expand Down
Loading