Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kube] Reduce apiserver request count for pod_service_mapper #3387

Merged
merged 1 commit into from
Jun 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions tests/core/test_kube_event_retriever.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# stdlib
import time # noqa: F401

# 3rd party
from mock import patch

Expand Down Expand Up @@ -38,6 +41,30 @@ def test_events_resversion_filtering(self):
self.assertEquals(0, len(events)) # No new event
self.assertEquals(2709, retr.last_resversion)

@patch('time.time')
def test_events_delay(self, mock_time):
    """
    With a delay configured, the retriever must only hit the apiserver
    once per interval and return an empty array in the meantime.
    """
    payloads = self._load_json_array(
        ['service_cache_events1.json', 'service_cache_events2.json'])
    with patch.object(self.kube, 'retrieve_json_auth', side_effect=payloads):
        retriever = KubeEventRetriever(self.kube, delay=500)

        # First lookup goes through and records the timestamp
        mock_time.return_value = 10000
        self.assertEqual(3, len(retriever.get_event_array()))
        self.assertEqual(2707, retriever.last_resversion)

        # Only 400s elapsed (< 500s delay): request must be skipped
        mock_time.return_value = 10400
        self.assertEqual(0, len(retriever.get_event_array()))
        self.assertEqual(2707, retriever.last_resversion)

        # 600s elapsed (> 500s delay): events must be retrieved again
        mock_time.return_value = 10600
        self.assertEqual(2, len(retriever.get_event_array()))
        self.assertEqual(2709, retriever.last_resversion)

def test_namespace_serverside_filtering(self):
with patch.object(self.kube, 'retrieve_json_auth', return_value={}) as mock_method:
retr = KubeEventRetriever(self.kube, namespaces=['testns'])
Expand Down
38 changes: 9 additions & 29 deletions tests/core/test_kube_pod_service_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_init(self):
self.assertEqual(0, len(mapper._pod_services_mapping))

def test_service_cache_fill(self):
jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json'])
jsons = self._load_json_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
mapper = PodServiceMapper(self.kube)
mapper._fill_services_cache()
Expand All @@ -46,36 +46,16 @@ def test_service_cache_fill(self):
self.assertEqual('hello', redis['app'])
self.assertEqual('db', redis['tier'])

def test_service_cache_invalidation_true(self):
    """A fresh service event must mark the cached services as stale."""
    files = ['service_cache_events1.json',
             'service_cache_services1.json',
             'service_cache_events2.json']
    replies = self._load_json_array(files)
    with patch.object(self.kube, 'retrieve_json_auth', side_effect=replies):
        mapper = PodServiceMapper(self.kube)
        mapper._fill_services_cache()
        mapper.check_services_cache_freshness()
        self.assertEqual(True, mapper._service_cache_invalidated)

def test_service_cache_invalidation_false(self):
    """Replaying an already-seen event must not invalidate the cache."""
    files = ['service_cache_events1.json',
             'service_cache_services1.json',
             'service_cache_events1.json']
    replies = self._load_json_array(files)
    with patch.object(self.kube, 'retrieve_json_auth', side_effect=replies):
        mapper = PodServiceMapper(self.kube)
        # Cache starts out invalidated, is validated by the fill,
        # and stays valid when no new event shows up.
        self.assertEqual(True, mapper._service_cache_invalidated)
        mapper._fill_services_cache()
        self.assertEqual(False, mapper._service_cache_invalidated)
        mapper.check_services_cache_freshness()
        self.assertEqual(False, mapper._service_cache_invalidated)

def test_pod_to_service_no_match(self):
    """A pod whose labels match no service selector yields an empty service list."""
    # Fix: a stale duplicate of this assignment (loading
    # 'service_cache_events2.json' as well) was left above it — a dead
    # store immediately overwritten; only the service listing is needed.
    jsons = self._load_json_array(['service_cache_services2.json'])
    with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
        mapper = PodServiceMapper(self.kube)
        mapper._fill_services_cache()
        no_match = self._build_pod_metadata(0, {'app': 'unknown'})
        self.assertEqual(0, len(mapper.match_services_for_pod(no_match)))

