Skip to content

Commit

Permalink
Reduce apiserver request count for pod_service_mapper
Browse files Browse the repository at this point in the history
- disable podmapper freshness check from SD, rely on event from the kubernetes check
- add a delay parameter to throttle requests to the apiserver
- remove check_services_cache_freshness and use process_events only for cache invalidation

Needs changes in integration-core
  • Loading branch information
xvello committed Jun 14, 2017
1 parent 34f3d17 commit 557cc15
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 73 deletions.
27 changes: 27 additions & 0 deletions tests/core/test_kube_event_retriever.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# stdlib
import time # noqa: F401

# 3rd party
from mock import patch

Expand Down Expand Up @@ -38,6 +41,30 @@ def test_events_resversion_filtering(self):
self.assertEquals(0, len(events)) # No new event
self.assertEquals(2709, retr.last_resversion)

@patch('time.time')
def test_events_delay(self, mock_time):
    """A retriever built with delay=500 must skip apiserver requests made
    less than 500 seconds after the previous one, returning [] instead."""
    payloads = self._load_json_array(
        ['service_cache_events1.json', 'service_cache_events2.json'])
    with patch.object(self.kube, 'retrieve_json_auth', side_effect=payloads):
        retriever = KubeEventRetriever(self.kube, delay=500)

        # t=10000: first lookup always goes through
        mock_time.return_value = 10000
        fetched = retriever.get_event_array()
        self.assertEquals(3, len(fetched))
        self.assertEquals(2707, retriever.last_resversion)

        # t=10400: only 400s elapsed, still inside the window -> skipped
        mock_time.return_value = 10400
        fetched = retriever.get_event_array()
        self.assertEquals(0, len(fetched))
        self.assertEquals(2707, retriever.last_resversion)

        # t=10600: window elapsed, second payload is retrieved
        mock_time.return_value = 10600
        fetched = retriever.get_event_array()
        self.assertEquals(2, len(fetched))
        self.assertEquals(2709, retriever.last_resversion)

def test_namespace_serverside_filtering(self):
with patch.object(self.kube, 'retrieve_json_auth', return_value={}) as mock_method:
retr = KubeEventRetriever(self.kube, namespaces=['testns'])
Expand Down
38 changes: 9 additions & 29 deletions tests/core/test_kube_pod_service_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_init(self):
self.assertEqual(0, len(mapper._pod_services_mapping))

def test_service_cache_fill(self):
jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json'])
jsons = self._load_json_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
mapper = PodServiceMapper(self.kube)
mapper._fill_services_cache()
Expand All @@ -46,36 +46,16 @@ def test_service_cache_fill(self):
self.assertEqual('hello', redis['app'])
self.assertEqual('db', redis['tier'])

def test_service_cache_invalidation_true(self):
    """A new service event after the cache fill must flag the cache invalid."""
    responses = self._load_json_array(
        ['service_cache_events1.json', 'service_cache_services1.json', 'service_cache_events2.json'])
    with patch.object(self.kube, 'retrieve_json_auth', side_effect=responses):
        svc_mapper = PodServiceMapper(self.kube)
        svc_mapper._fill_services_cache()
        svc_mapper.check_services_cache_freshness()
        # events2 carries events unseen at fill time -> invalidated
        self.assertEqual(True, svc_mapper._service_cache_invalidated)

def test_service_cache_invalidation_false(self):
    """Replaying the same events after the cache fill must NOT invalidate."""
    responses = self._load_json_array(
        ['service_cache_events1.json', 'service_cache_services1.json', 'service_cache_events1.json'])
    with patch.object(self.kube, 'retrieve_json_auth', side_effect=responses):
        svc_mapper = PodServiceMapper(self.kube)
        # Cache starts invalidated until the first fill
        self.assertEqual(True, svc_mapper._service_cache_invalidated)
        svc_mapper._fill_services_cache()
        self.assertEqual(False, svc_mapper._service_cache_invalidated)
        # Same resourceVersion as before -> no new events -> stays valid
        svc_mapper.check_services_cache_freshness()
        self.assertEqual(False, svc_mapper._service_cache_invalidated)

def test_pod_to_service_no_match(self):
jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json'])
jsons = self._load_json_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
mapper = PodServiceMapper(self.kube)
mapper._fill_services_cache()
no_match = self._build_pod_metadata(0, {'app': 'unknown'})
self.assertEqual(0, len(mapper.match_services_for_pod(no_match)))

