Skip to content

Commit

Permalink
upgrade aiohttp to 3.9.5
Browse files Browse the repository at this point in the history
  • Loading branch information
zubenkoivan committed May 4, 2024
1 parent f1bf421 commit 00935c0
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 70 deletions.
26 changes: 16 additions & 10 deletions platform_monitoring/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@
HEARTBEAT = 30


CONFIG_KEY = aiohttp.web.AppKey("config", Config)
KUBE_CLIENT_KEY = aiohttp.web.AppKey("kube_client", KubeClient)
JOBS_SERVICE_KEY = aiohttp.web.AppKey("jobs_service", JobsService)
LOGS_SERVICE_KEY = aiohttp.web.AppKey("logs_service", LogsService)
CONFIG_CLIENT_KEY = aiohttp.web.AppKey("config_client", ConfigClient)
MONITORING_APP_KEY = aiohttp.web.AppKey("monitoring_app", aiohttp.web.Application)

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -124,15 +131,15 @@ def register(self, app: aiohttp.web.Application) -> None:

@property
def _jobs_service(self) -> JobsService:
return self._app["jobs_service"]
return self._app[JOBS_SERVICE_KEY]

@property
def _kube_client(self) -> KubeClient:
return self._app["kube_client"]
return self._app[KUBE_CLIENT_KEY]

@property
def _logs_service(self) -> LogsService:
return self._app["logs_service"]
return self._app[LOGS_SERVICE_KEY]

async def get_capacity(self, request: Request) -> Response:
# Check that user has access to the cluster
Expand Down Expand Up @@ -750,7 +757,7 @@ async def add_version_to_header(request: Request, response: StreamResponse) -> N

async def create_app(config: Config) -> aiohttp.web.Application:
app = aiohttp.web.Application(middlewares=[handle_exceptions])
app["config"] = config
app[CONFIG_KEY] = config

async def _init_app(app: aiohttp.web.Application) -> AsyncIterator[None]:
async with AsyncExitStack() as exit_stack:
Expand Down Expand Up @@ -789,18 +796,18 @@ async def _init_app(app: aiohttp.web.Application) -> AsyncIterator[None]:
kube_client = await exit_stack.enter_async_context(
create_kube_client(config.kube)
)
app["monitoring_app"]["kube_client"] = kube_client
app[MONITORING_APP_KEY][KUBE_CLIENT_KEY] = kube_client

logger.info("Initializing Platform Config client")
config_client = await exit_stack.enter_async_context(
ConfigClient(config.platform_config.url, config.platform_config.token)
)
app["monitoring_app"]["config_client"] = config_client
app[MONITORING_APP_KEY][CONFIG_CLIENT_KEY] = config_client

logs_service = create_logs_service(
config, kube_client, es_client, s3_client
)
app["monitoring_app"]["logs_service"] = logs_service
app[MONITORING_APP_KEY][LOGS_SERVICE_KEY] = logs_service