def test_pod_to_service_two_matches(self):
jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json'])
jsons = self._load_json_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
mapper = PodServiceMapper(self.kube)
two_matches = self._build_pod_metadata(0, {'app': 'hello', 'tier': 'db'})
Expand All @@ -86,7 +66,7 @@ def test_pod_to_service_two_matches(self):
sorted(mapper.match_services_for_pod(two_matches, names=True)))

def test_pod_to_service_cache(self):
jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json'])
jsons = self._load_json_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
mapper = PodServiceMapper(self.kube)
two_matches = self._build_pod_metadata(0, {'app': 'hello', 'tier': 'db'})
Expand All @@ -97,7 +77,7 @@ def test_pod_to_service_cache(self):
sorted(mapper.match_services_for_pod({'uid': 0}, names=True)))

def test_pods_for_service(self):
jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json'])
jsons = self._load_json_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
# Fill pod label cache
mapper = PodServiceMapper(self.kube)
Expand All @@ -123,7 +103,7 @@ def _prepare_events_tests(self, jsonfiles):
return mapper

def test_event_pod_invalidation(self):
mapper = self._prepare_events_tests(['service_cache_events2.json', 'service_cache_services2.json'])
mapper = self._prepare_events_tests(['service_cache_services2.json'])
self.assertTrue(0 in mapper._pod_labels_cache)
self.assertTrue(0 in mapper._pod_services_mapping)
self.assertTrue(1 in mapper._pod_labels_cache)
Expand All @@ -138,7 +118,7 @@ def test_event_pod_invalidation(self):
self.assertTrue(1 in mapper._pod_services_mapping)

def test_event_service_deleted_invalidation(self):
mapper = self._prepare_events_tests(['service_cache_events2.json', 'service_cache_services2.json'])
mapper = self._prepare_events_tests(['service_cache_services2.json'])
self.assertEqual(2, len(mapper.match_services_for_pod({'uid': 0})))

