Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Centaur blob filesystem #7104

Merged
merged 27 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ lazy val azureBlobFileSystem = (project in file("filesystems/blob"))
.dependsOn(core)
.dependsOn(core % "test->test")
.dependsOn(common % "test->test")
.dependsOn(cloudSupport)

lazy val awsS3FileSystem = (project in file("filesystems/s3"))
.withLibrarySettings("cromwell-aws-s3filesystem", s3FileSystemDependencies)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,11 @@ metadata {
status: Succeeded
"outputs.azure_blob_storage_read.s1": "This is my test file! Did it work??"
}

# az:// is the root of the container specified in reference.conf.
# Here, we verify that exactly one log was written.

fileSystemCheck: "blob"
outputExpectations: {
"az://test-cromwell-workflow-logs/workflow.<<UUID>>.log" : 1
}
6 changes: 6 additions & 0 deletions centaur/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,11 @@ centaur {
include "centaur_aws_credentials.conf"
}

azure {
container: "test-blob"
endpoint: "https://centaurtesting.blob.core.windows.net"
subscription: "62b22893-6bc1-46d9-8a90-806bb3cce3c9"
}

log-request-failures = false
}
11 changes: 11 additions & 0 deletions centaur/src/main/scala/centaur/test/FilesChecker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,14 @@ object AWSFilesChecker extends FilesChecker {

override def countObjectsAtPath: String => Int = s3Client.countObjects(s3PrefixRegex)
}

/** FilesChecker that counts objects under `az://` paths using the Centaur blob container client. */
object BlobFilesChecker extends FilesChecker {
  import ObjectCounterInstances.blobObjectCounter
  import ObjectCounterSyntax._

  // The root of the endpoint + container specified in reference.conf will be substituted for az://
  private val azurePrefixRange = "^az:\\/\\/.*"

  // Lazy: the client is only built the first time a count is actually requested.
  private lazy val containerClient = Operations.blobContainerClient

  override def countObjectsAtPath: String => Int = {
    val counterOps = ObjectCounterSyntax(containerClient)
    counterOps.countObjects(azurePrefixRange)
  }
}
20 changes: 20 additions & 0 deletions centaur/src/main/scala/centaur/test/ObjectCounter.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package centaur.test

import com.azure.storage.blob.BlobContainerClient
import com.google.cloud.storage.Storage.BlobListOption
import com.google.cloud.storage.{Blob, Storage}
import software.amazon.awssdk.services.s3.S3Client
Expand Down Expand Up @@ -38,6 +39,25 @@ object ObjectCounterInstances {
storage.list(g.bucket, BlobListOption.prefix(g.directory)).iterateAll.asScala
listObjectsAtPath(_).size
}

// ObjectCounter instance for Azure blob containers: counts the files (not directories) listed at a path.
implicit val blobObjectCounter: ObjectCounter[BlobContainerClient] = (containerClient : BlobContainerClient) => {
  // Our path parsing is somewhat GCP centric. Convert to a blob path starting from the container root.
  def toBlobPath(path: Path): String =
    Option(path.bucket) match {
      case None => ""
      case Some(bucket) => Option(path.directory).fold(bucket)(directory => bucket + "/" + directory)
    }

  (providedPath: Path) => {
    val listing = containerClient.listBlobsByHierarchy(toBlobPath(providedPath)).asScala
    // Anything flagged "isPrefix" is a directory; everything else is a file. Only files are counted.
    listing.count(item => !item.isPrefix)
  }
}
}