container_runtime_client_registry = await exit_stack.enter_async_context(
ContainerRuntimeClientRegistry(
Expand All @@ -816,7 +823,7 @@ async def _init_app(app: aiohttp.web.Application) -> AsyncIterator[None]:
kube_job_label=config.kube.job_label,
kube_node_pool_label=config.kube.node_pool_label,
)
app["monitoring_app"]["jobs_service"] = jobs_service
app[MONITORING_APP_KEY][JOBS_SERVICE_KEY] = jobs_service

await exit_stack.enter_async_context(
LogCleanupPoller(
Expand All @@ -833,10 +840,9 @@ async def _init_app(app: aiohttp.web.Application) -> AsyncIterator[None]:
api_v1_app = aiohttp.web.Application()
api_v1_handler = ApiHandler()
api_v1_handler.register(api_v1_app)
app["api_v1_app"] = api_v1_app

monitoring_app = await create_monitoring_app(config)
app["monitoring_app"] = monitoring_app
app[MONITORING_APP_KEY] = monitoring_app
api_v1_app.add_subapp("/jobs", monitoring_app)

app.add_subapp("/api/v1", api_v1_app)
Expand Down
9 changes: 4 additions & 5 deletions platform_monitoring/kube_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import aiohttp
from aiohttp import ContentTypeError
from async_timeout import timeout
from yarl import URL

from .base import JobStats, Telemetry
Expand Down Expand Up @@ -361,9 +360,9 @@ def __init__(
def _is_ssl(self) -> bool:
return urlsplit(self._base_url).scheme == "https"

def _create_ssl_context(self) -> ssl.SSLContext | None:
def _create_ssl_context(self) -> bool | ssl.SSLContext:
if not self._is_ssl:
return None
return True
ssl_context = ssl.create_default_context(
cafile=self._cert_authority_path, cadata=self._cert_authority_data_pem
)
Expand Down Expand Up @@ -516,7 +515,7 @@ async def wait_pod_is_running(
Raise JobNotFoundException if there is no such pod.
Raise asyncio.TimeoutError if it takes too long for the pod.
"""
async with timeout(timeout_s):
async with asyncio.timeout(timeout_s):
while True:
status = await self.get_container_status(pod_name)
if status.is_running:
Expand All @@ -533,7 +532,7 @@ async def wait_pod_is_not_waiting(
Raise JobNotFoundException if there is no such pod.
Raise asyncio.TimeoutError if it takes too long for the pod.
"""
async with timeout(timeout_s):
async with asyncio.timeout(timeout_s):
while True:
status = await self.get_container_status(pod_name)
if not status.is_waiting:
Expand Down
5 changes: 2 additions & 3 deletions platform_monitoring/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import orjson
from aiobotocore.client import AioBaseClient
from aiobotocore.response import StreamingBody
from async_timeout import timeout
from cachetools import LRUCache
from elasticsearch import AsyncElasticsearch, RequestError
from elasticsearch.helpers import async_scan
Expand Down Expand Up @@ -1071,7 +1070,7 @@ async def wait_pod_is_running(
timeout_s: float = 10.0 * 60,
interval_s: float = 1.0,
) -> ContainerStatus:
async with timeout(timeout_s):
async with asyncio.timeout(timeout_s):
while True:
status = await self.get_container_status(name)
if not status.is_waiting:
Expand Down Expand Up @@ -1375,7 +1374,7 @@ async def get_first_log_entry_time(
timestamps=True,
read_timeout_s=timeout_s,
) as stream:
async with timeout(timeout_s):
async with asyncio.timeout(timeout_s):
while True:
chunk = await stream.readany()
if not chunk:
Expand Down
9 changes: 5 additions & 4 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ classifiers =
[options]
packages = find:
install_requires =
aiobotocore==2.11.2
aiohttp==3.8.6
aiobotocore==2.12.3
aiohttp[speedups]==3.9.5
cachetools==5.3.3
docker-image-py==0.1.12
elasticsearch<8.0.0
Expand All @@ -26,6 +26,7 @@ install_requires =
neuro-sdk==22.7.1
orjson
trafaret==2.1.1
uvloop
python_requires = >=3.11
include_package_data = True
platforms = any
Expand All @@ -41,8 +42,8 @@ dev =
mypy
pre-commit
pytest==7.4.0
pytest-asyncio==0.21.1
pytest-cov==4.1.0
pytest-asyncio==0.23.6
pytest-cov==5.0.0
python-jose==3.3.0
ruff
types-cachetools
Expand Down
26 changes: 4 additions & 22 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import subprocess
import time
from collections.abc import AsyncIterator, Callable, Iterator
from collections.abc import AsyncIterator, Callable
from contextlib import asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
Expand All @@ -14,7 +14,6 @@
import pytest
from _pytest.fixtures import FixtureRequest
from aiobotocore.client import AioBaseClient
from async_timeout import timeout
from elasticsearch import AsyncElasticsearch
from yarl import URL

Expand Down Expand Up @@ -50,23 +49,6 @@ def in_minikube(in_docker: bool) -> bool: # noqa: FBT001
return in_docker


@pytest.fixture(scope="session")
def event_loop() -> Iterator[asyncio.AbstractEventLoop]:
"""This fixture fixes scope mismatch error with implicitly added "event_loop".
see https://github.com/pytest-dev/pytest-asyncio/issues/68
"""
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
loop = asyncio.get_event_loop_policy().new_event_loop()
loop.set_debug(True)

watcher = asyncio.SafeChildWatcher()
watcher.attach_loop(loop)
asyncio.get_event_loop_policy().set_child_watcher(watcher)

yield loop
loop.close()


def random_str(length: int = 8) -> str:
return str(uuid1())[:length]

Expand All @@ -88,11 +70,11 @@ async def wait_for_service(
timeout_s: float = 30,
interval_s: float = 1,
) -> None:
async with timeout(timeout_s):
async with asyncio.timeout(timeout_s):
while True:
try:
async with aiohttp.ClientSession() as client:
async with timeout(1):
async with asyncio.timeout(1):
async with client.get(service_ping_url) as resp:
assert resp.status == aiohttp.web.HTTPOk.status_code
return
Expand Down Expand Up @@ -164,7 +146,7 @@ async def es_config(
else:
es_host = get_service_url("elasticsearch-logging")
async with AsyncElasticsearch(hosts=[es_host]) as client:
async with timeout(120):
async with asyncio.timeout(120):
while True:
try:
await client.ping()
Expand Down
11 changes: 4 additions & 7 deletions tests/integration/conftest_admin.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
from collections.abc import Awaitable, Callable

import aiohttp
Expand Down Expand Up @@ -59,16 +58,14 @@ async def _factory(


@pytest.fixture(scope="session")
def regular_user1(
event_loop: asyncio.AbstractEventLoop,
async def regular_user1(
regular_user_factory: Callable[..., Awaitable[_User]],
) -> _User:
return event_loop.run_until_complete(regular_user_factory())
return await regular_user_factory()


@pytest.fixture(scope="session")
def regular_user2(
event_loop: asyncio.AbstractEventLoop,
async def regular_user2(
regular_user_factory: Callable[..., Awaitable[_User]],
) -> _User:
return event_loop.run_until_complete(regular_user_factory())
return await regular_user_factory()
7 changes: 3 additions & 4 deletions tests/integration/conftest_kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import pytest
from _pytest.fixtures import FixtureRequest
from async_timeout import timeout

from platform_monitoring.config import KubeConfig
from platform_monitoring.kube_client import (
Expand Down Expand Up @@ -57,7 +56,7 @@ async def wait_pod_is_terminated(
allow_pod_not_exists: bool = False,
) -> None:
try:
async with timeout(timeout_s):
async with asyncio.timeout(timeout_s):
while True:
try:
state = await self._get_raw_container_state(pod_name)
Expand All @@ -80,7 +79,7 @@ async def wait_pod_is_deleted(
interval_s: float = 1.0,
) -> None:
try:
async with timeout(timeout_s):
async with asyncio.timeout(timeout_s):
while await self.check_pod_exists(pod_name):
await asyncio.sleep(interval_s)
except TimeoutError:
Expand All @@ -99,7 +98,7 @@ async def wait_container_is_restarted(
interval_s: float = 1.0,
) -> None:
try:
async with timeout(timeout_s):
async with asyncio.timeout(timeout_s):
while True:
status = await self.get_container_status(name)
if status.restart_count >= count:
Expand Down
3 changes: 1 addition & 2 deletions tests/integration/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
HTTPNotFound,
HTTPUnauthorized,
)
from async_timeout import timeout
from yarl import URL

from platform_monitoring.api import create_app
Expand All @@ -39,7 +38,7 @@ async def expect_prompt(ws: aiohttp.ClientWebSocketResponse) -> bytes:
_exit_re = re.compile(rb"exit \d+")
try:
ret: bytes = b""
async with timeout(3):
async with asyncio.timeout(3):
async for msg in ws:
assert msg.type == aiohttp.WSMsgType.BINARY
assert msg.data[0] == 1
Expand Down
3 changes: 1 addition & 2 deletions tests/integration/test_jobs_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from typing import Any

import pytest
from async_timeout import timeout
from neuro_config_client import ConfigClient
from neuro_sdk import (
Client as PlatformApiClient,
Expand Down Expand Up @@ -107,7 +106,7 @@ async def wait_for_job(
interval_s: float = 1.0,
) -> None:
try:
async with timeout(timeout_s):
async with asyncio.timeout(timeout_s):
while True:
job = await platform_api_client.jobs.status(job.id)
if condition(job):
Expand Down
Loading

0 comments on commit 00935c0

Please sign in to comment.