Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WX-1105 Fix interpretation of full http blob paths #7138

Merged
merged 19 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Cromwell Change Log

## 86 Release Notes

### HTTPFilesystem Improvements

* WDL `size` engine function now works for HTTP files.

## 85 Release Notes

### Migration of PKs to BIGINT
Expand Down
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ lazy val engine = project
.dependsOn(backend)
.dependsOn(gcsFileSystem)
.dependsOn(drsFileSystem)
.dependsOn(httpFileSystem)
.dependsOn(sraFileSystem)
.dependsOn(awsS3FileSystem)
.dependsOn(azureBlobFileSystem)
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ webservice {
}

akka {

http {
client {
parsing {
illegal-header-warnings = off
}
}
}

actor.default-dispatcher.fork-join-executor {
# Number of threads = min(parallelism-factor * cpus, parallelism-max)
# Below are the default values set by Akka, uncomment to tune these
Expand Down
3 changes: 2 additions & 1 deletion engine/src/main/scala/cromwell/engine/io/IoActor.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package cromwell.engine.io

import akka.NotUsed
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Timers}
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props, Timers}
import akka.dispatch.ControlMessage
import akka.stream._
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, Sink, Source, SourceQueueWithComplete}
Expand Down Expand Up @@ -40,6 +40,7 @@ final class IoActor(ioConfig: IoConfig,
applicationName: String)(implicit val materializer: ActorMaterializer)
extends Actor with ActorLogging with StreamActorHelper[IoCommandContext[_]] with IoInstrumentation with Timers {
implicit val ec: ExecutionContext = context.dispatcher
implicit val system: ActorSystem = context.system

// IntelliJ disapproves of mutable state in Actors, but this should be safe as long as access occurs only in
// the `receive` method. Alternatively IntelliJ does suggest a `become` workaround we might try in the future.
Expand Down
16 changes: 11 additions & 5 deletions engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package cromwell.engine.io.nio

import akka.actor.ActorSystem
import akka.stream.scaladsl.Flow
import cats.effect.{IO, Timer}
import cats.effect._

import scala.util.Try
import cloud.nio.spi.{ChecksumFailure, ChecksumResult, ChecksumSkipped, ChecksumSuccess, FileHash, HashType}
Expand All @@ -15,6 +16,7 @@ import cromwell.engine.io.{IoAttempts, IoCommandContext, IoCommandStalenessBackp
import cromwell.filesystems.blob.BlobPath
import cromwell.filesystems.drs.DrsPath
import cromwell.filesystems.gcs.GcsPath
import cromwell.filesystems.http.HttpPath
import cromwell.filesystems.s3.S3Path
import cromwell.util.TryWithResource._
import net.ceedubs.ficus.Ficus._
Expand All @@ -34,9 +36,11 @@ class NioFlow(parallelism: Int,
onBackpressure: Option[Double] => Unit,
numberOfAttempts: Int,
commandBackpressureStaleness: FiniteDuration
)(implicit ec: ExecutionContext) extends IoCommandStalenessBackpressuring {
)(implicit system: ActorSystem) extends IoCommandStalenessBackpressuring {

implicit private val ec: ExecutionContext = system.dispatcher
implicit private val timer: Timer[IO] = IO.timer(ec)
implicit private val contextShift: ContextShift[IO] = IO.contextShift(ec)

override def maxStaleness: FiniteDuration = commandBackpressureStaleness

Expand Down Expand Up @@ -161,9 +165,11 @@ class NioFlow(parallelism: Int,
fileContentIo.map(_.replaceAll("\\r\\n", "\\\n"))
}

private def size(size: IoSizeCommand) = IO {
size.file.size
}
private def size(size: IoSizeCommand) =
size.file match {
case httpPath: HttpPath => IO.fromFuture(IO(httpPath.fetchSize))
case nioPath => IO(nioPath.size)
}

private def hash(hash: IoHashCommand): IO[String] = {
// If there is no hash accessible from the file storage system,
Expand Down
33 changes: 31 additions & 2 deletions engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.scalatest.flatspec.AsyncFlatSpecLike
import org.scalatest.matchers.should.Matchers
import common.mock.MockSugar
import cromwell.filesystems.blob.BlobPath
import cromwell.filesystems.http.HttpPathBuilder

import java.nio.file.NoSuchFileException
import java.util.UUID
Expand All @@ -42,7 +43,7 @@ class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with
onRetryCallback = NoopOnRetry,
onBackpressure = NoopOnBackpressure,
numberOfAttempts = 3,
commandBackpressureStaleness = 5 seconds)(system.dispatcher).flow
commandBackpressureStaleness = 5 seconds)(system).flow

implicit val materializer: ActorMaterializer = ActorMaterializer()
private val replyTo = mock[ActorRef]
Expand Down Expand Up @@ -96,6 +97,34 @@ class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with
}
}

