From 7f84b4ba6a94d6ca8a85085852d64cd3df19c48b Mon Sep 17 00:00:00 2001 From: Dermot Haughey Date: Wed, 13 Jul 2016 16:26:21 -0500 Subject: [PATCH] [wip] Attempts to fix: https://github.com/bluelabsio/s3-stream/issues/2 --- .../com/bluelabs/s3stream/HttpRequests.scala | 7 +++++-- .../com/bluelabs/s3stream/Metadata.scala | 3 +++ .../com/bluelabs/s3stream/S3Stream.scala | 20 +++++++++---------- 3 files changed, 18 insertions(+), 12 deletions(-) create mode 100644 s3-stream/src/main/scala/com/bluelabs/s3stream/Metadata.scala diff --git a/s3-stream/src/main/scala/com/bluelabs/s3stream/HttpRequests.scala b/s3-stream/src/main/scala/com/bluelabs/s3stream/HttpRequests.scala index 941218c..daa3cbd 100644 --- a/s3-stream/src/main/scala/com/bluelabs/s3stream/HttpRequests.scala +++ b/s3-stream/src/main/scala/com/bluelabs/s3stream/HttpRequests.scala @@ -4,14 +4,17 @@ import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._ import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model.Uri.Query import akka.http.scaladsl.model.headers.Host +import akka.http.scaladsl.model.headers.RawHeader import akka.http.scaladsl.model.{HttpMethods, HttpRequest, RequestEntity, Uri} import akka.util.ByteString import scala.concurrent.{ExecutionContext, Future} object HttpRequests { - def initiateMultipartUploadRequest(s3Location: S3Location): HttpRequest = { - HttpRequest(method = HttpMethods.POST) + def initiateMultipartUploadRequest(s3Location: S3Location, metadata: Metadata): HttpRequest = { + val contentType = RawHeader("Content-Type", metadata.contentType) + + HttpRequest(method = HttpMethods.POST, headers = List(contentType)) .withHeaders(Host(requestHost(s3Location))) .withUri(requestUri(s3Location).withQuery(Query("uploads"))) } diff --git a/s3-stream/src/main/scala/com/bluelabs/s3stream/Metadata.scala b/s3-stream/src/main/scala/com/bluelabs/s3stream/Metadata.scala new file mode 100644 index 0000000..3d3e8bd --- /dev/null +++ b/s3-stream/src/main/scala/com/bluelabs/s3stream/Metadata.scala @@ -0,0 +1,3 @@ +package com.bluelabs.s3stream + +case class Metadata(contentType: String = "binary/octet-stream") diff --git a/s3-stream/src/main/scala/com/bluelabs/s3stream/S3Stream.scala b/s3-stream/src/main/scala/com/bluelabs/s3stream/S3Stream.scala index fdb846c..f25957d 100644 --- a/s3-stream/src/main/scala/com/bluelabs/s3stream/S3Stream.scala +++ b/s3-stream/src/main/scala/com/bluelabs/s3stream/S3Stream.scala @@ -47,16 +47,16 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic * @param chunkingParallelism * @return */ - def multipartUpload(s3Location: S3Location, chunkSize: Int = MIN_CHUNK_SIZE, chunkingParallelism: Int = 4): Sink[ByteString, Future[CompleteMultipartUploadResult]] = { + def multipartUpload(s3Location: S3Location, metaData: Metadata = Metadata(), chunkSize: Int = MIN_CHUNK_SIZE, chunkingParallelism: Int = 4): Sink[ByteString, Future[CompleteMultipartUploadResult]] = { implicit val ec = mat.executionContext - chunkAndRequest(s3Location, chunkSize)(chunkingParallelism) + chunkAndRequest(s3Location, metaData, chunkSize)(chunkingParallelism) .log("s3-upload-response").withAttributes(Attributes.logLevels(onElement = Logging.DebugLevel, onFailure = Logging.WarningLevel, onFinish = Logging.InfoLevel)) .toMat(completionSink(s3Location))(Keep.right) } - def initiateMultipartUpload(s3Location: S3Location): Future[MultipartUpload] = { + def initiateMultipartUpload(s3Location: S3Location, metadata: Metadata): Future[MultipartUpload] = { implicit val ec = mat.executionContext - val req = HttpRequests.initiateMultipartUploadRequest(s3Location) + val req = HttpRequests.initiateMultipartUploadRequest(s3Location, metadata) val response = for { signedReq <- Signer.signedRequest(req, signingKey) response <- Http().singleRequest(signedReq) @@ -98,8 +98,8 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic * @param s3Location The s3 location to which to upload to * @return */ - def initiateUpload(s3Location: S3Location): Source[(MultipartUpload, Int), NotUsed] = { - Source.single(s3Location).mapAsync(1)(initiateMultipartUpload(_)) + def initiateUpload(s3Location: S3Location, metadata: Metadata): Source[(MultipartUpload, Int), NotUsed] = { + Source.single(s3Location).mapAsync(1)(initiateMultipartUpload(_, metadata)) .mapConcat{case r => Stream.continually(r)} .zip(StreamUtils.counter(1)) } @@ -112,17 +112,17 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic * @param parallelism * @return */ - def createRequests(s3Location: S3Location, chunkSize: Int = MIN_CHUNK_SIZE, parallelism: Int = 4): Flow[ByteString, (HttpRequest, (MultipartUpload, Int)), NotUsed] = { + def createRequests(s3Location: S3Location, metadata: Metadata, chunkSize: Int = MIN_CHUNK_SIZE, parallelism: Int = 4): Flow[ByteString, (HttpRequest, (MultipartUpload, Int)), NotUsed] = { assert(chunkSize >= MIN_CHUNK_SIZE, "Chunk size must be at least 5242880B. See http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html") - val requestInfo: Source[(MultipartUpload, Int), NotUsed] = initiateUpload(s3Location) + val requestInfo: Source[(MultipartUpload, Int), NotUsed] = initiateUpload(s3Location, metadata) Flow[ByteString] .via(new Chunker(chunkSize)) .zipWith(requestInfo){case (payload, (uploadInfo, chunkIndex)) => (HttpRequests.uploadPartRequest(uploadInfo, chunkIndex, payload), (uploadInfo, chunkIndex))} .mapAsync(parallelism){case (req, info) => Signer.signedRequest(req, signingKey).zip(Future.successful(info)) } } - def chunkAndRequest(s3Location: S3Location, chunkSize: Int = MIN_CHUNK_SIZE)(parallelism: Int = 4): Flow[ByteString, UploadPartResponse, NotUsed] = { - createRequests(s3Location, chunkSize, parallelism) + def chunkAndRequest(s3Location: S3Location, metaData: Metadata, chunkSize: Int = MIN_CHUNK_SIZE)(parallelism: Int = 4): Flow[ByteString, UploadPartResponse, NotUsed] = { + createRequests(s3Location, metaData, chunkSize, parallelism) .via(Http().superPool[(MultipartUpload, Int)]()) .map { case (Success(r), (upload, index)) => {