From e3e36bd48bd4cb8aaaf313353a5df36b98d883fc Mon Sep 17 00:00:00 2001 From: Michael Kober Date: Thu, 6 Apr 2017 14:26:09 +0200 Subject: [PATCH 1/5] implemented support for encryption, storage class and other custom headers for upload to s3 --- .../stream/alpakka/s3/acl/CannedAcl.scala | 7 +- .../stream/alpakka/s3/impl/HttpRequests.scala | 22 +---- .../stream/alpakka/s3/impl/S3Headers.scala | 90 +++++++++++++++++++ .../stream/alpakka/s3/impl/S3Stream.scala | 25 +++--- .../stream/alpakka/s3/javadsl/S3Client.scala | 26 +++++- .../stream/alpakka/s3/scaladsl/S3Client.scala | 22 ++++- .../alpakka/s3/impl/HttpRequestsSpec.scala | 61 +++++++++++-- .../alpakka/s3/impl/S3HeadersSpec.scala | 35 ++++++++ 8 files changed, 242 insertions(+), 46 deletions(-) create mode 100644 s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Headers.scala create mode 100644 s3/src/test/scala/akka/stream/alpakka/s3/impl/S3HeadersSpec.scala diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/acl/CannedAcl.scala b/s3/src/main/scala/akka/stream/alpakka/s3/acl/CannedAcl.scala index 1bef79193f..3ed41a76fe 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/acl/CannedAcl.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/acl/CannedAcl.scala @@ -3,10 +3,15 @@ */ package akka.stream.alpakka.s3.acl +import akka.http.scaladsl.model.HttpHeader +import akka.http.scaladsl.model.headers.RawHeader + /** * Documentation: http://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#canned-acl */ -sealed abstract class CannedAcl(val value: String) +sealed abstract class CannedAcl(val value: String) { + def header: HttpHeader = RawHeader("x-amz-acl", value) +} object CannedAcl { case object AuthenticatedRead extends CannedAcl("authenticated-read") diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala index c83c9ab0ec..b6ab37af0e 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala +++ 
b/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala @@ -6,19 +6,15 @@ package akka.stream.alpakka.s3.impl import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._ import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model.Uri.Query -import akka.http.scaladsl.model.headers.{Host, RawHeader} -import akka.http.scaladsl.model.{RequestEntity, _} +import akka.http.scaladsl.model.headers.Host +import akka.http.scaladsl.model.{ContentTypes, RequestEntity, _} import akka.stream.alpakka.s3.S3Settings -import akka.stream.alpakka.s3.acl.CannedAcl import akka.stream.scaladsl.Source import akka.util.ByteString -import scala.collection.immutable import scala.collection.immutable.Seq import scala.concurrent.{ExecutionContext, Future} -case class MetaHeaders(headers: Map[String, String]) - private[alpakka] object HttpRequests { def getDownloadRequest(s3Location: S3Location, region: String)(implicit conf: S3Settings): HttpRequest = @@ -26,21 +22,11 @@ private[alpakka] object HttpRequests { def initiateMultipartUploadRequest(s3Location: S3Location, contentType: ContentType, - cannedAcl: CannedAcl, region: String, - metaHeaders: MetaHeaders)(implicit conf: S3Settings): HttpRequest = { - - def buildHeaders(metaHeaders: MetaHeaders, cannedAcl: CannedAcl): immutable.Seq[HttpHeader] = { - val metaHttpHeaders = metaHeaders.headers.map { header => - RawHeader(s"x-amz-meta-${header._1}", header._2) - }(collection.breakOut): Seq[HttpHeader] - metaHttpHeaders :+ RawHeader("x-amz-acl", cannedAcl.value) - } - + s3Headers: S3Headers)(implicit conf: S3Settings): HttpRequest = s3Request(s3Location, region, HttpMethods.POST, _.withQuery(Query("uploads"))) - .withDefaultHeaders(buildHeaders(metaHeaders, cannedAcl)) + .withDefaultHeaders(s3Headers.headers) .withEntity(HttpEntity.empty(contentType)) - } def uploadPartRequest(upload: MultipartUpload, partNumber: Int, diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Headers.scala 
b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Headers.scala new file mode 100644 index 0000000000..82e2eca1d3 --- /dev/null +++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Headers.scala @@ -0,0 +1,90 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.s3.impl + +import akka.http.scaladsl.model.HttpHeader +import akka.http.scaladsl.model.headers.RawHeader +import akka.stream.alpakka.s3.acl.CannedAcl + +import scala.collection.immutable + +/** + * Container for headers used in s3 uploads like acl, server side encryption, storage class, + * metadata or custom headers for more advanced use cases. + */ +case class S3Headers(cannedAcl: CannedAcl = CannedAcl.Private, + metaHeaders: MetaHeaders = MetaHeaders(Map()), + customHeaders: Option[AmzHeaders] = None) { + + def headers: immutable.Seq[HttpHeader] = + metaHeaders.headers ++ customHeaders.map(_.headers).getOrElse(Nil) :+ cannedAcl.header +} + +case class MetaHeaders(metaHeaders: Map[String, String]) { + def headers = + metaHeaders.map { header => + RawHeader(s"x-amz-meta-${header._1}", header._2) + }(collection.breakOut): immutable.Seq[HttpHeader] +} + +case class AmzHeaders(headers: Seq[HttpHeader]) + +/** + * Convenience apply methods for creation of encryption, storage class or custom headers. 
+ */ +object AmzHeaders { + def apply(storageClass: StorageClass): AmzHeaders = new AmzHeaders(Seq(storageClass.header)) + def apply(encryption: ServerSideEncryption): AmzHeaders = new AmzHeaders(encryption.headers) + def apply(customHeaders: Map[String, String]): AmzHeaders = + new AmzHeaders(customHeaders.map { header => + RawHeader(header._1, header._2) + }(collection.breakOut)) + def apply(encryption: ServerSideEncryption, storageClass: StorageClass): AmzHeaders = + new AmzHeaders(encryption.headers :+ storageClass.header) + def apply(encryption: ServerSideEncryption, customHeaders: Seq[HttpHeader], storageClass: StorageClass): AmzHeaders = + new AmzHeaders(encryption.headers ++ customHeaders :+ storageClass.header) +} + +/** + * Documentation: http://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html + */ +sealed abstract class StorageClass(storageClass: String) { + def header: HttpHeader = RawHeader("x-amz-storage-class", storageClass) +} + +object StorageClass { + case object Standard extends StorageClass("STANDARD") + case object InfrequentAccess extends StorageClass("STANDARD_IA") + case object ReducedRedundancy extends StorageClass("REDUCED_REDUNDANCY") +} + +/** + * Documentation: http://docs.aws.amazon.com/AmazonS3/latest/dev/serv-side-encryption.html + * @param algorithm AES-256 or aws:kms + * @param kmsKeyId optional amazon resource name in the "arn:aws:kms:my-region:my-account-id:key/my-key-id" format. 
+ * @param context optional base64-encoded UTF-8 string holding JSON with the encryption context key-value pairs + */ +sealed abstract class ServerSideEncryption(algorithm: String, + kmsKeyId: Option[String] = None, + context: Option[String] = None) { + def headers: immutable.Seq[HttpHeader] = algorithm match { + case "AES256" => RawHeader("x-amz-server-side-encryption", "AES256") :: Nil + case "aws:kms" if kmsKeyId.isDefined && context.isEmpty => + RawHeader("x-amz-server-side-encryption", "aws:kms") :: + RawHeader("x-amz-server-side-encryption-aws-kms-key-id", kmsKeyId.get) :: + Nil + case "aws:kms" if kmsKeyId.isDefined && context.isDefined => + RawHeader("x-amz-server-side-encryption", "aws:kms") :: + RawHeader("x-amz-server-side-encryption-aws-kms-key-id", kmsKeyId.get) :: + RawHeader("x-amz-server-side-encryption-context", context.get) :: + Nil + case _ => throw new IllegalArgumentException("Unsupported encryption algorithm.") + } +} + +object ServerSideEncryption { + case object AES256 extends ServerSideEncryption("AES256") + case class KMS(keyId: String, context: Option[String] = None) + extends ServerSideEncryption("aws:kms", Some(keyId), context) +} diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala index 88c039b497..2811540829 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala @@ -72,20 +72,18 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials, */ def multipartUpload(s3Location: S3Location, contentType: ContentType = ContentTypes.`application/octet-stream`, - metaHeaders: MetaHeaders, - cannedAcl: CannedAcl = CannedAcl.Private, + s3Headers: S3Headers, chunkSize: Int = MinChunkSize, chunkingParallelism: Int = 4): Sink[ByteString, Future[CompleteMultipartUploadResult]] = - chunkAndRequest(s3Location, contentType, metaHeaders, cannedAcl, 
chunkSize)(chunkingParallelism) + chunkAndRequest(s3Location, contentType, s3Headers, chunkSize)(chunkingParallelism) .toMat(completionSink(s3Location))(Keep.right) private def initiateMultipartUpload(s3Location: S3Location, contentType: ContentType, - cannedAcl: CannedAcl, - metaHeaders: MetaHeaders): Future[MultipartUpload] = { + s3Headers: S3Headers): Future[MultipartUpload] = { import mat.executionContext - val req = initiateMultipartUploadRequest(s3Location, contentType, cannedAcl, region, metaHeaders) + val req = initiateMultipartUploadRequest(s3Location, contentType, region, s3Headers) val response = for { signedReq <- Signer.signedRequest(req, signingKey) @@ -114,19 +112,17 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials, */ private def initiateUpload(s3Location: S3Location, contentType: ContentType, - cannedAcl: CannedAcl, - metaHeaders: MetaHeaders): Source[(MultipartUpload, Int), NotUsed] = + s3Headers: S3Headers): Source[(MultipartUpload, Int), NotUsed] = Source .single(s3Location) - .mapAsync(1)(initiateMultipartUpload(_, contentType, cannedAcl, metaHeaders)) + .mapAsync(1)(initiateMultipartUpload(_, contentType, s3Headers)) .mapConcat(r => Stream.continually(r)) .zip(Source.fromIterator(() => Iterator.from(1))) private def createRequests( s3Location: S3Location, contentType: ContentType, - metaHeaders: MetaHeaders, - cannedAcl: CannedAcl = CannedAcl.Private, + s3Headers: S3Headers, chunkSize: Int = MinChunkSize, parallelism: Int = 4 ): Flow[ByteString, (HttpRequest, (MultipartUpload, Int)), NotUsed] = { @@ -139,7 +135,7 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials, // First step of the multi part upload process is made. 
// The response is then used to construct the subsequent individual upload part requests val requestInfo: Source[(MultipartUpload, Int), NotUsed] = - initiateUpload(s3Location, contentType, cannedAcl, metaHeaders) + initiateUpload(s3Location, contentType, s3Headers) SplitAfterSize(chunkSize)(Flow.apply[ByteString]) .via(getChunkBuffer(chunkSize)) //creates the chunks @@ -167,15 +163,14 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials, private def chunkAndRequest( s3Location: S3Location, contentType: ContentType, - metaHeaders: MetaHeaders, - cannedAcl: CannedAcl = CannedAcl.Private, + s3Headers: S3Headers, chunkSize: Int = MinChunkSize )(parallelism: Int = 4): Flow[ByteString, UploadPartResponse, NotUsed] = { // Multipart upload requests (except for the completion api) are created here. // The initial upload request gets executed within this function as well. // The individual upload part requests are created. - val requestFlow = createRequests(s3Location, contentType, metaHeaders, cannedAcl, chunkSize, parallelism) + val requestFlow = createRequests(s3Location, contentType, s3Headers, chunkSize, parallelism) // The individual upload part requests are processed here requestFlow.via(Http().superPool[(MultipartUpload, Int)]()).map { diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala b/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala index 8254be58f0..1345ab6448 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala @@ -11,8 +11,9 @@ import akka.http.impl.model.JavaUri import akka.http.javadsl.model.{ContentType, HttpResponse, Uri} import akka.http.scaladsl.model.{ContentTypes, ContentType => ScalaContentType} import akka.stream.Materializer +import akka.stream.alpakka.s3.acl.CannedAcl import akka.stream.alpakka.s3.auth.AWSCredentials -import akka.stream.alpakka.s3.impl.{CompleteMultipartUploadResult, MetaHeaders, 
S3Location, S3Stream} +import akka.stream.alpakka.s3.impl._ import akka.stream.javadsl.{Sink, Source} import akka.util.ByteString @@ -37,12 +38,29 @@ final class S3Client(credentials: AWSCredentials, region: String, system: ActorS def multipartUpload(bucket: String, key: String, contentType: ContentType, - metaHeaders: MetaHeaders): Sink[ByteString, CompletionStage[MultipartUploadResult]] = + cannedAcl: CannedAcl, + metaHeaders: MetaHeaders, + amzHeaders: AmzHeaders): Sink[ByteString, CompletionStage[MultipartUploadResult]] = impl - .multipartUpload(S3Location(bucket, key), contentType.asInstanceOf[ScalaContentType], metaHeaders) + .multipartUpload(S3Location(bucket, key), + contentType.asInstanceOf[ScalaContentType], + S3Headers(cannedAcl, metaHeaders, Some(amzHeaders))) .mapMaterializedValue(_.map(MultipartUploadResult.create)(system.dispatcher).toJava) .asJava + def multipartUpload(bucket: String, + key: String, + contentType: ContentType, + cannedAcl: CannedAcl, + metaHeaders: MetaHeaders): Sink[ByteString, CompletionStage[MultipartUploadResult]] = + multipartUpload(bucket, key, ContentTypes.`application/octet-stream`, cannedAcl, metaHeaders, AmzHeaders(Nil)) + + def multipartUpload(bucket: String, + key: String, + contentType: ContentType, + metaHeaders: MetaHeaders): Sink[ByteString, CompletionStage[MultipartUploadResult]] = + multipartUpload(bucket, key, ContentTypes.`application/octet-stream`, CannedAcl.Private, MetaHeaders(Map())) + def multipartUpload(bucket: String, key: String): Sink[ByteString, CompletionStage[MultipartUploadResult]] = - multipartUpload(bucket, key, ContentTypes.`application/octet-stream`, MetaHeaders(Map())) + multipartUpload(bucket, key, ContentTypes.`application/octet-stream`, CannedAcl.Private, MetaHeaders(Map())) } diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala b/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala index c3f33fb9de..13845c3385 100644 --- 
a/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala @@ -10,7 +10,7 @@ import akka.stream.Materializer import akka.stream.alpakka.s3.S3Settings import akka.stream.alpakka.s3.acl.CannedAcl import akka.stream.alpakka.s3.auth.AWSCredentials -import akka.stream.alpakka.s3.impl.{CompleteMultipartUploadResult, MetaHeaders, S3Location, S3Stream} +import akka.stream.alpakka.s3.impl._ import akka.stream.scaladsl.{Sink, Source} import akka.util.ByteString @@ -48,8 +48,24 @@ final class S3Client(credentials: AWSCredentials, region: String)(implicit syste metaHeaders: MetaHeaders = MetaHeaders(Map()), cannedAcl: CannedAcl = CannedAcl.Private, chunkSize: Int = MinChunkSize, - chunkingParallelism: Int = 4): Sink[ByteString, Future[MultipartUploadResult]] = + chunkingParallelism: Int = 4, + amzHeaders: Option[AmzHeaders] = None): Sink[ByteString, Future[MultipartUploadResult]] = { + + val s3Headers = S3Headers( + cannedAcl, + metaHeaders, + amzHeaders + ) + impl - .multipartUpload(S3Location(bucket, key), contentType, metaHeaders, cannedAcl, chunkSize, chunkingParallelism) + .multipartUpload( + S3Location(bucket, key), + contentType, + s3Headers, + chunkSize, + chunkingParallelism + ) .mapMaterializedValue(_.map(MultipartUploadResult.apply)(system.dispatcher)) + + } } diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala index 2fe2960896..e5ef402a45 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala @@ -61,7 +61,10 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { implicit val settings = S3Settings(ActorSystem()) val req = - HttpRequests.initiateMultipartUploadRequest(location, contentType, acl, "us-east-1", MetaHeaders(metaHeaders)) + 
HttpRequests.initiateMultipartUploadRequest(location, + contentType, + "us-east-1", + S3Headers(acl, MetaHeaders(metaHeaders))) req.entity shouldEqual HttpEntity.empty(contentType) req.headers should contain(RawHeader("x-amz-acl", acl.value)) @@ -77,7 +80,10 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { implicit val settings = S3Settings(ActorSystem()) val req = - HttpRequests.initiateMultipartUploadRequest(location, contentType, acl, "us-east-2", MetaHeaders(metaHeaders)) + HttpRequests.initiateMultipartUploadRequest(location, + contentType, + "us-east-2", + S3Headers(acl, MetaHeaders(metaHeaders))) req.entity shouldEqual HttpEntity.empty(contentType) req.headers should contain(RawHeader("x-amz-acl", acl.value)) @@ -93,7 +99,10 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { implicit val settings = S3Settings(ConfigFactory.parseString(pathStyleAcessConfig)) val req = - HttpRequests.initiateMultipartUploadRequest(location, contentType, acl, "us-east-1", MetaHeaders(metaHeaders)) + HttpRequests.initiateMultipartUploadRequest(location, + contentType, + "us-east-1", + S3Headers(acl, MetaHeaders(metaHeaders))) req.uri.authority.host.toString shouldEqual "s3.amazonaws.com" req.uri.path.toString shouldEqual "/bucket/image-1024@2x" @@ -112,7 +121,10 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { implicit val settings = S3Settings(ConfigFactory.parseString(pathStyleAcessConfig)) val req = - HttpRequests.initiateMultipartUploadRequest(location, contentType, acl, "us-west-2", MetaHeaders(metaHeaders)) + HttpRequests.initiateMultipartUploadRequest(location, + contentType, + "us-west-2", + S3Headers(acl, MetaHeaders(metaHeaders))) req.uri.authority.host.toString shouldEqual "s3-us-west-2.amazonaws.com" req.uri.path.toString shouldEqual "/bucket/image-1024@2x" @@ -140,7 +152,10 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { implicit val settings = 
S3Settings(ConfigFactory.parseString(proxyConfig)) val req = - HttpRequests.initiateMultipartUploadRequest(location, contentType, acl, "region", MetaHeaders(metaHeaders)) + HttpRequests.initiateMultipartUploadRequest(location, + contentType, + "region", + S3Headers(acl, MetaHeaders(metaHeaders))) req.uri.scheme shouldEqual "http" } @@ -163,4 +178,40 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { reqFuture.futureValue.uri.scheme shouldEqual "http" } + + it should "initiate multipart upload with AES-256 server side encryption" in { + implicit val settings = S3Settings(ActorSystem()) + val s3Headers = S3Headers(acl, MetaHeaders(Map()), Some(AmzHeaders(ServerSideEncryption.AES256))) + val req = HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-2", s3Headers) + + req.headers should contain(RawHeader("x-amz-server-side-encryption", "AES256")) + } + + it should "initiate multipart upload with aws:kms server side encryption" in { + implicit val settings = S3Settings(ActorSystem()) + val testArn = "arn:aws:kms:my-region:my-account-id:key/my-key-id" + val s3Headers = S3Headers(acl, MetaHeaders(Map()), Some(AmzHeaders(ServerSideEncryption.KMS(testArn)))) + val req = HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-2", s3Headers) + + req.headers should contain(RawHeader("x-amz-server-side-encryption", "aws:kms")) + req.headers should contain(RawHeader("x-amz-server-side-encryption-aws-kms-key-id", testArn)) + } + + it should "initiate multipart upload with custom s3 storage class" in { + implicit val settings = S3Settings(ActorSystem()) + val s3Headers = S3Headers(acl, MetaHeaders(Map()), Some(AmzHeaders(StorageClass.ReducedRedundancy))) + val req = HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-2", s3Headers) + + req.headers should contain(RawHeader("x-amz-storage-class", "REDUCED_REDUNDANCY")) + } + + it should "initiate multipart upload with custom s3 headers" in 
{ + implicit val settings = S3Settings(ActorSystem()) + + val myCustomHeaders = Map("Cache-Control" -> "no-cache") + val s3Headers = S3Headers(acl, MetaHeaders(Map()), Some(AmzHeaders(myCustomHeaders))) + val req = HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-2", s3Headers) + + req.headers should contain(RawHeader("Cache-Control", "no-cache")) + } } diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3HeadersSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3HeadersSpec.scala new file mode 100644 index 0000000000..1b8f7e3a26 --- /dev/null +++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3HeadersSpec.scala @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.s3.impl + +import akka.http.scaladsl.model.headers.RawHeader +import org.scalatest.{FlatSpec, Matchers} + +class S3HeadersSpec extends FlatSpec with Matchers { + + "ServerSideEncryption" should "create well formed headers for AES-256 encryption" in { + ServerSideEncryption.AES256.headers should contain(RawHeader("x-amz-server-side-encryption", "AES256")) + } + + it should "create well formed headers for aws:kms encryption" in { + val kms = + ServerSideEncryption.KMS("arn:aws:kms:my-region:my-account-id:key/my-key-id", Some("base-64-encoded-context")) + kms.headers should contain(RawHeader("x-amz-server-side-encryption", "aws:kms")) + kms.headers should contain( + RawHeader("x-amz-server-side-encryption-aws-kms-key-id", "arn:aws:kms:my-region:my-account-id:key/my-key-id") + ) + kms.headers should contain(RawHeader("x-amz-server-side-encryption-context", "base-64-encoded-context")) + } + + "StorageClass" should "create well formed headers for 'infrequent access'" in { + StorageClass.InfrequentAccess.header shouldEqual RawHeader("x-amz-storage-class", "STANDARD_IA") + } + + "AmzHeaders" should "have aggregate headers to one sequence" in { + val amz = AmzHeaders(ServerSideEncryption.AES256, + 
Seq(RawHeader("Cache-Control", "no-cache")), + StorageClass.ReducedRedundancy) + amz.headers.size shouldEqual 3 + } +} From 9163edc7e7214078b225c47793163173bbcabee4 Mon Sep 17 00:00:00 2001 From: Michael Kober Date: Tue, 18 Apr 2017 09:41:31 +0200 Subject: [PATCH 2/5] Fixed copy-paste error in javadsl --- .../akka/stream/alpakka/s3/javadsl/S3Client.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala b/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala index 1345ab6448..e5d3b80577 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala @@ -53,13 +53,18 @@ final class S3Client(credentials: AWSCredentials, region: String, system: ActorS contentType: ContentType, cannedAcl: CannedAcl, metaHeaders: MetaHeaders): Sink[ByteString, CompletionStage[MultipartUploadResult]] = - multipartUpload(bucket, key, ContentTypes.`application/octet-stream`, cannedAcl, metaHeaders, AmzHeaders(Nil)) + multipartUpload(bucket, key, contentType, cannedAcl, metaHeaders, AmzHeaders(Nil)) def multipartUpload(bucket: String, key: String, contentType: ContentType, - metaHeaders: MetaHeaders): Sink[ByteString, CompletionStage[MultipartUploadResult]] = - multipartUpload(bucket, key, ContentTypes.`application/octet-stream`, CannedAcl.Private, MetaHeaders(Map())) + cannedAcl: CannedAcl): Sink[ByteString, CompletionStage[MultipartUploadResult]] = + multipartUpload(bucket, key, contentType, cannedAcl, MetaHeaders(Map())) + + def multipartUpload(bucket: String, + key: String, + contentType: ContentType): Sink[ByteString, CompletionStage[MultipartUploadResult]] = + multipartUpload(bucket, key, contentType, CannedAcl.Private, MetaHeaders(Map())) def multipartUpload(bucket: String, key: String): Sink[ByteString, CompletionStage[MultipartUploadResult]] = multipartUpload(bucket, key, 
ContentTypes.`application/octet-stream`, CannedAcl.Private, MetaHeaders(Map())) From fec1915ecd27bd22e07e47ce76d89bf1475e38f5 Mon Sep 17 00:00:00 2001 From: Michael Kober Date: Tue, 18 Apr 2017 09:32:14 +0200 Subject: [PATCH 3/5] made headers in multipart upload API optional --- .../stream/alpakka/s3/impl/S3Headers.scala | 8 +++++--- .../stream/alpakka/s3/javadsl/S3Client.scala | 2 +- .../stream/alpakka/s3/scaladsl/S3Client.scala | 14 ++------------ .../alpakka/s3/impl/HttpRequestsSpec.scala | 18 +++++++++--------- .../stream/alpakka/s3/scaladsl/S3NoMock.scala | 6 +++--- 5 files changed, 20 insertions(+), 28 deletions(-) diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Headers.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Headers.scala index 82e2eca1d3..30e870598f 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Headers.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Headers.scala @@ -13,12 +13,14 @@ import scala.collection.immutable * Container for headers used in s3 uploads like acl, server side encryption, storage class, * metadata or custom headers for more advanced use cases. 
*/ -case class S3Headers(cannedAcl: CannedAcl = CannedAcl.Private, - metaHeaders: MetaHeaders = MetaHeaders(Map()), +case class S3Headers(cannedAcl: Option[CannedAcl] = None, + metaHeaders: Option[MetaHeaders] = None, customHeaders: Option[AmzHeaders] = None) { def headers: immutable.Seq[HttpHeader] = - metaHeaders.headers ++ customHeaders.map(_.headers).getOrElse(Nil) :+ cannedAcl.header + metaHeaders.map(_.headers).getOrElse(Nil) ++ + customHeaders.map(_.headers).getOrElse(Nil) ++ + cannedAcl.map((acl) => Seq(acl.header)).getOrElse(Nil) } case class MetaHeaders(metaHeaders: Map[String, String]) { diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala b/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala index e5d3b80577..8dda48dfd9 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala @@ -44,7 +44,7 @@ final class S3Client(credentials: AWSCredentials, region: String, system: ActorS impl .multipartUpload(S3Location(bucket, key), contentType.asInstanceOf[ScalaContentType], - S3Headers(cannedAcl, metaHeaders, Some(amzHeaders))) + S3Headers(Some(cannedAcl), Some(metaHeaders), Some(amzHeaders))) .mapMaterializedValue(_.map(MultipartUploadResult.create)(system.dispatcher).toJava) .asJava diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala b/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala index 13845c3385..abb0ebe25b 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala @@ -45,27 +45,17 @@ final class S3Client(credentials: AWSCredentials, region: String)(implicit syste def multipartUpload(bucket: String, key: String, contentType: ContentType = ContentTypes.`application/octet-stream`, - metaHeaders: MetaHeaders = MetaHeaders(Map()), - cannedAcl: CannedAcl = CannedAcl.Private, chunkSize: Int = MinChunkSize, 
chunkingParallelism: Int = 4, - amzHeaders: Option[AmzHeaders] = None): Sink[ByteString, Future[MultipartUploadResult]] = { - - val s3Headers = S3Headers( - cannedAcl, - metaHeaders, - amzHeaders - ) - + s3Headers: Option[S3Headers] = None): Sink[ByteString, Future[MultipartUploadResult]] = impl .multipartUpload( S3Location(bucket, key), contentType, - s3Headers, + s3Headers.getOrElse(S3Headers(None, None, None)), chunkSize, chunkingParallelism ) .mapMaterializedValue(_.map(MultipartUploadResult.apply)(system.dispatcher)) - } } diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala index e5ef402a45..19ab7e9033 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala @@ -64,7 +64,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-1", - S3Headers(acl, MetaHeaders(metaHeaders))) + S3Headers(Some(acl), Some(MetaHeaders(metaHeaders)))) req.entity shouldEqual HttpEntity.empty(contentType) req.headers should contain(RawHeader("x-amz-acl", acl.value)) @@ -83,7 +83,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-2", - S3Headers(acl, MetaHeaders(metaHeaders))) + S3Headers(Some(acl), Some(MetaHeaders(metaHeaders)))) req.entity shouldEqual HttpEntity.empty(contentType) req.headers should contain(RawHeader("x-amz-acl", acl.value)) @@ -102,7 +102,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-1", - S3Headers(acl, MetaHeaders(metaHeaders))) + S3Headers(Some(acl), Some(MetaHeaders(metaHeaders)))) req.uri.authority.host.toString shouldEqual "s3.amazonaws.com" 
req.uri.path.toString shouldEqual "/bucket/image-1024@2x" @@ -124,7 +124,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-west-2", - S3Headers(acl, MetaHeaders(metaHeaders))) + S3Headers(Some(acl), Some(MetaHeaders(metaHeaders)))) req.uri.authority.host.toString shouldEqual "s3-us-west-2.amazonaws.com" req.uri.path.toString shouldEqual "/bucket/image-1024@2x" @@ -155,7 +155,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { HttpRequests.initiateMultipartUploadRequest(location, contentType, "region", - S3Headers(acl, MetaHeaders(metaHeaders))) + S3Headers(Some(acl), Some(MetaHeaders(metaHeaders)))) req.uri.scheme shouldEqual "http" } @@ -181,7 +181,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { it should "initiate multipart upload with AES-256 server side encryption" in { implicit val settings = S3Settings(ActorSystem()) - val s3Headers = S3Headers(acl, MetaHeaders(Map()), Some(AmzHeaders(ServerSideEncryption.AES256))) + val s3Headers = S3Headers(None, None, Some(AmzHeaders(ServerSideEncryption.AES256))) val req = HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-2", s3Headers) req.headers should contain(RawHeader("x-amz-server-side-encryption", "AES256")) @@ -190,7 +190,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { it should "initiate multipart upload with aws:kms server side encryption" in { implicit val settings = S3Settings(ActorSystem()) val testArn = "arn:aws:kms:my-region:my-account-id:key/my-key-id" - val s3Headers = S3Headers(acl, MetaHeaders(Map()), Some(AmzHeaders(ServerSideEncryption.KMS(testArn)))) + val s3Headers = S3Headers(None, None, Some(AmzHeaders(ServerSideEncryption.KMS(testArn)))) val req = HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-2", s3Headers) req.headers should 
contain(RawHeader("x-amz-server-side-encryption", "aws:kms")) @@ -199,7 +199,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { it should "initiate multipart upload with custom s3 storage class" in { implicit val settings = S3Settings(ActorSystem()) - val s3Headers = S3Headers(acl, MetaHeaders(Map()), Some(AmzHeaders(StorageClass.ReducedRedundancy))) + val s3Headers = S3Headers(None, None, Some(AmzHeaders(StorageClass.ReducedRedundancy))) val req = HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-2", s3Headers) req.headers should contain(RawHeader("x-amz-storage-class", "REDUCED_REDUNDANCY")) @@ -209,7 +209,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { implicit val settings = S3Settings(ActorSystem()) val myCustomHeaders = Map("Cache-Control" -> "no-cache") - val s3Headers = S3Headers(acl, MetaHeaders(Map()), Some(AmzHeaders(myCustomHeaders))) + val s3Headers = S3Headers(None, None, Some(AmzHeaders(myCustomHeaders))) val req = HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-2", s3Headers) req.headers should contain(RawHeader("Cache-Control", "no-cache")) diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3NoMock.scala b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3NoMock.scala index d027bfaae2..22254018c1 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3NoMock.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3NoMock.scala @@ -5,7 +5,7 @@ package akka.stream.alpakka.s3.scaladsl import akka.actor.ActorSystem import akka.stream.ActorMaterializer -import akka.stream.alpakka.s3.impl.MetaHeaders +import akka.stream.alpakka.s3.impl.{MetaHeaders, S3Headers} import akka.stream.scaladsl.{Sink, Source} import akka.util.ByteString import org.scalatest.concurrent.ScalaFutures @@ -29,8 +29,8 @@ class S3NoMock extends FlatSpecLike with BeforeAndAfterAll with Matchers with Sc val source: Source[ByteString, Any] = 
Source(ByteString(objectValue) :: Nil) //val source: Source[ByteString, Any] = FileIO.fromPath(Paths.get("/tmp/IMG_0470.JPG")) - - val result = source.runWith(S3Client().multipartUpload(bucket, objectKey, metaHeaders = MetaHeaders(metaHeaders))) + val headers = Some(S3Headers(None, Some(MetaHeaders(metaHeaders)), None)) + val result = source.runWith(S3Client().multipartUpload(bucket, objectKey, s3Headers = headers)) val multipartUploadResult = Await.ready(result, 90.seconds).futureValue multipartUploadResult.bucket shouldBe bucket From 6d928f651f836313d8dd46fee12df7006e574495 Mon Sep 17 00:00:00 2001 From: Michael Kober Date: Wed, 19 Apr 2017 15:20:26 +0200 Subject: [PATCH 4/5] Refactored, added alternative multipartUploadWithHeaders method --- .../stream/alpakka/s3/impl/HttpRequests.scala | 2 +- .../stream/alpakka/s3/impl/S3Headers.scala | 41 +++++++++---------- .../stream/alpakka/s3/javadsl/S3Client.scala | 10 ++--- .../stream/alpakka/s3/scaladsl/S3Client.scala | 25 +++++++++-- .../alpakka/s3/javadsl/S3ClientTest.java | 18 ++++++++ .../alpakka/s3/impl/HttpRequestsSpec.scala | 20 ++++----- .../alpakka/s3/impl/S3HeadersSpec.scala | 15 ++++--- .../stream/alpakka/s3/scaladsl/S3NoMock.scala | 6 +-- .../alpakka/s3/scaladsl/S3SinkSpec.scala | 15 +++++++ 9 files changed, 101 insertions(+), 51 deletions(-) diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala index b6ab37af0e..9b5c59c6be 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala @@ -25,7 +25,7 @@ private[alpakka] object HttpRequests { region: String, s3Headers: S3Headers)(implicit conf: S3Settings): HttpRequest = s3Request(s3Location, region, HttpMethods.POST, _.withQuery(Query("uploads"))) - .withDefaultHeaders(s3Headers.headers) + .withDefaultHeaders(s3Headers.headers: _*) .withEntity(HttpEntity.empty(contentType)) def
uploadPartRequest(upload: MultipartUpload, diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Headers.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Headers.scala index 30e870598f..80a953a2cd 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Headers.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Headers.scala @@ -13,15 +13,7 @@ import scala.collection.immutable * Container for headers used in s3 uploads like acl, server side encryption, storage class, * metadata or custom headers for more advanced use cases. */ -case class S3Headers(cannedAcl: Option[CannedAcl] = None, - metaHeaders: Option[MetaHeaders] = None, - customHeaders: Option[AmzHeaders] = None) { - - def headers: immutable.Seq[HttpHeader] = - metaHeaders.map(_.headers).getOrElse(Nil) ++ - customHeaders.map(_.headers).getOrElse(Nil) ++ - cannedAcl.map((acl) => Seq(acl.header)).getOrElse(Nil) -} +case class S3Headers(headers: Seq[HttpHeader]) case class MetaHeaders(metaHeaders: Map[String, String]) { def headers = @@ -30,22 +22,29 @@ case class MetaHeaders(metaHeaders: Map[String, String]) { }(collection.breakOut): immutable.Seq[HttpHeader] } -case class AmzHeaders(headers: Seq[HttpHeader]) - /** - * Convenience apply methods for creation of encryption, storage class or custom headers. + * Convenience apply methods for creation of canned acl, meta, encryption, storage class or custom headers. 
*/ -object AmzHeaders { - def apply(storageClass: StorageClass): AmzHeaders = new AmzHeaders(Seq(storageClass.header)) - def apply(encryption: ServerSideEncryption): AmzHeaders = new AmzHeaders(encryption.headers) - def apply(customHeaders: Map[String, String]): AmzHeaders = - new AmzHeaders(customHeaders.map { header => +object S3Headers { + val empty = S3Headers(Seq()) + def apply(cannedAcl: CannedAcl): S3Headers = S3Headers(Seq(cannedAcl.header)) + def apply(metaHeaders: MetaHeaders): S3Headers = S3Headers(metaHeaders.headers) + def apply(cannedAcl: CannedAcl, metaHeaders: MetaHeaders): S3Headers = + S3Headers(metaHeaders.headers :+ cannedAcl.header) + def apply(storageClass: StorageClass): S3Headers = S3Headers(Seq(storageClass.header)) + def apply(encryption: ServerSideEncryption): S3Headers = S3Headers(encryption.headers) + def apply(customHeaders: Map[String, String]): S3Headers = + S3Headers(customHeaders.map { header => RawHeader(header._1, header._2) }(collection.breakOut)) - def apply(encryption: ServerSideEncryption, storageClass: StorageClass): AmzHeaders = - new AmzHeaders(encryption.headers :+ storageClass.header) - def apply(encryption: ServerSideEncryption, customHeaders: Seq[HttpHeader], storageClass: StorageClass): AmzHeaders = - new AmzHeaders(encryption.headers ++ customHeaders :+ storageClass.header) + def apply(cannedAcl: CannedAcl = CannedAcl.Private, + metaHeaders: MetaHeaders = MetaHeaders(Map()), + storageClass: StorageClass = StorageClass.Standard, + encryption: ServerSideEncryption = ServerSideEncryption.AES256, + customHeaders: Seq[HttpHeader] = Seq()): S3Headers = { + val headers = metaHeaders.headers ++ encryption.headers ++ customHeaders :+ cannedAcl.header :+ storageClass.header + S3Headers(headers) + } } /** diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala b/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala index 8dda48dfd9..bcb6119ba3 100644 --- 
a/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala @@ -38,13 +38,9 @@ final class S3Client(credentials: AWSCredentials, region: String, system: ActorS def multipartUpload(bucket: String, key: String, contentType: ContentType, - cannedAcl: CannedAcl, - metaHeaders: MetaHeaders, - amzHeaders: AmzHeaders): Sink[ByteString, CompletionStage[MultipartUploadResult]] = + s3Headers: S3Headers): Sink[ByteString, CompletionStage[MultipartUploadResult]] = impl - .multipartUpload(S3Location(bucket, key), - contentType.asInstanceOf[ScalaContentType], - S3Headers(Some(cannedAcl), Some(metaHeaders), Some(amzHeaders))) + .multipartUpload(S3Location(bucket, key), contentType.asInstanceOf[ScalaContentType], s3Headers) .mapMaterializedValue(_.map(MultipartUploadResult.create)(system.dispatcher).toJava) .asJava @@ -53,7 +49,7 @@ final class S3Client(credentials: AWSCredentials, region: String, system: ActorS contentType: ContentType, cannedAcl: CannedAcl, metaHeaders: MetaHeaders): Sink[ByteString, CompletionStage[MultipartUploadResult]] = - multipartUpload(bucket, key, contentType, cannedAcl, metaHeaders, AmzHeaders(Nil)) + multipartUpload(bucket, key, contentType, S3Headers(cannedAcl, metaHeaders)) def multipartUpload(bucket: String, key: String, diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala b/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala index abb0ebe25b..329cdc3e59 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala @@ -45,14 +45,33 @@ final class S3Client(credentials: AWSCredentials, region: String)(implicit syste def multipartUpload(bucket: String, key: String, contentType: ContentType = ContentTypes.`application/octet-stream`, + metaHeaders: MetaHeaders = MetaHeaders(Map()), + cannedAcl: CannedAcl = CannedAcl.Private, chunkSize: Int = MinChunkSize, - 
chunkingParallelism: Int = 4, - s3Headers: Option[S3Headers] = None): Sink[ByteString, Future[MultipartUploadResult]] = + chunkingParallelism: Int = 4): Sink[ByteString, Future[MultipartUploadResult]] = impl .multipartUpload( S3Location(bucket, key), contentType, - s3Headers.getOrElse(S3Headers(None, None, None)), + S3Headers(cannedAcl, metaHeaders), + chunkSize, + chunkingParallelism + ) + .mapMaterializedValue(_.map(MultipartUploadResult.apply)(system.dispatcher)) + + def multipartUploadWithHeaders( + bucket: String, + key: String, + contentType: ContentType = ContentTypes.`application/octet-stream`, + chunkSize: Int = MinChunkSize, + chunkingParallelism: Int = 4, + s3Headers: Option[S3Headers] = None + ): Sink[ByteString, Future[MultipartUploadResult]] = + impl + .multipartUpload( + S3Location(bucket, key), + contentType, + s3Headers.getOrElse(S3Headers.empty), chunkSize, chunkingParallelism ) diff --git a/s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java b/s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java index 1c81efe998..45739bde42 100644 --- a/s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java +++ b/s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java @@ -5,6 +5,7 @@ import akka.NotUsed; import akka.actor.ActorSystem; + import akka.http.javadsl.model.Uri; import akka.stream.ActorMaterializer; import akka.stream.Materializer; @@ -47,6 +48,23 @@ public void multipartUpload() throws Exception { assertEquals(new MultipartUploadResult(Uri.create(url()), bucket(), bucketKey(), etag()), result); } + @Test + public void multipartUploadWithHeaders() throws Exception { + + mockUpload(); + + //#upload + final Sink> sink = client.multipartUpload(bucket(), bucketKey(), akka.http.javadsl.model.headers.ContentType.); + //#upload + + final CompletionStage resultCompletionStage = + Source.single(ByteString.fromString(body())).runWith(sink, materializer); + + MultipartUploadResult result = 
resultCompletionStage.toCompletableFuture().get(5, TimeUnit.SECONDS); + + assertEquals(new MultipartUploadResult(Uri.create(url()), bucket(), bucketKey(), etag()), result); + } + @Test public void download() throws Exception { diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala index 19ab7e9033..ee1d012116 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala @@ -64,7 +64,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-1", - S3Headers(Some(acl), Some(MetaHeaders(metaHeaders)))) + S3Headers(acl, MetaHeaders(metaHeaders))) req.entity shouldEqual HttpEntity.empty(contentType) req.headers should contain(RawHeader("x-amz-acl", acl.value)) @@ -83,7 +83,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-2", - S3Headers(Some(acl), Some(MetaHeaders(metaHeaders)))) + S3Headers(acl, MetaHeaders(metaHeaders))) req.entity shouldEqual HttpEntity.empty(contentType) req.headers should contain(RawHeader("x-amz-acl", acl.value)) @@ -102,7 +102,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-1", - S3Headers(Some(acl), Some(MetaHeaders(metaHeaders)))) + S3Headers(acl, MetaHeaders(metaHeaders))) req.uri.authority.host.toString shouldEqual "s3.amazonaws.com" req.uri.path.toString shouldEqual "/bucket/image-1024@2x" @@ -124,7 +124,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-west-2", - S3Headers(Some(acl), Some(MetaHeaders(metaHeaders)))) + S3Headers(acl, 
MetaHeaders(metaHeaders))) req.uri.authority.host.toString shouldEqual "s3-us-west-2.amazonaws.com" req.uri.path.toString shouldEqual "/bucket/image-1024@2x" @@ -155,7 +155,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { HttpRequests.initiateMultipartUploadRequest(location, contentType, "region", - S3Headers(Some(acl), Some(MetaHeaders(metaHeaders)))) + S3Headers(acl, MetaHeaders(metaHeaders))) req.uri.scheme shouldEqual "http" } @@ -181,7 +181,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { it should "initiate multipart upload with AES-256 server side encryption" in { implicit val settings = S3Settings(ActorSystem()) - val s3Headers = S3Headers(None, None, Some(AmzHeaders(ServerSideEncryption.AES256))) + val s3Headers = S3Headers(ServerSideEncryption.AES256) val req = HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-2", s3Headers) req.headers should contain(RawHeader("x-amz-server-side-encryption", "AES256")) @@ -190,7 +190,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { it should "initiate multipart upload with aws:kms server side encryption" in { implicit val settings = S3Settings(ActorSystem()) val testArn = "arn:aws:kms:my-region:my-account-id:key/my-key-id" - val s3Headers = S3Headers(None, None, Some(AmzHeaders(ServerSideEncryption.KMS(testArn)))) + val s3Headers = S3Headers(ServerSideEncryption.KMS(testArn)) val req = HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-2", s3Headers) req.headers should contain(RawHeader("x-amz-server-side-encryption", "aws:kms")) @@ -199,7 +199,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { it should "initiate multipart upload with custom s3 storage class" in { implicit val settings = S3Settings(ActorSystem()) - val s3Headers = S3Headers(None, None, Some(AmzHeaders(StorageClass.ReducedRedundancy))) + val s3Headers = 
S3Headers(StorageClass.ReducedRedundancy) val req = HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-2", s3Headers) req.headers should contain(RawHeader("x-amz-storage-class", "REDUCED_REDUNDANCY")) @@ -207,9 +207,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { it should "initiate multipart upload with custom s3 headers" in { implicit val settings = S3Settings(ActorSystem()) - - val myCustomHeaders = Map("Cache-Control" -> "no-cache") - val s3Headers = S3Headers(None, None, Some(AmzHeaders(myCustomHeaders))) + val s3Headers = S3Headers(Map("Cache-Control" -> "no-cache")) val req = HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-2", s3Headers) req.headers should contain(RawHeader("Cache-Control", "no-cache")) diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3HeadersSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3HeadersSpec.scala index 1b8f7e3a26..119ec9c9c9 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3HeadersSpec.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3HeadersSpec.scala @@ -4,6 +4,7 @@ package akka.stream.alpakka.s3.impl import akka.http.scaladsl.model.headers.RawHeader +import akka.stream.alpakka.s3.acl.CannedAcl import org.scalatest.{FlatSpec, Matchers} class S3HeadersSpec extends FlatSpec with Matchers { @@ -26,10 +27,14 @@ class S3HeadersSpec extends FlatSpec with Matchers { StorageClass.InfrequentAccess.header shouldEqual RawHeader("x-amz-storage-class", "STANDARD_IA") } - "AmzHeaders" should "have aggregate headers to one sequence" in { - val amz = AmzHeaders(ServerSideEncryption.AES256, - Seq(RawHeader("Cache-Control", "no-cache")), - StorageClass.ReducedRedundancy) - amz.headers.size shouldEqual 3 + "S3Headers" should "aggregate headers to one sequence" in { + val s3Headers = S3Headers( + cannedAcl = CannedAcl.PublicRead, + metaHeaders = MetaHeaders(Map("custom-meta" -> "custom")), + encryption = 
ServerSideEncryption.AES256, + customHeaders = Seq(RawHeader("Cache-Control", "no-cache")), + storageClass = StorageClass.ReducedRedundancy + ) + s3Headers.headers.size shouldEqual 5 } } diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3NoMock.scala b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3NoMock.scala index 22254018c1..d027bfaae2 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3NoMock.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3NoMock.scala @@ -5,7 +5,7 @@ package akka.stream.alpakka.s3.scaladsl import akka.actor.ActorSystem import akka.stream.ActorMaterializer -import akka.stream.alpakka.s3.impl.{MetaHeaders, S3Headers} +import akka.stream.alpakka.s3.impl.MetaHeaders import akka.stream.scaladsl.{Sink, Source} import akka.util.ByteString import org.scalatest.concurrent.ScalaFutures @@ -29,8 +29,8 @@ class S3NoMock extends FlatSpecLike with BeforeAndAfterAll with Matchers with Sc val source: Source[ByteString, Any] = Source(ByteString(objectValue) :: Nil) //val source: Source[ByteString, Any] = FileIO.fromPath(Paths.get("/tmp/IMG_0470.JPG")) - val headers = Some(S3Headers(None, Some(MetaHeaders(metaHeaders)), None)) - val result = source.runWith(S3Client().multipartUpload(bucket, objectKey, s3Headers = headers)) + + val result = source.runWith(S3Client().multipartUpload(bucket, objectKey, metaHeaders = MetaHeaders(metaHeaders))) val multipartUploadResult = Await.ready(result, 90.seconds).futureValue multipartUploadResult.bucket shouldBe bucket diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SinkSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SinkSpec.scala index d216e16d47..f22eba2e93 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SinkSpec.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SinkSpec.scala @@ -3,6 +3,7 @@ */ package akka.stream.alpakka.s3.scaladsl +import akka.stream.alpakka.s3.impl.{S3Headers, ServerSideEncryption} import 
akka.stream.scaladsl.{Sink, Source} import akka.util.ByteString @@ -23,6 +24,20 @@ class S3SinkSpec extends S3WireMockBase with S3ClientIntegrationSpec { result.futureValue shouldBe MultipartUploadResult(url, bucket, bucketKey, etag) } + it should "upload a stream of bytes to S3 with custom headers" in { + + mockUpload() + + //#upload + val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = + s3Client.multipartUploadWithHeaders(bucket, bucketKey, s3Headers = Some(S3Headers(ServerSideEncryption.AES256))) + //#upload + + val result: Future[MultipartUploadResult] = Source.single(ByteString(body)).runWith(s3Sink) + + result.futureValue shouldBe MultipartUploadResult(url, bucket, bucketKey, etag) + } + it should "fail if request returns 404" in { mock404s() From fa41436cdf3bcb4ee66529aee9a9798157b0638b Mon Sep 17 00:00:00 2001 From: Michael Kober Date: Wed, 19 Apr 2017 15:48:21 +0200 Subject: [PATCH 5/5] removed unfinished test --- .../alpakka/s3/javadsl/S3ClientTest.java | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java b/s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java index 45739bde42..1c81efe998 100644 --- a/s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java +++ b/s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java @@ -5,7 +5,6 @@ import akka.NotUsed; import akka.actor.ActorSystem; - import akka.http.javadsl.model.Uri; import akka.stream.ActorMaterializer; import akka.stream.Materializer; @@ -48,23 +47,6 @@ public void multipartUpload() throws Exception { assertEquals(new MultipartUploadResult(Uri.create(url()), bucket(), bucketKey(), etag()), result); } - @Test - public void multipartUploadWithHeaders() throws Exception { - - mockUpload(); - - //#upload - final Sink> sink = client.multipartUpload(bucket(), bucketKey(), akka.http.javadsl.model.headers.ContentType.); - //#upload - - final CompletionStage resultCompletionStage = 
Source.single(ByteString.fromString(body())).runWith(sink, materializer); - - MultipartUploadResult result = resultCompletionStage.toCompletableFuture().get(5, TimeUnit.SECONDS); - - assertEquals(new MultipartUploadResult(Uri.create(url()), bucket(), bucketKey(), etag()), result); - } - @Test public void download() throws Exception {