WX-1566 Fix Morgan's call cache file hash CPU thrash Cromwell crash (#…
aednichols authored May 6, 2024
1 parent c8e13a8 commit 8bdcbb0
Showing 3 changed files with 97 additions and 55 deletions.
56 changes: 3 additions & 53 deletions engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala
@@ -4,8 +4,7 @@ import akka.actor.ActorSystem
 import akka.stream.scaladsl.Flow
 import cats.effect._
 
-import scala.util.Try
-import cloud.nio.spi.{ChecksumFailure, ChecksumResult, ChecksumSkipped, ChecksumSuccess, FileHash, HashType}
+import cloud.nio.spi.{ChecksumFailure, ChecksumResult, ChecksumSkipped, ChecksumSuccess, FileHash}
 import com.typesafe.config.Config
 import common.util.IORetry
 import cromwell.core.io._
@@ -15,10 +14,7 @@ import cromwell.engine.io.RetryableRequestSupport.{isInfinitelyRetryable, isRetryable}
 import cromwell.engine.io.{IoAttempts, IoCommandContext, IoCommandStalenessBackpressuring}
 import cromwell.filesystems.blob.BlobPath
 import cromwell.filesystems.drs.DrsPath
-import cromwell.filesystems.gcs.GcsPath
 import cromwell.filesystems.http.HttpPath
-import cromwell.filesystems.s3.S3Path
-import cromwell.util.TryWithResource._
 import net.ceedubs.ficus.Ficus._
 import net.ceedubs.ficus.readers.ValueReader
 
@@ -134,7 +130,7 @@ class NioFlow(parallelism: Int,

     def readFileAndChecksum: IO[String] =
       for {
-        fileHash <- getStoredHash(command.file)
+        fileHash <- NioHashing.getStoredHash(command.file)
         uncheckedValue <- readFile
         checksumResult <- fileHash match {
           case Some(hash) => checkHash(uncheckedValue, hash)
@@ -177,31 +173,7 @@ }
   }
 
   private def hash(hash: IoHashCommand): IO[String] =
-    // If there is no hash accessible from the file storage system,
-    // we'll read the file and generate the hash ourselves.
-    getStoredHash(hash.file)
-      .flatMap {
-        case Some(storedHash) => IO.pure(storedHash)
-        case None => generateMd5FileHashForPath(hash.file)
-      }
-      .map(_.hash)
-
-  private def getStoredHash(file: Path): IO[Option[FileHash]] =
-    file match {
-      case gcsPath: GcsPath => getFileHashForGcsPath(gcsPath).map(Option(_))
-      case blobPath: BlobPath => getFileHashForBlobPath(blobPath)
-      case drsPath: DrsPath =>
-        IO {
-          // We assume all DRS files have a stored hash; this will throw
-          // if the file does not.
-          drsPath.getFileHash
-        }.map(Option(_))
-      case s3Path: S3Path =>
-        IO {
-          Option(FileHash(HashType.S3Etag, s3Path.eTag))
-        }
-      case _ => IO.pure(None)
-    }
+    NioHashing.hash(hash.file)
 
   private def touch(touch: IoTouchCommand) = IO {
     touch.file.touch()
@@ -222,28 +194,6 @@ }
   }
 
   private def createDirectories(path: Path) = path.parent.createDirectories()
-
-  /**
-    * Lazy evaluation of a Try in a delayed IO. This avoids accidentally eagerly evaluating the Try.
-    *
-    * IMPORTANT: Use this instead of IO.fromTry to make sure the Try will be reevaluated if the
-    * IoCommand is retried.
-    */
-  private def delayedIoFromTry[A](t: => Try[A]): IO[A] = IO[A](t.get)
-
-  private def getFileHashForGcsPath(gcsPath: GcsPath): IO[FileHash] = delayedIoFromTry {
-    gcsPath.objectBlobId.map(id => FileHash(HashType.GcsCrc32c, gcsPath.cloudStorage.get(id).getCrc32c))
-  }
-
-  private def getFileHashForBlobPath(blobPath: BlobPath): IO[Option[FileHash]] = delayedIoFromTry {
-    blobPath.md5HexString.map(md5 => md5.map(FileHash(HashType.Md5, _)))
-  }
-
-  private def generateMd5FileHashForPath(path: Path): IO[FileHash] = delayedIoFromTry {
-    tryWithResource(() => path.newInputStream) { inputStream =>
-      FileHash(HashType.Md5, org.apache.commons.codec.digest.DigestUtils.md5Hex(inputStream))
-    }
-  }
 }
 
 object NioFlow {
92 changes: 92 additions & 0 deletions engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala
@@ -0,0 +1,92 @@
+package cromwell.engine.io.nio
+
+import cats.effect.IO
+import cloud.nio.spi.{FileHash, HashType}
+import common.util.StringUtil.EnhancedString
+import cromwell.core.path.Path
+import cromwell.filesystems.blob.BlobPath
+import cromwell.filesystems.drs.DrsPath
+import cromwell.filesystems.gcs.GcsPath
+import cromwell.filesystems.http.HttpPath
+import cromwell.filesystems.s3.S3Path
+import cromwell.util.TryWithResource.tryWithResource
+
+import scala.util.Try
+
+object NioHashing {
+
+  def hash(file: Path): IO[String] =
+    // If there is no hash accessible from the file storage system,
+    // we'll read the file and generate the hash ourselves if we can.
+    getStoredHash(file)
+      .flatMap {
+        case Some(storedHash) => IO.pure(storedHash)
+        case None =>
+          if (canHashLocally(file))
+            generateMd5FileHashForPath(file)
+          else
+            IO.raiseError(
+              new Exception(
+                s"File of type ${file.getClass.getSimpleName} requires an associated hash, not present for ${file.pathAsString.maskSensitiveUri}"
+              )
+            )
+      }
+      .map(_.hash)
+
+  def getStoredHash(file: Path): IO[Option[FileHash]] =
+    file match {
+      case gcsPath: GcsPath => getFileHashForGcsPath(gcsPath).map(Option(_))
+      case blobPath: BlobPath => getFileHashForBlobPath(blobPath)
+      case drsPath: DrsPath =>
+        IO {
+          // We assume all DRS files have a stored hash; this will throw
+          // if the file does not.
+          drsPath.getFileHash
+        }.map(Option(_))
+      case s3Path: S3Path =>
+        IO {
+          Option(FileHash(HashType.S3Etag, s3Path.eTag))
+        }
+      case _ => IO.pure(None)
+    }
+
+  /**
+    * In some scenarios like SFS it is appropriate for Cromwell to hash files using its own CPU power.
+    *
+    * In cloud scenarios, we don't want this because the files are huge, downloading them is slow & expensive,
+    * and the extreme CPU usage destabilizes the instance (WX-1566). For more context, see also comments
+    * on `cromwell.filesystems.blob.BlobPath#largeBlobFileMetadataKey()`.
+    *
+    * Cromwell is fundamentally supposed to be a job scheduler, and heavy computation should take place elsewhere.
+    *
+    * @param file The path to consider for local hashing
+    */
+  private def canHashLocally(file: Path) =
+    file match {
+      case _: HttpPath => false
+      case _: BlobPath => false
+      case _ => true
+    }
+
+  private def generateMd5FileHashForPath(path: Path): IO[FileHash] = delayedIoFromTry {
+    tryWithResource(() => path.newInputStream) { inputStream =>
+      FileHash(HashType.Md5, org.apache.commons.codec.digest.DigestUtils.md5Hex(inputStream))
+    }
+  }
+
+  private def getFileHashForGcsPath(gcsPath: GcsPath): IO[FileHash] = delayedIoFromTry {
+    gcsPath.objectBlobId.map(id => FileHash(HashType.GcsCrc32c, gcsPath.cloudStorage.get(id).getCrc32c))
+  }
+
+  private def getFileHashForBlobPath(blobPath: BlobPath): IO[Option[FileHash]] = delayedIoFromTry {
+    blobPath.md5HexString.map(md5 => md5.map(FileHash(HashType.Md5, _)))
+  }
+
+  /**
+    * Lazy evaluation of a Try in a delayed IO. This avoids accidentally eagerly evaluating the Try.
+    *
+    * IMPORTANT: Use this instead of IO.fromTry to make sure the Try will be reevaluated if the
+    * IoCommand is retried.
+    */
+  private def delayedIoFromTry[A](t: => Try[A]): IO[A] = IO[A](t.get)
+}
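
A note on the retry semantics that `delayedIoFromTry` preserves: because its parameter is by-name, the Try is re-evaluated every time the surrounding IO runs, which is what allows a retried IoCommand to re-attempt the underlying cloud call. A minimal, self-contained sketch of the difference versus IO.fromTry (hypothetical flaky() helper and retry loop, not part of this commit; assumes a cats-effect 3 runtime for unsafeRunSync):

import cats.effect.IO
import cats.effect.unsafe.implicits.global // cats-effect 3 default runtime
import scala.util.Try

object DelayedTryDemo extends App {
  var attempts = 0

  // Stand-in for a transiently failing cloud call: fails twice, then succeeds.
  def flaky(): Try[Int] = Try {
    attempts += 1
    if (attempts < 3) throw new RuntimeException(s"attempt $attempts failed")
    42
  }

  // Same shape as the helper above: the by-name parameter defers evaluation
  // until the IO actually runs, so each rerun re-executes the Try.
  def delayedIoFromTry[A](t: => Try[A]): IO[A] = IO[A](t.get)

  // Hand-rolled retry, purely for the demo.
  def retry[A](io: IO[A], remaining: Int): IO[A] =
    io.handleErrorWith(e => if (remaining > 0) retry(io, remaining - 1) else IO.raiseError(e))

  // Succeeds on the third evaluation because flaky() is re-run on each attempt.
  println(retry(delayedIoFromTry(flaky()), 5).unsafeRunSync()) // prints 42

  // IO.fromTry(flaky()), by contrast, evaluates flaky() exactly once at
  // construction time; retrying that IO replays the captured failure forever.
}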
4 changes: 2 additions & 2 deletions filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPath.scala
@@ -110,12 +110,12 @@ object BlobPath {
   // They do this for all files they touch, regardless of size, and the root/metadata property is authoritative over native.
   //
   // N.B. most if not virtually all large files in the wild will NOT have this key populated because they were not created
-  // by TES or its associated upload utility [4].
+  // by TES or its associated upload utility [3].
   //
   // [0] https://learn.microsoft.com/en-us/azure/storage/blobs/scalability-targets
   // [1] https://learn.microsoft.com/en-us/rest/api/storageservices/version-2019-12-12
   // [2] https://github.com/microsoft/ga4gh-tes/blob/03feb746bb961b72fa91266a56db845e3b31be27/src/Tes.Runner/Transfer/BlobBlockApiHttpUtils.cs#L25
-  // [4] https://github.com/microsoft/ga4gh-tes/blob/main/src/Tes.RunnerCLI/scripts/roothash.sh
+  // [3] https://github.com/microsoft/ga4gh-tes/blob/main/src/Tes.RunnerCLI/scripts/roothash.sh
   private val largeBlobFileMetadataKey = "md5_4mib_hashlist_root_hash"
 
   def cleanedNioPathString(nioString: String): String = {
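The metadata key name hints at the scheme: the file is split into 4 MiB blocks, each block is hashed with MD5, and the root hash is the MD5 of the concatenated block digests. A sketch under that assumption (illustrative only; the authoritative algorithm is the TES runner's roothash.sh linked above, which may differ in details such as hex versus binary concatenation):

import java.io.InputStream
import java.security.MessageDigest

object RootHashSketch {
  private val BlockSize = 4 * 1024 * 1024 // 4 MiB, per the metadata key name

  def md5HashListRootHash(in: InputStream): String = {
    val rootDigest = MessageDigest.getInstance("MD5")
    val buffer = new Array[Byte](BlockSize)
    var done = false
    while (!done) {
      val read = readFully(in, buffer)
      if (read > 0) {
        // Hash this 4 MiB block, then feed the binary digest to the root hash.
        val blockDigest = MessageDigest.getInstance("MD5")
        blockDigest.update(buffer, 0, read)
        rootDigest.update(blockDigest.digest())
      }
      if (read < BlockSize) done = true // short read means end of stream
    }
    rootDigest.digest().map("%02x".format(_)).mkString
  }

  // A single read() may return fewer bytes than requested, so loop until the
  // buffer is full or the stream ends; returns the number of bytes read.
  private def readFully(in: InputStream, buffer: Array[Byte]): Int = {
    var offset = 0
    var n = 0
    while (offset < buffer.length && n != -1) {
      n = in.read(buffer, offset, buffer.length - offset)
      if (n > 0) offset += n
    }
    offset
  }
}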
