Skip to content

Commit

Permalink
Merge branch 'master' into linasn/find-files-improve-perf
Browse files Browse the repository at this point in the history
* master:
  Fix race when checking for dirty aggregations (#3886)
  [aggregator] Add test coverage to expireValues (#3898)
  [aggregator] Propagate cancellation through tick (#3895)
  • Loading branch information
soundvibe committed Nov 4, 2021
2 parents 3d8cd47 + a4aff33 commit 26815db
Show file tree
Hide file tree
Showing 17 changed files with 1,720 additions and 754 deletions.
4 changes: 3 additions & 1 deletion src/aggregator/aggregation/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ func isExpensive(aggTypes aggregation.Types) bool {
return false
}

func maybeReplaceAnnotation(currentAnnotation, newAnnotation []byte) []byte {
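// MaybeReplaceAnnotation replaces the current annotation with the new annotation when the new annotation is
// non-empty, returning the updated ref for the current annotation. An empty new annotation leaves the
// current annotation untouched and it is returned as-is.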
func MaybeReplaceAnnotation(currentAnnotation, newAnnotation []byte) []byte {
if len(newAnnotation) == 0 {
return currentAnnotation
}
Expand Down
2 changes: 1 addition & 1 deletion src/aggregator/aggregation/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (c *Counter) Update(timestamp time.Time, value int64, annotation []byte) {
c.sumSq += value * value
}

c.annotation = maybeReplaceAnnotation(c.annotation, annotation)
c.annotation = MaybeReplaceAnnotation(c.annotation, annotation)
}

// LastAt returns the time of the last value received.
Expand Down
2 changes: 1 addition & 1 deletion src/aggregator/aggregation/gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewGauge(opts Options) Gauge {

// Update updates the gauge value.
func (g *Gauge) Update(timestamp time.Time, value float64, annotation []byte) {
g.annotation = maybeReplaceAnnotation(g.annotation, annotation)
g.annotation = MaybeReplaceAnnotation(g.annotation, annotation)
g.updateTotals(timestamp, value)
// min/max cannot be updated by an update to a value.
if math.IsNaN(g.max) || g.max < value {
Expand Down
2 changes: 1 addition & 1 deletion src/aggregator/aggregation/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (t *Timer) AddBatch(timestamp time.Time, values []float64, annotation []byt

t.stream.AddBatch(values)

t.annotation = maybeReplaceAnnotation(t.annotation, annotation)
t.annotation = MaybeReplaceAnnotation(t.annotation, annotation)
}

func (t *Timer) recordLastAt(timestamp time.Time) {
Expand Down
59 changes: 57 additions & 2 deletions src/aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package aggregator
import (
"context"
"errors"
"fmt"
"math"
"strconv"
"sync"
Expand Down Expand Up @@ -93,6 +94,20 @@ type Aggregator interface {
Close() error
}

// tickShardFn is the signature of the hook the aggregator calls to tick one
// shard. It receives the shard to tick, the tick duration allotted to that
// shard, and a channel that is passed through to the shard's tick, and it
// returns the shard's tickResult. Keeping it as a function type lets the
// aggregator hold the hook in a field so it can be swapped out.
type tickShardFn func(
	shard *aggregatorShard, perShardTickDuration time.Duration, doneCh <-chan struct{},
) tickResult

// tickShard is the plain tickShardFn implementation: it forwards both
// arguments, unchanged, to the shard's own Tick method and hands back
// whatever that call reports.
func tickShard(
	shard *aggregatorShard,
	perShardTickDuration time.Duration,
	doneCh <-chan struct{},
) tickResult {
	result := shard.Tick(perShardTickDuration, doneCh)
	return result
}

// aggregator stores aggregations of different types of metrics (e.g., counter,
// timer, gauges) and periodically flushes them out.
type aggregator struct {
Expand All @@ -101,6 +116,7 @@ type aggregator struct {
opts Options
nowFn clock.NowFn
shardFn sharding.ShardFn
tickShardFn tickShardFn
checkInterval time.Duration
placementManager PlacementManager
flushTimesManager FlushTimesManager
Expand Down Expand Up @@ -133,7 +149,7 @@ func NewAggregator(opts Options) Aggregator {
timerOpts := iOpts.TimerOptions()
logger := iOpts.Logger()

return &aggregator{
agg := &aggregator{
opts: opts,
nowFn: opts.ClockOptions().NowFn(),
shardFn: opts.ShardFn(),
Expand All @@ -150,7 +166,10 @@ func NewAggregator(opts Options) Aggregator {
doneCh: make(chan struct{}),
metrics: newAggregatorMetrics(scope, timerOpts, opts.MaxAllowedForwardingDelayFn()),
logger: logger,
tickShardFn: tickShard,
}

return agg
}

func (agg *aggregator) Open() error {
Expand Down Expand Up @@ -383,6 +402,19 @@ func (agg *aggregator) Close() error {
}
agg.state = aggregatorClosed

var (
lastOpCompleted = time.Now()
closeLogger = agg.logger.With(zap.String("closing", "aggregator"))

logCloseOperation = func(op string) {
currTime := time.Now()
closeLogger.Debug(fmt.Sprintf("closed %s", op),
zap.String("took", currTime.Sub(lastOpCompleted).String()))
lastOpCompleted = currTime
}
)

closeLogger.Info("signaling aggregator done")
close(agg.doneCh)

// Waiting for the ticking goroutines to return.
Expand All @@ -391,17 +423,31 @@ func (agg *aggregator) Close() error {
agg.wg.Wait()
agg.Lock()

logCloseOperation("ticking wait groups")
for _, shardID := range agg.shardIDs {
agg.shards[shardID].Close()
}

logCloseOperation("aggregator shards")
if agg.shardSetOpen {
agg.closeShardSetWithLock()
}

logCloseOperation("flush shard sets")

agg.flushHandler.Close()
logCloseOperation("flush handler")

agg.passthroughWriter.Close()
logCloseOperation("passthrough writer")

if agg.adminClient != nil {
closeLogger.Info("closing admin client")
agg.adminClient.Close()
logCloseOperation("admin client")
}

closeLogger.Info("done")
return nil
}

Expand Down Expand Up @@ -740,7 +786,16 @@ func (agg *aggregator) tickInternal() {
tickResult tickResult
)
for _, shard := range ownedShards {
shardTickResult := shard.Tick(perShardTickDuration)
select {
// NB: if doneCh has been signaled, no need to continue ticking shards and
// it's valid to early abort ticking.
case <-agg.doneCh:
agg.logger.Info("recevied interrupt on tick; aborting")
return
default:
}

shardTickResult := agg.tickShardFn(shard, perShardTickDuration, agg.doneCh)
tickResult = tickResult.merge(shardTickResult)
}
tickDuration := agg.nowFn().Sub(start)
Expand Down
38 changes: 38 additions & 0 deletions src/aggregator/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,44 @@ func TestAggregatorTick(t *testing.T) {
require.NoError(t, agg.Close())
}

// TestAggregatorTickCancelled verifies that closing the aggregator while a
// tick is in flight aborts the tick rather than visiting the remaining shards.
//
// The test stubs out tickShardFn, drives a single tick by hand, closes the
// aggregator from a second goroutine once doneAfterTicks shards have been
// ticked, and then checks that no further shards were ticked.
//
// NOTE(review): assumes the aggregator returned by testAggregator owns more
// than doneAfterTicks shards after Open, and that Open does not itself start a
// background tick loop in this test configuration — confirm against
// testAggregator/testOptions. If either assumption is wrong the tick-count
// assertion below fails (it does not hang).
func TestAggregatorTickCancelled(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	flushTimesManager := NewMockFlushTimesManager(ctrl)
	flushTimesManager.EXPECT().Reset().Return(nil).AnyTimes()
	flushTimesManager.EXPECT().Open(gomock.Any()).Return(nil).AnyTimes()
	flushTimesManager.EXPECT().Get().Return(nil, nil).AnyTimes()
	flushTimesManager.EXPECT().Close().Return(nil).AnyTimes()

	var (
		tickedCh = make(chan struct{})
		// Buffered so the closing goroutine never blocks on the send, even if
		// the test has already failed and nobody is receiving.
		closeErrCh     = make(chan error, 1)
		numTicked      = 0
		doneAfterTicks = 2
	)

	agg, _ := testAggregator(t, ctrl)
	agg.flushTimesManager = flushTimesManager

	// Install the stub before Open so that nothing started by Open can read
	// the field while it is being replaced.
	agg.tickShardFn = func(*aggregatorShard, time.Duration, <-chan struct{}) tickResult {
		numTicked++
		// Signal exactly once: closing an already-closed channel panics, so
		// the guard must compare against the running tick count.
		if numTicked == doneAfterTicks {
			close(tickedCh)
		}

		// Keep the shard "busy" long enough for Close to signal doneCh before
		// the tick loop moves on to the next shard.
		time.Sleep(time.Millisecond * 50)
		return tickResult{}
	}

	require.NoError(t, agg.Open())

	go func() {
		<-tickedCh
		// Report the result to the test goroutine instead of asserting here:
		// require's FailNow must only be called from the test goroutine.
		closeErrCh <- agg.Close()
	}()

	// Drive one tick synchronously; it should return early once Close has
	// signaled the aggregator's done channel.
	agg.tickInternal()

	// The tick must have stopped right after the shard during which Close
	// was triggered. This also guarantees tickedCh was closed, so the receive
	// below cannot block forever.
	require.Equal(t, doneAfterTicks, numTicked)
	require.NoError(t, <-closeErrCh)
}

func TestAggregatorShardSetNotOpenNilInstance(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
Loading

0 comments on commit 26815db

Please sign in to comment.