diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/reactivestreams/SimpleSubscriber.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/reactivestreams/SimpleSubscriber.scala index 2944f27182..5408274822 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/reactivestreams/SimpleSubscriber.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/reactivestreams/SimpleSubscriber.scala @@ -6,7 +6,6 @@ import org.reactivestreams.{Publisher, Subscription} import sttp.capabilities.StreamMaxLengthExceededException import java.util.concurrent.LinkedBlockingQueue -import scala.collection.mutable.ListBuffer import scala.concurrent.{Future, Promise} private[netty] class SimpleSubscriber(contentLength: Option[Int]) extends PromisingSubscriber[Array[Byte], HttpContent] { @@ -14,7 +13,7 @@ private[netty] class SimpleSubscriber(contentLength: Option[Int]) extends Promis private val resultPromise = Promise[Array[Byte]]() @volatile private var totalLength = 0 private val resultBlockingQueue = new LinkedBlockingQueue[Either[Throwable, Array[Byte]]](1) - @volatile private var buffers = new ListBuffer[ByteBuf] + @volatile private var buffers = Vector[ByteBuf]() override def future: Future[Array[Byte]] = resultPromise.future def resultBlocking(): Either[Throwable, Array[Byte]] = resultBlockingQueue.take() @@ -39,7 +38,7 @@ private[netty] class SimpleSubscriber(contentLength: Option[Int]) extends Promis subscription.request(1) } } else { - buffers = buffers.append(byteBuf) + buffers = buffers :+ byteBuf totalLength += byteBuf.readableBytes() subscription.request(1) } @@ -49,7 +48,7 @@ private[netty] class SimpleSubscriber(contentLength: Option[Int]) extends Promis buffers.foreach { buf => val _ = buf.release() } - buffers.clear() + buffers = Vector.empty resultBlockingQueue.offer(Left(t)) resultPromise.failure(t) } @@ -64,7 +63,7 @@ private[netty] class SimpleSubscriber(contentLength: Option[Int]) extends Promis currentIndex += length val _ = buf.release() } - buffers.clear() + buffers = Vector.empty if (!resultBlockingQueue.offer(Right(mergedArray))) { // Result queue full, which is unexpected. resultPromise.failure(new IllegalStateException("Calling onComplete after result was already returned"))