Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BT-711 Refresh SAS token for filesystem on expiry #6831

Merged
merged 18 commits into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package cromwell.filesystems.blob

import com.azure.core.credential.AzureSasCredential
import com.azure.core.management.AzureEnvironment
import com.azure.core.management.profile.AzureProfile
import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.resourcemanager.AzureResourceManager
import com.azure.resourcemanager.storage.models.{StorageAccount, StorageAccountKey}
import com.azure.storage.blob.nio.AzureFileSystem
import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues}
import com.azure.storage.blob.{BlobContainerClient, BlobContainerClientBuilder}
import com.azure.storage.common.StorageSharedKeyCredential

import java.net.URI
import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems}
import java.time.temporal.ChronoUnit
import java.time.{Duration, Instant, OffsetDateTime}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

/**
  * Instance-level wrapper around the static `java.nio.file.FileSystems` calls.
  * It is accepted as a constructor argument by `BlobFileSystemManager`
  * (presumably so a substitute can be supplied, e.g. in tests).
  */
case class FileSystemAPI() {

  /** Looks up an already-open filesystem for `uri`; anything thrown by the lookup becomes a `Failure`. */
  def getFileSystem(uri: URI): Try[FileSystem] = Try {
    FileSystems.getFileSystem(uri)
  }

  /**
    * Opens a new filesystem for `uri`, handing `config` to the provider as a Java map.
    * Exceptions from the underlying call are not caught here.
    */
  def newFileSystem(uri: URI, config: Map[String, Object]): FileSystem = {
    val javaConfig = config.asJava
    FileSystems.newFileSystem(uri, javaConfig)
  }

  /**
    * Closes the filesystem registered for `uri`, if the lookup succeeds.
    *
    * @return `Some(())` when a filesystem was found and closed, `None` when the lookup failed
    */
  def closeFileSystem(uri: URI): Option[Unit] =
    getFileSystem(uri).toOption.map(openFileSystem => openFileSystem.close())
}

/** Stateless helpers for building and checking the pieces a blob filesystem connection needs. */
object BlobFileSystemManager {

  /**
    * Extracts the expiry timestamp carried in the `se` query parameter of a SAS token.
    *
    * Only the exact `se=` key is matched, so a different parameter whose name merely starts with
    * "se" is never mistaken for the expiry. Percent-encoded colons (`%3A`) are decoded before parsing.
    *
    * @param token the SAS credential whose signature string is inspected
    * @return the parsed expiry, or `None` when there is no `se=` parameter or its value is not
    *         accepted by `Instant.parse`
    */
  def parseTokenExpiry(token: AzureSasCredential): Option[Instant] = {
    val expiryKey = "se="
    for {
      expiryParam <- token.getSignature.split("&").find(_.startsWith(expiryKey))
      expiryString = expiryParam.stripPrefix(expiryKey).replace("%3A", ":")
      // Instant.parse throws on malformed input; this function's contract is Option, so map that to None
      instant <- Try(Instant.parse(expiryString)).toOption
    } yield instant
  }

  /**
    * Builds the configuration map passed to the filesystem provider: the SAS credential, the
    * container name as the file store, and a flag to skip the initial container check.
    */
  def buildConfigMap(credential: AzureSasCredential, container: BlobContainerName): Map[String, Object] =
    Map(
      AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL -> credential,
      AzureFileSystem.AZURE_STORAGE_FILE_STORES -> container.value,
      AzureFileSystem.AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK -> java.lang.Boolean.TRUE
    )

  /**
    * Reports whether a token should be treated as expired.
    *
    * @param tokenExpiry the instant at which the token stops being valid
    * @param buffer      safety margin; the token counts as expired once now + buffer is after `tokenExpiry`
    */
  def hasTokenExpired(tokenExpiry: Instant, buffer: Duration): Boolean =
    Instant.now.plus(buffer).isAfter(tokenExpiry)

  /** Builds the `azb://` filesystem URI for `endpoint` (the endpoint is rendered via its `toString`). */
  def uri(endpoint: EndpointURL): URI = new URI(s"azb://?endpoint=$endpoint")
}
/**
  * Hands out the NIO filesystem for one blob container, obtaining a fresh SAS token and reopening
  * the filesystem when no token expiry is recorded or the recorded expiry falls within the buffer.
  *
  * @param container           container exposed as the filesystem's file store
  * @param endpoint            storage endpoint used to build the filesystem URI
  * @param expiryBufferMinutes how many minutes before the recorded expiry a token is already treated as expired
  * @param blobTokenGenerator  source of fresh SAS credentials
  * @param fileSystemAPI       gateway to the `java.nio.file.FileSystems` operations
  * @param initialExpiration   expiry to start from; `None` forces a token to be generated on first use
  */
