Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] service metadata collection #1611

Merged
merged 2 commits into from
Jun 9, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checks.d/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ def _get_es_version(self, config):
)
version = [1, 0, 0]

self.service_metadata('version', version)
self.log.debug("Elasticsearch version is %s" % version)
return version

Expand Down
11 changes: 10 additions & 1 deletion checks.d/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ def check(self, instance):

db = self._connect(host, port, mysql_sock, user, password, defaults_file)

# Metadata collection
self._collect_metadata(db, host)

# Metric collection
self._collect_metrics(host, db, tags, options)
if Platform.is_linux():
Expand Down Expand Up @@ -188,6 +191,9 @@ def _collect_metrics(self, host, db, tags, options):
"SHOW SLAVE STATUS", db, tags=tags
)

def _collect_metadata(self, db, host):
self._get_version(db, host)

def _rate_or_gauge_statuses(self, statuses, dbResults, tags):
for status, metric in statuses.iteritems():
metric_name, metric_type = metric
Expand Down Expand Up @@ -227,7 +233,9 @@ def _version_greater_502(self, db, host):

def _get_version(self, db, host):
if host in self.mysql_version:
return self.mysql_version[host]
version = self.mysql_version[host]
self.service_metadata('version', ".".join(version))
return version

# Get MySQL version
cursor = db.cursor()
Expand All @@ -240,6 +248,7 @@ def _get_version(self, db, host):
version = result[0].split('-')
version = version[0].split('.')
self.mysql_version[host] = version
self.service_metadata('version', ".".join(version))
return version

def _collect_scalar(self, key, dict):
Expand Down
6 changes: 5 additions & 1 deletion checks.d/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@

MAX_CUSTOM_RESULTS = 100

class ShouldRestartException(Exception): pass

class ShouldRestartException(Exception):
pass


class PostgreSql(AgentCheck):
"""Collects per-database, and optionally per-relation metrics, custom metrics
Expand Down Expand Up @@ -263,6 +266,7 @@ def _get_version(self, key, db):
version = result[0]
self.versions[key] = version

self.service_metadata('version', self.versions[key])
return self.versions[key]

def _is_above(self, key, db, version_to_compare):
Expand Down
7 changes: 6 additions & 1 deletion checks.d/redisdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,12 @@ def _check_db(self, instance, custom_tags=None):
# Ping the database for info, and track the latency.
# Process the service check: the check passes if we can connect to Redis
start = time.time()
info = None
try:
info = conn.info()
status = AgentCheck.OK
self.service_check('redis.can_connect', status, tags=tags_to_add)
self._collect_metadata(info)
except ValueError, e:
status = AgentCheck.CRITICAL
self.service_check('redis.can_connect', status, tags=tags_to_add)
Expand Down Expand Up @@ -271,7 +273,6 @@ def _check_replication(self, info, tags):
self.service_check('redis.replication.master_link_status', status, tags=tags)
self.gauge('redis.replication.master_link_down_since_seconds', down_seconds, tags=tags)


def _check_slowlog(self, instance, custom_tags):
"""Retrieve length and entries from Redis' SLOWLOG

Expand Down Expand Up @@ -331,3 +332,7 @@ def check(self, instance):

self._check_db(instance, custom_tags)
self._check_slowlog(instance, custom_tags)

def _collect_metadata(self, info):
if info and 'redis_version' in info:
self.service_metadata('version', info['redis_version'])
73 changes: 56 additions & 17 deletions checks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class UnknownValue(CheckException): pass
# and not this class. This class will be removed in a future version
# of the agent.
#==============================================================================


class Check(object):
"""
(Abstract) class for all checks with the ability to:
Expand All @@ -57,16 +59,14 @@ class Check(object):
* only log error messages once (instead of each time they occur)

"""


