From 7ce4da687463adb530cf9c3528e368616ce1e0a2 Mon Sep 17 00:00:00 2001 From: Ydjin0602 Date: Mon, 1 May 2023 17:59:29 +0700 Subject: [PATCH 1/5] key and value serialization for producer batch builder --- aiokafka/producer/message_accumulator.py | 28 +++++++++++++++++++----- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/aiokafka/producer/message_accumulator.py b/aiokafka/producer/message_accumulator.py index a20be471..2808ad13 100644 --- a/aiokafka/producer/message_accumulator.py +++ b/aiokafka/producer/message_accumulator.py @@ -14,8 +14,10 @@ class BatchBuilder: - def __init__(self, magic, batch_size, compression_type, - *, is_transactional): + def __init__( + self, magic, batch_size, compression_type, + *, is_transactional, key_serializer=None, value_serializer=None + ): if magic < 2: assert not is_transactional self._builder = LegacyRecordBatchBuilder( @@ -28,6 +30,14 @@ def __init__(self, magic, batch_size, compression_type, self._relative_offset = 0 self._buffer = None self._closed = False + self._key_serializer = key_serializer + self._value_serializer = value_serializer + + def _serialize(self, key, value): + serialized_key = self._key_serializer(key) if self._key_serializer else key + serialized_value = self._value_serializer(value) if self._value_serializer else key + + return serialized_key, serialized_value def append(self, *, timestamp, key, value, headers=[]): """Add a message to the batch. @@ -49,8 +59,9 @@ def append(self, *, timestamp, key, value, headers=[]): if self._closed: return None + key_bytes, value_bytes = self._serialize(key, value) metadata = self._builder.append( - self._relative_offset, timestamp, key, value, + self._relative_offset, timestamp, key=key_bytes, value=value_bytes, headers=headers) # Check if we could add the message @@ -422,7 +433,7 @@ def drain_by_nodes(self, ignore_nodes, muted_partitions=set()): return nodes, unknown_leaders_exist - def create_builder(self): + def create_builder(self, key_serializer=None, value_serializer=None): if self._api_version >= (0, 11): magic = 2 elif self._api_version >= (0, 10): @@ -435,8 +446,13 @@ def create_builder(self): self._txn_manager.transactional_id is not None: is_transactional = True return BatchBuilder( - magic, self._batch_size, self._compression_type, - is_transactional=is_transactional) + magic, + self._batch_size, + self._compression_type, + is_transactional=is_transactional, + key_serializer=key_serializer, + value_serializer=value_serializer + ) def _append_batch(self, builder, tp): # We must do this before actual add takes place to check for errors. From dc8a0a0c9ce7263df6db607b3acfbf9a26bdd930 Mon Sep 17 00:00:00 2001 From: Ydjin0602 Date: Mon, 1 May 2023 18:45:13 +0700 Subject: [PATCH 2/5] fixes --- aiokafka/producer/message_accumulator.py | 2 +- aiokafka/producer/producer.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/aiokafka/producer/message_accumulator.py b/aiokafka/producer/message_accumulator.py index 2808ad13..997c0d55 100644 --- a/aiokafka/producer/message_accumulator.py +++ b/aiokafka/producer/message_accumulator.py @@ -35,7 +35,7 @@ def __init__( def _serialize(self, key, value): serialized_key = self._key_serializer(key) if self._key_serializer else key - serialized_value = self._value_serializer(value) if self._value_serializer else key + serialized_value = self._value_serializer(value) if self._value_serializer else value return serialized_key, serialized_value diff --git a/aiokafka/producer/producer.py b/aiokafka/producer/producer.py index ea9fdbe0..e78e0a5f 100644 --- a/aiokafka/producer/producer.py +++ b/aiokafka/producer/producer.py @@ -484,7 +484,9 @@ def create_batch(self): Returns: BatchBuilder: empty batch to be filled and submitted by the caller. """ - return self._message_accumulator.create_builder() + return self._message_accumulator.create_builder( + key_serializer=self._key_serializer, value_serializer=self._value_serializer + ) async def send_batch(self, batch, topic, *, partition): """Submit a BatchBuilder for publication. From 36c326efba059e68b0a6ecab621a313d951fcf26 Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Sun, 14 Jan 2024 17:49:01 +0200 Subject: [PATCH 3/5] Add test for serialization in batch --- tests/test_producer.py | 47 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/tests/test_producer.py b/tests/test_producer.py index a6004a4e..d0b5cbd6 100644 --- a/tests/test_producer.py +++ b/tests/test_producer.py @@ -380,6 +380,53 @@ async def test_producer_send_batch(self): await producer.send_batch( batch, self.topic, partition=partition) + @run_until_complete + async def test_producer_send_batch_with_serializer(self): + def key_serializer(val): + return val.upper().encode() + + def value_serializer(val): + return json.dumps(val, separators=(',', ':')).encode() + + producer = AIOKafkaProducer( + bootstrap_servers=self.hosts, + key_serializer=key_serializer, + value_serializer=value_serializer, + ) + await producer.start() + + partitions = await producer.partitions_for(self.topic) + partition = partitions.pop() + + batch = producer.create_batch() + batch.append(key="key1", value={"value": 111}, timestamp=None) + batch.append(key="key2", value={"value": 222}, timestamp=None) + self.assertEqual(batch.record_count(), 2) + + # batch gets properly sent + future = await producer.send_batch( + batch, self.topic, partition=partition) + resp = await future + await producer.stop() + self.assertEqual(resp.partition, partition) + + consumer = AIOKafkaConsumer( + self.topic, + bootstrap_servers=self.hosts, + enable_auto_commit=True, + auto_offset_reset="earliest") + await consumer.start() + + msg = await consumer.getone() + self.assertEqual(msg.key, b"KEY1") + self.assertEqual(msg.value, b"{\"value\":111}") + + msg = await consumer.getone() + self.assertEqual(msg.key, b"KEY2") + self.assertEqual(msg.value, b"{\"value\":222}") + + await consumer.stop() + @pytest.mark.ssl @run_until_complete async def test_producer_ssl(self): From 9c116b833885f0f4aab9be068091732938201d9e Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Sun, 14 Jan 2024 17:55:03 +0200 Subject: [PATCH 4/5] Fix linting errors --- aiokafka/producer/message_accumulator.py | 14 ++++++++++---- aiokafka/producer/producer.py | 10 +++++----- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/aiokafka/producer/message_accumulator.py b/aiokafka/producer/message_accumulator.py index 997c0d55..d5cedbd7 100644 --- a/aiokafka/producer/message_accumulator.py +++ b/aiokafka/producer/message_accumulator.py @@ -15,8 +15,8 @@ class BatchBuilder: def __init__( - self, magic, batch_size, compression_type, - *, is_transactional, key_serializer=None, value_serializer=None + self, magic, batch_size, compression_type, + *, is_transactional, key_serializer=None, value_serializer=None ): if magic < 2: assert not is_transactional @@ -34,8 +34,14 @@ def __init__( self._value_serializer = value_serializer def _serialize(self, key, value): - serialized_key = self._key_serializer(key) if self._key_serializer else key - serialized_value = self._value_serializer(value) if self._value_serializer else value + if self._key_serializer is None: + serialized_key = key + else: + serialized_key = self._key_serializer(key) + if self._value_serializer is None: + serialized_value = value + else: + serialized_value = self._value_serializer(value) return serialized_key, serialized_value diff --git a/aiokafka/producer/producer.py b/aiokafka/producer/producer.py index e78e0a5f..472ac515 100644 --- a/aiokafka/producer/producer.py +++ b/aiokafka/producer/producer.py @@ -348,14 +348,14 @@ async def partitions_for(self, topic): return (await self.client._wait_on_metadata(topic)) def _serialize(self, topic, key, value): - if self._key_serializer: - serialized_key = self._key_serializer(key) - else: + if self._key_serializer is None: serialized_key = key - if self._value_serializer: - serialized_value = self._value_serializer(value) else: + serialized_key = self._key_serializer(key) + if self._value_serializer is None: serialized_value = value + else: + serialized_value = self._value_serializer(value) message_size = LegacyRecordBatchBuilder.record_overhead( self._producer_magic) From af897bdebd98ceaf4281f9cb38733032a2158588 Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Sun, 14 Jan 2024 17:57:36 +0200 Subject: [PATCH 5/5] Add changelog entry --- CHANGES.rst | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index b4c47383..e4452182 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -2,6 +2,14 @@ Changelog ========= +Unreleased +========== + +Bugfixes: + +* Fix serialization for batch (issue #886, pr #887 by @ydjin0602) + + 0.10.0 (2023-12-15) ===================