it should "fail with an UnknownHost error when trying to get size for a bogus HTTP path" in {
val httpPath = new HttpPathBuilder().build("http://ex000mple.c0m/bogus/url/fake.html").get

val context = DefaultCommandContext(sizeCommand(httpPath).get, replyTo)
val testSource = Source.single(context)

val stream = testSource.via(flow).toMat(readSink)(Keep.right)
stream.run() map {
case (IoFailure(_, EnhancedCromwellIoException(_, receivedException)), _) =>
receivedException.getMessage should include ("UnknownHost")
case (ack, _) => fail(s"size should have failed with UnknownHost but didn't:\n$ack\n\n")
}
}

it should "fail when trying to get size for a bogus HTTP path" in {
val httpPath = new HttpPathBuilder().build("http://google.com/bogus/je8934hufe832489uihewuihf").get

val context = DefaultCommandContext(sizeCommand(httpPath).get, replyTo)
val testSource = Source.single(context)

val stream = testSource.via(flow).toMat(readSink)(Keep.right)
stream.run() map {
case (IoFailure(_, EnhancedCromwellIoException(_, receivedException)), _) =>
receivedException.getMessage should include ("Couldn't fetch size")
case (ack, _) => fail(s"size should have failed but didn't:\n$ack\n\n")
}
}

