Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Centaur blob filesystem #7104

Merged
merged 27 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,11 @@ metadata {
status: Succeeded
"outputs.azure_blob_storage_read.s1": "This is my test file! Did it work??"
}

# az:// is the root of the container specified in reference.conf.
# Here, we verify that exactly one log was written.

fileSystemCheck: "blob"
outputExpectations: {
"az://test-cromwell-workflow-logs/workflow.<<UUID>>.log" : 1
}
6 changes: 6 additions & 0 deletions centaur/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,11 @@ centaur {
include "centaur_aws_credentials.conf"
}

azure {
container: "test-blob"
endpoint: "https://centaurtesting.blob.core.windows.net"
subscription: "62b22893-6bc1-46d9-8a90-806bb3cce3c9"
}

log-request-failures = false
}
11 changes: 11 additions & 0 deletions centaur/src/main/scala/centaur/test/FilesChecker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,14 @@ object AWSFilesChecker extends FilesChecker {

override def countObjectsAtPath: String => Int = s3Client.countObjects(s3PrefixRegex)
}

object BlobFilesChecker extends FilesChecker {
import ObjectCounterInstances.blobObjectCounter
import ObjectCounterSyntax._

private lazy val containerClient = Operations.blobContainerClient

// The root of the endpoint + container specified in reference.conf will be substituted for az://
private val azurePrefixRange = "^az:\\/\\/.*"
override def countObjectsAtPath: String => Int = ObjectCounterSyntax(containerClient).countObjects(azurePrefixRange)
}
17 changes: 17 additions & 0 deletions centaur/src/main/scala/centaur/test/ObjectCounter.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package centaur.test

import com.azure.storage.blob.BlobContainerClient
import com.google.cloud.storage.Storage.BlobListOption
import com.google.cloud.storage.{Blob, Storage}
import software.amazon.awssdk.services.s3.S3Client
Expand Down Expand Up @@ -38,6 +39,22 @@ object ObjectCounterInstances {
storage.list(g.bucket, BlobListOption.prefix(g.directory)).iterateAll.asScala
listObjectsAtPath(_).size
}

implicit val blobObjectCounter: ObjectCounter[BlobContainerClient] = (containerClient : BlobContainerClient) => {
val pathToInt: Path => Int = providedPath => {
//Our path parsing is somewhat GCP centric. Convert to a blob path starting from the container root.
def pathToBlobPath(parsedPath : Path) : String = {
if(parsedPath.bucket.isEmpty) return ""
if(parsedPath.directory.isEmpty) return parsedPath.bucket
return parsedPath.bucket + "/" + parsedPath.directory
THWiseman marked this conversation as resolved.
Show resolved Hide resolved
}
val fullPath = pathToBlobPath(providedPath)
val blobsInFolder = containerClient.listBlobsByHierarchy(fullPath)
//if something "isPrefix", it's a directory. Otherwise, its a file. We just want to count files.
blobsInFolder.asScala.filter(item => !item.isPrefix).size
THWiseman marked this conversation as resolved.
Show resolved Hide resolved
}
pathToInt(_)
}
}