case class BlobFileSystemManager(
    container: BlobContainerName,
    endpoint: EndpointURL,
    expiryBufferMinutes: Long,
    blobTokenGenerator: BlobTokenGenerator,
    fileSystemAPI: FileSystemAPI = FileSystemAPI(),
    private val initialExpiration: Option[Instant] = None) {

  // Expiry of the most recently generated token. Written only by generateFilesystem, which is
  // reached only from inside the synchronized block of retrieveFilesystem.
  private var expiry: Option[Instant] = initialExpiration
  val buffer: Duration = Duration.of(expiryBufferMinutes, ChronoUnit.MINUTES)

  def getExpiry: Option[Instant] = expiry
  def uri: URI = BlobFileSystemManager.uri(endpoint)
  def isTokenExpired: Boolean = expiry.exists(BlobFileSystemManager.hasTokenExpired(_, buffer))
  def shouldReopenFilesystem: Boolean = isTokenExpired || expiry.isEmpty

  /**
    * Returns the filesystem for this manager's URI, reopening it with a new token when needed.
    *
    * @return the open filesystem, or a `Failure` if closing the stale filesystem, generating a
    *         token, or opening the filesystem failed
    */
  def retrieveFilesystem(): Try[FileSystem] = synchronized {
    if (shouldReopenFilesystem) {
      // The token has expired, OR there is no token record: close the old FS and regenerate.
      // FileSystem.close() may throw; capture that so this method honours its Try return type.
      Try(fileSystemAPI.closeFileSystem(uri)).flatMap(_ => openWithFreshToken())
    } else {
      fileSystemAPI.getFileSystem(uri).recoverWith {
        // If no filesystem already exists, this will create a new connection, with the provided configs
        case _: FileSystemNotFoundException => openWithFreshToken()
      }
    }
  }

  /** Generates a new access token and opens a filesystem with it. */
  private def openWithFreshToken(): Try[FileSystem] =
    blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _))

  /**
    * Records the expiry found in `token` and opens a new filesystem with it.
    * The recorded expiry is updated even when it is `None`, in which case no filesystem is opened.
    */
  private def generateFilesystem(uri: URI, container: BlobContainerName, token: AzureSasCredential): Try[FileSystem] = {
    expiry = BlobFileSystemManager.parseTokenExpiry(token)
    if (expiry.isEmpty) Failure(new Exception("Could not reopen filesystem, no expiration found"))
    else Try(fileSystemAPI.newFileSystem(uri, BlobFileSystemManager.buildConfigMap(token, container)))
  }

}

/** A source of SAS credentials for blob storage access. */
sealed trait BlobTokenGenerator {

  /** Produces a SAS credential, or a `Failure` when none could be produced. */
  def generateAccessToken: Try[AzureSasCredential]
}
/** Factory methods that choose a [[BlobTokenGenerator]] implementation from the supplied arguments. */
object BlobTokenGenerator {

  /** Container and endpoint only: delegates with no subscription. */
  def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL): BlobTokenGenerator =
    createBlobTokenGenerator(container, endpoint, None)

  /** Container, endpoint and optional subscription: delegates with no workspace details. */
  def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, subscription: Option[String]): BlobTokenGenerator =
    createBlobTokenGenerator(container, endpoint, None, None, subscription)

  /** Container, endpoint and optional workspace details: delegates with no subscription. */
  def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, workspaceId: Option[WorkspaceId], workspaceManagerURL: Option[WorkspaceManagerURL]): BlobTokenGenerator =
    createBlobTokenGenerator(container, endpoint, workspaceId, workspaceManagerURL, None)

  /**
    * Picks the implementation from the workspace arguments:
    * neither present gives a [[NativeBlobTokenGenerator]], both present give a [[WSMBlobTokenGenerator]].
    *
    * @throws Exception when exactly one of `workspaceId` / `workspaceManagerURL` is supplied
    */
  def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, workspaceId: Option[WorkspaceId], workspaceManagerURL: Option[WorkspaceManagerURL], subscription: Option[String]): BlobTokenGenerator =
    (workspaceId, workspaceManagerURL) match {
      case (None, None) =>
        NativeBlobTokenGenerator(container, endpoint, subscription)
      case (Some(id), Some(managerURL)) =>
        WSMBlobTokenGenerator(container, endpoint, id, managerURL)
      case _ =>
        throw new Exception("Arguments provided do not match any available BlobTokenGenerator implementation.")
    }

}

