diff --git a/tests/core/test_kube_event_retriever.py b/tests/core/test_kube_event_retriever.py index 4b532d9119..4231ad10d2 100644 --- a/tests/core/test_kube_event_retriever.py +++ b/tests/core/test_kube_event_retriever.py @@ -1,3 +1,6 @@ +# stdlib +import time # noqa: F401 + # 3rd party from mock import patch @@ -38,6 +41,30 @@ def test_events_resversion_filtering(self): self.assertEquals(0, len(events)) # No new event self.assertEquals(2709, retr.last_resversion) + @patch('time.time') + def test_events_delay(self, mock_time): + jsons = self._load_json_array( + ['service_cache_events1.json', 'service_cache_events2.json']) + with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons): + retr = KubeEventRetriever(self.kube, delay=500) + + mock_time.return_value = 10000 + events = retr.get_event_array() + self.assertEquals(3, len(events)) + self.assertEquals(2707, retr.last_resversion) + + # Must skip request + mock_time.return_value = 10400 + events = retr.get_event_array() + self.assertEquals(0, len(events)) + self.assertEquals(2707, retr.last_resversion) + + # Must retrieve events + mock_time.return_value = 10600 + events = retr.get_event_array() + self.assertEquals(2, len(events)) + self.assertEquals(2709, retr.last_resversion) + def test_namespace_serverside_filtering(self): with patch.object(self.kube, 'retrieve_json_auth', return_value={}) as mock_method: retr = KubeEventRetriever(self.kube, namespaces=['testns']) diff --git a/tests/core/test_kube_pod_service_mapper.py b/tests/core/test_kube_pod_service_mapper.py index a11be31dd9..7c57d25296 100644 --- a/tests/core/test_kube_pod_service_mapper.py +++ b/tests/core/test_kube_pod_service_mapper.py @@ -32,7 +32,7 @@ def test_init(self): self.assertEqual(0, len(mapper._pod_services_mapping)) def test_service_cache_fill(self): - jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json']) + jsons = self._load_json_array(['service_cache_services2.json']) with 
patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons): mapper = PodServiceMapper(self.kube) mapper._fill_services_cache() @@ -46,28 +46,8 @@ def test_service_cache_fill(self): self.assertEqual('hello', redis['app']) self.assertEqual('db', redis['tier']) - def test_service_cache_invalidation_true(self): - jsons = self._load_json_array( - ['service_cache_events1.json', 'service_cache_services1.json', 'service_cache_events2.json']) - with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons): - mapper = PodServiceMapper(self.kube) - mapper._fill_services_cache() - mapper.check_services_cache_freshness() - self.assertEqual(True, mapper._service_cache_invalidated) - - def test_service_cache_invalidation_false(self): - jsons = self._load_json_array( - ['service_cache_events1.json', 'service_cache_services1.json', 'service_cache_events1.json']) - with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons): - mapper = PodServiceMapper(self.kube) - self.assertEqual(True, mapper._service_cache_invalidated) - mapper._fill_services_cache() - self.assertEqual(False, mapper._service_cache_invalidated) - mapper.check_services_cache_freshness() - self.assertEqual(False, mapper._service_cache_invalidated) - def test_pod_to_service_no_match(self): - jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json']) + jsons = self._load_json_array(['service_cache_services2.json']) with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons): mapper = PodServiceMapper(self.kube) mapper._fill_services_cache() @@ -75,7 +55,7 @@ def test_pod_to_service_no_match(self): self.assertEqual(0, len(mapper.match_services_for_pod(no_match))) def test_pod_to_service_two_matches(self): - jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json']) + jsons = self._load_json_array(['service_cache_services2.json']) with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons): mapper = 
PodServiceMapper(self.kube) two_matches = self._build_pod_metadata(0, {'app': 'hello', 'tier': 'db'}) @@ -86,7 +66,7 @@ def test_pod_to_service_two_matches(self): sorted(mapper.match_services_for_pod(two_matches, names=True))) def test_pod_to_service_cache(self): - jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json']) + jsons = self._load_json_array(['service_cache_services2.json']) with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons): mapper = PodServiceMapper(self.kube) two_matches = self._build_pod_metadata(0, {'app': 'hello', 'tier': 'db'}) @@ -97,7 +77,7 @@ def test_pod_to_service_cache(self): sorted(mapper.match_services_for_pod({'uid': 0}, names=True))) def test_pods_for_service(self): - jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json']) + jsons = self._load_json_array(['service_cache_services2.json']) with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons): # Fill pod label cache mapper = PodServiceMapper(self.kube) @@ -123,7 +103,7 @@ def _prepare_events_tests(self, jsonfiles): return mapper def test_event_pod_invalidation(self): - mapper = self._prepare_events_tests(['service_cache_events2.json', 'service_cache_services2.json']) + mapper = self._prepare_events_tests(['service_cache_services2.json']) self.assertTrue(0 in mapper._pod_labels_cache) self.assertTrue(0 in mapper._pod_services_mapping) self.assertTrue(1 in mapper._pod_labels_cache) @@ -138,7 +118,7 @@ def test_event_pod_invalidation(self): self.assertTrue(1 in mapper._pod_services_mapping) def test_event_service_deleted_invalidation(self): - mapper = self._prepare_events_tests(['service_cache_events2.json', 'service_cache_services2.json']) + mapper = self._prepare_events_tests(['service_cache_services2.json']) self.assertEqual(2, len(mapper.match_services_for_pod({'uid': 0}))) event = {'involvedObject': {'kind': 'Service', 'uid': REDIS_HELLO_UID}, @@ -149,13 +129,13 @@ def 
test_event_service_deleted_invalidation(self): self.assertEqual(1, len(mapper.match_services_for_pod({'uid': 0}))) def test_event_service_created_invalidation(self): - mapper = self._prepare_events_tests(['service_cache_events1.json', 'service_cache_services1.json']) + mapper = self._prepare_events_tests(['service_cache_services1.json']) self.assertEqual(1, len(mapper.match_services_for_pod( self._build_pod_metadata(0, {'app': 'hello', 'tier': 'db'})))) event = {'involvedObject': {'kind': 'Service', 'uid': ALL_HELLO_UID}, 'reason': 'CreatedLoadBalancer'} - jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json']) + jsons = self._load_json_array(['service_cache_services2.json']) with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons): # Three pods must be reloaded self.assertEqual(set([0, 1, 3]), mapper.process_events([event])) diff --git a/utils/kubernetes/kube_event_retriever.py b/utils/kubernetes/kube_event_retriever.py index 9b654b7dcb..111fbe1e5a 100644 --- a/utils/kubernetes/kube_event_retriever.py +++ b/utils/kubernetes/kube_event_retriever.py @@ -1,4 +1,5 @@ import logging +import time log = logging.getLogger('collector') @@ -25,12 +26,22 @@ class KubeEventRetriever: [3] https://github.com/kubernetes/kubernetes/issues/1362 """ - def __init__(self, kubeutil_object, namespaces=None, kinds=None): + def __init__(self, kubeutil_object, namespaces=None, kinds=None, delay=None): + """ + :param kubeutil_object: valid, initialised KubeUtil object to route requests through + :param namespaces: namespace(s) to watch (string or list) + :param kinds: kind(s) to watch (string or list) + :param delay: minimum time (in seconds) between two apiserver requests, return [] in the meantime + """ self.kubeutil = kubeutil_object self.last_resversion = -1 self.set_namespaces(namespaces) self.set_kinds(kinds) + # Request throttling to reduce apiserver traffic + self._request_interval = delay + self._last_lookup_timestamp = -1 
+ def set_namespaces(self, namespaces): self.request_url = self.kubeutil.kubernetes_api_url + '/events' self.namespace_filter = None @@ -60,6 +71,14 @@ def get_event_array(self): Fetch latest events from the apiserver for the namespaces and kinds set on init and returns an array of event objects """ + + # Request throttling + if self._request_interval: + if (time.time() - self._last_lookup_timestamp) < self._request_interval: + return [] + else: + self._last_lookup_timestamp = time.time() + lastest_resversion = None filtered_events = [] diff --git a/utils/kubernetes/kubeutil.py b/utils/kubernetes/kubeutil.py index 00d2fef6e4..7359c86a93 100644 --- a/utils/kubernetes/kubeutil.py +++ b/utils/kubernetes/kubeutil.py @@ -387,16 +387,6 @@ def get_auth_token(cls): return None - def check_services_cache_freshness(self): - """ - Entry point for sd_docker_backend to check whether to invalidate the cached services - For now, we remove the whole cache as the fill_service_cache logic - doesn't handle partial lookups - - We use the event's resourceVersion, as using the service's version wouldn't catch deletion - """ - return self._service_mapper.check_services_cache_freshness() - def match_services_for_pod(self, pod_metadata, refresh=False): """ Match the pods labels with services' label selectors to determine the list @@ -408,11 +398,11 @@ def match_services_for_pod(self, pod_metadata, refresh=False): #log.warning("Matches for %s: %s" % (pod_metadata.get('name'), str(s))) return s - def get_event_retriever(self, namespaces=None, kinds=None): + def get_event_retriever(self, namespaces=None, kinds=None, delay=None): """ Returns a KubeEventRetriever object ready for action """ - return KubeEventRetriever(self, namespaces, kinds) + return KubeEventRetriever(self, namespaces, kinds, delay) def match_containers_for_pods(self, pod_uids, podlist=None): """ diff --git a/utils/kubernetes/pod_service_mapper.py b/utils/kubernetes/pod_service_mapper.py index 5dbb99c591..ae909eab90 100644 
--- a/utils/kubernetes/pod_service_mapper.py +++ b/utils/kubernetes/pod_service_mapper.py @@ -16,21 +16,16 @@ def __init__(self, kubeutil_object): The apiserver requests are routed through the given KubeUtil instance """ self.kube = kubeutil_object - self._event_retriever = self.kube.get_event_retriever(kinds=['Service']) self._service_cache_selectors = defaultdict(dict) # {service_uid:{selectors}} self._service_cache_names = {} # {service_uid:service_name - self._service_cache_invalidated = False # True to trigger service parsing + self._service_cache_invalidated = True self._pod_labels_cache = defaultdict(dict) # {pod_uid:{label}} self._pod_services_mapping = defaultdict(list) # {pod_uid:[service_uid]} - # Consume past events - self.check_services_cache_freshness() - self._service_cache_invalidated = True - def _fill_services_cache(self): """ Get the list of services from the kubelet API and store the label selector dicts. - The cache is to be invalidated by the user class by calling check_services_cache_freshness + The cache is to be invalidated by the user class by calling process_events """ try: reply = self.kube.retrieve_json_auth(self.kube.kubernetes_api_url + '/services') @@ -51,23 +46,6 @@ def _fill_services_cache(self): self._service_cache_names = {} self._service_cache_invalidated = False - def check_services_cache_freshness(self): - """ - Entry point for sd_docker_backend to check whether to invalidate the cached services - For now, we remove the whole cache as the fill_service_cache logic - doesn't handle partial lookups - """ - - # Don't check if cache is already invalidated - if self._service_cache_invalidated: - return - - try: - if self._event_retriever.get_event_array(): - self._service_cache_invalidated = True - except Exception as e: - log.warning("Exception while parsing service events, not invalidating cache: %s", e) - def match_services_for_pod(self, pod_metadata, refresh=False, names=False): """ Match the pods labels with services' label 
selectors to determine the list @@ -168,7 +146,7 @@ def process_events(self, event_array): service_uid = event.get('involvedObject', {}).get('uid', None) if service_cache_checked is False: - self.check_services_cache_freshness() + self._service_cache_invalidated = True service_cache_checked = True # Possible values in kubernetes/pkg/controller/service/servicecontroller.go diff --git a/utils/service_discovery/sd_docker_backend.py b/utils/service_discovery/sd_docker_backend.py index df3506a01d..908e50080d 100644 --- a/utils/service_discovery/sd_docker_backend.py +++ b/utils/service_discovery/sd_docker_backend.py @@ -128,9 +128,6 @@ def _make_fetch_state(self): def update_checks(self, changed_containers): state = self._make_fetch_state() - if Platform.is_k8s(): - self.kubeutil.check_services_cache_freshness() - conf_reload_set = set() for c_id in changed_containers: checks = self._get_checks_to_refresh(state, c_id) @@ -366,9 +363,6 @@ def get_configs(self): container.get('Id'), container.get('Labels') ) for container in self.docker_client.containers()] - if Platform.is_k8s(): - self.kubeutil.check_services_cache_freshness() - for image, cid, labels in containers: try: # value of the DATADOG_ID tag or the image name if the label is missing