From 47b81c7e612b9151db428d3f83ff4593287debde Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Wed, 10 Apr 2019 14:43:17 +0300 Subject: [PATCH 1/3] Refactor timeouts --- neuromation/api/core.py | 16 +++++++++++++++- neuromation/api/jobs.py | 4 +++- neuromation/api/storage.py | 7 +++++-- neuromation/cli/job.py | 6 +----- neuromation/cli/rc.py | 17 +++++++++-------- neuromation/cli/storage.py | 5 +---- 6 files changed, 34 insertions(+), 21 deletions(-) diff --git a/neuromation/api/core.py b/neuromation/api/core.py index d2467a8c4..3ab46dec0 100644 --- a/neuromation/api/core.py +++ b/neuromation/api/core.py @@ -53,6 +53,7 @@ def __init__( self._connector = connector self._url = url self._token = token + self._timeout = timeout self._session = aiohttp.ClientSession( connector=connector, connector_owner=False, @@ -71,6 +72,10 @@ def __init__( def connector(self) -> aiohttp.TCPConnector: return self._connector + @property + def timeout(self) -> aiohttp.ClientTimeout: + return self._timeout + async def close(self) -> None: await self._session.close() @@ -88,12 +93,21 @@ async def request( data: Any = None, json: Any = None, headers: Optional[Dict[str, str]] = None, + timeout: Optional[aiohttp.ClientTimeout] = None, ) -> AsyncIterator[aiohttp.ClientResponse]: assert not rel_url.is_absolute(), rel_url url = (self._url / "").join(rel_url) log.debug("Fetch [%s] %s", method, url) + if timeout is None: + timeout = self._timeout async with self._session.request( - method, url, headers=headers, params=params, json=json, data=data + method, + url, + headers=headers, + params=params, + json=json, + data=data, + timeout=timeout, ) as resp: if 400 <= resp.status: err_text = await resp.text() diff --git a/neuromation/api/jobs.py b/neuromation/api/jobs.py index ab0d98d32..7fbf6388b 100644 --- a/neuromation/api/jobs.py +++ b/neuromation/api/jobs.py @@ -16,6 +16,7 @@ Tuple, ) +import attr from aiohttp import WSServerHandshakeError from multidict import MultiDict from yarl import URL @@ -426,8 +427,9 @@ async def monitor( self, id: str ) -> Any: # real type is async generator with data chunks url = URL(f"jobs/{id}/log") + timeout = attr.evolve(self._core.timeout, sock_read=None) async with self._core.request( - "GET", url, headers={"Accept-Encoding": "identity"} + "GET", url, headers={"Accept-Encoding": "identity"}, timeout=timeout ) as resp: async for data in resp.content.iter_any(): yield data diff --git a/neuromation/api/storage.py b/neuromation/api/storage.py index ff04818fb..d4422648c 100644 --- a/neuromation/api/storage.py +++ b/neuromation/api/storage.py @@ -5,6 +5,7 @@ from pathlib import Path from typing import Any, AsyncIterator, Dict, List, Optional +import attr from yarl import URL from .abc import AbstractProgress @@ -87,8 +88,9 @@ async def create(self, uri: URL, data: AsyncIterator[bytes]) -> None: assert path, "Creation in root is not allowed" url = URL("storage") / path url = url.with_query(op="CREATE") + timeout = attr.evolve(self._core.timeout, sock_read=None) - async with self._core.request("PUT", url, data=data) as resp: + async with self._core.request("PUT", url, data=data, timeout=timeout) as resp: resp # resp.status == 201 async def stats(self, uri: URL) -> FileStatus: @@ -105,8 +107,9 @@ async def open(self, uri: URL) -> AsyncIterator[bytes]: raise IsADirectoryError(uri) url = URL("storage") / self._uri_to_path(uri) url = url.with_query(op="OPEN") + timeout = attr.evolve(self._core.timeout, sock_read=None) - async with self._core.request("GET", url) as resp: + async with self._core.request("GET", url, timeout=timeout) as resp: async for data in resp.content.iter_any(): yield data diff --git a/neuromation/cli/job.py b/neuromation/cli/job.py index d1a75e1c4..5153b55ce 100644 --- a/neuromation/cli/job.py +++ b/neuromation/cli/job.py @@ -353,11 +353,7 @@ async def logs(cfg: Config, job: str) -> None: """ Print the logs for a container. """ - timeout = aiohttp.ClientTimeout( - total=None, connect=None, sock_read=None, sock_connect=30 - ) - - async with cfg.make_client(timeout=timeout) as client: + async with cfg.make_client() as client: id = await resolve_job(client, job) async for chunk in client.jobs.monitor(id): if not chunk: diff --git a/neuromation/cli/rc.py b/neuromation/cli/rc.py index 5ad714d4a..f0de726d1 100644 --- a/neuromation/cli/rc.py +++ b/neuromation/cli/rc.py @@ -103,18 +103,19 @@ def username(self) -> str: token, username = self._check_registered() return username - def make_client(self, *, timeout: Optional[aiohttp.ClientTimeout] = None) -> Client: + def make_client(self) -> Client: token, username = self._check_registered() kwargs: Dict[str, Any] = {} - if timeout is not None: - kwargs["timeout"] = timeout - else: - kwargs["timeout"] = aiohttp.ClientTimeout( - None, None, self.network_timeout, self.network_timeout - ) if self.registry_url: kwargs["registry_url"] = self.registry_url - return Client(self.url, token, **kwargs) + return Client( + self.url, + token, + timeout=aiohttp.ClientTimeout( + None, None, self.network_timeout, self.network_timeout + ), + **kwargs, + ) class ConfigFactory: diff --git a/neuromation/cli/storage.py b/neuromation/cli/storage.py index 1dfc56771..fbd973a67 100644 --- a/neuromation/cli/storage.py +++ b/neuromation/cli/storage.py @@ -124,9 +124,6 @@ async def cp( # explicit file:// scheme set neuro storage cp storage:///foo file:///foo """ - timeout = aiohttp.ClientTimeout( - total=None, connect=None, sock_read=None, sock_connect=30 - ) src = URL(source) dst = URL(destination) @@ -136,7 +133,7 @@ async def cp( src = URL(f"file:{source}") if not dst.scheme or len(dst.scheme) == 1: dst = URL(f"file:{destination}") - async with cfg.make_client(timeout=timeout) as client: + async with cfg.make_client() as client: if src.scheme == "file" and dst.scheme == "storage": src = normalize_local_path_uri(src) dst = normalize_storage_path_uri(dst, cfg.username) From d04177ec28035c7438470a11297db45f90fbc0a5 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Wed, 10 Apr 2019 14:44:54 +0300 Subject: [PATCH 2/3] Drop unused imports --- neuromation/cli/job.py | 1 - neuromation/cli/storage.py | 1 - 2 files changed, 2 deletions(-) diff --git a/neuromation/cli/job.py b/neuromation/cli/job.py index 5153b55ce..875bd02f3 100644 --- a/neuromation/cli/job.py +++ b/neuromation/cli/job.py @@ -5,7 +5,6 @@ import sys from typing import Sequence -import aiohttp import click from neuromation.api import ( diff --git a/neuromation/cli/storage.py b/neuromation/cli/storage.py index fbd973a67..a551a1bc6 100644 --- a/neuromation/cli/storage.py +++ b/neuromation/cli/storage.py @@ -1,6 +1,5 @@ import logging -import aiohttp import click from yarl import URL From 8260c12825dfa213b9c112f824d693b7093319c8 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Wed, 10 Apr 2019 15:24:35 +0300 Subject: [PATCH 3/3] Add changelog --- CHANGELOG.D/703.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 CHANGELOG.D/703.bugfix diff --git a/CHANGELOG.D/703.bugfix b/CHANGELOG.D/703.bugfix new file mode 100644 index 000000000..d5abd0e6c --- /dev/null +++ b/CHANGELOG.D/703.bugfix @@ -0,0 +1 @@ +Respect `--network-timeout` option in `logs` and `cp` operations. \ No newline at end of file