Skip to content

Commit

Permalink
Refactor http timeouts (#703)
Browse files Browse the repository at this point in the history
  • Loading branch information
asvetlov authored Apr 11, 2019
1 parent 49e2462 commit c17784f
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.D/703.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Respect `--network-timeout` option in `logs` and `cp` operations.
16 changes: 15 additions & 1 deletion neuromation/api/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(
self._connector = connector
self._url = url
self._token = token
self._timeout = timeout
self._session = aiohttp.ClientSession(
connector=connector,
connector_owner=False,
Expand All @@ -71,6 +72,10 @@ def __init__(
def connector(self) -> aiohttp.TCPConnector:
return self._connector

@property
def timeout(self) -> aiohttp.ClientTimeout:
return self._timeout

async def close(self) -> None:
await self._session.close()

Expand All @@ -88,12 +93,21 @@ async def request(
data: Any = None,
json: Any = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[aiohttp.ClientTimeout] = None,
) -> AsyncIterator[aiohttp.ClientResponse]:
assert not rel_url.is_absolute(), rel_url
url = (self._url / "").join(rel_url)
log.debug("Fetch [%s] %s", method, url)
if timeout is None:
timeout = self._timeout
async with self._session.request(
method, url, headers=headers, params=params, json=json, data=data
method,
url,
headers=headers,
params=params,
json=json,
data=data,
timeout=timeout,
) as resp:
if 400 <= resp.status:
err_text = await resp.text()
Expand Down
4 changes: 3 additions & 1 deletion neuromation/api/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
SupportsInt,
)

import attr
from aiohttp import WSServerHandshakeError
from multidict import MultiDict
from yarl import URL
Expand Down Expand Up @@ -396,8 +397,9 @@ async def monitor(
self, id: str
) -> Any: # real type is async generator with data chunks
url = URL(f"jobs/{id}/log")
timeout = attr.evolve(self._core.timeout, sock_read=None)
async with self._core.request(
"GET", url, headers={"Accept-Encoding": "identity"}
"GET", url, headers={"Accept-Encoding": "identity"}, timeout=timeout
) as resp:
async for data in resp.content.iter_any():
yield data
Expand Down
7 changes: 5 additions & 2 deletions neuromation/api/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pathlib import Path
from typing import Any, AsyncIterator, Dict, List, Optional

import attr
from yarl import URL

from .abc import AbstractProgress
Expand Down Expand Up @@ -87,8 +88,9 @@ async def create(self, uri: URL, data: AsyncIterator[bytes]) -> None:
assert path, "Creation in root is not allowed"
url = URL("storage") / path
url = url.with_query(op="CREATE")
timeout = attr.evolve(self._core.timeout, sock_read=None)

async with self._core.request("PUT", url, data=data) as resp:
async with self._core.request("PUT", url, data=data, timeout=timeout) as resp:
resp # resp.status == 201

async def stats(self, uri: URL) -> FileStatus:
Expand All @@ -105,8 +107,9 @@ async def open(self, uri: URL) -> AsyncIterator[bytes]:
raise IsADirectoryError(uri)
url = URL("storage") / self._uri_to_path(uri)
url = url.with_query(op="OPEN")
timeout = attr.evolve(self._core.timeout, sock_read=None)

async with self._core.request("GET", url) as resp:
async with self._core.request("GET", url, timeout=timeout) as resp:
async for data in resp.content.iter_any():
yield data

Expand Down
7 changes: 1 addition & 6 deletions neuromation/cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import sys
from typing import Sequence

import aiohttp
import click

from neuromation.api import (
Expand Down Expand Up @@ -318,11 +317,7 @@ async def logs(cfg: Config, job: str) -> None:
"""
Print the logs for a container.
"""
timeout = aiohttp.ClientTimeout(
total=None, connect=None, sock_read=None, sock_connect=30
)

async with cfg.make_client(timeout=timeout) as client:
async with cfg.make_client() as client:
id = await resolve_job(client, job)
async for chunk in client.jobs.monitor(id):
if not chunk:
Expand Down
17 changes: 9 additions & 8 deletions neuromation/cli/rc.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,19 @@ def username(self) -> str:
token, username = self._check_registered()
return username

def make_client(self, *, timeout: Optional[aiohttp.ClientTimeout] = None) -> Client:
def make_client(self) -> Client:
token, username = self._check_registered()
kwargs: Dict[str, Any] = {}
if timeout is not None:
kwargs["timeout"] = timeout
else:
kwargs["timeout"] = aiohttp.ClientTimeout(
None, None, self.network_timeout, self.network_timeout
)
if self.registry_url:
kwargs["registry_url"] = self.registry_url
return Client(self.url, token, **kwargs)
return Client(
self.url,
token,
timeout=aiohttp.ClientTimeout(
None, None, self.network_timeout, self.network_timeout
),
**kwargs,
)


class ConfigFactory:
Expand Down
6 changes: 1 addition & 5 deletions neuromation/cli/storage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging

import aiohttp
import click
from yarl import URL

Expand Down Expand Up @@ -124,9 +123,6 @@ async def cp(
# explicit file:// scheme set
neuro storage cp storage:///foo file:///foo
"""
timeout = aiohttp.ClientTimeout(
total=None, connect=None, sock_read=None, sock_connect=30
)
src = URL(source)
dst = URL(destination)

Expand All @@ -136,7 +132,7 @@ async def cp(
src = URL(f"file:{source}")
if not dst.scheme or len(dst.scheme) == 1:
dst = URL(f"file:{destination}")
async with cfg.make_client(timeout=timeout) as client:
async with cfg.make_client() as client:
if src.scheme == "file" and dst.scheme == "storage":
src = normalize_local_path_uri(src)
dst = normalize_storage_path_uri(dst, cfg.username)
Expand Down

0 comments on commit c17784f

Please sign in to comment.