def __init__(self, logger):
# where to store samples, indexed by metric_name
# metric_name: {("sorted", "tags"): [(ts, value), (ts, value)],
# tuple(tags) are stored as a key since lists are not hashable
# None: [(ts, value), (ts, value)]}
# untagged values are indexed by None
self._sample_store = {}
self._counters = {} # metric_name: bool
self._counters = {} # metric_name: bool
self.logger = logger
try:
self.logger.addFilter(LaconicFilter())
Expand Down Expand Up @@ -316,6 +316,8 @@ def __init__(self, name, init_config, agentConfig, instances=None):
self.warnings = []
self.library_versions = None
self.last_collection_time = defaultdict(int)
self._instance_metadata = []
self.svc_metadata = []

def instance_count(self):
""" Return the number of instances that are configured for this check. """
Expand Down Expand Up @@ -474,6 +476,18 @@ def service_check(self, check_name, status, tags=None, timestamp=None,
hostname, check_run_id, message)
)

def service_metadata(self, meta_name, value):
"""
Save metadata.

:param meta_name: metadata key name
:type meta_name: string

:param value: metadata value
:type value: string
"""
self._instance_metadata.append((meta_name, str(value)))

def has_events(self):
"""
Check whether the check has saved any events
Expand Down Expand Up @@ -515,6 +529,27 @@ def get_service_checks(self):
self.service_checks = []
return service_checks

def _roll_up_instance_metadata(self):
"""
Concatenate and flush instance metadata.
"""
self.svc_metadata.append(dict((k, v) for (k, v) in self._instance_metadata))
self._instance_metadata = []

def get_service_metadata(self):
"""
Return a list of the metadata dictionaries saved by the check -if any-
and clears them out of the instance's service_checks list

@return the list of metadata saved by this check
@rtype list of metadata dicts
"""
if self._instance_metadata:
self._roll_up_instance_metadata()
service_metadata = self.svc_metadata
self.svc_metadata = []
return service_metadata

def has_warnings(self):
"""
Check whether the instance run created any warnings
Expand Down Expand Up @@ -598,14 +633,18 @@ def run(self):
if self.in_developer_mode and self.name != AGENT_METRICS_CHECK_NAME:
try:
before = AgentCheck._collect_internal_stats()
except Exception: # It's fine if we can't collect stats for the run, just log and proceed
except Exception: # It's fine if we can't collect stats for the run, just log and proceed
self.log.debug("Failed to collect Agent Stats before check {0}".format(self.name))

instance_statuses = []
for i, instance in enumerate(self.instances):
try:
min_collection_interval = instance.get('min_collection_interval',
self.init_config.get('min_collection_interval', self.DEFAULT_MIN_COLLECTION_INTERVAL))
min_collection_interval = instance.get(
'min_collection_interval', self.init_config.get(
'min_collection_interval',
self.DEFAULT_MIN_COLLECTION_INTERVAL
)
)
now = time.time()
if now - self.last_collection_time[i] < min_collection_interval:
self.log.debug("Not running instance #{0} of check {1} as it ran less than {2}s ago".format(i, self.name, min_collection_interval))
Expand All @@ -623,32 +662,32 @@ def run(self):
instance_check_stats = {'run_time': timeit.default_timer() - check_start_time}

