Skip to content

Commit

Permalink
make the code unsafe-free again
Browse files Browse the repository at this point in the history
  • Loading branch information
vdarulis committed Sep 4, 2020
1 parent 6fac7ff commit 63fcea9
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 24 deletions.
4 changes: 2 additions & 2 deletions src/msg/integration/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,8 @@ writer:
topicName: topicName
topicWatchInitTimeout: 100ms
placementWatchInitTimeout: 100ms
messagePool:
size: 100
# FIXME: Consumers sharing the same pool trigger false-positives in race detector
messagePool: ~
messageRetry:
initialBackoff: 20ms
maxBackoff: 50ms
Expand Down
26 changes: 4 additions & 22 deletions src/msg/producer/writer/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
package writer

import (
stdatomic "sync/atomic"
"unsafe"

"github.com/m3db/m3/src/msg/generated/proto/msgpb"
"github.com/m3db/m3/src/msg/producer"
"github.com/m3db/m3/src/msg/protocol/proto"
Expand Down Expand Up @@ -56,7 +53,7 @@ func newMessage() *message {
func (m *message) Set(meta metadata, rm *producer.RefCountedMessage, initNanos int64) {
m.initNanos = initNanos
m.meta = meta
m.storeRefCountedMessage(rm)
m.RefCountedMessage = rm
m.ToProto(&m.pb)
}

Expand Down Expand Up @@ -101,7 +98,7 @@ func (m *message) IsAcked() bool {
// Ack acknowledges the message. Duplicated acks on the same message might cause panic.
func (m *message) Ack() {
m.isAcked.Store(true)
m.loadRefCountedMessage().DecRef()
m.RefCountedMessage.DecRef()
}

// Metadata returns the metadata.
Expand All @@ -111,29 +108,14 @@ func (m *message) Metadata() metadata {

// Marshaler returns the marshaler and a bool to indicate whether the marshaler is valid.
func (m *message) Marshaler() (proto.Marshaler, bool) {
return &m.pb, !m.loadRefCountedMessage().IsDroppedOrConsumed()
return &m.pb, !m.RefCountedMessage.IsDroppedOrConsumed()
}

func (m *message) ToProto(pb *msgpb.Message) {
m.meta.ToProto(&pb.Metadata)
pb.Value = m.loadRefCountedMessage().Bytes()
pb.Value = m.RefCountedMessage.Bytes()
}

func (m *message) ResetProto(pb *msgpb.Message) {
pb.Value = nil
}

func (m *message) storeRefCountedMessage(rm *producer.RefCountedMessage) {
stdatomic.StorePointer(
(*unsafe.Pointer)(unsafe.Pointer(&m.RefCountedMessage)),
unsafe.Pointer(rm),
)
}

func (m *message) loadRefCountedMessage() *producer.RefCountedMessage {
return (*producer.RefCountedMessage)(
stdatomic.LoadPointer(
(*unsafe.Pointer)(unsafe.Pointer(&m.RefCountedMessage)),
),
)
}

0 comments on commit 63fcea9

Please sign in to comment.