[aggregator] Propagate cancellation through tick #3895

Merged: 8 commits, Nov 3, 2021
Changes from 6 commits
57 changes: 54 additions & 3 deletions src/aggregator/aggregator/aggregator.go
@@ -23,6 +23,7 @@ package aggregator
import (
"context"
"errors"
"fmt"
"math"
"strconv"
"sync"
@@ -93,6 +94,11 @@ type Aggregator interface {
Close() error
}

type tickShardFn func(
shard *aggregatorShard,
perShardTickDuration time.Duration,
) tickResult

// aggregator stores aggregations of different types of metrics (e.g., counter,
// timer, gauges) and periodically flushes them out.
type aggregator struct {
@@ -101,6 +107,7 @@ type aggregator struct {
opts Options
nowFn clock.NowFn
shardFn sharding.ShardFn
tickShardFn tickShardFn
checkInterval time.Duration
placementManager PlacementManager
flushTimesManager FlushTimesManager
@@ -133,7 +140,7 @@ func NewAggregator(opts Options) Aggregator {
timerOpts := iOpts.TimerOptions()
logger := iOpts.Logger()

return &aggregator{
agg := &aggregator{
opts: opts,
nowFn: opts.ClockOptions().NowFn(),
shardFn: opts.ShardFn(),
@@ -151,6 +158,9 @@ func NewAggregator(opts Options) Aggregator {
metrics: newAggregatorMetrics(scope, timerOpts, opts.MaxAllowedForwardingDelayFn()),
logger: logger,
}

agg.tickShardFn = agg.tickShard
return agg
}

func (agg *aggregator) Open() error {
@@ -383,6 +393,20 @@ func (agg *aggregator) Close() error {
}
agg.state = aggregatorClosed

var (
lastOpCompleted = time.Now()
currTime time.Time
closeLogger = agg.logger.With(zap.String("closing", "aggregator"))

logCloseOperation = func(op string) {
currTime = time.Now()
closeLogger.Info(fmt.Sprintf("closed %s", op),
zap.String("took", currTime.Sub(lastOpCompleted).String()))
lastOpCompleted = currTime
}
)

closeLogger.Info("signaling aggregator done")
close(agg.doneCh)

// Waiting for the ticking goroutines to return.
@@ -391,17 +415,31 @@
agg.wg.Wait()
agg.Lock()

logCloseOperation("ticking wait groups")
for _, shardID := range agg.shardIDs {
agg.shards[shardID].Close()
}

logCloseOperation("aggregator shards")
if agg.shardSetOpen {
agg.closeShardSetWithLock()
}

logCloseOperation("flush shard sets")

agg.flushHandler.Close()
logCloseOperation("flush handler")

agg.passthroughWriter.Close()
logCloseOperation("passthrough writer")

if agg.adminClient != nil {
closeLogger.Info("closing admin client")
agg.adminClient.Close()
logCloseOperation("admin client")
}

closeLogger.Info("done")
return nil
}

@@ -723,6 +761,13 @@ func (agg *aggregator) tick() {
}
}

func (agg *aggregator) tickShard(
shard *aggregatorShard,
perShardTickDuration time.Duration,
) tickResult {
return shard.Tick(perShardTickDuration, agg.doneCh)
}

func (agg *aggregator) tickInternal() {
ownedShards, closingShards := agg.ownedShards()
agg.closeShardsAsync(closingShards)
@@ -740,8 +785,14 @@
tickResult tickResult
)
for _, shard := range ownedShards {
shardTickResult := shard.Tick(perShardTickDuration)
tickResult = tickResult.merge(shardTickResult)
select {
case <-agg.doneCh:
Collaborator: nit: we don't actually need to check here, right? Since we check within the tickShardFn when iterating over every metric; just checking my understanding. The only reason I mention this nit is that at first it reads like we only check once per shard, which would be bad.

Collaborator (author): The idea here is that if we've signaled on doneCh, we can avoid ticking the remainder of the shards; I'll make this more explicit.

Collaborator: Ah right, good point. It makes sense.

(A standalone sketch of this two-level cancellation check follows the aggregator.go diff below.)

agg.logger.Info("recevied interrupt on tick; aborting")
return
default:
shardTickResult := agg.tickShardFn(shard, perShardTickDuration)
tickResult = tickResult.merge(shardTickResult)
}
}
tickDuration := agg.nowFn().Sub(start)
agg.metrics.tick.Report(tickResult, tickDuration)
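The thread above is about where cancellation gets checked. As a standalone illustration (not part of this diff; all names below are made up), here is a minimal sketch of the two-level pattern this change uses: the done channel is polled once before each coarse unit of work (a shard), and again inside the fine-grained loop over entries, so closing it stops both the unit already in progress and any units not yet started.

package main

import (
	"fmt"
	"time"
)

// tickEntries stands in for the per-metric loop inside a shard tick: it polls
// doneCh between entries so a tick already in progress can stop early.
func tickEntries(doneCh chan struct{}, entries int) int {
	processed := 0
	for i := 0; i < entries; i++ {
		select {
		case <-doneCh:
			return processed
		default:
		}
		processed++
		time.Sleep(time.Millisecond)
	}
	return processed
}

func main() {
	doneCh := make(chan struct{})
	go func() {
		// Simulate Close() signaling shutdown partway through a tick.
		time.Sleep(5 * time.Millisecond)
		close(doneCh)
	}()

	// The outer loop stands in for iterating over owned shards: checking
	// doneCh here avoids even starting the remaining shards once cancelled.
	for shard := 0; shard < 3; shard++ {
		select {
		case <-doneCh:
			fmt.Println("received interrupt; aborting remaining shards")
			return
		default:
			fmt.Printf("shard %d processed %d entries\n", shard, tickEntries(doneCh, 100))
		}
	}
}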
38 changes: 38 additions & 0 deletions src/aggregator/aggregator/aggregator_test.go
@@ -841,6 +841,44 @@ func TestAggregatorTick(t *testing.T) {
require.NoError(t, agg.Close())
}

func TestAggregatorTickCancelled(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

flushTimesManager := NewMockFlushTimesManager(ctrl)
flushTimesManager.EXPECT().Reset().Return(nil).AnyTimes()
flushTimesManager.EXPECT().Open(gomock.Any()).Return(nil).AnyTimes()
flushTimesManager.EXPECT().Get().Return(nil, nil).AnyTimes()
flushTimesManager.EXPECT().Close().Return(nil).AnyTimes()

agg, _ := testAggregator(t, ctrl)
agg.flushTimesManager = flushTimesManager
require.NoError(t, agg.Open())

var (
tickedCh = make(chan struct{})
numTicked = 0
doneAfterTicks = 2
)

agg.tickShardFn = func(*aggregatorShard, time.Duration) tickResult {
numTicked++
if numTicked == doneAfterTicks {
close(tickedCh)
}

time.Sleep(time.Millisecond * 50)
return tickResult{}
}

closedCh := make(chan struct{})
go func() {
<-tickedCh
require.NoError(t, agg.Close())
close(closedCh)
}()

<-closedCh
require.Equal(t, doneAfterTicks, numTicked)
}

func TestAggregatorShardSetNotOpenNilInstance(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
31 changes: 28 additions & 3 deletions src/aggregator/aggregator/map.go
@@ -218,8 +218,11 @@ func (m *metricMap) AddForwarded(
return err
}

func (m *metricMap) Tick(target time.Duration) tickResult {
mapTickRes := m.tick(target)
func (m *metricMap) Tick(
target time.Duration,
doneCh chan struct{},
) tickResult {
mapTickRes := m.tick(target, doneCh)
listsTickRes := m.metricLists.Tick()
mapTickRes.standard.activeElems = listsTickRes.standard
mapTickRes.forwarded.activeElems = listsTickRes.forwarded
@@ -320,7 +323,10 @@ func (m *metricMap) lookupEntryWithLock(key entryKey) (*Entry, bool) {
// tick performs two operations:
// 1. Delete entries that have expired, and report the number of expired entries.
// 2. Report number of standard entries and forwarded entries that are active.
func (m *metricMap) tick(target time.Duration) tickResult {
func (m *metricMap) tick(
target time.Duration,
doneCh chan struct{},
) tickResult {
// Determine batch size.
m.RLock()
numEntries := m.entryList.Len()
@@ -340,8 +346,27 @@ func (m *metricMap) tick(target time.Duration) tickResult {
numTimedActive int
numTimedExpired int
entryIdx int

done bool
)

// NB: if no doneChan provided, do not interrupt the tick.
if doneCh == nil {
Collaborator: When is a done chan not provided?

Collaborator (author): It shouldn't be, if everything is hooked up correctly; this is largely just to be defensive. Might remove it and hard panic, or better yet return an error, if we don't have a chan passed in, to make it more explicit.

Collaborator (author) @arnikola, Nov 3, 2021: Huh, actually... TIL that you can use nil channels in a select https://play.golang.org/p/DltRooS3v7D; just going to get rid of this section.

(A standalone sketch of the nil-channel behavior follows this file's diff.)

doneCh = make(chan struct{})
}

m.forEachEntry(func(entry hashedEntry) {
if done {
return
}

select {
case <-doneCh:
done = true
return
default:
}

now := m.nowFn()
if entryIdx > 0 && entryIdx%defaultSoftDeadlineCheckEvery == 0 {
targetDeadline := start.Add(time.Duration(entryIdx) * perEntrySoftDeadline)
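On the nil-channel point from the thread above (the author's playground link makes the same observation): a receive from a nil channel blocks forever, so in a select that also has a default case a nil doneCh simply never fires and the tick proceeds uninterrupted, which is why the defensive nil check can be dropped. A minimal standalone sketch, using only the standard library:

package main

import "fmt"

func main() {
	var doneCh chan struct{} // nil: receiving from it blocks forever

	select {
	case <-doneCh:
		// Unreachable while doneCh is nil.
		fmt.Println("cancelled")
	default:
		// A nil doneCh always falls through to default, i.e. no interruption.
		fmt.Println("not cancelled; keep ticking")
	}
}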
45 changes: 44 additions & 1 deletion src/aggregator/aggregator/map_test.go
@@ -513,7 +513,7 @@ func TestMetricMapDeleteExpired(t *testing.T) {
}

// Delete expired entries.
m.tick(opts.EntryCheckInterval())
m.tick(opts.EntryCheckInterval(), nil)

// Assert there should be only half of the entries left.
require.Equal(t, numEntries/2, len(m.entries))
@@ -525,3 +525,46 @@
require.NotNil(t, e.entry)
}
}

func TestMetricMapTickCancellation(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

opts := testOptions(ctrl)
m := newMetricMap(testShard, opts)

numBatchesProcessed := 0
tickedCh := make(chan struct{})

m.sleepFn = func(d time.Duration) {
numBatchesProcessed++
if numBatchesProcessed == 10 {
close(tickedCh)
}

time.Sleep(d)
}

doneCh := make(chan struct{})
go func() {
<-tickedCh
close(doneCh)
}()

// NB: wait/early exit on every defaultSoftDeadlineCheckEvery
numEntries := defaultSoftDeadlineCheckEvery * 60
for i := 0; i < numEntries; i++ {
key := entryKey{
metricType: metricType(metric.CounterType),
idHash: hash.Murmur3Hash128([]byte(fmt.Sprintf("%d", i))),
}

m.entries[key] = m.entryList.PushBack(hashedEntry{
key: key,
entry: NewEntry(m.metricLists, runtime.NewOptions(), opts),
})
}

m.Tick(time.Minute, doneCh)
require.Equal(t, 10, numBatchesProcessed)
}
7 changes: 5 additions & 2 deletions src/aggregator/aggregator/shard.go
@@ -260,8 +260,11 @@ func (s *aggregatorShard) AddForwarded(
return nil
}

func (s *aggregatorShard) Tick(target time.Duration) tickResult {
return s.metricMap.Tick(target)
func (s *aggregatorShard) Tick(
target time.Duration,
doneCh chan struct{},
) tickResult {
return s.metricMap.Tick(target, doneCh)
}

func (s *aggregatorShard) Close() {
49 changes: 43 additions & 6 deletions src/cmd/services/m3aggregator/serve/serve.go
@@ -22,6 +22,7 @@ package serve

import (
"fmt"
"time"

"github.com/m3db/m3/src/aggregator/aggregator"
httpserver "github.com/m3db/m3/src/aggregator/server/http"
@@ -38,9 +39,23 @@ func Serve(
doneCh chan struct{},
opts Options,
) error {
iOpts := opts.InstrumentOpts()
log := iOpts.Logger()
defer aggregator.Close()
var (
iOpts = opts.InstrumentOpts()
log = iOpts.Logger()
closeLogger = log.With(zap.String("closing", "aggregator_server"))
)

defer func() {
start := time.Now()
closeLogger.Info("closing aggregator")
err := aggregator.Close()
fields := []zap.Field{zap.String("took", time.Since(start).String())}
if err != nil {
closeLogger.Warn("closed aggregator with error", append(fields, zap.Error(err))...)
} else {
closeLogger.Info("closed aggregator", fields...)
}
}()

if m3msgAddr := opts.M3MsgAddr(); m3msgAddr != "" {
serverOpts := opts.M3MsgServerOpts()
@@ -51,7 +66,14 @@
if err := m3msgServer.ListenAndServe(); err != nil {
return fmt.Errorf("could not start m3msg server at: addr=%s, err=%v", m3msgAddr, err)
}
defer m3msgServer.Close()

defer func() {
Collaborator: You could probably add all of this logging into the x/server server.go Close func so you don't need to do these custom defer funcs.

Collaborator: Though probably only a subset of the servers we want this for.

Collaborator (author): Good idea.

Collaborator (author): Hm, actually not all of these are using x/server; e.g. the http handler here just implements x/server, so this may not be the best approach.

(A rough sketch of the shared close helper suggested above appears a few lines below.)

start := time.Now()
closeLogger.Info("closing m3msg server")
m3msgServer.Close()
closeLogger.Info("m3msg server closed", zap.String("took", time.Since(start).String()))
}()

log.Info("m3msg server listening", zap.String("addr", m3msgAddr))
}
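The earlier suggestion about consolidating the timing logs could look roughly like the helper below. This is only a sketch under the assumption that a zap logger is in scope; closeWithTiming is a hypothetical name, it is not part of this diff, and as the author notes it would only cover servers with a compatible Close:

package serve

import (
	"time"

	"go.uber.org/zap"
)

// closeWithTiming logs how long a close operation took. Hypothetical helper
// sketching the reviewer's suggestion; not part of this change.
func closeWithTiming(logger *zap.Logger, name string, closeFn func()) {
	start := time.Now()
	logger.Info("closing " + name)
	closeFn()
	logger.Info("closed "+name, zap.String("took", time.Since(start).String()))
}

Each of the custom defer funcs above would then reduce to something like defer closeWithTiming(closeLogger, "m3msg server", func() { m3msgServer.Close() }).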

@@ -61,7 +83,14 @@
if err := rawTCPServer.ListenAndServe(); err != nil {
return fmt.Errorf("could not start raw TCP server at: addr=%s, err=%v", rawTCPAddr, err)
}
defer rawTCPServer.Close()

defer func() {
start := time.Now()
closeLogger.Info("closing raw TCPServer")
rawTCPServer.Close()
closeLogger.Info("closed raw TCPServer", zap.String("took", time.Since(start).String()))
}()

log.Info("raw TCP server listening", zap.String("addr", rawTCPAddr))
}

@@ -72,12 +101,20 @@
if err := httpServer.ListenAndServe(); err != nil {
return fmt.Errorf("could not start http server at: addr=%s, err=%v", httpAddr, err)
}
defer httpServer.Close()

defer func() {
start := time.Now()
closeLogger.Info("closing http server")
httpServer.Close()
closeLogger.Info("closed http server", zap.String("took", time.Since(start).String()))
}()

log.Info("http server listening", zap.String("addr", httpAddr))
}

// Wait for exit signal.
<-doneCh
closeLogger.Info("server signaled on doneCh")

return nil
}