s3: support for encryption, storage class, custom headers #254

Merged: 6 commits, Apr 19, 2017
7 changes: 6 additions & 1 deletion s3/src/main/scala/akka/stream/alpakka/s3/acl/CannedAcl.scala
@@ -3,10 +3,15 @@
  */
 package akka.stream.alpakka.s3.acl
 
+import akka.http.scaladsl.model.HttpHeader
+import akka.http.scaladsl.model.headers.RawHeader
+
 /**
  * Documentation: http://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#canned-acl
  */
-sealed abstract class CannedAcl(val value: String)
+sealed abstract class CannedAcl(val value: String) {
+  def header: HttpHeader = RawHeader("x-amz-acl", value)
+}
 
 object CannedAcl {
   case object AuthenticatedRead extends CannedAcl("authenticated-read")
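Moving header construction onto `CannedAcl` itself keeps the `x-amz-acl` header name in one place. A minimal sketch of what the new method yields, using the `AuthenticatedRead` value shown above:

import akka.http.scaladsl.model.HttpHeader
import akka.stream.alpakka.s3.acl.CannedAcl

// Each canned ACL can now render itself as the raw x-amz-acl header.
val aclHeader: HttpHeader = CannedAcl.AuthenticatedRead.header
assert(aclHeader.name == "x-amz-acl" && aclHeader.value == "authenticated-read")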
22 changes: 4 additions & 18 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala
@@ -6,41 +6,27 @@ package akka.stream.alpakka.s3.impl
 import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._
 import akka.http.scaladsl.marshalling.Marshal
 import akka.http.scaladsl.model.Uri.Query
-import akka.http.scaladsl.model.headers.{Host, RawHeader}
-import akka.http.scaladsl.model.{RequestEntity, _}
+import akka.http.scaladsl.model.headers.Host
+import akka.http.scaladsl.model.{ContentTypes, RequestEntity, _}
 import akka.stream.alpakka.s3.S3Settings
-import akka.stream.alpakka.s3.acl.CannedAcl
 import akka.stream.scaladsl.Source
 import akka.util.ByteString
 
-import scala.collection.immutable
 import scala.collection.immutable.Seq
 import scala.concurrent.{ExecutionContext, Future}
 
-case class MetaHeaders(headers: Map[String, String])
-
 private[alpakka] object HttpRequests {
 
   def getDownloadRequest(s3Location: S3Location, region: String)(implicit conf: S3Settings): HttpRequest =
     s3Request(s3Location, region: String)
 
   def initiateMultipartUploadRequest(s3Location: S3Location,
                                      contentType: ContentType,
-                                     cannedAcl: CannedAcl,
                                      region: String,
-                                     metaHeaders: MetaHeaders)(implicit conf: S3Settings): HttpRequest = {
-
-    def buildHeaders(metaHeaders: MetaHeaders, cannedAcl: CannedAcl): immutable.Seq[HttpHeader] = {
-      val metaHttpHeaders = metaHeaders.headers.map { header =>
-        RawHeader(s"x-amz-meta-${header._1}", header._2)
-      }(collection.breakOut): Seq[HttpHeader]
-      metaHttpHeaders :+ RawHeader("x-amz-acl", cannedAcl.value)
-    }
-
+                                     s3Headers: S3Headers)(implicit conf: S3Settings): HttpRequest =
     s3Request(s3Location, region, HttpMethods.POST, _.withQuery(Query("uploads")))
-      .withDefaultHeaders(buildHeaders(metaHeaders, cannedAcl))
+      .withDefaultHeaders(s3Headers.headers: _*)
       .withEntity(HttpEntity.empty(contentType))
-  }
 
   def uploadPartRequest(upload: MultipartUpload,
                         partNumber: Int,
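With the nested `buildHeaders` helper gone, header construction happens before the request is built: callers hand over a ready `S3Headers` bundle, and `initiateMultipartUploadRequest` simply applies it via `withDefaultHeaders`. A sketch of the equivalence, assuming the `S3Headers` factories introduced in the new file below:

import akka.http.scaladsl.model.HttpHeader
import akka.stream.alpakka.s3.acl.CannedAcl
import akka.stream.alpakka.s3.impl.{MetaHeaders, S3Headers}

// What the old buildHeaders produced inline is now pre-packed by the caller:
// one x-amz-meta-* header per metadata entry plus an x-amz-acl header.
val packed: Seq[HttpHeader] =
  S3Headers(CannedAcl.AuthenticatedRead, MetaHeaders(Map("origin" -> "alpakka"))).headers
// packed == Seq(RawHeader("x-amz-meta-origin", "alpakka"),
//               RawHeader("x-amz-acl", "authenticated-read"))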
91 changes: 91 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Headers.scala
@@ -0,0 +1,91 @@
/*
 * Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
 */
package akka.stream.alpakka.s3.impl

import akka.http.scaladsl.model.HttpHeader
import akka.http.scaladsl.model.headers.RawHeader
import akka.stream.alpakka.s3.acl.CannedAcl

import scala.collection.immutable

/**
 * Container for headers used in s3 uploads like acl, server side encryption, storage class,
 * metadata or custom headers for more advanced use cases.
 */
case class S3Headers(headers: Seq[HttpHeader])

case class MetaHeaders(metaHeaders: Map[String, String]) {
  def headers =
    metaHeaders.map { header =>
      RawHeader(s"x-amz-meta-${header._1}", header._2)
    }(collection.breakOut): immutable.Seq[HttpHeader]
}

/**
 * Convenience apply methods for creation of canned acl, meta, encryption, storage class or custom headers.
 */
object S3Headers {
  val empty = S3Headers(Seq())
  def apply(cannedAcl: CannedAcl): S3Headers = S3Headers(Seq(cannedAcl.header))
  def apply(metaHeaders: MetaHeaders): S3Headers = S3Headers(metaHeaders.headers)
  def apply(cannedAcl: CannedAcl, metaHeaders: MetaHeaders): S3Headers =
    S3Headers(metaHeaders.headers :+ cannedAcl.header)
  def apply(storageClass: StorageClass): S3Headers = S3Headers(Seq(storageClass.header))
  def apply(encryption: ServerSideEncryption): S3Headers = S3Headers(encryption.headers)
  def apply(customHeaders: Map[String, String]): S3Headers =
    S3Headers(customHeaders.map { header =>
      RawHeader(header._1, header._2)
    }(collection.breakOut))
  def apply(cannedAcl: CannedAcl = CannedAcl.Private,
            metaHeaders: MetaHeaders = MetaHeaders(Map()),
            storageClass: StorageClass = StorageClass.Standard,
            encryption: ServerSideEncryption = ServerSideEncryption.AES256,
            customHeaders: Seq[HttpHeader] = Seq()): S3Headers = {
    val headers = metaHeaders.headers ++ encryption.headers ++ customHeaders :+ cannedAcl.header :+ storageClass.header
    S3Headers(headers)
  }
}

/**
 * Documentation: http://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html
 */
sealed abstract class StorageClass(storageClass: String) {
  def header: HttpHeader = RawHeader("x-amz-storage-class", storageClass)
}

object StorageClass {
  case object Standard extends StorageClass("STANDARD")
  case object InfrequentAccess extends StorageClass("STANDARD_IA")
  case object ReducedRedundancy extends StorageClass("REDUCED_REDUNDANCY")
}

/**
 * Documentation: http://docs.aws.amazon.com/AmazonS3/latest/dev/serv-side-encryption.html
 * @param algorithm AES256 or aws:kms
 * @param kmsKeyId optional amazon resource name in the "arn:aws:kms:my-region:my-account-id:key/my-key-id" format.
 * @param context optional base64-encoded UTF-8 string holding JSON with the encryption context key-value pairs
 */
sealed abstract class ServerSideEncryption(algorithm: String,
                                           kmsKeyId: Option[String] = None,
                                           context: Option[String] = None) {
  def headers: immutable.Seq[HttpHeader] = algorithm match {
    case "AES256" => RawHeader("x-amz-server-side-encryption", "AES256") :: Nil
    case "aws:kms" if kmsKeyId.isDefined && context.isEmpty =>
      RawHeader("x-amz-server-side-encryption", "aws:kms") ::
      RawHeader("x-amz-server-side-encryption-aws-kms-key-id", kmsKeyId.get) ::
      Nil
    case "aws:kms" if kmsKeyId.isDefined && context.isDefined =>
      RawHeader("x-amz-server-side-encryption", "aws:kms") ::
      RawHeader("x-amz-server-side-encryption-aws-kms-key-id", kmsKeyId.get) ::
      RawHeader("x-amz-server-side-encryption-context", context.get) ::
      Nil
    case _ => throw new IllegalArgumentException("Unsupported encryption algorithm.")
  }
}

object ServerSideEncryption {
  case object AES256 extends ServerSideEncryption("AES256")
  case class KMS(keyId: String, context: Option[String] = None)
      extends ServerSideEncryption("aws:kms", Some(keyId), context)
}
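Taken together, the factories above compose as follows; a sketch using only the types in this file plus `CannedAcl`, with illustrative metadata and key ARN values:

import akka.stream.alpakka.s3.acl.CannedAcl
import akka.stream.alpakka.s3.impl._

object S3HeadersExample {
  // Single-concern factories for the simple cases.
  val aclOnly = S3Headers(CannedAcl.AuthenticatedRead)
  val kmsOnly = S3Headers(ServerSideEncryption.KMS("arn:aws:kms:my-region:my-account-id:key/my-key-id"))

  // The catch-all apply bundles ACL, metadata, storage class and encryption,
  // defaulting anything not supplied (private ACL, standard storage, AES256).
  val combined = S3Headers(
    cannedAcl = CannedAcl.AuthenticatedRead,
    metaHeaders = MetaHeaders(Map("purpose" -> "backup")), // illustrative metadata
    storageClass = StorageClass.InfrequentAccess
  )
  // combined.headers carries x-amz-meta-purpose, x-amz-server-side-encryption,
  // x-amz-acl and x-amz-storage-class in a single Seq[HttpHeader].
}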
25 changes: 10 additions & 15 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
@@ -72,20 +72,18 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials,
    */
   def multipartUpload(s3Location: S3Location,
                       contentType: ContentType = ContentTypes.`application/octet-stream`,
-                      metaHeaders: MetaHeaders,
-                      cannedAcl: CannedAcl = CannedAcl.Private,
+                      s3Headers: S3Headers,
                       chunkSize: Int = MinChunkSize,
                       chunkingParallelism: Int = 4): Sink[ByteString, Future[CompleteMultipartUploadResult]] =
-    chunkAndRequest(s3Location, contentType, metaHeaders, cannedAcl, chunkSize)(chunkingParallelism)
+    chunkAndRequest(s3Location, contentType, s3Headers, chunkSize)(chunkingParallelism)
       .toMat(completionSink(s3Location))(Keep.right)
 
   private def initiateMultipartUpload(s3Location: S3Location,
                                       contentType: ContentType,
-                                      cannedAcl: CannedAcl,
-                                      metaHeaders: MetaHeaders): Future[MultipartUpload] = {
+                                      s3Headers: S3Headers): Future[MultipartUpload] = {
     import mat.executionContext
 
-    val req = initiateMultipartUploadRequest(s3Location, contentType, cannedAcl, region, metaHeaders)
+    val req = initiateMultipartUploadRequest(s3Location, contentType, region, s3Headers)
 
     val response = for {
       signedReq <- Signer.signedRequest(req, signingKey)
@@ -114,19 +112,17 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials,
    */
   private def initiateUpload(s3Location: S3Location,
                              contentType: ContentType,
-                             cannedAcl: CannedAcl,
-                             metaHeaders: MetaHeaders): Source[(MultipartUpload, Int), NotUsed] =
+                             s3Headers: S3Headers): Source[(MultipartUpload, Int), NotUsed] =
     Source
       .single(s3Location)
-      .mapAsync(1)(initiateMultipartUpload(_, contentType, cannedAcl, metaHeaders))
+      .mapAsync(1)(initiateMultipartUpload(_, contentType, s3Headers))
       .mapConcat(r => Stream.continually(r))
       .zip(Source.fromIterator(() => Iterator.from(1)))
 
   private def createRequests(
       s3Location: S3Location,
       contentType: ContentType,
-      metaHeaders: MetaHeaders,
-      cannedAcl: CannedAcl = CannedAcl.Private,
+      s3Headers: S3Headers,
       chunkSize: Int = MinChunkSize,
       parallelism: Int = 4
   ): Flow[ByteString, (HttpRequest, (MultipartUpload, Int)), NotUsed] = {
@@ -139,7 +135,7 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials,
     // First step of the multi part upload process is made.
     // The response is then used to construct the subsequent individual upload part requests
     val requestInfo: Source[(MultipartUpload, Int), NotUsed] =
-      initiateUpload(s3Location, contentType, cannedAcl, metaHeaders)
+      initiateUpload(s3Location, contentType, s3Headers)
 
     SplitAfterSize(chunkSize)(Flow.apply[ByteString])
       .via(getChunkBuffer(chunkSize)) //creates the chunks
@@ -167,15 +163,14 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials,
   private def chunkAndRequest(
       s3Location: S3Location,
       contentType: ContentType,
-      metaHeaders: MetaHeaders,
-      cannedAcl: CannedAcl = CannedAcl.Private,
+      s3Headers: S3Headers,
       chunkSize: Int = MinChunkSize
   )(parallelism: Int = 4): Flow[ByteString, UploadPartResponse, NotUsed] = {
 
     // Multipart upload requests (except for the completion api) are created here.
     // The initial upload request gets executed within this function as well.
     // The individual upload part requests are created.
-    val requestFlow = createRequests(s3Location, contentType, metaHeaders, cannedAcl, chunkSize, parallelism)
+    val requestFlow = createRequests(s3Location, contentType, s3Headers, chunkSize, parallelism)
 
     // The individual upload part requests are processed here
     requestFlow.via(Http().superPool[(MultipartUpload, Int)]()).map {
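The refactor leaves `initiateUpload`'s repeat-and-zip pattern intact: the single upload handle is repeated indefinitely and paired with an increasing part number. A stand-alone sketch of that pattern (the helper name is illustrative, not part of the PR):

import akka.NotUsed
import akka.stream.scaladsl.Source

// The one MultipartUpload result is repeated lazily and zipped with 1, 2, 3...
// so each downstream chunk receives its own (upload, partNumber) pair.
def withPartNumbers[T](once: Source[T, NotUsed]): Source[(T, Int), NotUsed] =
  once
    .mapConcat(r => Stream.continually(r)) // lazily repeats the single element
    .zip(Source.fromIterator(() => Iterator.from(1))) // running part numbers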
27 changes: 23 additions & 4 deletions s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala
@@ -11,8 +11,9 @@ import akka.http.impl.model.JavaUri
 import akka.http.javadsl.model.{ContentType, HttpResponse, Uri}
 import akka.http.scaladsl.model.{ContentTypes, ContentType => ScalaContentType}
 import akka.stream.Materializer
+import akka.stream.alpakka.s3.acl.CannedAcl
 import akka.stream.alpakka.s3.auth.AWSCredentials
-import akka.stream.alpakka.s3.impl.{CompleteMultipartUploadResult, MetaHeaders, S3Location, S3Stream}
+import akka.stream.alpakka.s3.impl._
 import akka.stream.javadsl.{Sink, Source}
 import akka.util.ByteString
 
@@ -37,12 +38,30 @@ final class S3Client(credentials: AWSCredentials, region: String, system: ActorS
   def multipartUpload(bucket: String,
                       key: String,
                       contentType: ContentType,
-                      metaHeaders: MetaHeaders): Sink[ByteString, CompletionStage[MultipartUploadResult]] =
+                      s3Headers: S3Headers): Sink[ByteString, CompletionStage[MultipartUploadResult]] =
     impl
-      .multipartUpload(S3Location(bucket, key), contentType.asInstanceOf[ScalaContentType], metaHeaders)
+      .multipartUpload(S3Location(bucket, key), contentType.asInstanceOf[ScalaContentType], s3Headers)
       .mapMaterializedValue(_.map(MultipartUploadResult.create)(system.dispatcher).toJava)
       .asJava
 
+  def multipartUpload(bucket: String,
+                      key: String,
+                      contentType: ContentType,
+                      cannedAcl: CannedAcl,
+                      metaHeaders: MetaHeaders): Sink[ByteString, CompletionStage[MultipartUploadResult]] =
+    multipartUpload(bucket, key, contentType, S3Headers(cannedAcl, metaHeaders))
+
+  def multipartUpload(bucket: String,
+                      key: String,
+                      contentType: ContentType,
+                      cannedAcl: CannedAcl): Sink[ByteString, CompletionStage[MultipartUploadResult]] =
+    multipartUpload(bucket, key, contentType, cannedAcl, MetaHeaders(Map()))
+
+  def multipartUpload(bucket: String,
+                      key: String,
+                      contentType: ContentType): Sink[ByteString, CompletionStage[MultipartUploadResult]] =
+    multipartUpload(bucket, key, contentType, CannedAcl.Private, MetaHeaders(Map()))
+
   def multipartUpload(bucket: String, key: String): Sink[ByteString, CompletionStage[MultipartUploadResult]] =
-    multipartUpload(bucket, key, ContentTypes.`application/octet-stream`, MetaHeaders(Map()))
+    multipartUpload(bucket, key, ContentTypes.`application/octet-stream`, CannedAcl.Private, MetaHeaders(Map()))
 }
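The overload ladder lets Java callers pick the level of control they need; each variant bottoms out in the `S3Headers`-based one. A sketch of the richest and the minimal call (bucket, key and metadata values are illustrative):

import akka.http.javadsl.model.ContentTypes
import akka.stream.alpakka.s3.acl.CannedAcl
import akka.stream.alpakka.s3.impl.MetaHeaders
import akka.stream.alpakka.s3.javadsl.S3Client

object JavadslOverloadExample {
  // Richest overload: explicit content type, ACL and metadata.
  def richSink(client: S3Client) =
    client.multipartUpload(
      "my-bucket", "my-key", // illustrative names
      ContentTypes.APPLICATION_OCTET_STREAM,
      CannedAcl.AuthenticatedRead,
      MetaHeaders(Map("origin" -> "alpakka"))
    )

  // Minimal form: application/octet-stream, private ACL, empty metadata.
  def defaultSink(client: S3Client) = client.multipartUpload("my-bucket", "my-key")
}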
29 changes: 27 additions & 2 deletions s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala
@@ -10,7 +10,7 @@ import akka.stream.Materializer
 import akka.stream.alpakka.s3.S3Settings
 import akka.stream.alpakka.s3.acl.CannedAcl
 import akka.stream.alpakka.s3.auth.AWSCredentials
-import akka.stream.alpakka.s3.impl.{CompleteMultipartUploadResult, MetaHeaders, S3Location, S3Stream}
+import akka.stream.alpakka.s3.impl._
 import akka.stream.scaladsl.{Sink, Source}
 import akka.util.ByteString
 
@@ -50,6 +50,31 @@ final class S3Client(credentials: AWSCredentials, region: String)(implicit syste
                       chunkSize: Int = MinChunkSize,
                       chunkingParallelism: Int = 4): Sink[ByteString, Future[MultipartUploadResult]] =
     impl
-      .multipartUpload(S3Location(bucket, key), contentType, metaHeaders, cannedAcl, chunkSize, chunkingParallelism)
+      .multipartUpload(
+        S3Location(bucket, key),
+        contentType,
+        S3Headers(cannedAcl, metaHeaders),
+        chunkSize,
+        chunkingParallelism
+      )
       .mapMaterializedValue(_.map(MultipartUploadResult.apply)(system.dispatcher))
 
+  def multipartUploadWithHeaders(
+      bucket: String,
+      key: String,
+      contentType: ContentType = ContentTypes.`application/octet-stream`,
+      chunkSize: Int = MinChunkSize,
+      chunkingParallelism: Int = 4,
+      s3Headers: Option[S3Headers] = None
+  ): Sink[ByteString, Future[MultipartUploadResult]] =
+    impl
+      .multipartUpload(
+        S3Location(bucket, key),
+        contentType,
+        s3Headers.getOrElse(S3Headers.empty),
+        chunkSize,
+        chunkingParallelism
+      )
+      .mapMaterializedValue(_.map(MultipartUploadResult.apply)(system.dispatcher))
+
 }
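A sketch of how the new entry point is meant to be used, assuming an ActorSystem and Materializer in scope and the era's two-argument `AWSCredentials` constructor; bucket, key, credentials and KMS ARN values are illustrative:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.s3.auth.AWSCredentials
import akka.stream.alpakka.s3.impl.{S3Headers, ServerSideEncryption, StorageClass}
import akka.stream.alpakka.s3.scaladsl.S3Client

object MultipartUploadWithHeadersExample {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  // Illustrative credentials and region; AWSCredentials signature assumed.
  val client = new S3Client(AWSCredentials("my-access-key-id", "my-secret-access-key"), "us-east-1")

  // SSE-KMS plus infrequent-access storage, bundled once and passed as Some(...).
  val sink = client.multipartUploadWithHeaders(
    "my-bucket",
    "my-key",
    s3Headers = Some(
      S3Headers(
        encryption = ServerSideEncryption.KMS("arn:aws:kms:my-region:my-account-id:key/my-key-id"),
        storageClass = StorageClass.InfrequentAccess
      )
    )
  )
  // Running a Source[ByteString, _] into this sink yields a Future[MultipartUploadResult].
}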