Skip to content

Commit

Permalink
[m3msg] Specialize messageWriter acker map
Browse files Browse the repository at this point in the history
  • Loading branch information
vdarulis committed May 3, 2022
1 parent 665fd30 commit 1b5d128
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 28 deletions.
53 changes: 29 additions & 24 deletions src/msg/producer/writer/message_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,13 @@ func (w *messageWriter) write(

// Ack acknowledges the metadata.
func (w *messageWriter) Ack(meta metadata) bool {
acked, expectedProcessNanos := w.acks.ack(meta)
if acked {
if acked, expectedProcessNanos := w.acks.ack(meta); acked {
w.RLock()
defer w.RUnlock()
w.m.messageConsumeLatency.Record(time.Duration(w.nowFn().UnixNano() - expectedProcessNanos))
w.m.messageAcked.Inc(1)
m := w.m
w.RUnlock()

m.messageConsumeLatency.Record(time.Duration(w.nowFn().UnixNano() - expectedProcessNanos))
m.messageAcked.Inc(1)
return true
}
return false
Expand Down Expand Up @@ -679,52 +680,56 @@ func (w *messageWriter) close(m *message) {
}

type acks struct {
sync.Mutex

ackMap map[metadataKey]*message
mtx sync.Mutex
acks map[uint64]*message
}

// nolint: unparam
func newAckHelper(size int) *acks {
return &acks{
ackMap: make(map[metadataKey]*message, size),
acks: make(map[uint64]*message, size),
}
}

func (a *acks) add(meta metadata, m *message) {
a.Lock()
a.ackMap[meta.metadataKey] = m
a.Unlock()
a.mtx.Lock()
defer a.mtx.Unlock()

a.acks[meta.metadataKey.id] = m
}

func (a *acks) remove(meta metadata) {
a.Lock()
delete(a.ackMap, meta.metadataKey)
a.Unlock()
a.mtx.Lock()
defer a.mtx.Unlock()

delete(a.acks, meta.metadataKey.id)
}

// ack processes the ack. returns true if the message was not already acked. additionally returns the expected
// processing time for lag calculations.
func (a *acks) ack(meta metadata) (bool, int64) {
a.Lock()
m, ok := a.ackMap[meta.metadataKey]
a.mtx.Lock()
m, ok := a.acks[meta.metadataKey.id]
if !ok {
a.Unlock()
a.mtx.Unlock()
// Acking a message that is already acked, which is ok.
return false, 0
}
delete(a.ackMap, meta.metadataKey)
a.Unlock()

delete(a.acks, meta.metadataKey.id)
a.mtx.Unlock()

expectedProcessAtNanos := m.ExpectedProcessAtNanos()
m.Ack()

return true, expectedProcessAtNanos
}

func (a *acks) size() int {
a.Lock()
l := len(a.ackMap)
a.Unlock()
return l
a.mtx.Lock()
defer a.mtx.Unlock()

return len(a.acks)
}

type metricIdx byte
Expand Down
9 changes: 9 additions & 0 deletions src/msg/producer/writer/message_writer_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,12 @@ func (noopWriter) Address() string { return "" }
func (noopWriter) Write(int, []byte) error { return nil }
func (noopWriter) Init() {}
func (noopWriter) Close() {}

func BenchmarkAck(b *testing.B) {
ack := newAckHelper(64)
for i := 0; i < b.N; i++ {
meta := metadata{metadataKey: metadataKey{200, uint64(b.N)}}
ack.add(meta, nil)
ack.remove(meta)
}
}
9 changes: 5 additions & 4 deletions src/msg/producer/writer/message_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,11 @@ func TestMessageWriterRetry(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}

_, ok := w.acks.ackMap[metadataKey{shard: 200, id: 1}]
require.Equal(t, 1, w.acks.size())
w.acks.mtx.Lock()
_, ok := w.acks.acks[uint64(1)]
require.True(t, ok)
w.acks.mtx.Unlock()

cw := newConsumerWriter(addr, a, opts, testConsumerWriterMetrics())
cw.Init()
Expand Down Expand Up @@ -758,9 +761,7 @@ func TestMessageWriter_WithoutConsumerScope(t *testing.T) {
}

func isEmptyWithLock(h *acks) bool {
h.Lock()
defer h.Unlock()
return len(h.ackMap) == 0
return h.size() == 0
}

func testMessageWriterMetrics() *messageWriterMetrics {
Expand Down

0 comments on commit 1b5d128

Please sign in to comment.