Skip to content

Commit

Permalink
[query] Allow generalization for rate queries. (#2263)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored Apr 14, 2020
1 parent f68c9d5 commit fdcd250
Showing 1 changed file with 33 additions and 21 deletions.
54 changes: 33 additions & 21 deletions src/query/functions/temporal/rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,42 +49,53 @@ const (
IncreaseType = "increase"
)

type rateProcessor struct {
isRate, isCounter bool
rateFn rateFn
// RateProcessor is a structure containing details about the rate.
type RateProcessor struct {
IsRate, IsCounter bool
RateFn RateFn
}

func (r rateProcessor) initialize(
func (r RateProcessor) initialize(
duration time.Duration,
_ transform.Options,
) processor {
return &rateNode{
isRate: r.isRate,
isCounter: r.isCounter,
rateFn: r.rateFn,
duration: duration,
isRate: r.IsRate,
isCounter: r.IsCounter,
rateFn: r.RateFn,
duration: duration,
}
}

// NewRateOp creates a new base temporal transform for rate functions
func NewRateOp(args []interface{}, optype string) (transform.Params, error) {
// NewRateOpWithProcessor creates a new base temporal transform for
// the given rate processor.
func NewRateOpWithProcessor(
args []interface{},
opType string,
rateProcessor RateProcessor,
) (transform.Params, error) {
if len(args) != 1 {
return emptyOp,
fmt.Errorf("invalid number of args for %s: %d", optype, len(args))
fmt.Errorf("invalid number of args for %s: %d", opType, len(args))
}

duration, ok := args[0].(time.Duration)
if !ok {
return emptyOp,
fmt.Errorf("unable to cast to scalar argument: %v for %s", args[0], optype)
fmt.Errorf("unable to cast to scalar argument: %v for %s", args[0], opType)
}

return newBaseOp(duration, opType, rateProcessor)
}

// NewRateOp creates a new base temporal transform for rate functions.
func NewRateOp(args []interface{}, opType string) (transform.Params, error) {
var (
isRate, isCounter bool
rateFn = standardRateFunc
)

switch optype {
switch opType {
case IRateType:
isRate = true
rateFn = irateFunc
Expand All @@ -97,19 +108,20 @@ func NewRateOp(args []interface{}, optype string) (transform.Params, error) {
isCounter = true
case DeltaType:
default:
return nil, fmt.Errorf("unknown rate type: %s", optype)
return nil, fmt.Errorf("unknown rate type: %s", opType)
}

r := rateProcessor{
isRate: isRate,
isCounter: isCounter,
rateFn: rateFn,
r := RateProcessor{
IsRate: isRate,
IsCounter: isCounter,
RateFn: rateFn,
}

return newBaseOp(duration, optype, r)
return NewRateOpWithProcessor(args, opType, r)
}

type rateFn func(
// RateFn is a function that calculates rate over the given set of datapoints.
type RateFn func(
datapoints ts.Datapoints,
isRate bool,
isCounter bool,
Expand All @@ -121,7 +133,7 @@ type rateFn func(
type rateNode struct {
isRate, isCounter bool
duration time.Duration
rateFn rateFn
rateFn RateFn
}

func (r *rateNode) process(datapoints ts.Datapoints, bounds iterationBounds) float64 {
Expand Down

0 comments on commit fdcd250

Please sign in to comment.