From 72ffcea3b48fd51a62ad0ab89f47577c2d496a1e Mon Sep 17 00:00:00 2001
From: Jan Ypma
Date: Mon, 26 Sep 2016 14:15:09 +0200
Subject: [PATCH] #12: Add ability to buffer chunks to disk instead of memory

This allows a (much) greater number of upload streams to run in parallel.
---
 project/Dependencies.scala                    |  2 +-
 .../scala/com/bluelabs/s3stream/Chunk.scala   |  7 ++
 .../scala/com/bluelabs/s3stream/Chunker.scala | 51 ----------
 .../com/bluelabs/s3stream/DiskBuffer.scala    | 98 +++++++++++++++++++
 .../com/bluelabs/s3stream/HttpRequests.scala  |  7 +-
 .../com/bluelabs/s3stream/MemoryBuffer.scala  | 69 +++++++++++++
 .../com/bluelabs/s3stream/S3Stream.scala      | 50 +++++++---
 .../bluelabs/s3stream/SplitAfterSize.scala    | 62 ++++++++++++
 s3-stream/src/test/resources/reference.conf   |  7 ++
 .../com/bluelabs/s3stream/ChunkerSpec.scala   | 83 ----------------
 .../bluelabs/s3stream/DiskBufferSpec.scala    |  5 +
 .../bluelabs/s3stream/MemoryBufferSpec.scala  |  5 +
 .../s3stream/SplitAfterSizeSpec.scala         |  5 +
 13 files changed, 303 insertions(+), 148 deletions(-)
 create mode 100644 s3-stream/src/main/scala/com/bluelabs/s3stream/Chunk.scala
 delete mode 100644 s3-stream/src/main/scala/com/bluelabs/s3stream/Chunker.scala
 create mode 100644 s3-stream/src/main/scala/com/bluelabs/s3stream/DiskBuffer.scala
 create mode 100644 s3-stream/src/main/scala/com/bluelabs/s3stream/MemoryBuffer.scala
 create mode 100644 s3-stream/src/main/scala/com/bluelabs/s3stream/SplitAfterSize.scala
 create mode 100644 s3-stream/src/test/resources/reference.conf
 delete mode 100644 s3-stream/src/test/scala/com/bluelabs/s3stream/ChunkerSpec.scala
 create mode 100644 s3-stream/src/test/scala/com/bluelabs/s3stream/DiskBufferSpec.scala
 create mode 100644 s3-stream/src/test/scala/com/bluelabs/s3stream/MemoryBufferSpec.scala
 create mode 100644 s3-stream/src/test/scala/com/bluelabs/s3stream/SplitAfterSizeSpec.scala

diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 9eb6385..c87babe 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -2,7 +2,7 @@ import sbt._
 
 object Dependencies {
   // Versions
-  lazy val akkaVersion = "2.4.3"
+  lazy val akkaVersion = "2.4.10"
   lazy val scalatestVersion = "2.2.6"
 
   val akka = "com.typesafe.akka" %% "akka-actor" % akkaVersion
diff --git a/s3-stream/src/main/scala/com/bluelabs/s3stream/Chunk.scala b/s3-stream/src/main/scala/com/bluelabs/s3stream/Chunk.scala
new file mode 100644
index 0000000..1684a8a
--- /dev/null
+++ b/s3-stream/src/main/scala/com/bluelabs/s3stream/Chunk.scala
@@ -0,0 +1,7 @@
+package com.bluelabs.s3stream
+
+import akka.stream.scaladsl.Source
+import akka.NotUsed
+import akka.util.ByteString
+
+case class Chunk(data: Source[ByteString,NotUsed], size: Int)
diff --git a/s3-stream/src/main/scala/com/bluelabs/s3stream/Chunker.scala b/s3-stream/src/main/scala/com/bluelabs/s3stream/Chunker.scala
deleted file mode 100644
index 0b7da98..0000000
--- a/s3-stream/src/main/scala/com/bluelabs/s3stream/Chunker.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-package com.bluelabs.s3stream
-
-import akka.stream.stage._
-import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
-import akka.util.ByteString
-
-// Based on http://doc.akka.io/docs/akka/2.4.3/scala/stream/stream-cookbook.html
-// A few changes around handling the edges
-class Chunker(val chunkSize: Int) extends GraphStage[FlowShape[ByteString, ByteString]] {
-  val in = Inlet[ByteString]("Chunker.in")
-  val out = Outlet[ByteString]("Chunker.out")
-  override val shape = FlowShape.of(in, out)
-
-  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
-    private var buffer = ByteString.empty
-
-    setHandler(out, new OutHandler {
-      override def onPull(): Unit = {
-        if (isClosed(in) || buffer.size >= chunkSize) emitChunk()
-        else pull(in)
-      }
-    })
-    setHandler(in, new InHandler {
-      override def onPush(): Unit = {
-        val elem = grab(in)
-        buffer ++= elem
-        emitChunk()
-      }
-
-      override def onUpstreamFinish(): Unit = {
-        if (isAvailable(out)) emitChunk()
-        if (buffer.isEmpty) completeStage()
-      }
-    })
-
-    private def emitChunk(): Unit = {
-      if (buffer.isEmpty) {
-        if (isClosed(in)) completeStage()
-        else pull(in)
-      } else if (buffer.size < chunkSize && !isClosed(in)) {
-        pull(in)
-      } else {
-        val (chunk, nextBuffer) = buffer.splitAt(chunkSize)
-        buffer = nextBuffer
-        push(out, chunk)
-      }
-    }
-
-  }
-}
-
diff --git a/s3-stream/src/main/scala/com/bluelabs/s3stream/DiskBuffer.scala b/s3-stream/src/main/scala/com/bluelabs/s3stream/DiskBuffer.scala
new file mode 100644
index 0000000..e73ca28
--- /dev/null
+++ b/s3-stream/src/main/scala/com/bluelabs/s3stream/DiskBuffer.scala
@@ -0,0 +1,98 @@
+package com.bluelabs.s3stream
+
+import java.io.FileOutputStream
+import java.io.RandomAccessFile
+import java.nio.channels.FileChannel
+import java.nio.file.Files
+import java.util.concurrent.atomic.AtomicInteger
+
+import akka.NotUsed
+import akka.stream.ActorAttributes
+import akka.stream.Attributes
+import akka.stream.FlowShape
+import akka.stream.Inlet
+import akka.stream.Outlet
+import akka.stream.scaladsl.FileIO
+import akka.stream.stage.GraphStage
+import akka.stream.stage.GraphStageLogic
+import akka.stream.stage.InHandler
+import akka.stream.stage.OutHandler
+import akka.util.ByteString
+import java.nio.file.Path
+
+/**
+ * Buffers the complete incoming stream into a file, which can then be read several times afterwards.
+ *
+ * The stage waits for the incoming stream to complete. After that, it emits a single Chunk item on its output. The Chunk
+ * contains a bytestream source that can be materialized multiple times, and the total size of the file.
+ *
+ * @param maxMaterializations Number of expected materializations for the completed chunk. After this, the temp file is deleted.
+ * @param maxSize Maximum size on disk to buffer
+ */
+class DiskBuffer (maxMaterializations: Int, maxSize: Int, tempPath: Option[Path]) extends GraphStage[FlowShape[ByteString, Chunk]] {
+  if (maxMaterializations < 1) throw new IllegalArgumentException("maxMaterializations should be at least 1")
+  if (maxSize < 1) throw new IllegalArgumentException("maxSize should be at least 1")
+
+  val in = Inlet[ByteString]("in")
+  val out = Outlet[Chunk]("out")
+  override val shape = FlowShape.of(in, out)
+
+  override def initialAttributes = ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher")
+
+  override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
+    val path: Path = tempPath.map(dir => Files.createTempFile(dir, "s3-buffer-", ".bin")).getOrElse(Files.createTempFile("s3-buffer-", ".bin"))
+    path.toFile().deleteOnExit()
+    val writeBuffer = new RandomAccessFile(path.toFile(), "rw").getChannel().map(FileChannel.MapMode.READ_WRITE, 0, maxSize)
+    var length = 0
+
+    setHandler(out, new OutHandler {
+      override def onPull(): Unit = {
+        if (isClosed(in)) {
+          emit()
+        } else {
+          pull(in)
+        }
+      }
+    })
+
+    setHandler(in, new InHandler {
+      override def onPush(): Unit = {
+        val elem = grab(in)
+        length += elem.size
+        writeBuffer.put(elem.asByteBuffer)
+        pull(in)
+      }
+
+      override def onUpstreamFinish(): Unit = {
+        if (isAvailable(out)) {
+          emit()
+        }
+        completeStage()
+      }
+    })
+
+    private def emit(): Unit = {
+      // TODO Should we do http://stackoverflow.com/questions/2972986/how-to-unmap-a-file-from-memory-mapped-using-filechannel-in-java ?
+      writeBuffer.force()
+
+      val ch = new FileOutputStream(path.toFile(), true).getChannel()
+      try {
+        ch.truncate(length)
+      } finally {
+        ch.close()
+      }
+
+      val deleteCounter = new AtomicInteger(maxMaterializations)
+      val src = FileIO.fromPath(path, 65536).mapMaterializedValue { f =>
+        if (deleteCounter.decrementAndGet() <= 0) {
+          import scala.concurrent.ExecutionContext.Implicits.global
+          f.onComplete { _ =>
+            path.toFile().delete()
+          }
+        }
+        NotUsed
+      }
+      emit(out, Chunk(src, length), () => completeStage())
+    }
+  }
+}
diff --git a/s3-stream/src/main/scala/com/bluelabs/s3stream/HttpRequests.scala b/s3-stream/src/main/scala/com/bluelabs/s3stream/HttpRequests.scala
index e6eab0b..f54976f 100644
--- a/s3-stream/src/main/scala/com/bluelabs/s3stream/HttpRequests.scala
+++ b/s3-stream/src/main/scala/com/bluelabs/s3stream/HttpRequests.scala
@@ -8,6 +8,9 @@ import akka.http.scaladsl.model._
 import akka.http.scaladsl.model.Uri.Query
 import akka.http.scaladsl.model.headers.Host
 import akka.util.ByteString
+import akka.stream.scaladsl.Source
+import akka.http.scaladsl.model.RequestEntity
+import akka.http.scaladsl.model.RequestEntity
 
 object HttpRequests {
 
@@ -25,11 +28,11 @@ object HttpRequests {
     s3Request(s3Location)
   }
 
-  def uploadPartRequest(upload: MultipartUpload, partNumber: Int, payload: ByteString): HttpRequest = {
+  def uploadPartRequest(upload: MultipartUpload, partNumber: Int, payload: Source[ByteString,_], payloadSize: Int): HttpRequest = {
     s3Request(upload.s3Location,
       HttpMethods.PUT,
       _.withQuery(Query("partNumber" -> partNumber.toString, "uploadId" -> upload.uploadId))
-    ).withEntity(payload)
+    ).withEntity(HttpEntity(ContentTypes.`application/octet-stream`, payloadSize, payload))
   }
 
   def completeMultipartUploadRequest(upload: MultipartUpload, parts: Seq[(Int, String)])(implicit ec: ExecutionContext): Future[HttpRequest] = {
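The uploadPartRequest change above switches the part payload from an in-memory ByteString to a fixed-length streaming entity. A minimal sketch of how a caller could build a part request from a buffered Chunk — the PartRequestExample object and partRequestFor helper are hypothetical and not part of this patch; only MultipartUpload, Chunk, and HttpRequests.uploadPartRequest come from the code above:

```scala
package com.bluelabs.s3stream

import akka.http.scaladsl.model.HttpRequest

// Hypothetical helper mirroring the wiring createRequests does further down in this patch:
// a Chunk produced by MemoryBuffer or DiskBuffer carries a re-materializable byte source plus
// its exact size, which uploadPartRequest turns into a streaming entity of known length.
object PartRequestExample {
  def partRequestFor(upload: MultipartUpload, partNumber: Int, chunk: Chunk): HttpRequest =
    HttpRequests.uploadPartRequest(upload, partNumber, chunk.data, chunk.size)
}
```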
diff --git a/s3-stream/src/main/scala/com/bluelabs/s3stream/MemoryBuffer.scala b/s3-stream/src/main/scala/com/bluelabs/s3stream/MemoryBuffer.scala
new file mode 100644
index 0000000..56e0633
--- /dev/null
+++ b/s3-stream/src/main/scala/com/bluelabs/s3stream/MemoryBuffer.scala
@@ -0,0 +1,69 @@
+package com.bluelabs.s3stream
+
+import java.io.FileOutputStream
+import java.io.RandomAccessFile
+import java.nio.channels.FileChannel
+import java.nio.file.Files
+import java.util.concurrent.atomic.AtomicInteger
+
+import akka.NotUsed
+import akka.stream.ActorAttributes
+import akka.stream.Attributes
+import akka.stream.FlowShape
+import akka.stream.Inlet
+import akka.stream.Outlet
+import akka.stream.scaladsl.FileIO
+import akka.stream.stage.GraphStage
+import akka.stream.stage.GraphStageLogic
+import akka.stream.stage.InHandler
+import akka.stream.stage.OutHandler
+import akka.util.ByteString
+import akka.stream.scaladsl.Source
+
+/**
+ * Buffers the complete incoming stream into memory, which can then be read several times afterwards.
+ *
+ * The stage waits for the incoming stream to complete. After that, it emits a single Chunk item on its output. The Chunk
+ * contains a bytestream source that can be materialized multiple times, and the total size of the file.
+ *
+ * @param maxSize Maximum size to buffer
+ */
+class MemoryBuffer(maxSize: Int) extends GraphStage[FlowShape[ByteString, Chunk]] {
+  val in = Inlet[ByteString]("in")
+  val out = Outlet[Chunk]("out")
+  override val shape = FlowShape.of(in, out)
+
+  override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
+    var buffer = ByteString.empty
+
+    setHandler(out, new OutHandler {
+      override def onPull(): Unit = {
+        if (isClosed(in)) {
+          emit()
+        } else {
+          pull(in)
+        }
+      }
+    })
+
+    setHandler(in, new InHandler {
+      override def onPush(): Unit = {
+        val elem = grab(in)
+        buffer ++= elem
+        pull(in)
+      }
+
+      override def onUpstreamFinish(): Unit = {
+        if (isAvailable(out)) {
+          emit()
+        }
+        completeStage()
+      }
+    })
+
+    def emit(): Unit = {
+      emit(out, Chunk(Source.single(buffer), buffer.size), () => completeStage())
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/s3-stream/src/main/scala/com/bluelabs/s3stream/S3Stream.scala b/s3-stream/src/main/scala/com/bluelabs/s3stream/S3Stream.scala
index e828797..fb38f92 100644
--- a/s3-stream/src/main/scala/com/bluelabs/s3stream/S3Stream.scala
+++ b/s3-stream/src/main/scala/com/bluelabs/s3stream/S3Stream.scala
@@ -2,23 +2,40 @@ package com.bluelabs.s3stream
 
 import java.time.LocalDate
 
+import scala.annotation.elidable
+import scala.annotation.elidable.ASSERTION
 import scala.collection.immutable.Seq
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
-import scala.util.{ Failure, Success }
+import scala.util.Failure
+import scala.util.Success
 
-import com.bluelabs.akkaaws.{AWSCredentials, CredentialScope, Signer, SigningKey}
+import com.bluelabs.akkaaws.AWSCredentials
+import com.bluelabs.akkaaws.CredentialScope
+import com.bluelabs.akkaaws.Signer
+import com.bluelabs.akkaaws.SigningKey
 
 import akka.NotUsed
 import akka.actor.ActorSystem
 import akka.event.Logging
 import akka.http.scaladsl.Http
-import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.model.HttpResponse
+import akka.http.scaladsl.model.ResponseEntity
+import akka.http.scaladsl.model.Uri
 import akka.http.scaladsl.unmarshalling.Unmarshal
 import akka.http.scaladsl.unmarshalling.Unmarshaller
-import akka.stream.{Attributes, Materializer}
-import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
+import akka.stream.Attributes
+import akka.stream.Materializer
+import akka.stream.scaladsl.Flow
+import akka.stream.scaladsl.Keep
+import akka.stream.scaladsl.Sink
+import akka.stream.scaladsl.Source
 import akka.util.ByteString
+import akka.stream.stage.GraphStage
+import akka.stream.FlowShape
+import java.nio.file.Path
+import java.nio.file.Paths
 
 case class S3Location(bucket: String, key: String)
 
@@ -57,7 +74,6 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic
    * @return
    */
   def multipartUpload(s3Location: S3Location, chunkSize: Int = MIN_CHUNK_SIZE, chunkingParallelism: Int = 4): Sink[ByteString, Future[CompleteMultipartUploadResult]] = {
-    import mat.executionContext
 
     chunkAndRequest(s3Location, chunkSize)(chunkingParallelism)
       .log("s3-upload-response").withAttributes(Attributes.logLevels(onElement = Logging.DebugLevel, onFailure = Logging.WarningLevel, onFinish = Logging.InfoLevel))
@@ -78,7 +94,7 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic
       case HttpResponse(status, _, entity, _) if status.isSuccess() => Unmarshal(entity).to[MultipartUpload]
       case HttpResponse(status, _, entity, _) => {
         Unmarshal(entity).to[String].flatMap { case err =>
-          Future.failed(new Exception(err))
+          Future.failed(new Exception("Can't initiate upload: " + err))
         }
       }
     }
@@ -116,11 +132,23 @@
   def createRequests(s3Location: S3Location, chunkSize: Int = MIN_CHUNK_SIZE, parallelism: Int = 4): Flow[ByteString, (HttpRequest, (MultipartUpload, Int)), NotUsed] = {
     assert(chunkSize >= MIN_CHUNK_SIZE, "Chunk size must be at least 5242880B. See http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html")
     val requestInfo: Source[(MultipartUpload, Int), NotUsed] = initiateUpload(s3Location)
-    Flow[ByteString]
-      .via(new Chunker(chunkSize))
-      .zipWith(requestInfo){case (payload, (uploadInfo, chunkIndex)) => (HttpRequests.uploadPartRequest(uploadInfo, chunkIndex, payload), (uploadInfo, chunkIndex))}
+
+    SplitAfterSize(chunkSize)(Flow.apply[ByteString])
+      .via(getChunkBuffer(chunkSize))
+      .concatSubstreams
+      .zipWith(requestInfo){case (payload, (uploadInfo, chunkIndex)) => (HttpRequests.uploadPartRequest(uploadInfo, chunkIndex, payload.data, payload.size), (uploadInfo, chunkIndex))}
       .mapAsync(parallelism){case (req, info) => Signer.signedRequest(req, signingKey).zip(Future.successful(info)) }
   }
+
+  private def getChunkBuffer(chunkSize: Int) = system.settings.config.getString("com.bluelabs.s3stream.buffer") match {
+    case "memory" => new MemoryBuffer(chunkSize * 2)
+    case "disk" => new DiskBuffer(2, chunkSize * 2, getDiskBufferPath)
+  }
+
+  private val getDiskBufferPath = system.settings.config.getString("com.bluelabs.s3stream.disk-buffer-path") match {
+    case "" => None
+    case s => Some(Paths.get(s))
+  }
 
   def chunkAndRequest(s3Location: S3Location, chunkSize: Int = MIN_CHUNK_SIZE)(parallelism: Int = 4): Flow[ByteString, UploadPartResponse, NotUsed] = {
     createRequests(s3Location, chunkSize, parallelism)
@@ -172,7 +200,7 @@ class S3Stream(credentials: AWSCredentials, region: String = "us-east-1")(implic
       case HttpResponse(status, _, entity, _) if status.isSuccess() => Future.successful(entity)
       case HttpResponse(status, _, entity, _) => {
        Unmarshal(entity).to[String].flatMap { case err =>
-          Future.failed(new Exception(err))
+          Future.failed(new Exception("Error: " + err))
         }
       }
     }
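With the S3Stream changes above, the buffering strategy is selected purely through configuration (the reference.conf added below); the upload API itself is unchanged. A hedged end-to-end sketch under those assumptions — the credentials value, the input file path, and the implicit ActorSystem/Materializer wiring are illustrative assumptions, not part of this patch:

```scala
package com.bluelabs.s3stream

import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.FileIO
import com.bluelabs.akkaaws.AWSCredentials
import com.typesafe.config.ConfigFactory

object DiskBufferedUploadExample extends App {
  // Illustrative overrides; in a real service these would live in application.conf.
  val config = ConfigFactory.parseString(
    """
      |com.bluelabs.s3stream.buffer = "disk"
      |com.bluelabs.s3stream.disk-buffer-path = ""
    """.stripMargin).withFallback(ConfigFactory.load())

  implicit val system = ActorSystem("s3-upload-example", config)
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  // How AWSCredentials are obtained is outside the scope of this patch; assumed to exist.
  val credentials: AWSCredentials = ???

  // With buffer = "disk", each part (>= 5 MB) is spooled to a temp file instead of the heap,
  // which is what lets many such uploads run in parallel with a bounded memory footprint.
  FileIO.fromPath(Paths.get("/tmp/big-input.bin"))
    .runWith(new S3Stream(credentials).multipartUpload(S3Location("my-bucket", "backups/big-input.bin")))
    .foreach(result => println(s"Upload finished: $result"))
}
```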
diff --git a/s3-stream/src/main/scala/com/bluelabs/s3stream/SplitAfterSize.scala b/s3-stream/src/main/scala/com/bluelabs/s3stream/SplitAfterSize.scala
new file mode 100644
index 0000000..2575568
--- /dev/null
+++ b/s3-stream/src/main/scala/com/bluelabs/s3stream/SplitAfterSize.scala
@@ -0,0 +1,62 @@
+package com.bluelabs.s3stream
+
+import akka.stream.scaladsl.SubFlow
+import akka.stream.scaladsl.Source
+import akka.stream.stage.GraphStage
+import akka.util.ByteString
+import akka.stream.FlowShape
+import akka.stream.Inlet
+import akka.stream.Outlet
+import akka.stream.stage.GraphStageLogic
+import akka.stream.Attributes
+import akka.stream.stage.OutHandler
+import akka.stream.stage.InHandler
+import akka.stream.scaladsl.RunnableGraph
+import akka.stream.scaladsl.Flow
+
+/**
+ * Splits up a bytestream source into sub-flows of a minimum size. Does not attempt to create chunks of an exact size.
+ */
+object SplitAfterSize {
+  def apply[I,M](minChunkSize: Long)(in: Flow[I,ByteString,M]): SubFlow[ByteString, M, in.Repr, in.Closed] = {
+    in.via(insertMarkers(minChunkSize)).splitWhen(_ == NewStream).filter(_.isInstanceOf[ByteString]).map(_.asInstanceOf[ByteString])
+  }
+
+  private case object NewStream
+
+  private def insertMarkers(minChunkSize: Long) = new GraphStage[FlowShape[ByteString,Any]] {
+    val in = Inlet[ByteString]("in")
+    val out = Outlet[Any]("out")
+    override val shape = FlowShape.of(in, out)
+
+    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
+      var count: Long = 0;
+
+      setHandler(out, new OutHandler {
+        override def onPull(): Unit = {
+          pull(in)
+        }
+      })
+
+      setHandler(in, new InHandler {
+        override def onPush(): Unit = {
+          val elem = grab(in)
+          count += elem.size
+          if (count >= minChunkSize) {
+            println("*** split after " + count)
+            count = 0
+            emitMultiple(out, elem :: NewStream :: Nil)
+          } else {
+            emit(out, elem)
+          }
+        }
+
+        override def onUpstreamFinish(): Unit = {
+          println("*** complete after " + count)
+          completeStage()
+        }
+      })
+
+    }
+  }
+}
\ No newline at end of file
diff --git a/s3-stream/src/test/resources/reference.conf b/s3-stream/src/test/resources/reference.conf
new file mode 100644
index 0000000..a182b07
--- /dev/null
+++ b/s3-stream/src/test/resources/reference.conf
@@ -0,0 +1,7 @@
+com.bluelabs.s3stream {
+  # whether to buffer request chunks (up to 5MB each) in "memory" or on "disk"
+  buffer = "memory"
+
+  # location for temporary files, if buffer is set to "disk". If empty, uses the standard java temp path.
+  disk-buffer-path = ""
+}
\ No newline at end of file
diff --git a/s3-stream/src/test/scala/com/bluelabs/s3stream/ChunkerSpec.scala b/s3-stream/src/test/scala/com/bluelabs/s3stream/ChunkerSpec.scala
deleted file mode 100644
index 0c2b34e..0000000
--- a/s3-stream/src/test/scala/com/bluelabs/s3stream/ChunkerSpec.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-package com.bluelabs.s3stream
-
-import akka.actor.ActorSystem
-import akka.stream.scaladsl.Keep
-import akka.stream.testkit.scaladsl.{TestSink, TestSource}
-import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
-import akka.testkit.TestKit
-import akka.util.ByteString
-import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
-
-class ChunkerSpec(_system: ActorSystem) extends TestKit(_system) with FlatSpecLike with Matchers with BeforeAndAfterAll {
-
-  def this() = this(ActorSystem("ChunkerSpec"))
-
-  implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true))
-
-  "A Chunker" should "resize larger chunks into smaller ones" in {
-    val bytes = ByteString(1, 2, 3, 4, 5, 6)
-    val (pub, sub) = TestSource.probe[ByteString]
-      .via(new Chunker(2))
-      .toMat(TestSink.probe[ByteString])(Keep.both)
-      .run()
-
-    pub.sendNext(bytes)
-    sub.request(3)
-    sub.expectNext(ByteString(1, 2), ByteString(3, 4), ByteString(5, 6))
-  }
-
-  it should "send the leftover bytes" in {
-    val bytes = ByteString(1, 2, 3, 4, 5, 6, 7)
-    val (pub, sub) = TestSource.probe[ByteString]
-      .via(new Chunker(2))
-      .toMat(TestSink.probe[ByteString])(Keep.both)
-      .run()
-
-    pub.sendNext(bytes)
-    pub.sendComplete()
-    sub.request(4)
-    sub.expectNext(ByteString(1, 2), ByteString(3, 4), ByteString(5, 6), ByteString(7))
-  }
-
-  it should "resize smaller chunks into larger ones" in {
-    val (pub, sub) = TestSource.probe[ByteString]
-      .via(new Chunker(2))
-      .toMat(TestSink.probe[ByteString])(Keep.both)
-      .run()
-
-    pub.sendNext(ByteString(1))
-    pub.sendNext(ByteString(2))
-    pub.sendNext(ByteString(3))
-    pub.sendNext(ByteString(4))
-    pub.sendNext(ByteString(5))
-    pub.sendNext(ByteString(6))
-    pub.sendNext(ByteString(7))
-    pub.sendComplete()
-    sub.request(4)
-    sub.expectNext(ByteString(1, 2), ByteString(3, 4), ByteString(5, 6), ByteString(7))
-  }
-
-  it should "send bytes on complete" in {
-    val (pub, sub) = TestSource.probe[ByteString]
-      .via(new Chunker(10))
-      .toMat(TestSink.probe[ByteString])(Keep.both)
-      .run()
-
-    pub.sendNext(ByteString(1))
-    pub.sendNext(ByteString(2))
-    pub.sendNext(ByteString(3))
-    pub.sendNext(ByteString(4))
-    pub.sendNext(ByteString(5))
-    pub.sendNext(ByteString(6))
-    pub.sendNext(ByteString(7))
-    pub.sendComplete()
-    sub.request(1)
-    sub.expectNext(ByteString(1, 2, 3, 4, 5, 6, 7))
-    sub.expectComplete()
-  }
-
-  override def afterAll {
-    TestKit.shutdownActorSystem(system)
-  }
-
-}
diff --git a/s3-stream/src/test/scala/com/bluelabs/s3stream/DiskBufferSpec.scala b/s3-stream/src/test/scala/com/bluelabs/s3stream/DiskBufferSpec.scala
new file mode 100644
index 0000000..4deca26
--- /dev/null
+++ b/s3-stream/src/test/scala/com/bluelabs/s3stream/DiskBufferSpec.scala
@@ -0,0 +1,5 @@
+package com.bluelabs.s3stream
+
+class DiskBufferSpec {
+
+}
\ No newline at end of file
diff --git a/s3-stream/src/test/scala/com/bluelabs/s3stream/MemoryBufferSpec.scala b/s3-stream/src/test/scala/com/bluelabs/s3stream/MemoryBufferSpec.scala
new file mode 100644
index 0000000..05e07de
--- /dev/null
+++ b/s3-stream/src/test/scala/com/bluelabs/s3stream/MemoryBufferSpec.scala
@@ -0,0 +1,5 @@
+package com.bluelabs.s3stream
+
+class MemoryBufferSpec {
+
+}
\ No newline at end of file
diff --git a/s3-stream/src/test/scala/com/bluelabs/s3stream/SplitAfterSizeSpec.scala b/s3-stream/src/test/scala/com/bluelabs/s3stream/SplitAfterSizeSpec.scala
new file mode 100644
index 0000000..c1dc812
--- /dev/null
+++ b/s3-stream/src/test/scala/com/bluelabs/s3stream/SplitAfterSizeSpec.scala
@@ -0,0 +1,5 @@
+package com.bluelabs.s3stream
+
+class SplitAfterSizeSpec {
+
+}
\ No newline at end of file
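The three spec files added above are still empty stubs. A hedged sketch of the kind of test they could eventually hold, composing SplitAfterSize and MemoryBuffer the same way createRequests does — the class name, element sizes, and expected chunk sizes are illustrative assumptions, not part of this patch:

```scala
package com.bluelabs.s3stream

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.testkit.TestKit
import akka.util.ByteString
import org.scalatest.{FlatSpecLike, Matchers}

class SplitAfterSizeSketchSpec(_system: ActorSystem) extends TestKit(_system) with FlatSpecLike with Matchers {

  def this() = this(ActorSystem("SplitAfterSizeSketchSpec"))

  implicit val materializer = ActorMaterializer()

  // Three 7-byte elements against a 10-byte minimum: the split marker is inserted after the
  // second element, so buffering each substream should yield a 14-byte Chunk and then a 7-byte Chunk.
  "SplitAfterSize + MemoryBuffer" should "group small elements into minimum-size chunks" in {
    val chunks = Source(Vector.fill(3)(ByteString("0123456")))
      .via(SplitAfterSize(10)(Flow[ByteString]).via(new MemoryBuffer(1024)).concatSubstreams)
      .runWith(Sink.seq)

    Await.result(chunks, 3.seconds).map(_.size) should be(Seq(14, 7))
  }
}
```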