Skip to content

Commit

Permalink
Merge pull request #45 from groupon/buffer_interpolation
Browse files Browse the repository at this point in the history
Buffer the interpolated Collections for Context/Evaluation stream.
  • Loading branch information
nahratzah authored Sep 17, 2016
2 parents d0e637c + 446a7c5 commit c0d9140
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import static com.groupon.lex.metrics.history.HistoryContext.LOOK_BACK;
import static com.groupon.lex.metrics.history.HistoryContext.LOOK_FORWARD;
import com.groupon.lex.metrics.lib.BufferedIterator;
import com.groupon.lex.metrics.timeseries.ExpressionLookBack;
import com.groupon.lex.metrics.timeseries.TimeSeriesCollection;
import com.groupon.lex.metrics.timeseries.TimeSeriesMetricDeltaSet;
import com.groupon.lex.metrics.timeseries.TimeSeriesMetricExpression;
import com.groupon.lex.metrics.timeseries.expression.Context;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -65,18 +67,18 @@ public default Stream<TimeSeriesCollection> stream(DateTime begin, DateTime end,

/** Return a History Context for evaluating expressions. */
public default Stream<Context> getContext(Duration stepsize, ExpressionLookBack lookback) {
return HistoryContext.stream(stream(stepsize), lookback);
return HistoryContext.stream(BufferedIterator.stream(ForkJoinPool.commonPool(), stream(stepsize)), lookback);
}

/** Return a History Context for evaluating expressions, starting at the 'begin' timestamp (inclusive). */
public default Stream<Context> getContext(DateTime begin, Duration stepsize, ExpressionLookBack lookback) {
return HistoryContext.stream(stream(begin.minus(lookback.hintDuration()), stepsize), lookback)
return HistoryContext.stream(BufferedIterator.stream(ForkJoinPool.commonPool(), stream(begin.minus(lookback.hintDuration()), stepsize)), lookback)
.filter(ctx -> !ctx.getTSData().getCurrentCollection().getTimestamp().isBefore(begin));
}

/** Return a History Context for evaluating expressions, between the 'begin' timestamp (inclusive) and the 'end' timestamp (inclusive). */
public default Stream<Context> getContext(DateTime begin, DateTime end, Duration stepsize, ExpressionLookBack lookback) {
return HistoryContext.stream(stream(begin.minus(lookback.hintDuration()), end, stepsize), lookback)
return HistoryContext.stream(BufferedIterator.stream(ForkJoinPool.commonPool(), stream(begin.minus(lookback.hintDuration()), end, stepsize)), lookback)
.filter(ctx -> !ctx.getTSData().getCurrentCollection().getTimestamp().isBefore(begin));
}

Expand Down

0 comments on commit c0d9140

Please sign in to comment.