log compaction improvements (#854)
zubenkoivan authored Nov 27, 2023
1 parent cda3303 commit 8f31164
Showing 7 changed files with 117 additions and 111 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
@@ -1,4 +1,4 @@
FROM python:3.9.9-slim-bullseye AS installer
FROM python:3.9.18-slim-bookworm AS installer

ENV PATH=/root/.local/bin:$PATH

@@ -9,7 +9,7 @@ COPY dist /tmp/dist
RUN ls /tmp/dist
RUN pip install --user --find-links /tmp/dist platform-monitoring

FROM python:3.9.9-slim-bullseye AS service
FROM python:3.9.18-slim-bookworm AS service

LABEL org.opencontainers.image.source = "https://github.com/neuro-inc/platform-monitoring"

6 changes: 5 additions & 1 deletion platform_monitoring/api.py
@@ -15,6 +15,7 @@
import aiohttp.web
import aiohttp_cors
from aiobotocore.client import AioBaseClient
from aiobotocore.config import AioConfig
from aioelasticsearch import Elasticsearch
from aiohttp.client_ws import ClientWebSocketResponse
from aiohttp.web import (
@@ -690,7 +691,7 @@ async def create_elasticsearch_client(


def create_s3_client(config: S3Config) -> AioBaseClient:
kwargs: dict[str, str] = {}
kwargs: dict[str, Any] = {}
if config.access_key_id:
kwargs["aws_access_key_id"] = config.access_key_id
if config.secret_access_key:
@@ -699,6 +700,9 @@ def create_s3_client(config: S3Config) -> AioBaseClient:
kwargs["endpoint_url"] = str(config.endpoint_url)
if config.region:
kwargs["region_name"] = config.region
kwargs["config"] = AioConfig(
retries={"mode": "standard"}, # 3 retries by default
)
session = aiobotocore.session.get_session()
return session.create_client("s3", **kwargs)

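For context, here is a minimal sketch (not part of this commit) of what the new client construction amounts to; the endpoint, region and credentials are placeholder values. Botocore's "standard" retry mode retries transient failures itself, with up to three attempts in total by default, which is what allows the hand-rolled retry_if_failed() helper to be dropped from platform_monitoring/logs.py below.

import aiobotocore.session
from aiobotocore.config import AioConfig

def make_s3_client():
    # Same shape as create_s3_client() above, with hypothetical settings.
    session = aiobotocore.session.get_session()
    return session.create_client(
        "s3",
        endpoint_url="http://localhost:9000",    # placeholder
        region_name="us-east-1",                 # placeholder
        aws_access_key_id="access-key",          # placeholder
        aws_secret_access_key="secret-key",      # placeholder
        config=AioConfig(retries={"mode": "standard"}),
    )
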
2 changes: 1 addition & 1 deletion platform_monitoring/config.py
@@ -102,7 +102,7 @@ class RegistryConfig:

@property
def host(self) -> str:
port = self.url.explicit_port # type: ignore
port = self.url.explicit_port
suffix = f":{port}" if port else ""
return f"{self.url.host}{suffix}"

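The dropped "# type: ignore" suggests that the yarl version in use now ships type information for URL.explicit_port. A small illustration of the property this code relies on, assuming config.url is a yarl.URL (not part of this commit):

from yarl import URL

url = URL("https://registry.example.com:5000")
port = url.explicit_port  # 5000; None when no port is written in the URL
suffix = f":{port}" if port else ""
assert f"{url.host}{suffix}" == "registry.example.com:5000"
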
157 changes: 75 additions & 82 deletions platform_monitoring/logs.py
@@ -2,9 +2,7 @@

import abc
import asyncio
import functools
import io
import json
import logging
import sys
import time
@@ -18,6 +16,7 @@

import aiohttp
import botocore.exceptions
import orjson
from aiobotocore.client import AioBaseClient
from aiobotocore.response import StreamingBody
from aioelasticsearch import Elasticsearch, RequestError
@@ -388,16 +387,13 @@ async def get(self, pod_name: str) -> S3LogsMetadata:

@trace
async def _get_from_s3(self, pod_name: str) -> S3LogsMetadata:
get_object = functools.partial(
self._s3_client.get_object,
response = await self._s3_client.get_object(
Bucket=self._bucket_name,
Key=self.get_metadata_key(pod_name),
)
resp = await retry_if_failed(get_object)
resp_body = resp["Body"]
async with resp_body:
raw_data = await resp_body.read()
data = json.loads(raw_data)
async with response["Body"]:
raw_data = await response["Body"].read()
data = orjson.loads(raw_data)
return S3LogsMetadata.from_primitive(data)

async def put(self, pod_name: str, metadata: S3LogsMetadata) -> None:
@@ -407,13 +403,11 @@ async def put(self, pod_name: str, metadata: S3LogsMetadata) -> None:

@trace
async def _put_to_s3(self, pod_name: str, metadata: S3LogsMetadata) -> None:
put_object = functools.partial(
self._s3_client.put_object,
await self._s3_client.put_object(
Bucket=self._bucket_name,
Key=self.get_metadata_key(pod_name),
Body=json.dumps(metadata.to_primitive()).encode(),
Body=orjson.dumps(metadata.to_primitive()),
)
await retry_if_failed(put_object)


class S3LogsMetadataService:
@@ -529,22 +523,17 @@ async def get_pods_cleanup_queue(self, cleanup_interval: float = 3600) -> list[s

@trace
async def add_pod_to_cleanup_queue(self, pod_name: str) -> None:
put_object = functools.partial(
self._s3_client.put_object,
await self._s3_client.put_object(
Bucket=self.bucket_name,
Key=f"{self.CLEANUP_KEY_PREFIX}/{pod_name}",
Body=b"",
)
await retry_if_failed(put_object)

@trace
async def remove_pod_from_cleanup_queue(self, pod_name: str) -> None:
delete_object = functools.partial(
self._s3_client.delete_object,
Bucket=self.bucket_name,
Key=f"{self.CLEANUP_KEY_PREFIX}/{pod_name}",
await self._s3_client.delete_object(
Bucket=self.bucket_name, Key=f"{self.CLEANUP_KEY_PREFIX}/{pod_name}"
)
await retry_if_failed(delete_object)


@dataclass(frozen=True)
@@ -559,7 +548,7 @@ class S3LogRecord:
def parse(
cls, line: bytes, fallback_time: datetime, *, container_id: str
) -> S3LogRecord:
data = json.loads(line.decode(errors="replace"))
data = orjson.loads(line.decode(errors="replace"))
time_str, time = cls._parse_time(data, fallback_time)
return cls(
time_str=time_str,
@@ -607,50 +596,64 @@ def _parse_message(cls, data: dict[str, Any]) -> str:


class S3FileReader:
def __init__(self, s3_client: AioBaseClient, bucket_name: str, key: str) -> None:
def __init__(
self,
s3_client: AioBaseClient,
bucket_name: str,
key: str,
chunk_size: int = 1024,
) -> None:
self._s3_client = s3_client
self._bucket_name = bucket_name
self._key = key
self._chunk_size = chunk_size
self._loop = asyncio.get_event_loop()

@classmethod
def _is_compressed(cls, response: Any) -> bool:
return response["ContentType"] == "application/x-gzip"

async def iter_lines(self) -> AsyncIterator[bytes]:
get_object = functools.partial(
self._s3_client.get_object, Bucket=self._bucket_name, Key=self._key
response = await self._s3_client.get_object(
Bucket=self._bucket_name, Key=self._key
)
response = await retry_if_failed(get_object)
response_body = response["Body"]

async with response_body:
async with response["Body"]:
if self._is_compressed(response):
line_iterator = self._iter_decompressed_lines(response_body)
line_iterator = self._iter_decompressed_lines(response["Body"])
else:
line_iterator = response_body.iter_lines()
line_iterator = response["Body"].iter_lines(chunk_size=self._chunk_size)

async with aclosing(line_iterator):
async for line in line_iterator:
yield line

@classmethod
async def _iter_decompressed_lines(
cls, body: StreamingBody
self, body: StreamingBody
) -> AsyncIterator[bytes]:
loop = asyncio.get_event_loop()
decompress_obj = zlib.decompressobj(wbits=ZLIB_WBITS)
pending = b""
async for chunk in body.iter_chunks():
async for chunk in body.iter_chunks(chunk_size=self._chunk_size):
chunk_d = await loop.run_in_executor(
None, lambda: decompress_obj.decompress(chunk)
)
lines = (pending + chunk_d).splitlines(True)
for line in lines[:-1]:
yield line.splitlines()[0]
pending = lines[-1]
if chunk_d:
lines = chunk_d.splitlines()
lines[0] = pending + lines[0]
for i in range(len(lines) - 1):
yield lines[i]
if chunk_d.endswith(b"\n"):
pending = b""
yield lines[-1]
else:
pending = lines[-1]
chunk_d = await loop.run_in_executor(None, decompress_obj.flush)
if chunk_d:
pending += chunk_d
if pending:
yield pending.splitlines()[0]
for line in pending.splitlines():
yield line


class S3LogRecordsWriter:
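The reworked _iter_decompressed_lines above splits each decompressed chunk directly and carries only the unterminated tail of the previous chunk forward. A simplified, synchronous sketch of the same splitting rule (an illustration, not code from this commit):

def iter_lines_from_chunks(chunks):
    # Lines that span chunk boundaries are stitched together via `pending`.
    pending = b""
    for chunk in chunks:
        if not chunk:
            continue
        lines = chunk.splitlines()
        lines[0] = pending + lines[0]
        for i in range(len(lines) - 1):
            yield lines[i]
        if chunk.endswith(b"\n"):
            pending = b""
            yield lines[-1]
        else:
            pending = lines[-1]
    for line in pending.splitlines():
        yield line

# list(iter_lines_from_chunks([b"foo\nba", b"r\nbaz"])) == [b"foo", b"bar", b"baz"]
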
@@ -663,10 +666,14 @@ def __init__(
pod_name: str,
size_limit: int = 10 * 1024**2,
buffer: IO[bytes] | None = None,
write_buffer_attempts: int = 4,
write_buffer_timeout: int = 10,
) -> None:
self._s3_client = s3_client
self._bucket_name = bucket_name
self._size_limit = size_limit
self._write_buffer_attempts = write_buffer_attempts
self._write_buffer_timeout = write_buffer_timeout

self._buffer = io.BytesIO() if buffer is None else buffer
self._buffer.seek(0)
@@ -712,21 +719,35 @@ async def _flush(self) -> None:
last_record_time=self._last_record_time,
)
logger.info("Flushing records to %s", file)
put_object = functools.partial(
self._s3_client.put_object,
Bucket=self._bucket_name,
Key=key,
Body=self._buffer,
ContentType="application/x-gzip",
)
await retry_if_failed(put_object)
await self._put_buffer_to_s3(key)
self._output_files.append(file)
self._compress_obj = zlib.compressobj(wbits=ZLIB_WBITS)
self._buffer.seek(0)
self._buffer.truncate(0)
self._records_count = 0
self._size = 0

async def _put_buffer_to_s3(self, key: str) -> None:
last_err = None
for _ in range(self._write_buffer_attempts):
try:
await asyncio.wait_for(
self._s3_client.put_object(
Bucket=self._bucket_name,
Key=key,
Body=self._buffer,
ContentType="application/x-gzip",
),
self._write_buffer_timeout,
)
return
except asyncio.TimeoutError as err:
last_err = err
logger.warning("Timeout while flushing records")
else:
assert last_err
raise last_err

async def write(self, record: S3LogRecord) -> None:
if self._size and self._container_id != record.container_id:
await self._flush()
@@ -750,8 +771,7 @@ def encode_record(self, record: S3LogRecord) -> bytes:
body = {"time": record.time_str, "log": record.message}
if record.stream != S3LogRecord.stream:
body["stream"] = record.stream
body_str = json.dumps(body) + "\n"
return body_str.encode()
return orjson.dumps(body) + b"\n"

def get_output_files(self) -> list[S3LogFile]:
return self._output_files.copy()
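Because orjson.dumps() returns bytes rather than str, the explicit .encode() calls disappear and the newline separator is appended as bytes. A small illustration of the difference (not from this commit); note that orjson also emits compact JSON without spaces:

import json

import orjson

body = {"time": "2023-11-27T00:00:00", "log": "hello"}
assert json.dumps(body).encode() == b'{"time": "2023-11-27T00:00:00", "log": "hello"}'
assert orjson.dumps(body) == b'{"time":"2023-11-27T00:00:00","log":"hello"}'
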
@@ -1192,7 +1212,7 @@ async def compact_all(
async def compact_one(self, pod_name: str) -> None:
await self._metadata_service.add_pod_to_cleanup_queue(pod_name)
metadata = await self._metadata_service.get_metadata(pod_name)
logger.info("Compacting pod %s, %s", pod_name, metadata)
logger.info("Compacting pod %s with %s", pod_name, metadata)
raw_keys = await self._metadata_service.get_raw_log_keys(pod_name)
raw_keys = await self._delete_merged_keys(metadata, raw_keys)
await self._delete_orphaned_keys(pod_name, metadata)
@@ -1202,7 +1222,7 @@ async def compact_one(self, pod_name: str) -> None:
@trace
async def cleanup_one(self, pod_name: str) -> None:
metadata = await self._metadata_service.get_metadata(pod_name)
logger.info("Cleaning pod %s, %s", pod_name, metadata)
logger.info("Cleaning pod %s with %s", pod_name, metadata)
raw_keys = await self._metadata_service.get_raw_log_keys(pod_name)
raw_keys = await self._delete_merged_keys(metadata, raw_keys)
await self._delete_orphaned_keys(pod_name, metadata)
@@ -1237,6 +1257,10 @@ async def _merge_raw_keys(
return metadata

def _create_log_record_writer(self, pod_name: str) -> S3LogRecordsWriter:
# NOTE: write_buffer_timeout must be less than s3_client read_timeout,
# which is 60s by default. Buffer flushing can happen while compactor
# is reading the raw log file. So it should be acceptable for flushing
# to be retried several times while the file is being read.
return S3LogRecordsWriter(
self._s3_client,
self._bucket_name,
@@ -1304,10 +1328,7 @@ async def _delete_keys(self, keys: Iterable[str]) -> None:

@trace
async def _delete_key(self, key: str) -> None:
delete_object = functools.partial(
self._s3_client.delete_object, Bucket=self._bucket_name, Key=key
)
await retry_if_failed(delete_object)
await self._s3_client.delete_object(Bucket=self._bucket_name, Key=key)


async def get_first_log_entry_time(
Expand Down Expand Up @@ -1360,31 +1381,3 @@ def s3_client_error(code: str | int) -> type[Exception]:
):
return botocore.exceptions.ClientError
return UnknownS3Error


async def retry_if_failed(
partial: Any,
attempts: int = 6,
sleep_seconds: float = 10,
exceptions: tuple[type[BaseException], ...] | None = None,
) -> Any:
if exceptions is None:
exceptions = (botocore.exceptions.EndpointConnectionError,)
last_err = None
for attempt in range(attempts):
try:
return await partial()
except exceptions as err:
last_err = err
logger.error(
"Unable to connect to the endpoint. Check your network connection. "
"Sleeping and retrying %d more times "
"before giving up." % (attempts - attempt - 1)
)
await asyncio.sleep(sleep_seconds)
else:
logger.error(
"Unable to connect to the endpoint after %d attempts. Giving up.", attempts
)
assert last_err
raise last_err
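The NOTE added to _create_log_record_writer above rests on a timing budget: with the defaults introduced in this commit, each flush attempt is capped at 10 seconds, well under the S3 client's default 60-second read timeout, and up to 4 attempts (40 seconds in the worst case) may be retried while a raw log file is still being read. A hypothetical wiring that keeps those numbers explicit (the bucket name is a placeholder):

from platform_monitoring.logs import S3LogRecordsWriter

def make_writer(s3_client, pod_name):
    # Each attempt times out after 10 s, below the default 60 s read_timeout
    # mentioned in the NOTE above; 4 attempts give a 40 s worst case.
    return S3LogRecordsWriter(
        s3_client,
        bucket_name="job-logs",  # placeholder
        pod_name=pod_name,
        write_buffer_attempts=4,
        write_buffer_timeout=10,
    )
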
1 change: 1 addition & 0 deletions setup.cfg
@@ -28,6 +28,7 @@ install_requires =
aiobotocore==2.7.0
iso8601==2.1.0
cachetools==5.3.2
orjson

[options.entry_points]
console_scripts =