
Commit 65d15d8: clean up metric usage

vdarulis committed May 5, 2022 (1 parent: ecc4787)
Showing 2 changed files with 36 additions and 35 deletions.
49 changes: 24 additions & 25 deletions src/msg/producer/writer/message_writer.go
@@ -26,6 +26,7 @@ import (
"math"
"sync"
"time"
+ stdunsafe "unsafe"

"github.com/m3db/m3/src/msg/producer"
"github.com/m3db/m3/src/msg/protocol/proto"
@@ -35,6 +36,7 @@ import (
"github.com/m3db/m3/src/x/unsafe"

"github.com/uber-go/tally"
"go.uber.org/atomic"
)

// MessageRetryNanosFn returns the message backoff time for retry in nanoseconds.
@@ -179,7 +181,7 @@ type messageWriter struct {
doneCh chan struct{}
wg sync.WaitGroup
// metrics can be updated when a consumer instance changes, so it is accessed via an atomic pointer
- m *messageWriterMetrics
+ metrics atomic.UnsafePointer // *messageWriterMetrics
nextFullScan time.Time
lastNewWrite *list.Element

@@ -196,7 +198,7 @@ func newMessageWriter(
opts = NewOptions()
}
nowFn := time.Now
- return &messageWriter{
+ mw := &messageWriter{
replicatedShardID: replicatedShardID,
mPool: mPool,
opts: opts,
@@ -211,9 +213,10 @@
msgsToWrite: make([]*message, 0, opts.MessageQueueScanBatchSize()),
isClosed: false,
doneCh: make(chan struct{}),
- m: m,
nowFn: nowFn,
}
+ mw.metrics.Store(stdunsafe.Pointer(m))
+ return mw
}

// Write writes a message; messages not acknowledged in time will be retried.
@@ -222,9 +225,10 @@ func (w *messageWriter) Write(rm *producer.RefCountedMessage) {
var (
nowNanos = w.nowFn().UnixNano()
msg = w.newMessage()
+ metrics = w.Metrics()
)
w.Lock()
- if !w.isValidWriteWithLock(nowNanos) {
+ if !w.isValidWriteWithLock(nowNanos, metrics) {
w.Unlock()
w.close(msg)
return
@@ -240,7 +244,7 @@ func (w *messageWriter) Write(rm *producer.RefCountedMessage) {
msg.Set(meta, rm, nowNanos)
w.acks.add(meta, msg)
// Make sure all the new writes are ordered in queue.
- w.m.enqueuedMessages.Inc(1)
+ metrics.enqueuedMessages.Inc(1)
if w.lastNewWrite != nil {
w.lastNewWrite = w.queue.InsertAfter(msg, w.lastNewWrite)
} else {
@@ -249,17 +253,17 @@ func (w *messageWriter) Write(rm *producer.RefCountedMessage) {
w.Unlock()
}

- func (w *messageWriter) isValidWriteWithLock(nowNanos int64) bool {
+ func (w *messageWriter) isValidWriteWithLock(nowNanos int64, metrics *messageWriterMetrics) bool {
if w.opts.IgnoreCutoffCutover() {
return true
}

if w.cutOffNanos > 0 && nowNanos >= w.cutOffNanos {
- w.m.writeAfterCutoff.Inc(1)
+ metrics.writeAfterCutoff.Inc(1)
return false
}
if w.cutOverNanos > 0 && nowNanos < w.cutOverNanos {
- w.m.writeBeforeCutover.Inc(1)
+ metrics.writeBeforeCutover.Inc(1)
return false
}

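Note: the hunks above capture the hot-path half of this change. Write loads the metrics pointer once via w.Metrics() and threads that snapshot through isValidWriteWithLock, so the lock-held path performs no extra atomic loads. A minimal self-contained sketch of this load-once-and-pass-down pattern (illustrative names, not code from the commit):

package main

import (
    "fmt"
    "sync"
    stdunsafe "unsafe"

    "go.uber.org/atomic"
)

type writerMetrics struct {
    enqueued atomic.Int64 // counter stays race-free even on a stale snapshot
}

type writer struct {
    sync.Mutex
    metrics atomic.UnsafePointer // *writerMetrics
}

// Metrics loads the current metrics with a single atomic read.
func (w *writer) Metrics() *writerMetrics {
    return (*writerMetrics)(w.metrics.Load())
}

// Write snapshots the metrics once, before taking the lock, and passes the
// snapshot into the locked helper.
func (w *writer) Write() {
    m := w.Metrics()
    w.Lock()
    w.writeWithLock(m)
    w.Unlock()
}

func (w *writer) writeWithLock(m *writerMetrics) {
    m.enqueued.Inc(1)
}

func main() {
    w := &writer{}
    w.metrics.Store(stdunsafe.Pointer(&writerMetrics{}))
    w.Write()
    fmt.Println(w.Metrics().enqueued.Load()) // prints 1
}

A snapshot taken before the lock can be one swap stale; that is acceptable here, since an increment landing on a just-replaced metrics object is harmless.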
@@ -319,10 +323,7 @@ func (w *messageWriter) write(
// Ack acknowledges the metadata.
func (w *messageWriter) Ack(meta metadata) bool {
if acked, expectedProcessNanos := w.acks.ack(meta); acked {
- w.RLock()
- m := w.m
- w.RUnlock()
-
+ m := w.Metrics()
m.messageConsumeLatency.Record(time.Duration(w.nowFn().UnixNano() - expectedProcessNanos))
m.messageAcked.Inc(1)
return true
@@ -368,8 +369,8 @@ func (w *messageWriter) scanMessageQueue() {
e := w.queue.Front()
w.lastNewWrite = nil
isClosed := w.isClosed
- m := w.m
w.RUnlock()
+
var (
nowFn = w.nowFn
msgsToWrite []*message
@@ -379,6 +380,7 @@ func (w *messageWriter) scanMessageQueue() {
consumerWriters []consumerWriter
iterationIndexes []int
fullScan = isClosed || beforeScan.After(w.nextFullScan)
+ m = w.Metrics()
scanMetrics scanBatchMetrics
skipWrites bool
)
@@ -453,6 +455,7 @@ func (w *messageWriter) scanBatchWithLock(
iterated int
next *list.Element
)
+ metrics := w.Metrics()
w.msgsToWrite = w.msgsToWrite[:0]
for e := start; e != nil; e = next {
iterated++
@@ -470,7 +473,7 @@
// do not stay in memory forever.
// NB: The message must be added to the ack map to be acked here.
w.acks.ack(m.Metadata())
- w.removeFromQueueWithLock(e, m)
+ w.removeFromQueueWithLock(e, m, metrics)
scanMetrics[_messageClosed]++
continue
}
@@ -492,12 +495,12 @@
if acked, _ := w.acks.ack(m.Metadata()); acked {
scanMetrics[_messageDroppedTTLExpire]++
}
- w.removeFromQueueWithLock(e, m)
+ w.removeFromQueueWithLock(e, m, metrics)
continue
}
if m.IsAcked() {
scanMetrics[_processedAck]++
- w.removeFromQueueWithLock(e, m)
+ w.removeFromQueueWithLock(e, m, metrics)
continue
}
if m.IsDroppedOrConsumed() {
@@ -510,7 +513,7 @@
continue
}
w.acks.remove(m.Metadata())
- w.removeFromQueueWithLock(e, m)
+ w.removeFromQueueWithLock(e, m, metrics)
scanMetrics[_messageDroppedBufferFull]++
continue
}
@@ -645,18 +648,14 @@ func (w *messageWriter) RemoveConsumerWriter(addr string) {

// Metrics returns the metrics. These are dynamic and change if the downstream consumer instance changes.
func (w *messageWriter) Metrics() *messageWriterMetrics {
- w.RLock()
- defer w.RUnlock()
- return w.m
+ return (*messageWriterMetrics)(w.metrics.Load())
}

// SetMetrics sets the metrics
//
// This allows changing the labels of the metrics when the downstream consumer instance changes.
func (w *messageWriter) SetMetrics(m *messageWriterMetrics) {
- w.Lock()
- w.m = m
- w.Unlock()
+ w.metrics.Store(stdunsafe.Pointer(m))
}

// QueueSize returns the number of messages queued in the writer.
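Note: the Metrics/SetMetrics pair above is the core of the commit: replacing the RWMutex-guarded field with an atomic pointer makes reads wait-free, so hot-path readers never contend with the rare consumer-instance swap. A minimal sketch of an equivalent holder using Go 1.19's generic sync/atomic.Pointer[T] (assumed, illustrative names; not code from the commit):

package main

import (
    "fmt"
    "sync/atomic"
)

type messageWriterMetrics struct{ scope string }

// metricsHolder keeps the current metrics behind an atomic pointer.
// atomic.Pointer[T] gives the same lock-free semantics as the commit's
// atomic.UnsafePointer without the manual unsafe.Pointer conversions.
type metricsHolder struct {
    m atomic.Pointer[messageWriterMetrics]
}

func (h *metricsHolder) Metrics() *messageWriterMetrics     { return h.m.Load() }
func (h *metricsHolder) SetMetrics(m *messageWriterMetrics) { h.m.Store(m) }

func main() {
    var h metricsHolder
    h.SetMetrics(&messageWriterMetrics{scope: "instance-a"})
    fmt.Println(h.Metrics().scope) // instance-a

    // Swapping on a consumer instance change is a single atomic store.
    h.SetMetrics(&messageWriterMetrics{scope: "instance-b"})
    fmt.Println(h.Metrics().scope) // instance-b
}

atomic.Pointer[T] only shipped in Go 1.19, a few months after this May 2022 commit, which is presumably why the change reaches for go.uber.org/atomic's UnsafePointer and explicit casts instead.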
@@ -668,9 +667,9 @@ func (w *messageWriter) newMessage() *message {
return w.mPool.Get()
}

- func (w *messageWriter) removeFromQueueWithLock(e *list.Element, m *message) {
+ func (w *messageWriter) removeFromQueueWithLock(e *list.Element, m *message, metrics *messageWriterMetrics) {
w.queue.Remove(e)
- w.m.dequeuedMessages.Inc(1)
+ metrics.dequeuedMessages.Inc(1)
w.close(m)
}

22 changes: 12 additions & 10 deletions src/msg/producer/writer/message_writer_test.go
@@ -336,16 +336,17 @@ func TestMessageWriterCutoverCutoff(t *testing.T) {
w := newMessageWriter(200, newMessagePool(), nil, testMessageWriterMetrics())
now := time.Now()
w.nowFn = func() time.Time { return now }
- require.True(t, w.isValidWriteWithLock(now.UnixNano()))
- require.True(t, w.isValidWriteWithLock(now.UnixNano()+150))
- require.True(t, w.isValidWriteWithLock(now.UnixNano()+250))
- require.True(t, w.isValidWriteWithLock(now.UnixNano()+50))
+ met := w.Metrics()
+ require.True(t, w.isValidWriteWithLock(now.UnixNano(), met))
+ require.True(t, w.isValidWriteWithLock(now.UnixNano()+150, met))
+ require.True(t, w.isValidWriteWithLock(now.UnixNano()+250, met))
+ require.True(t, w.isValidWriteWithLock(now.UnixNano()+50, met))

w.SetCutoffNanos(now.UnixNano() + 200)
w.SetCutoverNanos(now.UnixNano() + 100)
- require.True(t, w.isValidWriteWithLock(now.UnixNano()+150))
- require.False(t, w.isValidWriteWithLock(now.UnixNano()+250))
- require.False(t, w.isValidWriteWithLock(now.UnixNano()+50))
+ require.True(t, w.isValidWriteWithLock(now.UnixNano()+150, met))
+ require.False(t, w.isValidWriteWithLock(now.UnixNano()+250, met))
+ require.False(t, w.isValidWriteWithLock(now.UnixNano()+50, met))
require.Equal(t, 0, w.queue.Len())

mm := producer.NewMockMessage(ctrl)
@@ -366,9 +367,10 @@ func TestMessageWriterIgnoreCutoverCutoff(t *testing.T) {

w.SetCutoffNanos(now.UnixNano() + 200)
w.SetCutoverNanos(now.UnixNano() + 100)
- require.True(t, w.isValidWriteWithLock(now.UnixNano()+150))
- require.True(t, w.isValidWriteWithLock(now.UnixNano()+250))
- require.True(t, w.isValidWriteWithLock(now.UnixNano()+50))
+ met := w.Metrics()
+ require.True(t, w.isValidWriteWithLock(now.UnixNano()+150, met))
+ require.True(t, w.isValidWriteWithLock(now.UnixNano()+250, met))
+ require.True(t, w.isValidWriteWithLock(now.UnixNano()+50, met))
require.Equal(t, 0, w.queue.Len())

mm := producer.NewMockMessage(ctrl)
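Note: the updated tests pass the metrics snapshot explicitly, but nothing here exercises concurrent Metrics/SetMetrics calls. A hypothetical race test against the metricsHolder sketch above (assumed names; imports sync and testing; run with go test -race):

func TestMetricsHolderConcurrentSwap(t *testing.T) {
    var h metricsHolder
    h.SetMetrics(&messageWriterMetrics{scope: "init"})

    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(2)
        // Writer goroutine: swaps the metrics pointer repeatedly, as a
        // consumer-instance change would.
        go func() {
            defer wg.Done()
            for j := 0; j < 1000; j++ {
                h.SetMetrics(&messageWriterMetrics{scope: "swapped"})
            }
        }()
        // Reader goroutine: must always observe a fully formed snapshot.
        go func() {
            defer wg.Done()
            for j := 0; j < 1000; j++ {
                if h.Metrics() == nil {
                    t.Error("observed nil metrics")
                }
            }
        }()
    }
    wg.Wait()
}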
