From 1b5d128e47d6cdc061d8165da0d0dab3e301ce96 Mon Sep 17 00:00:00 2001 From: Vytenis Darulis Date: Tue, 3 May 2022 16:17:05 -0400 Subject: [PATCH] [m3msg] Specialize messageWriter acker map --- src/msg/producer/writer/message_writer.go | 53 ++++++++++--------- .../writer/message_writer_benchmark_test.go | 9 ++++ .../producer/writer/message_writer_test.go | 9 ++-- 3 files changed, 43 insertions(+), 28 deletions(-) diff --git a/src/msg/producer/writer/message_writer.go b/src/msg/producer/writer/message_writer.go index f96e651d3e..6868652613 100644 --- a/src/msg/producer/writer/message_writer.go +++ b/src/msg/producer/writer/message_writer.go @@ -318,12 +318,13 @@ func (w *messageWriter) write( // Ack acknowledges the metadata. func (w *messageWriter) Ack(meta metadata) bool { - acked, expectedProcessNanos := w.acks.ack(meta) - if acked { + if acked, expectedProcessNanos := w.acks.ack(meta); acked { w.RLock() - defer w.RUnlock() - w.m.messageConsumeLatency.Record(time.Duration(w.nowFn().UnixNano() - expectedProcessNanos)) - w.m.messageAcked.Inc(1) + m := w.m + w.RUnlock() + + m.messageConsumeLatency.Record(time.Duration(w.nowFn().UnixNano() - expectedProcessNanos)) + m.messageAcked.Inc(1) return true } return false @@ -679,52 +680,56 @@ func (w *messageWriter) close(m *message) { } type acks struct { - sync.Mutex - - ackMap map[metadataKey]*message + mtx sync.Mutex + acks map[uint64]*message } // nolint: unparam func newAckHelper(size int) *acks { return &acks{ - ackMap: make(map[metadataKey]*message, size), + acks: make(map[uint64]*message, size), } } func (a *acks) add(meta metadata, m *message) { - a.Lock() - a.ackMap[meta.metadataKey] = m - a.Unlock() + a.mtx.Lock() + defer a.mtx.Unlock() + + a.acks[meta.metadataKey.id] = m } func (a *acks) remove(meta metadata) { - a.Lock() - delete(a.ackMap, meta.metadataKey) - a.Unlock() + a.mtx.Lock() + defer a.mtx.Unlock() + + delete(a.acks, meta.metadataKey.id) } // ack processes the ack. returns true if the message was not already acked. additionally returns the expected // processing time for lag calculations. func (a *acks) ack(meta metadata) (bool, int64) { - a.Lock() - m, ok := a.ackMap[meta.metadataKey] + a.mtx.Lock() + m, ok := a.acks[meta.metadataKey.id] if !ok { - a.Unlock() + a.mtx.Unlock() // Acking a message that is already acked, which is ok. return false, 0 } - delete(a.ackMap, meta.metadataKey) - a.Unlock() + + delete(a.acks, meta.metadataKey.id) + a.mtx.Unlock() + expectedProcessAtNanos := m.ExpectedProcessAtNanos() m.Ack() + return true, expectedProcessAtNanos } func (a *acks) size() int { - a.Lock() - l := len(a.ackMap) - a.Unlock() - return l + a.mtx.Lock() + defer a.mtx.Unlock() + + return len(a.acks) } type metricIdx byte diff --git a/src/msg/producer/writer/message_writer_benchmark_test.go b/src/msg/producer/writer/message_writer_benchmark_test.go index ffe8e3c729..73e9dc3b91 100644 --- a/src/msg/producer/writer/message_writer_benchmark_test.go +++ b/src/msg/producer/writer/message_writer_benchmark_test.go @@ -69,3 +69,12 @@ func (noopWriter) Address() string { return "" } func (noopWriter) Write(int, []byte) error { return nil } func (noopWriter) Init() {} func (noopWriter) Close() {} + +func BenchmarkAck(b *testing.B) { + ack := newAckHelper(64) + for i := 0; i < b.N; i++ { + meta := metadata{metadataKey: metadataKey{200, uint64(b.N)}} + ack.add(meta, nil) + ack.remove(meta) + } +} diff --git a/src/msg/producer/writer/message_writer_test.go b/src/msg/producer/writer/message_writer_test.go index 087bb761e6..d031b67d8c 100644 --- a/src/msg/producer/writer/message_writer_test.go +++ b/src/msg/producer/writer/message_writer_test.go @@ -212,8 +212,11 @@ func TestMessageWriterRetry(t *testing.T) { time.Sleep(100 * time.Millisecond) } - _, ok := w.acks.ackMap[metadataKey{shard: 200, id: 1}] + require.Equal(t, 1, w.acks.size()) + w.acks.mtx.Lock() + _, ok := w.acks.acks[uint64(1)] require.True(t, ok) + w.acks.mtx.Unlock() cw := newConsumerWriter(addr, a, opts, testConsumerWriterMetrics()) cw.Init() @@ -758,9 +761,7 @@ func TestMessageWriter_WithoutConsumerScope(t *testing.T) { } func isEmptyWithLock(h *acks) bool { - h.Lock() - defer h.Unlock() - return len(h.ackMap) == 0 + return h.size() == 0 } func testMessageWriterMetrics() *messageWriterMetrics {