From 535b3e8c9cdf90e1757c0dcb178845a9ddbc6a79 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 10 Dec 2024 15:49:25 +0300 Subject: [PATCH] Make created_at utc in topic writer --- ydb/_topic_reader/topic_reader_asyncio_test.py | 2 +- ydb/_topic_writer/topic_writer_asyncio.py | 2 +- ydb/_topic_writer/topic_writer_asyncio_test.py | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ydb/_topic_reader/topic_reader_asyncio_test.py b/ydb/_topic_reader/topic_reader_asyncio_test.py index 4c76cd1d..b9f1e639 100644 --- a/ydb/_topic_reader/topic_reader_asyncio_test.py +++ b/ydb/_topic_reader/topic_reader_asyncio_test.py @@ -748,7 +748,7 @@ async def test_free_buffer_after_partition_stop(self, stream, stream_reader, par initial_buffer_size = stream_reader._buffer_size_bytes message_size = initial_buffer_size - 1 - t = datetime.datetime.now() + t = datetime.datetime.now(datetime.timezone.utc) stream.from_server.put_nowait( StreamReadMessage.FromServer( diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index c7f88a42..d759072c 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -307,7 +307,7 @@ def _add_messages_to_send_queue(self, internal_messages: List[InternalMessage]): def _prepare_internal_messages(self, messages: List[PublicMessage]) -> List[InternalMessage]: if self._settings.auto_created_at: - now = datetime.datetime.now() + now = datetime.datetime.now(datetime.timezone.utc) else: now = None diff --git a/ydb/_topic_writer/topic_writer_asyncio_test.py b/ydb/_topic_writer/topic_writer_asyncio_test.py index 8fee5cec..dc3f2cad 100644 --- a/ydb/_topic_writer/topic_writer_asyncio_test.py +++ b/ydb/_topic_writer/topic_writer_asyncio_test.py @@ -132,7 +132,7 @@ async def test_init_writer(self, stream): async def test_write_a_message(self, writer_and_stream: WriterWithMockedStream): data = "123".encode() - now = datetime.datetime.now() + now = datetime.datetime.now(datetime.timezone.utc) writer_and_stream.writer.write( [ InternalMessage( @@ -322,7 +322,7 @@ async def test_reconnect_and_resent_non_acked_messages_on_retriable_error( get_stream_writer, default_write_statistic, ): - now = datetime.datetime.now() + now = datetime.datetime.now(datetime.timezone.utc) data = "123".encode() message1 = PublicMessage( @@ -460,7 +460,7 @@ async def test_deny_double_seqno(self, reconnector: WriterAsyncIOReconnector, ge @freezegun.freeze_time("2022-01-13 20:50:00", tz_offset=0) async def test_auto_created_at(self, default_driver, default_settings, get_stream_writer): - now = datetime.datetime.now() + now = datetime.datetime.now(datetime.timezone.utc) settings = copy.deepcopy(default_settings) settings.auto_created_at = True @@ -587,7 +587,7 @@ async def test_custom_encoder(self, default_driver, default_settings, get_stream settings.codec = codec reconnector = WriterAsyncIOReconnector(default_driver, settings) - now = datetime.datetime.now() + now = datetime.datetime.now(datetime.timezone.utc) seqno = self.init_last_seqno + 1 await reconnector.write_with_ack_future([PublicMessage(data=b"123", seqno=seqno, created_at=now)])