event = {'involvedObject': {'kind': 'Service', 'uid': REDIS_HELLO_UID},
Expand All @@ -149,13 +129,13 @@ def test_event_service_deleted_invalidation(self):
self.assertEqual(1, len(mapper.match_services_for_pod({'uid': 0})))

def test_event_service_created_invalidation(self):
mapper = self._prepare_events_tests(['service_cache_events1.json', 'service_cache_services1.json'])
mapper = self._prepare_events_tests(['service_cache_services1.json'])
self.assertEqual(1, len(mapper.match_services_for_pod(
self._build_pod_metadata(0, {'app': 'hello', 'tier': 'db'}))))

event = {'involvedObject': {'kind': 'Service', 'uid': ALL_HELLO_UID},
'reason': 'CreatedLoadBalancer'}
jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json'])
jsons = self._load_json_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
# Three pods must be reloaded
self.assertEqual(set([0, 1, 3]), mapper.process_events([event]))
Expand Down
21 changes: 20 additions & 1 deletion utils/kubernetes/kube_event_retriever.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import time

log = logging.getLogger('collector')

Expand All @@ -25,12 +26,22 @@ class KubeEventRetriever:
[3] https://github.com/kubernetes/kubernetes/issues/1362
"""

def __init__(self, kubeutil_object, namespaces=None, kinds=None, delay=None):
    """
    :param kubeutil_object: valid, initialised KubeUtil object to route requests through
    :param namespaces: namespace(s) to watch (string or list)
    :param kinds: kinds(s) to watch (string or list)
    :param delay: minimum time (in seconds) between two apiserver requests, return [] in the meantime
    """
    # Fix: a stale duplicate `def` line (the old signature without the
    # `delay` parameter) was left above the real one; removed. Also fixed
    # the "objet" typo in the docstring.
    self.kubeutil = kubeutil_object
    # Resource version of the most recent event seen; -1 means "none yet"
    self.last_resversion = -1
    self.set_namespaces(namespaces)
    self.set_kinds(kinds)

    # Request throttling to reduce apiserver traffic
    self._request_interval = delay
    self._last_lookup_timestamp = -1

def set_namespaces(self, namespaces):
self.request_url = self.kubeutil.kubernetes_api_url + '/events'
self.namespace_filter = None
Expand Down Expand Up @@ -60,6 +71,14 @@ def get_event_array(self):
Fetch latest events from the apiserver for the namespaces and kinds set on init
and returns an array of event objects
"""

# Request throttling
if self._request_interval:
if (time.time() - self._last_lookup_timestamp) < self._request_interval:
return []
else:
self._last_lookup_timestamp = time.time()

lastest_resversion = None
filtered_events = []

Expand Down
25 changes: 10 additions & 15 deletions utils/kubernetes/kubeutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class KubeUtil:
DEFAULT_MASTER_PORT = 8080
DEFAULT_MASTER_NAME = 'kubernetes' # DNS name to reach the master from a pod.
DEFAULT_LABEL_PREFIX = 'kube_'
DEFAULT_COLLECT_SERVICE_TAG = True
CA_CRT_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/ca.crt'
AUTH_TOKEN_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/token'

Expand Down Expand Up @@ -102,6 +103,9 @@ def __init__(self, instance=None):
self.metrics_url = urljoin(self.cadvisor_url, KubeUtil.METRICS_PATH)
self.machine_info_url = urljoin(self.cadvisor_url, KubeUtil.MACHINE_INFO_PATH)

from config import _is_affirmative
self.collect_service_tag = _is_affirmative(instance.get('collect_service_tags', KubeUtil.DEFAULT_COLLECT_SERVICE_TAG))

# keep track of the latest k8s event we collected and posted
# default value is 0 but TTL for k8s events is one hour anyways
self.last_event_collection_ts = 0
Expand Down Expand Up @@ -231,9 +235,10 @@ def extract_kube_pod_tags(self, pods_list, excluded_keys=None, label_prefix=None
podtags = self.get_pod_creator_tags(metadata)

# Extract services tags
for service in self.match_services_for_pod(metadata):
if service is not None:
podtags.append(u'kube_service:%s' % service)
if self.collect_service_tag:
for service in self.match_services_for_pod(metadata):
if service is not None:
podtags.append(u'kube_service:%s' % service)

# Extract labels
for k, v in labels.iteritems():
Expand Down Expand Up @@ -387,16 +392,6 @@ def get_auth_token(cls):

return None

def check_services_cache_freshness(self):
    """
    Entry point for sd_docker_backend to check whether to invalidate the cached services.
    For now the whole cache is dropped, as the fill_service_cache logic
    doesn't handle partial lookups.

    The event's resourceVersion is used, as the service's own version
    wouldn't catch deletions.
    """
    # Delegate to the mapper that owns the service cache.
    return self._service_mapper.check_services_cache_freshness()

def match_services_for_pod(self, pod_metadata, refresh=False):
"""
Match the pods labels with services' label selectors to determine the list
Expand All @@ -408,11 +403,11 @@ def match_services_for_pod(self, pod_metadata, refresh=False):
#log.warning("Matches for %s: %s" % (pod_metadata.get('name'), str(s)))
return s

def get_event_retriever(self, namespaces=None, kinds=None, delay=None):
    """
    Returns a KubeEventRetriever object ready for action

    :param namespaces: namespace(s) to watch (string or list)
    :param kinds: kind(s) to watch (string or list)
    :param delay: minimum interval in seconds between two apiserver requests
    """
    # Fix: stale duplicates of both the `def` line (old signature without
    # `delay`) and the `return` line were left in the text; the
    # delay-aware versions are kept.
    return KubeEventRetriever(self, namespaces, kinds, delay)

def match_containers_for_pods(self, pod_uids, podlist=None):
"""
Expand Down
28 changes: 3 additions & 25 deletions utils/kubernetes/pod_service_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,16 @@ def __init__(self, kubeutil_object):
The apiserver requests are routed through the given KubeUtil instance
"""
self.kube = kubeutil_object
self._event_retriever = self.kube.get_event_retriever(kinds=['Service'])
self._service_cache_selectors = defaultdict(dict) # {service_uid:{selectors}}
self._service_cache_names = {} # {service_uid:service_name
self._service_cache_invalidated = False # True to trigger service parsing
self._service_cache_invalidated = True
self._pod_labels_cache = defaultdict(dict) # {pod_uid:{label}}
self._pod_services_mapping = defaultdict(list) # {pod_uid:[service_uid]}

# Consume past events
self.check_services_cache_freshness()
self._service_cache_invalidated = True

def _fill_services_cache(self):
"""
Get the list of services from the kubelet API and store the label selector dicts.
The cache is to be invalidated by the user class by calling check_services_cache_freshness
The cache is to be invalidated by the user class by calling process_events
"""
try:
reply = self.kube.retrieve_json_auth(self.kube.kubernetes_api_url + '/services')
Expand All @@ -51,23 +46,6 @@ def _fill_services_cache(self):
self._service_cache_names = {}
self._service_cache_invalidated = False

def check_services_cache_freshness(self):
    """
    Entry point for sd_docker_backend to check whether to invalidate the cached services.
    For now the whole cache is removed, as the fill_service_cache logic
    doesn't handle partial lookups.
    """
    # Already flagged as stale: nothing more to do.
    if self._service_cache_invalidated:
        return

    try:
        events = self._event_retriever.get_event_array()
    except Exception as e:
        # Best effort: keep the cache rather than crash on a bad lookup.
        log.warning("Exception while parsing service events, not invalidating cache: %s", e)
        return

    # Any new service event means the cached selectors may be out of date.
    if events:
        self._service_cache_invalidated = True

def match_services_for_pod(self, pod_metadata, refresh=False, names=False):
"""
Match the pods labels with services' label selectors to determine the list
Expand Down Expand Up @@ -168,7 +146,7 @@ def process_events(self, event_array):
service_uid = event.get('involvedObject', {}).get('uid', None)

if service_cache_checked is False:
self.check_services_cache_freshness()
self._service_cache_invalidated = True
service_cache_checked = True

# Possible values in kubernetes/pkg/controller/service/servicecontroller.go
Expand Down
15 changes: 5 additions & 10 deletions utils/service_discovery/sd_docker_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,6 @@ def _make_fetch_state(self):
def update_checks(self, changed_containers):
state = self._make_fetch_state()

if Platform.is_k8s():
self.kubeutil.check_services_cache_freshness()

conf_reload_set = set()
for c_id in changed_containers:
checks = self._get_checks_to_refresh(state, c_id)
Expand Down Expand Up @@ -307,10 +304,11 @@ def get_tags(self, state, c_id):
tags.extend(creator_tags)

# add services tags
services = self.kubeutil.match_services_for_pod(pod_metadata)
for s in services:
if s is not None:
tags.append('kube_service:%s' % s)
if self.kubeutil.collect_service_tag:
services = self.kubeutil.match_services_for_pod(pod_metadata)
for s in services:
if s is not None:
tags.append('kube_service:%s' % s)

elif Platform.is_swarm():
c_labels = c_inspect.get('Config', {}).get('Labels', {})
Expand Down Expand Up @@ -366,9 +364,6 @@ def get_configs(self):
container.get('Id'), container.get('Labels')
) for container in self.docker_client.containers()]

if Platform.is_k8s():
self.kubeutil.check_services_cache_freshness()

for image, cid, labels in containers:
try:
# value of the DATADOG_ID tag or the image name if the label is missing
Expand Down