Skip to content

Commit

Permalink
allow checks to limit the number of metric contexts they submit
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello committed Aug 22, 2018
1 parent 196888f commit 475cf65
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 4 deletions.
39 changes: 35 additions & 4 deletions datadog_checks_base/datadog_checks/checks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,23 @@
from ..config import is_affirmative
from ..utils.common import ensure_bytes
from ..utils.proxy import config_proxy_skip
from ..utils.limiter import Limiter


# Metric types for which it's only useful to submit once per context
ONE_PER_CONTEXT_METRIC_TYPES = [
aggregator.GAUGE,
aggregator.RATE,
aggregator.MONOTONIC_COUNT,
]


class AgentCheck(object):
"""
The base class for any Agent based integrations
"""
OK, WARNING, CRITICAL, UNKNOWN = (0, 1, 2, 3)
DEFAULT_METRIC_LIMIT = 0

def __init__(self, *args, **kwargs):
"""
Expand All @@ -45,6 +55,7 @@ def __init__(self, *args, **kwargs):
self.init_config = kwargs.get('init_config', {})
self.agentConfig = kwargs.get('agentConfig', {})
self.warnings = []
self.metric_limiter = None

if len(args) > 0:
self.name = args[0]
Expand Down Expand Up @@ -98,6 +109,10 @@ def __init__(self, *args, **kwargs):
],
}

if self.DEFAULT_METRIC_LIMIT > 0:
self.metric_limiter = Limiter("metrics", self.DEFAULT_METRIC_LIMIT, self.warning)


@property
def in_developer_mode(self):
self._log_deprecation('in_developer_mode')
Expand All @@ -117,6 +132,9 @@ def get_instance_proxy(self, instance, uri, proxies=None):

return config_proxy_skip(proxies, uri, skip)

def _hash_context(mtype, name, value, tags=None, hostname=None):
return '{}-{}-{}-{}'.format(mtype, name, tags if tags is None else hash(frozenset(tags)), hostname)

def _submit_metric(self, mtype, name, value, tags=None, hostname=None, device_name=None):
if value is None:
# ignore metric sample
Expand All @@ -126,21 +144,32 @@ def _submit_metric(self, mtype, name, value, tags=None, hostname=None, device_na
if hostname is None:
hostname = b''

if self.metric_limiter:
if mtype in ONE_PER_CONTEXT_METRIC_TYPES:
# Fast path for gauges, rates, monotonic counters, assume one context per call
if self.metric_limiter.is_reached():
return
else:
# Other metric types have a legit use case for several calls per context, track unique contexts
context = self._hash_context(mtype, name, tags, hostname)
if self.metric_limiter.is_reached(context):
return

aggregator.submit_metric(self, self.check_id, mtype, ensure_bytes(name), float(value), tags, hostname)

def gauge(self, name, value, tags=None, hostname=None, device_name=None):
self._submit_metric(aggregator.GAUGE, name, value, tags=tags, hostname=hostname, device_name=device_name)

def rate(self, name, value, tags=None, hostname=None, device_name=None):
self._submit_metric(aggregator.RATE, name, value, tags=tags, hostname=hostname, device_name=device_name)

def count(self, name, value, tags=None, hostname=None, device_name=None):
self._submit_metric(aggregator.COUNT, name, value, tags=tags, hostname=hostname, device_name=device_name)

def monotonic_count(self, name, value, tags=None, hostname=None, device_name=None):
self._submit_metric(aggregator.MONOTONIC_COUNT, name, value, tags=tags, hostname=hostname,
device_name=device_name)

def rate(self, name, value, tags=None, hostname=None, device_name=None):
self._submit_metric(aggregator.RATE, name, value, tags=tags, hostname=hostname, device_name=device_name)

def histogram(self, name, value, tags=None, hostname=None, device_name=None):
self._submit_metric(aggregator.HISTOGRAM, name, value, tags=tags, hostname=hostname, device_name=device_name)

Expand Down Expand Up @@ -302,14 +331,16 @@ def run(self):
try:
self.check(copy.deepcopy(self.instances[0]))
result = b''

except Exception as e:
result = json.dumps([
{
"message": str(e),
"traceback": traceback.format_exc(),
}
])
finally:
if self.metric_limiter:
self.metric_limiter.reset()

return result

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ class GenericPrometheusCheck(AgentCheck):
- bar
- foo
"""

DEFAULT_METRIC_LIMIT = 350

def __init__(self, name, init_config, agentConfig, instances=None, default_instances=None, default_namespace=""):
super(GenericPrometheusCheck, self).__init__(name, init_config, agentConfig, instances)
self.scrapers_map = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#

class PrometheusCheck(PrometheusScraperMixin, AgentCheck):
DEFAULT_METRIC_LIMIT = 350

def __init__(self, name, init_config, agentConfig, instances=None):
super(PrometheusCheck, self).__init__(name, init_config, agentConfig, instances)

Expand Down
41 changes: 41 additions & 0 deletions datadog_checks_base/datadog_checks/utils/limiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# (C) Datadog, Inc. 2010-2016
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)


class Limiter(object):
def __init__(self, object_name, object_limit, warning_func=None):
self.warning = warning_func
self.name = object_name
self.limit = object_limit

