Skip to content

Commit

Permalink
Fix data rereads and replace _fetchWithBootstrap
Browse files Browse the repository at this point in the history
  • Loading branch information
iain-buclaw-sociomantic committed Jul 20, 2016
1 parent cac6197 commit 7d93840
Show file tree
Hide file tree
Showing 3 changed files with 426 additions and 174 deletions.
7 changes: 6 additions & 1 deletion graphite_api/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ def evaluateTarget(requestContext, target, data_store=None):
return result


def evaluateTokens(requestContext, tokens, data_store):
def evaluateTokens(requestContext, tokens, data_store=None):
if data_store is None:
paths = list(pathsFromTokens(tokens))
data_store = fetchData(requestContext, paths)

if tokens.expression:
return evaluateTokens(requestContext, tokens.expression, data_store)

Expand All @@ -49,6 +53,7 @@ def evaluateTokens(requestContext, tokens, data_store):
func = app.functions[tokens.call.funcname]
args = [evaluateTokens(requestContext,
arg, data_store) for arg in tokens.call.args]
requestContext['args'] = tokens.call.args
kwargs = dict([(kwarg.argname,
evaluateTokens(requestContext,
kwarg.args[0],
Expand Down
184 changes: 70 additions & 114 deletions graphite_api/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from six.moves import zip_longest, map, reduce

from .evaluator import evaluateTarget, pathsFromTarget
from .evaluator import evaluateTarget, evaluateTokens, pathsFromTarget
from .render.attime import parseTimeOffset, parseATTime
from .render.glyph import format_units
from .render.datalib import TimeSeries, fetchData
Expand Down Expand Up @@ -706,31 +706,35 @@ def movingMedian(requestContext, seriesList, windowSize):
Takes one metric or a wildcard seriesList followed by a number N of
datapoints or a quoted string with a length of time like '1hour' or '5min'
(See ``from / until`` in the render\_api_ for examples of time formats).
Graphs the median of the preceeding datapoints for each point on the
graph. All previous datapoints are set to None at the beginning of the
graph.
Graphs the median of the preceeding datapoints for each point on the graph.
Example::
&target=movingMedian(Server.instance01.threads.busy,10)
&target=movingMedian(Server.instance*.threads.idle,'5min')
"""
if not seriesList:
return []
windowInterval = None
if isinstance(windowSize, six.string_types):
delta = parseTimeOffset(windowSize)
windowInterval = to_seconds(delta)

if windowInterval:
bootstrapSeconds = windowInterval
previewSeconds = windowInterval
else:
bootstrapSeconds = max([s.step for s in seriesList]) * int(windowSize)

bootstrapList = _fetchWithBootstrap(requestContext, seriesList,
seconds=bootstrapSeconds)
previewSeconds = max([s.step for s in seriesList]) * int(windowSize)

# ignore original data and pull new, including our preview
# data from earlier is needed to calculate the early results
newContext = requestContext.copy()
newContext['startTime'] = (requestContext['startTime'] -
timedelta(seconds=previewSeconds))
previewList = evaluateTokens(newContext, requestContext['args'][0])
result = []

for bootstrap, series in zip_longest(bootstrapList, seriesList):
for series in previewList:
if windowInterval:
windowPoints = windowInterval // series.step
else:
Expand All @@ -739,14 +743,13 @@ def movingMedian(requestContext, seriesList, windowSize):
if isinstance(windowSize, six.string_types):
newName = 'movingMedian(%s,"%s")' % (series.name, windowSize)
else:
newName = "movingMedian(%s,%d)" % (series.name, windowPoints)
newSeries = TimeSeries(newName, series.start, series.end, series.step,
[])
newName = "movingMedian(%s,%s)" % (series.name, windowSize)
newSeries = TimeSeries(newName, series.start + previewSeconds,
series.end, series.step, [])
newSeries.pathExpression = newName

offset = len(bootstrap) - len(series)
for i in range(len(series)):
window = bootstrap[i + offset - windowPoints:i + offset]
for i in range(windowPoints, len(series)):
window = series[i - windowPoints:i]
nonNull = [v for v in window if v is not None]
if nonNull:
m_index = len(nonNull) // 2
Expand Down Expand Up @@ -895,7 +898,6 @@ def movingAverage(requestContext, seriesList, windowSize):
datapoints or a quoted string with a length of time like '1hour' or '5min'
(See ``from / until`` in the render\_api_ for examples of time formats).
Graphs the average of the preceeding datapoints for each point on the
graph. All previous datapoints are set to None at the beginning of the
graph.
Example::
Expand All @@ -912,15 +914,19 @@ def movingAverage(requestContext, seriesList, windowSize):
windowInterval = to_seconds(delta)

if windowInterval:
bootstrapSeconds = windowInterval
previewSeconds = windowInterval
else:
bootstrapSeconds = max([s.step for s in seriesList]) * int(windowSize)

bootstrapList = _fetchWithBootstrap(requestContext, seriesList,
seconds=bootstrapSeconds)
previewSeconds = max([s.step for s in seriesList]) * int(windowSize)

# ignore original data and pull new, including our preview
# data from earlier is needed to calculate the early results
newContext = requestContext.copy()
newContext['startTime'] = (requestContext['startTime'] -
timedelta(seconds=previewSeconds))
previewList = evaluateTokens(newContext, requestContext['args'][0])
result = []

for bootstrap, series in zip_longest(bootstrapList, seriesList):
for series in previewList:
if windowInterval:
windowPoints = windowInterval // series.step
else:
Expand All @@ -930,13 +936,12 @@ def movingAverage(requestContext, seriesList, windowSize):
newName = 'movingAverage(%s,"%s")' % (series.name, windowSize)
else:
newName = "movingAverage(%s,%s)" % (series.name, windowSize)
newSeries = TimeSeries(newName, series.start, series.end, series.step,
[])
newSeries = TimeSeries(newName, series.start + previewSeconds,
series.end, series.step, [])
newSeries.pathExpression = newName

offset = len(bootstrap) - len(series)
for i in range(len(series)):
window = bootstrap[i + offset - windowPoints:i + offset]
for i in range(windowPoints, len(series)):
window = series[i - windowPoints:i]
newSeries.append(safeAvg(window))

result.append(newSeries)
Expand Down Expand Up @@ -2057,83 +2062,6 @@ def secondYAxis(requestContext, seriesList):
return seriesList


def _fetchWithBootstrap(requestContext, seriesList, **delta_kwargs):
"""
Request the same data but with a bootstrap period at the beginning.
"""
bootstrapContext = requestContext.copy()
bootstrapContext['startTime'] = (
requestContext['startTime'] - timedelta(**delta_kwargs))
bootstrapContext['endTime'] = requestContext['startTime']

bootstrapList = []

# Get all paths to fetch
paths = []
for series in seriesList:
if series.pathExpression in [b.pathExpression for b in bootstrapList]:
continue
paths.extend(pathsFromTarget(series.pathExpression))

# Fetch all paths
data_store = fetchData(bootstrapContext, paths)

for series in seriesList:
if series.pathExpression in [b.pathExpression for b in bootstrapList]:
# This pathExpression returns multiple series and we already
# fetched it
continue
bootstraps = evaluateTarget(bootstrapContext,
series.pathExpression,
data_store)
found = dict(((s.name, s) for s in bootstraps))
for s in seriesList:
if s.name not in found:
# bootstrap interval too large for the range available in
# storage. Fill with nulls.
start = epoch(bootstrapContext['startTime'])
end = epoch(bootstrapContext['endTime'])
delta = (end - start) % s.step
values = [None] * delta
found[s.name] = TimeSeries(s.name, start, end, s.step, values)
found[s.name].pathExpression = s.pathExpression
bootstrapList.append(found[s.name])

newSeriesList = []
for bootstrap, original in zip_longest(bootstrapList, seriesList):
newValues = []
if bootstrap.step != original.step:
ratio = bootstrap.step / original.step
for value in bootstrap:
# XXX For series with aggregationMethod = sum this should also
# divide by the ratio to bring counts to the same time unit
# ...but we have no way of knowing whether that's the case
newValues.extend([value] * ratio)
else:
newValues.extend(bootstrap)
newValues.extend(original)

newSeries = TimeSeries(original.name, bootstrap.start, original.end,
original.step, newValues)
newSeries.pathExpression = series.pathExpression
newSeriesList.append(newSeries)

return newSeriesList


def _trimBootstrap(bootstrap, original):
"""
Trim the bootstrap period off the front of this series so it matches the
original.
"""
original_len = len(original)
length_limit = (original_len * original.step) // bootstrap.step
trim_start = bootstrap.end - (length_limit * bootstrap.step)
trimmed = TimeSeries(bootstrap.name, trim_start, bootstrap.end,
bootstrap.step, bootstrap[-length_limit:])
return trimmed


def holtWintersIntercept(alpha, actual, last_season, last_intercept,
last_slope):
return (alpha * (actual - last_season) +
Expand Down Expand Up @@ -2252,11 +2180,23 @@ def holtWintersForecast(requestContext, seriesList):
Performs a Holt-Winters forecast using the series as input data. Data from
one week previous to the series is used to bootstrap the initial forecast.
"""
previewSeconds = 7 * 86400 # 7 days
# ignore original data and pull new, including our preview
newContext = requestContext.copy()
newContext['startTime'] = (requestContext['startTime'] -
timedelta(seconds=previewSeconds))
previewList = evaluateTokens(newContext, requestContext['args'][0])
results = []
bootstrapList = _fetchWithBootstrap(requestContext, seriesList, days=7)
for bootstrap, series in zip_longest(bootstrapList, seriesList):
analysis = holtWintersAnalysis(bootstrap)
results.append(_trimBootstrap(analysis['predictions'], series))
for series in previewList:
analysis = holtWintersAnalysis(series)
predictions = analysis['predictions']
windowPoints = previewSeconds // predictions.step
result = TimeSeries("holtWintersForecast(%s)" % series.name,
predictions.start + previewSeconds,
predictions.end, predictions.step,
predictions[windowPoints:])
result.pathExpression = result.name
results.append(result)
return results


Expand All @@ -2265,12 +2205,28 @@ def holtWintersConfidenceBands(requestContext, seriesList, delta=3):
Performs a Holt-Winters forecast using the series as input data and plots
upper and lower bands with the predicted forecast deviations.
"""
previewSeconds = 7 * 86400 # 7 days
# ignore original data and pull new, including our preview
newContext = requestContext.copy()
newContext['startTime'] = (requestContext['startTime'] -
timedelta(seconds=previewSeconds))
previewList = evaluateTokens(newContext, requestContext['args'][0])
results = []
bootstrapList = _fetchWithBootstrap(requestContext, seriesList, days=7)
for bootstrap, series in zip_longest(bootstrapList, seriesList):
analysis = holtWintersAnalysis(bootstrap)
forecast = _trimBootstrap(analysis['predictions'], series)
deviation = _trimBootstrap(analysis['deviations'], series)
for series in previewList:
analysis = holtWintersAnalysis(series)

data = analysis['predictions']
windowPoints = previewSeconds // data.step
forecast = TimeSeries(data.name, data.start + previewSeconds,
data.end, data.step, data[windowPoints:])
forecast.pathExpression = data.pathExpression

data = analysis['deviations']
windowPoints = previewSeconds // data.step
deviation = TimeSeries(data.name, data.start + previewSeconds,
data.end, data.step, data[windowPoints:])
deviation.pathExpression = data.pathExpression

seriesLength = len(forecast)
i = 0
upperBand = list()
Expand Down
Loading

0 comments on commit 7d93840

Please sign in to comment.