
Commit 7f84b4b: [wip] Attempts to fix: bluelabsio#2
hderms committed Jul 13, 2016 (1 parent: 4d47015)
Showing 3 changed files with 18 additions and 12 deletions.
s3-stream/src/main/scala/com/bluelabs/s3stream/HttpRequests.scala (5 additions, 2 deletions)
@@ -4,14 +4,17 @@ import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._
 import akka.http.scaladsl.marshalling.Marshal
 import akka.http.scaladsl.model.Uri.Query
 import akka.http.scaladsl.model.headers.Host
+import akka.http.scaladsl.model.headers.RawHeader
 import akka.http.scaladsl.model.{HttpMethods, HttpRequest, RequestEntity, Uri}
 import akka.util.ByteString

 import scala.concurrent.{ExecutionContext, Future}

 object HttpRequests {
-  def initiateMultipartUploadRequest(s3Location: S3Location): HttpRequest = {
-    HttpRequest(method = HttpMethods.POST)
+  def initiateMultipartUploadRequest(s3Location: S3Location, metadata: Metadata): HttpRequest = {
+    val contentType = RawHeader("Content-Type", metadata.contentType)
+
+    HttpRequest(method = HttpMethods.POST, headers = List(contentType))
       .withHeaders(Host(requestHost(s3Location)))
       .withUri(requestUri(s3Location).withQuery(Query("uploads")))
   }
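A usage sketch, not part of this commit: the new parameter lets a caller attach an explicit Content-Type when building the initiate request. The S3Location(bucket, key) constructor shape is assumed from the surrounding library.

import com.bluelabs.s3stream.{HttpRequests, Metadata, S3Location}

// Hypothetical caller: initiate a multipart upload with an explicit
// content type instead of the binary/octet-stream default.
val req = HttpRequests.initiateMultipartUploadRequest(
  S3Location("some-bucket", "some/key"),  // assumed (bucket, key) constructor
  Metadata(contentType = "text/csv"))
// => POST ...?uploads carrying a raw Content-Type: text/csv header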
s3-stream/src/main/scala/com/bluelabs/s3stream/Metadata.scala (3 additions, 0 deletions)
@@ -0,0 +1,3 @@
+package com.bluelabs.s3stream
+
+case class Metadata(contentType: String = "binary/octet-stream")
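A quick illustration of the default; binary/octet-stream matches the content type S3 itself assigns when none is supplied.

val default = Metadata()                      // contentType == "binary/octet-stream"
val csv = Metadata(contentType = "text/csv")  // per-upload override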
s3-stream/src/main/scala/com/bluelabs/s3stream/S3Stream.scala (10 additions, 10 deletions)
@@ -47,16 +47,16 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic
    * @param chunkingParallelism
    * @return
    */
-  def multipartUpload(s3Location: S3Location, chunkSize: Int = MIN_CHUNK_SIZE, chunkingParallelism: Int = 4): Sink[ByteString, Future[CompleteMultipartUploadResult]] = {
+  def multipartUpload(s3Location: S3Location, metaData: Metadata = Metadata(), chunkSize: Int = MIN_CHUNK_SIZE, chunkingParallelism: Int = 4): Sink[ByteString, Future[CompleteMultipartUploadResult]] = {
     implicit val ec = mat.executionContext
-    chunkAndRequest(s3Location, chunkSize)(chunkingParallelism)
+    chunkAndRequest(s3Location, metaData, chunkSize)(chunkingParallelism)
       .log("s3-upload-response").withAttributes(Attributes.logLevels(onElement = Logging.DebugLevel, onFailure = Logging.WarningLevel, onFinish = Logging.InfoLevel))
       .toMat(completionSink(s3Location))(Keep.right)
   }

-  def initiateMultipartUpload(s3Location: S3Location): Future[MultipartUpload] = {
+  def initiateMultipartUpload(s3Location: S3Location, metadata: Metadata): Future[MultipartUpload] = {
     implicit val ec = mat.executionContext
-    val req = HttpRequests.initiateMultipartUploadRequest(s3Location)
+    val req = HttpRequests.initiateMultipartUploadRequest(s3Location, metadata)
     val response = for {
       signedReq <- Signer.signedRequest(req, signingKey)
       response <- Http().singleRequest(signedReq)
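An end-to-end sketch of the new signature; the AWSCredentials constructor shape below is assumed for illustration and does not appear in this commit. Because metaData defaults to Metadata(), existing callers compile unchanged.

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.concurrent.Future

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
val stream = new S3Stream(AWSCredentials("ACCESS_KEY", "SECRET_KEY"))  // assumed shape

// Upload a small payload as text/csv rather than the default content type.
val result: Future[CompleteMultipartUploadResult] =
  Source.single(ByteString("col1,col2\n1,2\n"))
    .runWith(stream.multipartUpload(S3Location("some-bucket", "some/key"),
                                    metaData = Metadata(contentType = "text/csv")))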
@@ -98,8 +98,8 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic
    * @param s3Location The s3 location to which to upload to
    * @return
    */
-  def initiateUpload(s3Location: S3Location): Source[(MultipartUpload, Int), NotUsed] = {
-    Source.single(s3Location).mapAsync(1)(initiateMultipartUpload(_))
+  def initiateUpload(s3Location: S3Location, metadata: Metadata): Source[(MultipartUpload, Int), NotUsed] = {
+    Source.single(s3Location).mapAsync(1)(initiateMultipartUpload(_, metadata))
       .mapConcat{case r => Stream.continually(r)}
       .zip(StreamUtils.counter(1))
   }
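initiateUpload keeps its repeat-and-zip trick: the single upload handle is repeated lazily and zipped with a 1-based counter so every chunk downstream gets a part number. A self-contained sketch of the same pattern, with Source(Stream.from(1)) standing in for StreamUtils.counter(1):

import akka.stream.scaladsl.Source

// Repeat one element forever, then zip with 1, 2, 3, ... as demanded;
// emits ("upload-id", 1), ("upload-id", 2), ...
val numbered = Source.single("upload-id")
  .mapConcat(r => Stream.continually(r))
  .zip(Source(Stream.from(1)))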
@@ -112,17 +112,17 @@
    * @param parallelism
    * @return
    */
-  def createRequests(s3Location: S3Location, chunkSize: Int = MIN_CHUNK_SIZE, parallelism: Int = 4): Flow[ByteString, (HttpRequest, (MultipartUpload, Int)), NotUsed] = {
+  def createRequests(s3Location: S3Location, metadata: Metadata, chunkSize: Int = MIN_CHUNK_SIZE, parallelism: Int = 4): Flow[ByteString, (HttpRequest, (MultipartUpload, Int)), NotUsed] = {
     assert(chunkSize >= MIN_CHUNK_SIZE, "Chunk size must be at least 5242880B. See http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html")
-    val requestInfo: Source[(MultipartUpload, Int), NotUsed] = initiateUpload(s3Location)
+    val requestInfo: Source[(MultipartUpload, Int), NotUsed] = initiateUpload(s3Location, metadata)
     Flow[ByteString]
       .via(new Chunker(chunkSize))
       .zipWith(requestInfo){case (payload, (uploadInfo, chunkIndex)) => (HttpRequests.uploadPartRequest(uploadInfo, chunkIndex, payload), (uploadInfo, chunkIndex))}
       .mapAsync(parallelism){case (req, info) => Signer.signedRequest(req, signingKey).zip(Future.successful(info)) }
   }

-  def chunkAndRequest(s3Location: S3Location, chunkSize: Int = MIN_CHUNK_SIZE)(parallelism: Int = 4): Flow[ByteString, UploadPartResponse, NotUsed] = {
-    createRequests(s3Location, chunkSize, parallelism)
+  def chunkAndRequest(s3Location: S3Location, metaData: Metadata, chunkSize: Int = MIN_CHUNK_SIZE)(parallelism: Int = 4): Flow[ByteString, UploadPartResponse, NotUsed] = {
+    createRequests(s3Location, metaData, chunkSize, parallelism)
       .via(Http().superPool[(MultipartUpload, Int)]())
       .map {
         case (Success(r), (upload, index)) => {
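This hunk completes the threading: multipartUpload hands metaData to chunkAndRequest, which passes it through createRequests and initiateUpload down to HttpRequests.initiateMultipartUploadRequest. On the unchanged assert: 5242880 B is the 5 MiB minimum AWS imposes on every part except the last (see the UploadPart docs linked in the message).

// Sanity check of the constant in the assert message: 5 MiB in bytes.
val fiveMiB = 5 * 1024 * 1024
require(fiveMiB == 5242880)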