Skip to content

Commit

Permalink
Merge pull request #10 from bluelabsio/get-support
Browse files Browse the repository at this point in the history
Get support
  • Loading branch information
joearasin authored Sep 2, 2016
2 parents 4d47015 + b4368c6 commit ec131c8
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 40 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import sbt.Keys._

lazy val commonSettings = Seq(
organization := "com.bluelabs",
version := "0.0.3",
version := "0.0.4",
scalaVersion := "2.11.8",
scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature"),
bintrayReleaseOnPublish in ThisBuild := false,
Expand Down
37 changes: 23 additions & 14 deletions s3-stream/src/main/scala/com/bluelabs/s3stream/HttpRequests.scala
Original file line number Diff line number Diff line change
@@ -1,26 +1,35 @@
package com.bluelabs.s3stream


import scala.concurrent.{ExecutionContext, Future}
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.Uri.Query
import akka.http.scaladsl.model.headers.Host
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, RequestEntity, Uri}
import akka.util.ByteString

import scala.concurrent.{ExecutionContext, Future}

object HttpRequests {
def initiateMultipartUploadRequest(s3Location: S3Location): HttpRequest = {
HttpRequest(method = HttpMethods.POST)

def s3Request(s3Location: S3Location, method: HttpMethod = HttpMethods.GET, uriFn: (Uri => Uri) = identity): HttpRequest = {
HttpRequest(method)
.withHeaders(Host(requestHost(s3Location)))
.withUri(requestUri(s3Location).withQuery(Query("uploads")))
.withUri(uriFn(requestUri(s3Location)))
}

def initiateMultipartUploadRequest(s3Location: S3Location): HttpRequest = {
s3Request(s3Location, HttpMethods.POST, _.withQuery(Query("uploads")))
}

def getRequest(s3Location: S3Location): HttpRequest = {
s3Request(s3Location)
}

def uploadPartRequest(upload: MultipartUpload, partNumber: Int, payload: ByteString): HttpRequest = {
HttpRequest(method = HttpMethods.PUT)
.withHeaders(Host(requestHost(upload.s3Location)))
.withUri(requestUri(upload.s3Location).withQuery(Query("partNumber" -> partNumber.toString, "uploadId" -> upload.uploadId)))
.withEntity(payload)
s3Request(upload.s3Location,
HttpMethods.PUT,
_.withQuery(Query("partNumber" -> partNumber.toString, "uploadId" -> upload.uploadId))
).withEntity(payload)
}

def completeMultipartUploadRequest(upload: MultipartUpload, parts: Seq[(Int, String)])(implicit ec: ExecutionContext): Future[HttpRequest] = {
Expand All @@ -32,10 +41,10 @@ object HttpRequests {
for {
entity <- Marshal(payload).to[RequestEntity]
} yield {
HttpRequest(method = HttpMethods.POST)
.withHeaders(Host(requestHost(upload.s3Location)))
.withUri(requestUri(upload.s3Location).withQuery(Query("uploadId" -> upload.uploadId)))
.withEntity(entity)
s3Request(upload.s3Location,
HttpMethods.POST,
_.withQuery(Query("uploadId" -> upload.uploadId))
).withEntity(entity)
}
}

Expand Down
75 changes: 50 additions & 25 deletions s3-stream/src/main/scala/com/bluelabs/s3stream/S3Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@ package com.bluelabs.s3stream

import java.time.LocalDate

import scala.collection.immutable.Seq
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.{ Failure, Success }

import com.bluelabs.akkaaws.{AWSCredentials, CredentialScope, Signer, SigningKey}

import akka.NotUsed
import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.http.scaladsl.unmarshalling.Unmarshaller
import akka.stream.{Attributes, Materializer}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString
import com.bluelabs.akkaaws.{AWSCredentials, CredentialScope, Signer, SigningKey}

import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.util.{Failure, Success}

case class S3Location(bucket: String, key: String)

Expand All @@ -39,6 +42,12 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic
val MIN_CHUNK_SIZE = 5242880
val signingKey = SigningKey(credentials, CredentialScope(LocalDate.now(), region, "s3"))

def download(s3Location: S3Location): Source[ByteString, NotUsed] = {
import mat.executionContext
Source.fromFuture(signAndGet(HttpRequests.getRequest(s3Location)).map(_.dataBytes))
.flatMapConcat(identity)
}

/**
* Uploades a stream of ByteStrings to a specified location as a multipart upload.
*
Expand All @@ -48,14 +57,16 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic
* @return
*/
def multipartUpload(s3Location: S3Location, chunkSize: Int = MIN_CHUNK_SIZE, chunkingParallelism: Int = 4): Sink[ByteString, Future[CompleteMultipartUploadResult]] = {
implicit val ec = mat.executionContext
import mat.executionContext

chunkAndRequest(s3Location, chunkSize)(chunkingParallelism)
.log("s3-upload-response").withAttributes(Attributes.logLevels(onElement = Logging.DebugLevel, onFailure = Logging.WarningLevel, onFinish = Logging.InfoLevel))
.toMat(completionSink(s3Location))(Keep.right)
}

def initiateMultipartUpload(s3Location: S3Location): Future[MultipartUpload] = {
implicit val ec = mat.executionContext
import mat.executionContext

val req = HttpRequests.initiateMultipartUploadRequest(s3Location)
val response = for {
signedReq <- Signer.signedRequest(req, signingKey)
Expand All @@ -74,22 +85,12 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic
}

def completeMultipartUpload(s3Location: S3Location, parts: Seq[SuccessfulUploadPart]): Future[CompleteMultipartUploadResult] = {
implicit val ec = mat.executionContext
val response: Future[HttpResponse] = for {
req <- HttpRequests.completeMultipartUploadRequest(parts.head.multipartUpload, parts.map { case p => (p.index, p.etag) })
signedReq <- Signer.signedRequest(req, signingKey)
response <- Http().singleRequest(signedReq)
} yield {
response
}

response.flatMap {
case HttpResponse(status, _, entity, _) if status.isSuccess() => {
Unmarshal(entity).to[CompleteMultipartUploadResult]
}
case HttpResponse(status, _, entity, _) =>
Unmarshal(entity).to[String].flatMap{ case e => Future.failed(new Exception(e)) }
}
import mat.executionContext

for (
req <- HttpRequests.completeMultipartUploadRequest(parts.head.multipartUpload, parts.map { case p => (p.index, p.etag) });
res <- signAndGetAs[CompleteMultipartUploadResult](req)
) yield res
}

/**
Expand Down Expand Up @@ -135,7 +136,7 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic
}

def completionSink(s3Location: S3Location): Sink[UploadPartResponse, Future[CompleteMultipartUploadResult]] = {
implicit val ec = mat.executionContext
import mat.executionContext

Sink.seq[UploadPartResponse].mapMaterializedValue { case responseFuture: Future[Seq[UploadPartResponse]] =>
responseFuture.flatMap { case responses: Seq[UploadPartResponse] =>
Expand All @@ -152,4 +153,28 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic
}
}

private def signAndGetAs[T](request: HttpRequest)(implicit um: Unmarshaller[ResponseEntity, T]): Future[T] = {
import mat.executionContext
signAndGet(request).flatMap(entity => Unmarshal(entity).to[T])
}

private def signAndGet(request: HttpRequest): Future[ResponseEntity] = {
import mat.executionContext
for (
req <- Signer.signedRequest(request, signingKey);
res <- Http().singleRequest(req);
t <- entityForSuccess(res)
) yield t
}

private def entityForSuccess(resp: HttpResponse)(implicit ctx: ExecutionContext): Future[ResponseEntity] = {
resp match {
case HttpResponse(status, _, entity, _) if status.isSuccess() => Future.successful(entity)
case HttpResponse(status, _, entity, _) => {
Unmarshal(entity).to[String].flatMap { case err =>
Future.failed(new Exception(err))
}
}
}
}
}

0 comments on commit ec131c8

Please sign in to comment.