From 3d260b0f663b5577bc3a0fc3f0741802109a28c4 Mon Sep 17 00:00:00 2001 From: Iain Buclaw Date: Wed, 2 Dec 2015 13:39:46 +0100 Subject: [PATCH] Cherry-pick TimeSeriesDatabase from megacarbon branch --- conf/carbon.conf.example | 4 ++ lib/carbon/conf.py | 27 +++----- lib/carbon/database.py | 115 +++++++++++++++++++++++++++++++ lib/carbon/management.py | 17 +---- lib/carbon/state.py | 1 + lib/carbon/storage.py | 7 +- lib/carbon/tests/test_storage.py | 14 ++-- lib/carbon/writer.py | 21 +----- 8 files changed, 146 insertions(+), 60 deletions(-) create mode 100644 lib/carbon/database.py diff --git a/conf/carbon.conf.example b/conf/carbon.conf.example index 49e80fc9d..8983879c5 100644 --- a/conf/carbon.conf.example +++ b/conf/carbon.conf.example @@ -30,6 +30,10 @@ # #LOCAL_DATA_DIR = /opt/graphite/storage/whisper/ +# Specify the database library used to store metric data on disk. Currently +# only the whisper database is supported, but ceres support is coming soon. +DATABASE = whisper + # Enable daily log rotation. If disabled, a new file will be opened whenever the log file path no # longer exists (i.e. it is removed or renamed) ENABLE_LOGROTATION = True diff --git a/lib/carbon/conf.py b/lib/carbon/conf.py index 631dd664f..24fff28b0 100644 --- a/lib/carbon/conf.py +++ b/lib/carbon/conf.py @@ -21,8 +21,8 @@ from optparse import OptionParser from ConfigParser import ConfigParser -import whisper -from carbon import log +from carbon import log, state +from carbon.database import TimeSeriesDatabase from carbon.exceptions import CarbonConfigException from twisted.python import usage @@ -45,6 +45,7 @@ LOG_UPDATES=True, LOG_CACHE_HITS=True, LOG_CACHE_QUEUE_SORTS=True, + DATABASE='whisper', WHISPER_AUTOFLUSH=False, WHISPER_SPARSE_CREATE=False, WHISPER_FALLOCATE_CREATE=False, @@ -237,22 +238,14 @@ def postOptions(self): print "Error: missing required config %s" % storage_schemas sys.exit(1) - if settings.WHISPER_AUTOFLUSH: - log.msg("Enabling Whisper autoflush") - whisper.AUTOFLUSH = True - - if settings.WHISPER_FALLOCATE_CREATE: - if whisper.CAN_FALLOCATE: - log.msg("Enabling Whisper fallocate support") - else: - log.err("WHISPER_FALLOCATE_CREATE is enabled but linking failed.") + # Database-specific settings + database = settings.DATABASE + if database not in TimeSeriesDatabase.plugins: + print "No database plugin implemented for '%s'" % database + raise SystemExit(1) - if settings.WHISPER_LOCK_WRITES: - if whisper.CAN_LOCK: - log.msg("Enabling Whisper file locking") - whisper.LOCK = True - else: - log.err("WHISPER_LOCK_WRITES is enabled but import of fcntl module failed.") + database_class = TimeSeriesDatabase.plugins[database] + state.database = database_class(settings) if not "action" in self: self["action"] = "start" diff --git a/lib/carbon/database.py b/lib/carbon/database.py new file mode 100644 index 000000000..1e28bf325 --- /dev/null +++ b/lib/carbon/database.py @@ -0,0 +1,115 @@ +"""Copyright 2009 Chris Davis + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.""" + +import os +from os.path import exists, dirname, join, sep +from carbon.util import PluginRegistrar +from carbon import log + + +class TimeSeriesDatabase(object): + "Abstract base class for Carbon database backends." + __metaclass__ = PluginRegistrar + plugins = {} + + def write(self, metric, datapoints): + "Persist datapoints in the database for metric." + raise NotImplemented() + + def exists(self, metric): + "Return True if the given metric path exists, False otherwise." + raise NotImplemented() + + def create(self, metric, retentions, xfilesfactor, aggregation_method): + "Create an entry in the database for metric using options." + raise NotImplemented() + + def getMetadata(self, metric, key): + "Lookup metric metadata." + raise NotImplemented() + + def setMetadata(self, metric, key, value): + "Modify metric metadata." + raise NotImplemented() + + def getFilesystemPath(self, metric): + "Return filesystem path for metric, defaults to None." + pass + + +try: + import whisper +except ImportError: + pass +else: + class WhisperDatabase(TimeSeriesDatabase): + plugin_name = 'whisper' + + def __init__(self, settings): + self.data_dir = settings.LOCAL_DATA_DIR + self.sparse_create = settings.WHISPER_SPARSE_CREATE + self.fallocate_create = settings.WHISPER_FALLOCATE_CREATE + if settings.WHISPER_AUTOFLUSH: + log.msg("Enabling Whisper autoflush") + whisper.AUTOFLUSH = True + + if settings.WHISPER_FALLOCATE_CREATE: + if whisper.CAN_FALLOCATE: + log.msg("Enabling Whisper fallocate support") + else: + log.err("WHISPER_FALLOCATE_CREATE is enabled but linking failed.") + + if settings.WHISPER_LOCK_WRITES: + if whisper.CAN_LOCK: + log.msg("Enabling Whisper file locking") + whisper.LOCK = True + else: + log.err("WHISPER_LOCK_WRITES is enabled but import of fcntl module failed.") + + def write(self, metric, datapoints): + path = self.getFilesystemPath(metric) + whisper.update_many(path, datapoints) + + def exists(self, metric): + return exists(self.getFilesystemPath(metric)) + + def create(self, metric, retentions, xfilesfactor, aggregation_method): + path = self.getFilesystemPath(metric) + directory = dirname(path) + try: + if not exists(directory): + os.makedirs(directory) + except OSError, e: + log.err("%s" % e) + + whisper.create(path, retentions, xfilesfactor, aggregation_method, + self.sparse_create, self.fallocate_create) + + def getMetadata(self, metric, key): + if key != 'aggregationMethod': + raise ValueError("Unsupported metadata key \"%s\"" % key) + + wsp_path = self.getFilesystemPath(metric) + return whisper.info(wsp_path)['aggregationMethod'] + + def setMetadata(self, metric, key, value): + if key != 'aggregationMethod': + raise ValueError("Unsupported metadata key \"%s\"" % key) + + wsp_path = self.getFilesystemPath(metric) + return whisper.setAggregationMethod(wsp_path, value) + + def getFilesystemPath(self, metric): + metric_path = metric.replace('.', sep).lstrip(sep) + '.wsp' + return join(self.data_dir, metric_path) diff --git a/lib/carbon/management.py b/lib/carbon/management.py index 063550580..c95873f22 100644 --- a/lib/carbon/management.py +++ b/lib/carbon/management.py @@ -1,17 +1,10 @@ import traceback -import whisper -from carbon import log -from carbon.storage import getFilesystemPath - +from carbon import log, state def getMetadata(metric, key): - if key != 'aggregationMethod': - return dict(error="Unsupported metadata key \"%s\"" % key) - - wsp_path = getFilesystemPath(metric) try: - value = whisper.info(wsp_path)['aggregationMethod'] + value = state.database.getMetadata(metric, key) return dict(value=value) except Exception: log.err() @@ -19,12 +12,8 @@ def getMetadata(metric, key): def setMetadata(metric, key, value): - if key != 'aggregationMethod': - return dict(error="Unsupported metadata key \"%s\"" % key) - - wsp_path = getFilesystemPath(metric) try: - old_value = whisper.setAggregationMethod(wsp_path, value) + old_value = state.database.setMetadata(metric, key, value) return dict(old_value=old_value, new_value=value) except Exception: log.err() diff --git a/lib/carbon/state.py b/lib/carbon/state.py index 657a1c1e8..33664f6a3 100644 --- a/lib/carbon/state.py +++ b/lib/carbon/state.py @@ -8,3 +8,4 @@ client_manager = None connectedMetricReceiverProtocols = set() pipeline_processors = [] +database = None diff --git a/lib/carbon/storage.py b/lib/carbon/storage.py index 4aa1fc8d1..db923fc7b 100644 --- a/lib/carbon/storage.py +++ b/lib/carbon/storage.py @@ -15,11 +15,11 @@ import os, re import whisper -from os.path import join, exists, sep +from os.path import join, exists from carbon.conf import OrderedConfigParser, settings from carbon.exceptions import CarbonConfigException from carbon.util import pickle -from carbon import log +from carbon import log, state STORAGE_SCHEMAS_CONFIG = join(settings.CONF_DIR, 'storage-schemas.conf') @@ -27,8 +27,7 @@ STORAGE_LISTS_DIR = join(settings.CONF_DIR, 'lists') def getFilesystemPath(metric): - metric_path = metric.replace('.',sep).lstrip(sep) + '.wsp' - return join(settings.LOCAL_DATA_DIR, metric_path) + return state.database.getFilesystemPath(metric) class Schema: diff --git a/lib/carbon/tests/test_storage.py b/lib/carbon/tests/test_storage.py index 89211e1f6..5aadbea6f 100644 --- a/lib/carbon/tests/test_storage.py +++ b/lib/carbon/tests/test_storage.py @@ -2,6 +2,8 @@ from unittest import TestCase from mock import patch +from carbon.tests.util import TestSettings +from carbon.database import WhisperDatabase # class NoConfigSchemaLoadingTest(TestCase): @@ -90,15 +92,13 @@ class getFilesystemPathTest(TestCase): def setUp(self): self._sep_patch = patch.object(os.path, 'sep', "/") self._sep_patch.start() - settings = { - 'LOCAL_DATA_DIR': '/tmp/', - 'CONF_DIR': '', - } - self._settings_patch = patch.dict('carbon.conf.settings', settings) - self._settings_patch.start() + settings = TestSettings() + settings['LOCAL_DATA_DIR'] = '/tmp/' + self._database_patch = patch('carbon.state.database', new=WhisperDatabase(settings)) + self._database_patch.start() def tearDown(self): - self._settings_patch.stop() + self._database_patch.stop() self._sep_patch.stop() def test_getFilesystemPath(self): diff --git a/lib/carbon/writer.py b/lib/carbon/writer.py index 718cd13ea..58f3d1a0c 100644 --- a/lib/carbon/writer.py +++ b/lib/carbon/writer.py @@ -12,11 +12,8 @@ See the License for the specific language governing permissions and limitations under the License.""" -import os import time -from os.path import exists, dirname -import whisper from carbon import state from carbon.cache import MetricCache from carbon.storage import getFilesystemPath, loadStorageSchemas,\ @@ -64,7 +61,7 @@ def optimalWriteOrder(): events.cacheSpaceAvailable() dbFilePath = getFilesystemPath(metric) - dbFileExists = exists(dbFilePath) + dbFileExists = state.database.exists(metric) if not dbFileExists and CREATE_BUCKET: # If our tokenbucket has enough tokens available to create a new metric @@ -109,22 +106,10 @@ def writeCachedDataPoints(): if not archiveConfig: raise Exception("No storage schema matched the metric '%s', check your storage-schemas.conf file." % metric) - dbDir = dirname(dbFilePath) - try: - if not exists(dbDir): - os.makedirs(dbDir) - except OSError, e: - log.err("%s" % e) log.creates("creating database file %s (archive=%s xff=%s agg=%s)" % (dbFilePath, archiveConfig, xFilesFactor, aggregationMethod)) try: - whisper.create( - dbFilePath, - archiveConfig, - xFilesFactor, - aggregationMethod, - settings.WHISPER_SPARSE_CREATE, - settings.WHISPER_FALLOCATE_CREATE) + state.database.create(metric, archiveConfig, xFilesFactor, aggregationMethod) instrumentation.increment('creates') except Exception: log.err("Error creating %s" % (dbFilePath)) @@ -134,7 +119,7 @@ def writeCachedDataPoints(): UPDATE_BUCKET.drain(1, blocking=True) try: t1 = time.time() - whisper.update_many(dbFilePath, datapoints) + state.database.write(metric, datapoints) updateTime = time.time() - t1 except Exception: log.msg("Error writing to %s" % (dbFilePath))