Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] Attempts to fix: https://github.com/bluelabsio/s3-stream/issues/2 #7

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model.Uri.Query
import akka.http.scaladsl.model.headers.Host
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, RequestEntity, Uri}
import akka.util.ByteString

import scala.concurrent.{ExecutionContext, Future}

object HttpRequests {
/** Builds the `POST ?uploads` request that starts an S3 multipart upload.
  *
  * @param s3Location the bucket/key the upload targets
  * @param metadata   carries the Content-Type value to record on the object
  * @return an unsigned HttpRequest; the caller signs it (see Signer.signedRequest)
  */
def initiateMultipartUploadRequest(s3Location: S3Location, metadata: Metadata): HttpRequest = {
  // BUG FIX: `withHeaders` REPLACES the full header list, so the original's
  // Content-Type RawHeader (set in the constructor) was clobbered by the
  // subsequent `.withHeaders(Host(...))` call. Host and Content-Type must be
  // supplied together.
  // NOTE(review): akka-http models Content-Type as an entity header and may
  // warn on / ignore a RawHeader("Content-Type") — confirm S3 receives it, or
  // set it on the request entity instead.
  HttpRequest(method = HttpMethods.POST)
    .withHeaders(Host(requestHost(s3Location)), RawHeader("Content-Type", metadata.contentType))
    .withUri(requestUri(s3Location).withQuery(Query("uploads")))
}
Expand Down
3 changes: 3 additions & 0 deletions s3-stream/src/main/scala/com/bluelabs/s3stream/Metadata.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.bluelabs.s3stream

/** Optional metadata attached to a multipart upload.
  *
  * Marked `final`: case classes should not be extended.
  *
  * @param contentType value sent as the Content-Type header when the upload is
  *                    initiated; defaults to "binary/octet-stream" (presumably
  *                    chosen to match S3's own default for untyped objects —
  *                    confirm against AWS docs)
  */
final case class Metadata(contentType: String = "binary/octet-stream")
20 changes: 10 additions & 10 deletions s3-stream/src/main/scala/com/bluelabs/s3stream/S3Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic
* @param chunkingParallelism
* @return
*/
/** Sink that uploads the incoming ByteString stream to `s3Location` using the
  * S3 multipart upload API.
  *
  * (Merged the diff's duplicated old/new signature lines into the new form.)
  *
  * @param s3Location          target bucket/key
  * @param metaData            metadata (Content-Type) recorded on the object
  * @param chunkSize           bytes per uploaded part; must be >= MIN_CHUNK_SIZE
  * @param chunkingParallelism number of parts processed concurrently
  * @return a Sink materializing the final CompleteMultipartUploadResult
  */
def multipartUpload(s3Location: S3Location, metaData: Metadata = Metadata(), chunkSize: Int = MIN_CHUNK_SIZE, chunkingParallelism: Int = 4): Sink[ByteString, Future[CompleteMultipartUploadResult]] = {
  implicit val ec = mat.executionContext
  chunkAndRequest(s3Location, metaData, chunkSize)(chunkingParallelism)
    // Per-part responses are logged at debug; failures at warning.
    .log("s3-upload-response").withAttributes(Attributes.logLevels(onElement = Logging.DebugLevel, onFailure = Logging.WarningLevel, onFinish = Logging.InfoLevel))
    .toMat(completionSink(s3Location))(Keep.right)
}

def initiateMultipartUpload(s3Location: S3Location): Future[MultipartUpload] = {
def initiateMultipartUpload(s3Location: S3Location, metadata: Metadata): Future[MultipartUpload] = {
implicit val ec = mat.executionContext
val req = HttpRequests.initiateMultipartUploadRequest(s3Location)
val req = HttpRequests.initiateMultipartUploadRequest(s3Location, metadata)
val response = for {
signedReq <- Signer.signedRequest(req, signingKey)
response <- Http().singleRequest(signedReq)
Expand Down Expand Up @@ -98,8 +98,8 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic
* @param s3Location The s3 location to which to upload to
* @return
*/
/** Initiates the multipart upload exactly once and emits the resulting
  * MultipartUpload handle paired with an increasing part number (1, 2, 3, ...),
  * one pair per downstream chunk.
  *
  * (Merged the diff's duplicated old/new lines into the new form.)
  *
  * @param s3Location target bucket/key
  * @param metadata   metadata forwarded to the initiate request
  */
def initiateUpload(s3Location: S3Location, metadata: Metadata): Source[(MultipartUpload, Int), NotUsed] = {
  Source.single(s3Location).mapAsync(1)(initiateMultipartUpload(_, metadata))
    // Repeat the single upload handle indefinitely so it can be zipped with
    // the per-part counter; downstream demand bounds the infinite Stream.
    .mapConcat{case r => Stream.continually(r)}
    .zip(StreamUtils.counter(1))
}
Expand All @@ -112,17 +112,17 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic
* @param parallelism
* @return
*/
/** Chunks the incoming byte stream and pairs every chunk with a signed
  * UploadPart request plus its (upload, part-number) bookkeeping tuple.
  *
  * (Merged the diff's duplicated old/new lines into the new form.)
  *
  * @param s3Location  target bucket/key
  * @param metadata    metadata forwarded when the upload is initiated
  * @param chunkSize   bytes per part; S3 rejects parts under 5242880 bytes
  * @param parallelism number of requests signed concurrently
  */
def createRequests(s3Location: S3Location, metadata: Metadata, chunkSize: Int = MIN_CHUNK_SIZE, parallelism: Int = 4): Flow[ByteString, (HttpRequest, (MultipartUpload, Int)), NotUsed] = {
  assert(chunkSize >= MIN_CHUNK_SIZE, "Chunk size must be at least 5242880B. See http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html")
  val requestInfo: Source[(MultipartUpload, Int), NotUsed] = initiateUpload(s3Location, metadata)
  Flow[ByteString]
    .via(new Chunker(chunkSize))
    // Each chunk is zipped with the (same) upload handle and its part number.
    .zipWith(requestInfo){case (payload, (uploadInfo, chunkIndex)) => (HttpRequests.uploadPartRequest(uploadInfo, chunkIndex, payload), (uploadInfo, chunkIndex))}
    .mapAsync(parallelism){case (req, info) => Signer.signedRequest(req, signingKey).zip(Future.successful(info)) }
}

def chunkAndRequest(s3Location: S3Location, chunkSize: Int = MIN_CHUNK_SIZE)(parallelism: Int = 4): Flow[ByteString, UploadPartResponse, NotUsed] = {
createRequests(s3Location, chunkSize, parallelism)
def chunkAndRequest(s3Location: S3Location, metaData: Metadata, chunkSize: Int = MIN_CHUNK_SIZE)(parallelism: Int = 4): Flow[ByteString, UploadPartResponse, NotUsed] = {
createRequests(s3Location, metaData, chunkSize, parallelism)
.via(Http().superPool[(MultipartUpload, Int)]())
.map {
case (Success(r), (upload, index)) => {
Expand Down