object ObjectCounterSyntax {
Expand Down
9 changes: 9 additions & 0 deletions centaur/src/main/scala/centaur/test/Test.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import centaur.test.metadata.WorkflowFlatMetadata
import centaur.test.metadata.WorkflowFlatMetadata._
import centaur.test.submit.SubmitHttpResponse
import centaur.test.workflow.Workflow
import com.azure.storage.blob.BlobContainerClient
import com.google.api.services.genomics.v2alpha1.{Genomics, GenomicsScopes}
import com.google.api.services.storage.StorageScopes
import com.google.auth.Credentials
Expand All @@ -23,6 +24,7 @@ import configs.syntax._
import cromwell.api.CromwellClient.UnsuccessfulRequestException
import cromwell.api.model.{CallCacheDiff, Failed, HashDifference, SubmittedWorkflow, Succeeded, TerminalStatus, WaasDescription, WorkflowId, WorkflowMetadata, WorkflowStatus}
import cromwell.cloudsupport.aws.AwsConfiguration
import cromwell.cloudsupport.azure.{AzureUtils}
import cromwell.cloudsupport.gcp.GoogleConfiguration
import cromwell.cloudsupport.gcp.auth.GoogleAuthMode
import io.circe.parser._
Expand Down Expand Up @@ -150,6 +152,13 @@ object Operations extends StrictLogging {
.build()
}

// All Azure settings are lazy so that nothing Azure-related is read from the config (or requested from
// Azure) until a blob check first needs it; strict vals here would force azureConfig as soon as
// Operations is initialized, defeating its `lazy` modifier.
lazy val azureConfig: Config = CentaurConfig.conf.getConfig("azure")
lazy val azureSubscription: String = azureConfig.getString("subscription")
lazy val blobContainer: String = azureConfig.getString("container")
lazy val azureEndpoint: String = azureConfig.getString("endpoint")
//NB: Centaur will throw an exception if it isn't able to authenticate with Azure blob storage via the local environment.
// The `.get` is deliberate: a Failure from the builder is rethrown on first access of this client.
lazy val blobContainerClient: BlobContainerClient = AzureUtils.buildContainerClientFromLocalEnvironment(blobContainer, azureEndpoint, Option(azureSubscription)).get

def submitWorkflow(workflow: Workflow): Test[SubmittedWorkflow] = {
new Test[SubmittedWorkflow] {
override def run: IO[SubmittedWorkflow] = for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package centaur.test.workflow

import cats.data.Validated._
import cats.syntax.all._
import centaur.test.{AWSFilesChecker, FilesChecker, LocalFilesChecker, PipelinesFilesChecker}
import centaur.test.{AWSFilesChecker, FilesChecker, LocalFilesChecker, PipelinesFilesChecker, BlobFilesChecker}
import com.typesafe.config.Config
import common.validation.ErrorOr.ErrorOr
import configs.Result
Expand All @@ -25,8 +25,9 @@ object DirectoryContentCountCheck {
case Result.Success("gcs") => valid(PipelinesFilesChecker)
case Result.Success("local") => valid(LocalFilesChecker)
case Result.Success("aws") => valid(AWSFilesChecker)
case Result.Success(_) => invalidNel(s"Test '$name': Invalid 'fileSystemCheck' value (must be either 'local', 'gcs' or 'aws'")
case Result.Failure(_) => invalidNel(s"Test '$name': Must specify a 'fileSystemCheck' value (must be either 'local', 'gcs' or 'aws'")
case Result.Success("blob") => valid(BlobFilesChecker)
case Result.Success(_) => invalidNel(s"Test '$name': Invalid 'fileSystemCheck' value (must be either 'local', 'gcs', 'blob', or 'aws'")
case Result.Failure(_) => invalidNel(s"Test '$name': Must specify a 'fileSystemCheck' value (must be either 'local', 'gcs', 'blob', or 'aws'")
}

(directoryContentCountsValidation, fileSystemChecker) mapN { (d, f) => Option(DirectoryContentCountCheck(d, f)) }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package cromwell.cloudsupport.azure

import com.azure.core.management.AzureEnvironment
import com.azure.core.management.profile.AzureProfile
import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.resourcemanager.AzureResourceManager
import com.azure.resourcemanager.storage.models.StorageAccountKey
import com.azure.storage.blob.{BlobContainerClient, BlobContainerClientBuilder}
import com.azure.storage.common.StorageSharedKeyCredential
import com.google.common.net.UrlEscapers

import java.net.URI
import scala.jdk.CollectionConverters.IterableHasAsScala
import scala.util.{Failure, Success, Try}

object AzureUtils {
  /**
    * Generates a BlobContainerClient that can interact with the specified container. Authenticates using the local azure client running on the same machine.
    * @param blobContainer Name of the blob container. Looks something like "my-blob-container".
    * @param azureEndpoint Azure endpoint of the container. Looks something like https://somedomain.blob.core.windows.net.
    * @param subscription Azure subscription. A globally unique identifier. If not provided, a default subscription will be used.
    * @return A blob container client capable of interacting with the specified container, or a Failure when the
    *         endpoint cannot be parsed, no matching storage account is found, or the account has no keys.
    */
  def buildContainerClientFromLocalEnvironment(blobContainer: String, azureEndpoint: String, subscription : Option[String]): Try[BlobContainerClient] = {
    def parseURI(string: String): Try[URI] = Try(URI.create(UrlEscapers.urlFragmentEscaper().escape(string)))

    // The storage account name is the first non-empty, dot-separated label of the endpoint host.
    // URI.getHost returns null when the URI has no host component (e.g. the endpoint was given without a
    // scheme), so it is wrapped in Option to report a descriptive Failure rather than a NullPointerException.
    def parseStorageAccount(uri: URI): Try[String] =
      Option(uri.getHost)
        .flatMap(host => host.split("\\.").find(_.nonEmpty))
        .fold[Try[String]](Failure(new Exception("Could not parse storage account")))(Success(_))

    val azureProfile = new AzureProfile(AzureEnvironment.AZURE)

    // The credential and resource manager are defs so they are only built when the for-comprehension below
    // reaches them; anything they throw is then captured by the surrounding Try.
    def azureCredentialBuilder = new DefaultAzureCredentialBuilder()
      .authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint)
      .build

    def authenticateWithSubscription(sub: String) = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withSubscription(sub)

    def authenticateWithDefaultSubscription = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withDefaultSubscription()

    def azure = subscription.map(authenticateWithSubscription(_)).getOrElse(authenticateWithDefaultSubscription)

    // Looks the account up by exact name among the storage accounts listed for the chosen subscription.
    def findAzureStorageAccount(storageAccountName: String) = azure.storageAccounts.list.asScala.find(_.name.equals(storageAccountName))
      .map(Success(_)).getOrElse(Failure(new Exception("Azure Storage Account not found.")))

    def buildBlobContainerClient(credential: StorageSharedKeyCredential, endpointURL: String, blobContainerName: String): BlobContainerClient = {
      new BlobContainerClientBuilder()
        .credential(credential)
        .endpoint(endpointURL)
        .containerName(blobContainerName)
        .buildClient()
    }

    def generateBlobContainerClient: Try[BlobContainerClient] = for {
      uri <- parseURI(azureEndpoint)
      configuredAccount <- parseStorageAccount(uri)
      azureAccount <- findAzureStorageAccount(configuredAccount)
      keys = azureAccount.getKeys.asScala
      // Only the first key returned for the account is used to build the shared-key credential.
      key <- keys.headOption.fold[Try[StorageAccountKey]](Failure(new Exception("Storage account has no keys")))(Success(_))
      first = key.value
      sskc = new StorageSharedKeyCredential(configuredAccount, first)
      bcc = buildBlobContainerClient(sskc, azureEndpoint, blobContainer)
    } yield bcc

    generateBlobContainerClient
  }
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
package cromwell.filesystems.blob

import com.azure.core.credential.AzureSasCredential
import com.azure.core.management.AzureEnvironment
import com.azure.core.management.profile.AzureProfile
import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.resourcemanager.AzureResourceManager
import com.azure.resourcemanager.storage.models.StorageAccountKey
import com.azure.storage.blob.nio.AzureFileSystem
import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues}
import com.azure.storage.blob.{BlobContainerClient, BlobContainerClientBuilder}
import com.azure.storage.common.StorageSharedKeyCredential
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import common.validation.Validation._
import cromwell.cloudsupport.azure.AzureUtils

import java.net.URI
import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems}
Expand Down Expand Up @@ -223,21 +217,9 @@ case class WSMBlobSasTokenGenerator(container: BlobContainerName,
}

case class NativeBlobSasTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, subscription: Option[SubscriptionId] = None) extends BlobSasTokenGenerator {
private val azureProfile = new AzureProfile(AzureEnvironment.AZURE)
private def azureCredentialBuilder = new DefaultAzureCredentialBuilder()
.authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint)
.build
private def authenticateWithSubscription(sub: SubscriptionId) = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withSubscription(sub.toString)
private def authenticateWithDefaultSubscription = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withDefaultSubscription()
private def azure = subscription.map(authenticateWithSubscription(_)).getOrElse(authenticateWithDefaultSubscription)
private def findAzureStorageAccount(name: StorageAccountName) = azure.storageAccounts.list.asScala.find(_.name.equals(name.value))
.map(Success(_)).getOrElse(Failure(new Exception("Azure Storage Account not found")))
private def buildBlobContainerClient(credential: StorageSharedKeyCredential, endpoint: EndpointURL, container: BlobContainerName): BlobContainerClient = {
new BlobContainerClientBuilder()
.credential(credential)
.endpoint(endpoint.value)
.containerName(container.value)
.buildClient()
// Subscription rendered as a plain string for AzureUtils; a null toString result is treated as absent.
private val azureSubscription : Option[String] = subscription.flatMap(id => Option(id.toString))
private val bcsp = new BlobContainerSasPermission()
.setReadPermission(true)
Expand All @@ -252,14 +234,7 @@ case class NativeBlobSasTokenGenerator(container: BlobContainerName, endpoint: E
* @return an AzureSasCredential for accessing a blob container
*/
def generateBlobSasToken: Try[AzureSasCredential] = for {
uri <- BlobPathBuilder.parseURI(endpoint.value)
configuredAccount <- BlobPathBuilder.parseStorageAccount(uri)
azureAccount <- findAzureStorageAccount(configuredAccount)
keys = azureAccount.getKeys.asScala
key <- keys.headOption.fold[Try[StorageAccountKey]](Failure(new Exception("Storage account has no keys")))(Success(_))
first = key.value
sskc = new StorageSharedKeyCredential(configuredAccount.value, first)
bcc = buildBlobContainerClient(sskc, endpoint, container)
bcc <- AzureUtils.buildContainerClientFromLocalEnvironment(container.toString, endpoint.toString, azureSubscription)
THWiseman marked this conversation as resolved.
Show resolved Hide resolved
bsssv = new BlobServiceSasSignatureValues(OffsetDateTime.now.plusDays(1), bcsp)
asc = new AzureSasCredential(bcc.generateSas(bsssv))
} yield asc
Expand Down
3 changes: 2 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ object Dependencies {
exclude("jakarta.xml.bind", "jakarta.xml.bind-api")
exclude("jakarta.activation", "jakarta.activation-api"),
"com.azure" % "azure-core-management" % "1.7.1",
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-xml" % jacksonV,
"com.azure.resourcemanager" % "azure-resourcemanager" % "2.18.0"
)

Expand Down Expand Up @@ -409,7 +410,7 @@ object Dependencies {
"com.lihaoyi" %% "pprint" % pprintV,
) ++ catsDependencies ++ configDependencies ++ slf4jFacadeDependencies ++ refinedTypeDependenciesList

val cloudSupportDependencies: List[ModuleID] = googleApiClientDependencies ++ googleCloudDependencies ++ betterFilesDependencies ++ awsCloudDependencies
val cloudSupportDependencies: List[ModuleID] = googleApiClientDependencies ++ googleCloudDependencies ++ betterFilesDependencies ++ awsCloudDependencies ++ azureDependencies

val databaseSqlDependencies: List[ModuleID] = List(
"commons-io" % "commons-io" % commonsIoV,
Expand Down