Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick TimeSeriesDatabase from megacarbon branch #484

Closed
wants to merge 7 commits into from
4 changes: 4 additions & 0 deletions conf/carbon.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
#
#LOCAL_DATA_DIR = /opt/graphite/storage/whisper/

# Specify the database library used to store metric data on disk. Currently
# only the whisper database is supported, but ceres support is coming soon.
DATABASE = whisper

# Enable daily log rotation. If disabled, a new file will be opened whenever the log file path no
# longer exists (i.e. it is removed or renamed)
ENABLE_LOGROTATION = True
Expand Down
27 changes: 10 additions & 17 deletions lib/carbon/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
from optparse import OptionParser
from ConfigParser import ConfigParser

import whisper
from carbon import log
from carbon import log, state
from carbon.database import TimeSeriesDatabase
from carbon.exceptions import CarbonConfigException

from twisted.python import usage
Expand All @@ -45,6 +45,7 @@
LOG_UPDATES=True,
LOG_CACHE_HITS=True,
LOG_CACHE_QUEUE_SORTS=True,
DATABASE='whisper',
WHISPER_AUTOFLUSH=False,
WHISPER_SPARSE_CREATE=False,
WHISPER_FALLOCATE_CREATE=False,
Expand Down Expand Up @@ -235,22 +236,14 @@ def postOptions(self):
print "Error: missing required config %s" % storage_schemas
sys.exit(1)

if settings.WHISPER_AUTOFLUSH:
log.msg("Enabling Whisper autoflush")
whisper.AUTOFLUSH = True

if settings.WHISPER_FALLOCATE_CREATE:
if whisper.CAN_FALLOCATE:
log.msg("Enabling Whisper fallocate support")
else:
log.err("WHISPER_FALLOCATE_CREATE is enabled but linking failed.")
# Database-specific settings
database = settings.DATABASE
if database not in TimeSeriesDatabase.plugins:
print "No database plugin implemented for '%s'" % database
raise SystemExit(1)

if settings.WHISPER_LOCK_WRITES:
if whisper.CAN_LOCK:
log.msg("Enabling Whisper file locking")
whisper.LOCK = True
else:
log.err("WHISPER_LOCK_WRITES is enabled but import of fcntl module failed.")
database_class = TimeSeriesDatabase.plugins[database]
state.database = database_class(settings)

if not "action" in self:
self["action"] = "start"
Expand Down
115 changes: 115 additions & 0 deletions lib/carbon/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""Copyright 2009 Chris Davis

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""

import os
from os.path import exists, dirname, join, sep
from carbon.util import PluginRegistrar
from carbon import log


class TimeSeriesDatabase(object):
"Abstract base class for Carbon database backends."
__metaclass__ = PluginRegistrar
plugins = {}

def write(self, metric, datapoints):
"Persist datapoints in the database for metric."
raise NotImplemented()

def exists(self, metric):
"Return True if the given metric path exists, False otherwise."
raise NotImplemented()

def create(self, metric, retentions, xfilesfactor, aggregation_method):
"Create an entry in the database for metric using options."
raise NotImplemented()

def getMetadata(self, metric, key):
"Lookup metric metadata."
raise NotImplemented()

def setMetadata(self, metric, key, value):
"Modify metric metadata."
raise NotImplemented()

def getFilesystemPath(self, metric):
"Return filesystem path for metric, defaults to None."
pass


try:
import whisper
except ImportError:
pass
else:
class WhisperDatabase(TimeSeriesDatabase):
plugin_name = 'whisper'

def __init__(self, settings):
self.data_dir = settings.LOCAL_DATA_DIR
self.sparse_create = settings.WHISPER_SPARSE_CREATE
self.fallocate_create = settings.WHISPER_FALLOCATE_CREATE
if settings.WHISPER_AUTOFLUSH:
log.msg("Enabling Whisper autoflush")
whisper.AUTOFLUSH = True

if settings.WHISPER_FALLOCATE_CREATE:
if whisper.CAN_FALLOCATE:
log.msg("Enabling Whisper fallocate support")
else:
log.err("WHISPER_FALLOCATE_CREATE is enabled but linking failed.")

if settings.WHISPER_LOCK_WRITES:
if whisper.CAN_LOCK:
log.msg("Enabling Whisper file locking")
whisper.LOCK = True
else:
log.err("WHISPER_LOCK_WRITES is enabled but import of fcntl module failed.")

def write(self, metric, datapoints):
path = self.getFilesystemPath(metric)
whisper.update_many(path, datapoints)

def exists(self, metric):
return exists(self.getFilesystemPath(metric))

def create(self, metric, retentions, xfilesfactor, aggregation_method):
path = self.getFilesystemPath(metric)
directory = dirname(path)
try:
if not exists(directory):
os.makedirs(directory)
except OSError, e:
log.err("%s" % e)

whisper.create(path, retentions, xfilesfactor, aggregation_method,
self.sparse_create, self.fallocate_create)

def getMetadata(self, metric, key):
if key != 'aggregationMethod':
raise ValueError("Unsupported metadata key \"%s\"" % key)

wsp_path = self.getFilesystemPath(metric)
return whisper.info(wsp_path)['aggregationMethod']

def setMetadata(self, metric, key, value):
if key != 'aggregationMethod':
raise ValueError("Unsupported metadata key \"%s\"" % key)

