[vsphere] check revamp addressing several issues #3055
@@ -37,6 +37,15 @@
 # The amount of jobs batched at the same time in the queue to query available metrics
 BATCH_MORLIST_SIZE = 50
 
+REALTIME_RESOURCES = {'vm', 'host'}
+
+RESOURCE_TYPE_MAP = {
+    'vm': vim.VirtualMachine,
+    'datacenter': vim.Datacenter,
+    'host': vim.HostSystem,
+    'datastore': vim.Datastore
+}
+
 # Time after which we reap the jobs that clog the queue
 # TODO: use it
 JOB_TIMEOUT = 10
@@ -343,6 +352,8 @@ def __init__(self, name, init_config, agentConfig, instances):
 
             self.event_config[i_key] = instance.get('event_config')
 
+        # managed entity raw view
+        self.registry = {}
         # First layer of cache (get entities from the tree)
         self.morlist_raw = {}
         # Second layer, processed from the first one
@@ -527,11 +538,13 @@ def get_external_host_tags(self):
                 continue
 
             for mor in mor_by_mor_name.itervalues():
-                external_host_tags.append((mor['hostname'], {SOURCE_TYPE: mor['tags']}))
+                if mor['hostname']:  # some mor's have a None hostname
+                    external_host_tags.append((mor['hostname'], {SOURCE_TYPE: mor['tags']}))
 
         return external_host_tags
 
-    def _discover_mor(self, instance_key, obj, tags, regexes=None, include_only_marked=False):
+    def _discover_mor(self, instance, tags, regexes=None, include_only_marked=False):
         """
         Explore vCenter infrastructure to discover hosts, virtual machines
         and compute their associated tags.
@@ -557,93 +570,88 @@ def _discover_mor(self, instance_key, obj, tags, regexes=None, include_only_mark
         If it's a node we want to query metric for, queue it in `self.morlist_raw` that
         will be processed by another job.
         """
-        @atomic_method
-        def browse_mor(obj, prev_tags, depth):
-            self.log.debug(
-                u"job_atomic: Exploring MOR %s: name=%s, class=%s",
-                obj, obj.name, obj.__class__
-            )
-
-            tags = list(prev_tags)
-
-            # Folder
-            if isinstance(obj, vim.Folder):
-                # Do not tag with root folder
-                if depth:
-                    tags.append(obj.name)
-
-                for resource in obj.childEntity:
-                    self.pool.apply_async(
-                        browse_mor,
-                        args=(resource, tags, depth + 1)
-                    )
-
-            # Datacenter
-            elif isinstance(obj, vim.Datacenter):
-                tags.append(u"vsphere_datacenter:{0}".format(obj.name))
-
-                for resource in obj.hostFolder.childEntity:
-                    self.pool.apply_async(
-                        browse_mor,
-                        args=(resource, tags, depth + 1)
-                    )
-
-            # ClusterComputeResource
-            elif isinstance(obj, vim.ClusterComputeResource):
-                tags.append(u"vsphere_cluster:{0}".format(obj.name))
-
-                for host in obj.host:
-                    # Skip non-host
-                    if not hasattr(host, 'vm'):
-                        continue
-
-                    self.pool.apply_async(
-                        browse_mor,
-                        args=(host, tags, depth + 1)
-                    )
-
-            # Host
-            elif isinstance(obj, vim.HostSystem):
-                if self._is_excluded(obj, regexes, include_only_marked):
-                    self.log.debug(
-                        u"Filtered out host '%s'.", obj.name
-                    )
-                    return
-
-                watched_mor = dict(
-                    mor_type='host', mor=obj, hostname=obj.name, tags=tags + [u"vsphere_type:host"]
-                )
-                self.morlist_raw[instance_key].append(watched_mor)
-
-                tags.append(u"vsphere_host:{}".format(obj.name))
-                for vm in obj.vm:
-                    if vm.runtime.powerState != 'poweredOn':
-                        continue
-                    self.pool.apply_async(
-                        browse_mor,
-                        args=(vm, tags, depth + 1)
-                    )
-
-            # Virtual Machine
-            elif isinstance(obj, vim.VirtualMachine):
-                if self._is_excluded(obj, regexes, include_only_marked):
-                    self.log.debug(
-                        u"Filtered out VM '%s'.", obj.name
-                    )
-                    return
-
-                watched_mor = dict(
-                    mor_type='vm', mor=obj, hostname=obj.name, tags=tags + ['vsphere_type:vm']
-                )
-                self.morlist_raw[instance_key].append(watched_mor)
-
-            else:
-                self.log.error(u"Unrecognized object %s", obj)
-
-        # Init recursion
+        def _get_parent_tags(mor):
+            tags = []
+            if mor.parent:
+                tag = []
+                if isinstance(mor.parent, vim.HostSystem):
+                    tag.append(u'vsphere_host:{}'.format(mor.parent.name))
+                elif isinstance(mor.parent, vim.Folder):
+                    tag.append(u'vsphere_folder:{}'.format(mor.parent.name))
+                elif isinstance(mor.parent, vim.ComputeResource):
+                    if isinstance(mor.parent, vim.ClusterComputeResource):
+                        tag.append(u'vsphere_cluster:{}'.format(mor.parent.name))
+                    tag.append(u'vsphere_compute:{}'.format(mor.parent.name))
+                elif isinstance(mor.parent, vim.Datacenter):
+                    tag.append(u'vsphere_datacenter:{}'.format(mor.parent.name))
+
+                tags = _get_parent_tags(mor.parent)
+                if tag:
+                    tags.extend(tag)
+
+            return tags
+
+        def _get_all_objs(content, vimtype, regexes=None, include_only_marked=False, tags=[]):
+            """
+            Get all the vsphere objects associated with a given type
+            """
+            obj_list = []
+            container = content.viewManager.CreateContainerView(
+                content.rootFolder,
+                [RESOURCE_TYPE_MAP[vimtype]],
+                True)
+
+            for c in container.view:
+                instance_tags = []
+                if not self._is_excluded(c, regexes, include_only_marked):
+                    hostname = c.name
+                    if c.parent:
+                        instance_tags += _get_parent_tags(c)
+
+                    vsphere_type = None
+                    if isinstance(c, vim.VirtualMachine):
+                        vsphere_type = u'vsphere_type:vm'
+                        if c.runtime.powerState == vim.VirtualMachinePowerState.poweredOff:
+                            continue
+                        host = c.runtime.host.name
+                        instance_tags.append(u'vsphere_host:{}'.format(host))
+                    elif isinstance(c, vim.HostSystem):
+                        vsphere_type = u'vsphere_type:host'
+                    elif isinstance(c, vim.Datastore):
+                        vsphere_type = u'vsphere_type:datastore'
+                        instance_tags.append(u'vsphere_datastore:{}'.format(c.name))
+                        hostname = None
+                    elif isinstance(c, vim.Datacenter):
+                        vsphere_type = u'vsphere_type:datacenter'
+                        hostname = None
+
+                    if vsphere_type:
+                        instance_tags.append(vsphere_type)
+                    obj_list.append(dict(mor_type=vimtype, mor=c, hostname=hostname, tags=tags+instance_tags))
+
+            return obj_list
+
+        # @atomic_method
+        def build_resource_registry(instance, tags, regexes=None, include_only_marked=False):
+            i_key = self._instance_key(instance)
+            server_instance = self._get_server_instance(instance)
+            if i_key not in self.morlist_raw:
+                self.morlist_raw[i_key] = {}
+
+            for resource in sorted(RESOURCE_TYPE_MAP):
+                self.morlist_raw[i_key][resource] = _get_all_objs(
+                    server_instance.RetrieveContent(),
+                    resource,
+                    regexes,
+                    include_only_marked,
+                    tags
+                )
+
+        # collect...
         self.pool.apply_async(
-            browse_mor,
-            args=(obj, tags, 0)
+            build_resource_registry,
+            args=(instance, tags, regexes, include_only_marked)
         )
 
     @staticmethod
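The hunk above replaces the recursive `browse_mor` traversal with pyVmomi container views, which flatten a whole inventory subtree into one iterable. For readers unfamiliar with that API, a minimal standalone sketch; the vCenter host, user, and password are placeholders, and error handling and SSL setup are omitted:

    from pyVim.connect import SmartConnect, Disconnect
    from pyVmomi import vim

    # Placeholder connection details, for illustration only.
    si = SmartConnect(host='vcenter.example.com', user='admin', pwd='secret')
    content = si.RetrieveContent()

    # A container view collects every object of the requested types under
    # the given root, here all VirtualMachines (recursive=True).
    view = content.viewManager.CreateContainerView(
        content.rootFolder, [vim.VirtualMachine], True)
    for vm in view.view:
        print(vm.name)

    view.DestroyView()  # views hold server-side state, release them
    Disconnect(si)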
@@ -692,17 +700,15 @@ def _cache_morlist_raw(self, instance):
 
         i_key = self._instance_key(instance)
         self.log.debug("Caching the morlist for vcenter instance %s" % i_key)
-        if i_key in self.morlist_raw and len(self.morlist_raw[i_key]) > 0:
-            self.log.debug(
-                "Skipping morlist collection now, RAW results "
-                "processing not over (latest refresh was {0}s ago)".format(
-                    time.time() - self.cache_times[i_key][MORLIST][LAST])
-            )
-            return
-        self.morlist_raw[i_key] = []
-
-        server_instance = self._get_server_instance(instance)
-        root_folder = server_instance.content.rootFolder
+        for resource_type in RESOURCE_TYPE_MAP:
+            if i_key in self.morlist_raw and len(self.morlist_raw[i_key].get(resource_type, [])) > 0:
+                self.log.debug(
+                    "Skipping morlist collection now, RAW results "
+                    "processing not over (latest refresh was {0}s ago)".format(
+                        time.time() - self.cache_times[i_key][MORLIST][LAST])
+                )
+                return
+        self.morlist_raw[i_key] = {}
 
         instance_tag = "vcenter_server:%s" % instance.get('name')
         regexes = {

[Review comment] Why can't we keep […]? vSphere objects in […]
[Reply] Well, I did that because now we pop() the elements per resource type in _cache_morlist_process.
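To illustrate the reply above, a minimal sketch of the pop-per-resource-type pattern that the dict layout of `morlist_raw` enables; the data is made up and the names are illustrative:

    # A flat list is a single queue for all object types; keying the cache
    # by resource type lets each processing batch drain the types one by one.
    morlist_raw = {
        'datacenter': [{'name': 'dc-1'}],
        'datastore': [],
        'host': [{'name': 'host-1'}],
        'vm': [{'name': 'vm-1'}, {'name': 'vm-2'}],
    }

    batch_size, processed, batch = 3, 0, []
    for resource_type in sorted(morlist_raw):
        while morlist_raw[resource_type] and processed < batch_size:
            batch.append(morlist_raw[resource_type].pop())
            processed += 1
    # batch now holds up to batch_size MORs, drawn type by type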
@@ -712,7 +718,7 @@ def _cache_morlist_raw(self, instance):
         include_only_marked = _is_affirmative(instance.get('include_only_marked', False))
 
         # Discover hosts and virtual machines
-        self._discover_mor(i_key, root_folder, [instance_tag], regexes, include_only_marked)
+        self._discover_mor(instance, [instance_tag], regexes, include_only_marked)
 
         self.cache_times[i_key][MORLIST][LAST] = time.time()
@@ -733,12 +739,14 @@ def _cache_morlist_process_atomic(self, instance, mor):
             " for MOR {0} (type={1})".format(mor['mor'], mor['mor_type'])
         )
 
+        mor['interval'] = REAL_TIME_INTERVAL if mor['mor_type'] in REALTIME_RESOURCES else None
+
         available_metrics = perfManager.QueryAvailablePerfMetric(
-            mor['mor'], intervalId=REAL_TIME_INTERVAL)
+            mor['mor'], intervalId=mor['interval'])
 
         mor['metrics'] = self._compute_needed_metrics(instance, available_metrics)
-        mor_name = str(mor['mor'])
 
+        mor_name = str(mor['mor'])
         if mor_name in self.morlist[i_key]:
             # Was already here last iteration
             self.morlist[i_key][mor_name]['metrics'] = mor['metrics']

[Review comment] Nitpick: we have some small code duplication, I wonder if it'd be nicer to have the interval stored once for all in […]
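One way to read the nitpick above (the helper below is hypothetical, not part of the diff): compute the interval in a single place and reuse it at both query sites instead of repeating the conditional:

    REAL_TIME_INTERVAL = 20  # vSphere real-time sampling period, in seconds
    REALTIME_RESOURCES = {'vm', 'host'}

    def mor_interval(mor_type):
        """Perf-query interval for a MOR type; None selects historical stats."""
        return REAL_TIME_INTERVAL if mor_type in REALTIME_RESOURCES else None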
@@ -762,13 +770,23 @@ def _cache_morlist_process(self, instance):
 
         batch_size = self.init_config.get('batch_morlist_size', BATCH_MORLIST_SIZE)
 
-        for i in xrange(batch_size):
-            try:
-                mor = self.morlist_raw[i_key].pop()
-                self.pool.apply_async(self._cache_morlist_process_atomic, args=(instance, mor))
-            except (IndexError, KeyError):
-                self.log.debug("No more work to process in morlist_raw")
-                return
+        processed = 0
+        for resource_type in RESOURCE_TYPE_MAP:
+            for i in xrange(batch_size):
+                try:
+                    mor = self.morlist_raw[i_key][resource_type].pop()
+                    self.pool.apply_async(self._cache_morlist_process_atomic, args=(instance, mor))
+
+                    processed += 1
+                    if processed == batch_size:
+                        break
+                except (IndexError, KeyError):
+                    self.log.debug("No more work to process in morlist_raw")
+                    break
+
+            if processed == batch_size:
+                break
+        return
 
     def _vacuum_morlist(self, instance):
         """ Check if self.morlist doesn't have some old MORs that are gone, ie
@@ -837,10 +855,11 @@ def _collect_metrics_atomic(self, instance, mor):
         i_key = self._instance_key(instance)
         server_instance = self._get_server_instance(instance)
         perfManager = server_instance.content.perfManager
+
         query = vim.PerformanceManager.QuerySpec(maxSample=1,
                                                  entity=mor['mor'],
                                                  metricId=mor['metrics'],
-                                                 intervalId=20,
+                                                 intervalId=mor['interval'],
                                                  format='normal')
         results = perfManager.QueryPerf(querySpec=[query])
         if results:
@@ -858,12 +877,13 @@ def _collect_metrics_atomic(self, instance, mor):
                     self.log.debug(u"Skipping unknown `%s` metric.", metric_name)
                     continue
 
-                if ALL_METRICS[metric_name]['s_type'] == 'rate':
-                    record_metric = self.rate
-                else:
-                    record_metric = self.gauge
+                tags = ['instance:%s' % instance_name]
+                if not mor['hostname']:  # no host tags available
+                    tags.extend(mor['tags'])
 
-                record_metric(
+                # vsphere "rates" should be submitted as gauges (rate is
+                # precomputed).
+                self.gauge(
                     "vsphere.%s" % metric_name,
                     value,
                     hostname=mor['hostname'],
@@ -891,7 +911,7 @@ def collect_metrics(self, instance):
         for mor_name, mor in mors:
             if mor['mor_type'] == 'vm':
                 vm_count += 1
-            if 'metrics' not in mor:
+            if 'metrics' not in mor or not mor['metrics']:
                 # self.log.debug("Skipping entity %s collection because we didn't cache its metrics yet" % mor['hostname'])
                 continue
[Review comment] Let's avoid the mutable `tags=[]` as a default argument. It's easy to forget it and modify the value later on.
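For reference, a minimal sketch of the pitfall behind this comment, together with the usual `None`-default idiom:

    # A mutable default is created once, at definition time, and shared by
    # every call that omits the argument.
    def append_tag_bad(tag, tags=[]):
        tags.append(tag)
        return tags

    append_tag_bad('a')   # ['a']
    append_tag_bad('b')   # ['a', 'b']  <- state leaked from the first call

    # Idiomatic fix: default to None and allocate a fresh list inside.
    def append_tag_good(tag, tags=None):
        tags = [] if tags is None else tags
        tags.append(tag)
        return tags

    append_tag_good('a')  # ['a']
    append_tag_good('b')  # ['b']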