Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Develop AWS #42

Merged
merged 12 commits into from
Jan 8, 2024
4 changes: 4 additions & 0 deletions backend/src/main/scala/cromwell/backend/backend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,17 @@ object CommonBackendConfigurationAttributes {
"default-runtime-attributes.zones",
"default-runtime-attributes.continueOnReturnCode",
"default-runtime-attributes.cpu",
"default-runtime-attributes.gpuCount",
"default-runtime-attributes.noAddress",
"default-runtime-attributes.docker",
"default-runtime-attributes.queueArn",
"default-runtime-attributes.awsBatchRetryAttempts",
"default-runtime-attributes.maxRetries",
"default-runtime-attributes.awsBatchEvaluateOnExit",
"default-runtime-attributes.ulimits",
"default-runtime-attributes.efsDelocalize",
"default-runtime-attributes.efsMakeMD5",
"default-runtime-attributes.tagResources",
"default-runtime-attributes.failOnStderr",
"slow-job-warning-time",
"dockerhub",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ import wom.callable.{AdHocValue, CommandTaskDefinition, ContainerizedInputExpres
import wom.expression.WomExpression
import wom.graph.LocalName
import wom.values._
import wom.types._
import wom.{CommandSetupSideEffectFile, InstantiatedCommand, WomFileMapper}

import java.net.URLDecoder
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -165,7 +166,12 @@ trait StandardAsyncExecutionActor
if (inputsToNotLocalize.contains(womFile)) {
womFile
} else {
mapper(womFile)
// resolve
val mapped_wom = mapper(womFile)
// decode url encoded values generated by the mapper
val decoded_womValue = URLDecoder.decode(mapped_wom.valueString,"UTF-8")
// convert to womfile again
WomFile(WomSingleFileType, decoded_womValue)
}
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
package cromwell.docker.registryv2.flows.aws

import cats.effect.{IO, Resource}
import cromwell.core.TestKitSuite
import cromwell.docker.registryv2.DockerRegistryV2Abstract
Expand All @@ -12,20 +11,19 @@ import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar
import software.amazon.awssdk.services.ecrpublic.model.{AuthorizationData, GetAuthorizationTokenRequest, GetAuthorizationTokenResponse}
import software.amazon.awssdk.services.ecrpublic.EcrPublicClient

class AmazonEcrPublicSpec extends TestKitSuite with AnyFlatSpecLike with Matchers with MockitoSugar with BeforeAndAfter with PrivateMethodTester {
class AmazonEcrPublicSpec extends TestKitSuite with AnyFlatSpecLike with Matchers with BeforeAndAfter with PrivateMethodTester {
behavior of "AmazonEcrPublic"

val goodUri = "public.ecr.aws/amazonlinux/amazonlinux:latest"
val otherUri = "ubuntu:latest"


val mediaType: MediaType = MediaType.parse(DockerRegistryV2Abstract.ManifestV2MediaType).right.get
val mediaType: MediaType = MediaType.parse(DockerRegistryV2Abstract.DockerManifestV2MediaType).getOrElse(fail("Cant parse mediatype"))
val contentType: Header = `Content-Type`(mediaType)
val mockEcrClient: EcrPublicClient = mock[EcrPublicClient]
val mockEcrClient: EcrPublicClient = mock(classOf[EcrPublicClient])
implicit val mockIOClient: Client[IO] = Client({ _: Request[IO] =>
// This response will have an empty body, so we need to be explicit about the typing:
Resource.pure[IO, Response[IO]](Response(headers = Headers.of(contentType))) : Resource[IO, Response[IO]]
Expand All @@ -44,7 +42,7 @@ class AmazonEcrPublicSpec extends TestKitSuite with AnyFlatSpecLike with Matcher
}

it should "have public.ecr.aws as registryHostName" in {
val registryHostNameMethod = PrivateMethod[String]('registryHostName)
val registryHostNameMethod = PrivateMethod[String](Symbol("registryHostName"))
registry invokePrivate registryHostNameMethod(DockerImageIdentifier.fromString(goodUri).get) shouldEqual "public.ecr.aws"
}

Expand All @@ -63,7 +61,7 @@ class AmazonEcrPublicSpec extends TestKitSuite with AnyFlatSpecLike with Matcher
.build())
.build)

val getTokenMethod = PrivateMethod[IO[Option[String]]]('getToken)
val getTokenMethod = PrivateMethod[IO[Option[String]]](Symbol("getToken"))
registry invokePrivate getTokenMethod(context, mockIOClient) ensuring(io => io.unsafeRunSync().get == token)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,18 @@ import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar
import software.amazon.awssdk.services.ecr.EcrClient
import software.amazon.awssdk.services.ecr.model.{AuthorizationData, GetAuthorizationTokenResponse}

class AmazonEcrSpec extends TestKitSuite with AnyFlatSpecLike with Matchers with MockitoSugar with BeforeAndAfter with PrivateMethodTester{
class AmazonEcrSpec extends TestKitSuite with AnyFlatSpecLike with Matchers with BeforeAndAfter with PrivateMethodTester{
behavior of "AmazonEcr"

val goodUri = "123456789012.dkr.ecr.us-east-1.amazonaws.com/amazonlinux/amazonlinux:latest"
val otherUri = "ubuntu:latest"

val mediaType: MediaType = MediaType.parse(DockerRegistryV2Abstract.ManifestV2MediaType).right.get
val mediaType: MediaType = MediaType.parse(DockerRegistryV2Abstract.DockerManifestV2MediaType).getOrElse(fail("Can't parse media type"))
val contentType: Header = `Content-Type`(mediaType)
val mockEcrClient: EcrClient = mock[EcrClient]
val mockEcrClient: EcrClient = mock(classOf[EcrClient])
implicit val mockIOClient: Client[IO] = Client({ _: Request[IO] =>
// This response will have an empty body, so we need to be explicit about the typing:
Resource.pure[IO, Response[IO]](Response(headers = Headers.of(contentType))) : Resource[IO, Response[IO]]
Expand All @@ -42,12 +41,12 @@ class AmazonEcrSpec extends TestKitSuite with AnyFlatSpecLike with Matchers with
}

it should "use Basic Auth Scheme" in {
val authSchemeMethod = PrivateMethod[AuthScheme]('authorizationScheme)
val authSchemeMethod = PrivateMethod[AuthScheme](Symbol("authorizationScheme"))
registry invokePrivate authSchemeMethod() shouldEqual AuthScheme.Basic
}

it should "return 123456789012.dkr.ecr.us-east-1.amazonaws.com as registryHostName" in {
val registryHostNameMethod = PrivateMethod[String]('registryHostName)
val registryHostNameMethod = PrivateMethod[String](Symbol("registryHostName"))
registry invokePrivate registryHostNameMethod(DockerImageIdentifier.fromString(goodUri).get) shouldEqual "123456789012.dkr.ecr.us-east-1.amazonaws.com"
}

Expand All @@ -66,7 +65,7 @@ class AmazonEcrSpec extends TestKitSuite with AnyFlatSpecLike with Matchers with
.build())
.build)

val getTokenMethod = PrivateMethod[IO[Option[String]]]('getToken)
val getTokenMethod = PrivateMethod[IO[Option[String]]](Symbol("getToken"))
registry invokePrivate getTokenMethod(context, mockIOClient) ensuring(io => io.unsafeRunSync().get == token)
}
}
5 changes: 1 addition & 4 deletions filesystems/s3/src/main/java/org/lerch/s3fs/S3FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,7 @@ public Bucket getBucket() {
}

private Bucket getBucket(String bucketName) {
for (Bucket buck : getClient().listBuckets().buckets())
if (buck.name().equals(bucketName))
return buck;
return null;
return Bucket.builder().name(bucketName).build();
}

private boolean hasBucket(String bucketName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import software.amazon.awssdk.services.sns.model.PublishRequest
import spray.json.enrichAny

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}
import scala.util.{Failure, Success, Try}


/**
Expand All @@ -61,6 +61,10 @@ class AwsSnsMetadataServiceActor(serviceConfig: Config, globalConfig: Config, se

//setup sns client
val topicArn: String = serviceConfig.getString("aws.topicArn")
val publishStatusOnly: Boolean = Try(serviceConfig.getBoolean("aws.publishStatusOnly")) match {
case Failure(_) => false
case Success(value) => value
}

val awsConfig: AwsConfiguration = AwsConfiguration(globalConfig)
val credentialsProviderChain: AwsCredentialsProviderChain =
Expand All @@ -74,7 +78,11 @@ class AwsSnsMetadataServiceActor(serviceConfig: Config, globalConfig: Config, se
def publishMessages(events: Iterable[MetadataEvent]): Future[Unit] = {
import AwsSnsMetadataServiceActor.EnhancedMetadataEvents

val eventsJson = events.toJson
val eventsJson = if (publishStatusOnly) {
events.filter(_.key.key == "status").toJson
} else {
events.toJson
}
//if there are no events then don't publish anything
if( eventsJson.length < 1) { return Future(())}
log.debug(f"Publishing to $topicArn : $eventsJson")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

package cromwell.backend.impl.aws

import java.net.SocketTimeoutException
import java.net.{SocketTimeoutException, URLDecoder}
import java.io.FileNotFoundException
import java.nio.file.Paths

Expand Down Expand Up @@ -202,7 +202,8 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
configuration.fsxMntPoint,
configuration.efsMntPoint,
Option(runtimeAttributes.efsMakeMD5),
Option(runtimeAttributes.efsDelocalize))
Option(runtimeAttributes.efsDelocalize),
Option(runtimeAttributes.tagResources))
}

// setup batch client to query job container info
Expand Down Expand Up @@ -262,13 +263,13 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
case Success(path: S3Path) =>
configuration.fileSystem match {
case AWSBatchStorageSystems.s3 =>
path.pathWithoutScheme
URLDecoder.decode(path.pathWithoutScheme,"UTF-8")
case _ =>
path.toString
URLDecoder.decode(path.toString,"UTF-8")
}
// non-s3 paths
case _ =>
value
URLDecoder.decode(value,"UTF-8")
}
)
}
Expand Down Expand Up @@ -421,7 +422,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
AwsBatchFileOutput(makeSafeAwsBatchReferenceName(womFile.value), womFile.value, relpath, disk)
} else {
// if efs is not enabled, OR efs delocalization IS enabled, keep the s3 path as destination.
AwsBatchFileOutput(makeSafeAwsBatchReferenceName(womFile.value), destination, relpath, disk)
AwsBatchFileOutput(makeSafeAwsBatchReferenceName(womFile.value), URLDecoder.decode(destination,"UTF-8"), relpath, disk)
}
List(output)
}
Expand Down Expand Up @@ -689,7 +690,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
// STATUS LOGIC:
// - success : container exit code is zero
// - command failure: container exit code > 0, no statusReason in container
// - OOM kill : container exit code > 0, statusReason contains "OutOfMemory"
// - OOM kill : container exit code > 0, statusReason contains "OutOfMemory" OR exit code == 137
// - spot kill : no container exit code set. statusReason of ATTEMPT (not container) says "host EC2 (...) terminated"
Log.debug(s"Looking for memoryRetry in job '${job.jobId}'")
val describeJobsResponse = batchClient.describeJobs(DescribeJobsRequest.builder.jobs(job.jobId).build)
Expand Down Expand Up @@ -722,6 +723,9 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
case "0" =>
Log.debug("container exit code was zero. job succeeded")
false
case "137" =>
Log.info("Job failed with Container status reason : 'OutOfMemory' (code:137)")
true
case _ =>
// failed job due to command errors (~ user errors) don't have a container exit reason.
val containerStatusReason:String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ case class AwsBatchAttributes(fileSystem: String,
fsxMntPoint: Option[List[String]],
efsMntPoint: Option[String],
efsMakeMD5: Option[Boolean],
tagResources: Option[Boolean],
efsDelocalize: Option[Boolean],
globLinkCommand: Option[String],
checkSiblingMd5: Option[Boolean]
Expand All @@ -87,9 +88,13 @@ object AwsBatchAttributes {
"numSubmitAttempts",
"default-runtime-attributes.scriptBucketName",
"awsBatchRetryAttempts",
"awsBatchEvaluateOnExit",
"ulimits",
"gpuCount",
"efsDelocalize",
"efsMakeMD5",
"tagResources",
"maxRetries",
"glob-link-command"
)

Expand Down Expand Up @@ -180,6 +185,13 @@ object AwsBatchAttributes {
case false => None
}
}
// from config if set:
val tagResources:ErrorOr[Option[Boolean]] = validate {
backendConfig.hasPath("default-runtime-attributes.tagResources") match {
case true => Some(backendConfig.getBoolean("default-runtime-attributes.tagResources"))
case false => None
}
}
// from config if set.
val globLinkCommand:ErrorOr[Option[String]] = validate {
backendConfig.hasPath("glob-link-command") match {
Expand All @@ -206,6 +218,7 @@ object AwsBatchAttributes {
efsMntPoint,
efsMakeMD5,
efsDelocalize,
tagResources,
globLinkCommand,
checkSiblingMd5
).tupled.map((AwsBatchAttributes.apply _).tupled) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class AwsBatchConfiguration(val configurationDescriptor: BackendConfigurationDes
val efsMntPoint = batchAttributes.efsMntPoint
val efsMakeMD5 = batchAttributes.efsMakeMD5
val efsDelocalize = batchAttributes.efsDelocalize
val tagResources = batchAttributes.tagResources
val globLinkCommand = batchAttributes.globLinkCommand
val checkSiblingMd5 = batchAttributes.checkSiblingMd5
}
Expand Down
Loading
Loading