def test_pod_to_service_two_matches(self):
jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json'])
jsons = self._load_json_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
mapper = PodServiceMapper(self.kube)
two_matches = self._build_pod_metadata(0, {'app': 'hello', 'tier': 'db'})
Expand All @@ -86,7 +66,7 @@ def test_pod_to_service_two_matches(self):
sorted(mapper.match_services_for_pod(two_matches, names=True)))

def test_pod_to_service_cache(self):
jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json'])
jsons = self._load_json_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
mapper = PodServiceMapper(self.kube)
two_matches = self._build_pod_metadata(0, {'app': 'hello', 'tier': 'db'})
Expand All @@ -97,7 +77,7 @@ def test_pod_to_service_cache(self):
sorted(mapper.match_services_for_pod({'uid': 0}, names=True)))

def test_pods_for_service(self):
jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json'])
jsons = self._load_json_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
# Fill pod label cache
mapper = PodServiceMapper(self.kube)
Expand All @@ -123,7 +103,7 @@ def _prepare_events_tests(self, jsonfiles):
return mapper

def test_event_pod_invalidation(self):
mapper = self._prepare_events_tests(['service_cache_events2.json', 'service_cache_services2.json'])
mapper = self._prepare_events_tests(['service_cache_services2.json'])
self.assertTrue(0 in mapper._pod_labels_cache)
self.assertTrue(0 in mapper._pod_services_mapping)
self.assertTrue(1 in mapper._pod_labels_cache)
Expand All @@ -138,7 +118,7 @@ def test_event_pod_invalidation(self):
self.assertTrue(1 in mapper._pod_services_mapping)

def test_event_service_deleted_invalidation(self):
mapper = self._prepare_events_tests(['service_cache_events2.json', 'service_cache_services2.json'])
mapper = self._prepare_events_tests(['service_cache_services2.json'])
self.assertEqual(2, len(mapper.match_services_for_pod({'uid': 0})))

