More logging around rebalancing when rebalanceSafeCommits is true #1360

Merged · 16 commits · Nov 10, 2024

Changes from all commits

@@ -769,7 +769,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                 clientId = clientId,
                 groupId = Some(groupId),
                 `max.poll.records` = 1,
-                rebalanceSafeCommits = rebalanceSafeCommits
+                rebalanceSafeCommits = rebalanceSafeCommits,
+                maxRebalanceDuration = 60.seconds
               )
     consumer <- Consumer.make(settings)
   } yield consumer
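For reference, outside of this test helper the same two settings are normally configured through the public `ConsumerSettings` builder. A minimal sketch, assuming the zio-kafka 2.x methods `withRebalanceSafeCommits` and `withMaxRebalanceDuration`; the broker address and group id are placeholders:

import zio._
import zio.kafka.consumer.ConsumerSettings

// Placeholder bootstrap servers and group id; only the last two settings relate to this PR.
val settings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092"))
    .withGroupId("example-group")
    .withRebalanceSafeCommits(true)       // hold up a rebalance until offsets of consumed records are committed
    .withMaxRebalanceDuration(60.seconds) // upper bound on how long a rebalance may be held up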
126 changes: 95 additions & 31 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -106,6 +106,8 @@ private[consumer] final class Runloop private (
   ): Task[Unit] = {
     val deadline = java.lang.System.nanoTime() + maxRebalanceDuration.toNanos - commitTimeoutNanos
 
+    def timeToDeadlineMillis(): Long = (deadline - java.lang.System.nanoTime()) / 1000000L
+
     val endingTps = streamsToEnd.map(_.tp).toSet
 
     def commitsOfEndingStreams(commits: Chunk[Runloop.Commit]): Chunk[Runloop.Commit] =
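The new `timeToDeadlineMillis` helper only exists so the log messages added below can report how much time remains. A minimal, ZIO-free sketch of the same bookkeeping; the 60s/15s values are made-up examples, the real values come from `ConsumerSettings`:

object DeadlineSketch {
  // Example values only; not part of this PR's diff.
  val maxRebalanceDurationNanos: Long = 60L * 1000 * 1000 * 1000 // e.g. maxRebalanceDuration = 60.seconds
  val commitTimeoutNanos: Long        = 15L * 1000 * 1000 * 1000 // e.g. commitTimeout = 15.seconds

  // Reserve room for the final synchronous commit so it can still complete within maxRebalanceDuration.
  val deadline: Long = java.lang.System.nanoTime() + maxRebalanceDurationNanos - commitTimeoutNanos

  // Remaining time until the deadline, as reported by the new log messages.
  def timeToDeadlineMillis(): Long = (deadline - java.lang.System.nanoTime()) / 1000000L
}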
@@ -125,36 +127,92 @@
         ZIO.attempt(consumer.commitAsync(java.util.Collections.emptyMap(), null)).orDie
       }
 
-    def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): Task[Boolean] =
+    sealed trait EndOffsetCommitStatus
+    case object EndOffsetNotCommitted extends EndOffsetCommitStatus { override def toString = "not committed" }
+    case object EndOffsetCommitPending extends EndOffsetCommitStatus { override def toString = "commit pending" }
+    case object EndOffsetCommitted extends EndOffsetCommitStatus { override def toString = "committed" }
+
+    final case class StreamCompletionStatus(
+      tp: TopicPartition,
+      streamEnded: Boolean,
+      lastPulledOffset: Option[Long],
+      endOffsetCommitStatus: EndOffsetCommitStatus
+    ) {
+      override def toString: String =
+        s"${tp}: " +
+          s"${if (streamEnded) "stream ended" else "stream is running"}, " +
+          s"last pulled offset=${lastPulledOffset.getOrElse("none")}, " +
+          endOffsetCommitStatus
+    }
+
+    def completionStatusesAsString(completionStatuses: Chunk[StreamCompletionStatus]): String =
+      "Revoked partitions: " + completionStatuses.map(_.toString).mkString("; ")
+
+    def getStreamCompletionStatuses(newCommits: Chunk[Commit]): UIO[Chunk[StreamCompletionStatus]] =
       for {
+        committedOffsets <- committedOffsetsRef.get
+        allPendingCommitOffsets =
+          (previousPendingCommits ++ commitsOfEndingStreams(newCommits)).flatMap(_.offsets).map {
+            case (tp, offsetAndMetadata) => (tp, offsetAndMetadata.offset())
+          }
         streamResults <-
           ZIO.foreach(streamsToEnd) { stream =>
             for {
               isDone           <- stream.completedPromise.isDone
               lastPulledOffset <- stream.lastPulledOffset
               endOffset        <- if (isDone) stream.completedPromise.await else ZIO.none
-            } yield (isDone || lastPulledOffset.isEmpty, endOffset)
-          }
-        committedOffsets <- committedOffsetsRef.get
-      } yield {
-        val allStreamsCompleted = streamResults.forall(_._1)
-        allStreamsCompleted && {
-          val endOffsets: Chunk[Offset] = streamResults.flatMap(_._2)
-          val allPendingCommits         = previousPendingCommits ++ commitsOfEndingStreams(newCommits)
-          endOffsets.forall { endOffset =>
-            val tp     = endOffset.topicPartition
-            val offset = endOffset.offset
-            def endOffsetWasCommitted    = committedOffsets.contains(tp, offset)
-            def endOffsetCommitIsPending = allPendingCommits.exists { pendingCommit =>
-              pendingCommit.offsets.get(tp).exists { pendingOffset =>
-                pendingOffset.offset() >= offset
-              }
-            }
-            endOffsetWasCommitted || endOffsetCommitIsPending
-          }
-        }
-      }
+
+            endOffsetCommitStatus =
+              endOffset match {
+                case Some(endOffset) if committedOffsets.contains(stream.tp, endOffset.offset) =>
+                  EndOffsetCommitted
+                case Some(endOffset) if allPendingCommitOffsets.contains((stream.tp, endOffset.offset)) =>
+                  EndOffsetCommitPending
+                case _ => EndOffsetNotCommitted
+              }
+          } yield StreamCompletionStatus(stream.tp, isDone, lastPulledOffset.map(_.offset), endOffsetCommitStatus)
+        }
+      } yield streamResults
+
+    @inline
+    def logStreamCompletionStatuses(completionStatuses: Chunk[StreamCompletionStatus]): UIO[Unit] = {
+      val statusStrings = completionStatusesAsString(completionStatuses)
+      ZIO.logInfo(
+        s"Delaying rebalance until ${streamsToEnd.size} streams (of revoked partitions) have committed " +
+          s"the offsets of the records they consumed. Deadline in ${timeToDeadlineMillis()}ms. $statusStrings"
+      )
+    }
+
+    def logInitialStreamCompletionStatuses: UIO[Unit] =
+      for {
+        completionStatuses <- getStreamCompletionStatuses(newCommits = Chunk.empty)
+        _                  <- logStreamCompletionStatuses(completionStatuses)
+      } yield ()
+
+    def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): UIO[Boolean] =
+      for {
+        completionStatuses <- getStreamCompletionStatuses(newCommits)
+        _                  <- logStreamCompletionStatuses(completionStatuses)
+      } yield completionStatuses.forall { status =>
+        // A stream is complete when it never got any records, or when it committed the offset of the last consumed record
+        status.lastPulledOffset.isEmpty || (status.streamEnded && status.endOffsetCommitStatus != EndOffsetNotCommitted)
+      }
+
+    def logFinalStreamCompletionStatuses(completed: Boolean, newCommits: Chunk[Commit]): UIO[Unit] =
+      if (completed)
+        ZIO.logInfo("Continuing rebalance, all offsets of consumed records in the revoked partitions were committed.")
+      else
+        for {
+          completionStatuses <- getStreamCompletionStatuses(newCommits)
+          statusStrings = completionStatusesAsString(completionStatuses)
+          _ <-
+            ZIO.logWarning(
+              s"Exceeded deadline waiting for streams (of revoked partitions) to commit the offsets of " +
+                s"the records they consumed; the rebalance will continue. " +
+                s"This might cause another consumer to process some records again. $statusStrings"
+            )
+        } yield ()
 
     def commitSync: Task[Unit] =
       ZIO.attempt(consumer.commitSync(java.util.Collections.emptyMap(), commitTimeout))

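To make the new log lines easier to interpret, here is an illustrative, self-contained restatement of the completion check and of the status string it logs. The names mirror the diff above, but this is a simplified sketch rather than the actual Runloop code, and the offsets are invented:

import org.apache.kafka.common.TopicPartition

object RebalanceStatusSketch extends App {

  // Simplified stand-ins for the types introduced in this PR.
  sealed trait EndOffsetCommitStatus
  case object EndOffsetNotCommitted extends EndOffsetCommitStatus { override def toString = "not committed" }
  case object EndOffsetCommitted    extends EndOffsetCommitStatus { override def toString = "committed" }

  final case class StreamCompletionStatus(
    tp: TopicPartition,
    streamEnded: Boolean,
    lastPulledOffset: Option[Long],
    endOffsetCommitStatus: EndOffsetCommitStatus
  ) {
    override def toString: String =
      s"$tp: ${if (streamEnded) "stream ended" else "stream is running"}, " +
        s"last pulled offset=${lastPulledOffset.getOrElse("none")}, " + endOffsetCommitStatus
  }

  // The completion rule from endingStreamsCompletedAndCommitsExist, restated as a pure predicate:
  // a stream is done when it never pulled a record, or when it ended and its end offset is
  // committed (or at least has a commit pending).
  def streamIsComplete(status: StreamCompletionStatus): Boolean =
    status.lastPulledOffset.isEmpty ||
      (status.streamEnded && status.endOffsetCommitStatus != EndOffsetNotCommitted)

  // Invented example values.
  val statuses = Seq(
    StreamCompletionStatus(new TopicPartition("my-topic", 0), streamEnded = true, Some(41L), EndOffsetCommitted),
    StreamCompletionStatus(new TopicPartition("my-topic", 1), streamEnded = false, Some(12L), EndOffsetNotCommitted)
  )

  // Roughly what the new log messages append, e.g.:
  // Revoked partitions: my-topic-0: stream ended, last pulled offset=41, committed; my-topic-1: stream is running, last pulled offset=12, not committed
  println("Revoked partitions: " + statuses.mkString("; "))
  println(statuses.forall(streamIsComplete)) // false: partition 1 is still running and offset 12 is not committed
}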
@@ -174,17 +232,23 @@
     //
     // Note, we cannot use ZStream.fromQueue because that will emit nothing when the queue is empty.
     // Instead, we poll the queue in a loop.
-    ZIO.logDebug(s"Waiting for ${streamsToEnd.size} streams to end") *>
-      ZStream
-        .fromZIO(blockingSleep(commitQueuePollInterval) *> commitQueue.takeAll)
-        .tap(commitAsync)
-        .forever
-        .takeWhile(_ => java.lang.System.nanoTime() <= deadline)
-        .scan(Chunk.empty[Runloop.Commit])(_ ++ _)
-        .takeUntilZIO(endingStreamsCompletedAndCommitsExist)
-        .runDrain *>
-      commitSync *>
-      ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end")
+    for {
+      _ <- logInitialStreamCompletionStatuses
+      completedAndCommits <-
+        ZStream
+          .fromZIO(blockingSleep(commitQueuePollInterval) *> commitQueue.takeAll)
+          .tap(commitAsync)
+          .forever
+          .takeWhile(_ => java.lang.System.nanoTime() <= deadline)
+          .scan(Chunk.empty[Runloop.Commit])(_ ++ _)
+          .mapZIO(commits => endingStreamsCompletedAndCommitsExist(commits).map((_, commits)))
+          .takeUntil { case (completed, _) => completed }
+          .runLast
+          .map(_.getOrElse((false, Chunk.empty)))
+      _ <- logFinalStreamCompletionStatuses(completedAndCommits._1, completedAndCommits._2)
+      _ <- commitSync
+      _ <- ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end")
+    } yield ()
   }
 
   // During a poll, the java kafka client might call each method of the rebalance listener 0 or 1 times.
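The switch from `takeUntilZIO(...).runDrain` to `mapZIO(...).takeUntil(...).runLast` is what makes the final outcome (whether the wait completed, plus the accumulated commits) available to `logFinalStreamCompletionStatuses`. A minimal sketch of that stream pattern in isolation, with hypothetical `poll` and `isDone` arguments standing in for the commit queue and the completion check:

import zio._
import zio.stream._

// Generic restatement of the polling loop above: poll repeatedly, accumulate with `scan`,
// stop when `isDone` says so or when the deadline passes, and keep the last (done, acc) pair.
def pollUntilDone[A](
  poll: UIO[Chunk[A]],
  isDone: Chunk[A] => UIO[Boolean],
  deadlineNanos: Long
): UIO[(Boolean, Chunk[A])] =
  ZStream
    .fromZIO(poll)
    .forever
    .takeWhile(_ => java.lang.System.nanoTime() <= deadlineNanos)
    .scan(Chunk.empty[A])(_ ++ _)
    .mapZIO(acc => isDone(acc).map(done => (done, acc)))
    .takeUntil { case (done, _) => done }
    .runLast
    .map(_.getOrElse((false, Chunk.empty)))

With `runDrain` the last element would be discarded; `runLast` keeps it, which is exactly what the closing log message needs in order to say why the loop stopped.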