From 898bcf844a65dd8b7314d18e082720f67d050cf8 Mon Sep 17 00:00:00 2001 From: Vytenis Darulis Date: Thu, 5 May 2022 14:01:31 -0400 Subject: [PATCH] [m3msg] Specialize messageWriter acker map (#4113) --- .golangci.yml | 2 + src/msg/producer/writer/message_writer.go | 92 ++++++++++--------- .../writer/message_writer_benchmark_test.go | 9 ++ .../producer/writer/message_writer_test.go | 31 ++++--- 4 files changed, 75 insertions(+), 59 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 0785343a08..19c9240798 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -298,6 +298,8 @@ issues: - "G104: Errors unhandled" # Random numbers here are not used in a security-sensitive context. - "G404: Use of weak random number generator (math/rand instead of crypto/rand)" + # Pointless, every time we'll just end up adding a //nolint comment on this. + - "G103: Use of unsafe calls should be audited" # Excluding configuration per-path, per-linter, per-text and per-source exclude-rules: diff --git a/src/msg/producer/writer/message_writer.go b/src/msg/producer/writer/message_writer.go index f96e651d3e..1ca18bd57a 100644 --- a/src/msg/producer/writer/message_writer.go +++ b/src/msg/producer/writer/message_writer.go @@ -26,6 +26,7 @@ import ( "math" "sync" "time" + stdunsafe "unsafe" "github.com/m3db/m3/src/msg/producer" "github.com/m3db/m3/src/msg/protocol/proto" @@ -35,6 +36,7 @@ import ( "github.com/m3db/m3/src/x/unsafe" "github.com/uber-go/tally" + "go.uber.org/atomic" ) // MessageRetryNanosFn returns the message backoff time for retry in nanoseconds. @@ -179,7 +181,7 @@ type messageWriter struct { doneCh chan struct{} wg sync.WaitGroup // metrics can be updated when a consumer instance changes, so must be guarded with RLock - m *messageWriterMetrics + metrics atomic.UnsafePointer // *messageWriterMetrics nextFullScan time.Time lastNewWrite *list.Element @@ -196,7 +198,7 @@ func newMessageWriter( opts = NewOptions() } nowFn := time.Now - return &messageWriter{ + mw := &messageWriter{ replicatedShardID: replicatedShardID, mPool: mPool, opts: opts, @@ -211,9 +213,10 @@ func newMessageWriter( msgsToWrite: make([]*message, 0, opts.MessageQueueScanBatchSize()), isClosed: false, doneCh: make(chan struct{}), - m: m, nowFn: nowFn, } + mw.metrics.Store(stdunsafe.Pointer(m)) + return mw } // Write writes a message, messages not acknowledged in time will be retried. @@ -222,9 +225,10 @@ func (w *messageWriter) Write(rm *producer.RefCountedMessage) { var ( nowNanos = w.nowFn().UnixNano() msg = w.newMessage() + metrics = w.Metrics() ) w.Lock() - if !w.isValidWriteWithLock(nowNanos) { + if !w.isValidWriteWithLock(nowNanos, metrics) { w.Unlock() w.close(msg) return @@ -240,7 +244,7 @@ func (w *messageWriter) Write(rm *producer.RefCountedMessage) { msg.Set(meta, rm, nowNanos) w.acks.add(meta, msg) // Make sure all the new writes are ordered in queue. - w.m.enqueuedMessages.Inc(1) + metrics.enqueuedMessages.Inc(1) if w.lastNewWrite != nil { w.lastNewWrite = w.queue.InsertAfter(msg, w.lastNewWrite) } else { @@ -249,17 +253,17 @@ func (w *messageWriter) Write(rm *producer.RefCountedMessage) { w.Unlock() } -func (w *messageWriter) isValidWriteWithLock(nowNanos int64) bool { +func (w *messageWriter) isValidWriteWithLock(nowNanos int64, metrics *messageWriterMetrics) bool { if w.opts.IgnoreCutoffCutover() { return true } if w.cutOffNanos > 0 && nowNanos >= w.cutOffNanos { - w.m.writeAfterCutoff.Inc(1) + metrics.writeAfterCutoff.Inc(1) return false } if w.cutOverNanos > 0 && nowNanos < w.cutOverNanos { - w.m.writeBeforeCutover.Inc(1) + metrics.writeBeforeCutover.Inc(1) return false } @@ -318,12 +322,10 @@ func (w *messageWriter) write( // Ack acknowledges the metadata. func (w *messageWriter) Ack(meta metadata) bool { - acked, expectedProcessNanos := w.acks.ack(meta) - if acked { - w.RLock() - defer w.RUnlock() - w.m.messageConsumeLatency.Record(time.Duration(w.nowFn().UnixNano() - expectedProcessNanos)) - w.m.messageAcked.Inc(1) + if acked, expectedProcessNanos := w.acks.ack(meta); acked { + m := w.Metrics() + m.messageConsumeLatency.Record(time.Duration(w.nowFn().UnixNano() - expectedProcessNanos)) + m.messageAcked.Inc(1) return true } return false @@ -367,8 +369,8 @@ func (w *messageWriter) scanMessageQueue() { e := w.queue.Front() w.lastNewWrite = nil isClosed := w.isClosed - m := w.m w.RUnlock() + var ( nowFn = w.nowFn msgsToWrite []*message @@ -378,6 +380,7 @@ func (w *messageWriter) scanMessageQueue() { consumerWriters []consumerWriter iterationIndexes []int fullScan = isClosed || beforeScan.After(w.nextFullScan) + m = w.Metrics() scanMetrics scanBatchMetrics skipWrites bool ) @@ -452,6 +455,7 @@ func (w *messageWriter) scanBatchWithLock( iterated int next *list.Element ) + metrics := w.Metrics() w.msgsToWrite = w.msgsToWrite[:0] for e := start; e != nil; e = next { iterated++ @@ -469,7 +473,7 @@ func (w *messageWriter) scanBatchWithLock( // do not stay in memory forever. // NB: The message must be added to the ack map to be acked here. w.acks.ack(m.Metadata()) - w.removeFromQueueWithLock(e, m) + w.removeFromQueueWithLock(e, m, metrics) scanMetrics[_messageClosed]++ continue } @@ -491,12 +495,12 @@ func (w *messageWriter) scanBatchWithLock( if acked, _ := w.acks.ack(m.Metadata()); acked { scanMetrics[_messageDroppedTTLExpire]++ } - w.removeFromQueueWithLock(e, m) + w.removeFromQueueWithLock(e, m, metrics) continue } if m.IsAcked() { scanMetrics[_processedAck]++ - w.removeFromQueueWithLock(e, m) + w.removeFromQueueWithLock(e, m, metrics) continue } if m.IsDroppedOrConsumed() { @@ -509,7 +513,7 @@ func (w *messageWriter) scanBatchWithLock( continue } w.acks.remove(m.Metadata()) - w.removeFromQueueWithLock(e, m) + w.removeFromQueueWithLock(e, m, metrics) scanMetrics[_messageDroppedBufferFull]++ continue } @@ -644,18 +648,14 @@ func (w *messageWriter) RemoveConsumerWriter(addr string) { // Metrics returns the metrics. These are dynamic and change if downstream consumer instance changes. func (w *messageWriter) Metrics() *messageWriterMetrics { - w.RLock() - defer w.RUnlock() - return w.m + return (*messageWriterMetrics)(w.metrics.Load()) } // SetMetrics sets the metrics // // This allows changing the labels of the metrics when the downstream consumer instance changes. func (w *messageWriter) SetMetrics(m *messageWriterMetrics) { - w.Lock() - w.m = m - w.Unlock() + w.metrics.Store(stdunsafe.Pointer(m)) } // QueueSize returns the number of messages queued in the writer. @@ -667,9 +667,9 @@ func (w *messageWriter) newMessage() *message { return w.mPool.Get() } -func (w *messageWriter) removeFromQueueWithLock(e *list.Element, m *message) { +func (w *messageWriter) removeFromQueueWithLock(e *list.Element, m *message, metrics *messageWriterMetrics) { w.queue.Remove(e) - w.m.dequeuedMessages.Inc(1) + metrics.dequeuedMessages.Inc(1) w.close(m) } @@ -679,51 +679,53 @@ func (w *messageWriter) close(m *message) { } type acks struct { - sync.Mutex - - ackMap map[metadataKey]*message + mtx sync.Mutex + acks map[uint64]*message } // nolint: unparam func newAckHelper(size int) *acks { return &acks{ - ackMap: make(map[metadataKey]*message, size), + acks: make(map[uint64]*message, size), } } func (a *acks) add(meta metadata, m *message) { - a.Lock() - a.ackMap[meta.metadataKey] = m - a.Unlock() + a.mtx.Lock() + a.acks[meta.metadataKey.id] = m + a.mtx.Unlock() } func (a *acks) remove(meta metadata) { - a.Lock() - delete(a.ackMap, meta.metadataKey) - a.Unlock() + a.mtx.Lock() + delete(a.acks, meta.metadataKey.id) + a.mtx.Unlock() } // ack processes the ack. returns true if the message was not already acked. additionally returns the expected // processing time for lag calculations. func (a *acks) ack(meta metadata) (bool, int64) { - a.Lock() - m, ok := a.ackMap[meta.metadataKey] + a.mtx.Lock() + m, ok := a.acks[meta.metadataKey.id] if !ok { - a.Unlock() + a.mtx.Unlock() // Acking a message that is already acked, which is ok. return false, 0 } - delete(a.ackMap, meta.metadataKey) - a.Unlock() + + delete(a.acks, meta.metadataKey.id) + a.mtx.Unlock() + expectedProcessAtNanos := m.ExpectedProcessAtNanos() m.Ack() + return true, expectedProcessAtNanos } func (a *acks) size() int { - a.Lock() - l := len(a.ackMap) - a.Unlock() + a.mtx.Lock() + l := len(a.acks) + a.mtx.Unlock() return l } diff --git a/src/msg/producer/writer/message_writer_benchmark_test.go b/src/msg/producer/writer/message_writer_benchmark_test.go index ffe8e3c729..73e9dc3b91 100644 --- a/src/msg/producer/writer/message_writer_benchmark_test.go +++ b/src/msg/producer/writer/message_writer_benchmark_test.go @@ -69,3 +69,12 @@ func (noopWriter) Address() string { return "" } func (noopWriter) Write(int, []byte) error { return nil } func (noopWriter) Init() {} func (noopWriter) Close() {} + +func BenchmarkAck(b *testing.B) { + ack := newAckHelper(64) + for i := 0; i < b.N; i++ { + meta := metadata{metadataKey: metadataKey{200, uint64(b.N)}} + ack.add(meta, nil) + ack.remove(meta) + } +} diff --git a/src/msg/producer/writer/message_writer_test.go b/src/msg/producer/writer/message_writer_test.go index 087bb761e6..9b38f59e04 100644 --- a/src/msg/producer/writer/message_writer_test.go +++ b/src/msg/producer/writer/message_writer_test.go @@ -212,8 +212,11 @@ func TestMessageWriterRetry(t *testing.T) { time.Sleep(100 * time.Millisecond) } - _, ok := w.acks.ackMap[metadataKey{shard: 200, id: 1}] + require.Equal(t, 1, w.acks.size()) + w.acks.mtx.Lock() + _, ok := w.acks.acks[uint64(1)] require.True(t, ok) + w.acks.mtx.Unlock() cw := newConsumerWriter(addr, a, opts, testConsumerWriterMetrics()) cw.Init() @@ -333,16 +336,17 @@ func TestMessageWriterCutoverCutoff(t *testing.T) { w := newMessageWriter(200, newMessagePool(), nil, testMessageWriterMetrics()) now := time.Now() w.nowFn = func() time.Time { return now } - require.True(t, w.isValidWriteWithLock(now.UnixNano())) - require.True(t, w.isValidWriteWithLock(now.UnixNano()+150)) - require.True(t, w.isValidWriteWithLock(now.UnixNano()+250)) - require.True(t, w.isValidWriteWithLock(now.UnixNano()+50)) + met := w.Metrics() + require.True(t, w.isValidWriteWithLock(now.UnixNano(), met)) + require.True(t, w.isValidWriteWithLock(now.UnixNano()+150, met)) + require.True(t, w.isValidWriteWithLock(now.UnixNano()+250, met)) + require.True(t, w.isValidWriteWithLock(now.UnixNano()+50, met)) w.SetCutoffNanos(now.UnixNano() + 200) w.SetCutoverNanos(now.UnixNano() + 100) - require.True(t, w.isValidWriteWithLock(now.UnixNano()+150)) - require.False(t, w.isValidWriteWithLock(now.UnixNano()+250)) - require.False(t, w.isValidWriteWithLock(now.UnixNano()+50)) + require.True(t, w.isValidWriteWithLock(now.UnixNano()+150, met)) + require.False(t, w.isValidWriteWithLock(now.UnixNano()+250, met)) + require.False(t, w.isValidWriteWithLock(now.UnixNano()+50, met)) require.Equal(t, 0, w.queue.Len()) mm := producer.NewMockMessage(ctrl) @@ -363,9 +367,10 @@ func TestMessageWriterIgnoreCutoverCutoff(t *testing.T) { w.SetCutoffNanos(now.UnixNano() + 200) w.SetCutoverNanos(now.UnixNano() + 100) - require.True(t, w.isValidWriteWithLock(now.UnixNano()+150)) - require.True(t, w.isValidWriteWithLock(now.UnixNano()+250)) - require.True(t, w.isValidWriteWithLock(now.UnixNano()+50)) + met := w.Metrics() + require.True(t, w.isValidWriteWithLock(now.UnixNano()+150, met)) + require.True(t, w.isValidWriteWithLock(now.UnixNano()+250, met)) + require.True(t, w.isValidWriteWithLock(now.UnixNano()+50, met)) require.Equal(t, 0, w.queue.Len()) mm := producer.NewMockMessage(ctrl) @@ -758,9 +763,7 @@ func TestMessageWriter_WithoutConsumerScope(t *testing.T) { } func isEmptyWithLock(h *acks) bool { - h.Lock() - defer h.Unlock() - return len(h.ackMap) == 0 + return h.size() == 0 } func testMessageWriterMetrics() *messageWriterMetrics {