event = {'involvedObject': {'kind': 'Service', 'uid': REDIS_HELLO_UID},
Expand All @@ -149,13 +129,13 @@ def test_event_service_deleted_invalidation(self):
self.assertEqual(1, len(mapper.match_services_for_pod({'uid': 0})))

def test_event_service_created_invalidation(self):
mapper = self._prepare_events_tests(['service_cache_events1.json', 'service_cache_services1.json'])
mapper = self._prepare_events_tests(['service_cache_services1.json'])
self.assertEqual(1, len(mapper.match_services_for_pod(
self._build_pod_metadata(0, {'app': 'hello', 'tier': 'db'}))))

event = {'involvedObject': {'kind': 'Service', 'uid': ALL_HELLO_UID},
'reason': 'CreatedLoadBalancer'}
jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json'])
jsons = self._load_json_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
# Three pods must be reloaded
self.assertEqual(set([0, 1, 3]), mapper.process_events([event]))
Expand Down
21 changes: 20 additions & 1 deletion utils/kubernetes/kube_event_retriever.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import time

log = logging.getLogger('collector')

Expand All @@ -25,12 +26,22 @@ class KubeEventRetriever:
[3] https://github.com/kubernetes/kubernetes/issues/1362
"""

def __init__(self, kubeutil_object, namespaces=None, kinds=None):
def __init__(self, kubeutil_object, namespaces=None, kinds=None, delay=None):
"""
:param kubeutil_object: valid, initialised KubeUtil object to route requests through
:param namespaces: namespace(s) to watch (string or list)
:param kinds: kinds(s) to watch (string or list)
:param delay: minimum time (in seconds) between two apiserver requests, return [] in the meantime
"""
self.kubeutil = kubeutil_object
self.last_resversion = -1
self.set_namespaces(namespaces)
self.set_kinds(kinds)

# Request throttling to reduce apiserver traffic
self._request_interval = delay
self._last_lookup_timestamp = -1

def set_namespaces(self, namespaces):
self.request_url = self.kubeutil.kubernetes_api_url + '/events'
self.namespace_filter = None
Expand Down Expand Up @@ -60,6 +71,14 @@ def get_event_array(self):
Fetch latest events from the apiserver for the namespaces and kinds set on init
and returns an array of event objects
"""

# Request throttling
if self._request_interval:
if (time.time() - self._last_lookup_timestamp) < self._request_interval:
return []
else:
self._last_lookup_timestamp = time.time()

lastest_resversion = None
filtered_events = []

Expand Down
14 changes: 2 additions & 12 deletions utils/kubernetes/kubeutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,16 +387,6 @@ def get_auth_token(cls):

return None

def check_services_cache_freshness(self):
    """
    Proxy used by sd_docker_backend to decide whether the cached services
    must be invalidated. The entire cache is dropped at once, because the
    fill_service_cache logic cannot perform partial lookups. Freshness is
    tracked via event resourceVersions, since the services' own versions
    would not catch deletions.
    """
    mapper = self._service_mapper
    return mapper.check_services_cache_freshness()

def match_services_for_pod(self, pod_metadata, refresh=False):
"""
Match the pods labels with services' label selectors to determine the list
Expand All @@ -408,11 +398,11 @@ def match_services_for_pod(self, pod_metadata, refresh=False):
#log.warning("Matches for %s: %s" % (pod_metadata.get('name'), str(s)))
return s

def get_event_retriever(self, namespaces=None, kinds=None):
def get_event_retriever(self, namespaces=None, kinds=None, delay=None):
"""
Returns a KubeEventRetriever object ready for action
"""
return KubeEventRetriever(self, namespaces, kinds)
return KubeEventRetriever(self, namespaces, kinds, delay)

def match_containers_for_pods(self, pod_uids, podlist=None):
"""
Expand Down
28 changes: 3 additions & 25 deletions utils/kubernetes/pod_service_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,16 @@ def __init__(self, kubeutil_object):
The apiserver requests are routed through the given KubeUtil instance
"""
self.kube = kubeutil_object
self._event_retriever = self.kube.get_event_retriever(kinds=['Service'])
self._service_cache_selectors = defaultdict(dict) # {service_uid:{selectors}}
self._service_cache_names = {} # {service_uid:service_name
self._service_cache_invalidated = False # True to trigger service parsing
self._service_cache_invalidated = True
self._pod_labels_cache = defaultdict(dict) # {pod_uid:{label}}
self._pod_services_mapping = defaultdict(list) # {pod_uid:[service_uid]}

# Consume past events
self.check_services_cache_freshness()
self._service_cache_invalidated = True

def _fill_services_cache(self):
"""
Get the list of services from the kubelet API and store the label selector dicts.
The cache is to be invalidated by the user class by calling check_services_cache_freshness
The cache is to be invalidated by the user class by calling process_events
"""
try:
reply = self.kube.retrieve_json_auth(self.kube.kubernetes_api_url + '/services')
Expand All @@ -51,23 +46,6 @@ def _fill_services_cache(self):
self._service_cache_names = {}
self._service_cache_invalidated = False

def check_services_cache_freshness(self):
    """
    Entry point for sd_docker_backend to check whether to invalidate the
    cached services. For now the whole cache is dropped, as the
    fill_service_cache logic doesn't handle partial lookups.
    """
    if self._service_cache_invalidated:
        # Already flagged: skip the apiserver lookup entirely
        return

    try:
        pending = self._event_retriever.get_event_array()
    except Exception as e:
        # Best-effort: on lookup failure, keep the current cache
        log.warning("Exception while parsing service events, not invalidating cache: %s", e)
        return

    if pending:
        self._service_cache_invalidated = True

def match_services_for_pod(self, pod_metadata, refresh=False, names=False):
"""
Match the pods labels with services' label selectors to determine the list
Expand Down Expand Up @@ -168,7 +146,7 @@ def process_events(self, event_array):
service_uid = event.get('involvedObject', {}).get('uid', None)

if service_cache_checked is False:
self.check_services_cache_freshness()
self._service_cache_invalidated = True
service_cache_checked = True

# Possible values in kubernetes/pkg/controller/service/servicecontroller.go
Expand Down
6 changes: 0 additions & 6 deletions utils/service_discovery/sd_docker_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,6 @@ def _make_fetch_state(self):
def update_checks(self, changed_containers):
state = self._make_fetch_state()

if Platform.is_k8s():
self.kubeutil.check_services_cache_freshness()

conf_reload_set = set()
for c_id in changed_containers:
checks = self._get_checks_to_refresh(state, c_id)
Expand Down Expand Up @@ -366,9 +363,6 @@ def get_configs(self):
container.get('Id'), container.get('Labels')
) for container in self.docker_client.containers()]

if Platform.is_k8s():
self.kubeutil.check_services_cache_freshness()

for image, cid, labels in containers:
try:
# value of the DATADOG_ID tag or the image name if the label is missing
Expand Down

0 comments on commit 557cc15

Please sign in to comment.