Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WX-1566 Fix Morgan's call cache file hash CPU thrash Cromwell crash #7419

Merged
merged 6 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 3 additions & 53 deletions engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import akka.actor.ActorSystem
import akka.stream.scaladsl.Flow
import cats.effect._

import scala.util.Try
import cloud.nio.spi.{ChecksumFailure, ChecksumResult, ChecksumSkipped, ChecksumSuccess, FileHash, HashType}
import cloud.nio.spi.{ChecksumFailure, ChecksumResult, ChecksumSkipped, ChecksumSuccess, FileHash}
import com.typesafe.config.Config
import common.util.IORetry
import cromwell.core.io._
Expand All @@ -15,10 +14,7 @@ import cromwell.engine.io.RetryableRequestSupport.{isInfinitelyRetryable, isRetr
import cromwell.engine.io.{IoAttempts, IoCommandContext, IoCommandStalenessBackpressuring}
import cromwell.filesystems.blob.BlobPath
import cromwell.filesystems.drs.DrsPath
import cromwell.filesystems.gcs.GcsPath
import cromwell.filesystems.http.HttpPath
import cromwell.filesystems.s3.S3Path
import cromwell.util.TryWithResource._
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ValueReader

Expand Down Expand Up @@ -134,7 +130,7 @@ class NioFlow(parallelism: Int,

def readFileAndChecksum: IO[String] =
for {
fileHash <- getStoredHash(command.file)
fileHash <- NioHashing.getStoredHash(command.file)
uncheckedValue <- readFile
checksumResult <- fileHash match {
case Some(hash) => checkHash(uncheckedValue, hash)
Expand Down Expand Up @@ -177,31 +173,7 @@ class NioFlow(parallelism: Int,
}

private def hash(hash: IoHashCommand): IO[String] =
// If there is no hash accessible from the file storage system,
// we'll read the file and generate the hash ourselves.
getStoredHash(hash.file)
.flatMap {
case Some(storedHash) => IO.pure(storedHash)
case None => generateMd5FileHashForPath(hash.file)
}
.map(_.hash)

private def getStoredHash(file: Path): IO[Option[FileHash]] =
file match {
case gcsPath: GcsPath => getFileHashForGcsPath(gcsPath).map(Option(_))
case blobPath: BlobPath => getFileHashForBlobPath(blobPath)
case drsPath: DrsPath =>
IO {
// We assume all DRS files have a stored hash; this will throw
// if the file does not.
drsPath.getFileHash
}.map(Option(_))
case s3Path: S3Path =>
IO {
Option(FileHash(HashType.S3Etag, s3Path.eTag))
}
case _ => IO.pure(None)
}
NioHashing.hash(hash.file)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The NioFlow class is supposed to handle all 10 file methods but it was getting to be mostly hashing, hence the split-out.

It also wasn't really clear what hashing methods you were actually supposed to call, and which ones existed solely for use by other hashing-related methods. Now the interface is clear, you can either hash or getStoredHash.


private def touch(touch: IoTouchCommand) = IO {
touch.file.touch()
Expand All @@ -222,28 +194,6 @@ class NioFlow(parallelism: Int,
}

private def createDirectories(path: Path) = path.parent.createDirectories()

/**
* Lazy evaluation of a Try in a delayed IO. This avoids accidentally eagerly evaluating the Try.
*
* IMPORTANT: Use this instead of IO.fromTry to make sure the Try will be reevaluated if the
* IoCommand is retried.
*/
private def delayedIoFromTry[A](t: => Try[A]): IO[A] = IO[A](t.get)

private def getFileHashForGcsPath(gcsPath: GcsPath): IO[FileHash] = delayedIoFromTry {
gcsPath.objectBlobId.map(id => FileHash(HashType.GcsCrc32c, gcsPath.cloudStorage.get(id).getCrc32c))
}

private def getFileHashForBlobPath(blobPath: BlobPath): IO[Option[FileHash]] = delayedIoFromTry {
blobPath.md5HexString.map(md5 => md5.map(FileHash(HashType.Md5, _)))
}

private def generateMd5FileHashForPath(path: Path): IO[FileHash] = delayedIoFromTry {
tryWithResource(() => path.newInputStream) { inputStream =>
FileHash(HashType.Md5, org.apache.commons.codec.digest.DigestUtils.md5Hex(inputStream))
}
}
}

object NioFlow {
Expand Down
91 changes: 91 additions & 0 deletions engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package cromwell.engine.io.nio

import cats.effect.IO
import cloud.nio.spi.{FileHash, HashType}
import common.util.StringUtil.EnhancedString
import cromwell.core.path.Path
import cromwell.filesystems.blob.BlobPath
import cromwell.filesystems.drs.DrsPath
import cromwell.filesystems.gcs.GcsPath
import cromwell.filesystems.http.HttpPath
import cromwell.filesystems.s3.S3Path
import cromwell.util.TryWithResource.tryWithResource

import scala.util.Try

object NioHashing {

def hash(file: Path): IO[String] =
// If there is no hash accessible from the file storage system,
// we'll read the file and generate the hash ourselves if we can.
getStoredHash(file)
.flatMap {
case Some(storedHash) => IO.pure(storedHash)
case None =>
if (canHashLocally(file))
generateMd5FileHashForPath(file)
else
IO.raiseError(
new Exception(
s"File of type ${file.getClass.getSimpleName} requires hash in object metadata, not present for ${file.pathAsString.maskSensitiveUri}"
)
)
}
.map(_.hash)

def getStoredHash(file: Path): IO[Option[FileHash]] =
file match {
case gcsPath: GcsPath => getFileHashForGcsPath(gcsPath).map(Option(_))
case blobPath: BlobPath => getFileHashForBlobPath(blobPath)

Check warning on line 39 in engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala

View check run for this annotation

Codecov / codecov/patch

engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala#L38-L39

Added lines #L38 - L39 were not covered by tests
case drsPath: DrsPath =>
IO {
// We assume all DRS files have a stored hash; this will throw
// if the file does not.
drsPath.getFileHash
}.map(Option(_))
case s3Path: S3Path =>
IO {
Option(FileHash(HashType.S3Etag, s3Path.eTag))

Check warning on line 48 in engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala

View check run for this annotation

Codecov / codecov/patch

engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala#L47-L48

Added lines #L47 - L48 were not covered by tests
}
case _ => IO.pure(None)
}

/**
* In some scenarios like SFS it is appropriate for Cromwell to hash files using its own CPU power.
*
* In cloud scenarios, we don't want this because the files are huge, downloading them is slow & expensive,
* and the extreme CPU usage destabilizes the instance. [WX-1566]
*
* Cromwell is fundamentally supposed to be a job scheduler, and heavy computation should take place elsewhere.
*
* @param file The path to consider for local hashing
*/
private def canHashLocally(file: Path) =
file match {
case _: HttpPath => false
case _: BlobPath => false

Check warning on line 66 in engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala

View check run for this annotation

Codecov / codecov/patch

engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala#L66

Added line #L66 was not covered by tests
case _ => true
}

private def generateMd5FileHashForPath(path: Path): IO[FileHash] = delayedIoFromTry {
tryWithResource(() => path.newInputStream) { inputStream =>
FileHash(HashType.Md5, org.apache.commons.codec.digest.DigestUtils.md5Hex(inputStream))
}
}

private def getFileHashForGcsPath(gcsPath: GcsPath): IO[FileHash] = delayedIoFromTry {
gcsPath.objectBlobId.map(id => FileHash(HashType.GcsCrc32c, gcsPath.cloudStorage.get(id).getCrc32c))

Check warning on line 77 in engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala

View check run for this annotation

Codecov / codecov/patch

engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala#L76-L77

Added lines #L76 - L77 were not covered by tests
}

private def getFileHashForBlobPath(blobPath: BlobPath): IO[Option[FileHash]] = delayedIoFromTry {
blobPath.md5HexString.map(md5 => md5.map(FileHash(HashType.Md5, _)))

Check warning on line 81 in engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala

View check run for this annotation

Codecov / codecov/patch

engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala#L80-L81

Added lines #L80 - L81 were not covered by tests
}

/**
* Lazy evaluation of a Try in a delayed IO. This avoids accidentally eagerly evaluating the Try.
*
* IMPORTANT: Use this instead of IO.fromTry to make sure the Try will be reevaluated if the
* IoCommand is retried.
*/
private def delayedIoFromTry[A](t: => Try[A]): IO[A] = IO[A](t.get)
}
Loading