/**
  * Token generator selected when both a workspace id and a workspace manager URL are supplied.
  * Token generation is not implemented: every call yields a `Failure` holding a `NotImplementedError`.
  */
case class WSMBlobTokenGenerator(
    container: BlobContainerName,
    endpoint: EndpointURL,
    workspaceId: WorkspaceId,
    workspaceManagerURL: WorkspaceManagerURL
) extends BlobTokenGenerator {

  def generateAccessToken: Try[AzureSasCredential] = {
    val notImplemented = new NotImplementedError
    Failure(notImplemented)
  }
}

/**
  * Token generator that signs a container-level SAS itself, using the first access key of the
  * storage account named in the endpoint's host.
  *
  * @param container    container the SAS is generated for
  * @param endpoint     storage endpoint; its host supplies the storage account name
  * @param subscription subscription to authenticate against; the default subscription is used when absent
  */
case class NativeBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, subscription: Option[String] = None) extends BlobTokenGenerator {

  private val azureProfile = new AzureProfile(AzureEnvironment.AZURE)

  // Permissions granted by every SAS this generator issues: read, create and list.
  private val sasPermissions = new BlobContainerSasPermission()
    .setReadPermission(true)
    .setCreatePermission(true)
    .setListPermission(true)

  // A new credential is built on each use (this is a def, not a val).
  private def azureCredential = {
    val authorityHost = azureProfile.getEnvironment.getActiveDirectoryEndpoint
    new DefaultAzureCredentialBuilder().authorityHost(authorityHost).build()
  }

  private def authenticated = AzureResourceManager.authenticate(azureCredential, azureProfile)

  // Resource manager bound to the configured subscription, or to the default one when none is configured.
  private def azure = subscription match {
    case Some(sub) => authenticated.withSubscription(sub)
    case None => authenticated.withDefaultSubscription()
  }

  /** Finds the storage account whose name equals `name` among the accounts the resource manager lists. */
  private def findAzureStorageAccount(name: StorageAccountName): Try[StorageAccount] =
    azure.storageAccounts.list.asScala.find(account => account.name.equals(name.value)) match {
      case Some(account) => Success(account)
      case None => Failure(new Exception("Azure Storage Account not found"))
    }

  /** Takes the first key of the account, failing when the account has none. */
  private def firstKeyOf(keys: Iterable[StorageAccountKey]): Try[StorageAccountKey] =
    keys.headOption match {
      case Some(key) => Success(key)
      case None => Failure(new Exception("Storage account has no keys"))
    }

  private def buildBlobContainerClient(credential: StorageSharedKeyCredential, endpoint: EndpointURL, container: BlobContainerName): BlobContainerClient = {
    val builder = new BlobContainerClientBuilder()
      .credential(credential)
      .endpoint(endpoint.value)
      .containerName(container.value)
    builder.buildClient()
  }

  /**
    * Generates a SAS for the container, expiring one day from now.
    *
    * @return the credential, or a `Failure` when the account name cannot be parsed from the
    *         endpoint, the account is not found, it has no keys, or an Azure call throws
    */
  def generateAccessToken: Try[AzureSasCredential] = for {
    accountName <- BlobPathBuilder.parseStorageAccount(BlobPathBuilder.parseURI(endpoint.value))
    storageAccount <- findAzureStorageAccount(accountName)
    accountKeys = storageAccount.getKeys.asScala
    firstKey <- firstKeyOf(accountKeys)
    sharedKeyCredential = new StorageSharedKeyCredential(accountName.value, firstKey.value)
    containerClient = buildBlobContainerClient(sharedKeyCredential, endpoint, container)
    sasValues = new BlobServiceSasSignatureValues(OffsetDateTime.now.plusDays(1), sasPermissions)
    sasCredential = new AzureSasCredential(containerClient.generateSas(sasValues))
  } yield sasCredential
}
Original file line number Diff line number Diff line change
@@ -1,26 +1,22 @@
package cromwell.filesystems.blob

import com.azure.core.credential.AzureSasCredential
import com.azure.storage.blob.nio.AzureFileSystem
import com.google.common.net.UrlEscapers
import cromwell.core.path.{NioPath, Path, PathBuilder}
import cromwell.filesystems.blob.BlobPathBuilder._

