Skip to content

Commit

Permalink
[vsphere] check revamp addressing several issues (#3055)
Browse files Browse the repository at this point in the history
* [vsphere] collect metrics using container views.

* [vsphere] fixing empty raw morlist key potential exception.

* [vsphere] realtime resources accept intervalId 20, others don't

* [vsphere] fix MOR batch collection bug.

* [vsphere] adding new metrics vsphere 6+ compatible.

* [vsphere] include system metrics in all metrics.

* [vsphere] adding some more 6.0+ metrics.

* [vmware] add corresponding vsphere type tag to metrics.

* [vsphere] vsphere rates should be submitted as gauges. More tags.

* [vsphere] add vsphere_host tag to VMs.

* [vsphere] compute parent tags for MORs

* [vsphere] fixing some backward compatibility issues. Improving tagging.

* [vsphere] fixing test to match new check code.

* [vsphere][test] refactor container_view.

* [vsphere] PR polish 💄

* [vsphere] not all MORs will have host tags.
  • Loading branch information
truthbk authored Jan 30, 2017
1 parent 0ce5592 commit e309c52
Show file tree
Hide file tree
Showing 4 changed files with 810 additions and 137 deletions.
248 changes: 134 additions & 114 deletions checks.d/vsphere.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@
# The amount of jobs batched at the same time in the queue to query available metrics
BATCH_MORLIST_SIZE = 50

# Resource types for which vCenter exposes real-time (20s interval)
# performance samples; other types only provide historical rollups.
REALTIME_RESOURCES = {'vm', 'host'}

# Maps the check's resource-type names to the corresponding pyVmomi
# managed-object classes, used when building container views.
RESOURCE_TYPE_MAP = {
    'vm': vim.VirtualMachine,
    'datacenter': vim.Datacenter,
    'host': vim.HostSystem,
    'datastore': vim.Datastore
}

# Time after which we reap the jobs that clog the queue
# TODO: use it
JOB_TIMEOUT = 10
Expand Down Expand Up @@ -343,6 +352,8 @@ def __init__(self, name, init_config, agentConfig, instances):

self.event_config[i_key] = instance.get('event_config')

# managed entity raw view
self.registry = {}
# First layer of cache (get entities from the tree)
self.morlist_raw = {}
# Second layer, processed from the first one
Expand Down Expand Up @@ -527,11 +538,13 @@ def get_external_host_tags(self):
continue

for mor in mor_by_mor_name.itervalues():
external_host_tags.append((mor['hostname'], {SOURCE_TYPE: mor['tags']}))
if mor['hostname']: # some mor's have a None hostname
external_host_tags.append((mor['hostname'], {SOURCE_TYPE: mor['tags']}))

return external_host_tags

def _discover_mor(self, instance_key, obj, tags, regexes=None, include_only_marked=False):

def _discover_mor(self, instance, tags, regexes=None, include_only_marked=False):
"""
Explore vCenter infrastructure to discover hosts, virtual machines
and compute their associated tags.
Expand All @@ -557,93 +570,88 @@ def _discover_mor(self, instance_key, obj, tags, regexes=None, include_only_mark
If it's a node we want to query metric for, queue it in `self.morlist_raw` that
will be processed by another job.
"""
@atomic_method
def browse_mor(obj, prev_tags, depth):
self.log.debug(
u"job_atomic: Exploring MOR %s: name=%s, class=%s",
obj, obj.name, obj.__class__
)

tags = list(prev_tags)

# Folder
if isinstance(obj, vim.Folder):
# Do not tag with root folder
if depth:
tags.append(obj.name)

for resource in obj.childEntity:
self.pool.apply_async(
browse_mor,
args=(resource, tags, depth + 1)
)

# Datacenter
elif isinstance(obj, vim.Datacenter):
tags.append(u"vsphere_datacenter:{0}".format(obj.name))

for resource in obj.hostFolder.childEntity:
self.pool.apply_async(
browse_mor,
args=(resource, tags, depth + 1)
)

# ClusterComputeResource
elif isinstance(obj, vim.ClusterComputeResource):
tags.append(u"vsphere_cluster:{0}".format(obj.name))

for host in obj.host:
# Skip non-host
if not hasattr(host, 'vm'):
continue

self.pool.apply_async(
browse_mor,
args=(host, tags, depth + 1)
)

# Host
elif isinstance(obj, vim.HostSystem):
if self._is_excluded(obj, regexes, include_only_marked):
self.log.debug(
u"Filtered out host '%s'.", obj.name
)
return

watched_mor = dict(
mor_type='host', mor=obj, hostname=obj.name, tags=tags + [u"vsphere_type:host"]
)
self.morlist_raw[instance_key].append(watched_mor)

tags.append(u"vsphere_host:{}".format(obj.name))
for vm in obj.vm:
if vm.runtime.powerState != 'poweredOn':
continue
self.pool.apply_async(
browse_mor,
args=(vm, tags, depth + 1)
)

# Virtual Machine
elif isinstance(obj, vim.VirtualMachine):
if self._is_excluded(obj, regexes, include_only_marked):
self.log.debug(
u"Filtered out VM '%s'.", obj.name
)
return

watched_mor = dict(
mor_type='vm', mor=obj, hostname=obj.name, tags=tags + ['vsphere_type:vm']
def _get_parent_tags(mor):
tags = []
if mor.parent:
tag = []
if isinstance(mor.parent, vim.HostSystem):
tag.append(u'vsphere_host:{}'.format(mor.parent.name))
elif isinstance(mor.parent, vim.Folder):
tag.append(u'vsphere_folder:{}'.format(mor.parent.name))
elif isinstance(mor.parent, vim.ComputeResource):
if isinstance(mor.parent, vim.ClusterComputeResource):
tag.append(u'vsphere_cluster:{}'.format(mor.parent.name))
tag.append(u'vsphere_compute:{}'.format(mor.parent.name))
elif isinstance(mor.parent, vim.Datacenter):
tag.append(u'vsphere_datacenter:{}'.format(mor.parent.name))

tags = _get_parent_tags(mor.parent)
if tag:
tags.extend(tag)

return tags


def _get_all_objs(content, vimtype, regexes=None, include_only_marked=False, tags=None):
    """
    Collect all vSphere managed objects of `vimtype` under the root folder.

    Returns a list of dicts (mor_type / mor / hostname / tags) for every
    object not excluded by `regexes` / `include_only_marked`. Powered-off
    VMs are skipped entirely; datastores and datacenters get a `None`
    hostname so they are not submitted as external host tags.

    `tags` is a list of base tags prepended to each object's tags.
    (Previously a mutable default argument `tags=[]` -- replaced with the
    None sentinel so the default list cannot be shared across calls.)
    """
    if tags is None:
        tags = []
    obj_list = []
    container = content.viewManager.CreateContainerView(
        content.rootFolder,
        [RESOURCE_TYPE_MAP[vimtype]],
        True)

    try:
        for c in container.view:
            instance_tags = []
            if not self._is_excluded(c, regexes, include_only_marked):
                hostname = c.name
                if c.parent:
                    instance_tags += _get_parent_tags(c)

                vsphere_type = None
                if isinstance(c, vim.VirtualMachine):
                    vsphere_type = u'vsphere_type:vm'
                    # Powered-off VMs report no metrics; skip them.
                    if c.runtime.powerState == vim.VirtualMachinePowerState.poweredOff:
                        continue
                    host = c.runtime.host.name
                    instance_tags.append(u'vsphere_host:{}'.format(host))
                elif isinstance(c, vim.HostSystem):
                    vsphere_type = u'vsphere_type:host'
                elif isinstance(c, vim.Datastore):
                    vsphere_type = u'vsphere_type:datastore'
                    instance_tags.append(u'vsphere_datastore:{}'.format(c.name))
                    hostname = None
                elif isinstance(c, vim.Datacenter):
                    vsphere_type = u'vsphere_type:datacenter'
                    hostname = None

                if vsphere_type:
                    instance_tags.append(vsphere_type)
                obj_list.append(dict(mor_type=vimtype, mor=c, hostname=hostname, tags=tags + instance_tags))
    finally:
        # Container views are server-side objects: destroy the view so we
        # don't leak one on the vCenter server per collection cycle.
        container.Destroy()

    return obj_list

# @atomic_method
def build_resource_registry(instance, tags, regexes=None, include_only_marked=False):
    """Populate self.morlist_raw[i_key] with one MOR list per resource type.

    For every resource type in RESOURCE_TYPE_MAP, fetches the matching
    managed objects from vCenter (via a container view in _get_all_objs)
    and stores them, keyed by resource type, in the raw morlist cache
    for this instance.
    """
    i_key = self._instance_key(instance)
    server_instance = self._get_server_instance(instance)
    # First discovery for this instance: initialize its cache bucket.
    if i_key not in self.morlist_raw:
        self.morlist_raw[i_key] = {}

    # sorted() gives a deterministic collection order across runs.
    # NOTE(review): RetrieveContent() is called once per resource type --
    # presumably cheap, but could be hoisted out of the loop; confirm.
    for resource in sorted(RESOURCE_TYPE_MAP):
        self.morlist_raw[i_key][resource] = _get_all_objs(
            server_instance.RetrieveContent(),
            resource,
            regexes,
            include_only_marked,
            tags
        )
self.morlist_raw[instance_key].append(watched_mor)

else:
self.log.error(u"Unrecognized object %s", obj)

# Init recursion
# collect...
self.pool.apply_async(
browse_mor,
args=(obj, tags, 0)
build_resource_registry,
args=(instance, tags, regexes, include_only_marked)
)

@staticmethod
Expand Down Expand Up @@ -692,17 +700,15 @@ def _cache_morlist_raw(self, instance):

i_key = self._instance_key(instance)
self.log.debug("Caching the morlist for vcenter instance %s" % i_key)
if i_key in self.morlist_raw and len(self.morlist_raw[i_key]) > 0:
self.log.debug(
"Skipping morlist collection now, RAW results "
"processing not over (latest refresh was {0}s ago)".format(
time.time() - self.cache_times[i_key][MORLIST][LAST])
)
return
self.morlist_raw[i_key] = []

server_instance = self._get_server_instance(instance)
root_folder = server_instance.content.rootFolder
for resource_type in RESOURCE_TYPE_MAP:
if i_key in self.morlist_raw and len(self.morlist_raw[i_key].get(resource_type, [])) > 0:
self.log.debug(
"Skipping morlist collection now, RAW results "
"processing not over (latest refresh was {0}s ago)".format(
time.time() - self.cache_times[i_key][MORLIST][LAST])
)
return
self.morlist_raw[i_key] = {}

instance_tag = "vcenter_server:%s" % instance.get('name')
regexes = {
Expand All @@ -712,7 +718,7 @@ def _cache_morlist_raw(self, instance):
include_only_marked = _is_affirmative(instance.get('include_only_marked', False))

# Discover hosts and virtual machines
self._discover_mor(i_key, root_folder, [instance_tag], regexes, include_only_marked)
self._discover_mor(instance, [instance_tag], regexes, include_only_marked)

self.cache_times[i_key][MORLIST][LAST] = time.time()

Expand All @@ -733,12 +739,14 @@ def _cache_morlist_process_atomic(self, instance, mor):
" for MOR {0} (type={1})".format(mor['mor'], mor['mor_type'])
)

mor['interval'] = REAL_TIME_INTERVAL if mor['mor_type'] in REALTIME_RESOURCES else None

available_metrics = perfManager.QueryAvailablePerfMetric(
mor['mor'], intervalId=REAL_TIME_INTERVAL)
mor['mor'], intervalId=mor['interval'])

mor['metrics'] = self._compute_needed_metrics(instance, available_metrics)
mor_name = str(mor['mor'])

mor_name = str(mor['mor'])
if mor_name in self.morlist[i_key]:
# Was already here last iteration
self.morlist[i_key][mor_name]['metrics'] = mor['metrics']
Expand All @@ -762,13 +770,23 @@ def _cache_morlist_process(self, instance):

batch_size = self.init_config.get('batch_morlist_size', BATCH_MORLIST_SIZE)

for i in xrange(batch_size):
try:
mor = self.morlist_raw[i_key].pop()
self.pool.apply_async(self._cache_morlist_process_atomic, args=(instance, mor))
except (IndexError, KeyError):
self.log.debug("No more work to process in morlist_raw")
return
processed = 0
for resource_type in RESOURCE_TYPE_MAP:
for i in xrange(batch_size):
try:
mor = self.morlist_raw[i_key][resource_type].pop()
self.pool.apply_async(self._cache_morlist_process_atomic, args=(instance, mor))

processed += 1
if processed == batch_size:
break
except (IndexError, KeyError):
self.log.debug("No more work to process in morlist_raw")
break

if processed == batch_size:
break
return

def _vacuum_morlist(self, instance):
""" Check if self.morlist doesn't have some old MORs that are gone, ie
Expand Down Expand Up @@ -837,10 +855,11 @@ def _collect_metrics_atomic(self, instance, mor):
i_key = self._instance_key(instance)
server_instance = self._get_server_instance(instance)
perfManager = server_instance.content.perfManager

query = vim.PerformanceManager.QuerySpec(maxSample=1,
entity=mor['mor'],
metricId=mor['metrics'],
intervalId=20,
intervalId=mor['interval'],
format='normal')
results = perfManager.QueryPerf(querySpec=[query])
if results:
Expand All @@ -858,12 +877,13 @@ def _collect_metrics_atomic(self, instance, mor):
self.log.debug(u"Skipping unknown `%s` metric.", metric_name)
continue

if ALL_METRICS[metric_name]['s_type'] == 'rate':
record_metric = self.rate
else:
record_metric = self.gauge
tags = ['instance:%s' % instance_name]
if not mor['hostname']: # no host tags available
tags.extend(mor['tags'])

record_metric(
# vsphere "rates" should be submitted as gauges (rate is
# precomputed).
self.gauge(
"vsphere.%s" % metric_name,
value,
hostname=mor['hostname'],
Expand Down Expand Up @@ -891,7 +911,7 @@ def collect_metrics(self, instance):
for mor_name, mor in mors:
if mor['mor_type'] == 'vm':
vm_count += 1
if 'metrics' not in mor:
if 'metrics' not in mor or not mor['metrics']:
# self.log.debug("Skipping entity %s collection because we didn't cache its metrics yet" % mor['hostname'])
continue

Expand Down
Loading

0 comments on commit e309c52

Please sign in to comment.