Skip to content

Commit

Permalink
Treat read timeouts similar to the server hanging up the connection.
Browse files Browse the repository at this point in the history
That is, stop yielding more data and return normally.
A warning is also logged however.

A read timeout might happen in empty clusters if there isn't any new
events appearing for the specified time. In most cases, the default
apiserver timeout will make it hang up the connection before that
happens, which is why this problem hasn't been noticed before.
  • Loading branch information
perj committed Dec 15, 2023
1 parent ef1eb65 commit 830e758
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 13 deletions.
45 changes: 33 additions & 12 deletions k8s/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import logging
from collections import namedtuple

import requests
import requests.packages.urllib3 as urllib3

from . import config
from .client import Client, NotFound
from .fields import Field
Expand Down Expand Up @@ -128,20 +131,38 @@ def watch_list(cls, namespace=None):
if not url:
raise NotImplementedError("Cannot watch_list, no watch_list_url defined on class {}".format(cls))

resp = cls._client.get(url, stream=True, timeout=config.stream_timeout)
for line in resp.iter_lines(chunk_size=None):
if line:
try:
event_json = json.loads(line)
try:
# The timeout here appears to be per call to the poll (or similar) system call, so each time data is received, the timeout will reset.
resp = cls._client.get(url, stream=True, timeout=config.stream_timeout)
for line in resp.iter_lines(chunk_size=None):
if line:
try:
event = WatchEvent(event_json, cls)
yield event
except TypeError:
event_json = json.loads(line)
try:
event = WatchEvent(event_json, cls)
yield event
except TypeError:
LOG.exception(
"Unable to create instance of %s from watch event json, discarding event. event_json=%r",
cls.__name__,
event_json,
)
except ValueError:
LOG.exception(
"Unable to create instance of %s from watch event json, discarding event. event_json=%r",
cls.__name__, event_json)
except ValueError:
LOG.exception("Unable to parse JSON on watch event, discarding event. Line: %r", line)
"Unable to parse JSON on watch event, discarding event. Line: %r",
line,
)
except requests.ConnectionError as e:
# ConnectionError is fairly generic, but check for ReadTimeoutError from urllib3.
# If we get this, there were no events received for the timeout period, which might not be an error, just a quiet period.
underlying = e.args[0]
if isinstance(underlying, urllib3.exceptions.ReadTimeoutError):
LOG.warning(
"Read timeout while streaming from API server. Error: %s",
e,
)
return
raise

@classmethod
def get(cls, name, namespace="default"):
Expand Down
44 changes: 43 additions & 1 deletion tests/k8s/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import mock
import pytest
import requests
import requests.packages.urllib3 as urllib3

from k8s.base import Model, Field, WatchEvent, Equality, Inequality, In, NotIn, Exists
from k8s.models.common import DeleteOptions, Preconditions
Expand Down Expand Up @@ -113,4 +115,44 @@ def test_delete_with_options(self, client):
},
"propagationPolicy": "Foreground"
}
client.delete.assert_called_once_with("/example", params={"labelSelector": "foo=bar"}, body=expected_body)
client.delete.assert_called_once_with("/example", params={"labelSelector": "foo=bar"}, body=expected_body)


class TestWatchList(object):
@pytest.fixture
def client(self):
with mock.patch.object(Example, "_client") as m:
yield m

def test_watch_list(self, client):
client.get.return_value.iter_lines.return_value = [
'{"type": "ADDED", "object": {"value": 1}}',
]
gen = Example.watch_list()
assert next(gen) == WatchEvent(
{"type": "ADDED", "object": {"value": 1}}, Example
)
client.get.assert_called_once_with(
"/watch/example", stream=True, timeout=3600, params={}
)
assert list(gen) == []

def test_watch_list_with_timeout(self, client):
client.get.return_value.iter_lines.return_value.__getitem__.side_effect = [
'{"type": "ADDED", "object": {"value": 1}}',
requests.ConnectionError(urllib3.exceptions.ReadTimeoutError("", "", "")),
'{"type": "MODIFIED", "object": {"value": 2}}', # Not reached
]
# Seal to avoid __iter__ being used instead of __getitem__
mock.seal(client)
gen = Example.watch_list()
assert next(gen) == WatchEvent(
{"type": "ADDED", "object": {"value": 1}}, Example
)
assert list(gen) == []
assert (
client.get.return_value.iter_lines.return_value.__getitem__.call_count == 2
)
client.get.assert_called_once_with(
"/watch/example", stream=True, timeout=3600, params={}
)

0 comments on commit 830e758

Please sign in to comment.