From d087eb4b20a702668f8b5a711f7673867a4ae414 Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 2 Nov 2021 06:24:11 -0400 Subject: [PATCH 1/7] [aggregator] Propagate cancellation through tick --- src/aggregator/aggregator/aggregator.go | 41 +++++++++++++++-- src/aggregator/aggregator/aggregator_test.go | 40 +++++++++++++++++ src/aggregator/aggregator/map.go | 31 +++++++++++-- src/aggregator/aggregator/map_test.go | 46 +++++++++++++++++++- src/aggregator/aggregator/shard.go | 7 ++- src/cmd/services/m3aggregator/serve/serve.go | 36 +++++++++++++-- 6 files changed, 188 insertions(+), 13 deletions(-) diff --git a/src/aggregator/aggregator/aggregator.go b/src/aggregator/aggregator/aggregator.go index 96b57994de..e0e7aade5f 100644 --- a/src/aggregator/aggregator/aggregator.go +++ b/src/aggregator/aggregator/aggregator.go @@ -93,6 +93,12 @@ type Aggregator interface { Close() error } +type tickShardFn func( + shard *aggregatorShard, + perShardTickDuration time.Duration, + doneCh chan struct{}, +) tickResult + // aggregator stores aggregations of different types of metrics (e.g., counter, // timer, gauges) and periodically flushes them out. type aggregator struct { @@ -101,6 +107,7 @@ type aggregator struct { opts Options nowFn clock.NowFn shardFn sharding.ShardFn + tickShardFn tickShardFn checkInterval time.Duration placementManager PlacementManager flushTimesManager FlushTimesManager @@ -133,7 +140,7 @@ func NewAggregator(opts Options) Aggregator { timerOpts := iOpts.TimerOptions() logger := iOpts.Logger() - return &aggregator{ + agg := &aggregator{ opts: opts, nowFn: opts.ClockOptions().NowFn(), shardFn: opts.ShardFn(), @@ -151,6 +158,9 @@ func NewAggregator(opts Options) Aggregator { metrics: newAggregatorMetrics(scope, timerOpts, opts.MaxAllowedForwardingDelayFn()), logger: logger, } + + agg.tickShardFn = agg.tickShard + return agg } func (agg *aggregator) Open() error { @@ -383,6 +393,7 @@ func (agg *aggregator) Close() error { } agg.state = aggregatorClosed + agg.logger.Info("signalling aggregator done") close(agg.doneCh) // Waiting for the ticking goroutines to return. @@ -391,17 +402,27 @@ func (agg *aggregator) Close() error { agg.wg.Wait() agg.Lock() + agg.logger.Info("closing aggregator shards") for _, shardID := range agg.shardIDs { agg.shards[shardID].Close() } + + agg.logger.Info("closing aggregator shard sets") if agg.shardSetOpen { agg.closeShardSetWithLock() } + + agg.logger.Info("closing aggregator flush handler") agg.flushHandler.Close() + + agg.logger.Info("closing aggregator passthrough handler") agg.passthroughWriter.Close() + if agg.adminClient != nil { + agg.logger.Info("closing aggregator admin client") agg.adminClient.Close() } + return nil } @@ -723,6 +744,14 @@ func (agg *aggregator) tick() { } } +func (agg *aggregator) tickShard( + shard *aggregatorShard, + perShardTickDuration time.Duration, + doneCh chan struct{}, +) tickResult { + return shard.Tick(perShardTickDuration, doneCh) +} + func (agg *aggregator) tickInternal() { ownedShards, closingShards := agg.ownedShards() agg.closeShardsAsync(closingShards) @@ -740,8 +769,14 @@ func (agg *aggregator) tickInternal() { tickResult tickResult ) for _, shard := range ownedShards { - shardTickResult := shard.Tick(perShardTickDuration) - tickResult = tickResult.merge(shardTickResult) + select { + case <-agg.doneCh: + agg.logger.Info("recevied interrupt on tick; aborting") + return + default: + shardTickResult := agg.tickShardFn(shard, perShardTickDuration, agg.doneCh) + tickResult = tickResult.merge(shardTickResult) + } } tickDuration := agg.nowFn().Sub(start) agg.metrics.tick.Report(tickResult, tickDuration) diff --git a/src/aggregator/aggregator/aggregator_test.go b/src/aggregator/aggregator/aggregator_test.go index bafc941f70..10fc72aecf 100644 --- a/src/aggregator/aggregator/aggregator_test.go +++ b/src/aggregator/aggregator/aggregator_test.go @@ -841,6 +841,46 @@ func TestAggregatorTick(t *testing.T) { require.NoError(t, agg.Close()) } +func TestAggregatorTickCancelled(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + flushTimesManager := NewMockFlushTimesManager(ctrl) + flushTimesManager.EXPECT().Reset().Return(nil).AnyTimes() + flushTimesManager.EXPECT().Open(gomock.Any()).Return(nil).AnyTimes() + flushTimesManager.EXPECT().Get().Return(nil, nil).AnyTimes() + flushTimesManager.EXPECT().Close().Return(nil).AnyTimes() + + agg, _ := testAggregator(t, ctrl) + agg.flushTimesManager = flushTimesManager + require.NoError(t, agg.Open()) + + var ( + tickedCh = make(chan struct{}) + numTicked = 0 + doneAfterTicks = 2 + ) + + agg.tickShardFn = func( + *aggregatorShard, time.Duration, chan struct{}, + ) tickResult { + numTicked++ + if doneAfterTicks == 2 { + close(tickedCh) + } + + time.Sleep(time.Millisecond * 50) + return tickResult{} + } + + go func() { + <-tickedCh + require.NoError(t, agg.Close()) + }() + + require.Equal(t, 2, doneAfterTicks) +} + func TestAggregatorShardSetNotOpenNilInstance(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/src/aggregator/aggregator/map.go b/src/aggregator/aggregator/map.go index c946efb8fd..8161dc188b 100644 --- a/src/aggregator/aggregator/map.go +++ b/src/aggregator/aggregator/map.go @@ -218,8 +218,11 @@ func (m *metricMap) AddForwarded( return err } -func (m *metricMap) Tick(target time.Duration) tickResult { - mapTickRes := m.tick(target) +func (m *metricMap) Tick( + target time.Duration, + doneCh chan struct{}, +) tickResult { + mapTickRes := m.tick(target, doneCh) listsTickRes := m.metricLists.Tick() mapTickRes.standard.activeElems = listsTickRes.standard mapTickRes.forwarded.activeElems = listsTickRes.forwarded @@ -320,7 +323,10 @@ func (m *metricMap) lookupEntryWithLock(key entryKey) (*Entry, bool) { // tick performs two operations: // 1. Delete entries that have expired, and report the number of expired entries. // 2. Report number of standard entries and forwarded entries that are active. -func (m *metricMap) tick(target time.Duration) tickResult { +func (m *metricMap) tick( + target time.Duration, + doneCh chan struct{}, +) tickResult { // Determine batch size. m.RLock() numEntries := m.entryList.Len() @@ -340,8 +346,27 @@ func (m *metricMap) tick(target time.Duration) tickResult { numTimedActive int numTimedExpired int entryIdx int + + done bool ) + + // NB: if no doneChan provided, do not interrupt the tick. + if doneCh == nil { + doneCh = make(chan struct{}) + } + m.forEachEntry(func(entry hashedEntry) { + if done { + return + } + + select { + case <-doneCh: + done = true + return + default: + } + now := m.nowFn() if entryIdx > 0 && entryIdx%defaultSoftDeadlineCheckEvery == 0 { targetDeadline := start.Add(time.Duration(entryIdx) * perEntrySoftDeadline) diff --git a/src/aggregator/aggregator/map_test.go b/src/aggregator/aggregator/map_test.go index d913d15941..69d631c9be 100644 --- a/src/aggregator/aggregator/map_test.go +++ b/src/aggregator/aggregator/map_test.go @@ -513,7 +513,7 @@ func TestMetricMapDeleteExpired(t *testing.T) { } // Delete expired entries. - m.tick(opts.EntryCheckInterval()) + m.tick(opts.EntryCheckInterval(), nil) // Assert there should be only half of the entries left. require.Equal(t, numEntries/2, len(m.entries)) @@ -525,3 +525,47 @@ func TestMetricMapDeleteExpired(t *testing.T) { require.NotNil(t, e.entry) } } + +func TestMetricMapTickCancellation(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + opts := testOptions(ctrl) + m := newMetricMap(testShard, opts) + + numBatchesProcessed := 0 + tickedCh := make(chan struct{}) + + m.sleepFn = func(d time.Duration) { + numBatchesProcessed++ + if numBatchesProcessed == 10 { + close(tickedCh) + } + + time.Sleep(d) + } + + doneCh := make(chan struct{}) + go func() { + <-tickedCh + fmt.Println("closing ch") + close(doneCh) + }() + + // NB: wait/early exit on every defaultSoftDeadlineCheckEvery + numEntries := defaultSoftDeadlineCheckEvery * 60 + for i := 0; i < numEntries; i++ { + key := entryKey{ + metricType: metricType(metric.CounterType), + idHash: hash.Murmur3Hash128([]byte(fmt.Sprintf("%d", i))), + } + + m.entries[key] = m.entryList.PushBack(hashedEntry{ + key: key, + entry: NewEntry(m.metricLists, runtime.NewOptions(), opts), + }) + } + + m.Tick(time.Minute, doneCh) + require.Equal(t, 10, numBatchesProcessed) +} diff --git a/src/aggregator/aggregator/shard.go b/src/aggregator/aggregator/shard.go index 2855c8380a..a94657981c 100644 --- a/src/aggregator/aggregator/shard.go +++ b/src/aggregator/aggregator/shard.go @@ -260,8 +260,11 @@ func (s *aggregatorShard) AddForwarded( return nil } -func (s *aggregatorShard) Tick(target time.Duration) tickResult { - return s.metricMap.Tick(target) +func (s *aggregatorShard) Tick( + target time.Duration, + doneCh chan struct{}, +) tickResult { + return s.metricMap.Tick(target, doneCh) } func (s *aggregatorShard) Close() { diff --git a/src/cmd/services/m3aggregator/serve/serve.go b/src/cmd/services/m3aggregator/serve/serve.go index 7928b5b7c1..0d4324865a 100644 --- a/src/cmd/services/m3aggregator/serve/serve.go +++ b/src/cmd/services/m3aggregator/serve/serve.go @@ -22,6 +22,7 @@ package serve import ( "fmt" + "time" "github.com/m3db/m3/src/aggregator/aggregator" httpserver "github.com/m3db/m3/src/aggregator/server/http" @@ -40,7 +41,12 @@ func Serve( ) error { iOpts := opts.InstrumentOpts() log := iOpts.Logger() - defer aggregator.Close() + defer func() { + start := time.Now() + log.Info("closing aggregator") + aggregator.Close() + log.Info("closed aggregator", zap.String("took", time.Since(start).String())) + }() if m3msgAddr := opts.M3MsgAddr(); m3msgAddr != "" { serverOpts := opts.M3MsgServerOpts() @@ -51,7 +57,14 @@ func Serve( if err := m3msgServer.ListenAndServe(); err != nil { return fmt.Errorf("could not start m3msg server at: addr=%s, err=%v", m3msgAddr, err) } - defer m3msgServer.Close() + + defer func() { + start := time.Now() + log.Info("closing m3msg server") + m3msgServer.Close() + log.Info("m3msg server closed", zap.String("took", time.Since(start).String())) + }() + log.Info("m3msg server listening", zap.String("addr", m3msgAddr)) } @@ -61,7 +74,14 @@ func Serve( if err := rawTCPServer.ListenAndServe(); err != nil { return fmt.Errorf("could not start raw TCP server at: addr=%s, err=%v", rawTCPAddr, err) } - defer rawTCPServer.Close() + + defer func() { + start := time.Now() + log.Info("closing raw TCPServer") + rawTCPServer.Close() + log.Info("closed raw TCPServer", zap.String("took", time.Since(start).String())) + }() + log.Info("raw TCP server listening", zap.String("addr", rawTCPAddr)) } @@ -72,12 +92,20 @@ func Serve( if err := httpServer.ListenAndServe(); err != nil { return fmt.Errorf("could not start http server at: addr=%s, err=%v", httpAddr, err) } - defer httpServer.Close() + + defer func() { + start := time.Now() + log.Info("closing http server") + httpServer.Close() + log.Info("closed http server", zap.String("took", time.Since(start).String())) + }() + log.Info("http server listening", zap.String("addr", httpAddr)) } // Wait for exit signal. <-doneCh + log.Info("server signalled on doneCh") return nil } From f9b1f10e916d1e89aadd83a8ab499dd6a416c610 Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 2 Nov 2021 07:23:16 -0400 Subject: [PATCH 2/7] Better logging around close timings --- src/aggregator/aggregator/aggregator.go | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/src/aggregator/aggregator/aggregator.go b/src/aggregator/aggregator/aggregator.go index e0e7aade5f..ae4eafe94e 100644 --- a/src/aggregator/aggregator/aggregator.go +++ b/src/aggregator/aggregator/aggregator.go @@ -23,6 +23,7 @@ package aggregator import ( "context" "errors" + "fmt" "math" "strconv" "sync" @@ -393,6 +394,17 @@ func (agg *aggregator) Close() error { } agg.state = aggregatorClosed + var ( + lastOpCompleted = time.Now() + currTime time.Time + logCloseOperation = func(op string) { + currTime = time.Now() + agg.logger.Info(fmt.Sprintf("closed %s", op), + zap.String("took", currTime.Sub(lastOpCompleted).String())) + lastOpCompleted = currTime + } + ) + agg.logger.Info("signalling aggregator done") close(agg.doneCh) @@ -402,25 +414,28 @@ func (agg *aggregator) Close() error { agg.wg.Wait() agg.Lock() - agg.logger.Info("closing aggregator shards") + logCloseOperation("ticking wait groups") for _, shardID := range agg.shardIDs { agg.shards[shardID].Close() } - agg.logger.Info("closing aggregator shard sets") + logCloseOperation("aggregator shards") if agg.shardSetOpen { agg.closeShardSetWithLock() } - agg.logger.Info("closing aggregator flush handler") + logCloseOperation("flush shard sets") + agg.flushHandler.Close() + logCloseOperation("flush handler") - agg.logger.Info("closing aggregator passthrough handler") agg.passthroughWriter.Close() + logCloseOperation("passthrough writer") if agg.adminClient != nil { - agg.logger.Info("closing aggregator admin client") + agg.logger.Info("closing admin client") agg.adminClient.Close() + logCloseOperation("admin client") } return nil From af0ed985705eac7fbd8d1c9c2654931a2f7f18bd Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 2 Nov 2021 07:28:24 -0400 Subject: [PATCH 3/7] Capture aggregator close errors --- src/cmd/services/m3aggregator/serve/serve.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/cmd/services/m3aggregator/serve/serve.go b/src/cmd/services/m3aggregator/serve/serve.go index 0d4324865a..f9acc2c4e6 100644 --- a/src/cmd/services/m3aggregator/serve/serve.go +++ b/src/cmd/services/m3aggregator/serve/serve.go @@ -44,8 +44,13 @@ func Serve( defer func() { start := time.Now() log.Info("closing aggregator") - aggregator.Close() - log.Info("closed aggregator", zap.String("took", time.Since(start).String())) + err := aggregator.Close() + fields := []zap.Field{zap.String("took", time.Since(start).String())} + if err != nil { + log.Warn("closed aggregator with error", append(fields, zap.Error(err))...) + } else { + log.Info("closed aggregator", fields...) + } }() if m3msgAddr := opts.M3MsgAddr(); m3msgAddr != "" { From 83bbbf97116b282c30ef4eaaed7368aad9ea1cfd Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 2 Nov 2021 07:38:38 -0400 Subject: [PATCH 4/7] update --- src/aggregator/aggregator/aggregator.go | 2 +- src/cmd/services/m3aggregator/serve/serve.go | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/aggregator/aggregator/aggregator.go b/src/aggregator/aggregator/aggregator.go index ae4eafe94e..ee6c72f989 100644 --- a/src/aggregator/aggregator/aggregator.go +++ b/src/aggregator/aggregator/aggregator.go @@ -399,7 +399,7 @@ func (agg *aggregator) Close() error { currTime time.Time logCloseOperation = func(op string) { currTime = time.Now() - agg.logger.Info(fmt.Sprintf("closed %s", op), + agg.logger.Info(fmt.Sprintf("!!! closed %s", op), zap.String("took", currTime.Sub(lastOpCompleted).String())) lastOpCompleted = currTime } diff --git a/src/cmd/services/m3aggregator/serve/serve.go b/src/cmd/services/m3aggregator/serve/serve.go index f9acc2c4e6..0c401126aa 100644 --- a/src/cmd/services/m3aggregator/serve/serve.go +++ b/src/cmd/services/m3aggregator/serve/serve.go @@ -43,13 +43,13 @@ func Serve( log := iOpts.Logger() defer func() { start := time.Now() - log.Info("closing aggregator") + log.Info("!! closing aggregator") err := aggregator.Close() fields := []zap.Field{zap.String("took", time.Since(start).String())} if err != nil { log.Warn("closed aggregator with error", append(fields, zap.Error(err))...) } else { - log.Info("closed aggregator", fields...) + log.Info("!! closed aggregator", fields...) } }() @@ -65,7 +65,7 @@ func Serve( defer func() { start := time.Now() - log.Info("closing m3msg server") + log.Info("!! closing m3msg server") m3msgServer.Close() log.Info("m3msg server closed", zap.String("took", time.Since(start).String())) }() @@ -82,9 +82,9 @@ func Serve( defer func() { start := time.Now() - log.Info("closing raw TCPServer") + log.Info("!! closing raw TCPServer") rawTCPServer.Close() - log.Info("closed raw TCPServer", zap.String("took", time.Since(start).String())) + log.Info("!! closed raw TCPServer", zap.String("took", time.Since(start).String())) }() log.Info("raw TCP server listening", zap.String("addr", rawTCPAddr)) @@ -100,9 +100,9 @@ func Serve( defer func() { start := time.Now() - log.Info("closing http server") + log.Info("!! closing http server") httpServer.Close() - log.Info("closed http server", zap.String("took", time.Since(start).String())) + log.Info("!! closed http server", zap.String("took", time.Since(start).String())) }() log.Info("http server listening", zap.String("addr", httpAddr)) @@ -110,7 +110,7 @@ func Serve( // Wait for exit signal. <-doneCh - log.Info("server signalled on doneCh") + log.Info("!! server signalled on doneCh") return nil } From 4b95340b11f5d6554c0c25ece07d923504b1f7ad Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 2 Nov 2021 07:44:03 -0400 Subject: [PATCH 5/7] Refactor to avoid chan passing --- src/aggregator/aggregator/aggregator.go | 6 ++---- src/aggregator/aggregator/aggregator_test.go | 4 +--- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/aggregator/aggregator/aggregator.go b/src/aggregator/aggregator/aggregator.go index ee6c72f989..8320a58a6b 100644 --- a/src/aggregator/aggregator/aggregator.go +++ b/src/aggregator/aggregator/aggregator.go @@ -97,7 +97,6 @@ type Aggregator interface { type tickShardFn func( shard *aggregatorShard, perShardTickDuration time.Duration, - doneCh chan struct{}, ) tickResult // aggregator stores aggregations of different types of metrics (e.g., counter, @@ -762,9 +761,8 @@ func (agg *aggregator) tick() { func (agg *aggregator) tickShard( shard *aggregatorShard, perShardTickDuration time.Duration, - doneCh chan struct{}, ) tickResult { - return shard.Tick(perShardTickDuration, doneCh) + return shard.Tick(perShardTickDuration, agg.doneCh) } func (agg *aggregator) tickInternal() { @@ -789,7 +787,7 @@ func (agg *aggregator) tickInternal() { agg.logger.Info("recevied interrupt on tick; aborting") return default: - shardTickResult := agg.tickShardFn(shard, perShardTickDuration, agg.doneCh) + shardTickResult := agg.tickShardFn(shard, perShardTickDuration) tickResult = tickResult.merge(shardTickResult) } } diff --git a/src/aggregator/aggregator/aggregator_test.go b/src/aggregator/aggregator/aggregator_test.go index 10fc72aecf..3b9dda854b 100644 --- a/src/aggregator/aggregator/aggregator_test.go +++ b/src/aggregator/aggregator/aggregator_test.go @@ -861,9 +861,7 @@ func TestAggregatorTickCancelled(t *testing.T) { doneAfterTicks = 2 ) - agg.tickShardFn = func( - *aggregatorShard, time.Duration, chan struct{}, - ) tickResult { + agg.tickShardFn = func(*aggregatorShard, time.Duration) tickResult { numTicked++ if doneAfterTicks == 2 { close(tickedCh) From ab6289da22997266454375b00cadacd79f8f5475 Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 2 Nov 2021 07:57:56 -0400 Subject: [PATCH 6/7] Lint --- src/aggregator/aggregator/aggregator.go | 13 +++++---- src/aggregator/aggregator/map_test.go | 1 - src/cmd/services/m3aggregator/serve/serve.go | 28 +++++++++++--------- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/src/aggregator/aggregator/aggregator.go b/src/aggregator/aggregator/aggregator.go index 8320a58a6b..e37f43909b 100644 --- a/src/aggregator/aggregator/aggregator.go +++ b/src/aggregator/aggregator/aggregator.go @@ -394,17 +394,19 @@ func (agg *aggregator) Close() error { agg.state = aggregatorClosed var ( - lastOpCompleted = time.Now() - currTime time.Time + lastOpCompleted = time.Now() + currTime time.Time + closeLogger = agg.logger.With(zap.String("closing", "aggregator")) + logCloseOperation = func(op string) { currTime = time.Now() - agg.logger.Info(fmt.Sprintf("!!! closed %s", op), + closeLogger.Info(fmt.Sprintf("closed %s", op), zap.String("took", currTime.Sub(lastOpCompleted).String())) lastOpCompleted = currTime } ) - agg.logger.Info("signalling aggregator done") + closeLogger.Info("signaling aggregator done") close(agg.doneCh) // Waiting for the ticking goroutines to return. @@ -432,11 +434,12 @@ func (agg *aggregator) Close() error { logCloseOperation("passthrough writer") if agg.adminClient != nil { - agg.logger.Info("closing admin client") + closeLogger.Info("closing admin client") agg.adminClient.Close() logCloseOperation("admin client") } + closeLogger.Info("done") return nil } diff --git a/src/aggregator/aggregator/map_test.go b/src/aggregator/aggregator/map_test.go index 69d631c9be..64e72f0c32 100644 --- a/src/aggregator/aggregator/map_test.go +++ b/src/aggregator/aggregator/map_test.go @@ -548,7 +548,6 @@ func TestMetricMapTickCancellation(t *testing.T) { doneCh := make(chan struct{}) go func() { <-tickedCh - fmt.Println("closing ch") close(doneCh) }() diff --git a/src/cmd/services/m3aggregator/serve/serve.go b/src/cmd/services/m3aggregator/serve/serve.go index 0c401126aa..cc1a51466e 100644 --- a/src/cmd/services/m3aggregator/serve/serve.go +++ b/src/cmd/services/m3aggregator/serve/serve.go @@ -39,17 +39,21 @@ func Serve( doneCh chan struct{}, opts Options, ) error { - iOpts := opts.InstrumentOpts() - log := iOpts.Logger() + var ( + iOpts = opts.InstrumentOpts() + log = iOpts.Logger() + closeLogger = log.With(zap.String("closing", "aggregator_server")) + ) + defer func() { start := time.Now() - log.Info("!! closing aggregator") + closeLogger.Info("closing aggregator") err := aggregator.Close() fields := []zap.Field{zap.String("took", time.Since(start).String())} if err != nil { - log.Warn("closed aggregator with error", append(fields, zap.Error(err))...) + closeLogger.Warn("closed aggregator with error", append(fields, zap.Error(err))...) } else { - log.Info("!! closed aggregator", fields...) + closeLogger.Info("closed aggregator", fields...) } }() @@ -65,9 +69,9 @@ func Serve( defer func() { start := time.Now() - log.Info("!! closing m3msg server") + closeLogger.Info("closing m3msg server") m3msgServer.Close() - log.Info("m3msg server closed", zap.String("took", time.Since(start).String())) + closeLogger.Info("m3msg server closed", zap.String("took", time.Since(start).String())) }() log.Info("m3msg server listening", zap.String("addr", m3msgAddr)) @@ -82,9 +86,9 @@ func Serve( defer func() { start := time.Now() - log.Info("!! closing raw TCPServer") + closeLogger.Info("closing raw TCPServer") rawTCPServer.Close() - log.Info("!! closed raw TCPServer", zap.String("took", time.Since(start).String())) + closeLogger.Info("closed raw TCPServer", zap.String("took", time.Since(start).String())) }() log.Info("raw TCP server listening", zap.String("addr", rawTCPAddr)) @@ -100,9 +104,9 @@ func Serve( defer func() { start := time.Now() - log.Info("!! closing http server") + closeLogger.Info("closing http server") httpServer.Close() - log.Info("!! closed http server", zap.String("took", time.Since(start).String())) + closeLogger.Info("closed http server", zap.String("took", time.Since(start).String())) }() log.Info("http server listening", zap.String("addr", httpAddr)) @@ -110,7 +114,7 @@ func Serve( // Wait for exit signal. <-doneCh - log.Info("!! server signalled on doneCh") + closeLogger.Info("server signaled on doneCh") return nil } From 62d82eeb52ca0309c4e7ce13ea560730ec956eff Mon Sep 17 00:00:00 2001 From: Artem Date: Wed, 3 Nov 2021 10:06:17 -0400 Subject: [PATCH 7/7] PR response --- src/aggregator/aggregator/aggregator.go | 30 +++++++++++--------- src/aggregator/aggregator/aggregator_test.go | 2 +- src/aggregator/aggregator/map.go | 9 ++---- src/aggregator/aggregator/map_test.go | 20 +++++++------ src/aggregator/aggregator/shard.go | 2 +- 5 files changed, 32 insertions(+), 31 deletions(-) diff --git a/src/aggregator/aggregator/aggregator.go b/src/aggregator/aggregator/aggregator.go index e37f43909b..495843fefb 100644 --- a/src/aggregator/aggregator/aggregator.go +++ b/src/aggregator/aggregator/aggregator.go @@ -97,8 +97,17 @@ type Aggregator interface { type tickShardFn func( shard *aggregatorShard, perShardTickDuration time.Duration, + doneCh <-chan struct{}, ) tickResult +func tickShard( + shard *aggregatorShard, + perShardTickDuration time.Duration, + doneCh <-chan struct{}, +) tickResult { + return shard.Tick(perShardTickDuration, doneCh) +} + // aggregator stores aggregations of different types of metrics (e.g., counter, // timer, gauges) and periodically flushes them out. type aggregator struct { @@ -157,9 +166,9 @@ func NewAggregator(opts Options) Aggregator { doneCh: make(chan struct{}), metrics: newAggregatorMetrics(scope, timerOpts, opts.MaxAllowedForwardingDelayFn()), logger: logger, + tickShardFn: tickShard, } - agg.tickShardFn = agg.tickShard return agg } @@ -395,12 +404,11 @@ func (agg *aggregator) Close() error { var ( lastOpCompleted = time.Now() - currTime time.Time closeLogger = agg.logger.With(zap.String("closing", "aggregator")) logCloseOperation = func(op string) { - currTime = time.Now() - closeLogger.Info(fmt.Sprintf("closed %s", op), + currTime := time.Now() + closeLogger.Debug(fmt.Sprintf("closed %s", op), zap.String("took", currTime.Sub(lastOpCompleted).String())) lastOpCompleted = currTime } @@ -761,13 +769,6 @@ func (agg *aggregator) tick() { } } -func (agg *aggregator) tickShard( - shard *aggregatorShard, - perShardTickDuration time.Duration, -) tickResult { - return shard.Tick(perShardTickDuration, agg.doneCh) -} - func (agg *aggregator) tickInternal() { ownedShards, closingShards := agg.ownedShards() agg.closeShardsAsync(closingShards) @@ -786,13 +787,16 @@ func (agg *aggregator) tickInternal() { ) for _, shard := range ownedShards { select { + // NB: if doneCh has been signaled, no need to continue ticking shards and + // it's valid to early abort ticking. case <-agg.doneCh: agg.logger.Info("recevied interrupt on tick; aborting") return default: - shardTickResult := agg.tickShardFn(shard, perShardTickDuration) - tickResult = tickResult.merge(shardTickResult) } + + shardTickResult := agg.tickShardFn(shard, perShardTickDuration, agg.doneCh) + tickResult = tickResult.merge(shardTickResult) } tickDuration := agg.nowFn().Sub(start) agg.metrics.tick.Report(tickResult, tickDuration) diff --git a/src/aggregator/aggregator/aggregator_test.go b/src/aggregator/aggregator/aggregator_test.go index 3b9dda854b..166db4b11e 100644 --- a/src/aggregator/aggregator/aggregator_test.go +++ b/src/aggregator/aggregator/aggregator_test.go @@ -861,7 +861,7 @@ func TestAggregatorTickCancelled(t *testing.T) { doneAfterTicks = 2 ) - agg.tickShardFn = func(*aggregatorShard, time.Duration) tickResult { + agg.tickShardFn = func(*aggregatorShard, time.Duration, <-chan struct{}) tickResult { numTicked++ if doneAfterTicks == 2 { close(tickedCh) diff --git a/src/aggregator/aggregator/map.go b/src/aggregator/aggregator/map.go index 8161dc188b..919a9c5b64 100644 --- a/src/aggregator/aggregator/map.go +++ b/src/aggregator/aggregator/map.go @@ -220,7 +220,7 @@ func (m *metricMap) AddForwarded( func (m *metricMap) Tick( target time.Duration, - doneCh chan struct{}, + doneCh <-chan struct{}, ) tickResult { mapTickRes := m.tick(target, doneCh) listsTickRes := m.metricLists.Tick() @@ -325,7 +325,7 @@ func (m *metricMap) lookupEntryWithLock(key entryKey) (*Entry, bool) { // 2. Report number of standard entries and forwarded entries that are active. func (m *metricMap) tick( target time.Duration, - doneCh chan struct{}, + doneCh <-chan struct{}, ) tickResult { // Determine batch size. m.RLock() @@ -350,11 +350,6 @@ func (m *metricMap) tick( done bool ) - // NB: if no doneChan provided, do not interrupt the tick. - if doneCh == nil { - doneCh = make(chan struct{}) - } - m.forEachEntry(func(entry hashedEntry) { if done { return diff --git a/src/aggregator/aggregator/map_test.go b/src/aggregator/aggregator/map_test.go index 64e72f0c32..dae5d5f014 100644 --- a/src/aggregator/aggregator/map_test.go +++ b/src/aggregator/aggregator/map_test.go @@ -530,15 +530,17 @@ func TestMetricMapTickCancellation(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - opts := testOptions(ctrl) - m := newMetricMap(testShard, opts) - - numBatchesProcessed := 0 - tickedCh := make(chan struct{}) + var ( + opts = testOptions(ctrl) + m = newMetricMap(testShard, opts) + numBatchesProcessed = 0 + numToProcessBeforeDone = 10 + tickedCh = make(chan struct{}) + ) m.sleepFn = func(d time.Duration) { numBatchesProcessed++ - if numBatchesProcessed == 10 { + if numBatchesProcessed == numToProcessBeforeDone { close(tickedCh) } @@ -552,7 +554,7 @@ func TestMetricMapTickCancellation(t *testing.T) { }() // NB: wait/early exit on every defaultSoftDeadlineCheckEvery - numEntries := defaultSoftDeadlineCheckEvery * 60 + numEntries := defaultSoftDeadlineCheckEvery * 600 for i := 0; i < numEntries; i++ { key := entryKey{ metricType: metricType(metric.CounterType), @@ -565,6 +567,6 @@ func TestMetricMapTickCancellation(t *testing.T) { }) } - m.Tick(time.Minute, doneCh) - require.Equal(t, 10, numBatchesProcessed) + m.Tick(time.Second*10, doneCh) + require.Equal(t, numToProcessBeforeDone, numBatchesProcessed) } diff --git a/src/aggregator/aggregator/shard.go b/src/aggregator/aggregator/shard.go index a94657981c..658505c477 100644 --- a/src/aggregator/aggregator/shard.go +++ b/src/aggregator/aggregator/shard.go @@ -262,7 +262,7 @@ func (s *aggregatorShard) AddForwarded( func (s *aggregatorShard) Tick( target time.Duration, - doneCh chan struct{}, + doneCh <-chan struct{}, ) tickResult { return s.metricMap.Tick(target, doneCh) }