import java.net.{MalformedURLException, URI}
import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems}
import scala.jdk.CollectionConverters._
import scala.language.postfixOps
import scala.util.{Failure, Try}
import scala.util.{Failure, Success, Try}

object BlobPathBuilder {

sealed trait BlobPathValidation
case class ValidBlobPath(path: String) extends BlobPathValidation
case class UnparsableBlobPath(errorMessage: Throwable) extends BlobPathValidation

def invalidBlobPathMessage(container: String, endpoint: String) = s"Malformed Blob URL for this builder. Expecting a URL for a container $container and endpoint $endpoint"
def parseURI(string: String) = URI.create(UrlEscapers.urlFragmentEscaper().escape(string))
def parseStorageAccount(uri: URI) = uri.getHost().split("\\.").filter(!_.isEmpty()).headOption
def invalidBlobPathMessage(container: BlobContainerName, endpoint: EndpointURL) = s"Malformed Blob URL for this builder. Expecting a URL for a container $container and endpoint $endpoint"
def parseURI(string: String): URI = URI.create(UrlEscapers.urlFragmentEscaper().escape(string))
def parseStorageAccount(uri: URI): Try[StorageAccountName] = uri.getHost.split("\\.").find(_.nonEmpty).map(StorageAccountName(_)).fold[Try[StorageAccountName]](Failure(new Exception("Could not parse storage account")))(Success(_))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IntelliJ is telling me that .map(StorageAccountName(_)) can be just .map(StorageAccountName). But I'm also seeing it say that doesn't work in other cases and I don't understand why. 🤷‍♂️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was wondering about this shorthand. I may leave this to be consistent

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fold doesn't feel natural to me when dealing with an Option (though I get that it works because it's essentially a container). Is this a common way to handle this in other places in Cromwell?

If not, what do you think of:

    maybeName.toRight(new Exception("Could not parse storage account")).toTry

or

    maybeName.map(Success(_)).getOrElse(Failure(new Exception("Could not parse storage account")))

(.map(Success(_)) is where IntelliJ complains if I try just .map(Success). 🤷‍♂️)
or even just pattern matching on Some/None?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the map approach! I'll go with that


