Skip to content

Commit

Permalink
Use a Vector
Browse files Browse the repository at this point in the history
  • Loading branch information
kciesielski committed Mar 11, 2024
1 parent 57484a9 commit be27ae9
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import org.reactivestreams.{Publisher, Subscription}
import sttp.capabilities.StreamMaxLengthExceededException

import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Future, Promise}

private[netty] class SimpleSubscriber(contentLength: Option[Int]) extends PromisingSubscriber[Array[Byte], HttpContent] {
private var subscription: Subscription = _
private val resultPromise = Promise[Array[Byte]]()
@volatile private var totalLength = 0
private val resultBlockingQueue = new LinkedBlockingQueue[Either[Throwable, Array[Byte]]](1)
@volatile private var buffers = new ListBuffer[ByteBuf]
@volatile private var buffers = Vector[ByteBuf]()

override def future: Future[Array[Byte]] = resultPromise.future
def resultBlocking(): Either[Throwable, Array[Byte]] = resultBlockingQueue.take()
Expand All @@ -39,7 +38,7 @@ private[netty] class SimpleSubscriber(contentLength: Option[Int]) extends Promis
subscription.request(1)
}
} else {
buffers = buffers.append(byteBuf)
buffers = buffers :+ byteBuf
totalLength += byteBuf.readableBytes()
subscription.request(1)
}
Expand All @@ -49,7 +48,7 @@ private[netty] class SimpleSubscriber(contentLength: Option[Int]) extends Promis
buffers.foreach { buf =>
val _ = buf.release()
}
buffers.clear()
buffers = Vector.empty
resultBlockingQueue.offer(Left(t))
resultPromise.failure(t)
}
Expand All @@ -64,7 +63,7 @@ private[netty] class SimpleSubscriber(contentLength: Option[Int]) extends Promis
currentIndex += length
val _ = buf.release()
}
buffers.clear()
buffers = Vector.empty
if (!resultBlockingQueue.offer(Right(mergedArray))) {
// Result queue full, which is unexpected.
resultPromise.failure(new IllegalStateException("Calling onComplete after result was already returned"))
Expand Down

0 comments on commit be27ae9

Please sign in to comment.