Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More precise batch publish errors #1321

Merged
merged 7 commits into from
Sep 11, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 65 additions & 31 deletions zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ import zio.kafka.serde.Serializer
import zio.kafka.utils.SslHelper
import zio.stream.{ ZPipeline, ZStream }

import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal
import scala.util.control.{ NoStackTrace, NonFatal }

trait Producer {

Expand Down Expand Up @@ -187,6 +186,10 @@ trait Producer {
}

object Producer {
case object SendOmittedDueToPreviousRecordSendCallFailureError
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved
extends RuntimeException("Send omitted due to the previous record send call failure")
with NoStackTrace
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved

val live: RLayer[ProducerSettings, Producer] =
ZLayer.scoped {
for {
Expand Down Expand Up @@ -466,41 +469,72 @@ private[producer] final class ProducerLive(
ZStream
.fromQueueWithShutdown(sendQueue)
.mapZIO { case (serializedRecords, done) =>
ZIO.succeed {
try {
val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex
val res: Array[Either[Throwable, RecordMetadata]] =
new Array[Either[Throwable, RecordMetadata]](serializedRecords.length)
val count: AtomicInteger = new AtomicInteger
val length = serializedRecords.length

while (it.hasNext) {
val (rec, idx): (ByteRecord, Int) = it.next()

val _ = p.send(
rec,
(metadata: RecordMetadata, err: Exception) =>
Unsafe.unsafe { implicit u =>
exec {
if (err != null) res(idx) = Left(err)
else res(idx) = Right(metadata)

if (count.incrementAndGet == length) {
ZIO.suspendSucceed {
val recordsLength = serializedRecords.length
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved
val recordsIterator: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex
val sentResults: Array[Either[Throwable, RecordMetadata]] =
new Array[Either[Throwable, RecordMetadata]](recordsLength)

Ref.make(0).map { sentRecordsCountRef =>
@inline def safelyInsertSentResult(resultIndex: Int, sentResult: Either[Throwable, RecordMetadata]): Unit =
Unsafe.unsafe { implicit u =>
exec {
runtime.unsafe.run(
sentRecordsCountRef.update { sentRecordsCount =>
// Updating sentResults[resultIndex] here is safe:
// - Ref.update guarantees sentResults.update executed atomically
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved
// - Ref.update starts with volatile variable read and ends with volatile variable write,
// which guarantees sentResults.update executed on the latest updated version of sentResults
// and currently updated version of sentResults
// will be visible to the next sentResults read or update called within Ref.update
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved
sentResults.update(resultIndex, sentResult)

val newSentRecordsCount = sentRecordsCount + 1
if (newSentRecordsCount == recordsLength) {
val sentResultsChunk = Chunk.fromArray(sentResults)

exec {
runtime.unsafe.run(done.succeed(Chunk.fromArray(res))).getOrThrowFiberFailure()
runtime.unsafe.run(done.succeed(sentResultsChunk))
}
}

newSentRecordsCount
}
}
)
}
} catch {
case NonFatal(e) =>
Unsafe.unsafe { implicit u =>
exec {
runtime.unsafe.run(done.succeed(Chunk.fill(serializedRecords.size)(Left(e)))).getOrThrowFiberFailure()
)
}
}

var previousSendCallSucceed = true
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved

while (recordsIterator.hasNext) {
val (record: ByteRecord, recordIndex: Int) = recordsIterator.next()
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved

if (previousSendCallSucceed) {
try {
val _ = p.send(
record,
(metadata: RecordMetadata, err: Exception) =>
safelyInsertSentResult(
recordIndex,
if (err eq null) Right(metadata) else Left(err)
)
)
} catch {
case NonFatal(err) =>
previousSendCallSucceed = false

safelyInsertSentResult(
recordIndex,
Left(err)
)
}
} else {
safelyInsertSentResult(
recordIndex,
Left(Producer.SendOmittedDueToPreviousRecordSendCallFailureError)
)
}
}
}
}
}
Expand Down