self.reached_limit = False
self.count = 0
self.seen = set()

def reset(self):
self.reached_limit = False
self.count = 0
self.seen.clear()

def is_reached(self, uid=None):
if self.reached_limit:
return True

if uid:
if uid in self.seen:
return False
self.count += 1
self.seen.add(uid)
else:
self.count += 1

if self.count > self.limit:
if self.warning:
self.warning("Exceeded limit of {} {}, ignoring next ones".format(self.limit, self.name))
self.reached_limit = True
return True
return False

def get_status(self):
return (self.count, self.limit, self.reached_limit)
43 changes: 43 additions & 0 deletions datadog_checks_base/tests/test_agent_check.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# (C) Datadog, Inc. 2018
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
import unittest

from datadog_checks.checks import AgentCheck
from datadog_checks.stubs import aggregator


def test_instance():
Expand Down Expand Up @@ -45,3 +48,43 @@ def test_unicode_string(self):

assert normalized_tags is not tags
assert normalized_tag == tag.encode('utf-8')


class LimitedCheck(AgentCheck):
DEFAULT_METRIC_LIMIT = 10


class TestLimits(unittest.TestCase):
def tearDown(self):
aggregator.reset()

def test_metric_limit_gauges(self):
check = LimitedCheck()
assert check.get_warnings() == []

for i in range(0, 10):
check.gauge("metric", 0)
assert len(check.get_warnings()) == 0
assert len(aggregator.metrics("metric")) == 10

for i in range(0, 10):
check.gauge("metric", 0)
assert len(check.get_warnings()) == 1
assert len(aggregator.metrics("metric")) == 10

def test_metric_limit_count(self):
check = LimitedCheck()
assert check.get_warnings() == []

# Multiple calls for a single context should not trigger
for i in range(0, 20):
check.count("metric", 0, hostname="host-single")
assert len(check.get_warnings()) == 0
assert len(aggregator.metrics("metric")) == 20

# Multiple contexts should trigger
# Only 9 new contexts should pass through
for i in range(0, 20):
check.count("metric", 0, hostname="host-{}".format(i))
assert len(check.get_warnings()) == 1
assert len(aggregator.metrics("metric")) == 29
55 changes: 55 additions & 0 deletions datadog_checks_base/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# (C) Datadog, Inc. 2018
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

from datadog_checks.utils.common import pattern_filter
from datadog_checks.utils.limiter import Limiter


class Item:
Expand Down Expand Up @@ -50,3 +52,56 @@ def test_key_function(self):
assert pattern_filter(items, whitelist=whitelist, key=lambda item: item.name) == [
Item('abc'), Item('def'), Item('abcdef')
]


class TestLimiter():
def test_no_uid(self):
warnings = []
limiter = Limiter("names", 10, warning_func=warnings.append)
for i in range(0, 10):
assert limiter.is_reached() is False
assert limiter.get_status() == (10, 10, False)

# Reach limit
assert limiter.is_reached() is True
assert limiter.get_status() == (11, 10, True)
assert warnings == ["Exceeded limit of 10 names, ignoring next ones"]

# Make sure warning is only sent once
assert limiter.is_reached() is True
assert len(warnings) == 1

def test_with_uid(self):
warnings = []
limiter = Limiter("names", 10, warning_func=warnings.append)
for i in range(0, 20):
assert limiter.is_reached("dummy1") is False
assert limiter.get_status() == (1, 10, False)

for i in range(0, 20):
assert limiter.is_reached("dummy2") is False
assert limiter.get_status() == (2, 10, False)
assert len(warnings) == 0

def test_mixed(self):
limiter = Limiter("names", 10)

for i in range(0, 20):
assert limiter.is_reached("dummy1") is False
assert limiter.get_status() == (1, 10, False)

for i in range(0, 5):
assert limiter.is_reached() is False
assert limiter.get_status() == (6, 10, False)

def test_reset(self):
limiter = Limiter("names", 10)

for i in range(1, 20):
limiter.is_reached("dummy1")
assert limiter.get_status() == (1, 10, False)

limiter.reset()
assert limiter.get_status() == (0, 10, False)
assert limiter.is_reached("dummy1") is False
assert limiter.get_status() == (1, 10, False)
2 changes: 2 additions & 0 deletions kubelet/datadog_checks/kubelet/kubelet.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class KubeletCheck(AgentCheck, CadvisorScraper):
Collect metrics from Kubelet.
"""

DEFAULT_METRIC_LIMIT = 6000

def __init__(self, name, init_config, agentConfig, instances=None):
super(KubeletCheck, self).__init__(name, init_config, agentConfig, instances)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class KubernetesState(PrometheusCheck):
Collect kube-state-metrics metrics in the Prometheus format
See https://github.com/kubernetes/kube-state-metrics
"""
DEFAULT_METRIC_LIMIT = 20000

def __init__(self, name, init_config, agentConfig, instances=None):
super(KubernetesState, self).__init__(name, init_config, agentConfig, instances)
self.NAMESPACE = 'kubernetes_state'
Expand Down

0 comments on commit 475cf65

Please sign in to comment.