Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor http timeouts #703

Merged
merged 3 commits into from
Apr 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.D/703.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Respect `--network-timeout` option in `logs` and `cp` operations.
16 changes: 15 additions & 1 deletion neuromation/api/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(
self._connector = connector
self._url = url
self._token = token
self._timeout = timeout
self._session = aiohttp.ClientSession(
connector=connector,
connector_owner=False,
Expand All @@ -71,6 +72,10 @@ def __init__(
def connector(self) -> aiohttp.TCPConnector:
return self._connector

@property
def timeout(self) -> aiohttp.ClientTimeout:
return self._timeout

async def close(self) -> None:
await self._session.close()

Expand All @@ -88,12 +93,21 @@ async def request(
data: Any = None,
json: Any = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[aiohttp.ClientTimeout] = None,
) -> AsyncIterator[aiohttp.ClientResponse]:
assert not rel_url.is_absolute(), rel_url
url = (self._url / "").join(rel_url)
log.debug("Fetch [%s] %s", method, url)
if timeout is None:
timeout = self._timeout
async with self._session.request(
method, url, headers=headers, params=params, json=json, data=data
method,
url,
headers=headers,
params=params,
json=json,
data=data,
timeout=timeout,
) as resp:
if 400 <= resp.status:
err_text = await resp.text()
Expand Down
4 changes: 3 additions & 1 deletion neuromation/api/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
Tuple,
)

import attr
from aiohttp import WSServerHandshakeError
from multidict import MultiDict
from yarl import URL
Expand Down Expand Up @@ -426,8 +427,9 @@ async def monitor(
self, id: str
) -> Any: # real type is async generator with data chunks
url = URL(f"jobs/{id}/log")
timeout = attr.evolve(self._core.timeout, sock_read=None)
async with self._core.request(
"GET", url, headers={"Accept-Encoding": "identity"}
"GET", url, headers={"Accept-Encoding": "identity"}, timeout=timeout
) as resp:
async for data in resp.content.iter_any():
yield data
Expand Down
7 changes: 5 additions & 2 deletions neuromation/api/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pathlib import Path
from typing import Any, AsyncIterator, Dict, List, Optional

import attr
from yarl import URL

from .abc import AbstractProgress
Expand Down Expand Up @@ -87,8 +88,9 @@ async def create(self, uri: URL, data: AsyncIterator[bytes]) -> None:
assert path, "Creation in root is not allowed"
url = URL("storage") / path
url = url.with_query(op="CREATE")
timeout = attr.evolve(self._core.timeout, sock_read=None)

async with self._core.request("PUT", url, data=data) as resp:
async with self._core.request("PUT", url, data=data, timeout=timeout) as resp:
resp # resp.status == 201

async def stats(self, uri: URL) -> FileStatus:
Expand All @@ -105,8 +107,9 @@ async def open(self, uri: URL) -> AsyncIterator[bytes]:
raise IsADirectoryError(uri)
url = URL("storage") / self._uri_to_path(uri)
url = url.with_query(op="OPEN")
timeout = attr.evolve(self._core.timeout, sock_read=None)

async with self._core.request("GET", url) as resp:
async with self._core.request("GET", url, timeout=timeout) as resp:
async for data in resp.content.iter_any():
yield data

Expand Down
7 changes: 1 addition & 6 deletions neuromation/cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import sys
from typing import Sequence

import aiohttp
import click

from neuromation.api import (
Expand Down Expand Up @@ -353,11 +352,7 @@ async def logs(cfg: Config, job: str) -> None:
"""
Print the logs for a container.
"""
timeout = aiohttp.ClientTimeout(
total=None, connect=None, sock_read=None, sock_connect=30
)

async with cfg.make_client(timeout=timeout) as client:
async with cfg.make_client() as client:
id = await resolve_job(client, job)
async for chunk in client.jobs.monitor(id):
if not chunk:
Expand Down
17 changes: 9 additions & 8 deletions neuromation/cli/rc.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,19 @@ def username(self) -> str:
token, username = self._check_registered()
return username

def make_client(self, *, timeout: Optional[aiohttp.ClientTimeout] = None) -> Client:
def make_client(self) -> Client:
token, username = self._check_registered()
kwargs: Dict[str, Any] = {}
if timeout is not None:
kwargs["timeout"] = timeout
else:
kwargs["timeout"] = aiohttp.ClientTimeout(
None, None, self.network_timeout, self.network_timeout
)
if self.registry_url:
kwargs["registry_url"] = self.registry_url
return Client(self.url, token, **kwargs)
return Client(
self.url,
token,
timeout=aiohttp.ClientTimeout(
None, None, self.network_timeout, self.network_timeout
),
**kwargs,
)


class ConfigFactory:
Expand Down
6 changes: 1 addition & 5 deletions neuromation/cli/storage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging

import aiohttp
import click
from yarl import URL

Expand Down Expand Up @@ -124,9 +123,6 @@ async def cp(
# explicit file:// scheme set
neuro storage cp storage:///foo file:///foo
"""
timeout = aiohttp.ClientTimeout(
total=None, connect=None, sock_read=None, sock_connect=30
)
src = URL(source)
dst = URL(destination)

Expand All @@ -136,7 +132,7 @@ async def cp(
src = URL(f"file:{source}")
if not dst.scheme or len(dst.scheme) == 1:
dst = URL(f"file:{destination}")
async with cfg.make_client(timeout=timeout) as client:
async with cfg.make_client() as client:
if src.scheme == "file" and dst.scheme == "storage":
src = normalize_local_path_uri(src)
dst = normalize_storage_path_uri(dst, cfg.username)
Expand Down