
Commit

Update metadata
FlorianVeaux committed Sep 10, 2019
1 parent 89cdbf9 commit d423695
Showing 6 changed files with 440 additions and 225 deletions.
167 changes: 167 additions & 0 deletions mapr/datadog_checks/mapr/common.py
@@ -0,0 +1,167 @@
COUNT_METRICS = {
'mapr.cache.lookups_data',
'mapr.cache.lookups_dir',
'mapr.cache.lookups_inode',
'mapr.cache.lookups_largefile',
'mapr.cache.lookups_meta',
'mapr.cache.lookups_smallfile',
'mapr.cache.lookups_table',
'mapr.cache.misses_data',
'mapr.cache.misses_dir',
'mapr.cache.misses_inode',
'mapr.cache.misses_largefile',
'mapr.cache.misses_meta',
'mapr.cache.misses_smallfile',
'mapr.cache.misses_table',
'mapr.cldb.rpc_received',
'mapr.cldb.rpcs_failed',
'mapr.db.append_bytes',
'mapr.db.append_rpcrows',
'mapr.db.append_rpcs',
'mapr.db.cdc.sent_bytes',
'mapr.db.checkandput_bytes',
'mapr.db.checkandput_rpcrows',
'mapr.db.checkandput_rpcs',
'mapr.db.flushes',
'mapr.db.forceflushes',
'mapr.db.fullcompacts',
'mapr.db.get_bytes',
'mapr.db.get_readrows',
'mapr.db.get_resprows',
'mapr.db.get_rpcs',
'mapr.db.increment_bytes',
'mapr.db.increment_rpcrows',
'mapr.db.increment_rpcs',
'mapr.db.minicompacts',
'mapr.db.put_bytes',
'mapr.db.put_readrows',
'mapr.db.put_rpcrows',
'mapr.db.put_rpcs',
'mapr.db.repl.sent_bytes',
'mapr.db.scan_bytes',
'mapr.db.scan_readrows',
'mapr.db.scan_resprows',
'mapr.db.scan_rpcs',
'mapr.db.table.read_bytes',
'mapr.db.table.read_rows',
'mapr.db.table.resp_rows',
'mapr.db.table.rpcs',
'mapr.db.table.value_cache_hits',
'mapr.db.table.value_cache_lookups',
'mapr.db.table.write_bytes',
'mapr.db.table.write_rows',
'mapr.db.ttlcompacts',
'mapr.db.updateandget_bytes',
'mapr.db.updateandget_rpcrows',
'mapr.db.updateandget_rpcs',
'mapr.db.valuecache_hits',
'mapr.db.valuecache_lookups',
'mapr.drill.queries_completed',
'mapr.fs.bulk_writes',
'mapr.fs.bulk_writesbytes',
'mapr.fs.kvstore_delete',
'mapr.fs.kvstore_insert',
'mapr.fs.kvstore_lookup',
'mapr.fs.kvstore_scan',
'mapr.fs.local_readbytes',
'mapr.fs.local_reads',
'mapr.fs.local_writebytes',
'mapr.fs.local_writes',
'mapr.fs.read_bytes',
'mapr.fs.read_cachehits',
'mapr.fs.read_cachemisses',
'mapr.fs.reads',
'mapr.fs.statstype_create',
'mapr.fs.statstype_lookup',
'mapr.fs.statstype_read',
'mapr.fs.statstype_write',
'mapr.fs.write_bytes',
'mapr.fs.writes',
'mapr.io.write_bytes',
'mapr.io.writes',
'mapr.rpc.bytes_recd',
'mapr.rpc.bytes_sent',
'mapr.rpc.calls_recd',
'mapr.streams.listen_bytes',
'mapr.streams.listen_msgs',
'mapr.streams.listen_rpcs',
'mapr.streams.produce_bytes',
'mapr.streams.produce_msgs',
'mapr.streams.produce_rpcs',
'mapr.volmetrics.read_ops',
'mapr.volmetrics.write_ops'
}

MONOTONIC_COUNTER_METRICS = {
'mapr.cldb.containers_created',
'mapr.process.context_switch_involuntary',
'mapr.process.context_switch_voluntary',
'mapr.process.cpu_time.syst',
'mapr.process.cpu_time.user',
'mapr.process.disk_octets.read',
'mapr.process.disk_octets.write',
'mapr.process.disk_ops.read',
'mapr.process.disk_ops.write',
'mapr.process.page_faults.majflt',
'mapr.process.page_faults.minflt',
}

GAUGE_METRICS = {
'mapr.alarms.alarm_raised',
'mapr.cldb.cluster_cpu_total',
'mapr.cldb.cluster_cpubusy_percent',
'mapr.cldb.cluster_disk_capacity',
'mapr.cldb.cluster_diskspace_used',
'mapr.cldb.cluster_memory_capacity',
'mapr.cldb.cluster_memory_used',
'mapr.cldb.containers',
'mapr.cldb.containers_unusable',
'mapr.cldb.disk_space_available',
'mapr.cldb.nodes_in_cluster',
'mapr.cldb.nodes_offline',
'mapr.cldb.storage_pools_cluster',
'mapr.cldb.storage_pools_offline',
'mapr.cldb.volumes',
'mapr.db.cdc.pending_bytes',
'mapr.db.get_currpcs',
'mapr.db.index.pending_bytes',
'mapr.db.put_currpcs',
'mapr.db.repl.pending_bytes',
'mapr.db.scan_currpcs',
'mapr.db.table.latency',
'mapr.db.valuecache_usedSize',
'mapr.drill.allocator_root_peak',
'mapr.drill.allocator_root_used',
'mapr.drill.blocked_count',
'mapr.drill.count',
'mapr.drill.fd_usage',
'mapr.drill.fragments_running',
'mapr.drill.heap_used',
'mapr.drill.non_heap_used',
'mapr.drill.queries_running',
'mapr.drill.runnable_count',
'mapr.drill.waiting_count',
'mapr.io.read_bytes',
'mapr.io.reads',
'mapr.process.cpu_percent',
'mapr.process.data',
'mapr.process.mem_percent',
'mapr.process.rss',
'mapr.process.vm',
'mapr.status.ok',
'mapr.streams.listen_currpcs',
'mapr.topology.disks_total_capacity',
'mapr.topology.disks_used_capacity',
'mapr.topology.utilization',
'mapr.volmetrics.read_latency',
'mapr.volmetrics.read_throughput',
'mapr.volmetrics.write_latency',
'mapr.volmetrics.write_throughput',
'mapr.volume.logical_used',
'mapr.volume.quota',
'mapr.volume.snapshot_used',
'mapr.volume.total_used',
'mapr.volume.used'
}

ALLOWED_METRICS = GAUGE_METRICS.union(COUNT_METRICS).union(MONOTONIC_COUNTER_METRICS)
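These sets drive both filtering and submission: a metric must appear in ALLOWED_METRICS to be collected at all, and the set it belongs to determines whether it is submitted as a gauge, count, or monotonic count (see submit_metric in mapr.py below). A quick illustration of the membership semantics:

# Illustration of how the sets above are consumed; membership decides the submission method.
from datadog_checks.mapr.common import ALLOWED_METRICS, GAUGE_METRICS

assert 'mapr.process.rss' in GAUGE_METRICS
assert 'mapr.fs.reads' in ALLOWED_METRICS      # union of all three sets
assert 'jvm.heap.used' not in ALLOWED_METRICS  # non-mapr metrics are never collected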
24 changes: 13 additions & 11 deletions mapr/datadog_checks/mapr/data/conf.yaml.example
@@ -1,27 +1,29 @@
init_config:

instances:
## @param hostname - string - required
## The MapR host to monitor.
-
## @param hostname - string - optional
## The MapR host to monitor. This is used to find the correct topic to read metrics from.
## https://mapr.com/docs/61/AdministratorGuide/spyglass-on-streams.html
#
- mapr_host: <MAPR_HOST>
# hostname: <MAPR_HOST>

## @param topic_path - string - required
## @param stream_path - string - optional - default: /var/mapr/mapr.monitoring/metricstreams
## The MapR topic path.
#
topic_path: /var/mapr/mapr.monitoring/metricstreams
# stream_path: /var/mapr/mapr.monitoring/metricstreams

## @param whitelist - list - required
## List regexes of metrics to collect. They will be prefixed with `mapr.`
## @param metrics - list - optional - default: ['.*']
## List regexes of metrics to collect. Note that you can only collect metrics
## starting with "mapr."

@hithwen (Contributor) commented on Sep 10, 2019:

I think this can mislead people into adding mapr.foo.* to the list. That could be OK too if, instead of adding the prefix, we check that it is present and throw a configuration error otherwise. Maybe that's easier on the users.

@FlorianVeaux (Author, Member) replied on Sep 11, 2019:

We're not adding the prefix anymore, so people have to explicitly write mapr.fs.* if they want all FS metrics. I think it's easier on the user to have a one-to-one match with what they would see in Datadog.

@hithwen (Contributor) replied on Sep 13, 2019:

That sounds good, and you're right, it's easier on the user. But we should throw a configuration error if they add patterns that do not start with mapr. Otherwise they can pick up JMX metrics.
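A minimal sketch of the guard hithwen suggests, rejecting patterns that can never match a mapr.-prefixed metric name; the helper name and wiring are hypothetical and not part of this commit:

# Hypothetical sketch of the validation discussed above; not part of this commit.
from datadog_checks.base.errors import CheckException

def validate_metric_patterns(patterns):
    # Reject whitelist entries that cannot match a 'mapr.'-prefixed metric name.
    for pattern in patterns:
        if not pattern.startswith('mapr'):
            raise CheckException(
                "Invalid metric pattern '{}': this check only collects metrics "
                "whose names start with 'mapr.'".format(pattern)
            )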

#
whitelist:
- fs.*
# metrics: ['.*']

## @param mapr_ticketfile_location - string - optional
## @param ticket_location - string - optional
## The path to the MapR user ticket. If included, it overrides the MAPR_TICKETFILE_LOCATION environment variable.
## Either the environment variable or this config option needs to be set if security is enabled on the cluster.
#
# mapr_ticketfile_location: <MAPR_TICKETFILE_LOCATION>
# ticket_location: <TICKETFILE_LOCATION>

## @param tags - list of key:value elements - optional
## A list of tags to attach to every metric, event, and service check emitted by this integration.
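For reference, a filled-in instance using the renamed options might look like the following; all values are illustrative:

init_config:

instances:
  - hostname: mapr-node-1.example.com
    stream_path: /var/mapr/mapr.monitoring/metricstreams
    metrics:
      - 'mapr.fs.*'
      - 'mapr.db.table.*'
    ticket_location: /opt/mapr/conf/mapruserticket
    tags:
      - 'cluster:demo'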
141 changes: 80 additions & 61 deletions mapr/datadog_checks/mapr/mapr.py
@@ -6,102 +6,121 @@
import re

from six import iteritems
try:
# The `confluent_kafka` library here is the one made by mapr
import confluent_kafka as ck
except ImportError as e:
ck = None

from datadog_checks.base import AgentCheck
from datadog_checks.base.errors import CheckException
from .common import ALLOWED_METRICS, GAUGE_METRICS, MONOTONIC_COUNTER_METRICS, COUNT_METRICS
from .utils import get_fqdn, get_stream_id_for_topic

try:
# This should be `confluent_kafka` but made by mapr!
from confluent_kafka import Consumer, KafkaError
except ImportError as e:
print("Unable to import library `confluent_kafka`, make sure it is installed and LD_LIBRARY_PATH is set correctly")
raise e
# on our infra you can run
# export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/mapr/lib:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64/jre/lib/amd64/server/ # noqa

DEFAULT_STREAM_PATH = "/var/mapr/mapr.monitoring/metricstreams"
STATUS_METRIC = "mapr.status.ok"


class MaprCheck(AgentCheck):
def __init__(self, name, init_config, agentConfig, instances):
super(MaprCheck, self).__init__(name, init_config, agentConfig, instances)

def __init__(self, name, init_config, instances):
super(MaprCheck, self).__init__(name, init_config, instances)
self._conn = None
self.mapr_host = instances[0]['mapr_host']
self.topic_path = instances[0]['topic_path']
self.allowed_metrics = [re.compile('mapr.{}'.format(w)) for w in instances[0]['whitelist']]
self.hostname = self.instance.get('hostname', get_fqdn())
self.topic_path = "{stream_path}/{stream_id}:{topic_name}".format(
stream_path=self.instance.get('stream_path', DEFAULT_STREAM_PATH),
stream_id=get_stream_id_for_topic(self.hostname),
topic_name=self.hostname
)
self.allowed_metrics = [re.compile(w) for w in self.instance.get('metrics', [])]
self.base_tags = self.instance.get('tags', [])

mapr_ticketfile_location = instances[0].get('mapr_ticketfile_location')
if mapr_ticketfile_location:
os.environ['MAPR_TICKETFILE_LOCATION'] = mapr_ticketfile_location
auth_ticket = self.instance.get('ticket_location')
if auth_ticket:
os.environ['MAPR_TICKETFILE_LOCATION'] = auth_ticket
elif not os.environ.get('MAPR_TICKETFILE_LOCATION'):
self.log.debug(
self.log.info(
"MAPR_TICKETFILE_LOCATION environment variable not set, this may cause authentication issues"
)

def check(self, instance):
tags = instance.get('tags', [])
def check(self, _):
if ck is None:
raise CheckException(
"confluent_kafka was not imported correctly, make sure the library is installed and that you've"
"set LD_LIBRARY_PATH correctly. Please refer to datadog documentation for more details."
)

conn = self.get_connection()
# TODO: assert that the topic exists, otherwise the check polls from nowhere
while True:
m = self.conn.poll(timeout=1.0)
if m is None:
msg = conn.poll(timeout=0.4)
if msg is None:
# Timed out, no more messages
break
if m.error() is None:

if msg.error() is None:
# Metric received
try:
kafka_metric = json.loads(m.value().decode('utf-8'))[0]
self.submit_metric(kafka_metric, tags)
metric = json.loads(msg.value().decode('utf-8'))[0]
metric_name = metric['metric']
if self.should_collect_metric(metric_name):
# Will sometimes submit the same metric multiple times, but because it's only
# gauges and monotonic counters, that's fine.
self.submit_metric(metric)
except Exception as e:
self.log.error("Received unexpected message %s, it wont be processed", m.value())
self.log.warning("Received unexpected message %s, wont be processed", msg.value())
self.log.exception(e)
elif m.error().code() != KafkaError._PARTITION_EOF:
elif msg.error().code() != ck.KafkaError._PARTITION_EOF:
# Real error happened
self.log.error(m.error())
break
else:
self.log.debug(m.error())

@staticmethod
def get_stream_id(topic_name, rng=2):
"""To distribute load, all the topics are not in the same stream. Each topic named is hashed
to obtain an id which is in turn the name of the stream"""
h = 5381
for c in topic_name:
h = ((h << 5) + h) + ord(c)
return abs(h % rng)
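This commit replaces the static method above with get_stream_id_for_topic from .utils, called in __init__ to build the topic path. utils.py is not part of the excerpt shown, but assuming it keeps the same djb2-style hash, it presumably looks like this sketch:

# Presumed shape of the helper in mapr/datadog_checks/mapr/utils.py (not shown
# in this diff); logic assumed identical to the removed get_stream_id above.
def get_stream_id_for_topic(topic_name, rng=2):
    h = 5381
    for c in topic_name:
        h = ((h << 5) + h) + ord(c)  # djb2 hash
    return abs(h % rng)

# With rng=2 the topic for host "node-1" lands in stream 0 or 1, producing a
# topic path such as /var/mapr/mapr.monitoring/metricstreams/1:node-1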

@property
def conn(self):
raise CheckException(msg.error())

self.gauge(STATUS_METRIC, 1)

def get_connection(self):
if self._conn:
return self._conn

topic_name = self.mapr_host # According to docs we should append the metric name.
stream_id = MaprCheck.get_stream_id(topic_name, rng=2)

topic_path = "{}:{}".format(os.path.join(self.topic_path, str(stream_id)), topic_name)
self._conn = Consumer(
self._conn = ck.Consumer(
{
"group.id": "dd-agent", # uniquely identify this consumer
"enable.auto.commit": False # important, we don't need to store the offset for this consumer,
# and if we do it just once the mapr library has a bug which prevents reading from the head
}
)
self._conn.subscribe([topic_path])
self._conn.subscribe([self.topic_path])
return self._conn

def should_collect_metric(self, metric_name):
if metric_name not in ALLOWED_METRICS:
# Metric is not part of the allowed list defined in common.py
return False
if not self.allowed_metrics:
# No filter specified, allow everything
return True

for reg in self.allowed_metrics:
if re.match(reg, metric_name):
# Metric matched one pattern
return True
else:
self.log.debug("Ignoring non whitelisted metric %s", metric_name)

def submit_metric(self, metric, additional_tags):
self.log.debug("Ignoring non whitelisted metric: %s", metric_name)
return False

def submit_metric(self, metric):
metric_name = metric['metric']
if self.should_collect_metric(metric_name):
tags = ["{}:{}".format(k, v) for k, v in iteritems(metric['tags'])] + additional_tags
if 'buckets' in metric:
for bounds, value in metric['buckets'].items():
lower, upper = bounds.split(',')
self.submit_histogram_bucket(
metric_name, value, int(lower), int(upper), monotonic=True, hostname=self.hostname, tags=tags
)
else:
# No distinction between gauge and count metrics, this should be hardcoded metric by metric
tags = self.base_tags + ["{}:{}".format(k, v) for k, v in iteritems(metric['tags'])]

if 'buckets' in metric:
for bounds, value in metric['buckets'].items():
lower, upper = bounds.split(',')
self.submit_histogram_bucket(
metric_name, value, int(lower), int(upper), monotonic=True, hostname=self.hostname, tags=tags
)
else:
if metric_name in GAUGE_METRICS:
self.gauge(metric_name, metric['value'], tags=tags)
elif metric_name in MONOTONIC_COUNTER_METRICS:
self.monotonic_count(metric_name, metric['value'], tags=tags)
elif metric_name in COUNT_METRICS:
self.count(metric_name, metric['value'], tags=tags)
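For context, a sketch of the message shape submit_metric handles; the field names are inferred from the parsing code above, and the values are made up rather than taken from MapR documentation:

# Illustrative message as decoded from the stream; field names inferred from
# the parsing code above, values invented for the example.
sample = {
    "metric": "mapr.db.table.latency",
    "tags": {"clustername": "demo", "table_path": "/var/mapr/tables/t1"},
    "buckets": {"0,10": 3, "10,100": 1},  # "lower,upper" bounds -> count
}
for bounds, value in sample["buckets"].items():
    lower, upper = bounds.split(',')
    print(int(lower), int(upper), value)  # e.g. 0 10 3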
