diff --git a/ydb/_topic_reader/topic_reader_asyncio_test.py b/ydb/_topic_reader/topic_reader_asyncio_test.py index 072d8415..25e08029 100644 --- a/ydb/_topic_reader/topic_reader_asyncio_test.py +++ b/ydb/_topic_reader/topic_reader_asyncio_test.py @@ -208,6 +208,7 @@ def create_message( written_at=datetime.datetime(2023, 2, 3, 14, 16), producer_id="test-producer-id", data=bytes(), + metadata_items={}, _partition_session=partition_session, _commit_start_offset=partition_session._next_message_start_commit_offset + offset_delta - 1, _commit_end_offset=partition_session._next_message_start_commit_offset + offset_delta, @@ -251,6 +252,7 @@ def batch_size(): seq_no=message.seqno, created_at=message.created_at, data=message.data, + metadata_items={}, uncompresed_size=len(message.data), message_group_id=message.message_group_id, ) @@ -773,6 +775,7 @@ async def test_free_buffer_after_partition_stop(self, stream, stream_reader, par seq_no=123, created_at=t, data=bytes(), + metadata_items={}, uncompresed_size=message_size, message_group_id="test-message-group", ) @@ -853,6 +856,7 @@ def reader_batch_count(): created_at=created_at, data=data, uncompresed_size=len(data), + metadata_items={}, message_group_id=message_group_id, ) ], @@ -884,6 +888,7 @@ def reader_batch_count(): written_at=written_at, producer_id=producer_id, data=data, + metadata_items={}, _partition_session=partition_session, _commit_start_offset=expected_message_offset, _commit_end_offset=expected_message_offset + 1, @@ -930,6 +935,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti seq_no=3, created_at=created_at, data=data, + metadata_items={}, uncompresed_size=len(data), message_group_id=message_group_id, ) @@ -951,6 +957,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti seq_no=2, created_at=created_at2, data=data, + metadata_items={}, uncompresed_size=len(data), message_group_id=message_group_id, ) @@ -967,6 +974,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti seq_no=3, created_at=created_at3, data=data2, + metadata_items={}, uncompresed_size=len(data2), message_group_id=message_group_id, ), @@ -975,6 +983,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti seq_no=5, created_at=created_at4, data=data, + metadata_items={}, uncompresed_size=len(data), message_group_id=message_group_id2, ), @@ -1005,6 +1014,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti written_at=written_at, producer_id=producer_id, data=data, + metadata_items={}, _partition_session=partition_session, _commit_start_offset=partition1_mess1_expected_offset, _commit_end_offset=partition1_mess1_expected_offset + 1, @@ -1025,6 +1035,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti written_at=written_at2, producer_id=producer_id, data=data, + metadata_items={}, _partition_session=second_partition_session, _commit_start_offset=partition2_mess1_expected_offset, _commit_end_offset=partition2_mess1_expected_offset + 1, @@ -1045,6 +1056,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti written_at=written_at2, producer_id=producer_id2, data=data2, + metadata_items={}, _partition_session=second_partition_session, _commit_start_offset=partition2_mess2_expected_offset, _commit_end_offset=partition2_mess2_expected_offset + 1, @@ -1058,6 +1070,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti written_at=written_at2, producer_id=producer_id, data=data, + metadata_items={}, _partition_session=second_partition_session, _commit_start_offset=partition2_mess3_expected_offset, _commit_end_offset=partition2_mess3_expected_offset + 1, diff --git a/ydb/_topic_writer/topic_writer.py b/ydb/_topic_writer/topic_writer.py index 8fa5fd48..aa5fe974 100644 --- a/ydb/_topic_writer/topic_writer.py +++ b/ydb/_topic_writer/topic_writer.py @@ -120,11 +120,12 @@ class InternalMessage(StreamWriteMessage.WriteRequest.MessageData, IToProto): codec: PublicCodec def __init__(self, mess: PublicMessage): + metadata_items = mess.metadata_items or {} super().__init__( seq_no=mess.seqno, created_at=mess.created_at, data=mess.data, - metadata_items=mess.metadata_items, + metadata_items=metadata_items, uncompressed_size=len(mess.data), partitioning=None, ) diff --git a/ydb/_topic_writer/topic_writer_asyncio_test.py b/ydb/_topic_writer/topic_writer_asyncio_test.py index 8b320f23..b288d0f0 100644 --- a/ydb/_topic_writer/topic_writer_asyncio_test.py +++ b/ydb/_topic_writer/topic_writer_asyncio_test.py @@ -153,6 +153,7 @@ async def test_write_a_message(self, writer_and_stream: WriterWithMockedStream): seq_no=1, created_at=now, data=data, + metadata_items={}, uncompressed_size=len(data), partitioning=None, )