diff --git a/src/msg/producer/writer/message_writer.go b/src/msg/producer/writer/message_writer.go index 6868652613..9a949f3cdc 100644 --- a/src/msg/producer/writer/message_writer.go +++ b/src/msg/producer/writer/message_writer.go @@ -26,6 +26,7 @@ import ( "math" "sync" "time" + stdunsafe "unsafe" "github.com/m3db/m3/src/msg/producer" "github.com/m3db/m3/src/msg/protocol/proto" @@ -35,6 +36,7 @@ import ( "github.com/m3db/m3/src/x/unsafe" "github.com/uber-go/tally" + "go.uber.org/atomic" ) // MessageRetryNanosFn returns the message backoff time for retry in nanoseconds. @@ -179,7 +181,7 @@ type messageWriter struct { doneCh chan struct{} wg sync.WaitGroup // metrics can be updated when a consumer instance changes, so must be guarded with RLock - m *messageWriterMetrics + metrics atomic.UnsafePointer // *messageWriterMetrics nextFullScan time.Time lastNewWrite *list.Element @@ -196,7 +198,7 @@ func newMessageWriter( opts = NewOptions() } nowFn := time.Now - return &messageWriter{ + mw := &messageWriter{ replicatedShardID: replicatedShardID, mPool: mPool, opts: opts, @@ -211,9 +213,10 @@ func newMessageWriter( msgsToWrite: make([]*message, 0, opts.MessageQueueScanBatchSize()), isClosed: false, doneCh: make(chan struct{}), - m: m, nowFn: nowFn, } + mw.metrics.Store(stdunsafe.Pointer(m)) + return mw } // Write writes a message, messages not acknowledged in time will be retried. @@ -222,9 +225,10 @@ func (w *messageWriter) Write(rm *producer.RefCountedMessage) { var ( nowNanos = w.nowFn().UnixNano() msg = w.newMessage() + metrics = w.Metrics() ) w.Lock() - if !w.isValidWriteWithLock(nowNanos) { + if !w.isValidWriteWithLock(nowNanos, metrics) { w.Unlock() w.close(msg) return @@ -240,7 +244,7 @@ func (w *messageWriter) Write(rm *producer.RefCountedMessage) { msg.Set(meta, rm, nowNanos) w.acks.add(meta, msg) // Make sure all the new writes are ordered in queue. - w.m.enqueuedMessages.Inc(1) + metrics.enqueuedMessages.Inc(1) if w.lastNewWrite != nil { w.lastNewWrite = w.queue.InsertAfter(msg, w.lastNewWrite) } else { @@ -249,17 +253,17 @@ func (w *messageWriter) Write(rm *producer.RefCountedMessage) { w.Unlock() } -func (w *messageWriter) isValidWriteWithLock(nowNanos int64) bool { +func (w *messageWriter) isValidWriteWithLock(nowNanos int64, metrics *messageWriterMetrics) bool { if w.opts.IgnoreCutoffCutover() { return true } if w.cutOffNanos > 0 && nowNanos >= w.cutOffNanos { - w.m.writeAfterCutoff.Inc(1) + metrics.writeAfterCutoff.Inc(1) return false } if w.cutOverNanos > 0 && nowNanos < w.cutOverNanos { - w.m.writeBeforeCutover.Inc(1) + metrics.writeBeforeCutover.Inc(1) return false } @@ -319,10 +323,7 @@ func (w *messageWriter) write( // Ack acknowledges the metadata. func (w *messageWriter) Ack(meta metadata) bool { if acked, expectedProcessNanos := w.acks.ack(meta); acked { - w.RLock() - m := w.m - w.RUnlock() - + m := w.Metrics() m.messageConsumeLatency.Record(time.Duration(w.nowFn().UnixNano() - expectedProcessNanos)) m.messageAcked.Inc(1) return true @@ -368,8 +369,8 @@ func (w *messageWriter) scanMessageQueue() { e := w.queue.Front() w.lastNewWrite = nil isClosed := w.isClosed - m := w.m w.RUnlock() + var ( nowFn = w.nowFn msgsToWrite []*message @@ -379,6 +380,7 @@ func (w *messageWriter) scanMessageQueue() { consumerWriters []consumerWriter iterationIndexes []int fullScan = isClosed || beforeScan.After(w.nextFullScan) + m = w.Metrics() scanMetrics scanBatchMetrics skipWrites bool ) @@ -453,6 +455,7 @@ func (w *messageWriter) scanBatchWithLock( iterated int next *list.Element ) + metrics := w.Metrics() w.msgsToWrite = w.msgsToWrite[:0] for e := start; e != nil; e = next { iterated++ @@ -470,7 +473,7 @@ func (w *messageWriter) scanBatchWithLock( // do not stay in memory forever. // NB: The message must be added to the ack map to be acked here. w.acks.ack(m.Metadata()) - w.removeFromQueueWithLock(e, m) + w.removeFromQueueWithLock(e, m, metrics) scanMetrics[_messageClosed]++ continue } @@ -492,12 +495,12 @@ func (w *messageWriter) scanBatchWithLock( if acked, _ := w.acks.ack(m.Metadata()); acked { scanMetrics[_messageDroppedTTLExpire]++ } - w.removeFromQueueWithLock(e, m) + w.removeFromQueueWithLock(e, m, metrics) continue } if m.IsAcked() { scanMetrics[_processedAck]++ - w.removeFromQueueWithLock(e, m) + w.removeFromQueueWithLock(e, m, metrics) continue } if m.IsDroppedOrConsumed() { @@ -510,7 +513,7 @@ func (w *messageWriter) scanBatchWithLock( continue } w.acks.remove(m.Metadata()) - w.removeFromQueueWithLock(e, m) + w.removeFromQueueWithLock(e, m, metrics) scanMetrics[_messageDroppedBufferFull]++ continue } @@ -645,18 +648,14 @@ func (w *messageWriter) RemoveConsumerWriter(addr string) { // Metrics returns the metrics. These are dynamic and change if downstream consumer instance changes. func (w *messageWriter) Metrics() *messageWriterMetrics { - w.RLock() - defer w.RUnlock() - return w.m + return (*messageWriterMetrics)(w.metrics.Load()) } // SetMetrics sets the metrics // // This allows changing the labels of the metrics when the downstream consumer instance changes. func (w *messageWriter) SetMetrics(m *messageWriterMetrics) { - w.Lock() - w.m = m - w.Unlock() + w.metrics.Store(stdunsafe.Pointer(m)) } // QueueSize returns the number of messages queued in the writer. @@ -668,9 +667,9 @@ func (w *messageWriter) newMessage() *message { return w.mPool.Get() } -func (w *messageWriter) removeFromQueueWithLock(e *list.Element, m *message) { +func (w *messageWriter) removeFromQueueWithLock(e *list.Element, m *message, metrics *messageWriterMetrics) { w.queue.Remove(e) - w.m.dequeuedMessages.Inc(1) + metrics.dequeuedMessages.Inc(1) w.close(m) } diff --git a/src/msg/producer/writer/message_writer_test.go b/src/msg/producer/writer/message_writer_test.go index d031b67d8c..9b38f59e04 100644 --- a/src/msg/producer/writer/message_writer_test.go +++ b/src/msg/producer/writer/message_writer_test.go @@ -336,16 +336,17 @@ func TestMessageWriterCutoverCutoff(t *testing.T) { w := newMessageWriter(200, newMessagePool(), nil, testMessageWriterMetrics()) now := time.Now() w.nowFn = func() time.Time { return now } - require.True(t, w.isValidWriteWithLock(now.UnixNano())) - require.True(t, w.isValidWriteWithLock(now.UnixNano()+150)) - require.True(t, w.isValidWriteWithLock(now.UnixNano()+250)) - require.True(t, w.isValidWriteWithLock(now.UnixNano()+50)) + met := w.Metrics() + require.True(t, w.isValidWriteWithLock(now.UnixNano(), met)) + require.True(t, w.isValidWriteWithLock(now.UnixNano()+150, met)) + require.True(t, w.isValidWriteWithLock(now.UnixNano()+250, met)) + require.True(t, w.isValidWriteWithLock(now.UnixNano()+50, met)) w.SetCutoffNanos(now.UnixNano() + 200) w.SetCutoverNanos(now.UnixNano() + 100) - require.True(t, w.isValidWriteWithLock(now.UnixNano()+150)) - require.False(t, w.isValidWriteWithLock(now.UnixNano()+250)) - require.False(t, w.isValidWriteWithLock(now.UnixNano()+50)) + require.True(t, w.isValidWriteWithLock(now.UnixNano()+150, met)) + require.False(t, w.isValidWriteWithLock(now.UnixNano()+250, met)) + require.False(t, w.isValidWriteWithLock(now.UnixNano()+50, met)) require.Equal(t, 0, w.queue.Len()) mm := producer.NewMockMessage(ctrl) @@ -366,9 +367,10 @@ func TestMessageWriterIgnoreCutoverCutoff(t *testing.T) { w.SetCutoffNanos(now.UnixNano() + 200) w.SetCutoverNanos(now.UnixNano() + 100) - require.True(t, w.isValidWriteWithLock(now.UnixNano()+150)) - require.True(t, w.isValidWriteWithLock(now.UnixNano()+250)) - require.True(t, w.isValidWriteWithLock(now.UnixNano()+50)) + met := w.Metrics() + require.True(t, w.isValidWriteWithLock(now.UnixNano()+150, met)) + require.True(t, w.isValidWriteWithLock(now.UnixNano()+250, met)) + require.True(t, w.isValidWriteWithLock(now.UnixNano()+50, met)) require.Equal(t, 0, w.queue.Len()) mm := producer.NewMockMessage(ctrl)