From e66cd285a800cc1e9df969876fc6a0042548593c Mon Sep 17 00:00:00 2001 From: Rishabh Moudgil Date: Thu, 9 Aug 2018 14:50:51 -0400 Subject: [PATCH] Use OpenMetricsBaseCheck --- .../kubernetes_state/kubernetes_state.py | 520 ++++++++++-------- kubernetes_state/setup.py | 10 - .../tests/test_kubernetes_state.py | 17 +- 3 files changed, 292 insertions(+), 255 deletions(-) diff --git a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py index 94727d25d5e0a..c42fca713ed6e 100644 --- a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py +++ b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py @@ -7,7 +7,8 @@ from collections import defaultdict, Counter from datadog_checks.errors import CheckException -from datadog_checks.checks.prometheus import PrometheusCheck +from datadog_checks.checks.openmetrics import OpenMetricsBaseCheck +from datadog_checks.config import is_affirmative METRIC_TYPES = ['counter', 'gauge'] @@ -17,14 +18,30 @@ WHITELISTED_TERMINATED_REASONS = ['oomkilled', 'containercannotrun', 'error'] -class KubernetesState(PrometheusCheck): +class KubernetesState(OpenMetricsBaseCheck): """ Collect kube-state-metrics metrics in the Prometheus format See https://github.com/kubernetes/kube-state-metrics """ + + METRIC_TRANSFORMERS = {} + def __init__(self, name, init_config, agentConfig, instances=None): - super(KubernetesState, self).__init__(name, init_config, agentConfig, instances) - self.NAMESPACE = 'kubernetes_state' + generic_instances = [] + if instances is not None: + # We do not support more than one instance of kube-state-metrics + extra_labels = instances[0].get('label_joins', {}) + hostname_override = is_affirmative(instances[0].get('hostname_override', True)) + + for instance in instances: + kubernetes_state_instance = self._create_kubernetes_state_prometheus_instance( + instance, + extra_labels, + hostname_override + ) + generic_instances.append(kubernetes_state_instance) + + super(KubernetesState, self).__init__(name, init_config, agentConfig, generic_instances) self.pod_phase_to_status = { 'Pending': self.WARNING, @@ -46,165 +63,177 @@ def __init__(self, name, init_config, agentConfig, instances=None): 'unknown': self.UNKNOWN } - self.metrics_mapper = { - 'kube_daemonset_status_current_number_scheduled': 'daemonset.scheduled', - 'kube_daemonset_status_desired_number_scheduled': 'daemonset.desired', - 'kube_daemonset_status_number_misscheduled': 'daemonset.misscheduled', - 'kube_daemonset_status_number_ready': 'daemonset.ready', - 'kube_deployment_spec_paused': 'deployment.paused', - 'kube_deployment_spec_replicas': 'deployment.replicas_desired', - 'kube_deployment_spec_strategy_rollingupdate_max_unavailable': 'deployment.rollingupdate.max_unavailable', - 'kube_deployment_status_replicas': 'deployment.replicas', - 'kube_deployment_status_replicas_available': 'deployment.replicas_available', - 'kube_deployment_status_replicas_unavailable': 'deployment.replicas_unavailable', - 'kube_deployment_status_replicas_updated': 'deployment.replicas_updated', - 'kube_hpa_spec_min_replicas': 'hpa.min_replicas', - 'kube_hpa_spec_max_replicas': 'hpa.max_replicas', - 'kube_hpa_status_desired_replicas': 'hpa.desired_replicas', - 'kube_hpa_status_current_replicas': 'hpa.current_replicas', - 'kube_node_status_allocatable_cpu_cores': 'node.cpu_allocatable', - 'kube_node_status_allocatable_memory_bytes': 'node.memory_allocatable', - 'kube_node_status_allocatable_pods': 'node.pods_allocatable', - 'kube_node_status_capacity_cpu_cores': 'node.cpu_capacity', - 'kube_node_status_capacity_memory_bytes': 'node.memory_capacity', - 'kube_node_status_capacity_pods': 'node.pods_capacity', - 'kube_node_status_allocatable_nvidia_gpu_cards': 'node.gpu.cards_allocatable', - 'kube_node_status_capacity_nvidia_gpu_cards': 'node.gpu.cards_capacity', - 'kube_pod_container_status_terminated': 'container.terminated', - 'kube_pod_container_status_waiting': 'container.waiting', - 'kube_persistentvolumeclaim_status_phase': 'persistentvolumeclaim.status', - 'kube_pod_container_resource_limits_cpu_cores': 'container.cpu_limit', - 'kube_pod_container_resource_limits_memory_bytes': 'container.memory_limit', - 'kube_pod_container_resource_requests_cpu_cores': 'container.cpu_requested', - 'kube_pod_container_resource_requests_memory_bytes': 'container.memory_requested', - 'kube_pod_container_status_ready': 'container.ready', - 'kube_pod_container_status_restarts': 'container.restarts', # up to kube-state-metrics 1.1.x - 'kube_pod_container_status_restarts_total': 'container.restarts', # from kube-state-metrics 1.2.0 - 'kube_pod_container_status_running': 'container.running', - 'kube_pod_container_resource_requests_nvidia_gpu_devices': 'container.gpu.request', - 'kube_pod_container_resource_limits_nvidia_gpu_devices': 'container.gpu.limit', - 'kube_pod_status_ready': 'pod.ready', - 'kube_pod_status_scheduled': 'pod.scheduled', - 'kube_replicaset_spec_replicas': 'replicaset.replicas_desired', - 'kube_replicaset_status_fully_labeled_replicas': 'replicaset.fully_labeled_replicas', - 'kube_replicaset_status_ready_replicas': 'replicaset.replicas_ready', - 'kube_replicaset_status_replicas': 'replicaset.replicas', - 'kube_replicationcontroller_spec_replicas': 'replicationcontroller.replicas_desired', - 'kube_replicationcontroller_status_available_replicas': 'replicationcontroller.replicas_available', - 'kube_replicationcontroller_status_fully_labeled_replicas': 'replicationcontroller.fully_labeled_replicas', - 'kube_replicationcontroller_status_ready_replicas': 'replicationcontroller.replicas_ready', - 'kube_replicationcontroller_status_replicas': 'replicationcontroller.replicas', - 'kube_statefulset_replicas': 'statefulset.replicas_desired', - 'kube_statefulset_status_replicas': 'statefulset.replicas', - 'kube_statefulset_status_replicas_current': 'statefulset.replicas_current', - 'kube_statefulset_status_replicas_ready': 'statefulset.replicas_ready', - 'kube_statefulset_status_replicas_updated': 'statefulset.replicas_updated', - } - - self.ignore_metrics = [ - # _info, _labels and _created don't convey any metric - 'kube_cronjob_info', - 'kube_cronjob_created', - 'kube_daemonset_created', - 'kube_deployment_created', - 'kube_deployment_labels', - 'kube_job_created', - 'kube_job_info', - 'kube_limitrange_created', - 'kube_namespace_created', - 'kube_namespace_labels', - 'kube_node_created', - 'kube_node_info', - 'kube_node_labels', - 'kube_pod_created' - 'kube_pod_container_info', - 'kube_pod_info', - 'kube_pod_owner', - 'kube_pod_start_time', - 'kube_pod_labels', - 'kube_replicaset_created', - 'kube_replicationcontroller_created', - 'kube_resourcequota_created', - 'kube_service_created', - 'kube_service_info', - 'kube_service_labels', - 'kube_statefulset_labels', - 'kube_statefulset_created', - # _generation metrics are more metadata than metrics, no real use case for now - 'kube_daemonset_metadata_generation', - 'kube_deployment_metadata_generation', - 'kube_deployment_status_observed_generation', - 'kube_replicaset_metadata_generation', - 'kube_replicaset_status_observed_generation', - 'kube_replicationcontroller_metadata_generation', - 'kube_replicationcontroller_status_observed_generation', - 'kube_statefulset_metadata_generation', - 'kube_statefulset_status_observed_generation', - 'kube_hpa_metadata_generation', - # kube_node_status_phase and kube_namespace_status_phase have no use case as a service check - 'kube_namespace_status_phase', - 'kube_node_status_phase', - # These CronJob and Job metrics need use cases to determine how do implement - 'kube_cronjob_status_active', - 'kube_cronjob_status_last_schedule_time', - 'kube_cronjob_spec_suspend', - 'kube_cronjob_spec_starting_deadline_seconds', - 'kube_job_spec_active_dealine_seconds', - 'kube_job_spec_completions', - 'kube_job_spec_parallelism', - 'kube_job_status_active', - 'kube_job_status_completion_time', # We could compute the duration=completion-start as a gauge - 'kube_job_status_start_time', - ] - - self.label_joins = { - 'kube_pod_info': { - 'label_to_match': 'pod', - 'labels_to_get': ['node'] - } + self.METRIC_TRANSFORMERS = { + 'kube_pod_status_phase': self.kube_pod_status_phase, + 'kube_pod_container_status_waiting_reason': self.kube_pod_container_status_waiting_reason, + 'kube_pod_container_status_terminated_reason': self.kube_pod_container_status_terminated_reason, + 'kube_cronjob_next_schedule_time': self.kube_cronjob_next_schedule_time, + 'kube_job_complete': self.kube_job_complete, + 'kube_job_failed': self.kube_job_failed, + 'kube_job_status_failed': self.kube_job_status_failed, + 'kube_job_status_succeeded': self.kube_job_status_succeeded, + 'kube_node_status_condition': self.kube_node_status_condition, + 'kube_node_status_ready': self.kube_node_status_ready, + 'kube_node_status_out_of_disk': self.kube_node_status_out_of_disk, + 'kube_node_status_memory_pressure': self.kube_node_status_memory_pressure, + 'kube_node_status_disk_pressure': self.kube_node_status_disk_pressure, + 'kube_node_status_network_unavailable': self.kube_node_status_network_unavailable, + 'kube_node_spec_unschedulable': self.kube_node_spec_unschedulable, + 'kube_resourcequota': self.kube_resourcequota, + 'kube_limitrange': self.kube_limitrange } - # We do not support more than one instance of kube-state-metrics - extra_labels = instances[0].get("label_joins", {}) - self.label_joins.update(extra_labels) - hostname_override = instances[0].get('hostname_override', True) - if hostname_override: - self.label_to_hostname = 'node' - def check(self, instance): endpoint = instance.get('kube_state_url') - if endpoint is None: - raise CheckException("Unable to find kube_state_url in config file.") - - self.set_prometheus_timeout(instance) - if 'labels_mapper' in instance: - if isinstance(instance['labels_mapper'], dict): - self.labels_mapper = instance['labels_mapper'] - else: - self.log.warning("labels_mapper should be a dictionnary") - - send_buckets = instance.get('send_histograms_buckets', True) - # By default we send the buckets. - if send_buckets is not None and str(send_buckets).lower() == 'false': - send_buckets = False - else: - send_buckets = True - - self.custom_tags = instance.get('tags', []) - if self.custom_tags is None: - self.custom_tags = [] # Job counters are monotonic: they increase at every run of the job # We want to send the delta via the `monotonic_count` method self.job_succeeded_count = defaultdict(int) self.job_failed_count = defaultdict(int) - self.process(endpoint, send_histograms_buckets=send_buckets, instance=instance) + scraper_config = self.config_map[endpoint] + self.process(scraper_config, metric_transformers=self.METRIC_TRANSFORMERS) for job_tags, job_count in self.job_succeeded_count.iteritems(): - self.monotonic_count(self.NAMESPACE + '.job.succeeded', job_count, list(job_tags)) + self.monotonic_count(scraper_config['NAMESPACE'] + '.job.succeeded', job_count, list(job_tags)) for job_tags, job_count in self.job_failed_count.iteritems(): - self.monotonic_count(self.NAMESPACE + '.job.failed', job_count, list(job_tags)) + self.monotonic_count(scraper_config['NAMESPACE'] + '.job.failed', job_count, list(job_tags)) + + def _create_kubernetes_state_prometheus_instance(self, instance, extra_labels, hostname_override): + """ + Set up the kubernetes_state instance so it can be used in OpenMetricsBaseCheck + """ + endpoint = instance.get('kube_state_url') + if endpoint is None: + raise CheckException("Unable to find kube_state_url in config file.") + + instance.update({ + 'namespace': 'kubernetes_state', + 'metrics': [{ + 'kube_daemonset_status_current_number_scheduled': 'daemonset.scheduled', + 'kube_daemonset_status_desired_number_scheduled': 'daemonset.desired', + 'kube_daemonset_status_number_misscheduled': 'daemonset.misscheduled', + 'kube_daemonset_status_number_ready': 'daemonset.ready', + 'kube_deployment_spec_paused': 'deployment.paused', + 'kube_deployment_spec_replicas': 'deployment.replicas_desired', + 'kube_deployment_spec_strategy_rollingupdate_max_unavailable': 'deployment.rollingupdate.max_unavailable', # noqa: E501 + 'kube_deployment_status_replicas': 'deployment.replicas', + 'kube_deployment_status_replicas_available': 'deployment.replicas_available', + 'kube_deployment_status_replicas_unavailable': 'deployment.replicas_unavailable', + 'kube_deployment_status_replicas_updated': 'deployment.replicas_updated', + 'kube_hpa_spec_min_replicas': 'hpa.min_replicas', + 'kube_hpa_spec_max_replicas': 'hpa.max_replicas', + 'kube_hpa_status_desired_replicas': 'hpa.desired_replicas', + 'kube_hpa_status_current_replicas': 'hpa.current_replicas', + 'kube_node_status_allocatable_cpu_cores': 'node.cpu_allocatable', + 'kube_node_status_allocatable_memory_bytes': 'node.memory_allocatable', + 'kube_node_status_allocatable_pods': 'node.pods_allocatable', + 'kube_node_status_capacity_cpu_cores': 'node.cpu_capacity', + 'kube_node_status_capacity_memory_bytes': 'node.memory_capacity', + 'kube_node_status_capacity_pods': 'node.pods_capacity', + 'kube_node_status_allocatable_nvidia_gpu_cards': 'node.gpu.cards_allocatable', + 'kube_node_status_capacity_nvidia_gpu_cards': 'node.gpu.cards_capacity', + 'kube_pod_container_status_terminated': 'container.terminated', + 'kube_pod_container_status_waiting': 'container.waiting', + 'kube_persistentvolumeclaim_status_phase': 'persistentvolumeclaim.status', + 'kube_pod_container_resource_limits_cpu_cores': 'container.cpu_limit', + 'kube_pod_container_resource_limits_memory_bytes': 'container.memory_limit', + 'kube_pod_container_resource_requests_cpu_cores': 'container.cpu_requested', + 'kube_pod_container_resource_requests_memory_bytes': 'container.memory_requested', + 'kube_pod_container_status_ready': 'container.ready', + 'kube_pod_container_status_restarts': 'container.restarts', # up to kube-state-metrics 1.1.x + 'kube_pod_container_status_restarts_total': 'container.restarts', # from kube-state-metrics 1.2.0 + 'kube_pod_container_status_running': 'container.running', + 'kube_pod_container_resource_requests_nvidia_gpu_devices': 'container.gpu.request', + 'kube_pod_container_resource_limits_nvidia_gpu_devices': 'container.gpu.limit', + 'kube_pod_status_ready': 'pod.ready', + 'kube_pod_status_scheduled': 'pod.scheduled', + 'kube_replicaset_spec_replicas': 'replicaset.replicas_desired', + 'kube_replicaset_status_fully_labeled_replicas': 'replicaset.fully_labeled_replicas', + 'kube_replicaset_status_ready_replicas': 'replicaset.replicas_ready', + 'kube_replicaset_status_replicas': 'replicaset.replicas', + 'kube_replicationcontroller_spec_replicas': 'replicationcontroller.replicas_desired', + 'kube_replicationcontroller_status_available_replicas': 'replicationcontroller.replicas_available', + 'kube_replicationcontroller_status_fully_labeled_replicas': 'replicationcontroller.fully_labeled_replicas', # noqa: E501 + 'kube_replicationcontroller_status_ready_replicas': 'replicationcontroller.replicas_ready', + 'kube_replicationcontroller_status_replicas': 'replicationcontroller.replicas', + 'kube_statefulset_replicas': 'statefulset.replicas_desired', + 'kube_statefulset_status_replicas': 'statefulset.replicas', + 'kube_statefulset_status_replicas_current': 'statefulset.replicas_current', + 'kube_statefulset_status_replicas_ready': 'statefulset.replicas_ready', + 'kube_statefulset_status_replicas_updated': 'statefulset.replicas_updated', + }], + 'ignore_metrics': [ + # _info, _labels and _created don't convey any metric + 'kube_cronjob_info', + 'kube_cronjob_created', + 'kube_daemonset_created', + 'kube_deployment_created', + 'kube_deployment_labels', + 'kube_job_created', + 'kube_job_info', + 'kube_limitrange_created', + 'kube_namespace_created', + 'kube_namespace_labels', + 'kube_node_created', + 'kube_node_info', + 'kube_node_labels', + 'kube_pod_created' + 'kube_pod_container_info', + 'kube_pod_info', + 'kube_pod_owner', + 'kube_pod_start_time', + 'kube_pod_labels', + 'kube_replicaset_created', + 'kube_replicationcontroller_created', + 'kube_resourcequota_created', + 'kube_service_created', + 'kube_service_info', + 'kube_service_labels', + 'kube_statefulset_labels', + 'kube_statefulset_created', + # _generation metrics are more metadata than metrics, no real use case for now + 'kube_daemonset_metadata_generation', + 'kube_deployment_metadata_generation', + 'kube_deployment_status_observed_generation', + 'kube_replicaset_metadata_generation', + 'kube_replicaset_status_observed_generation', + 'kube_replicationcontroller_metadata_generation', + 'kube_replicationcontroller_status_observed_generation', + 'kube_statefulset_metadata_generation', + 'kube_statefulset_status_observed_generation', + 'kube_hpa_metadata_generation', + # kube_node_status_phase and kube_namespace_status_phase have no use case as a service check + 'kube_namespace_status_phase', + 'kube_node_status_phase', + # These CronJob and Job metrics need use cases to determine how do implement + 'kube_cronjob_status_active', + 'kube_cronjob_status_last_schedule_time', + 'kube_cronjob_spec_suspend', + 'kube_cronjob_spec_starting_deadline_seconds', + 'kube_job_spec_active_dealine_seconds', + 'kube_job_spec_completions', + 'kube_job_spec_parallelism', + 'kube_job_status_active', + 'kube_job_status_completion_time', # We could compute the duration=completion-start as a gauge + 'kube_job_status_start_time', + ], + 'label_joins': { + 'kube_pod_info': { + 'label_to_match': 'pod', + 'labels_to_get': ['node'] + } + } + }) + + instance['prometheus_url'] = endpoint + instance['label_joins'].update(extra_labels) + instance['label_to_hostname'] = 'node' if hostname_override else None + + if 'labels_mapper' in instance and not isinstance(instance['labels_mapper'], dict): + self.log.warning("Option labels_mapper should be a dictionary for {}".format(endpoint)) + + return instance def _condition_to_service_check(self, metric, sc_name, mapping, tags=None): """ @@ -231,7 +260,7 @@ def _condition_to_service_check(self, metric, sc_name, mapping, tags=None): else: self.log.debug("Unable to handle %s - unknown condition %s" % (sc_name, label.value)) - def _condition_to_tag_check(self, metric, base_sc_name, mapping, tags=None): + def _condition_to_tag_check(self, metric, base_sc_name, mapping, scraper_config, tags=None): """ Metrics from kube-state-metrics have changed For example: @@ -255,11 +284,13 @@ def _condition_to_tag_check(self, metric, base_sc_name, mapping, tags=None): mapping = condition_map['mapping'] if base_sc_name == 'kubernetes_state.pod.phase': - message = "{} is currently reporting {}".format(self._label_to_tag('pod', metric.label), - self._label_to_tag('phase', metric.label)) + pod = self._label_to_tag('pod', metric.label, scraper_config) + phase = self._label_to_tag('phase', metric.label, scraper_config) + message = "{} is currently reporting {}".format(pod, phase) else: - message = "{} is currently reporting {}".format(self._label_to_tag('node', metric.label), - self._label_to_tag('condition', metric.label)) + node = self._label_to_tag('node', metric.label, scraper_config) + condition = self._label_to_tag('condition', metric.label, scraper_config) + message = "{} is currently reporting {}".format(node, condition) if condition_map['service_check_name'] is None: self.log.debug("Unable to handle {} - unknown condition {}".format(service_check_name, label_value)) @@ -310,14 +341,14 @@ def _extract_label_value(self, name, labels): return label.value return None - def _format_tag(self, name, value): + def _format_tag(self, name, value, scraper_config): """ Lookups the labels_mapper table to see if replacing the tag name is necessary, then returns a "name:value" tag string """ - return '%s:%s' % (self.labels_mapper.get(name, name), value) + return '%s:%s' % (scraper_config['labels_mapper'].get(name, name), value) - def _label_to_tag(self, name, labels, tag_name=None): + def _label_to_tag(self, name, labels, scraper_config, tag_name=None): """ Search for `name` in labels name and returns corresponding tag string. Tag name is label name if not specified. @@ -325,7 +356,7 @@ def _label_to_tag(self, name, labels, tag_name=None): """ value = self._extract_label_value(name, labels) if value: - return self._format_tag(tag_name or name, value) + return self._format_tag(tag_name or name, value, scraper_config) else: return None @@ -341,74 +372,76 @@ def _trim_job_tag(self, name): # From the phase the check will update its status # Also submits as an aggregated count with minimal tags so it is # visualisable over time per namespace and phase - def kube_pod_status_phase(self, message, **kwargs): + def kube_pod_status_phase(self, message, scraper_config): """ Phase a pod is in. """ - metric_name = self.NAMESPACE + '.pod.status_phase' + metric_name = scraper_config['NAMESPACE'] + '.pod.status_phase' # Will submit a service check which status is given by its phase. # More details about the phase in the message of the check. - check_basename = self.NAMESPACE + '.pod.phase' + check_basename = scraper_config['NAMESPACE'] + '.pod.phase' status_phase_counter = Counter() for metric in message.metric: - self._condition_to_tag_check(metric, check_basename, self.pod_phase_to_status, - tags=[self._label_to_tag("pod", metric.label), - self._label_to_tag("namespace", metric.label)] + self.custom_tags) + pod_tag = self._label_to_tag("pod", metric.label, scraper_config) + namespace_tag = self._label_to_tag("namespace", metric.label, scraper_config) + self._condition_to_tag_check(metric, check_basename, self.pod_phase_to_status, scraper_config, + tags=[pod_tag, namespace_tag] + scraper_config['custom_tags']) # Counts aggregated cluster-wide to avoid no-data issues on pod churn, # pod granularity available in the service checks tags = [ - self._label_to_tag("namespace", metric.label), - self._label_to_tag("phase", metric.label) - ] + self.custom_tags + self._label_to_tag("namespace", metric.label, scraper_config), + self._label_to_tag("phase", metric.label, scraper_config) + ] + scraper_config['custom_tags'] status_phase_counter[tuple(sorted(tags))] += metric.gauge.value for tags, count in status_phase_counter.iteritems(): self.gauge(metric_name, count, tags=list(tags)) - def kube_pod_container_status_waiting_reason(self, message, **kwargs): - metric_name = self.NAMESPACE + '.container.status_report.count.waiting' + def kube_pod_container_status_waiting_reason(self, message, scraper_config): + metric_name = scraper_config['NAMESPACE'] + '.container.status_report.count.waiting' for metric in message.metric: tags = [] skip_metric = False for label in metric.label: if label.name == "reason": if label.value.lower() in WHITELISTED_WAITING_REASONS: - tags.append(self._format_tag(label.name, label.value)) + tags.append(self._format_tag(label.name, label.value, scraper_config)) else: skip_metric = True elif label.name == "container": - tags.append(self._format_tag("kube_container_name", label.value)) + tags.append(self._format_tag("kube_container_name", label.value, scraper_config)) elif label.name == "namespace": - tags.append(self._format_tag(label.name, label.value)) + tags.append(self._format_tag(label.name, label.value, scraper_config)) if not skip_metric: - self.count(metric_name, metric.gauge.value, tags + self.custom_tags) + self.count(metric_name, metric.gauge.value, tags + scraper_config['custom_tags']) - def kube_pod_container_status_terminated_reason(self, message, **kwargs): - metric_name = self.NAMESPACE + '.container.status_report.count.terminated' + def kube_pod_container_status_terminated_reason(self, message, scraper_config): + metric_name = scraper_config['NAMESPACE'] + '.container.status_report.count.terminated' for metric in message.metric: tags = [] skip_metric = False for label in metric.label: if label.name == "reason": if label.value.lower() in WHITELISTED_TERMINATED_REASONS: - tags.append(self._format_tag(label.name, label.value)) + tags.append(self._format_tag(label.name, label.value, scraper_config)) else: skip_metric = True elif label.name == "container": - tags.append(self._format_tag("kube_container_name", label.value)) + tags.append(self._format_tag("kube_container_name", label.value, scraper_config)) elif label.name == "namespace": - tags.append(self._format_tag(label.name, label.value)) + tags.append(self._format_tag(label.name, label.value, scraper_config)) if not skip_metric: - self.count(metric_name, metric.gauge.value, tags + self.custom_tags) + self.count(metric_name, metric.gauge.value, tags + scraper_config['custom_tags']) - def kube_cronjob_next_schedule_time(self, message, **kwargs): + def kube_cronjob_next_schedule_time(self, message, scraper_config): """ Time until the next schedule """ # Used as a service check so that one can be alerted if the cronjob's next schedule is in the past - check_basename = self.NAMESPACE + '.cronjob.on_schedule_check' + check_basename = scraper_config['NAMESPACE'] + '.cronjob.on_schedule_check' curr_time = int(time.time()) for metric in message.metric: on_schedule = int(metric.gauge.value) - curr_time - tags = [self._format_tag(label.name, label.value) for label in metric.label] + self.custom_tags + tags = [self._format_tag(label.name, label.value, scraper_config) for label in metric.label] + tags += scraper_config['custom_tags'] if on_schedule < 0: message = "The service check scheduled at {} is {} seconds late".format( time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(int(metric.gauge.value))), on_schedule @@ -417,144 +450,151 @@ def kube_cronjob_next_schedule_time(self, message, **kwargs): else: self.service_check(check_basename, self.OK, tags=tags) - def kube_job_complete(self, message, **kwargs): - service_check_name = self.NAMESPACE + '.job.complete' + def kube_job_complete(self, message, scraper_config): + service_check_name = scraper_config['NAMESPACE'] + '.job.complete' for metric in message.metric: tags = [] for label in metric.label: if label.name == 'job': trimmed_job = self._trim_job_tag(label.value) - tags.append(self._format_tag(label.name, trimmed_job)) + tags.append(self._format_tag(label.name, trimmed_job, scraper_config)) else: - tags.append(self._format_tag(label.name, label.value)) - self.service_check(service_check_name, self.OK, tags=tags + self.custom_tags) + tags.append(self._format_tag(label.name, label.value, scraper_config)) + self.service_check(service_check_name, self.OK, tags=tags + scraper_config['custom_tags']) - def kube_job_failed(self, message, **kwargs): - service_check_name = self.NAMESPACE + '.job.complete' + def kube_job_failed(self, message, scraper_config): + service_check_name = scraper_config['NAMESPACE'] + '.job.complete' for metric in message.metric: tags = [] for label in metric.label: if label.name == 'job': trimmed_job = self._trim_job_tag(label.value) - tags.append(self._format_tag(label.name, trimmed_job)) + tags.append(self._format_tag(label.name, trimmed_job, scraper_config)) else: - tags.append(self._format_tag(label.name, label.value)) - self.service_check(service_check_name, self.CRITICAL, tags=tags + self.custom_tags) + tags.append(self._format_tag(label.name, label.value, scraper_config)) + self.service_check(service_check_name, self.CRITICAL, tags=tags + scraper_config['custom_tags']) - def kube_job_status_failed(self, message, **kwargs): + def kube_job_status_failed(self, message, scraper_config): for metric in message.metric: - tags = [] + self.custom_tags + tags = [] + scraper_config['custom_tags'] for label in metric.label: if label.name == 'job': trimmed_job = self._trim_job_tag(label.value) - tags.append(self._format_tag(label.name, trimmed_job)) + tags.append(self._format_tag(label.name, trimmed_job, scraper_config)) else: - tags.append(self._format_tag(label.name, label.value)) + tags.append(self._format_tag(label.name, label.value, scraper_config)) self.job_failed_count[frozenset(tags)] += metric.gauge.value - def kube_job_status_succeeded(self, message, **kwargs): + def kube_job_status_succeeded(self, message, scraper_config): for metric in message.metric: - tags = [] + self.custom_tags + tags = [] + scraper_config['custom_tags'] for label in metric.label: if label.name == 'job': trimmed_job = self._trim_job_tag(label.value) - tags.append(self._format_tag(label.name, trimmed_job)) + tags.append(self._format_tag(label.name, trimmed_job, scraper_config)) else: - tags.append(self._format_tag(label.name, label.value)) + tags.append(self._format_tag(label.name, label.value, scraper_config)) self.job_succeeded_count[frozenset(tags)] += metric.gauge.value - def kube_node_status_condition(self, message, **kwargs): + def kube_node_status_condition(self, message, scraper_config): """ The ready status of a cluster node. v1.0+""" - base_check_name = self.NAMESPACE + '.node' - metric_name = self.NAMESPACE + '.nodes.by_condition' + base_check_name = scraper_config['NAMESPACE'] + '.node' + metric_name = scraper_config['NAMESPACE'] + '.nodes.by_condition' by_condition_counter = Counter() for metric in message.metric: - self._condition_to_tag_check(metric, base_check_name, self.condition_to_status_positive, - tags=[self._label_to_tag("node", metric.label)] + self.custom_tags) + node_tag = self._label_to_tag("node", metric.label, scraper_config) + self._condition_to_tag_check(metric, base_check_name, self.condition_to_status_positive, scraper_config, + tags=[node_tag] + scraper_config['custom_tags']) # Counts aggregated cluster-wide to avoid no-data issues on node churn, # node granularity available in the service checks tags = [ - self._label_to_tag("condition", metric.label), - self._label_to_tag("status", metric.label) - ] + self.custom_tags + self._label_to_tag("condition", metric.label, scraper_config), + self._label_to_tag("status", metric.label, scraper_config) + ] + scraper_config['custom_tags'] by_condition_counter[tuple(sorted(tags))] += metric.gauge.value for tags, count in by_condition_counter.iteritems(): self.gauge(metric_name, count, tags=list(tags)) - def kube_node_status_ready(self, message, **kwargs): + def kube_node_status_ready(self, message, scraper_config): """ The ready status of a cluster node (legacy)""" - service_check_name = self.NAMESPACE + '.node.ready' + service_check_name = scraper_config['NAMESPACE'] + '.node.ready' for metric in message.metric: + node_tag = self._label_to_tag("node", metric.label, scraper_config) self._condition_to_service_check(metric, service_check_name, self.condition_to_status_positive, - tags=[self._label_to_tag("node", metric.label)] + self.custom_tags) + tags=[node_tag] + scraper_config['custom_tags']) - def kube_node_status_out_of_disk(self, message, **kwargs): + def kube_node_status_out_of_disk(self, message, scraper_config): """ Whether the node is out of disk space (legacy)""" - service_check_name = self.NAMESPACE + '.node.out_of_disk' + service_check_name = scraper_config['NAMESPACE'] + '.node.out_of_disk' for metric in message.metric: + node_tag = self._label_to_tag("node", metric.label, scraper_config) self._condition_to_service_check(metric, service_check_name, self.condition_to_status_negative, - tags=[self._label_to_tag("node", metric.label)] + self.custom_tags) + tags=[node_tag] + scraper_config['custom_tags']) - def kube_node_status_memory_pressure(self, message, **kwargs): + def kube_node_status_memory_pressure(self, message, scraper_config): """ Whether the node is in a memory pressure state (legacy)""" - service_check_name = self.NAMESPACE + '.node.memory_pressure' + service_check_name = scraper_config['NAMESPACE'] + '.node.memory_pressure' for metric in message.metric: + node_tag = self._label_to_tag("node", metric.label, scraper_config) self._condition_to_service_check(metric, service_check_name, self.condition_to_status_negative, - tags=[self._label_to_tag("node", metric.label)] + self.custom_tags) + tags=[node_tag] + scraper_config['custom_tags']) - def kube_node_status_disk_pressure(self, message, **kwargs): + def kube_node_status_disk_pressure(self, message, scraper_config): """ Whether the node is in a disk pressure state (legacy)""" - service_check_name = self.NAMESPACE + '.node.disk_pressure' + service_check_name = scraper_config['NAMESPACE'] + '.node.disk_pressure' for metric in message.metric: + node_tag = self._label_to_tag("node", metric.label, scraper_config) self._condition_to_service_check(metric, service_check_name, self.condition_to_status_negative, - tags=[self._label_to_tag("node", metric.label)] + self.custom_tags) + tags=[node_tag] + scraper_config['custom_tags']) - def kube_node_status_network_unavailable(self, message, **kwargs): + def kube_node_status_network_unavailable(self, message, scraper_config): """ Whether the node is in a network unavailable state (legacy)""" - service_check_name = self.NAMESPACE + '.node.network_unavailable' + service_check_name = scraper_config['NAMESPACE'] + '.node.network_unavailable' for metric in message.metric: + node_tag = self._label_to_tag("node", metric.label, scraper_config) self._condition_to_service_check(metric, service_check_name, self.condition_to_status_negative, - tags=[self._label_to_tag("node", metric.label)] + self.custom_tags) + tags=[node_tag] + scraper_config['custom_tags']) - def kube_node_spec_unschedulable(self, message, **kwargs): + def kube_node_spec_unschedulable(self, message, scraper_config): """ Whether a node can schedule new pods. """ - metric_name = self.NAMESPACE + '.node.status' + metric_name = scraper_config['NAMESPACE'] + '.node.status' statuses = ('schedulable', 'unschedulable') if message.type < len(METRIC_TYPES): for metric in message.metric: - tags = [self._format_tag(label.name, label.value) for label in metric.label] + self.custom_tags + tags = [self._format_tag(label.name, label.value, scraper_config) for label in metric.label] + tags += scraper_config['custom_tags'] status = statuses[int(getattr(metric, METRIC_TYPES[message.type]).value)] # value can be 0 or 1 - tags.append(self._format_tag('status', status)) + tags.append(self._format_tag('status', status, scraper_config)) self.gauge(metric_name, 1, tags) # metric value is always one, value is on the tags else: self.log.error("Metric type %s unsupported for metric %s" % (message.type, message.name)) - def kube_resourcequota(self, message, **kwargs): + def kube_resourcequota(self, message, scraper_config): """ Quota and current usage by resource type. """ - metric_base_name = self.NAMESPACE + '.resourcequota.{}.{}' + metric_base_name = scraper_config['NAMESPACE'] + '.resourcequota.{}.{}' suffixes = {'used': 'used', 'hard': 'limit'} if message.type < len(METRIC_TYPES): for metric in message.metric: mtype = self._extract_label_value("type", metric.label) resource = self._extract_label_value("resource", metric.label) tags = [ - self._label_to_tag("namespace", metric.label), - self._label_to_tag("resourcequota", metric.label) - ] + self.custom_tags + self._label_to_tag("namespace", metric.label, scraper_config), + self._label_to_tag("resourcequota", metric.label, scraper_config) + ] + scraper_config['custom_tags'] val = getattr(metric, METRIC_TYPES[message.type]).value self.gauge(metric_base_name.format(resource, suffixes[mtype]), val, tags) else: self.log.error("Metric type %s unsupported for metric %s" % (message.type, message.name)) - def kube_limitrange(self, message, **kwargs): + def kube_limitrange(self, message, scraper_config): """ Resource limits by consumer type. """ # type's cardinality's low: https://github.com/kubernetes/kubernetes/blob/v1.6.1/pkg/api/v1/types.go#L3872-L3879 # idem for resource: https://github.com/kubernetes/kubernetes/blob/v1.6.1/pkg/api/v1/types.go#L3342-L3352 # idem for constraint: https://github.com/kubernetes/kubernetes/blob/v1.6.1/pkg/api/v1/types.go#L3882-L3901 - metric_base_name = self.NAMESPACE + '.limitrange.{}.{}' + metric_base_name = scraper_config['NAMESPACE'] + '.limitrange.{}.{}' constraints = { 'min': 'min', 'max': 'max', @@ -565,7 +605,7 @@ def kube_limitrange(self, message, **kwargs): if message.type < len(METRIC_TYPES): for metric in message.metric: - constraint = self._extract_label_value("constraint", metric.label) + constraint = self._extract_label_value('constraint', metric.label) if constraint in constraints: constraint = constraints[constraint] else: @@ -573,10 +613,10 @@ def kube_limitrange(self, message, **kwargs): continue resource = self._extract_label_value("resource", metric.label) tags = [ - self._label_to_tag("namespace", metric.label), - self._label_to_tag("limitrange", metric.label), - self._label_to_tag("type", metric.label, tag_name="consumer_type") - ] + self.custom_tags + self._label_to_tag("namespace", metric.label, scraper_config), + self._label_to_tag("limitrange", metric.label, scraper_config), + self._label_to_tag("type", metric.label, scraper_config, tag_name="consumer_type") + ] + scraper_config['custom_tags'] val = getattr(metric, METRIC_TYPES[message.type]).value self.gauge(metric_base_name.format(resource, constraint), val, tags) else: diff --git a/kubernetes_state/setup.py b/kubernetes_state/setup.py index d49bda1f7d292..4909a67d86c74 100644 --- a/kubernetes_state/setup.py +++ b/kubernetes_state/setup.py @@ -16,13 +16,6 @@ with open(path.join(HERE, 'README.md'), encoding='utf-8') as f: long_description = f.read() - -# Parse requirements -def get_requirements(fpath): - with open(path.join(HERE, fpath), encoding='utf-8') as f: - return f.readlines() - - CHECKS_BASE_REQ = 'datadog_checks_base' setup( @@ -59,9 +52,6 @@ def get_requirements(fpath): # Run-time dependencies install_requires=[CHECKS_BASE_REQ], - # Testing setup and dependencies - tests_require=get_requirements('requirements-dev.txt'), - # Extra files to ship with the wheel package include_package_data=True, ) diff --git a/kubernetes_state/tests/test_kubernetes_state.py b/kubernetes_state/tests/test_kubernetes_state.py index 9436da3a2f184..53cbd32c98d13 100644 --- a/kubernetes_state/tests/test_kubernetes_state.py +++ b/kubernetes_state/tests/test_kubernetes_state.py @@ -168,8 +168,8 @@ def instance(): @pytest.fixture -def check(): - check = KubernetesState(CHECK_NAME, {}, {}, [instance()]) +def check(instance): + check = KubernetesState(CHECK_NAME, {}, {}, [instance]) with open(os.path.join(HERE, 'fixtures', 'prometheus.txt'), 'rb') as f: check.poll = mock.MagicMock(return_value=MockResponse(f.read(), 'text/plain')) @@ -272,8 +272,11 @@ def test_join_custom_labels(aggregator, instance, check): } } + endpoint = instance['kube_state_url'] + + scraper_config = check.config_map[endpoint] # this would be normally done in the __init__ function of the check - check.label_joins.update(instance['label_joins']) + scraper_config['label_joins'].update(instance['label_joins']) # run check twice to have pod/node mapping for _ in range(2): @@ -288,8 +291,12 @@ def test_join_custom_labels(aggregator, instance, check): def test_disabling_hostname_override(instance): + endpoint = instance['kube_state_url'] check = KubernetesState(CHECK_NAME, {}, {}, [instance]) - assert check.label_to_hostname == "node" + scraper_config = check.config_map[endpoint] + assert scraper_config['label_to_hostname'] == "node" + instance["hostname_override"] = False check = KubernetesState(CHECK_NAME, {}, {}, [instance]) - assert check.label_to_hostname is None + scraper_config = check.config_map[endpoint] + assert scraper_config['label_to_hostname'] is None