/**
* Validates that a path from a string is a valid BlobPath of the format:
Expand All @@ -40,13 +36,13 @@ object BlobPathBuilder {
*
* If the configured container and storage account do not match, the string is considered unparsable
*/
def validateBlobPath(string: String, container: String, endpoint: String): BlobPathValidation = {
def validateBlobPath(string: String, container: BlobContainerName, endpoint: EndpointURL): BlobPathValidation = {
Try {
val uri = parseURI(string)
val storageAccount = parseStorageAccount(parseURI(endpoint))
val hasContainer = uri.getPath().split("/").filter(!_.isEmpty()).headOption.contains(container)
def hasEndpoint = parseStorageAccount(uri).contains(storageAccount.get)
if (hasContainer && !storageAccount.isEmpty && hasEndpoint) {
val storageAccount = parseStorageAccount(parseURI(endpoint.value))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be helpful to continue pushing this style through the whole function. Managing the Try returned here inside of a Try { } didn't make sense to me at first. I wonder if parseURI is the only other thing in here that could throw an exception... at which point a for-comprehension might be really easy to read.

val hasContainer = uri.getPath.split("/").find(_.nonEmpty).contains(container.value)
val hasEndpoint = storageAccount.toOption.exists(parseStorageAccount(uri).toOption.contains(_))
if (hasContainer && storageAccount.isSuccess && hasEndpoint) {
ValidBlobPath(uri.getPath.replaceFirst("/" + container, ""))
} else {
UnparsableBlobPath(new MalformedURLException(invalidBlobPathMessage(container, endpoint)))
Expand All @@ -55,39 +51,30 @@ object BlobPathBuilder {
}
}

class BlobPathBuilder(blobTokenGenerator: BlobTokenGenerator, container: String, endpoint: String) extends PathBuilder {

val credential: AzureSasCredential = new AzureSasCredential(blobTokenGenerator.getAccessToken)
val fileSystemConfig: Map[String, Object] = Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential),
(AzureFileSystem.AZURE_STORAGE_FILE_STORES, container),
(AzureFileSystem.AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK, java.lang.Boolean.TRUE))

def retrieveFilesystem(uri: URI): Try[FileSystem] = {
Try(FileSystems.getFileSystem(uri)) recover {
// If no filesystem already exists, this will create a new connection, with the provided configs
case _: FileSystemNotFoundException => FileSystems.newFileSystem(uri, fileSystemConfig.asJava)
}
}
class BlobPathBuilder(container: BlobContainerName, endpoint: EndpointURL)(private val fsm: BlobFileSystemManager) extends PathBuilder {

def build(string: String): Try[BlobPath] = {
validateBlobPath(string, container, endpoint) match {
case ValidBlobPath(path) => for {
fileSystem <- retrieveFilesystem(new URI("azb://?endpoint=" + endpoint))
nioPath <- Try(fileSystem.getPath(path))
blobPath = BlobPath(nioPath, endpoint, container)
} yield blobPath
case ValidBlobPath(path) => Try(BlobPath(path, endpoint, container)(fsm))
case UnparsableBlobPath(errorMessage: Throwable) => Failure(errorMessage)
}
}

override def name: String = "Azure Blob Storage"
}

// Add args for container, storage account name
case class BlobPath private[blob](nioPath: NioPath, endpoint: String, container: String) extends Path {
override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath, endpoint, container)
case class BlobPath private[blob](pathString: String, endpoint: EndpointURL, container: BlobContainerName)(private val fsm: BlobFileSystemManager) extends Path {
override def nioPath: NioPath = findNioPath(pathString)

override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath.toString, endpoint, container)(fsm)

override def pathAsString: String = List(endpoint, container, nioPath.toString).mkString("/")

override def pathAsString: String = List(endpoint, container, nioPath.toString()).mkString("/")
override def pathWithoutScheme: String = parseURI(endpoint.value).getHost + "/" + container + "/" + nioPath.toString

override def pathWithoutScheme: String = parseURI(endpoint).getHost + "/" + container + "/" + nioPath.toString()
private def findNioPath(path: String): NioPath = (for {
fileSystem <- fsm.retrieveFilesystem()
nioPath = fileSystem.getPath(path)
// This is purposefully an unprotected get because the NIO API needs an unwrapped path object.
// If an error occurs the api expects a thrown exception
} yield nioPath).get
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've discussed begrudgingly accepting that this method will throw. We should ensure it throws something useful, though. I think we should throw different informative error messages depending on whether we failed to get the filesystem or failed to create the NIO path.

}
Original file line number Diff line number Diff line change
@@ -1,114 +1,35 @@
package cromwell.filesystems.blob
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now is a good time to take another look at this file and see if there's anything we want to move out into other files in this package. There are a lot of top-level classes in here now, the whole group would probably be easier to understand with a little division.


import akka.actor.ActorSystem
import com.azure.core.management.AzureEnvironment
import com.azure.core.management.profile.AzureProfile
import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.resourcemanager.AzureResourceManager
import com.azure.storage.blob.BlobContainerClientBuilder
import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues}
import com.azure.storage.common.StorageSharedKeyCredential
import com.typesafe.config.Config
import cromwell.core.WorkflowOptions
import cromwell.core.path.PathBuilderFactory
import net.ceedubs.ficus.Ficus._

import java.time.OffsetDateTime
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.jdk.CollectionConverters._
import scala.concurrent.{ExecutionContext, Future}

final case class BlobFileSystemConfig(config: Config)

// Typed wrappers around raw strings, so that identifiers of different kinds cannot be swapped
// silently in a call. Each one renders as its wrapped value when converted to a string.

/** Wraps the name of a blob container. */
final case class BlobContainerName(value: String) {
  override def toString: String = value
}

/** Wraps the name of a storage account. */
final case class StorageAccountName(value: String) {
  override def toString: String = value
}

/** Wraps a storage endpoint URL. */
final case class EndpointURL(value: String) {
  override def toString: String = value
}

/** Wraps a workspace identifier. */
final case class WorkspaceId(value: String) {
  override def toString: String = value
}

/** Wraps the URL of a workspace manager. */
final case class WorkspaceManagerURL(value: String) {
  override def toString: String = value
}
final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Config, singletonConfig: BlobFileSystemConfig) extends PathBuilderFactory {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a LOT of important string identifiers floating around. Not necessarily in scope for this PR, but we should consider replacing them with simple case classes so that the compiler can check that we're passing them around correctly. Otherwise it's too easy to swap their ordering in a method call and introduce a very hard-to-find bug.

case class BlobContainerName(value: String)
case class StorageAccountName(value: String)
case class WorkspaceId(value: String)

...and so on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I will give this a try; it will likely help me get the tests right without accidentally swapping strings.

val container: String = instanceConfig.as[String]("store")
val endpoint: String = instanceConfig.as[String]("endpoint")
val workspaceId: Option[String] = instanceConfig.as[Option[String]]("workspace-id")
val workspaceManagerURL: Option[String] = singletonConfig.config.as[Option[String]]("workspace-manager-url")
val subscription: Option[String] = instanceConfig.as[Option[String]]("subscription")
val container: BlobContainerName = BlobContainerName(instanceConfig.as[String]("container"))
val endpoint: EndpointURL = EndpointURL(instanceConfig.as[String]("endpoint"))
val workspaceId: Option[WorkspaceId] = instanceConfig.as[Option[String]]("workspace-id").map(WorkspaceId(_))
val expiryBufferMinutes: Long = instanceConfig.as[Option[Long]]("expiry-buffer-minutes").getOrElse(10)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking out loud, once we have this base config checked into reference.conf it would be preferable to set the default there... but we can't do that yet, so here is good.

val workspaceManagerURL: Option[WorkspaceManagerURL] = singletonConfig.config.as[Option[String]]("workspace-manager-url").map(WorkspaceManagerURL(_))

val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator(
container, endpoint, workspaceId, workspaceManagerURL)
container, endpoint, workspaceId, workspaceManagerURL, subscription)
val fsm: BlobFileSystemManager = BlobFileSystemManager(container, endpoint, expiryBufferMinutes, blobTokenGenerator)

