diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index 550bc4349..bb436efbc 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -82,12 +82,6 @@ final class PartitionStreamControl private ( private[internal] def maxPollIntervalExceeded(now: NanoTime): UIO[Boolean] = queueInfoRef.get.map(_.deadlineExceeded(now)) - /** To be invoked when the partition was lost. */ - private[internal] def lost: UIO[Boolean] = { - val lostException = new RuntimeException(s"Partition ${tp.toString} was lost") with NoStackTrace - interruptionPromise.fail(lostException) - } - /** To be invoked when the stream is no longer processing. */ private[internal] def halt: UIO[Boolean] = { val timeOutMessage = s"No records were polled for more than $maxPollInterval for topic partition $tp. " + @@ -97,6 +91,17 @@ final class PartitionStreamControl private ( interruptionPromise.fail(consumeTimeout) } + /** To be invoked when the partition was lost. It clears the queue and ends the stream. */ + private[internal] def lost: UIO[Unit] = + logAnnotate { + for { + _ <- ZIO.logDebug(s"Partition ${tp.toString} lost") + taken <- dataQueue.takeAll.map(_.size) + _ <- dataQueue.offer(Take.end) + _ <- ZIO.logDebug(s"Ignored ${taken} records on lost partition").when(taken != 0) + } yield () + } + /** To be invoked when the partition was revoked or otherwise needs to be ended. */ private[internal] def end: ZIO[Any, Nothing, Unit] = logAnnotate {