From 7fb3c7444ef788dce91b3a8118d548df75e92ddd Mon Sep 17 00:00:00 2001 From: Ulrik Born Date: Tue, 26 Nov 2024 11:57:44 +0100 Subject: [PATCH 1/2] Added new IProducer.ProduceAsync overload for produce with minimal allocations. --- src/Confluent.Kafka/IProducer.cs | 114 ++++++++++++++++----------- src/Confluent.Kafka/ProduceResult.cs | 52 ++++++++++++ src/Confluent.Kafka/Producer.cs | 98 +++++++++++++++-------- 3 files changed, 185 insertions(+), 79 deletions(-) create mode 100644 src/Confluent.Kafka/ProduceResult.cs diff --git a/src/Confluent.Kafka/IProducer.cs b/src/Confluent.Kafka/IProducer.cs index cb3501cdc..cd331050c 100644 --- a/src/Confluent.Kafka/IProducer.cs +++ b/src/Confluent.Kafka/IProducer.cs @@ -21,12 +21,33 @@ namespace Confluent.Kafka -{ - /// - /// Defines a high-level Apache Kafka producer client - /// that provides key and value serialization. - /// - public interface IProducer : IClient +{ + /// + /// Defines a high-level Apache Kafka producer client without serialization capable of producing pre-serialized messages. + /// + public interface IProducer : IClient + { + /// + /// Asynchronously send a single preserialized message to a Kafka topic. + /// + /// + /// Use this method to produce with minimal allocations. + /// + /// The topic to produce message to. + /// The partition to produce to or Partition.Any to use configured partitioner. + /// Serialized message key or null. + /// Serialized message value or null. + /// Message headers or null to produce message without headers. + /// + /// Result of produce. + Task ProduceAsync(string topic, Partition partition, ArraySegment? key, ArraySegment? value, IReadOnlyList headers, Timestamp timestamp); + } + + /// + /// Defines a high-level Apache Kafka producer client + /// that provides key and value serialization. + /// + public interface IProducer : IProducer { /// /// Asynchronously send a single message to a @@ -99,45 +120,44 @@ Task> ProduceAsync( Task> ProduceAsync( TopicPartition topicPartition, Message message, - CancellationToken cancellationToken = default(CancellationToken)); - - - /// - /// Asynchronously send a single message to a - /// Kafka topic. The partition the message is sent - /// to is determined by the partitioner defined - /// using the 'partitioner' configuration property. - /// - /// - /// The topic to produce the message to. - /// - /// - /// The message to produce. - /// - /// - /// A delegate that will be called - /// with a delivery report corresponding to the - /// produce request (if enabled). - /// - /// - /// Thrown in response to any error that is known - /// immediately (excluding user application logic - /// errors), for example ErrorCode.Local_QueueFull. - /// Asynchronous notification of unsuccessful produce - /// requests is made available via the - /// parameter (if specified). The Error property of - /// the exception / delivery report provides more - /// detailed information. - /// - /// - /// Thrown in response to invalid argument values. - /// - /// - /// Thrown in response to error conditions that - /// reflect an error in the application logic of - /// the calling application. - /// - void Produce( + CancellationToken cancellationToken = default(CancellationToken)); + + /// + /// Asynchronously send a single message to a + /// Kafka topic. The partition the message is sent + /// to is determined by the partitioner defined + /// using the 'partitioner' configuration property. + /// + /// + /// The topic to produce the message to. + /// + /// + /// The message to produce. + /// + /// + /// A delegate that will be called + /// with a delivery report corresponding to the + /// produce request (if enabled). + /// + /// + /// Thrown in response to any error that is known + /// immediately (excluding user application logic + /// errors), for example ErrorCode.Local_QueueFull. + /// Asynchronous notification of unsuccessful produce + /// requests is made available via the + /// parameter (if specified). The Error property of + /// the exception / delivery report provides more + /// detailed information. + /// + /// + /// Thrown in response to invalid argument values. + /// + /// + /// Thrown in response to error conditions that + /// reflect an error in the application logic of + /// the calling application. + /// + void Produce( string topic, Message message, Action> deliveryHandler = null); @@ -540,6 +560,6 @@ void Produce( /// /// Thrown on all other errors. /// - void SendOffsetsToTransaction(IEnumerable offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout); - } + void SendOffsetsToTransaction(IEnumerable offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout); + } } diff --git a/src/Confluent.Kafka/ProduceResult.cs b/src/Confluent.Kafka/ProduceResult.cs new file mode 100644 index 000000000..af5e13fed --- /dev/null +++ b/src/Confluent.Kafka/ProduceResult.cs @@ -0,0 +1,52 @@ +// Copyright 2016-2018 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +namespace Confluent.Kafka +{ + /// + /// Models the result of a call to ProduceAsync. + /// + public readonly record struct ProduceResult + { + /// + /// The partition message was produced to. + /// + public Partition Partition { get; } + + /// + /// The offset message was persisted at. + /// + public Offset Offset { get; } + + /// + /// The persistence status of the message. + /// + public PersistenceStatus PersistenceStatus { get; } + + /// + /// Creates new result. + /// + /// + /// + /// + public ProduceResult(Partition partition, Offset offset, PersistenceStatus persistenceStatus) + { + Partition = partition; + Offset = offset; + PersistenceStatus = persistenceStatus; + } + } +} diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index 79867514b..da8e58573 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -1,19 +1,19 @@ -// Copyright 2016-2018 Confluent Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// Refer to LICENSE for more information. - +// Copyright 2016-2018 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + using System; using System.Collections.Generic; using System.Linq; @@ -25,11 +25,11 @@ namespace Confluent.Kafka -{ - /// - /// A high level producer with serialization capability. - /// - internal class Producer : IProducer, IClient +{ + /// + /// A high level producer with serialization capability. + /// + internal class Producer : IProducer { internal class Config { @@ -56,9 +56,9 @@ internal class Config { typeof(float), Serializers.Single }, { typeof(double), Serializers.Double }, { typeof(byte[]), Serializers.ByteArray } - }; - - private int cancellationDelayMaxMs; + }; + + private int cancellationDelayMaxMs; private bool disposeHasBeenCalled = false; private object disposeHasBeenCalledLockObj = new object(); @@ -324,10 +324,10 @@ private void ProduceImpl( else { err = KafkaHandle.Produce( - topic, + topic, val, valOffset, valLength, key, keyOffset, keyLength, - partition.Value, + partition.Value, timestamp.UnixTimestampMs, headers, IntPtr.Zero); @@ -741,9 +741,43 @@ internal Producer(ProducerBuilder builder) builder.AsyncKeySerializer, builder.AsyncValueSerializer); } - - /// - public async Task> ProduceAsync( + /// + public async Task ProduceAsync( + string topic, + Partition partition, + ArraySegment? key, + ArraySegment? value, + IReadOnlyList headers, + Timestamp timestamp) + { + // Start produce request + DeliveryHandlerSlim deliveryHandler = new(); + ProduceImpl(topic, + value?.Array, value?.Offset ?? 0, value?.Count ?? 0, + key?.Array, key?.Offset ?? 0, key?.Count ?? 0, + timestamp, + partition, + headers ?? Array.Empty(), + deliveryHandler); + + // Wait for response + DeliveryReport deliveryReport = await deliveryHandler.Task.ConfigureAwait(false); + + // Return response + return new(deliveryReport.Partition, deliveryReport.Offset, deliveryReport.Status); + } + + /// + /// Implements light-weight delivery handler for produce requests. + /// + class DeliveryHandlerSlim : TaskCompletionSource>, IDeliveryHandler + { + public DeliveryHandlerSlim() : base(TaskCreationOptions.RunContinuationsAsynchronously) {} + public void HandleDeliveryReport(DeliveryReport deliveryReport) => TrySetResult(deliveryReport); + } + + /// + public async Task> ProduceAsync( TopicPartition topicPartition, Message message, CancellationToken cancellationToken) @@ -815,10 +849,10 @@ public async Task> ProduceAsync( else { ProduceImpl( - topicPartition.Topic, + topicPartition.Topic, valBytes, 0, valBytes == null ? 0 : valBytes.Length, keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, - message.Timestamp, topicPartition.Partition, headers.BackingList, + message.Timestamp, topicPartition.Partition, headers.BackingList, null); var result = new DeliveryResult @@ -912,7 +946,7 @@ public void Produce( } try - { + { ProduceImpl( topicPartition.Topic, valBytes, 0, valBytes == null ? 0 : valBytes.Length, From 90b41dccffa87ad9d2f3fdb2ca18b67f01c77f5a Mon Sep 17 00:00:00 2001 From: Ulrik Born Date: Tue, 26 Nov 2024 12:14:41 +0100 Subject: [PATCH 2/2] Whitespace --- src/Confluent.Kafka/IProducer.cs | 118 +++++++++++++++---------------- src/Confluent.Kafka/Producer.cs | 42 +++++------ 2 files changed, 80 insertions(+), 80 deletions(-) diff --git a/src/Confluent.Kafka/IProducer.cs b/src/Confluent.Kafka/IProducer.cs index cd331050c..9aa9f426b 100644 --- a/src/Confluent.Kafka/IProducer.cs +++ b/src/Confluent.Kafka/IProducer.cs @@ -22,32 +22,32 @@ namespace Confluent.Kafka { - /// - /// Defines a high-level Apache Kafka producer client without serialization capable of producing pre-serialized messages. - /// - public interface IProducer : IClient - { - /// - /// Asynchronously send a single preserialized message to a Kafka topic. - /// + /// + /// Defines a high-level Apache Kafka producer client without serialization capable of producing pre-serialized messages. + /// + public interface IProducer : IClient + { + /// + /// Asynchronously send a single preserialized message to a Kafka topic. + /// /// /// Use this method to produce with minimal allocations. /// - /// The topic to produce message to. - /// The partition to produce to or Partition.Any to use configured partitioner. - /// Serialized message key or null. - /// Serialized message value or null. - /// Message headers or null to produce message without headers. - /// - /// Result of produce. - Task ProduceAsync(string topic, Partition partition, ArraySegment? key, ArraySegment? value, IReadOnlyList headers, Timestamp timestamp); - } + /// The topic to produce message to. + /// The partition to produce to or Partition.Any to use configured partitioner. + /// Serialized message key or null. + /// Serialized message value or null. + /// Message headers or null to produce message without headers. + /// + /// Result of produce. + Task ProduceAsync(string topic, Partition partition, ArraySegment? key, ArraySegment? value, IReadOnlyList headers, Timestamp timestamp); + } - /// - /// Defines a high-level Apache Kafka producer client - /// that provides key and value serialization. - /// - public interface IProducer : IProducer + /// + /// Defines a high-level Apache Kafka producer client + /// that provides key and value serialization. + /// + public interface IProducer : IProducer { /// /// Asynchronously send a single message to a @@ -122,42 +122,42 @@ Task> ProduceAsync( Message message, CancellationToken cancellationToken = default(CancellationToken)); - /// - /// Asynchronously send a single message to a - /// Kafka topic. The partition the message is sent - /// to is determined by the partitioner defined - /// using the 'partitioner' configuration property. - /// - /// - /// The topic to produce the message to. - /// - /// - /// The message to produce. - /// - /// - /// A delegate that will be called - /// with a delivery report corresponding to the - /// produce request (if enabled). - /// - /// - /// Thrown in response to any error that is known - /// immediately (excluding user application logic - /// errors), for example ErrorCode.Local_QueueFull. - /// Asynchronous notification of unsuccessful produce - /// requests is made available via the - /// parameter (if specified). The Error property of - /// the exception / delivery report provides more - /// detailed information. - /// - /// - /// Thrown in response to invalid argument values. - /// - /// - /// Thrown in response to error conditions that - /// reflect an error in the application logic of - /// the calling application. - /// - void Produce( + /// + /// Asynchronously send a single message to a + /// Kafka topic. The partition the message is sent + /// to is determined by the partitioner defined + /// using the 'partitioner' configuration property. + /// + /// + /// The topic to produce the message to. + /// + /// + /// The message to produce. + /// + /// + /// A delegate that will be called + /// with a delivery report corresponding to the + /// produce request (if enabled). + /// + /// + /// Thrown in response to any error that is known + /// immediately (excluding user application logic + /// errors), for example ErrorCode.Local_QueueFull. + /// Asynchronous notification of unsuccessful produce + /// requests is made available via the + /// parameter (if specified). The Error property of + /// the exception / delivery report provides more + /// detailed information. + /// + /// + /// Thrown in response to invalid argument values. + /// + /// + /// Thrown in response to error conditions that + /// reflect an error in the application logic of + /// the calling application. + /// + void Produce( string topic, Message message, Action> deliveryHandler = null); @@ -561,5 +561,5 @@ void Produce( /// Thrown on all other errors. /// void SendOffsetsToTransaction(IEnumerable offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout); - } + } } diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index da8e58573..8ec0ac837 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -26,10 +26,10 @@ namespace Confluent.Kafka { - /// - /// A high level producer with serialization capability. - /// - internal class Producer : IProducer + /// + /// A high level producer with serialization capability. + /// + internal class Producer : IProducer { internal class Config { @@ -57,8 +57,8 @@ internal class Config { typeof(double), Serializers.Double }, { typeof(byte[]), Serializers.ByteArray } }; - - private int cancellationDelayMaxMs; + + private int cancellationDelayMaxMs; private bool disposeHasBeenCalled = false; private object disposeHasBeenCalledLockObj = new object(); @@ -327,7 +327,7 @@ private void ProduceImpl( topic, val, valOffset, valLength, key, keyOffset, keyLength, - partition.Value, + partition.Value, timestamp.UnixTimestampMs, headers, IntPtr.Zero); @@ -741,8 +741,8 @@ internal Producer(ProducerBuilder builder) builder.AsyncKeySerializer, builder.AsyncValueSerializer); } - /// - public async Task ProduceAsync( + /// + public async Task ProduceAsync( string topic, Partition partition, ArraySegment? key, @@ -750,8 +750,8 @@ public async Task ProduceAsync( IReadOnlyList headers, Timestamp timestamp) { - // Start produce request - DeliveryHandlerSlim deliveryHandler = new(); + // Start produce request + DeliveryHandlerSlim deliveryHandler = new(); ProduceImpl(topic, value?.Array, value?.Offset ?? 0, value?.Count ?? 0, key?.Array, key?.Offset ?? 0, key?.Count ?? 0, @@ -765,19 +765,19 @@ public async Task ProduceAsync( // Return response return new(deliveryReport.Partition, deliveryReport.Offset, deliveryReport.Status); - } + } - /// - /// Implements light-weight delivery handler for produce requests. - /// - class DeliveryHandlerSlim : TaskCompletionSource>, IDeliveryHandler - { + /// + /// Implements light-weight delivery handler for produce requests. + /// + class DeliveryHandlerSlim : TaskCompletionSource>, IDeliveryHandler + { public DeliveryHandlerSlim() : base(TaskCreationOptions.RunContinuationsAsynchronously) {} public void HandleDeliveryReport(DeliveryReport deliveryReport) => TrySetResult(deliveryReport); - } + } - /// - public async Task> ProduceAsync( + /// + public async Task> ProduceAsync( TopicPartition topicPartition, Message message, CancellationToken cancellationToken) @@ -852,7 +852,7 @@ public async Task> ProduceAsync( topicPartition.Topic, valBytes, 0, valBytes == null ? 0 : valBytes.Length, keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, - message.Timestamp, topicPartition.Partition, headers.BackingList, + message.Timestamp, topicPartition.Partition, headers.BackingList, null); var result = new DeliveryResult