override def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[BlobPathBuilder] = {
Future {
new BlobPathBuilder(blobTokenGenerator, container, endpoint)
new BlobPathBuilder(container, endpoint)(fsm)
}
}
}

sealed trait BlobTokenGenerator {
def getAccessToken: String
}

object BlobTokenGenerator {
def createBlobTokenGenerator(container: String, endpoint: String): BlobTokenGenerator = {
createBlobTokenGenerator(container, endpoint, None, None)
}
def createBlobTokenGenerator(container: String, endpoint: String, workspaceId: Option[String], workspaceManagerURL: Option[String]): BlobTokenGenerator = {
(container: String, endpoint: String, workspaceId, workspaceManagerURL) match {
case (container, endpoint, None, None) =>
NativeBlobTokenGenerator(container, endpoint)
case (container, endpoint, Some(workspaceId), Some(workspaceManagerURL)) =>
WSMBlobTokenGenerator(container, endpoint, workspaceId, workspaceManagerURL)
case _ =>
throw new Exception("Arguments provided do not match any available BlobTokenGenerator implementation.")
}
}
}

case class WSMBlobTokenGenerator(container: String, endpoint: String, workspaceId: String, workspaceManagerURL: String) extends BlobTokenGenerator {
def getAccessToken: String = {
throw new NotImplementedError
}
}

case class NativeBlobTokenGenerator(container: String, endpoint: String) extends BlobTokenGenerator {
def getAccessToken: String = {
val storageAccountName = BlobPathBuilder.parseStorageAccount(BlobPathBuilder.parseURI(endpoint)) match {
case Some(storageAccountName) => storageAccountName
case _ => throw new Exception("Storage account could not be parsed from endpoint")
}

val profile = new AzureProfile(AzureEnvironment.AZURE)
val azureCredential = new DefaultAzureCredentialBuilder()
.authorityHost(profile.getEnvironment.getActiveDirectoryEndpoint)
.build
val azure = AzureResourceManager.authenticate(azureCredential, profile).withDefaultSubscription

val storageAccounts = azure.storageAccounts()
val storageAccount = storageAccounts
.list()
.asScala
.find(_.name == storageAccountName)

val storageAccountKeys = storageAccount match {
case Some(value) => value.getKeys.asScala.map(_.value())
case _ => throw new Exception("Storage Account not found")
}

val storageAccountKey = storageAccountKeys.headOption match {
case Some(value) => value
case _ => throw new Exception("Storage Account has no keys")
}

val keyCredential = new StorageSharedKeyCredential(
storageAccountName,
storageAccountKey
)
val blobContainerClient = new BlobContainerClientBuilder()
.credential(keyCredential)
.endpoint(endpoint)
.containerName(container)
.buildClient()

val blobContainerSasPermission = new BlobContainerSasPermission()
.setReadPermission(true)
.setCreatePermission(true)
.setListPermission(true)
val blobServiceSasSignatureValues = new BlobServiceSasSignatureValues(
OffsetDateTime.now.plusDays(1),
blobContainerSasPermission
)

blobContainerClient.generateSas(blobServiceSasSignatureValues)
}
}
Loading