From f89854af3d2c49686aed7a1a5a883dcb29ad0df3 Mon Sep 17 00:00:00 2001 From: Janet Gainer-Dewar Date: Sat, 27 Aug 2022 13:23:03 -0400 Subject: [PATCH 01/11] Draft support for optional FileHash --- .../cromwell/engine/io/nio/NioFlow.scala | 32 +++++++++++++------ 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala index 0598e154f2d..92fa292e4e7 100644 --- a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala +++ b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala @@ -2,6 +2,7 @@ package cromwell.engine.io.nio import akka.stream.scaladsl.Flow import cats.effect.{IO, Timer} +import cats.implicits._ import scala.util.Try import cloud.nio.spi.{ChecksumFailure, ChecksumResult, ChecksumSkipped, ChecksumSuccess, FileHash, HashType} @@ -128,15 +129,22 @@ class NioFlow(parallelism: Int, def readFileAndChecksum: IO[String] = { for { - fileHash <- getHash(command.file) + fileHash <- getStoredHash(command.file) uncheckedValue <- readFile - checksumResult <- checkHash(uncheckedValue, fileHash) + checksumResult = fileHash match { + case Some(hash) => checkHash(uncheckedValue, hash) + // If there is no stored checksum, don't attempt to validate. + // If the missing checksum is itself an error condition, that + // should be detected by the code that gets the FileHash. + case None => ChecksumSkipped + } verifiedValue <- checksumResult match { case _: ChecksumSkipped => IO.pure(uncheckedValue) case _: ChecksumSuccess => IO.pure(uncheckedValue) case failure: ChecksumFailure => IO.raiseError( ChecksumFailedException( - s"Failed checksum for '${command.file}'. Expected '${fileHash.hashType}' hash of '${fileHash.hash}'. Calculated hash '${failure.calculatedHash}'")) + s"Failed checksum for '${command.file}'. Expected '${fileHash.map(_.hashType).getOrElse("")}' hash of '${fileHash.map(_.hash).getOrElse("")}'. Calculated hash '${failure.calculatedHash}'") + ) } } yield verifiedValue } @@ -153,19 +161,23 @@ class NioFlow(parallelism: Int, } private def hash(hash: IoHashCommand): IO[String] = { - getHash(hash.file).map(_.hash) + getStoredHash(hash.file).flatMap ( h => h match { + case Some(storedHash) => IO.pure(storedHash) + case None => generateMd5FileHashForPath(hash.file) + }).map(_.hash) } - private def getHash(file: Path): IO[FileHash] = { + private def getStoredHash(file: Path): IO[Option[FileHash]] = { file match { - case gcsPath: GcsPath => getFileHashForGcsPath(gcsPath) + case gcsPath: GcsPath => getFileHashForGcsPath(gcsPath).map(Option(_)) case drsPath: DrsPath => IO { + // drsPath.getFileHash throws if it can't find a stored hash. drsPath.getFileHash - } + }.map(Option(_)) case s3Path: S3Path => IO { - FileHash(HashType.S3Etag, s3Path.eTag) + Option(FileHash(HashType.S3Etag, s3Path.eTag)) } - case path => getMd5FileHashForPath(path) + case _ => IO.pure(None) } } @@ -201,7 +213,7 @@ class NioFlow(parallelism: Int, gcsPath.objectBlobId.map(id => FileHash(HashType.GcsCrc32c, gcsPath.cloudStorage.get(id).getCrc32c)) } - private def getMd5FileHashForPath(path: Path): IO[FileHash] = delayedIoFromTry { + private def generateMd5FileHashForPath(path: Path): IO[FileHash] = delayedIoFromTry { tryWithResource(() => path.newInputStream) { inputStream => FileHash(HashType.Md5, org.apache.commons.codec.digest.DigestUtils.md5Hex(inputStream)) } From 8c30dc715e9a002adb7d39466ee6d01ec39e4ab0 Mon Sep 17 00:00:00 2001 From: Janet Gainer-Dewar Date: Sat, 27 Aug 2022 13:23:47 -0400 Subject: [PATCH 02/11] Draft getMd5 for BlobPath --- .../cromwell/filesystems/blob/BlobPathBuilder.scala | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala index 69a21c90eda..1e6be842429 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala @@ -1,13 +1,13 @@ package cromwell.filesystems.blob import com.azure.core.credential.AzureSasCredential -import com.azure.storage.blob.nio.AzureFileSystem +import com.azure.storage.blob.nio.{AzureBlobFileAttributes, AzureFileSystem} import com.google.common.net.UrlEscapers import cromwell.core.path.{NioPath, Path, PathBuilder} import cromwell.filesystems.blob.BlobPathBuilder._ import java.net.{MalformedURLException, URI} -import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems} +import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems, Files} import scala.jdk.CollectionConverters._ import scala.language.postfixOps import scala.util.{Failure, Try} @@ -90,4 +90,13 @@ case class BlobPath private[blob](nioPath: NioPath, endpoint: String, container: override def pathAsString: String = List(endpoint, container, nioPath.toString()).mkString("/") override def pathWithoutScheme: String = parseURI(endpoint).getHost + "/" + container + "/" + nioPath.toString() + + def getMd5: Option[String] = { + val headers = Files.readAttributes(nioPath, classOf[AzureBlobFileAttributes]).blobHttpHeaders() + Option(headers.getContentMd5) match { + case None => None + case Some(arr) if arr.isEmpty => None + case Some(bytes) => Some(bytes.map("%02x".format(_)).mkString) + } + } } From 2533e190cd95f85a82f123ba50218e621fc2aca4 Mon Sep 17 00:00:00 2001 From: Janet Gainer-Dewar Date: Mon, 29 Aug 2022 14:28:26 -0400 Subject: [PATCH 03/11] Resolve non-parallel IO to fix tests --- engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala index 92fa292e4e7..6ca12f870d4 100644 --- a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala +++ b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala @@ -131,12 +131,12 @@ class NioFlow(parallelism: Int, for { fileHash <- getStoredHash(command.file) uncheckedValue <- readFile - checksumResult = fileHash match { + checksumResult <- fileHash match { case Some(hash) => checkHash(uncheckedValue, hash) // If there is no stored checksum, don't attempt to validate. // If the missing checksum is itself an error condition, that // should be detected by the code that gets the FileHash. - case None => ChecksumSkipped + case None => IO.pure(ChecksumSkipped) } verifiedValue <- checksumResult match { case _: ChecksumSkipped => IO.pure(uncheckedValue) From 7e981d286e5e60a0a5cc4dc1b9b1cd1860bb47f7 Mon Sep 17 00:00:00 2001 From: Janet Gainer-Dewar Date: Mon, 29 Aug 2022 15:46:41 -0400 Subject: [PATCH 04/11] Checksum validation for BlobPath --- .../scala/cromwell/engine/io/nio/NioFlow.scala | 12 +++++++++--- .../filesystems/blob/BlobPathBuilder.scala | 14 +++++++------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala index 6ca12f870d4..b764f8c34a2 100644 --- a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala +++ b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala @@ -2,7 +2,6 @@ package cromwell.engine.io.nio import akka.stream.scaladsl.Flow import cats.effect.{IO, Timer} -import cats.implicits._ import scala.util.Try import cloud.nio.spi.{ChecksumFailure, ChecksumResult, ChecksumSkipped, ChecksumSuccess, FileHash, HashType} @@ -13,6 +12,7 @@ import cromwell.core.path.Path import cromwell.engine.io.IoActor._ import cromwell.engine.io.RetryableRequestSupport.{isInfinitelyRetryable, isRetryable} import cromwell.engine.io.{IoAttempts, IoCommandContext, IoCommandStalenessBackpressuring} +import cromwell.filesystems.blob.BlobPath import cromwell.filesystems.drs.DrsPath import cromwell.filesystems.gcs.GcsPath import cromwell.filesystems.s3.S3Path @@ -136,7 +136,7 @@ class NioFlow(parallelism: Int, // If there is no stored checksum, don't attempt to validate. // If the missing checksum is itself an error condition, that // should be detected by the code that gets the FileHash. - case None => IO.pure(ChecksumSkipped) + case None => IO.pure(ChecksumSkipped()) } verifiedValue <- checksumResult match { case _: ChecksumSkipped => IO.pure(uncheckedValue) @@ -150,7 +150,8 @@ class NioFlow(parallelism: Int, } val fileContentIo = command.file match { - case _: DrsPath => readFileAndChecksum + case _: DrsPath => readFileAndChecksum + case _: BlobPath => readFileAndChecksum case _ => readFile } fileContentIo.map(_.replaceAll("\\r\\n", "\\\n")) @@ -170,6 +171,7 @@ class NioFlow(parallelism: Int, private def getStoredHash(file: Path): IO[Option[FileHash]] = { file match { case gcsPath: GcsPath => getFileHashForGcsPath(gcsPath).map(Option(_)) + case blobPath: BlobPath => getFileHashForBlobPath(blobPath) case drsPath: DrsPath => IO { // drsPath.getFileHash throws if it can't find a stored hash. drsPath.getFileHash @@ -213,6 +215,10 @@ class NioFlow(parallelism: Int, gcsPath.objectBlobId.map(id => FileHash(HashType.GcsCrc32c, gcsPath.cloudStorage.get(id).getCrc32c)) } + private def getFileHashForBlobPath(blobPath: BlobPath): IO[Option[FileHash]] = delayedIoFromTry { + blobPath.getMd5.map(md5 => md5.map(FileHash(HashType.Md5, _))) + } + private def generateMd5FileHashForPath(path: Path): IO[FileHash] = delayedIoFromTry { tryWithResource(() => path.newInputStream) { inputStream => FileHash(HashType.Md5, org.apache.commons.codec.digest.DigestUtils.md5Hex(inputStream)) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala index 1e6be842429..8847c67e8c2 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala @@ -91,12 +91,12 @@ case class BlobPath private[blob](nioPath: NioPath, endpoint: String, container: override def pathWithoutScheme: String = parseURI(endpoint).getHost + "/" + container + "/" + nioPath.toString() - def getMd5: Option[String] = { - val headers = Files.readAttributes(nioPath, classOf[AzureBlobFileAttributes]).blobHttpHeaders() - Option(headers.getContentMd5) match { - case None => None - case Some(arr) if arr.isEmpty => None - case Some(bytes) => Some(bytes.map("%02x".format(_)).mkString) - } + def getMd5: Try[Option[String]] = { + Try(Files.readAttributes(nioPath, classOf[AzureBlobFileAttributes]).blobHttpHeaders()) + .map(h => Option(h.getContentMd5) match { + case None => None + case Some(arr) if arr.isEmpty => None + case Some(bytes) => Option(bytes.map("%02x".format(_)).mkString) + }) } } From e31be358efe78f286fade42ada7bd30a2e602f3a Mon Sep 17 00:00:00 2001 From: Janet Gainer-Dewar Date: Mon, 29 Aug 2022 15:50:07 -0400 Subject: [PATCH 05/11] Nicer error message --- engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala index b764f8c34a2..ba035ff9600 100644 --- a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala +++ b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala @@ -143,7 +143,11 @@ class NioFlow(parallelism: Int, case _: ChecksumSuccess => IO.pure(uncheckedValue) case failure: ChecksumFailure => IO.raiseError( ChecksumFailedException( - s"Failed checksum for '${command.file}'. Expected '${fileHash.map(_.hashType).getOrElse("")}' hash of '${fileHash.map(_.hash).getOrElse("")}'. Calculated hash '${failure.calculatedHash}'") + fileHash match { + case Some(hash) => s"Failed checksum for '${command.file}'. Expected '${hash.hashType}' hash of '${hash.hash}'. Calculated hash '${failure.calculatedHash}'" + case None => s"Failed checksum for '${command.file}'. Couldn't find stored file hash." // This should never happen + } + ) ) } } yield verifiedValue From c4f4c3415ad0c6221e23884f8ff90bb05b2efed7 Mon Sep 17 00:00:00 2001 From: Janet Gainer-Dewar Date: Mon, 29 Aug 2022 15:54:47 -0400 Subject: [PATCH 06/11] Test for missing Blob hash --- .../cromwell/engine/io/nio/NioFlowSpec.scala | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala b/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala index 4b3461ae7d3..46ae8312831 100644 --- a/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala +++ b/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala @@ -20,13 +20,14 @@ import org.mockito.Mockito.{times, verify, when} import org.scalatest.flatspec.AsyncFlatSpecLike import org.scalatest.matchers.should.Matchers import common.mock.MockSugar +import cromwell.filesystems.blob.BlobPath import java.nio.file.NoSuchFileException import java.util.UUID import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.language.postfixOps -import scala.util.Failure +import scala.util.{Failure, Success} import scala.util.control.NoStackTrace class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with MockSugar { @@ -171,6 +172,25 @@ class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with } } + it should "succeed if a BlobPath is missing a stored hash" in { + val testPath = mock[BlobPath] + when(testPath.limitFileContent(any[Option[Int]], any[Boolean])(any[ExecutionContext])) + .thenReturn("hello there".getBytes) + when(testPath.getMd5) + .thenReturn(Success(None)) + + val context = DefaultCommandContext(contentAsStringCommand(testPath, Option(100), failOnOverflow = true).get, replyTo) + val testSource = Source.single(context) + + val stream = testSource.via(flow).toMat(readSink)(Keep.right) + + stream.run() map { + case (success: IoSuccess[_], _) => assert(success.result.asInstanceOf[String] == "hello there") + case (ack, _) => + fail(s"read returned an unexpected message:\n$ack\n\n") + } + } + it should "copy Nio paths" in { val testPath = DefaultPathBuilder.createTempFile() val testCopyPath = testPath.sibling(UUID.randomUUID().toString) From f67aae07fa4f94c735e3ed95f6250f32fac256cc Mon Sep 17 00:00:00 2001 From: Janet Gainer-Dewar Date: Mon, 29 Aug 2022 16:03:20 -0400 Subject: [PATCH 07/11] Break attr acquisition into separate method --- .../cromwell/filesystems/blob/BlobPathBuilder.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala index 8847c67e8c2..705ac0d815c 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala @@ -91,12 +91,16 @@ case class BlobPath private[blob](nioPath: NioPath, endpoint: String, container: override def pathWithoutScheme: String = parseURI(endpoint).getHost + "/" + container + "/" + nioPath.toString() + def blobFileAttributes: Try[AzureBlobFileAttributes] = + Try(Files.readAttributes(nioPath, classOf[AzureBlobFileAttributes])) + def getMd5: Try[Option[String]] = { - Try(Files.readAttributes(nioPath, classOf[AzureBlobFileAttributes]).blobHttpHeaders()) - .map(h => Option(h.getContentMd5) match { + blobFileAttributes.map(h => + Option(h.blobHttpHeaders().getContentMd5) match { case None => None case Some(arr) if arr.isEmpty => None case Some(bytes) => Option(bytes.map("%02x".format(_)).mkString) - }) + } + ) } } From a87c15d12d770c330cd9caf2c3139112a4cd4e0b Mon Sep 17 00:00:00 2001 From: Janet Gainer-Dewar Date: Mon, 29 Aug 2022 17:03:03 -0400 Subject: [PATCH 08/11] Cleanup, comments --- .../main/scala/cromwell/engine/io/nio/NioFlow.scala | 11 +++++++---- .../cromwell/filesystems/blob/BlobPathBuilder.scala | 4 +++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala index ba035ff9600..012e771b5a0 100644 --- a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala +++ b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala @@ -166,10 +166,12 @@ class NioFlow(parallelism: Int, } private def hash(hash: IoHashCommand): IO[String] = { - getStoredHash(hash.file).flatMap ( h => h match { + // If there is no hash accessible from the file storage system, + // we'll read the file and generate the hash ourselves. + getStoredHash(hash.file).flatMap { case Some(storedHash) => IO.pure(storedHash) case None => generateMd5FileHashForPath(hash.file) - }).map(_.hash) + }.map(_.hash) } private def getStoredHash(file: Path): IO[Option[FileHash]] = { @@ -177,7 +179,8 @@ class NioFlow(parallelism: Int, case gcsPath: GcsPath => getFileHashForGcsPath(gcsPath).map(Option(_)) case blobPath: BlobPath => getFileHashForBlobPath(blobPath) case drsPath: DrsPath => IO { - // drsPath.getFileHash throws if it can't find a stored hash. + // We assume all DRS files have a stored hash; this will throw + // if the file does not. drsPath.getFileHash }.map(Option(_)) case s3Path: S3Path => IO { @@ -220,7 +223,7 @@ class NioFlow(parallelism: Int, } private def getFileHashForBlobPath(blobPath: BlobPath): IO[Option[FileHash]] = delayedIoFromTry { - blobPath.getMd5.map(md5 => md5.map(FileHash(HashType.Md5, _))) + blobPath.md5HexString.map(md5 => md5.map(FileHash(HashType.Md5, _))) } private def generateMd5FileHashForPath(path: Path): IO[FileHash] = delayedIoFromTry { diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala index 705ac0d815c..b89a335b9de 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala @@ -94,11 +94,13 @@ case class BlobPath private[blob](nioPath: NioPath, endpoint: String, container: def blobFileAttributes: Try[AzureBlobFileAttributes] = Try(Files.readAttributes(nioPath, classOf[AzureBlobFileAttributes])) - def getMd5: Try[Option[String]] = { + def md5HexString: Try[Option[String]] = { blobFileAttributes.map(h => Option(h.blobHttpHeaders().getContentMd5) match { case None => None case Some(arr) if arr.isEmpty => None + // Convert the bytes to a hex-encoded string. Note that this value + // is rendered in base64 in the Azure web portal. case Some(bytes) => Option(bytes.map("%02x".format(_)).mkString) } ) From e5704396fe43d2501d82d61337735d54c04b2184 Mon Sep 17 00:00:00 2001 From: Janet Gainer-Dewar Date: Mon, 29 Aug 2022 17:03:21 -0400 Subject: [PATCH 09/11] In-progress tests of blob hash command --- .../cromwell/engine/io/nio/NioFlowSpec.scala | 41 ++++++++++++++++++- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala b/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala index 46ae8312831..623ccecb3e0 100644 --- a/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala +++ b/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala @@ -22,12 +22,13 @@ import org.scalatest.matchers.should.Matchers import common.mock.MockSugar import cromwell.filesystems.blob.BlobPath +import java.io.ByteArrayInputStream import java.nio.file.NoSuchFileException import java.util.UUID import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.language.postfixOps -import scala.util.{Failure, Success} +import scala.util.{Failure, Success, Try} import scala.util.control.NoStackTrace class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with MockSugar { @@ -128,6 +129,42 @@ class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with } } + it should "get hash from a BlobPath when stored hash exists" in { + val testPath = mock[BlobPath] + val hashString = "2d01d5d9c24034d54fe4fba0ede5182d" // echo "hello there" | md5sum + testPath.md5HexString returns Try(Option(hashString)) + + val context = DefaultCommandContext(hashCommand(testPath).get, replyTo) + val testSource = Source.single(context) + + val stream = testSource.via(flow).toMat(readSink)(Keep.right) + + stream.run() map { + case (success: IoSuccess[_], _) => assert(success.result.asInstanceOf[String] == hashString) + case (ack, _) => + fail(s"read returned an unexpected message:\n$ack\n\n") + } + } + + // TODO working on this + ignore should "get hash from a BlobPath when stored hash does not exist" in { + val testPath = mock[BlobPath] + val hashString = "2d01d5d9c24034d54fe4fba0ede5182d" // echo "hello there" | md5sum + testPath.md5HexString returns Try(None) + testPath.newInputStream returns new ByteArrayInputStream("hello there".getBytes) + + val context = DefaultCommandContext(hashCommand(testPath).get, replyTo) + val testSource = Source.single(context) + + val stream = testSource.via(flow).toMat(readSink)(Keep.right) + + stream.run() map { + case (success: IoSuccess[_], _) => assert(success.result.asInstanceOf[String] == hashString) + case (ack, _) => + fail(s"read returned an unexpected message:\n$ack\n\n") + } + } + it should "fail if DrsPath hash doesn't match checksum" in { val testPath = mock[DrsPath] when(testPath.limitFileContent(any[Option[Int]], any[Boolean])(any[ExecutionContext])).thenReturn("hello".getBytes) @@ -176,7 +213,7 @@ class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with val testPath = mock[BlobPath] when(testPath.limitFileContent(any[Option[Int]], any[Boolean])(any[ExecutionContext])) .thenReturn("hello there".getBytes) - when(testPath.getMd5) + when(testPath.md5HexString) .thenReturn(Success(None)) val context = DefaultCommandContext(contentAsStringCommand(testPath, Option(100), failOnOverflow = true).get, replyTo) From 7c6cd13d83ef04c13a8aed099bedc4d319782f6c Mon Sep 17 00:00:00 2001 From: Janet Gainer-Dewar Date: Tue, 30 Aug 2022 12:02:36 -0400 Subject: [PATCH 10/11] Remove test --- .../cromwell/engine/io/nio/NioFlowSpec.scala | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala b/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala index 623ccecb3e0..efeb7a50ea9 100644 --- a/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala +++ b/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala @@ -146,25 +146,6 @@ class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with } } - // TODO working on this - ignore should "get hash from a BlobPath when stored hash does not exist" in { - val testPath = mock[BlobPath] - val hashString = "2d01d5d9c24034d54fe4fba0ede5182d" // echo "hello there" | md5sum - testPath.md5HexString returns Try(None) - testPath.newInputStream returns new ByteArrayInputStream("hello there".getBytes) - - val context = DefaultCommandContext(hashCommand(testPath).get, replyTo) - val testSource = Source.single(context) - - val stream = testSource.via(flow).toMat(readSink)(Keep.right) - - stream.run() map { - case (success: IoSuccess[_], _) => assert(success.result.asInstanceOf[String] == hashString) - case (ack, _) => - fail(s"read returned an unexpected message:\n$ack\n\n") - } - } - it should "fail if DrsPath hash doesn't match checksum" in { val testPath = mock[DrsPath] when(testPath.limitFileContent(any[Option[Int]], any[Boolean])(any[ExecutionContext])).thenReturn("hello".getBytes) From bdbd0fb5380a0c249e586250b694223dfb7fed84 Mon Sep 17 00:00:00 2001 From: Janet Gainer-Dewar Date: Tue, 30 Aug 2022 12:18:39 -0400 Subject: [PATCH 11/11] Remove unused import --- engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala b/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala index efeb7a50ea9..b01d52eece0 100644 --- a/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala +++ b/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala @@ -22,7 +22,6 @@ import org.scalatest.matchers.should.Matchers import common.mock.MockSugar import cromwell.filesystems.blob.BlobPath -import java.io.ByteArrayInputStream import java.nio.file.NoSuchFileException import java.util.UUID import scala.concurrent.ExecutionContext