
Commit

fix KSM job metrics (#4224)

* fix job metrics and added some extra tests

* fix flake8 test

* fix flake8 test on test

* fix black test

* fix flake8 test again

* changed extract timestamp + added test

* changed extract timestamp to add log

* fixed issue when timestamps are not in order. Added tests for the logic.

* changed logic to have one array that gets reset instead of one array growing out of bounds

* removed the array as it was unnecessary and worked with timestamps over the current run and over all the runs

* removed the array as it was unnecessary and worked with timestamps over the current run and over all the runs

* fixed flake8

* fixed flake8

* fixed brake

* fix black one last time

* removed two unused dicts

* changed var names

* modified following the review comments and added a few methods to the JobCount class
Simwar authored and therve committed Aug 19, 2019
1 parent 33fecca commit 9ef31f7
Showing 2 changed files with 109 additions and 16 deletions.
@@ -35,6 +35,22 @@ class KubernetesState(OpenMetricsBaseCheck):
    See https://github.com/kubernetes/kube-state-metrics
    """

    class JobCount:
        def __init__(self):
            self.count = 0
            self.previous_run_max_ts = 0
            self.current_run_max_ts = 0

        def set_previous_and_reset_current_ts(self):
            if self.current_run_max_ts > 0:
                self.previous_run_max_ts = self.current_run_max_ts
                self.current_run_max_ts = 0

        def update_current_ts_and_add_count(self, job_ts, count):
            if job_ts != 0 and job_ts > self.previous_run_max_ts:
                self.count += count
                self.current_run_max_ts = max(self.current_run_max_ts, job_ts)

    DEFAULT_METRIC_LIMIT = 0

    def __init__(self, name, init_config, agentConfig, instances=None):
@@ -80,21 +96,23 @@ def __init__(self, name, init_config, agentConfig, instances=None):
            'kube_service_spec_type': self.count_objects_by_tags,
        }

        # Handling jobs succeeded/failed counts
        self.failed_job_counts = defaultdict(KubernetesState.JobCount)
        self.succeeded_job_counts = defaultdict(KubernetesState.JobCount)

    def check(self, instance):
        endpoint = instance.get('kube_state_url')

        # Job counters are monotonic: they increase at every run of the job
        # We want to send the delta via the `monotonic_count` method
        self.job_succeeded_count = defaultdict(int)
        self.job_failed_count = defaultdict(int)

        scraper_config = self.config_map[endpoint]
        self.process(scraper_config, metric_transformers=self.METRIC_TRANSFORMERS)

        for job_tags, job_count in iteritems(self.job_succeeded_count):
            self.monotonic_count(scraper_config['namespace'] + '.job.succeeded', job_count, list(job_tags))
        for job_tags, job_count in iteritems(self.job_failed_count):
            self.monotonic_count(scraper_config['namespace'] + '.job.failed', job_count, list(job_tags))
        for job_tags, job in iteritems(self.failed_job_counts):
            self.monotonic_count(scraper_config['namespace'] + '.job.failed', job.count, list(job_tags))
            job.set_previous_and_reset_current_ts()

        for job_tags, job in iteritems(self.succeeded_job_counts):
            self.monotonic_count(scraper_config['namespace'] + '.job.succeeded', job.count, list(job_tags))
            job.set_previous_and_reset_current_ts()

    def _filter_metric(self, metric, scraper_config):
        if scraper_config['telemetry']:
@@ -398,6 +416,18 @@ def _trim_job_tag(self, name):
        pattern = r"(-\d{4,10}$)"
        return re.sub(pattern, '', name)

    def _extract_job_timestamp(self, name):
        """
        Extract timestamp of job names
        """
        ts = name.split('-')[-1]
        if ts.isdigit():
            return int(ts)
        else:
            msg = 'Cannot extract ts from job name {}'
            self.log.debug(msg, name)
            return 0

    # Labels attached: namespace, pod
    # As a message the phase=Pending|Running|Succeeded|Failed|Unknown
    # From the phase the check will update its status
@@ -510,25 +540,31 @@ def kube_job_failed(self, metric, scraper_config):

    def kube_job_status_failed(self, metric, scraper_config):
        for sample in metric.samples:
            job_ts = 0
            tags = [] + scraper_config['custom_tags']
            for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]):
                if label_name == 'job' or label_name == 'job_name':
                    trimmed_job = self._trim_job_tag(label_value)
                    job_ts = self._extract_job_timestamp(label_value)
                    tags.append(self._format_tag(label_name, trimmed_job, scraper_config))
                else:
                    tags.append(self._format_tag(label_name, label_value, scraper_config))
            self.job_failed_count[frozenset(tags)] += sample[self.SAMPLE_VALUE]
            self.failed_job_counts[frozenset(tags)].update_current_ts_and_add_count(job_ts, sample[self.SAMPLE_VALUE])

    def kube_job_status_succeeded(self, metric, scraper_config):
        for sample in metric.samples:
            job_ts = 0
            tags = [] + scraper_config['custom_tags']
            for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]):
                if label_name == 'job' or label_name == 'job_name':
                    trimmed_job = self._trim_job_tag(label_value)
                    job_ts = self._extract_job_timestamp(label_value)
                    tags.append(self._format_tag(label_name, trimmed_job, scraper_config))
                else:
                    tags.append(self._format_tag(label_name, label_value, scraper_config))
            self.job_succeeded_count[frozenset(tags)] += sample[self.SAMPLE_VALUE]
            self.succeeded_job_counts[frozenset(tags)].update_current_ts_and_add_count(
                job_ts, sample[self.SAMPLE_VALUE]
            )

    def kube_node_status_condition(self, metric, scraper_config):
        """ The ready status of a cluster node. v1.0+"""
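
To make the de-duplication logic above easier to follow, here is a minimal standalone sketch of the JobCount bookkeeping. The class body is copied from the diff; the three simulated check runs and their timestamps are illustrative only. A sample's value is added to the running count only when its job-name timestamp is newer than the maximum timestamp seen in the previous run, so re-scraping the same kube_job_status_* series does not inflate the count.

# Standalone sketch: JobCount as added in the diff, driven by three
# simulated check runs (timestamps below are illustrative).
class JobCount:
    def __init__(self):
        self.count = 0
        self.previous_run_max_ts = 0
        self.current_run_max_ts = 0

    def set_previous_and_reset_current_ts(self):
        if self.current_run_max_ts > 0:
            self.previous_run_max_ts = self.current_run_max_ts
            self.current_run_max_ts = 0

    def update_current_ts_and_add_count(self, job_ts, count):
        if job_ts != 0 and job_ts > self.previous_run_max_ts:
            self.count += count
            self.current_run_max_ts = max(self.current_run_max_ts, job_ts)


job = JobCount()

# Run 1: the scrape reports one succeeded job, e.g. hello-1509998340.
job.update_current_ts_and_add_count(1509998340, 1)
assert job.count == 1
job.set_previous_and_reset_current_ts()

# Run 2: the same series is scraped again; its timestamp is not newer than
# the previous run's maximum, so the count does not grow.
job.update_current_ts_and_add_count(1509998340, 1)
assert job.count == 1
job.set_previous_and_reset_current_ts()

# Run 3: a newer job instance (e.g. hello-1509998500) completes and is counted.
job.update_current_ts_and_add_count(1509998500, 1)
assert job.count == 2

Since check() submits the cumulative job.count through monotonic_count, the agent only reports the increase between successive runs.
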
67 changes: 62 additions & 5 deletions kubernetes_state/tests/test_kubernetes_state.py
@@ -208,6 +208,11 @@ def close(self):
        pass


def mock_from_file(fname):
    with open(os.path.join(HERE, 'fixtures', fname), 'rb') as f:
        return f.read()


@pytest.fixture
def instance():
    return {'host': 'foo', 'kube_state_url': 'http://foo', 'tags': ['optional:tag1'], 'telemetry': False}
@@ -216,9 +221,7 @@ def instance():
@pytest.fixture
def check(instance):
    check = KubernetesState(CHECK_NAME, {}, {}, [instance])
    with open(os.path.join(HERE, 'fixtures', 'prometheus.txt'), 'rb') as f:
        check.poll = mock.MagicMock(return_value=MockResponse(f.read(), 'text/plain'))

    check.poll = mock.MagicMock(return_value=MockResponse(mock_from_file("prometheus.txt"), 'text/plain'))
    return check


@@ -381,12 +384,66 @@ def test_pod_phase_gauges(aggregator, instance, check):
    )


def test_extract_timestamp(check):
    job_name = "hello2-1509998340"
    job_name2 = "hello-2-1509998340"
    job_name3 = "hello2"
    result = check._extract_job_timestamp(job_name)
    assert result == 1509998340
    result = check._extract_job_timestamp(job_name2)
    assert result == 1509998340
    result = check._extract_job_timestamp(job_name3)
    assert result == 0


def test_job_counts(aggregator, instance):
    check = KubernetesState(CHECK_NAME, {}, {}, [instance])
    payload = mock_from_file("prometheus.txt")
    check.poll = mock.MagicMock(return_value=MockResponse(payload, 'text/plain'))

    for _ in range(2):
        check.check(instance)
    aggregator.assert_metric(
        NAMESPACE + '.job.failed', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=0
    )
    aggregator.assert_metric(
        NAMESPACE + '.job.succeeded', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=3
    )

    # Re-run check to make sure we don't count the same jobs
    check.check(instance)
    aggregator.assert_metric(
        NAMESPACE + '.job.failed', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=0
    )
    aggregator.assert_metric(
        NAMESPACE + '.job.succeeded', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=3
    )

    # Edit the payload and rerun the check
    payload = payload.replace(
        b'kube_job_status_succeeded{job="hello-1509998340",namespace="default"} 1',
        b'kube_job_status_succeeded{job="hello-1509998500",namespace="default"} 1',
    )
    payload = payload.replace(
        b'kube_job_status_failed{job="hello-1509998340",namespace="default"} 0',
        b'kube_job_status_failed{job="hello-1509998510",namespace="default"} 1',
    )

    check.poll = mock.MagicMock(return_value=MockResponse(payload, 'text/plain'))
    check.check(instance)
    aggregator.assert_metric(
        NAMESPACE + '.job.failed', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=1
    )
    aggregator.assert_metric(
        NAMESPACE + '.job.succeeded', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=4
    )


def test_telemetry(aggregator, instance):
    instance['telemetry'] = True

    check = KubernetesState(CHECK_NAME, {}, {}, [instance])
    with open(os.path.join(HERE, 'fixtures', 'prometheus.txt'), 'rb') as f:
        check.poll = mock.MagicMock(return_value=MockResponse(f.read(), 'text/plain'))
    check.poll = mock.MagicMock(return_value=MockResponse(mock_from_file("prometheus.txt"), 'text/plain'))

    endpoint = instance['kube_state_url']
    scraper_config = check.config_map[endpoint]
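
The job names exercised by these tests pass through two small helpers in the check: _trim_job_tag strips the numeric suffix so every run of a job shares one job tag, and _extract_job_timestamp turns that suffix into the ordering timestamp used by JobCount. A standalone sketch mirroring those helpers (local copies for illustration, not the check's actual methods):

import re


# Local mirrors of the check's _trim_job_tag / _extract_job_timestamp (sketch only).
def trim_job_tag(name):
    # Drop a trailing 4-10 digit suffix so all runs of a job share one tag value.
    return re.sub(r"(-\d{4,10}$)", '', name)


def extract_job_timestamp(name):
    # Use the numeric suffix as a timestamp; 0 means it could not be extracted.
    ts = name.split('-')[-1]
    return int(ts) if ts.isdigit() else 0


assert trim_job_tag("hello-1509998340") == "hello"
assert trim_job_tag("hello-2-1509998340") == "hello-2"
assert extract_job_timestamp("hello2-1509998340") == 1509998340
assert extract_job_timestamp("hello2") == 0

This is why the test can swap hello-1509998340 for hello-1509998500 in the payload and expect exactly one additional completion to be counted under the same job:hello tag.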