wsp_path = self.getFilesystemPath(metric)
return whisper.setAggregationMethod(wsp_path, value)

def getFilesystemPath(self, metric):
metric_path = metric.replace('.', sep).lstrip(sep) + '.wsp'
return join(self.data_dir, metric_path)
17 changes: 3 additions & 14 deletions lib/carbon/management.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,19 @@
import traceback
import whisper
from carbon import log
from carbon.storage import getFilesystemPath

from carbon import log, state


def getMetadata(metric, key):
if key != 'aggregationMethod':
return dict(error="Unsupported metadata key \"%s\"" % key)

wsp_path = getFilesystemPath(metric)
try:
value = whisper.info(wsp_path)['aggregationMethod']
value = state.database.getMetadata(metric, key)
return dict(value=value)
except Exception:
log.err()
return dict(error=traceback.format_exc())


def setMetadata(metric, key, value):
if key != 'aggregationMethod':
return dict(error="Unsupported metadata key \"%s\"" % key)

wsp_path = getFilesystemPath(metric)
try:
old_value = whisper.setAggregationMethod(wsp_path, value)
old_value = state.database.setMetadata(metric, key, value)
return dict(old_value=old_value, new_value=value)
except Exception:
log.err()
Expand Down
1 change: 1 addition & 0 deletions lib/carbon/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
client_manager = None
connectedMetricReceiverProtocols = set()
pipeline_processors = []
database = None
7 changes: 3 additions & 4 deletions lib/carbon/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,19 @@
import os, re
import whisper

from os.path import join, exists, sep
from os.path import join, exists
from carbon.conf import OrderedConfigParser, settings
from carbon.exceptions import CarbonConfigException
from carbon.util import pickle
from carbon import log
from carbon import log, state


STORAGE_SCHEMAS_CONFIG = join(settings.CONF_DIR, 'storage-schemas.conf')
STORAGE_AGGREGATION_CONFIG = join(settings.CONF_DIR, 'storage-aggregation.conf')
STORAGE_LISTS_DIR = join(settings.CONF_DIR, 'lists')

def getFilesystemPath(metric):
metric_path = metric.replace('.',sep).lstrip(sep) + '.wsp'
return join(settings.LOCAL_DATA_DIR, metric_path)
return state.database.getFilesystemPath(metric)


class Schema:
Expand Down
14 changes: 7 additions & 7 deletions lib/carbon/tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from unittest import TestCase
from mock import patch

from carbon.tests.util import TestSettings
from carbon.database import WhisperDatabase

# class NoConfigSchemaLoadingTest(TestCase):

Expand Down Expand Up @@ -90,15 +92,13 @@ class getFilesystemPathTest(TestCase):
def setUp(self):
self._sep_patch = patch.object(os.path, 'sep', "/")
self._sep_patch.start()
settings = {
'LOCAL_DATA_DIR': '/tmp/',
'CONF_DIR': '',
}
self._settings_patch = patch.dict('carbon.conf.settings', settings)
self._settings_patch.start()
settings = TestSettings()
settings['LOCAL_DATA_DIR'] = '/tmp/'
self._database_patch = patch('carbon.state.database', new=WhisperDatabase(settings))
self._database_patch.start()

def tearDown(self):
self._settings_patch.stop()
self._database_patch.stop()
self._sep_patch.stop()

def test_getFilesystemPath(self):
Expand Down
21 changes: 3 additions & 18 deletions lib/carbon/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@
See the License for the specific language governing permissions and
limitations under the License."""

import os
import time
from os.path import exists, dirname

import whisper
from carbon import state
from carbon.cache import MetricCache
from carbon.storage import getFilesystemPath, loadStorageSchemas,\
Expand Down Expand Up @@ -64,7 +61,7 @@ def optimalWriteOrder():
events.cacheSpaceAvailable()

dbFilePath = getFilesystemPath(metric)
dbFileExists = exists(dbFilePath)
dbFileExists = state.database.exists(metric)

if not dbFileExists and CREATE_BUCKET:
# If our tokenbucket has enough tokens available to create a new metric
Expand Down Expand Up @@ -109,22 +106,10 @@ def writeCachedDataPoints():
if not archiveConfig:
raise Exception("No storage schema matched the metric '%s', check your storage-schemas.conf file." % metric)

dbDir = dirname(dbFilePath)
try:
if not exists(dbDir):
os.makedirs(dbDir)
except OSError, e:
log.err("%s" % e)
log.creates("creating database file %s (archive=%s xff=%s agg=%s)" %
(dbFilePath, archiveConfig, xFilesFactor, aggregationMethod))
try:
whisper.create(
dbFilePath,
archiveConfig,
xFilesFactor,
aggregationMethod,
settings.WHISPER_SPARSE_CREATE,
settings.WHISPER_FALLOCATE_CREATE)
state.database.create(metric, archiveConfig, xFilesFactor, aggregationMethod)
instrumentation.increment('creates')
except Exception:
log.err("Error creating %s" % (dbFilePath))
Expand All @@ -134,7 +119,7 @@ def writeCachedDataPoints():
UPDATE_BUCKET.drain(1, blocking=True)
try:
t1 = time.time()
whisper.update_many(dbFilePath, datapoints)
state.database.write(metric, datapoints)
updateTime = time.time() - t1
except Exception:
log.msg("Error writing to %s" % (dbFilePath))
Expand Down