From 2459d44e416c79cb4c985bae09ebc2d4d82781d7 Mon Sep 17 00:00:00 2001 From: ytalashko Date: Sun, 1 Sep 2024 00:34:04 +0300 Subject: [PATCH 1/6] Provide proper produce results in case of send call errors --- .../scala/zio/kafka/producer/Producer.scala | 96 +++++++++++++------ 1 file changed, 65 insertions(+), 31 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala index 4dcc0a519..823562031 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala @@ -8,9 +8,8 @@ import zio.kafka.serde.Serializer import zio.kafka.utils.SslHelper import zio.stream.{ ZPipeline, ZStream } -import java.util.concurrent.atomic.AtomicInteger import scala.jdk.CollectionConverters._ -import scala.util.control.NonFatal +import scala.util.control.{ NoStackTrace, NonFatal } trait Producer { @@ -187,6 +186,10 @@ trait Producer { } object Producer { + case object SendOmittedDueToPreviousRecordSendCallFailureError + extends RuntimeException("Send omitted due to the previous record send call failure") + with NoStackTrace + val live: RLayer[ProducerSettings, Producer] = ZLayer.scoped { for { @@ -466,41 +469,72 @@ private[producer] final class ProducerLive( ZStream .fromQueueWithShutdown(sendQueue) .mapZIO { case (serializedRecords, done) => - ZIO.succeed { - try { - val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex - val res: Array[Either[Throwable, RecordMetadata]] = - new Array[Either[Throwable, RecordMetadata]](serializedRecords.length) - val count: AtomicInteger = new AtomicInteger - val length = serializedRecords.length - - while (it.hasNext) { - val (rec, idx): (ByteRecord, Int) = it.next() - - val _ = p.send( - rec, - (metadata: RecordMetadata, err: Exception) => - Unsafe.unsafe { implicit u => - exec { - if (err != null) res(idx) = Left(err) - else res(idx) = Right(metadata) - - if (count.incrementAndGet == length) { + ZIO.suspendSucceed { + val recordsLength = serializedRecords.length + val recordsIterator: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex + val sentResults: Array[Either[Throwable, RecordMetadata]] = + new Array[Either[Throwable, RecordMetadata]](recordsLength) + + Ref.make(0).map { sentRecordsCountRef => + @inline def safelyInsertSentResult(resultIndex: Int, sentResult: Either[Throwable, RecordMetadata]): Unit = + Unsafe.unsafe { implicit u => + exec { + runtime.unsafe.run( + sentRecordsCountRef.update { sentRecordsCount => + // Updating sentResults[resultIndex] here is safe: + // - Ref.update guarantees sentResults.update executed atomically + // - Ref.update starts with volatile variable read and ends with volatile variable write, + // which guarantees sentResults.update executed on the latest updated version of sentResults + // and currently updated version of sentResults + // will be visible to the next sentResults read or update called within Ref.update + sentResults.update(resultIndex, sentResult) + + val newSentRecordsCount = sentRecordsCount + 1 + if (newSentRecordsCount == recordsLength) { + val sentResultsChunk = Chunk.fromArray(sentResults) + exec { - runtime.unsafe.run(done.succeed(Chunk.fromArray(res))).getOrThrowFiberFailure() + runtime.unsafe.run(done.succeed(sentResultsChunk)) } } + + newSentRecordsCount } - } - ) - } - } catch { - case NonFatal(e) => - Unsafe.unsafe { implicit u => - exec { - 
runtime.unsafe.run(done.succeed(Chunk.fill(serializedRecords.size)(Left(e)))).getOrThrowFiberFailure()
+                  )
+                }
+              }
+
+            var previousSendCallSucceed = true
+
+            while (recordsIterator.hasNext) {
+              val (record: ByteRecord, recordIndex: Int) = recordsIterator.next()
+
+              if (previousSendCallSucceed) {
+                try {
+                  val _ = p.send(
+                    record,
+                    (metadata: RecordMetadata, err: Exception) =>
+                      safelyInsertSentResult(
+                        recordIndex,
+                        if (err eq null) Right(metadata) else Left(err)
+                      )
+                  )
+                } catch {
+                  case NonFatal(err) =>
+                    previousSendCallSucceed = false
+
+                    safelyInsertSentResult(
+                      recordIndex,
+                      Left(err)
+                    )
                 }
+              } else {
+                safelyInsertSentResult(
+                  recordIndex,
+                  Left(Producer.SendOmittedDueToPreviousRecordSendCallFailureError)
+                )
               }
+            }
           }
         }
       }

From a27b431fb08d48edef31444ab22fea2d9f85a5b9 Mon Sep 17 00:00:00 2001
From: ytalashko
Date: Wed, 4 Sep 2024 01:22:36 +0300
Subject: [PATCH 2/6] Improve naming of Producer sendFromQueue internals

---
 .../scala/zio/kafka/producer/Producer.scala   | 27 +++++++++----------
 1 file changed, 12 insertions(+), 15 deletions(-)

diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
index 823562031..8499322ff 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
@@ -186,8 +186,8 @@ trait Producer {
 }
 
 object Producer {
-  case object SendOmittedDueToPreviousRecordSendCallFailureError
-    extends RuntimeException("Send omitted due to the previous record send call failure")
+  case object SendOmittedException
+    extends RuntimeException("Send omitted due to a send error for a previous record in the chunk")
       with NoStackTrace
 
   val live: RLayer[ProducerSettings, Producer] =
@@ -481,12 +481,11 @@ private[producer] final class ProducerLive(
               exec {
                 runtime.unsafe.run(
                   sentRecordsCountRef.update { sentRecordsCount =>
-                    // Updating sentResults[resultIndex] here is safe:
-                    // - Ref.update guarantees sentResults.update executed atomically
-                    // - Ref.update starts with volatile variable read and ends with volatile variable write,
-                    // which guarantees sentResults.update executed on the latest updated version of sentResults
-                    // and currently updated version of sentResults
-                    // will be visible to the next sentResults read or update called within Ref.update
+                    // Updating sentResults[resultIndex] here is safe,
+                    // because Ref.update starts with a volatile read and ends with a volatile write:
+                    // sentResults.update therefore runs against the latest version of sentResults,
+                    // and the update performed here is visible to the next sentResults read or update
+                    // executed within Ref.update
                     sentResults.update(resultIndex, sentResult)
 
                     val newSentRecordsCount = sentRecordsCount + 1
@@ -504,12 +503,10 @@ private[producer] final class ProducerLive(
               }
             }
 
-            var previousSendCallSucceed = true
+            var previousSendCallsSucceed = true
 
-            while (recordsIterator.hasNext) {
-              val (record: ByteRecord, recordIndex: Int) = recordsIterator.next()
-
-              if (previousSendCallSucceed) {
+            recordsIterator.foreach { case (record: ByteRecord, recordIndex: Int) =>
+              if (previousSendCallsSucceed) {
                 try {
                   val _ = p.send(
                     record,
@@ -521,7 +518,7 @@ private[producer] final class ProducerLive(
                 )
               } catch {
                 case NonFatal(err) =>
-                  previousSendCallSucceed = false
+                  previousSendCallsSucceed = false
 
                   safelyInsertSentResult(
                     recordIndex,
                     Left(err)
                   )
@@ -531,7 +528,7 @@ private[producer] final class ProducerLive(
               } else {
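                // A previous p.send call threw synchronously, i.e. before its record could be
                // handed to the producer's buffer (the tests added later in this series simulate
                // this with an AuthenticationException). From that point on, the remaining records
                // of the chunk are deliberately not sent; each one is completed with the marker
                // exception below, so callers still receive exactly one result per record.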
safelyInsertSentResult( recordIndex, - Left(Producer.SendOmittedDueToPreviousRecordSendCallFailureError) + Left(Producer.SendOmittedException) ) } } From ecc979df0c4ac123f0e95c5172167298c4e6ac89 Mon Sep 17 00:00:00 2001 From: ytalashko Date: Sun, 8 Sep 2024 00:22:47 +0300 Subject: [PATCH 3/6] Rename SendOmittedException to PublishOmittedException --- zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala index 8499322ff..c2687bfab 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala @@ -186,8 +186,8 @@ trait Producer { } object Producer { - case object SendOmittedException - extends RuntimeException("Send omitted due to a send error for a previous record in the chunk") + case object PublishOmittedException + extends RuntimeException("Publish omitted due to a publish error for a previous record in the chunk") with NoStackTrace val live: RLayer[ProducerSettings, Producer] = @@ -528,7 +528,7 @@ private[producer] final class ProducerLive( } else { safelyInsertSentResult( recordIndex, - Left(Producer.SendOmittedException) + Left(Producer.PublishOmittedException) ) } } From e0eeb24b572b38cd282e9030f95c3a0b98f398e9 Mon Sep 17 00:00:00 2001 From: ytalashko Date: Sun, 8 Sep 2024 00:23:40 +0300 Subject: [PATCH 4/6] Add basic Producer unit tests --- .../zio/kafka/producer/ProducerSpec.scala | 228 ++++++++++++++++++ 1 file changed, 228 insertions(+) create mode 100644 zio-kafka-test/src/test/scala/zio/kafka/producer/ProducerSpec.scala diff --git a/zio-kafka-test/src/test/scala/zio/kafka/producer/ProducerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/producer/ProducerSpec.scala new file mode 100644 index 000000000..8c90a46ff --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/producer/ProducerSpec.scala @@ -0,0 +1,228 @@ +package zio.kafka.producer + +import org.apache.kafka.clients.producer +import org.apache.kafka.clients.producer.{ MockProducer, ProducerRecord, RecordMetadata } +import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.errors.AuthenticationException +import org.apache.kafka.common.serialization.ByteArraySerializer +import zio._ +import zio.test.TestAspect.withLiveClock +import zio.test._ + +import java.util.concurrent.Future +import java.util.concurrent.atomic.AtomicBoolean + +object ProducerSpec extends ZIOSpecDefault { + + private object TestKeyValueSerializer extends ByteArraySerializer + + private class BinaryMockProducer(autoComplete: Boolean) + extends MockProducer[Array[Byte], Array[Byte]]( + autoComplete, + TestKeyValueSerializer, + TestKeyValueSerializer + ) { + + private val nextSendAllowed = new AtomicBoolean(autoComplete) + + override def send( + record: ProducerRecord[Array[Byte], Array[Byte]], + callback: producer.Callback + ): Future[RecordMetadata] = { + awaitSendAllowed() + val sendResult = super.send(record, callback) + nextSendAllowed.set(autoComplete) + + sendResult + } + + def allowNextSendAndAwaitSendCompletion(): Unit = { + allowNextSend() + awaitSendCompletion() + } + + def allowNextSend(): Unit = + nextSendAllowed.set(true) + + def awaitSendAllowed(): Unit = + awaitSendCondition(true) + + def awaitSendCompletion(): Unit = + awaitSendCondition(false) + + private def awaitSendCondition(expectedCondition: Boolean): Unit = { + var awaitingSendCondition = 
true + while (awaitingSendCondition) + awaitingSendCondition = expectedCondition != nextSendAllowed.get() + } + + } + + override def spec: Spec[TestEnvironment with Scope, Any] = + suite("Producer")( + suite("produceChunkAsyncWithFailures")( + test("successfully produces chunk of records") { + withProducer() { (_, producer) => + val recordsToSend = Chunk( + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord() + ) + for { + results <- producer.produceChunkAsyncWithFailures(recordsToSend).flatten + } yield assertTrue( + results.length == recordsToSend.length, + results.forall(_.isRight) + ) + } + }, + test("omits sending further records in chunk in case the first send call fails") { + withProducer() { (mockJavaProducer, producer) => + val recordsToSend = Chunk( + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord() + ) + val testAuthenticationExceptionMessage = "test authentication exception" + mockJavaProducer.sendException = new AuthenticationException(testAuthenticationExceptionMessage) + for { + results <- producer.produceChunkAsyncWithFailures(recordsToSend).flatten + } yield assertTrue( + results.length == recordsToSend.length, + results.head.isLeft, + results.head.left.forall(_.getMessage == testAuthenticationExceptionMessage), + results.tail.forall(_ == Left(Producer.PublishOmittedException)) + ) + } + }, + test("provides correct results in case last send call fails") { + withProducer(autoCompleteProducerRequests = false) { (mockJavaProducer, producer) => + val recordsToSend = Chunk( + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord() + ) + val testAuthenticationExceptionMessage = "test authentication exception" + val mockJavaProducerBehaviour = ZIO.succeed { + // Send calls behaviours + mockJavaProducer.allowNextSendAndAwaitSendCompletion() + mockJavaProducer.allowNextSendAndAwaitSendCompletion() + mockJavaProducer.allowNextSendAndAwaitSendCompletion() + mockJavaProducer.allowNextSendAndAwaitSendCompletion() + mockJavaProducer.sendException = new AuthenticationException(testAuthenticationExceptionMessage) + mockJavaProducer.allowNextSend() + // Send callbacks behaviours + mockJavaProducer.completeNext() + mockJavaProducer.completeNext() + mockJavaProducer.completeNext() + mockJavaProducer.completeNext() + } + for { + _ <- mockJavaProducerBehaviour.forkScoped + results <- producer.produceChunkAsyncWithFailures(recordsToSend).flatten + } yield assertTrue( + results.length == recordsToSend.length, + results.init.forall(_.isRight), + results.last.isLeft, + results.last.left.forall(_.getMessage == testAuthenticationExceptionMessage) + ) + } + }, + test("omits sending further records in chunk and provides correct results in case middle send call fails") { + withProducer(autoCompleteProducerRequests = false) { (mockJavaProducer, producer) => + val recordsToSend = Chunk( + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord() + ) + val testAuthenticationExceptionMessage = "test authentication exception" + val mockJavaProducerBehaviour = ZIO.succeed { + // Send calls behaviours + mockJavaProducer.allowNextSendAndAwaitSendCompletion() + mockJavaProducer.allowNextSendAndAwaitSendCompletion() + mockJavaProducer.sendException = new AuthenticationException(testAuthenticationExceptionMessage) + mockJavaProducer.allowNextSend() + // Send 
callbacks behaviours + mockJavaProducer.completeNext() + mockJavaProducer.completeNext() + } + for { + _ <- mockJavaProducerBehaviour.forkScoped + results <- producer.produceChunkAsyncWithFailures(recordsToSend).flatten + } yield assertTrue( + results.length == recordsToSend.length, + results(0).isRight, + results(1).isRight, + results(2).left.forall(_.getMessage == testAuthenticationExceptionMessage), + results(3) == Left(Producer.PublishOmittedException), + results(4) == Left(Producer.PublishOmittedException) + ) + } + }, + test( + "omits sending further records in chunk and provides correct results in case second publication to broker fails along with middle send call fails" + ) { + withProducer(autoCompleteProducerRequests = false) { (mockJavaProducer, producer) => + val recordsToSend = Chunk( + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord() + ) + val testAuthenticationExceptionMessage = "test authentication exception" + val testKafkaExceptionMessage = "unexpected broker exception" + val mockJavaProducerBehaviour = ZIO.succeed { + // Send calls behaviours + mockJavaProducer.allowNextSendAndAwaitSendCompletion() + mockJavaProducer.allowNextSendAndAwaitSendCompletion() + mockJavaProducer.sendException = new AuthenticationException(testAuthenticationExceptionMessage) + mockJavaProducer.allowNextSend() + // Send callbacks behaviours + mockJavaProducer.completeNext() + mockJavaProducer.errorNext(new KafkaException(testKafkaExceptionMessage)) + } + for { + _ <- mockJavaProducerBehaviour.forkScoped + results <- producer.produceChunkAsyncWithFailures(recordsToSend).flatten + } yield assertTrue( + results.length == recordsToSend.length, + results(0).isRight, + results(1).isLeft, + results(1).left.forall(_.getMessage == testKafkaExceptionMessage), + results(2).left.forall(_.getMessage == testAuthenticationExceptionMessage), + results(3) == Left(Producer.PublishOmittedException), + results(4) == Left(Producer.PublishOmittedException) + ) + } + } + ) + ) @@ withLiveClock + + private def withProducer(autoCompleteProducerRequests: Boolean = true)( + producerTest: (BinaryMockProducer, Producer) => ZIO[Scope, Throwable, TestResult] + ): ZIO[Scope, Throwable, TestResult] = + ZIO.scoped { + val mockJavaProducer = new BinaryMockProducer(autoCompleteProducerRequests) + + Producer + .fromJavaProducer(mockJavaProducer, ProducerSettings()) + .flatMap(producerTest(mockJavaProducer, _)) + } + + private def makeProducerRecord( + topic: String = "testTopic", + key: String = "key", + value: String = "value" + ): ProducerRecord[Array[Byte], Array[Byte]] = + new ProducerRecord[Array[Byte], Array[Byte]](topic, key.getBytes, value.getBytes) + +} From 62fd48aaab7506633da67116d5db96dead63586f Mon Sep 17 00:00:00 2001 From: ytalashko Date: Sun, 8 Sep 2024 00:34:46 +0300 Subject: [PATCH 5/6] Update Producer.produceChunk methods scaladocs --- .../main/scala/zio/kafka/producer/Producer.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala index c2687bfab..02717e529 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala @@ -103,6 +103,9 @@ trait Producer { /** * Produces a chunk of records. See [[produceChunkAsync(records*]] for version that allows to avoid round-trip-time * penalty for each chunk. 
+   *
+   * When publishing any of the records fails, the whole batch fails even though some records might have been published.
+   * Use [[produceChunkAsyncWithFailures]] to get results per record.
    */
   def produceChunk(
     records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
@@ -111,6 +114,9 @@ trait Producer {
   /**
    * Produces a chunk of records. See [[produceChunkAsync(records*]] for version that allows to avoid round-trip-time
    * penalty for each chunk.
+   *
+   * When publishing any of the records fails, the whole batch fails even though some records might have been published.
+   * Use [[produceChunkAsyncWithFailures]] to get results per record.
    */
   def produceChunk[R, K, V](
     records: Chunk[ProducerRecord[K, V]],
@@ -127,6 +133,9 @@ trait Producer {
    * It is possible that for chunks that exceed the producer's internal buffer size, the outer layer will also signal
    * the transmission of part of the chunk. Regardless, awaiting the inner layer guarantees the transmission of the
    * entire chunk.
+   *
+   * When publishing any of the records fails, the whole batch fails even though some records might have been published.
+   * Use [[produceChunkAsyncWithFailures]] to get results per record.
    */
   def produceChunkAsync(
     records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
@@ -141,6 +150,9 @@ trait Producer {
    * It is possible that for chunks that exceed the producer's internal buffer size, the outer layer will also signal
    * the transmission of part of the chunk. Regardless, awaiting the inner layer guarantees the transmission of the
    * entire chunk.
+   *
+   * When publishing any of the records fails, the whole batch fails even though some records might have been published.
+   * Use [[produceChunkAsyncWithFailures]] to get results per record.
    */
   def produceChunkAsync[R, K, V](
     records: Chunk[ProducerRecord[K, V]],
@@ -161,6 +173,9 @@ trait Producer {
    * This variant of `produceChunkAsync` more accurately reflects that individual records within the Chunk can fail to
    * publish, rather than the failure being at the level of the Chunk.
    *
+   * When an attempt to send a record into the buffer for publication fails, the following records in the chunk are not
+   * published. This is indicated with a [[Producer.PublishOmittedException]].
+   *
    * This variant does not accept serializers as they may also fail independently of each record and this is not
    * reflected in the return type.
    */

From 919b1a95546e2c9f05ac127f7b38965716c8ae5c Mon Sep 17 00:00:00 2001
From: ytalashko
Date: Sun, 8 Sep 2024 00:38:43 +0300
Subject: [PATCH 6/6] ProducerSpec.scala test wording update

---
 .../src/test/scala/zio/kafka/producer/ProducerSpec.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/zio-kafka-test/src/test/scala/zio/kafka/producer/ProducerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/producer/ProducerSpec.scala
index 8c90a46ff..5fe6fa203 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/producer/ProducerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/producer/ProducerSpec.scala
@@ -168,7 +168,7 @@ object ProducerSpec extends ZIOSpecDefault {
         }
       },
       test(
-        "omits sending further records in chunk and provides correct results in case second publication to broker fails along with middle send call fails"
+        "omits sending further records in chunk and provides correct results in case second publication to broker fails along with middle send call failure"
       ) {
         withProducer(autoCompleteProducerRequests = false) { (mockJavaProducer, producer) =>
           val recordsToSend = Chunk(
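
Note: below is a minimal sketch (not part of the patch series) of how application code might
consume the per-record results this series introduces. The object and method names, topic
handling, and log messages are illustrative; only produceChunkAsyncWithFailures, its
nested-effect return type, and Producer.PublishOmittedException come from the patches above.

    import org.apache.kafka.clients.producer.ProducerRecord
    import zio._
    import zio.kafka.producer.Producer

    object ProduceResultsExample {

      // Produce a chunk and handle each record's result individually, distinguishing
      // real send failures from records that were skipped because an earlier send
      // call in the same chunk already failed.
      def logResults(
        producer: Producer,
        records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
      ): Task[Unit] =
        for {
          // The outer effect completes once all send calls have been issued,
          // the inner effect once every record has an acknowledgement or an error.
          results <- producer.produceChunkAsyncWithFailures(records).flatten
          _ <- ZIO.foreachDiscard(results) {
                 case Right(metadata) =>
                   ZIO.logInfo(s"sent to ${metadata.topic()}-${metadata.partition()} at offset ${metadata.offset()}")
                 case Left(Producer.PublishOmittedException) =>
                   ZIO.logWarning("record skipped: an earlier record in the chunk failed to send")
                 case Left(err) =>
                   ZIO.logError(s"send failed: ${err.getMessage}")
               }
        } yield ()
    }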