diff --git a/build.sbt b/build.sbt index fe630dd..f4aa2db 100644 --- a/build.sbt +++ b/build.sbt @@ -3,7 +3,7 @@ import sbt.Keys._ lazy val commonSettings = Seq( organization := "com.bluelabs", - version := "0.0.3", + version := "0.0.4", scalaVersion := "2.11.8", scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature"), bintrayReleaseOnPublish in ThisBuild := false, diff --git a/s3-stream/src/main/scala/com/bluelabs/s3stream/HttpRequests.scala b/s3-stream/src/main/scala/com/bluelabs/s3stream/HttpRequests.scala index 941218c..e6eab0b 100644 --- a/s3-stream/src/main/scala/com/bluelabs/s3stream/HttpRequests.scala +++ b/s3-stream/src/main/scala/com/bluelabs/s3stream/HttpRequests.scala @@ -1,26 +1,35 @@ package com.bluelabs.s3stream + +import scala.concurrent.{ExecutionContext, Future} import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._ import akka.http.scaladsl.marshalling.Marshal +import akka.http.scaladsl.model._ import akka.http.scaladsl.model.Uri.Query import akka.http.scaladsl.model.headers.Host -import akka.http.scaladsl.model.{HttpMethods, HttpRequest, RequestEntity, Uri} import akka.util.ByteString -import scala.concurrent.{ExecutionContext, Future} - object HttpRequests { - def initiateMultipartUploadRequest(s3Location: S3Location): HttpRequest = { - HttpRequest(method = HttpMethods.POST) + + def s3Request(s3Location: S3Location, method: HttpMethod = HttpMethods.GET, uriFn: (Uri => Uri) = identity): HttpRequest = { + HttpRequest(method) .withHeaders(Host(requestHost(s3Location))) - .withUri(requestUri(s3Location).withQuery(Query("uploads"))) + .withUri(uriFn(requestUri(s3Location))) + } + + def initiateMultipartUploadRequest(s3Location: S3Location): HttpRequest = { + s3Request(s3Location, HttpMethods.POST, _.withQuery(Query("uploads"))) + } + + def getRequest(s3Location: S3Location): HttpRequest = { + s3Request(s3Location) } def uploadPartRequest(upload: MultipartUpload, partNumber: Int, payload: ByteString): HttpRequest = { - HttpRequest(method = HttpMethods.PUT) - .withHeaders(Host(requestHost(upload.s3Location))) - .withUri(requestUri(upload.s3Location).withQuery(Query("partNumber" -> partNumber.toString, "uploadId" -> upload.uploadId))) - .withEntity(payload) + s3Request(upload.s3Location, + HttpMethods.PUT, + _.withQuery(Query("partNumber" -> partNumber.toString, "uploadId" -> upload.uploadId)) + ).withEntity(payload) } def completeMultipartUploadRequest(upload: MultipartUpload, parts: Seq[(Int, String)])(implicit ec: ExecutionContext): Future[HttpRequest] = { @@ -32,10 +41,10 @@ object HttpRequests { for { entity <- Marshal(payload).to[RequestEntity] } yield { - HttpRequest(method = HttpMethods.POST) - .withHeaders(Host(requestHost(upload.s3Location))) - .withUri(requestUri(upload.s3Location).withQuery(Query("uploadId" -> upload.uploadId))) - .withEntity(entity) + s3Request(upload.s3Location, + HttpMethods.POST, + _.withQuery(Query("uploadId" -> upload.uploadId)) + ).withEntity(entity) } } diff --git a/s3-stream/src/main/scala/com/bluelabs/s3stream/S3Stream.scala b/s3-stream/src/main/scala/com/bluelabs/s3stream/S3Stream.scala index fdb846c..e828797 100644 --- a/s3-stream/src/main/scala/com/bluelabs/s3stream/S3Stream.scala +++ b/s3-stream/src/main/scala/com/bluelabs/s3stream/S3Stream.scala @@ -2,20 +2,23 @@ package com.bluelabs.s3stream import java.time.LocalDate +import scala.collection.immutable.Seq +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.util.{ Failure, Success } + +import com.bluelabs.akkaaws.{AWSCredentials, CredentialScope, Signer, SigningKey} + import akka.NotUsed import akka.actor.ActorSystem import akka.event.Logging import akka.http.scaladsl.Http import akka.http.scaladsl.model._ import akka.http.scaladsl.unmarshalling.Unmarshal -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} +import akka.http.scaladsl.unmarshalling.Unmarshaller import akka.stream.{Attributes, Materializer} +import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.util.ByteString -import com.bluelabs.akkaaws.{AWSCredentials, CredentialScope, Signer, SigningKey} - -import scala.collection.immutable.Seq -import scala.concurrent.Future -import scala.util.{Failure, Success} case class S3Location(bucket: String, key: String) @@ -39,6 +42,12 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic val MIN_CHUNK_SIZE = 5242880 val signingKey = SigningKey(credentials, CredentialScope(LocalDate.now(), region, "s3")) + def download(s3Location: S3Location): Source[ByteString, NotUsed] = { + import mat.executionContext + Source.fromFuture(signAndGet(HttpRequests.getRequest(s3Location)).map(_.dataBytes)) + .flatMapConcat(identity) + } + /** * Uploades a stream of ByteStrings to a specified location as a multipart upload. * @@ -48,14 +57,16 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic * @return */ def multipartUpload(s3Location: S3Location, chunkSize: Int = MIN_CHUNK_SIZE, chunkingParallelism: Int = 4): Sink[ByteString, Future[CompleteMultipartUploadResult]] = { - implicit val ec = mat.executionContext + import mat.executionContext + chunkAndRequest(s3Location, chunkSize)(chunkingParallelism) .log("s3-upload-response").withAttributes(Attributes.logLevels(onElement = Logging.DebugLevel, onFailure = Logging.WarningLevel, onFinish = Logging.InfoLevel)) .toMat(completionSink(s3Location))(Keep.right) } def initiateMultipartUpload(s3Location: S3Location): Future[MultipartUpload] = { - implicit val ec = mat.executionContext + import mat.executionContext + val req = HttpRequests.initiateMultipartUploadRequest(s3Location) val response = for { signedReq <- Signer.signedRequest(req, signingKey) @@ -74,22 +85,12 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic } def completeMultipartUpload(s3Location: S3Location, parts: Seq[SuccessfulUploadPart]): Future[CompleteMultipartUploadResult] = { - implicit val ec = mat.executionContext - val response: Future[HttpResponse] = for { - req <- HttpRequests.completeMultipartUploadRequest(parts.head.multipartUpload, parts.map { case p => (p.index, p.etag) }) - signedReq <- Signer.signedRequest(req, signingKey) - response <- Http().singleRequest(signedReq) - } yield { - response - } - - response.flatMap { - case HttpResponse(status, _, entity, _) if status.isSuccess() => { - Unmarshal(entity).to[CompleteMultipartUploadResult] - } - case HttpResponse(status, _, entity, _) => - Unmarshal(entity).to[String].flatMap{ case e => Future.failed(new Exception(e)) } - } + import mat.executionContext + + for ( + req <- HttpRequests.completeMultipartUploadRequest(parts.head.multipartUpload, parts.map { case p => (p.index, p.etag) }); + res <- signAndGetAs[CompleteMultipartUploadResult](req) + ) yield res } /** @@ -135,7 +136,7 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic } def completionSink(s3Location: S3Location): Sink[UploadPartResponse, Future[CompleteMultipartUploadResult]] = { - implicit val ec = mat.executionContext + import mat.executionContext Sink.seq[UploadPartResponse].mapMaterializedValue { case responseFuture: Future[Seq[UploadPartResponse]] => responseFuture.flatMap { case responses: Seq[UploadPartResponse] => @@ -152,4 +153,28 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic } } + private def signAndGetAs[T](request: HttpRequest)(implicit um: Unmarshaller[ResponseEntity, T]): Future[T] = { + import mat.executionContext + signAndGet(request).flatMap(entity => Unmarshal(entity).to[T]) + } + + private def signAndGet(request: HttpRequest): Future[ResponseEntity] = { + import mat.executionContext + for ( + req <- Signer.signedRequest(request, signingKey); + res <- Http().singleRequest(req); + t <- entityForSuccess(res) + ) yield t + } + + private def entityForSuccess(resp: HttpResponse)(implicit ctx: ExecutionContext): Future[ResponseEntity] = { + resp match { + case HttpResponse(status, _, entity, _) if status.isSuccess() => Future.successful(entity) + case HttpResponse(status, _, entity, _) => { + Unmarshal(entity).to[String].flatMap { case err => + Future.failed(new Exception(err)) + } + } + } + } }