Skip to content

Commit

Permalink
Replace the Gnocchi dispatcher by a publisher
Browse files Browse the repository at this point in the history
This removes the deprecated Gnocchi dispatcher and replaces it by its
equivalent publisher.

Change-Id: Ie44baf20ccb8de5794f5f0c3d4717f7e56afa63b
  • Loading branch information
jd committed Oct 13, 2017
1 parent bca9d45 commit 83ffaff
Show file tree
Hide file tree
Showing 18 changed files with 267 additions and 321 deletions.
4 changes: 0 additions & 4 deletions ceilometer/api/controllers/v2/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,6 @@ def gnocchi_is_enabled(self):
if pecan.request.cfg.api.gnocchi_is_enabled is not None:
self._gnocchi_is_enabled = (
pecan.request.cfg.api.gnocchi_is_enabled)

elif ("gnocchi" not in pecan.request.cfg.meter_dispatchers
or "database" in pecan.request.cfg.meter_dispatchers):
self._gnocchi_is_enabled = False
else:
try:
catalog = keystone_client.get_service_catalog(
Expand Down
34 changes: 0 additions & 34 deletions ceilometer/dispatcher/gnocchi_opts.py

This file was deleted.

12 changes: 5 additions & 7 deletions ceilometer/gnocchi_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
LOG = log.getLogger(__name__)


def get_gnocchiclient(conf, timeout_override=False):
group = conf.dispatcher_gnocchi.auth_section
timeout = (None if (not conf.dispatcher_gnocchi.request_timeout or
timeout_override)
else conf.dispatcher_gnocchi.request_timeout)
session = keystone_client.get_session(conf, group=group, timeout=timeout)
def get_gnocchiclient(conf, request_timeout=None):
group = conf.gnocchi.auth_section
session = keystone_client.get_session(conf, group=group,
timeout=request_timeout)
adapter = keystoneauth1.session.TCPKeepAliveAdapter(
pool_maxsize=conf.max_parallel_requests)
session.mount("http://", adapter)
Expand Down Expand Up @@ -188,7 +186,7 @@ def get_gnocchiclient(conf, timeout_override=False):


def upgrade_resource_types(conf):
gnocchi = get_gnocchiclient(conf, True)
gnocchi = get_gnocchiclient(conf)
for name, attributes in resources_initial.items():
try:
gnocchi.resource_type.get(name=name)
Expand Down
2 changes: 1 addition & 1 deletion ceilometer/keystone_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

# List of group that can set auth_section to use a different
# credentials section
OVERRIDABLE_GROUPS = ['dispatcher_gnocchi', 'zaqar']
OVERRIDABLE_GROUPS = ['gnocchi', 'zaqar']


def get_session(conf, requests_session=None, group=None, timeout=None):
Expand Down
25 changes: 22 additions & 3 deletions ceilometer/opts.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import ceilometer.compute.virt.vmware.inspector
import ceilometer.compute.virt.xenapi.inspector
import ceilometer.dispatcher
import ceilometer.dispatcher.gnocchi_opts
import ceilometer.event.converter
import ceilometer.hardware.discovery
import ceilometer.hardware.pollsters.generic
Expand Down Expand Up @@ -105,8 +104,28 @@ def list_opts():
'membership has changed'),
]),
('database', ceilometer.storage.OPTS),
('dispatcher_gnocchi',
ceilometer.dispatcher.gnocchi_opts.dispatcher_opts),
('dispatcher_gnocchi', (
cfg.StrOpt(
'filter_project',
deprecated_for_removal=True,
default='gnocchi',
help='Gnocchi project used to filter out samples '
'generated by Gnocchi service activity'),
cfg.StrOpt(
'archive_policy',
deprecated_for_removal=True,
help='The archive policy to use when the dispatcher '
'create a new metric.'),
cfg.StrOpt(
'resources_definition_file',
deprecated_for_removal=True,
default='gnocchi_resources.yaml',
help=('The Yaml file that defines mapping between samples '
'and gnocchi resources/metrics')),
cfg.FloatOpt(
'request_timeout', default=6.05, min=0.0,
deprecated_for_removal=True,
help='Number of seconds before request to gnocchi times out'))),
('event', ceilometer.event.converter.OPTS),
('hardware', itertools.chain(
ceilometer.hardware.discovery.OPTS,
Expand Down
4 changes: 2 additions & 2 deletions ceilometer/publisher/direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ class DirectPublisher(publisher.ConfigPublisherBase):
are required.
By default, the database dispatcher is used to select another one we
can use direct://?dispatcher=gnocchi, ...
can use direct://?dispatcher=name_of_dispatcher, ...
"""
def __init__(self, conf, parsed_url):
super(DirectPublisher, self).__init__(conf, parsed_url)
default_dispatcher = parsed_url.scheme
if default_dispatcher == 'direct':
LOG.warning('Direct publisher is deprecated for removal. Use '
'an explicit publisher instead, e.g. "gnocchi", '
'an explicit publisher instead, e.g. '
'"database", "file", ...')
default_dispatcher = 'database'
options = urlparse.parse_qs(parsed_url.query)
Expand Down
125 changes: 65 additions & 60 deletions ceilometer/dispatcher/gnocchi.py → ceilometer/publisher/gnocchi.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
from oslo_utils import fnmatch
from oslo_utils import timeutils
import six
import six.moves.urllib.parse as urlparse
from stevedore import extension

from ceilometer import declarative
from ceilometer import dispatcher
from ceilometer import gnocchi_client
from ceilometer.i18n import _
from ceilometer import keystone_client
from ceilometer import publisher

NAME_ENCODED = __name__.encode('utf-8')
CACHE_NAMESPACE = uuid.UUID(bytes=hashlib.md5(NAME_ENCODED).digest())
Expand Down Expand Up @@ -129,14 +130,14 @@ def event_match(self, event_type):
def sample_attributes(self, sample):
attrs = {}
for name, definition in self._attributes.items():
value = definition.parse(sample)
value = definition.parse(sample.as_dict())
if value is not None:
attrs[name] = value
return attrs

def event_attributes(self, event):
attrs = {'type': self.cfg['resource_type']}
traits = dict([(trait[0], trait[2]) for trait in event['traits']])
traits = dict([(trait[0], trait[2]) for trait in event.traits])
for attr, field in self.cfg.get('event_attributes', {}).items():
value = traits.get(field)
if value is not None:
Expand Down Expand Up @@ -168,44 +169,49 @@ def pop(self, key, *args):
key_lock.release()


class GnocchiDispatcher(dispatcher.MeterDispatcherBase,
dispatcher.EventDispatcherBase):
"""Dispatcher class for recording metering data into the Gnocchi service.
class GnocchiPublisher(publisher.ConfigPublisherBase):
"""Publisher class for recording metering data into the Gnocchi service.
The dispatcher class records each meter into the gnocchi service
configured in ceilometer configuration file. An example configuration may
The publisher class records each meter into the gnocchi service
configured in Ceilometer pipeline file. An example target may
look like the following:
[dispatcher_gnocchi]
archive_policy = low
To enable this dispatcher, the following section needs to be present in
ceilometer.conf file
[DEFAULT]
meter_dispatchers = gnocchi
event_dispatchers = gnocchi
gnocchi://?archive_policy=low&filter_project=gnocchi
"""
def __init__(self, conf):
super(GnocchiDispatcher, self).__init__(conf)
self.conf = conf
self.filter_service_activity = (
conf.dispatcher_gnocchi.filter_service_activity)
def __init__(self, conf, parsed_url):
super(GnocchiPublisher, self).__init__(conf, parsed_url)
# TODO(jd) allow to override Gnocchi endpoint via the host in the URL
options = urlparse.parse_qs(parsed_url.query)

self.filter_project = options.get(
'filter_project',
[conf.dispatcher_gnocchi.filter_project])[-1]

resources_definition_file = options.get(
'resources_definition_file',
[conf.dispatcher_gnocchi.resources_definition_file])[-1]
archive_policy = options.get(
'archive_policy',
[conf.dispatcher_gnocchi.archive_policy])[-1]
self.resources_definition = self._load_resources_definitions(
conf, archive_policy, resources_definition_file)

timeout = options.get('timeout',
[conf.dispatcher_gnocchi.request_timeout])[-1]
self._ks_client = keystone_client.get_client(conf)
self.resources_definition = self._load_resources_definitions(conf)

self.cache = None
try:
import oslo_cache
oslo_cache.configure(self.conf)
oslo_cache.configure(conf)
# NOTE(cdent): The default cache backend is a real but
# noop backend. We don't want to use that here because
# we want to avoid the cache pathways entirely if the
# cache has not been configured explicitly.
if self.conf.cache.enabled:
if conf.cache.enabled:
cache_region = oslo_cache.create_region()
self.cache = oslo_cache.configure_cache_region(
self.conf, cache_region)
conf, cache_region)
self.cache.key_mangler = cache_key_mangler
except ImportError:
pass
Expand All @@ -216,24 +222,26 @@ def __init__(self, conf):
self._gnocchi_project_id_lock = threading.Lock()
self._gnocchi_resource_lock = LockedDefaultDict(threading.Lock)

self._gnocchi = gnocchi_client.get_gnocchiclient(conf)
self._gnocchi = gnocchi_client.get_gnocchiclient(
conf, request_timeout=timeout)
self._already_logged_event_types = set()
self._already_logged_metric_names = set()

@classmethod
def _load_resources_definitions(cls, conf):
@staticmethod
def _load_resources_definitions(conf, archive_policy,
resources_definition_file):
plugin_manager = extension.ExtensionManager(
namespace='ceilometer.event.trait_plugin')
data = declarative.load_definitions(
conf, {}, conf.dispatcher_gnocchi.resources_definition_file,
conf, {}, resources_definition_file,
pkg_resources.resource_filename(__name__,
"data/gnocchi_resources.yaml"))
resource_defs = []
for resource in data.get('resources', []):
try:
resource_defs.append(ResourcesDefinition(
resource,
conf.dispatcher_gnocchi.archive_policy, plugin_manager))
archive_policy, plugin_manager))
except Exception as exc:
LOG.error("Failed to load resource due to error %s" %
exc)
Expand All @@ -247,32 +255,32 @@ def gnocchi_project_id(self):
if self._gnocchi_project_id is None:
try:
project = self._ks_client.projects.find(
name=self.conf.dispatcher_gnocchi.filter_project)
name=self.filter_project)
except ka_exceptions.NotFound:
LOG.warning('gnocchi project not found in keystone,'
' ignoring the filter_service_activity '
LOG.warning('filtered project not found in keystone,'
' ignoring the filter_project '
'option')
self.filter_service_activity = False
self.filter_project = None
return None
except Exception:
LOG.exception('fail to retrieve user of Gnocchi '
'service')
LOG.exception('fail to retrieve filtered project ')
raise
self._gnocchi_project_id = project.id
LOG.debug("gnocchi project found: %s", self.gnocchi_project_id)
LOG.debug("filtered project found: %s",
self.gnocchi_project_id)
return self._gnocchi_project_id

def _is_swift_account_sample(self, sample):
return bool([rd for rd in self.resources_definition
if rd.cfg['resource_type'] == 'swift_account'
and rd.metric_match(sample['counter_name'])])
and rd.metric_match(sample.name)])

