From 8f3116414e1c9550849923f8ca0ab09496add0f3 Mon Sep 17 00:00:00 2001 From: Ivan Zubenko Date: Mon, 27 Nov 2023 12:01:18 +0200 Subject: [PATCH] log compaction improvements (#854) --- Dockerfile | 4 +- platform_monitoring/api.py | 6 +- platform_monitoring/config.py | 2 +- platform_monitoring/logs.py | 157 ++++++++++++++++----------------- setup.cfg | 1 + tests/integration/conftest.py | 12 +-- tests/integration/test_kube.py | 46 ++++++---- 7 files changed, 117 insertions(+), 111 deletions(-) diff --git a/Dockerfile b/Dockerfile index 1d5067e9..3e007493 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.9.9-slim-bullseye AS installer +FROM python:3.9.18-slim-bookworm AS installer ENV PATH=/root/.local/bin:$PATH @@ -9,7 +9,7 @@ COPY dist /tmp/dist RUN ls /tmp/dist RUN pip install --user --find-links /tmp/dist platform-monitoring -FROM python:3.9.9-slim-bullseye AS service +FROM python:3.9.18-slim-bookworm AS service LABEL org.opencontainers.image.source = "https://github.com/neuro-inc/platform-monitoring" diff --git a/platform_monitoring/api.py b/platform_monitoring/api.py index c0a15b1a..ac9d28be 100644 --- a/platform_monitoring/api.py +++ b/platform_monitoring/api.py @@ -15,6 +15,7 @@ import aiohttp.web import aiohttp_cors from aiobotocore.client import AioBaseClient +from aiobotocore.config import AioConfig from aioelasticsearch import Elasticsearch from aiohttp.client_ws import ClientWebSocketResponse from aiohttp.web import ( @@ -690,7 +691,7 @@ async def create_elasticsearch_client( def create_s3_client(config: S3Config) -> AioBaseClient: - kwargs: dict[str, str] = {} + kwargs: dict[str, Any] = {} if config.access_key_id: kwargs["aws_access_key_id"] = config.access_key_id if config.secret_access_key: @@ -699,6 +700,9 @@ def create_s3_client(config: S3Config) -> AioBaseClient: kwargs["endpoint_url"] = str(config.endpoint_url) if config.region: kwargs["region_name"] = config.region + kwargs["config"] = AioConfig( + retries={"mode": "standard"}, # 3 retries by default + ) session = aiobotocore.session.get_session() return session.create_client("s3", **kwargs) diff --git a/platform_monitoring/config.py b/platform_monitoring/config.py index fdde02a3..cb99b138 100644 --- a/platform_monitoring/config.py +++ b/platform_monitoring/config.py @@ -102,7 +102,7 @@ class RegistryConfig: @property def host(self) -> str: - port = self.url.explicit_port # type: ignore + port = self.url.explicit_port suffix = f":{port}" if port else "" return f"{self.url.host}{suffix}" diff --git a/platform_monitoring/logs.py b/platform_monitoring/logs.py index b252192f..8269326d 100644 --- a/platform_monitoring/logs.py +++ b/platform_monitoring/logs.py @@ -2,9 +2,7 @@ import abc import asyncio -import functools import io -import json import logging import sys import time @@ -18,6 +16,7 @@ import aiohttp import botocore.exceptions +import orjson from aiobotocore.client import AioBaseClient from aiobotocore.response import StreamingBody from aioelasticsearch import Elasticsearch, RequestError @@ -388,16 +387,13 @@ async def get(self, pod_name: str) -> S3LogsMetadata: @trace async def _get_from_s3(self, pod_name: str) -> S3LogsMetadata: - get_object = functools.partial( - self._s3_client.get_object, + response = await self._s3_client.get_object( Bucket=self._bucket_name, Key=self.get_metadata_key(pod_name), ) - resp = await retry_if_failed(get_object) - resp_body = resp["Body"] - async with resp_body: - raw_data = await resp_body.read() - data = json.loads(raw_data) + async with 
response["Body"]: + raw_data = await response["Body"].read() + data = orjson.loads(raw_data) return S3LogsMetadata.from_primitive(data) async def put(self, pod_name: str, metadata: S3LogsMetadata) -> None: @@ -407,13 +403,11 @@ async def put(self, pod_name: str, metadata: S3LogsMetadata) -> None: @trace async def _put_to_s3(self, pod_name: str, metadata: S3LogsMetadata) -> None: - put_object = functools.partial( - self._s3_client.put_object, + await self._s3_client.put_object( Bucket=self._bucket_name, Key=self.get_metadata_key(pod_name), - Body=json.dumps(metadata.to_primitive()).encode(), + Body=orjson.dumps(metadata.to_primitive()), ) - await retry_if_failed(put_object) class S3LogsMetadataService: @@ -529,22 +523,17 @@ async def get_pods_cleanup_queue(self, cleanup_interval: float = 3600) -> list[s @trace async def add_pod_to_cleanup_queue(self, pod_name: str) -> None: - put_object = functools.partial( - self._s3_client.put_object, + await self._s3_client.put_object( Bucket=self.bucket_name, Key=f"{self.CLEANUP_KEY_PREFIX}/{pod_name}", Body=b"", ) - await retry_if_failed(put_object) @trace async def remove_pod_from_cleanup_queue(self, pod_name: str) -> None: - delete_object = functools.partial( - self._s3_client.delete_object, - Bucket=self.bucket_name, - Key=f"{self.CLEANUP_KEY_PREFIX}/{pod_name}", + await self._s3_client.delete_object( + Bucket=self.bucket_name, Key=f"{self.CLEANUP_KEY_PREFIX}/{pod_name}" ) - await retry_if_failed(delete_object) @dataclass(frozen=True) @@ -559,7 +548,7 @@ class S3LogRecord: def parse( cls, line: bytes, fallback_time: datetime, *, container_id: str ) -> S3LogRecord: - data = json.loads(line.decode(errors="replace")) + data = orjson.loads(line.decode(errors="replace")) time_str, time = cls._parse_time(data, fallback_time) return cls( time_str=time_str, @@ -607,10 +596,17 @@ def _parse_message(cls, data: dict[str, Any]) -> str: class S3FileReader: - def __init__(self, s3_client: AioBaseClient, bucket_name: str, key: str) -> None: + def __init__( + self, + s3_client: AioBaseClient, + bucket_name: str, + key: str, + chunk_size: int = 1024, + ) -> None: self._s3_client = s3_client self._bucket_name = bucket_name self._key = key + self._chunk_size = chunk_size self._loop = asyncio.get_event_loop() @classmethod @@ -618,39 +614,46 @@ def _is_compressed(cls, response: Any) -> bool: return response["ContentType"] == "application/x-gzip" async def iter_lines(self) -> AsyncIterator[bytes]: - get_object = functools.partial( - self._s3_client.get_object, Bucket=self._bucket_name, Key=self._key + response = await self._s3_client.get_object( + Bucket=self._bucket_name, Key=self._key ) - response = await retry_if_failed(get_object) - response_body = response["Body"] - async with response_body: + async with response["Body"]: if self._is_compressed(response): - line_iterator = self._iter_decompressed_lines(response_body) + line_iterator = self._iter_decompressed_lines(response["Body"]) else: - line_iterator = response_body.iter_lines() + line_iterator = response["Body"].iter_lines(chunk_size=self._chunk_size) async with aclosing(line_iterator): async for line in line_iterator: yield line - @classmethod async def _iter_decompressed_lines( - cls, body: StreamingBody + self, body: StreamingBody ) -> AsyncIterator[bytes]: loop = asyncio.get_event_loop() decompress_obj = zlib.decompressobj(wbits=ZLIB_WBITS) pending = b"" - async for chunk in body.iter_chunks(): + async for chunk in body.iter_chunks(chunk_size=self._chunk_size): chunk_d = await loop.run_in_executor( None, 
lambda: decompress_obj.decompress(chunk) ) - lines = (pending + chunk_d).splitlines(True) - for line in lines[:-1]: - yield line.splitlines()[0] - pending = lines[-1] + if chunk_d: + lines = chunk_d.splitlines() + lines[0] = pending + lines[0] + for i in range(len(lines) - 1): + yield lines[i] + if chunk_d.endswith(b"\n"): + pending = b"" + yield lines[-1] + else: + pending = lines[-1] + chunk_d = await loop.run_in_executor(None, decompress_obj.flush) + if chunk_d: + pending += chunk_d if pending: - yield pending.splitlines()[0] + for line in pending.splitlines(): + yield line class S3LogRecordsWriter: @@ -663,10 +666,14 @@ def __init__( pod_name: str, size_limit: int = 10 * 1024**2, buffer: IO[bytes] | None = None, + write_buffer_attempts: int = 4, + write_buffer_timeout: int = 10, ) -> None: self._s3_client = s3_client self._bucket_name = bucket_name self._size_limit = size_limit + self._write_buffer_attempts = write_buffer_attempts + self._write_buffer_timeout = write_buffer_timeout self._buffer = io.BytesIO() if buffer is None else buffer self._buffer.seek(0) @@ -712,14 +719,7 @@ async def _flush(self) -> None: last_record_time=self._last_record_time, ) logger.info("Flushing records to %s", file) - put_object = functools.partial( - self._s3_client.put_object, - Bucket=self._bucket_name, - Key=key, - Body=self._buffer, - ContentType="application/x-gzip", - ) - await retry_if_failed(put_object) + await self._put_buffer_to_s3(key) self._output_files.append(file) self._compress_obj = zlib.compressobj(wbits=ZLIB_WBITS) self._buffer.seek(0) @@ -727,6 +727,27 @@ async def _flush(self) -> None: self._records_count = 0 self._size = 0 + async def _put_buffer_to_s3(self, key: str) -> None: + last_err = None + for _ in range(self._write_buffer_attempts): + try: + await asyncio.wait_for( + self._s3_client.put_object( + Bucket=self._bucket_name, + Key=key, + Body=self._buffer, + ContentType="application/x-gzip", + ), + self._write_buffer_timeout, + ) + return + except asyncio.TimeoutError as err: + last_err = err + logger.warning("Timeout while flushing records") + else: + assert last_err + raise last_err + async def write(self, record: S3LogRecord) -> None: if self._size and self._container_id != record.container_id: await self._flush() @@ -750,8 +771,7 @@ def encode_record(self, record: S3LogRecord) -> bytes: body = {"time": record.time_str, "log": record.message} if record.stream != S3LogRecord.stream: body["stream"] = record.stream - body_str = json.dumps(body) + "\n" - return body_str.encode() + return orjson.dumps(body) + b"\n" def get_output_files(self) -> list[S3LogFile]: return self._output_files.copy() @@ -1192,7 +1212,7 @@ async def compact_all( async def compact_one(self, pod_name: str) -> None: await self._metadata_service.add_pod_to_cleanup_queue(pod_name) metadata = await self._metadata_service.get_metadata(pod_name) - logger.info("Compacting pod %s, %s", pod_name, metadata) + logger.info("Compacting pod %s with %s", pod_name, metadata) raw_keys = await self._metadata_service.get_raw_log_keys(pod_name) raw_keys = await self._delete_merged_keys(metadata, raw_keys) await self._delete_orphaned_keys(pod_name, metadata) @@ -1202,7 +1222,7 @@ async def compact_one(self, pod_name: str) -> None: @trace async def cleanup_one(self, pod_name: str) -> None: metadata = await self._metadata_service.get_metadata(pod_name) - logger.info("Cleaning pod %s, %s", pod_name, metadata) + logger.info("Cleaning pod %s with %s", pod_name, metadata) raw_keys = await 
self._metadata_service.get_raw_log_keys(pod_name) raw_keys = await self._delete_merged_keys(metadata, raw_keys) await self._delete_orphaned_keys(pod_name, metadata) @@ -1237,6 +1257,10 @@ async def _merge_raw_keys( return metadata def _create_log_record_writer(self, pod_name: str) -> S3LogRecordsWriter: + # NOTE: write_buffer_timeout must be less than s3_client read_timeout, + # which is 60s by default. Buffer flushing can happen while compactor + # is reading the raw log file. So it should be acceptable for flushing + # to be retried several times while the file is being read. return S3LogRecordsWriter( self._s3_client, self._bucket_name, @@ -1304,10 +1328,7 @@ async def _delete_keys(self, keys: Iterable[str]) -> None: @trace async def _delete_key(self, key: str) -> None: - delete_object = functools.partial( - self._s3_client.delete_object, Bucket=self._bucket_name, Key=key - ) - await retry_if_failed(delete_object) + await self._s3_client.delete_object(Bucket=self._bucket_name, Key=key) async def get_first_log_entry_time( @@ -1360,31 +1381,3 @@ def s3_client_error(code: str | int) -> type[Exception]: ): return botocore.exceptions.ClientError return UnknownS3Error - - -async def retry_if_failed( - partial: Any, - attempts: int = 6, - sleep_seconds: float = 10, - exceptions: tuple[type[BaseException], ...] | None = None, -) -> Any: - if exceptions is None: - exceptions = (botocore.exceptions.EndpointConnectionError,) - last_err = None - for attempt in range(attempts): - try: - return await partial() - except exceptions as err: - last_err = err - logger.error( - "Unable to connect to the endpoint. Check your network connection. " - "Sleeping and retrying %d more times " - "before giving up." % (attempts - attempt - 1) - ) - await asyncio.sleep(sleep_seconds) - else: - logger.error( - "Unable to connect to the endpoint after %d attempts. 
Giving up.", attempts
-        )
-    assert last_err
-    raise last_err
diff --git a/setup.cfg b/setup.cfg
index 6999930a..8a373843 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -28,6 +28,7 @@ install_requires =
     aiobotocore==2.7.0
     iso8601==2.1.0
     cachetools==5.3.2
+    orjson
 
 [options.entry_points]
 console_scripts =
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 0b721eed..f18d8e4c 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -9,7 +9,6 @@
 from typing import Any
 from uuid import uuid1
 
-import aiobotocore.session
 import aiohttp
 import aiohttp.web
 import pytest
@@ -19,7 +18,7 @@
 from async_timeout import timeout
 from yarl import URL
 
-from platform_monitoring.api import create_elasticsearch_client
+from platform_monitoring.api import create_elasticsearch_client, create_s3_client
 from platform_monitoring.config import (
     Config,
     ContainerRuntimeConfig,
@@ -192,14 +191,7 @@ def s3_config() -> S3Config:
 
 @pytest.fixture
 async def s3_client(s3_config: S3Config) -> AsyncIterator[AioBaseClient]:
-    session = aiobotocore.session.get_session()
-    async with session.create_client(
-        "s3",
-        endpoint_url=str(s3_config.endpoint_url),
-        region_name=s3_config.region,
-        aws_access_key_id=s3_config.access_key_id,
-        aws_secret_access_key=s3_config.secret_access_key,
-    ) as client:
+    async with create_s3_client(s3_config) as client:
         yield client
 
 
diff --git a/tests/integration/test_kube.py b/tests/integration/test_kube.py
index 592a1d3f..12c36e7b 100644
--- a/tests/integration/test_kube.py
+++ b/tests/integration/test_kube.py
@@ -1654,18 +1654,34 @@ async def _put(key: str, *lines: str, compress: bool = False) -> None:
 
 
 class TestS3FileReader:
-    @pytest.mark.parametrize("compress", [False, True])
-    async def test_iter_lines(
+    async def test_iter_lines__without_compression(
         self,
         s3_client: AioBaseClient,
         s3_logs_bucket: str,
         write_lines_to_s3: Callable[..., Awaitable[None]],
-        compress: bool,
     ) -> None:
         key = f"tests/{uuid.uuid4()}"
         reader = S3FileReader(s3_client, s3_logs_bucket, key)
 
-        await write_lines_to_s3(key, "1", "2", "3", compress=compress)
+        await write_lines_to_s3(key, "1", "2", "3")
+
+        result = [line async for line in reader.iter_lines()]
+
+        assert result == [b"1", b"2", b"3"]
+
+    async def test_iter_lines__with_compression(
+        self,
+        s3_client: AioBaseClient,
+        s3_logs_bucket: str,
+        write_lines_to_s3: Callable[..., Awaitable[None]],
+    ) -> None:
+        key = f"tests/{uuid.uuid4()}"
+        # chunk_size = 1 forces the zlib decompress object to periodically
+        # keep decompressed data in its internal buffer instead of returning
+        # it to the caller right away. 
+ reader = S3FileReader(s3_client, s3_logs_bucket, key, chunk_size=1) + + await write_lines_to_s3(key, "1", "2", "3", compress=True) result = [line async for line in reader.iter_lines()] @@ -2232,7 +2248,7 @@ async def test_write__single_file( S3LogFile( key=mock.ANY, records_count=3, - size=173, + size=162, first_record_time=records[0].time, last_record_time=records[2].time, ) @@ -2253,14 +2269,14 @@ async def test_write__multiple_files( S3LogFile( key=mock.ANY, records_count=2, - size=102, + size=96, first_record_time=records[0].time, last_record_time=records[1].time, ), S3LogFile( key=mock.ANY, records_count=1, - size=71, + size=66, first_record_time=records[2].time, last_record_time=records[2].time, ), @@ -2281,21 +2297,21 @@ async def test_write__long_record( S3LogFile( key=mock.ANY, records_count=1, - size=51, + size=48, first_record_time=records[0].time, last_record_time=records[0].time, ), S3LogFile( key=mock.ANY, records_count=1, - size=51, + size=48, first_record_time=records[1].time, last_record_time=records[1].time, ), S3LogFile( key=mock.ANY, records_count=1, - size=71, + size=66, first_record_time=records[2].time, last_record_time=records[2].time, ), @@ -2324,7 +2340,7 @@ async def test_write__shared_buffer( S3LogFile( key=mock.ANY, records_count=3, - size=173, + size=162, first_record_time=records[0].time, last_record_time=records[2].time, ), @@ -2362,8 +2378,8 @@ async def test_compact_one( f"{raw_log_key_prefix}-c1.log/202301011234_1.gz", ] records = [ - '{"time": "2023-01-01T12:34:56.123456", "log": "1"}', - '{"time": "2023-01-01T12:34:57.123456", "log": "2"}', + '{"time":"2023-01-01T12:34:56.123456","log":"1"}', + '{"time":"2023-01-01T12:34:57.123456","log":"2"}', ] await write_lines_to_s3(pod_keys[0], records[0]) @@ -2400,8 +2416,8 @@ async def test_compact_one__resume_write_to_last_file( f"{raw_log_key_prefix}-c1.log/202301011234_1.gz", ] records = [ - '{"time": "2023-01-01T12:34:56.123456", "log": "1"}', - '{"time": "2023-01-01T12:34:57.123456", "log": "2"}', + '{"time":"2023-01-01T12:34:56.123456","log":"1"}', + '{"time":"2023-01-01T12:34:57.123456","log":"2"}', ] await write_lines_to_s3(pod_keys[0], records[0])