Remove Set instance from the Runloop
erikvanoosten authored and guizmaii committed Apr 12, 2023
1 parent e84c85b commit 11e1c2a
Showing 4 changed files with 80 additions and 66 deletions.
22 changes: 11 additions & 11 deletions zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala
@@ -2,18 +2,18 @@ package zio.kafka.consumer

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition
import zio.{ Runtime, Task, Unsafe, ZIO }
import scala.jdk.CollectionConverters._
import zio.{ Chunk, Runtime, Task, Unsafe, ZIO }

/**
* ZIO wrapper around Kafka's `ConsumerRebalanceListener` to work with Scala collection types and ZIO effects.
*
* Note that the given ZIO effects are executed directly on the Kafka poll thread.
* Note that the given ZIO effects are executed directly on the Kafka poll thread. Fork and shift to another executor
* when this is not desired.
*/
final case class RebalanceListener(
onAssigned: (Set[TopicPartition], RebalanceConsumer) => Task[Unit],
onRevoked: (Set[TopicPartition], RebalanceConsumer) => Task[Unit],
onLost: (Set[TopicPartition], RebalanceConsumer) => Task[Unit]
onAssigned: (Chunk[TopicPartition], RebalanceConsumer) => Task[Unit],
onRevoked: (Chunk[TopicPartition], RebalanceConsumer) => Task[Unit],
onLost: (Chunk[TopicPartition], RebalanceConsumer) => Task[Unit]
) {

/**
@@ -35,7 +35,7 @@ final case class RebalanceListener(
partitions: java.util.Collection[TopicPartition]
): Unit = Unsafe.unsafe { implicit u =>
runtime.unsafe
.run(onRevoked(partitions.asScala.toSet, consumer))
.run(onRevoked(Chunk.fromJavaIterable(partitions), consumer))
.getOrThrowFiberFailure()
()
}
@@ -44,7 +44,7 @@
partitions: java.util.Collection[TopicPartition]
): Unit = Unsafe.unsafe { implicit u =>
runtime.unsafe
.run(onAssigned(partitions.asScala.toSet, consumer))
.run(onAssigned(Chunk.fromJavaIterable(partitions), consumer))
.getOrThrowFiberFailure()
()
}
@@ -53,7 +53,7 @@
partitions: java.util.Collection[TopicPartition]
): Unit = Unsafe.unsafe { implicit u =>
runtime.unsafe
.run(onLost(partitions.asScala.toSet, consumer))
.run(onLost(Chunk.fromJavaIterable(partitions), consumer))
.getOrThrowFiberFailure()
()
}
@@ -63,8 +63,8 @@

object RebalanceListener {
def apply(
onAssigned: (Set[TopicPartition], RebalanceConsumer) => Task[Unit],
onRevoked: (Set[TopicPartition], RebalanceConsumer) => Task[Unit]
onAssigned: (Chunk[TopicPartition], RebalanceConsumer) => Task[Unit],
onRevoked: (Chunk[TopicPartition], RebalanceConsumer) => Task[Unit]
): RebalanceListener =
RebalanceListener(onAssigned, onRevoked, onRevoked)

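For reference, a user-supplied listener written against the new Chunk-based signature could look like the sketch below. The effect bodies (logging, and forking the expensive cleanup so it does not block the Kafka poll thread) are illustrative only and not part of this commit; `expensiveCleanup` is a made-up helper.

import org.apache.kafka.common.TopicPartition
import zio._
import zio.kafka.consumer.{ RebalanceConsumer, RebalanceListener }

// Hypothetical expensive work that should not run on the poll thread.
def expensiveCleanup(partitions: Chunk[TopicPartition]): Task[Unit] =
  ZIO.foreachDiscard(partitions)(tp => ZIO.logDebug(s"Cleaning up state for $tp"))

val listener: RebalanceListener = RebalanceListener(
  onAssigned = (assigned, _) => ZIO.logInfo(s"Assigned: ${assigned.mkString(", ")}"),
  onRevoked = (revoked, _) =>
    ZIO.logInfo(s"Revoked: ${revoked.mkString(", ")}") *>
      expensiveCleanup(revoked).forkDaemon.unit // fork: keep the poll thread free
)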
zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala
@@ -2,14 +2,15 @@ package zio.kafka.consumer.diagnostics

import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import zio.Chunk

sealed trait DiagnosticEvent
object DiagnosticEvent {

final case class Poll(
tpRequested: Set[TopicPartition],
tpWithData: Set[TopicPartition],
tpWithoutData: Set[TopicPartition]
tpRequested: Chunk[TopicPartition],
tpWithData: Chunk[TopicPartition],
tpWithoutData: Chunk[TopicPartition]
) extends DiagnosticEvent
final case class Request(partition: TopicPartition) extends DiagnosticEvent

@@ -22,9 +23,9 @@ object DiagnosticEvent {

sealed trait Rebalance extends DiagnosticEvent
object Rebalance {
final case class Revoked(partitions: Set[TopicPartition]) extends Rebalance
final case class Assigned(partitions: Set[TopicPartition]) extends Rebalance
final case class Lost(partitions: Set[TopicPartition]) extends Rebalance
final case class Revoked(partitions: Chunk[TopicPartition]) extends Rebalance
final case class Assigned(partitions: Chunk[TopicPartition]) extends Rebalance
final case class Lost(partitions: Chunk[TopicPartition]) extends Rebalance
}

}
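As an illustration of the new event shapes, a diagnostics consumer can pattern-match on these events directly. The rendering function below is only a sketch; the commit-related events are not shown in this diff, hence the catch-all case.

import zio.kafka.consumer.diagnostics.DiagnosticEvent
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Rebalance

// Sketch: turn diagnostic events into log lines.
def render(event: DiagnosticEvent): String =
  event match {
    case DiagnosticEvent.Poll(requested, withData, withoutData) =>
      s"poll: requested=${requested.size} withData=${withData.size} withoutData=${withoutData.size}"
    case DiagnosticEvent.Request(tp)    => s"request: $tp"
    case Rebalance.Assigned(partitions) => s"assigned: ${partitions.mkString(", ")}"
    case Rebalance.Revoked(partitions)  => s"revoked: ${partitions.mkString(", ")}"
    case Rebalance.Lost(partitions)     => s"lost: ${partitions.mkString(", ")}"
    case other                          => other.toString
  }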
zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
@@ -11,6 +11,7 @@ private[internal] final class PartitionStreamControl private (
val tp: TopicPartition,
stream: ZStream[Any, Throwable, ByteArrayCommittableRecord],
dataQueue: Queue[Take[Throwable, ByteArrayCommittableRecord]],
startedPromise: Promise[Nothing, Unit],
endedPromise: Promise[Nothing, Unit],
completedPromise: Promise[Nothing, Unit],
interruptPromise: Promise[Throwable, Unit]
@@ -43,10 +44,11 @@
/** Returns true when the stream accepts new data. */
def acceptsData: ZIO[Any, Nothing, Boolean] =
for {
started <- startedPromise.isDone
ended <- endedPromise.isDone
completed <- completedPromise.isDone
interrupted <- interruptPromise.isDone
} yield !(ended || completed || interrupted)
} yield started && !(ended || completed || interrupted)

/** Returns true when the stream is done. */
def isCompleted: ZIO[Any, Nothing, Boolean] =
@@ -69,6 +71,7 @@ private[internal] object PartitionStreamControl {
): ZIO[Any, Nothing, PartitionStreamControl] =
for {
_ <- ZIO.logTrace(s"Creating partition stream ${tp.toString}")
startedPromise <- Promise.make[Nothing, Unit]
endedPromise <- Promise.make[Nothing, Unit]
completedPromise <- Promise.make[Nothing, Unit]
interruptionPromise <- Promise.make[Throwable, Unit]
@@ -80,10 +83,11 @@
taken <- dataQueue.takeBetween(1, Int.MaxValue)
} yield taken

stream = ZStream.logAnnotate(
LogAnnotation("topic", tp.topic()),
LogAnnotation("partition", tp.partition().toString)
) *>
stream = ZStream.fromZIO(startedPromise.succeed(())) *>
ZStream.logAnnotate(
LogAnnotation("topic", tp.topic()),
LogAnnotation("partition", tp.partition().toString)
) *>
ZStream.finalizer(
completedPromise.succeed(()) <*
ZIO.logDebug(s"Partition stream ${tp.toString} has ended")
@@ -94,6 +98,14 @@
dataQueue.takeAll.flatMap(data => if (data.isEmpty) requestAndAwaitData else ZIO.succeed(data))
}.flattenTake
.interruptWhen(interruptionPromise)
} yield new PartitionStreamControl(tp, stream, dataQueue, endedPromise, completedPromise, interruptionPromise)
} yield new PartitionStreamControl(
tp,
stream,
dataQueue,
startedPromise,
endedPromise,
completedPromise,
interruptionPromise
)

}
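The startedPromise introduced here follows a small, general pattern: the stream completes a promise as its very first action, so other fibers can observe whether consumption has actually begun (acceptsData above now requires it). A standalone sketch of that pattern, not the actual PartitionStreamControl code:

import zio._
import zio.stream.ZStream

// Sketch: pair a stream with a promise that is completed on first pull.
def gatedStream[A](body: ZStream[Any, Throwable, A]): UIO[(Promise[Nothing, Unit], ZStream[Any, Throwable, A])] =
  Promise.make[Nothing, Unit].map { started =>
    (started, ZStream.fromZIO(started.succeed(())) *> body)
  }

// `started.isDone` stays false until the returned stream is pulled for the first time.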
87 changes: 44 additions & 43 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -21,49 +21,46 @@ import scala.jdk.CollectionConverters._
*
* ## Stream management
*
* - When a partition gets assigned manually or by the broker, a new stream is started.
* - When a partition is revoked by the broker, the stream is ended.
* - When a partition is reported as lost, the stream is interrupted.
* - When a partition gets assigned manually or by the broker, a new stream is started.
* - When a partition is revoked by the broker, the stream is ended.
* - When a partition is reported as lost, the stream is interrupted.
*
* ## Fetching data
*
* - Streams that need data request it via a [[Request]] command to the command-queue.
* - Partitions for which no data is needed are paused. This backpressure prevents unnecessary
* buffering of data.
* - Streams that need data request it via a [[Request]] command to the command-queue.
* - Partitions for which no data is needed are paused. This backpressure prevents unnecessary buffering of data.
*
* ## Poll-loop
*
* The poll-loop continuously polls the broker for new data. Since polling is also needed for learning about
* partition assignment changes, or for completing commits, polling also continues when no partitions are
* assigned, or when there are pending commits.
* The poll-loop continuously polls the broker for new data. Since polling is also needed for learning about partition
* assignment changes, or for completing commits, polling also continues when no partitions are assigned, or when there
* are pending commits.
*
* When all streams stop processing, polling stops so that the broker can detect that this Kafka client is
* stalled.
* When all streams stop processing, polling stops so that the broker can detect that this Kafka client is stalled.
*
* ## Rebalance listener
*
* The rebalance listener runs during a poll to the broker. It is used to track changes to partition assignments.
* Partitions can be assigned, revoked or lost.
*
* When a partition is revoked, the stream that handles it will be ended (signalling the stream that no more data will
* be available). Processing, however, might continue.
* When a partition is revoked, the stream that handles it will be ended (signalling the stream that no more data will be
* available). Processing, however, might continue.
*
* ### Rebalance listener - Commit-loop
*
* When `endRevokedStreamsBeforeRebalance` is `true` (the default), we wait for the stream to complete running inside
* the rebalance listener. This gives the stream a chance to commit offsets before its partition is given to
* another consumer.
* the rebalance listener. This gives the stream a chance to commit offsets before its partition is given to another
* consumer.
*
* While the rebalance listener is waiting for streams to complete, we need to continue sending commits. In addition,
* we need to continue polling the broker so that we learn of completed
* commits. For both we use commitAsync (in the second case with an empty map of offsets). This forms the commit-loop.
* While the rebalance listener is waiting for streams to complete, we need to continue sending commits. In addition, we
* need to continue polling the broker so that we learn of completed commits. For both we use commitAsync (in the second
* case with an empty map of offsets). This forms the commit-loop.
*
* The commit-loop ends when the streams have completed or a timeout occurs.
*
* ## The command-queue and the commit-queue
*
* TODO: document more here ...
*
*/
// Disable zio-intellij's inspection `SimplifyWhenInspection` because its suggestion is not
// equivalent performance-wise.
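The commit-loop described in the scaladoc above leans on a plain Kafka consumer call: commitAsync with an empty offset map keeps in-flight commits moving while the rebalance listener waits for streams to finish. A rough, self-contained sketch of that call (not the Runloop's actual code):

import java.util.Collections
import org.apache.kafka.clients.consumer.{ KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback }
import org.apache.kafka.common.TopicPartition

// Sketch: "commit" an empty offset map so the client keeps servicing
// completion callbacks of commits that were sent earlier.
def keepCommitsMoving(consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Unit =
  consumer.commitAsync(
    Collections.emptyMap[TopicPartition, OffsetAndMetadata](),
    new OffsetCommitCallback {
      override def onComplete(
        offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
        exception: Exception
      ): Unit = () // earlier commits complete via their own callbacks
    }
  )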
@@ -82,7 +79,7 @@ private[consumer] final class Runloop private (
offsetRetrieval: OffsetRetrieval,
userRebalanceListener: RebalanceListener,
restartStreamsOnRebalancing: Boolean,
endRevokedStreamsBeforeRebalance: Boolean, // TODO: rename to something like 'completeRevokedStreamsDuringRebalance'
endRevokedStreamsBeforeRebalance: Boolean, // TODO: rename to something like 'completeRevokedStreamsDuringRebalance'
currentState: Ref[State]
) {

@@ -273,13 +270,13 @@ private[consumer] final class Runloop private (
private def offerRecordsToStreams(
partitionStreams: Chunk[PartitionStreamControl],
pendingRequests: Chunk[Request],
ignoreRecordsForTps: Set[TopicPartition],
ignoreRecordsForTps: Chunk[TopicPartition],
polledRecords: ConsumerRecords[Array[Byte], Array[Byte]]
): UIO[Runloop.FulfillResult] = {
// The most efficient way to get the records from [[ConsumerRecords]] per
// topic-partition is by first getting the set of topic-partitions, and
// then requesting the records per topic-partition.
val tps = polledRecords.partitions().asScala.toSet -- ignoreRecordsForTps
val tps = Chunk.fromJavaIterable(polledRecords.partitions()) diff ignoreRecordsForTps
val fulfillResult = Runloop.FulfillResult(pendingRequests = pendingRequests.filter(req => !tps.contains(req.tp)))
val streams =
if (tps.isEmpty) Chunk.empty else partitionStreams.filter(streamControl => tps.contains(streamControl.tp))
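The core of this commit is mechanical: Set-specific operations are replaced by their Chunk (Seq) counterparts, as in the `diff` call above. A small sketch of the correspondence, assuming (as here) that the collections hold no duplicate partitions:

import java.util.Arrays
import org.apache.kafka.common.TopicPartition
import zio.Chunk

val polled  = Chunk.fromJavaIterable(Arrays.asList(new TopicPartition("t", 0), new TopicPartition("t", 1)))
val ignored = Chunk(new TopicPartition("t", 1))

val remaining = polled diff ignored                          // was: polled -- ignored (Set)
val common    = polled intersect ignored                     // intersect exists on both
val hasT0     = polled.contains(new TopicPartition("t", 0))  // contains works the same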
@@ -316,26 +313,29 @@
if (hasGroupId) consumer.withConsumer(_.groupMetadata()).fold(_ => None, Some(_))
else ZIO.none

private def doSeekForNewPartitions(c: ByteArrayKafkaConsumer, tps: Set[TopicPartition]): Task[Set[TopicPartition]] =
private def doSeekForNewPartitions(
c: ByteArrayKafkaConsumer,
tps: Chunk[TopicPartition]
): Task[Chunk[TopicPartition]] =
offsetRetrieval match {
case OffsetRetrieval.Manual(getOffsets) =>
getOffsets(tps)
getOffsets(tps.toSet)
.tap(offsets => ZIO.foreachDiscard(offsets) { case (tp, offset) => ZIO.attempt(c.seek(tp, offset)) })
.when(tps.nonEmpty)
.as(tps)

case OffsetRetrieval.Auto(_) =>
ZIO.succeed(Set.empty)
ZIO.succeed(Chunk.empty)
}

// Pause partitions for which there is no demand and resume those for which there is now demand
private def resumeAndPausePartitions(
c: ByteArrayKafkaConsumer,
assignment: Set[TopicPartition],
requestedPartitions: Set[TopicPartition]
assignment: Chunk[TopicPartition],
requestedPartitions: Chunk[TopicPartition]
): Unit = {
val toResume = assignment intersect requestedPartitions
val toPause = assignment -- requestedPartitions
val toPause = assignment diff requestedPartitions

if (toResume.nonEmpty) c.resume(toResume.asJava)
if (toPause.nonEmpty) c.pause(toPause.asJava)
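One boundary worth noting: the public OffsetRetrieval.Manual API keeps its Set-based signature, which is why doSeekForNewPartitions calls getOffsets(tps.toSet) while the Runloop itself now passes Chunks around. A hedged sketch of plugging in a manual offset function — the startingOffsets logic is made up, and the exact import location of OffsetRetrieval and the withOffsetRetrieval setter are assumptions about the public API, not shown in this diff:

import org.apache.kafka.common.TopicPartition
import zio._
import zio.kafka.consumer.Consumer.OffsetRetrieval
import zio.kafka.consumer.ConsumerSettings

// Hypothetical: start every newly assigned partition at offset 0.
def startingOffsets(tps: Set[TopicPartition]): Task[Map[TopicPartition, Long]] =
  ZIO.succeed(tps.map(tp => tp -> 0L).toMap)

val settings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092"))
    .withGroupId("example-group")
    .withOffsetRetrieval(OffsetRetrieval.Manual(startingOffsets))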
@@ -357,8 +357,8 @@ private[consumer] final class Runloop private (
_ <- rebalanceListenerEvent.set(RebalanceEvent.None)
pollResult <-
consumer.withConsumerZIO { c =>
val prevAssigned = c.assignment().asScala.toSet
val requestedPartitions = state.pendingRequests.map(_.tp).toSet
val prevAssigned = Chunk.fromJavaIterable(c.assignment())
val requestedPartitions = state.pendingRequests.map(_.tp)

resumeAndPausePartitions(c, prevAssigned, requestedPartitions)

@@ -379,18 +379,18 @@ private[consumer] final class Runloop private (
// either because they are restarting, or because they
// are new.
val startingTps =
if (restartStreamsOnRebalancing) c.assignment().asScala.toSet
if (restartStreamsOnRebalancing) Chunk.fromJavaIterable(c.assignment())
else newlyAssigned

for {
ignoreRecordsForTps <- doSeekForNewPartitions(c, newlyAssigned)

_ <- diagnostics.emitIfEnabled {
val providedTps = records.partitions().asScala.toSet
val providedTps = Chunk.fromJavaIterable(records.partitions())
DiagnosticEvent.Poll(
tpRequested = requestedPartitions,
tpWithData = providedTps,
tpWithoutData = requestedPartitions -- providedTps
tpWithoutData = requestedPartitions diff providedTps
)
}

@@ -411,7 +411,7 @@ private[consumer] final class Runloop private (
runningStreams <- ZIO.filter(state.assignedStreams)(_.acceptsData)
updatedStreams = runningStreams ++ startingStreams
updatedPendingRequests = {
val streamTps = updatedStreams.map(_.tp).toSet
val streamTps = updatedStreams.map(_.tp)
state.pendingRequests.filter(req => streamTps.contains(req.tp))
}
fulfillResult <- offerRecordsToStreams(
@@ -566,17 +566,17 @@ private[consumer] object Runloop {

private final case class PollResult(
newCommits: Chunk[Commit],
startingTps: Set[TopicPartition],
startingTps: Chunk[TopicPartition],
records: ConsumerRecords[Array[Byte], Array[Byte]],
ignoreRecordsForTps: Set[TopicPartition]
ignoreRecordsForTps: Chunk[TopicPartition]
)
private object PollResult {
def apply(records: ConsumerRecords[Array[Byte], Array[Byte]]): PollResult =
PollResult(
newCommits = Chunk.empty,
startingTps = Set.empty,
startingTps = Chunk.empty,
records = records,
ignoreRecordsForTps = Set.empty
ignoreRecordsForTps = Chunk.empty
)
}

@@ -586,10 +586,10 @@

private final case class RebalanceEvent(
wasInvoked: Boolean,
newlyAssigned: Set[TopicPartition],
newlyAssigned: Chunk[TopicPartition],
pendingCommits: Chunk[Commit]
) {
def onAssigned(assigned: Set[TopicPartition], commits: Chunk[Commit]): RebalanceEvent =
def onAssigned(assigned: Chunk[TopicPartition], commits: Chunk[Commit]): RebalanceEvent =
RebalanceEvent(
wasInvoked = true,
newlyAssigned = newlyAssigned ++ assigned,
@@ -603,11 +603,12 @@ private[consumer] object Runloop {
}

private object RebalanceEvent {
val None: RebalanceEvent = RebalanceEvent(wasInvoked = false, Set.empty, Chunk.empty)
val None: RebalanceEvent = RebalanceEvent(wasInvoked = false, Chunk.empty, Chunk.empty)
}

sealed trait Command
object Command {

/** Used for internal control of the runloop. */
sealed trait Control extends Command

@@ -617,8 +618,8 @@ private[consumer] object Runloop {
/** Used as a signal to the poll-loop that commits are available in the commit-queue. */
case object CommitAvailable extends Control

case object StopRunloop extends Control
case object StopAllStreams extends Control
case object StopRunloop extends Control
case object StopAllStreams extends Control

/** Used by a stream to request more records. */
final case class Request(tp: TopicPartition) extends Command
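The Command hierarchy above is what the streams and the poll-loop use to talk to each other through the command-queue. A much simplified dispatch sketch (stand-in types, not the actual Runloop logic) shows the intended shape:

import org.apache.kafka.common.TopicPartition
import zio._

// Simplified stand-ins for the commands shown in this diff.
sealed trait Cmd
case object CommitAvailable                  extends Cmd
case object StopRunloop                      extends Cmd
case object StopAllStreams                   extends Cmd
final case class Request(tp: TopicPartition) extends Cmd

// Sketch: act on a batch of commands drained from the command-queue.
def dispatch(commands: Chunk[Cmd]): UIO[Unit] =
  ZIO.foreachDiscard(commands) {
    case Request(tp)     => ZIO.logDebug(s"data requested for $tp")
    case CommitAvailable => ZIO.logDebug("commits waiting in the commit-queue")
    case StopAllStreams  => ZIO.logDebug("ending all partition streams")
    case StopRunloop     => ZIO.logDebug("shutting down the runloop")
  }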