if self.has_warnings():
instance_status = check_status.InstanceStatus(i,
check_status.STATUS_WARNING,
warnings=self.get_warnings(),
instance_check_stats=instance_check_stats
instance_status = check_status.InstanceStatus(
i, check_status.STATUS_WARNING,
warnings=self.get_warnings(), instance_check_stats=instance_check_stats
)
else:
instance_status = check_status.InstanceStatus(
i,
check_status.STATUS_OK,
i, check_status.STATUS_OK,
instance_check_stats=instance_check_stats
)
except Exception, e:
self.log.exception("Check '%s' instance #%s failed" % (self.name, i))
instance_status = check_status.InstanceStatus(i,
check_status.STATUS_ERROR,
error=str(e),
tb=traceback.format_exc()
instance_status = check_status.InstanceStatus(
i, check_status.STATUS_ERROR,
error=str(e), tb=traceback.format_exc()
)
finally:
self._roll_up_instance_metadata()

instance_statuses.append(instance_status)

if self.in_developer_mode and self.name != AGENT_METRICS_CHECK_NAME:
try:
after = AgentCheck._collect_internal_stats()
self._set_internal_profiling_stats(before, after)
log.info("\n \t %s %s" % (self.name, pretty_statistics(self._internal_profiling_stats)))
except Exception: # It's fine if we can't collect stats for the run, just log and proceed
except Exception: # It's fine if we can't collect stats for the run, just log and proceed
self.log.debug("Failed to collect Agent Stats after check {0}".format(self.name))

return instance_statuses
Expand Down
54 changes: 45 additions & 9 deletions checks/check_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

# project
import config
from config import get_config, get_jmx_status_path, _windows_commondata_path
from config import get_config, get_jmx_status_path, _is_affirmative, _windows_commondata_path
from util import plural
from utils.ntp import get_ntp_args
from utils.pidfile import PidFile
Expand Down Expand Up @@ -80,6 +80,7 @@ def stylize(cls, text, *styles):
def style(*args):
return Stylizer.stylize(*args)


def logger_info():
loggers = []
root_logger = logging.getLogger()
Expand Down Expand Up @@ -285,7 +286,7 @@ def has_warnings(self):
class CheckStatus(object):

def __init__(self, check_name, instance_statuses, metric_count=None,
event_count=None, service_check_count=None,
event_count=None, service_check_count=None, service_metadata=[],
init_failed_error=None, init_failed_traceback=None,
library_versions=None, source_type_name=None,
check_stats=None):
Expand All @@ -299,6 +300,7 @@ def __init__(self, check_name, instance_statuses, metric_count=None,
self.init_failed_traceback = init_failed_traceback
self.library_versions = library_versions
self.check_stats = check_stats
self.service_metadata = service_metadata

@property
def status(self):
Expand Down Expand Up @@ -340,7 +342,7 @@ def __init__(self, check_statuses=None, emitter_statuses=None, metadata=None):
AgentStatus.__init__(self)
self.check_statuses = check_statuses or []
self.emitter_statuses = emitter_statuses or []
self.metadata = metadata or []
self.host_metadata = metadata or []

@property
def status(self):
Expand Down Expand Up @@ -433,7 +435,6 @@ def body_lines(self):
'instance-id'
]


lines = [
'Clocks',
'======',
Expand Down Expand Up @@ -477,10 +478,10 @@ def body_lines(self):
''
]

if not self.metadata:
if not self.host_metadata:
lines.append(" No host information available yet.")
else:
for key, host in self.metadata.items():
for key, host in self.host_metadata.iteritems():
for whitelist_item in metadata_whitelist:
if whitelist_item in key:
lines.append(" " + key + ": " + host)
Expand Down Expand Up @@ -563,6 +564,41 @@ def body_lines(self):

lines += check_lines

# Metadata status
metadata_enabled = _is_affirmative(get_config().get('display_service_metadata', False))

if metadata_enabled:
lines += [
"",
"Service metadata",
"================",
""
]
if not check_statuses:
lines.append(" No checks have run yet.")
else:
meta_lines = []
for cs in check_statuses:
# Check title
check_line = [
' ' + cs.name,
' ' + '-' * len(cs.name)
]
instance_lines = []
for i, meta in enumerate(cs.service_metadata):
if not meta:
continue
instance_lines += [" - instance #%s:" % i]
for k, v in meta.iteritems():
instance_lines += [" - %s: %s" % (k, v)]
if instance_lines:
check_line += instance_lines
meta_lines += check_line
if meta_lines:
lines += meta_lines
else:
lines.append(" No metadata were collected.")

# Emitter status
lines += [
"",
Expand All @@ -577,7 +613,7 @@ def body_lines(self):
c = 'green'
if es.has_error():
c = 'red'
line = " - %s [%s]" % (es.name, style(es.status,c))
line = " - %s [%s]" % (es.name, style(es.status, c))
if es.status != STATUS_OK:
line += ": %s" % es.error
lines.append(line)
Expand All @@ -595,8 +631,8 @@ def to_dict(self):
'ipv4',
'instance-id'
]
if self.metadata:
for key, host in self.metadata.items():
if self.host_metadata:
for key, host in self.host_metadata.iteritems():
for whitelist_item in metadata_whitelist:
if whitelist_item in key:
status_info['hostnames'][key] = host
Expand Down
Loading