it should "get hash from a Nio Path" in {
val testPath = DefaultPathBuilder.createTempFile()
testPath.write("hello")
Expand Down Expand Up @@ -304,7 +333,7 @@ class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with
onRetryCallback = NoopOnRetry,
onBackpressure = NoopOnBackpressure,
numberOfAttempts = 3,
commandBackpressureStaleness = 5 seconds)(system.dispatcher) {
commandBackpressureStaleness = 5 seconds)(system) {

private var tries = 0
override def handleSingleCommand(ioSingleCommand: IoCommand[_]): IO[IoSuccess[_]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,30 @@ class BlobPathBuilder(container: BlobContainerName, endpoint: EndpointURL)(priva
}

object BlobPath {
// The Azure NIO library uses `{containerName}:` as the root of the path.
// This doesn't work well for our need to easily transfer back and forth
// to and from the blob URL format. This method removes anything up to and including
// the first colon, to create a path string useful for working with BlobPath.
// This is safe because the NIO library enforces no colons except to mark
// the root container name.
private def nioPathString(nioPath: NioPath): String = {
val pathStr = nioPath.toString
// The Azure NIO library uses `{containerName}:` as the root of the path (treating the blob container within
// the storage account similarly to a drive within a computer). This doesn't work well for our need to easily
// transfer back and forth to and from the blob URL format. It also causes the library to garble full http://
// paths that it receives (it interprets `http` as the container name); it transforms them to http:/<remainder of path>
//
// We transform these library-generated paths in two steps:
// 1) If the path starts with http:/ (single slash!) transform it to the containerName:<path inside container>
// format the library expects
// 2) If the path looks like <container>:<path>, strip off the <container>: to leave the absolute path inside the container.
private val brokenPathRegex = "https:/([a-z0-9]+).blob.core.windows.net/([-a-zA-Z0-9]+)/(.*)".r
def cleanedNioPathString(nioString: String): String = {
val pathStr = nioString match {
case brokenPathRegex(_, containerName, pathInContainer) =>
s"${containerName}:/${pathInContainer}"
case _ => nioString
}
pathStr.substring(pathStr.indexOf(":")+1)
}

def apply(nioPath: NioPath,
endpoint: EndpointURL,
container: BlobContainerName,
fsm: BlobFileSystemManager): BlobPath = {
BlobPath(nioPathString(nioPath), endpoint, container)(fsm)
BlobPath(cleanedNioPathString(nioPath.toString), endpoint, container)(fsm)
}
}

Expand All @@ -95,7 +103,7 @@ case class BlobPath private[blob](pathString: String, endpoint: EndpointURL, con
override def pathAsString: String = List(endpoint, container, pathString.stripPrefix("/")).mkString("/")

//This is purposefully an unprotected get because if the endpoint cannot be parsed this should fail loudly rather than quietly
override def pathWithoutScheme: String = parseURI(endpoint.value).map(u => List(u.getHost, container, pathString).mkString("/")).get
override def pathWithoutScheme: String = parseURI(endpoint.value).map(u => List(u.getHost, container, pathString.stripPrefix("/")).mkString("/")).get

private def findNioPath(path: String): NioPath = (for {
fileSystem <- fsm.retrieveFilesystem()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import org.mockito.Mockito.when
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.util.UUID
import scala.util.{Failure, Try}

object BlobPathBuilderSpec {
Expand Down Expand Up @@ -57,12 +58,59 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers with MockSugar {
testException should contain(exception)
}

private def testBlobNioStringCleaning(input: String, expected: String) =
BlobPath.cleanedNioPathString(input) shouldBe expected

it should "clean the NIO path string when it has a garbled http protocol" in {
testBlobNioStringCleaning(
"https:/lz43.blob.core.windows.net/sc-ebda3e/workspace-services/cbas/terra-app-4628d0e1/test_all_engine_functions/4bb6a0a2-3b07/call-run_read_string/execution/stdout",
"/workspace-services/cbas/terra-app-4628d0e1/test_all_engine_functions/4bb6a0a2-3b07/call-run_read_string/execution/stdout"
)
}

it should "clean the NIO path string when it has a container name with colon prefix" in {
testBlobNioStringCleaning(
"sc-ebda3e:/workspace-services/cbas/terra-app-4628d0e1/test_all_engine_functions/4bb6a0a2-3b07/call-run_read_string/execution/stdout",
"/workspace-services/cbas/terra-app-4628d0e1/test_all_engine_functions/4bb6a0a2-3b07/call-run_read_string/execution/stdout"
)
}

it should "clean the NIO path string when it's an in-container absolute path" in {
testBlobNioStringCleaning(
"/workspace-services/cbas/terra-app-4628d0e1/test_all_engine_functions/4bb6a0a2-3b07/call-run_read_string/execution/stdout",
"/workspace-services/cbas/terra-app-4628d0e1/test_all_engine_functions/4bb6a0a2-3b07/call-run_read_string/execution/stdout"
)
}

it should "clean the NIO path string when it's the root directory only" in {
testBlobNioStringCleaning(
"sc-ebda3e:",
""
)
}

//// The below tests are IGNORED because they depend on Azure auth information being present in the environment ////
val subscriptionId = SubscriptionId(UUID.fromString("62b22893-6bc1-46d9-8a90-806bb3cce3c9"))

ignore should "resolve an absolute path string correctly to a path" in {
val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage")
val store = BlobContainerName("inputs")
val blobTokenGenerator = NativeBlobSasTokenGenerator(store, endpoint, Some(subscriptionId))
val fsm: BlobFileSystemManager = new BlobFileSystemManager(store, endpoint, 10, blobTokenGenerator)

val rootString = s"${endpoint.value}/${store.value}/cromwell-execution"
val blobRoot: BlobPath = new BlobPathBuilder(store, endpoint)(fsm) build rootString getOrElse fail()
blobRoot.toAbsolutePath.pathAsString should equal ("https://coaexternalstorage.blob.core.windows.net/inputs/cromwell-execution")
val otherFile = blobRoot.resolve("https://coaexternalstorage.blob.core.windows.net/inputs/cromwell-execution/test/inputFile.txt")
otherFile.toAbsolutePath.pathAsString should equal ("https://coaexternalstorage.blob.core.windows.net/inputs/cromwell-execution/test/inputFile.txt")
}

ignore should "build a blob path from a test string and read a file" in {
val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage")
val endpointHost = BlobPathBuilder.parseURI(endpoint.value).map(_.getHost).getOrElse(fail("Could not parse URI"))
val store = BlobContainerName("inputs")
val evalPath = "/test/inputFile.txt"
val blobTokenGenerator = NativeBlobSasTokenGenerator(store, endpoint)
val blobTokenGenerator = NativeBlobSasTokenGenerator(store, endpoint, Some(subscriptionId))
val fsm: BlobFileSystemManager = new BlobFileSystemManager(store, endpoint, 10L, blobTokenGenerator)
val testString = endpoint.value + "/" + store + evalPath
val blobPath: BlobPath = new BlobPathBuilder(store, endpoint)(fsm) build testString getOrElse fail()
Expand All @@ -80,7 +128,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers with MockSugar {
val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage")
val store = BlobContainerName("inputs")
val evalPath = "/test/inputFile.txt"
val blobTokenGenerator = NativeBlobSasTokenGenerator(store, endpoint)
val blobTokenGenerator = NativeBlobSasTokenGenerator(store, endpoint, Some(subscriptionId))
val fsm: BlobFileSystemManager = new BlobFileSystemManager(store, endpoint, 10, blobTokenGenerator)
val testString = endpoint.value + "/" + store + evalPath
val blobPath1: BlobPath = new BlobPathBuilder(store, endpoint)(fsm) build testString getOrElse fail()
Expand All @@ -95,7 +143,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers with MockSugar {
ignore should "resolve a path without duplicating container name" in {
val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage")
val store = BlobContainerName("inputs")
val blobTokenGenerator = NativeBlobSasTokenGenerator(store, endpoint)
val blobTokenGenerator = NativeBlobSasTokenGenerator(store, endpoint, Some(subscriptionId))
val fsm: BlobFileSystemManager = new BlobFileSystemManager(store, endpoint, 10, blobTokenGenerator)

val rootString = s"${endpoint.value}/${store.value}/cromwell-execution"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import java.nio.file.Paths

import akka.actor.{ActorContext, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.{HttpMethods, HttpRequest}
import akka.stream.scaladsl.{FileIO, Keep}
import akka.stream.{ActorAttributes, ActorMaterializer}
import cromwell.core.Dispatcher
Expand Down Expand Up @@ -53,4 +53,20 @@ case class HttpPath(nioPath: NioPath) extends Path {
override def pathAsString: String = nioPath.toString.replaceFirst("/", "//")

override def pathWithoutScheme: String = pathAsString.replaceFirst("http[s]?://", "")

def fetchSize(implicit executionContext: ExecutionContext, actorSystem: ActorSystem): Future[Long] = {
Http().singleRequest(HttpRequest(uri = pathAsString, method = HttpMethods.HEAD)).map { response =>
response.discardEntityBytes()
val length = if (response.status.isSuccess())
response.entity.contentLengthOption
else
None
length.getOrElse(
throw new RuntimeException(
s"Couldn't fetch size for $pathAsString, missing Content-Length header or path doesn't exist (HTTP ${response.status.toString()})."
)
)
}
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package cromwell.backend.impl.tes

import common.exception.AggregatedMessageException

import java.io.FileNotFoundException
import java.nio.file.FileAlreadyExistsException
import cats.syntax.apply._
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model.HttpHeader.ParsingResult.Ok
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
import akka.stream.ActorMaterializer
Expand Down Expand Up @@ -295,9 +297,18 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn
}
}

// Headers that should be included with all requests to the TES server
private def requestHeaders: List[HttpHeader] =
tesConfiguration.token.flatMap { t =>
HttpHeader.parse("Authorization", t) match {
case Ok(header, _) => Some(header)
case _ => None
}
}.toList

private def makeRequest[A](request: HttpRequest)(implicit um: Unmarshaller[ResponseEntity, A]): Future[A] = {
for {
response <- withRetry(() => Http().singleRequest(request))
response <- withRetry(() => Http().singleRequest(request.withHeaders(requestHeaders)))
data <- if (response.status.isFailure()) {
response.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map(_.utf8String) flatMap { errorBody =>
Future.failed(new RuntimeException(s"Failed TES request: Code ${response.status.intValue()}, Body = $errorBody"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ class TesConfiguration(val configurationDescriptor: BackendConfigurationDescript
.map(SimpleExponentialBackoff(_))
.getOrElse(TesConfiguration.defaultExecOrRecoverBackoff)

// Used for testing only. Include a bearer token for authenticating with the TES server
final val bearerPrefix: String = "Bearer "
val token: Option[String] = {
configurationDescriptor.backendConfig.as[Option[String]]("bearer-token").map { t =>
if (!t.startsWith(bearerPrefix))
s"${bearerPrefix}${t}"
else t
}
}
}

object TesConfiguration {
Expand Down