bluelabsio#12: Add ability to buffer chunks to disk instead of memory
This allows a (much) greater number of upload streams to run in parallel.
jypma committed Sep 26, 2016
1 parent ec131c8 commit 72ffcea
Showing 13 changed files with 303 additions and 148 deletions.
2 changes: 1 addition & 1 deletion project/Dependencies.scala
@@ -2,7 +2,7 @@ import sbt._

object Dependencies {
// Versions
lazy val akkaVersion = "2.4.3"
lazy val akkaVersion = "2.4.10"
lazy val scalatestVersion = "2.2.6"

val akka = "com.typesafe.akka" %% "akka-actor" % akkaVersion
7 changes: 7 additions & 0 deletions s3-stream/src/main/scala/com/bluelabs/s3stream/Chunk.scala
@@ -0,0 +1,7 @@
package com.bluelabs.s3stream

import akka.stream.scaladsl.Source
import akka.NotUsed
import akka.util.ByteString

case class Chunk(data: Source[ByteString,NotUsed], size: Int)
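
For context, a small usage sketch (illustrative, not part of this commit): a Source built from immutable data can be materialized any number of times, and carrying the byte count alongside it lets callers build HTTP entities with a known content length.

import akka.stream.scaladsl.Source
import akka.util.ByteString

// Illustrative only: a re-runnable chunk backed by in-memory bytes.
val bytes = ByteString("hello world")
val chunk = Chunk(Source.single(bytes), bytes.size)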
51 changes: 0 additions & 51 deletions s3-stream/src/main/scala/com/bluelabs/s3stream/Chunker.scala

This file was deleted.

98 changes: 98 additions & 0 deletions s3-stream/src/main/scala/com/bluelabs/s3stream/DiskBuffer.scala
@@ -0,0 +1,98 @@
package com.bluelabs.s3stream

import java.io.FileOutputStream
import java.io.RandomAccessFile
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.util.concurrent.atomic.AtomicInteger

import akka.NotUsed
import akka.stream.ActorAttributes
import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.scaladsl.FileIO
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.util.ByteString
import java.nio.file.Path

/**
* Buffers the complete incoming stream into a file, which can then be read several times afterwards.
*
* The stage waits for the incoming stream to complete. After that, it emits a single Chunk item on its output. The Chunk
* contains a bytestream source that can be materialized multiple times, and the total size of the file.
*
* @param maxMaterializations Number of expected materializations for the completed chunk. After this, the temp file is deleted.
* @param maxSize Maximum number of bytes to buffer on disk
* @param tempPath Directory for the temporary buffer file; the system default temp directory is used when None
*/
class DiskBuffer (maxMaterializations: Int, maxSize: Int, tempPath: Option[Path]) extends GraphStage[FlowShape[ByteString, Chunk]] {
if (maxMaterializations < 1) throw new IllegalArgumentException("maxMaterializations should be at least 1")
if (maxSize < 1) throw new IllegalArgumentException("maxSize should be at least 1")

val in = Inlet[ByteString]("in")
val out = Outlet[Chunk]("out")
override val shape = FlowShape.of(in, out)

override def initialAttributes = ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher")

override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
val path:Path = tempPath.map(dir => Files.createTempFile(dir, "s3-buffer-", ".bin")).getOrElse(Files.createTempFile("s3-buffer-", ".bin"))
path.toFile().deleteOnExit()
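// Memory-map the temp file at its maximum size up front; onPush writes go
// straight into the mapped region, and emit() truncates the file to the
// bytes actually written.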
val writeBuffer = new RandomAccessFile(path.toFile(), "rw").getChannel().map(FileChannel.MapMode.READ_WRITE, 0, maxSize)
var length = 0

setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (isClosed(in)) {
emit()
} else {
pull(in)
}
}
})

setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
length += elem.size
writeBuffer.put(elem.asByteBuffer)
pull(in)
}

override def onUpstreamFinish(): Unit = {
if (isAvailable(out)) {
emit()
}
completeStage()
}
})

private def emit(): Unit = {
// TODO Should we do http://stackoverflow.com/questions/2972986/how-to-unmap-a-file-from-memory-mapped-using-filechannel-in-java ?
writeBuffer.force()

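// The mapped region was pre-allocated at maxSize bytes, so truncate the file
// to the number of bytes actually written before handing it to readers.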
val ch = new FileOutputStream(path.toFile(), true).getChannel()
try {
ch.truncate(length)
} finally {
ch.close()
}

val deleteCounter = new AtomicInteger(maxMaterializations)
val src = FileIO.fromPath(path, 65536).mapMaterializedValue { f =>
if (deleteCounter.decrementAndGet() <= 0) {
import scala.concurrent.ExecutionContext.Implicits.global
f.onComplete { _ =>
path.toFile().delete()
}
}
NotUsed
}
emit(out, Chunk(src, length), () => completeStage())
}
}
}
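
A minimal usage sketch for the new stage (illustrative, not part of this commit): the emitted Chunk's data source can be run up to maxMaterializations times, after which the temp file is deleted.

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

object DiskBufferExample extends App {
  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()

  Source(List(ByteString("part one, "), ByteString("part two")))
    .via(new DiskBuffer(maxMaterializations = 1, maxSize = 1024, tempPath = None))
    .runForeach { chunk =>
      // chunk.data streams the temp file; it is deleted once the last
      // allowed materialization completes.
      chunk.data.runWith(Sink.foreach(bs => println(bs.utf8String)))
    }
}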
s3-stream/src/main/scala/com/bluelabs/s3stream/HttpRequests.scala
@@ -8,6 +8,9 @@ import akka.http.scaladsl.model._
import akka.http.scaladsl.model.Uri.Query
import akka.http.scaladsl.model.headers.Host
import akka.util.ByteString
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.RequestEntity

object HttpRequests {

@@ -25,11 +28,11 @@ object HttpRequests {
s3Request(s3Location)
}

def uploadPartRequest(upload: MultipartUpload, partNumber: Int, payload: ByteString): HttpRequest = {
def uploadPartRequest(upload: MultipartUpload, partNumber: Int, payload: Source[ByteString,_], payloadSize: Int): HttpRequest = {
s3Request(upload.s3Location,
HttpMethods.PUT,
_.withQuery(Query("partNumber" -> partNumber.toString, "uploadId" -> upload.uploadId))
).withEntity(payload)
).withEntity(HttpEntity(ContentTypes.`application/octet-stream`, payloadSize, payload))
}

def completeMultipartUploadRequest(upload: MultipartUpload, parts: Seq[(Int, String)])(implicit ec: ExecutionContext): Future[HttpRequest] = {
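
A hedged sketch of calling the new uploadPartRequest signature (the MultipartUpload field order is assumed from its usage above; all values are placeholders). Passing an explicit payloadSize lets Akka HTTP send a Content-Length header instead of chunked transfer encoding, which the S3 UploadPart API expects:

import akka.stream.scaladsl.Source
import akka.util.ByteString

// Hypothetical values for illustration only.
val upload = MultipartUpload(S3Location("my-bucket", "my-key"), "example-upload-id")
val data = ByteString("0123456789")
val request = HttpRequests.uploadPartRequest(upload, partNumber = 1,
  payload = Source.single(data), payloadSize = data.size)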
69 changes: 69 additions & 0 deletions s3-stream/src/main/scala/com/bluelabs/s3stream/MemoryBuffer.scala
@@ -0,0 +1,69 @@
package com.bluelabs.s3stream

import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.scaladsl.Source
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.util.ByteString

/**
* Buffers the complete incoming stream into memory, which can then be read several times afterwards.
*
* The stage waits for the incoming stream to complete. After that, it emits a single Chunk item on its output. The Chunk
* contains a bytestream source that can be materialized multiple times, and the total size of the buffered data.
*
* @param maxSize Maximum number of bytes to buffer in memory
*/
class MemoryBuffer(maxSize: Int) extends GraphStage[FlowShape[ByteString, Chunk]] {
val in = Inlet[ByteString]("in")
val out = Outlet[Chunk]("out")
override val shape = FlowShape.of(in, out)

override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
var buffer = ByteString.empty

setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (isClosed(in)) {
emit()
} else {
pull(in)
}
}
})

setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (buffer.size + elem.size > maxSize) {
failStage(new IllegalStateException("In-memory buffer of " + maxSize + " bytes exceeded"))
} else {
buffer ++= elem
pull(in)
}
}

override def onUpstreamFinish(): Unit = {
if (isAvailable(out)) {
emit()
}
completeStage()
}
})

def emit(): Unit = {
emit(out, Chunk(Source.single(buffer), buffer.size), () => completeStage())
}
}

}
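
The in-memory counterpart in use (illustrative, not part of this commit); each buffered chunk lives on-heap, which is what bounded upload parallelism before the disk-backed option:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()

Source.single(ByteString("payload"))
  .via(new MemoryBuffer(maxSize = 1024))
  .map(chunk => chunk.size) // the size is known once the stream is fully buffered
  .runWith(Sink.head)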
50 changes: 39 additions & 11 deletions s3-stream/src/main/scala/com/bluelabs/s3stream/S3Stream.scala
@@ -2,23 +2,40 @@ package com.bluelabs.s3stream

import java.time.LocalDate

import scala.annotation.elidable
import scala.annotation.elidable.ASSERTION
import scala.collection.immutable.Seq
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.{ Failure, Success }
import scala.util.Failure
import scala.util.Success

import com.bluelabs.akkaaws.{AWSCredentials, CredentialScope, Signer, SigningKey}
import com.bluelabs.akkaaws.AWSCredentials
import com.bluelabs.akkaaws.CredentialScope
import com.bluelabs.akkaaws.Signer
import com.bluelabs.akkaaws.SigningKey

import akka.NotUsed
import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.ResponseEntity
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.unmarshalling.Unmarshaller
import akka.stream.{Attributes, Materializer}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.Attributes
import akka.stream.Materializer
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.ByteString
import akka.stream.stage.GraphStage
import akka.stream.FlowShape
import java.nio.file.Path
import java.nio.file.Paths

case class S3Location(bucket: String, key: String)

@@ -57,7 +74,6 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic
* @return
*/
def multipartUpload(s3Location: S3Location, chunkSize: Int = MIN_CHUNK_SIZE, chunkingParallelism: Int = 4): Sink[ByteString, Future[CompleteMultipartUploadResult]] = {
import mat.executionContext

chunkAndRequest(s3Location, chunkSize)(chunkingParallelism)
.log("s3-upload-response").withAttributes(Attributes.logLevels(onElement = Logging.DebugLevel, onFailure = Logging.WarningLevel, onFinish = Logging.InfoLevel))
@@ -78,7 +94,7 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic
case HttpResponse(status, _, entity, _) if status.isSuccess() => Unmarshal(entity).to[MultipartUpload]
case HttpResponse(status, _, entity, _) => {
Unmarshal(entity).to[String].flatMap { case err =>
Future.failed(new Exception(err))
Future.failed(new Exception("Can't initiate upload: " + err))
}
}
}
@@ -116,11 +132,23 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic
def createRequests(s3Location: S3Location, chunkSize: Int = MIN_CHUNK_SIZE, parallelism: Int = 4): Flow[ByteString, (HttpRequest, (MultipartUpload, Int)), NotUsed] = {
assert(chunkSize >= MIN_CHUNK_SIZE, "Chunk size must be at least 5242880B. See http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html")
val requestInfo: Source[(MultipartUpload, Int), NotUsed] = initiateUpload(s3Location)
Flow[ByteString]
.via(new Chunker(chunkSize))
.zipWith(requestInfo){case (payload, (uploadInfo, chunkIndex)) => (HttpRequests.uploadPartRequest(uploadInfo, chunkIndex, payload), (uploadInfo, chunkIndex))}

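// Split incoming bytes into chunkSize-bounded substreams, buffer each into a
// re-runnable Chunk (in memory or on disk), then pair every chunk with the
// upload id and its part number and sign the resulting part request.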
SplitAfterSize(chunkSize)(Flow.apply[ByteString])
.via(getChunkBuffer(chunkSize))
.concatSubstreams
.zipWith(requestInfo){case (payload, (uploadInfo, chunkIndex)) => (HttpRequests.uploadPartRequest(uploadInfo, chunkIndex, payload.data, payload.size), (uploadInfo, chunkIndex))}
.mapAsync(parallelism){case (req, info) => Signer.signedRequest(req, signingKey).zip(Future.successful(info)) }
}

private def getChunkBuffer(chunkSize: Int) = system.settings.config.getString("com.bluelabs.s3stream.buffer") match {
case "memory" => new MemoryBuffer(chunkSize * 2)
case "disk" => new DiskBuffer(2, chunkSize * 2, getDiskBufferPath)
case other => throw new IllegalArgumentException("Unknown com.bluelabs.s3stream.buffer setting: " + other)
}

private val getDiskBufferPath = system.settings.config.getString("com.bluelabs.s3stream.disk-buffer-path") match {
case "" => None
case s => Some(Paths.get(s))
}

def chunkAndRequest(s3Location: S3Location, chunkSize: Int = MIN_CHUNK_SIZE)(parallelism: Int = 4): Flow[ByteString, UploadPartResponse, NotUsed] = {
createRequests(s3Location, chunkSize, parallelism)
@@ -172,7 +200,7 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic
case HttpResponse(status, _, entity, _) if status.isSuccess() => Future.successful(entity)
case HttpResponse(status, _, entity, _) => {
Unmarshal(entity).to[String].flatMap { case err =>
Future.failed(new Exception(err))
Future.failed(new Exception("Error: " + err))
}
}
}
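
The buffering behaviour added in this commit is driven by the two settings read above. A sketch of the corresponding application.conf entries (HOCON; values are examples, the key names match the getString lookups in getChunkBuffer and getDiskBufferPath):

# application.conf
com.bluelabs.s3stream {
  buffer = "disk"        # "memory" keeps chunks on-heap, "disk" spools them to temp files
  disk-buffer-path = ""  # empty string selects the system default temp directory
}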