object ObjectCounterSyntax {
Expand Down
5 changes: 5 additions & 0 deletions centaur/src/main/scala/centaur/test/Test.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import centaur.test.metadata.WorkflowFlatMetadata
import centaur.test.metadata.WorkflowFlatMetadata._
import centaur.test.submit.SubmitHttpResponse
import centaur.test.workflow.Workflow
import com.azure.storage.blob.BlobContainerClient
import com.google.api.services.genomics.v2alpha1.{Genomics, GenomicsScopes}
import com.google.api.services.storage.StorageScopes
import com.google.auth.Credentials
Expand All @@ -23,6 +24,7 @@ import configs.syntax._
import cromwell.api.CromwellClient.UnsuccessfulRequestException
import cromwell.api.model.{CallCacheDiff, Failed, HashDifference, SubmittedWorkflow, Succeeded, TerminalStatus, WaasDescription, WorkflowId, WorkflowMetadata, WorkflowStatus}
import cromwell.cloudsupport.aws.AwsConfiguration
import cromwell.cloudsupport.azure.AzureConfiguration
import cromwell.cloudsupport.gcp.GoogleConfiguration
import cromwell.cloudsupport.gcp.auth.GoogleAuthMode
import io.circe.parser._
Expand Down Expand Up @@ -150,6 +152,9 @@ object Operations extends StrictLogging {
.build()
}

lazy val azureConfig: Config = CentaurConfig.conf.getConfig("azure")
lazy val blobContainerClient: BlobContainerClient = AzureConfiguration.apply(azureConfig)

def submitWorkflow(workflow: Workflow): Test[SubmittedWorkflow] = {
new Test[SubmittedWorkflow] {
override def run: IO[SubmittedWorkflow] = for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package centaur.test.workflow

import cats.data.Validated._
import cats.syntax.all._
import centaur.test.{AWSFilesChecker, FilesChecker, LocalFilesChecker, PipelinesFilesChecker}
import centaur.test.{AWSFilesChecker, FilesChecker, LocalFilesChecker, PipelinesFilesChecker, BlobFilesChecker}
import com.typesafe.config.Config
import common.validation.ErrorOr.ErrorOr
import configs.Result
Expand All @@ -25,8 +25,9 @@ object DirectoryContentCountCheck {
case Result.Success("gcs") => valid(PipelinesFilesChecker)
case Result.Success("local") => valid(LocalFilesChecker)
case Result.Success("aws") => valid(AWSFilesChecker)
case Result.Success(_) => invalidNel(s"Test '$name': Invalid 'fileSystemCheck' value (must be either 'local', 'gcs' or 'aws'")
case Result.Failure(_) => invalidNel(s"Test '$name': Must specify a 'fileSystemCheck' value (must be either 'local', 'gcs' or 'aws'")
case Result.Success("blob") => valid(BlobFilesChecker)
case Result.Success(_) => invalidNel(s"Test '$name': Invalid 'fileSystemCheck' value (must be either 'local', 'gcs', 'blob', or 'aws'")
case Result.Failure(_) => invalidNel(s"Test '$name': Must specify a 'fileSystemCheck' value (must be either 'local', 'gcs', 'blob', or 'aws'")
}

(directoryContentCountsValidation, fileSystemChecker) mapN { (d, f) => Option(DirectoryContentCountCheck(d, f)) }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package cromwell.cloudsupport.azure

import com.typesafe.config.{Config}
import com.azure.core.credential.AzureSasCredential
import com.azure.core.management.AzureEnvironment
import com.azure.core.management.profile.AzureProfile
import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.resourcemanager.AzureResourceManager
import com.azure.resourcemanager.storage.models.StorageAccountKey
import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues}
import com.azure.storage.blob.{BlobContainerClient, BlobContainerClientBuilder}
import com.azure.storage.common.StorageSharedKeyCredential
import com.google.common.net.UrlEscapers

import java.net.URI
import java.time.OffsetDateTime
import scala.jdk.CollectionConverters.IterableHasAsScala
import scala.util.{Failure, Success, Try}

final case class AzureConfiguration private (subscription: String, endpoint: String, container: String) {

}

object AzureConfiguration {
THWiseman marked this conversation as resolved.
Show resolved Hide resolved

def apply(config: Config): BlobContainerClient = {
val azureSubscription = config.getString("subscription")
val blobContainer = config.getString("container")
val azureEndpoint = config.getString("endpoint")

def parseURI(string: String): Try[URI] = Try(URI.create(UrlEscapers.urlFragmentEscaper().escape(string)))

def parseStorageAccount(uri: URI): Try[String] = uri.getHost.split("\\.").find(_.nonEmpty)
.map(Success(_)).getOrElse(Failure(new Exception("Could not parse storage account")))

val azureProfile = new AzureProfile(AzureEnvironment.AZURE)

def azureCredentialBuilder = new DefaultAzureCredentialBuilder()
.authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint)
.build

def authenticateWithSubscription(sub: String) = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withSubscription(sub)

def azure = authenticateWithSubscription(azureSubscription)

def findAzureStorageAccount(storageAccountName: String) = azure.storageAccounts.list.asScala.find(_.name.equals(storageAccountName))
.map(Success(_)).getOrElse(Failure(new Exception("Azure Storage Account not found.")))

def buildBlobContainerClient(credential: StorageSharedKeyCredential, endpointURL: String, blobContainerName: String): BlobContainerClient = {
new BlobContainerClientBuilder()
.credential(credential)
.endpoint(endpointURL)
.containerName(blobContainerName)
.buildClient()
}

val bcsp = new BlobContainerSasPermission()
.setReadPermission(true)
.setCreatePermission(true)
.setListPermission(true)
.setWritePermission(true)

def generateBlobContainerClient: Try[BlobContainerClient] = for {
uri <- parseURI(azureEndpoint)
configuredAccount <- parseStorageAccount(uri)
azureAccount <- findAzureStorageAccount(configuredAccount)
keys = azureAccount.getKeys.asScala
key <- keys.headOption.fold[Try[StorageAccountKey]](Failure(new Exception("Storage account has no keys")))(Success(_))
first = key.value
sskc = new StorageSharedKeyCredential(configuredAccount, first)
bcc = buildBlobContainerClient(sskc, azureEndpoint, blobContainer)
bsssv = new BlobServiceSasSignatureValues(OffsetDateTime.now.plusDays(1), bcsp)
asc = new AzureSasCredential(bcc.generateSas(bsssv))
} yield bcc

if (generateBlobContainerClient.isFailure) {
THWiseman marked this conversation as resolved.
Show resolved Hide resolved
throw new Exception("Failed to generate Blob Container Client.")
}
generateBlobContainerClient.get
}
}
3 changes: 2 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ object Dependencies {
exclude("jakarta.xml.bind", "jakarta.xml.bind-api")
exclude("jakarta.activation", "jakarta.activation-api"),
"com.azure" % "azure-core-management" % "1.7.1",
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-xml" % jacksonV,
"com.azure.resourcemanager" % "azure-resourcemanager" % "2.18.0"
)

Expand Down Expand Up @@ -409,7 +410,7 @@ object Dependencies {
"com.lihaoyi" %% "pprint" % pprintV,
) ++ catsDependencies ++ configDependencies ++ slf4jFacadeDependencies ++ refinedTypeDependenciesList

val cloudSupportDependencies: List[ModuleID] = googleApiClientDependencies ++ googleCloudDependencies ++ betterFilesDependencies ++ awsCloudDependencies
val cloudSupportDependencies: List[ModuleID] = googleApiClientDependencies ++ googleCloudDependencies ++ betterFilesDependencies ++ awsCloudDependencies ++ azureDependencies

val databaseSqlDependencies: List[ModuleID] = List(
"commons-io" % "commons-io" % commonsIoV,
Expand Down