def _is_gnocchi_activity(self, sample):
return (self.filter_service_activity and self.gnocchi_project_id and (
return (self.filter_project and self.gnocchi_project_id and (
# avoid anything from the user used by gnocchi
sample['project_id'] == self.gnocchi_project_id or
sample.project_id == self.gnocchi_project_id or
# avoid anything in the swift account used by gnocchi
(sample['resource_id'] == self.gnocchi_project_id and
(sample.resource_id == self.gnocchi_project_id and
self._is_swift_account_sample(sample))
))

Expand All @@ -287,16 +295,13 @@ def _get_resource_definition_from_event(self, event_type):
if operation:
return rd, operation

def record_metering_data(self, data):
# We may have receive only one counter on the wire
if not isinstance(data, list):
data = [data]
def publish_samples(self, data):
# NOTE(sileht): skip sample generated by gnocchi itself
data = [s for s in data if not self._is_gnocchi_activity(s)]

data.sort(key=lambda s: (s['resource_id'], s['counter_name']))
data.sort(key=lambda s: (s.resource_id, s.name))
resource_grouped_samples = itertools.groupby(
data, key=operator.itemgetter('resource_id'))
data, key=operator.attrgetter('resource_id'))

gnocchi_data = {}
measures = {}
Expand All @@ -308,7 +313,7 @@ def record_metering_data(self, data):
stats['resources'] += 1
metric_grouped_samples = itertools.groupby(
list(samples_of_resource),
key=operator.itemgetter('counter_name'))
key=operator.attrgetter('name'))

res_info = {}
for metric_name, samples in metric_grouped_samples:
Expand All @@ -328,8 +333,8 @@ def record_metering_data(self, data):
res_info['resource_type'] = rd.cfg['resource_type']
res_info.setdefault("resource", {}).update({
"id": resource_id,
"user_id": samples[0]['user_id'],
"project_id": samples[0]['project_id'],
"user_id": samples[0].user_id,
"project_id": samples[0].project_id,
"metrics": rd.metrics,
})

Expand All @@ -338,10 +343,10 @@ def record_metering_data(self, data):
rd.sample_attributes(sample))
m = measures.setdefault(resource_id, {}).setdefault(
metric_name, [])
m.append({'timestamp': sample['timestamp'],
'value': sample['counter_volume']})
unit = sample['counter_unit']
metric = sample['counter_name']
m.append({'timestamp': sample.timestamp,
'value': sample.volume})
unit = sample.unit
metric = sample.name
res_info['resource']['metrics'][metric]['unit'] = unit

stats['measures'] += len(measures[resource_id][metric_name])
Expand Down Expand Up @@ -463,14 +468,14 @@ def _check_resource_cache(self, key, resource_data):
else:
return None

def record_events(self, events):
def publish_events(self, events):
for event in events:
rd = self._get_resource_definition_from_event(event['event_type'])
rd = self._get_resource_definition_from_event(event.event_type)
if not rd:
if event['event_type'] not in self._already_logged_event_types:
if event.event_type not in self._already_logged_event_types:
LOG.debug("No gnocchi definition for event type: %s",
event['event_type'])
self._already_logged_event_types.add(event['event_type'])
event.event_type)
self._already_logged_event_types.add(event.event_type)
continue

rd, operation = rd
Expand Down
Loading

0 comments on commit 83ffaff

Please sign in to comment.