From 11e1c2adffd28276933e4160092506207d0ed99f Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Wed, 12 Apr 2023 14:19:09 +0200
Subject: [PATCH] Remove `Set` instance from the Runloop

---
 .../kafka/consumer/RebalanceListener.scala    | 22 ++---
 .../diagnostics/DiagnosticEvent.scala         | 13 +--
 .../internal/PartitionStreamControl.scala     | 24 +++--
 .../zio/kafka/consumer/internal/Runloop.scala | 87 ++++++++++---------
 4 files changed, 80 insertions(+), 66 deletions(-)

diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala
index 4196aa19d6..d272ee77b7 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala
@@ -2,18 +2,18 @@ package zio.kafka.consumer
 
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
 import org.apache.kafka.common.TopicPartition
-import zio.{ Runtime, Task, Unsafe, ZIO }
-import scala.jdk.CollectionConverters._
+import zio.{ Chunk, Runtime, Task, Unsafe, ZIO }
 
 /**
  * ZIO wrapper around Kafka's `ConsumerRebalanceListener` to work with Scala collection types and ZIO effects.
  *
- * Note that the given ZIO effects are executed directly on the Kafka poll thread.
+ * Note that the given ZIO effects are executed directly on the Kafka poll thread. Fork and shift to another executor
+ * when this is not desired.
 */
 final case class RebalanceListener(
-  onAssigned: (Set[TopicPartition], RebalanceConsumer) => Task[Unit],
-  onRevoked: (Set[TopicPartition], RebalanceConsumer) => Task[Unit],
-  onLost: (Set[TopicPartition], RebalanceConsumer) => Task[Unit]
+  onAssigned: (Chunk[TopicPartition], RebalanceConsumer) => Task[Unit],
+  onRevoked: (Chunk[TopicPartition], RebalanceConsumer) => Task[Unit],
+  onLost: (Chunk[TopicPartition], RebalanceConsumer) => Task[Unit]
 ) {
 
   /**
@@ -35,7 +35,7 @@ final case class RebalanceListener(
     partitions: java.util.Collection[TopicPartition]
   ): Unit = Unsafe.unsafe { implicit u =>
     runtime.unsafe
-      .run(onRevoked(partitions.asScala.toSet, consumer))
+      .run(onRevoked(Chunk.fromJavaIterable(partitions), consumer))
       .getOrThrowFiberFailure()
     ()
   }
@@ -44,7 +44,7 @@ final case class RebalanceListener(
     partitions: java.util.Collection[TopicPartition]
   ): Unit = Unsafe.unsafe { implicit u =>
     runtime.unsafe
-      .run(onAssigned(partitions.asScala.toSet, consumer))
+      .run(onAssigned(Chunk.fromJavaIterable(partitions), consumer))
       .getOrThrowFiberFailure()
     ()
   }
@@ -53,7 +53,7 @@ final case class RebalanceListener(
     partitions: java.util.Collection[TopicPartition]
  ): Unit = Unsafe.unsafe { implicit u =>
     runtime.unsafe
-      .run(onLost(partitions.asScala.toSet, consumer))
+      .run(onLost(Chunk.fromJavaIterable(partitions), consumer))
       .getOrThrowFiberFailure()
     ()
   }
@@ -63,8 +63,8 @@ final case class RebalanceListener(
 object RebalanceListener {
 
   def apply(
-    onAssigned: (Set[TopicPartition], RebalanceConsumer) => Task[Unit],
-    onRevoked: (Set[TopicPartition], RebalanceConsumer) => Task[Unit]
+    onAssigned: (Chunk[TopicPartition], RebalanceConsumer) => Task[Unit],
+    onRevoked: (Chunk[TopicPartition], RebalanceConsumer) => Task[Unit]
  ): RebalanceListener = RebalanceListener(onAssigned, onRevoked, onRevoked)
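
Reviewer note, for illustration only: a user-supplied listener written against the new Chunk-based signature could look like the sketch below. The logging bodies are hypothetical and only show the changed parameter types.

    import zio.ZIO
    import zio.kafka.consumer.RebalanceListener

    // Sketch: log which partitions were handed to or taken from this consumer.
    // Both callbacks now receive a Chunk[TopicPartition] instead of a Set.
    val loggingListener: RebalanceListener =
      RebalanceListener(
        (assigned, consumer) => ZIO.logInfo(s"Assigned: ${assigned.mkString(", ")}"),
        (revoked, consumer) => ZIO.logInfo(s"Revoked: ${revoked.mkString(", ")}")
      )
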
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala
index 2a2da75e07..4f3fc791eb 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala
@@ -2,14 +2,15 @@ package zio.kafka.consumer.diagnostics
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.common.TopicPartition
+import zio.Chunk
 
 sealed trait DiagnosticEvent
 
 object DiagnosticEvent {
 
   final case class Poll(
-    tpRequested: Set[TopicPartition],
-    tpWithData: Set[TopicPartition],
-    tpWithoutData: Set[TopicPartition]
+    tpRequested: Chunk[TopicPartition],
+    tpWithData: Chunk[TopicPartition],
+    tpWithoutData: Chunk[TopicPartition]
   ) extends DiagnosticEvent
 
   final case class Request(partition: TopicPartition) extends DiagnosticEvent
@@ -22,9 +23,9 @@ object DiagnosticEvent {
 
   sealed trait Rebalance extends DiagnosticEvent
   object Rebalance {
-    final case class Revoked(partitions: Set[TopicPartition]) extends Rebalance
-    final case class Assigned(partitions: Set[TopicPartition]) extends Rebalance
-    final case class Lost(partitions: Set[TopicPartition]) extends Rebalance
+    final case class Revoked(partitions: Chunk[TopicPartition]) extends Rebalance
+    final case class Assigned(partitions: Chunk[TopicPartition]) extends Rebalance
+    final case class Lost(partitions: Chunk[TopicPartition]) extends Rebalance
   }
 }
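
Reviewer note, for illustration only: downstream code that inspects these events only needs its pattern matches updated from Set to Chunk. A hypothetical formatter (not part of this patch) could look like:

    import zio.kafka.consumer.diagnostics.DiagnosticEvent

    // Hypothetical helper: render the Chunk-based events for logging.
    def describe(event: DiagnosticEvent): String =
      event match {
        case DiagnosticEvent.Poll(requested, withData, withoutData) =>
          s"poll: requested=${requested.size} withData=${withData.size} withoutData=${withoutData.size}"
        case DiagnosticEvent.Rebalance.Assigned(partitions) =>
          s"assigned: ${partitions.mkString(", ")}"
        case DiagnosticEvent.Rebalance.Revoked(partitions) =>
          s"revoked: ${partitions.mkString(", ")}"
        case DiagnosticEvent.Rebalance.Lost(partitions) =>
          s"lost: ${partitions.mkString(", ")}"
        case other =>
          other.toString
      }
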
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
index 2a207d948b..6839ed9625 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
@@ -11,6 +11,7 @@ private[internal] final class PartitionStreamControl private (
   val tp: TopicPartition,
   stream: ZStream[Any, Throwable, ByteArrayCommittableRecord],
   dataQueue: Queue[Take[Throwable, ByteArrayCommittableRecord]],
+  startedPromise: Promise[Nothing, Unit],
   endedPromise: Promise[Nothing, Unit],
   completedPromise: Promise[Nothing, Unit],
   interruptPromise: Promise[Throwable, Unit]
@@ -43,10 +44,11 @@ private[internal] final class PartitionStreamControl private (
 
   /** Returns true when the stream accepts new data. */
   def acceptsData: ZIO[Any, Nothing, Boolean] =
     for {
+      started     <- startedPromise.isDone
       ended       <- endedPromise.isDone
       completed   <- completedPromise.isDone
       interrupted <- interruptPromise.isDone
-    } yield !(ended || completed || interrupted)
+    } yield started && !(ended || completed || interrupted)
 
   /** Returns true when the stream is done. */
   def isCompleted: ZIO[Any, Nothing, Boolean] =
@@ -69,6 +71,7 @@ private[internal] object PartitionStreamControl {
   ): ZIO[Any, Nothing, PartitionStreamControl] =
     for {
       _                   <- ZIO.logTrace(s"Creating partition stream ${tp.toString}")
+      startedPromise      <- Promise.make[Nothing, Unit]
       endedPromise        <- Promise.make[Nothing, Unit]
       completedPromise    <- Promise.make[Nothing, Unit]
       interruptionPromise <- Promise.make[Throwable, Unit]
@@ -80,10 +83,11 @@ private[internal] object PartitionStreamControl {
                               taken <- dataQueue.takeBetween(1, Int.MaxValue)
                             } yield taken
 
-      stream = ZStream.logAnnotate(
-                 LogAnnotation("topic", tp.topic()),
-                 LogAnnotation("partition", tp.partition().toString)
-               ) *>
+      stream = ZStream.fromZIO(startedPromise.succeed(())) *>
+                 ZStream.logAnnotate(
+                   LogAnnotation("topic", tp.topic()),
+                   LogAnnotation("partition", tp.partition().toString)
+                 ) *>
                  ZStream.finalizer(
                    completedPromise.succeed(()) <*
                      ZIO.logDebug(s"Partition stream ${tp.toString} has ended")
@@ -94,6 +98,14 @@ private[internal] object PartitionStreamControl {
                      dataQueue.takeAll.flatMap(data => if (data.isEmpty) requestAndAwaitData else ZIO.succeed(data))
                    }.flattenTake
                    .interruptWhen(interruptionPromise)
-    } yield new PartitionStreamControl(tp, stream, dataQueue, endedPromise, completedPromise, interruptionPromise)
+    } yield new PartitionStreamControl(
+      tp,
+      stream,
+      dataQueue,
+      startedPromise,
+      endedPromise,
+      completedPromise,
+      interruptionPromise
+    )
 
 }
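
Reviewer note, for illustration only: the `startedPromise` added above follows a common ZIO pattern, where the stream's first effect completes a promise so that other fibers can distinguish "not started yet" from "already finished". A minimal standalone sketch of that pattern (names are hypothetical, not the zio-kafka internals):

    import zio.{ Promise, ZIO }
    import zio.stream.ZStream

    // Sketch: a stream that signals, via `started`, that it has begun pulling.
    def streamWithStartSignal[A](
      started: Promise[Nothing, Unit],
      elements: ZStream[Any, Throwable, A]
    ): ZStream[Any, Throwable, A] =
      ZStream.fromZIO(started.succeed(()).unit) *> elements

    // A caller can ask "has this stream started?" without consuming it.
    val program: ZIO[Any, Throwable, Boolean] =
      for {
        started <- Promise.make[Nothing, Unit]
        _       <- streamWithStartSignal(started, ZStream(1, 2, 3)).runDrain
        begun   <- started.isDone
      } yield begun
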
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
index 2410356332..3d55f38fe9 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -21,49 +21,46 @@ import scala.jdk.CollectionConverters._
  *
  * ## Stream management
  *
- *  - When a partition gets assigned manually or by the broker, a new stream is started.
- *  - When a partition is revoked by the broker, the stream is ended.
- *  - When a partition is reported as lost, the stream is interrupted.
+ *   - When a partition gets assigned manually or by the broker, a new stream is started.
+ *   - When a partition is revoked by the broker, the stream is ended.
+ *   - When a partition is reported as lost, the stream is interrupted.
  *
  * ## Fetching data
  *
- *  - Streams that needs data request this via a [[Request]] command to the command-queue.
- *  - Partitions for which no data is needed are paused. This backpressure prevents unnecessary
- *    buffering of data.
+ *   - Streams that need data request it via a [[Request]] command to the command-queue.
+ *   - Partitions for which no data is needed are paused. This backpressure prevents unnecessary buffering of data.
  *
  * ## Poll-loop
  *
- * The poll-loop continuously polls the broker for new data. Since polling is also needed for learning about
- * partition assignment changes, or for completing commits, polling also continuous when no partitions are
- * assigned, or when there are pending commits.
+ * The poll-loop continuously polls the broker for new data. Since polling is also needed for learning about partition
+ * assignment changes, or for completing commits, polling also continues when no partitions are assigned, or when there
+ * are pending commits.
  *
- * When all streams stop processing, polling stops so that the broker can detect that this Kafka client is
- * stalled.
+ * When all streams stop processing, polling stops so that the broker can detect that this Kafka client is stalled.
  *
  * ## Rebalance listener
  *
  * The rebalance listener runs during a poll to the broker. It is used to track changes to partition assignments.
  * Partitions can be assigned, revoked or lost.
  *
- * When a partition is revoked, the stream that handles it will be ended (signal the stream that no more data will
- * be available). Processing however, might continue.
+ * When a partition is revoked, the stream that handles it will be ended (signal the stream that no more data will be
+ * available). Processing, however, might continue.
  *
  * ### Rebalance listener - Commit-loop
  *
  * When `endRevokedStreamsBeforeRebalance` is `true` (the default), we wait for the stream to complete running inside
- * the rebalance listener. This gives the stream a chance to commit offsets before its partition is given to
- * another consumer.
+ * the rebalance listener. This gives the stream a chance to commit offsets before its partition is given to another
+ * consumer.
  *
- * While the rebalance listener is waiting for streams to complete, we need to continue sending commits. In addition
- * we need to continue polling the broker so that we hear of completing
- * commits. For both we use commitAsync (in the second case with an empty map of offsets). This forms the commit-loop.
+ * While the rebalance listener is waiting for streams to complete, we need to continue sending commits. In addition we
+ * need to continue polling the broker so that we hear of completing commits. For both we use commitAsync (in the second
+ * case with an empty map of offsets). This forms the commit-loop.
  *
  * The commit-loop ends when the streams completed or a time out occurs.
  *
  * ## The command-queue and the commit-queue
  *
  * TODO: document more here ...
- *
  */
 // Disable zio-intellij's inspection `SimplifyWhenInspection` because its suggestion is not
 // equivalent performance-wise.
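
Reviewer note, for illustration only: the commit-loop described above relies on the raw Kafka client allowing `commitAsync` to be called with an empty offset map, purely so the consumer keeps making progress on previously issued commits while the rebalance listener blocks the poll. A rough standalone sketch of that idea follows; it glosses over the fact that the real Runloop must do this on the consumer's single polling thread, and the names, interval and deadline are illustrative, not the actual Runloop code.

    import java.util.Collections
    import org.apache.kafka.clients.consumer.{ KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback }
    import org.apache.kafka.common.TopicPartition
    import zio._

    // Sketch: while revoked streams finish up, keep issuing empty async commits so the
    // consumer continues to complete callbacks of earlier commits. Stop when the streams
    // signal completion or after a deadline.
    def commitLoopWhileAwaiting(
      consumer: KafkaConsumer[Array[Byte], Array[Byte]],
      streamsDone: Promise[Nothing, Unit]
    ): UIO[Unit] = {
      val emptyCommit =
        ZIO
          .attempt(
            consumer.commitAsync(
              Collections.emptyMap[TopicPartition, OffsetAndMetadata](),
              null: OffsetCommitCallback // the Kafka client accepts a null callback
            )
          )
          .orDie
      emptyCommit
        .repeat(Schedule.spaced(100.millis))
        .unit
        .race(streamsDone.await)
        .timeout(15.seconds)
        .unit
    }
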
@@ -82,7 +79,7 @@ private[consumer] final class Runloop private (
   offsetRetrieval: OffsetRetrieval,
   userRebalanceListener: RebalanceListener,
   restartStreamsOnRebalancing: Boolean,
-  endRevokedStreamsBeforeRebalance: Boolean,  // TODO: rename to something like 'completeRevokedStreamsDuringRebalance'
+  endRevokedStreamsBeforeRebalance: Boolean, // TODO: rename to something like 'completeRevokedStreamsDuringRebalance'
   currentState: Ref[State]
 ) {
 
@@ -273,13 +270,13 @@ private[consumer] final class Runloop private (
   private def offerRecordsToStreams(
     partitionStreams: Chunk[PartitionStreamControl],
     pendingRequests: Chunk[Request],
-    ignoreRecordsForTps: Set[TopicPartition],
+    ignoreRecordsForTps: Chunk[TopicPartition],
     polledRecords: ConsumerRecords[Array[Byte], Array[Byte]]
   ): UIO[Runloop.FulfillResult] = {
     // The most efficient way to get the records from [[ConsumerRecords]] per
     // topic-partition, is by first getting the set of topic-partitions, and
     // then requesting the records per topic-partition.
-    val tps           = polledRecords.partitions().asScala.toSet -- ignoreRecordsForTps
+    val tps           = Chunk.fromJavaIterable(polledRecords.partitions()) diff ignoreRecordsForTps
     val fulfillResult = Runloop.FulfillResult(pendingRequests = pendingRequests.filter(req => !tps.contains(req.tp)))
     val streams =
       if (tps.isEmpty) Chunk.empty else partitionStreams.filter(streamControl => tps.contains(streamControl.tp))
@@ -316,26 +313,29 @@ private[consumer] final class Runloop private (
       if (hasGroupId) consumer.withConsumer(_.groupMetadata()).fold(_ => None, Some(_))
       else ZIO.none
 
-  private def doSeekForNewPartitions(c: ByteArrayKafkaConsumer, tps: Set[TopicPartition]): Task[Set[TopicPartition]] =
+  private def doSeekForNewPartitions(
+    c: ByteArrayKafkaConsumer,
+    tps: Chunk[TopicPartition]
+  ): Task[Chunk[TopicPartition]] =
     offsetRetrieval match {
       case OffsetRetrieval.Manual(getOffsets) =>
-        getOffsets(tps)
+        getOffsets(tps.toSet)
          .tap(offsets => ZIO.foreachDiscard(offsets) { case (tp, offset) => ZIO.attempt(c.seek(tp, offset)) })
          .when(tps.nonEmpty)
          .as(tps)
 
       case OffsetRetrieval.Auto(_) =>
-        ZIO.succeed(Set.empty)
+        ZIO.succeed(Chunk.empty)
     }
 
   // Pause partitions for which there is no demand and resume those for which there is now demand
   private def resumeAndPausePartitions(
     c: ByteArrayKafkaConsumer,
-    assignment: Set[TopicPartition],
-    requestedPartitions: Set[TopicPartition]
+    assignment: Chunk[TopicPartition],
+    requestedPartitions: Chunk[TopicPartition]
   ): Unit = {
     val toResume = assignment intersect requestedPartitions
-    val toPause  = assignment -- requestedPartitions
+    val toPause  = assignment diff requestedPartitions
 
     if (toResume.nonEmpty) c.resume(toResume.asJava)
     if (toPause.nonEmpty) c.pause(toPause.asJava)
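
Reviewer note, for illustration only: `Chunk` keeps Scala's `Seq` semantics for `diff` and `intersect`, so duplicates are handled per occurrence rather than collapsed as they are with `Set`. This is safe here because the partitions reported by the Kafka client are already distinct, but the difference is worth keeping in mind. A tiny sketch:

    import zio.Chunk

    // Seq-style diff removes one matching occurrence per element on the right.
    val a = Chunk(1, 1, 2, 3)
    val b = Chunk(1, 2)

    val diffed      = a diff b      // Chunk(1, 3): one `1` survives, unlike Set semantics
    val intersected = a intersect b // Chunk(1, 2)
    val setStyle    = a.toSet -- b  // Set(3): duplicates collapse first
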
@@ -357,8 +357,8 @@ private[consumer] final class Runloop private (
       _ <- rebalanceListenerEvent.set(RebalanceEvent.None)
       pollResult <- consumer.withConsumerZIO { c =>
-                      val prevAssigned        = c.assignment().asScala.toSet
-                      val requestedPartitions = state.pendingRequests.map(_.tp).toSet
+                      val prevAssigned        = Chunk.fromJavaIterable(c.assignment())
+                      val requestedPartitions = state.pendingRequests.map(_.tp)
 
                       resumeAndPausePartitions(c, prevAssigned, requestedPartitions)
 
@@ -379,18 +379,18 @@ private[consumer] final class Runloop private (
                                // either because they are restarting, or because they
                                // are new.
                                val startingTps =
-                                 if (restartStreamsOnRebalancing) c.assignment().asScala.toSet
+                                 if (restartStreamsOnRebalancing) Chunk.fromJavaIterable(c.assignment())
                                  else newlyAssigned
 
                                for {
                                  ignoreRecordsForTps <- doSeekForNewPartitions(c, newlyAssigned)
 
                                  _ <- diagnostics.emitIfEnabled {
-                                        val providedTps = records.partitions().asScala.toSet
+                                        val providedTps = Chunk.fromJavaIterable(records.partitions())
                                         DiagnosticEvent.Poll(
                                           tpRequested = requestedPartitions,
                                           tpWithData = providedTps,
-                                          tpWithoutData = requestedPartitions -- providedTps
+                                          tpWithoutData = requestedPartitions diff providedTps
                                         )
                                       }
 
@@ -411,7 +411,7 @@ private[consumer] final class Runloop private (
       runningStreams <- ZIO.filter(state.assignedStreams)(_.acceptsData)
       updatedStreams = runningStreams ++ startingStreams
       updatedPendingRequests = {
-        val streamTps = updatedStreams.map(_.tp).toSet
+        val streamTps = updatedStreams.map(_.tp)
         state.pendingRequests.filter(req => streamTps.contains(req.tp))
       }
       fulfillResult <- offerRecordsToStreams(
@@ -566,17 +566,17 @@ private[consumer] object Runloop {
 
   private final case class PollResult(
     newCommits: Chunk[Commit],
-    startingTps: Set[TopicPartition],
+    startingTps: Chunk[TopicPartition],
     records: ConsumerRecords[Array[Byte], Array[Byte]],
-    ignoreRecordsForTps: Set[TopicPartition]
+    ignoreRecordsForTps: Chunk[TopicPartition]
   )
   private object PollResult {
     def apply(records: ConsumerRecords[Array[Byte], Array[Byte]]): PollResult =
       PollResult(
         newCommits = Chunk.empty,
-        startingTps = Set.empty,
+        startingTps = Chunk.empty,
         records = records,
-        ignoreRecordsForTps = Set.empty
+        ignoreRecordsForTps = Chunk.empty
       )
   }
 
@@ -586,10 +586,10 @@ private[consumer] object Runloop {
 
   private final case class RebalanceEvent(
     wasInvoked: Boolean,
-    newlyAssigned: Set[TopicPartition],
+    newlyAssigned: Chunk[TopicPartition],
     pendingCommits: Chunk[Commit]
   ) {
-    def onAssigned(assigned: Set[TopicPartition], commits: Chunk[Commit]): RebalanceEvent =
+    def onAssigned(assigned: Chunk[TopicPartition], commits: Chunk[Commit]): RebalanceEvent =
       RebalanceEvent(
         wasInvoked = true,
         newlyAssigned = newlyAssigned ++ assigned,
@@ -603,11 +603,12 @@ private[consumer] object Runloop {
   }
 
   private object RebalanceEvent {
-    val None: RebalanceEvent = RebalanceEvent(wasInvoked = false, Set.empty, Chunk.empty)
+    val None: RebalanceEvent = RebalanceEvent(wasInvoked = false, Chunk.empty, Chunk.empty)
   }
 
   sealed trait Command
   object Command {
+
     /** Used for internal control of the runloop. */
     sealed trait Control extends Command
 
@@ -617,8 +618,8 @@ private[consumer] object Runloop {
     /** Used as a signal to the poll-loop that commits are available in the commit-queue. */
     case object CommitAvailable extends Control
 
-    case object StopRunloop extends Control
-    case object StopAllStreams extends Control
+    case object StopRunloop    extends Control
+    case object StopAllStreams extends Control
 
     /** Used by a stream to request more records. */
     final case class Request(tp: TopicPartition) extends Command
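
Closing reviewer note, for illustration only: where set semantics are still required at an API boundary, the patch converts explicitly, as in `getOffsets(tps.toSet)` above for `OffsetRetrieval.Manual`. A small sketch of that boundary pattern with a hypothetical manual-offset lookup (not part of the patch):

    import org.apache.kafka.common.TopicPartition
    import zio.{ Chunk, Task, ZIO }

    // Hypothetical manual offset lookup that, like OffsetRetrieval.Manual, wants a Set.
    def lookupOffsets(tps: Set[TopicPartition]): Task[Map[TopicPartition, Long]] =
      ZIO.succeed(tps.map(tp => tp -> 0L).toMap)

    // Inside the run loop, partitions now travel as a Chunk; convert only at the boundary.
    def offsetsFor(newlyAssigned: Chunk[TopicPartition]): Task[Map[TopicPartition, Long]] =
      lookupOffsets(newlyAssigned.toSet)
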