Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change atomic.Value to atomic.Pointer #2088

Merged
merged 1 commit into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions p2p/host/autonat/autonat.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type AmbientAutoNAT struct {
inboundConn chan network.Conn
observations chan autoNATResult
// status is an autoNATResult reflecting current status.
status atomic.Value
status atomic.Pointer[autoNATResult]
// Reflects the confidence on of the NATStatus being private, as a single
// dialback may fail for reasons unrelated to NAT.
// If it is <3, then multiple autoNAT peers may be contacted for dialback
Expand Down Expand Up @@ -117,7 +117,7 @@ func New(h host.Host, options ...Option) (AutoNAT, error) {
service: service,
recentProbes: make(map[peer.ID]time.Time),
}
as.status.Store(autoNATResult{network.ReachabilityUnknown, nil})
as.status.Store(&autoNATResult{network.ReachabilityUnknown, nil})

subscriber, err := as.host.EventBus().Subscribe(
[]any{new(event.EvtLocalAddressesUpdated), new(event.EvtPeerIdentificationCompleted)},
Expand All @@ -136,18 +136,18 @@ func New(h host.Host, options ...Option) (AutoNAT, error) {

// Status returns the AutoNAT observed reachability status.
func (as *AmbientAutoNAT) Status() network.Reachability {
s := as.status.Load().(autoNATResult)
s := as.status.Load()
return s.Reachability
}

func (as *AmbientAutoNAT) emitStatus() {
status := as.status.Load().(autoNATResult)
status := as.status.Load()
as.emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: status.Reachability})
}

// PublicAddr returns the publicly connectable Multiaddr of this node if one is known.
func (as *AmbientAutoNAT) PublicAddr() (ma.Multiaddr, error) {
s := as.status.Load().(autoNATResult)
s := as.status.Load()
if s.Reachability != network.ReachabilityPublic {
return nil, errors.New("NAT status is not public")
}
Expand Down Expand Up @@ -184,7 +184,7 @@ func (as *AmbientAutoNAT) background() {
// new inbound connection.
case conn := <-as.inboundConn:
localAddrs := as.host.Addrs()
ca := as.status.Load().(autoNATResult)
ca := as.status.Load()
if ca.address != nil {
localAddrs = append(localAddrs, ca.address)
}
Expand All @@ -204,7 +204,7 @@ func (as *AmbientAutoNAT) background() {
}
case event.EvtPeerIdentificationCompleted:
if s, err := as.host.Peerstore().SupportsProtocols(e.Peer, AutoNATProto); err == nil && len(s) > 0 {
currentStatus := as.status.Load().(autoNATResult)
currentStatus := as.status.Load()
if currentStatus.Reachability == network.ReachabilityUnknown {
as.tryProbe(e.Peer)
}
Expand Down Expand Up @@ -253,7 +253,7 @@ func (as *AmbientAutoNAT) scheduleProbe() time.Duration {
// * recent inbound connections (implying continued connectivity) should decrease the retry when public
// * recent inbound connections when not public mean we should try more actively to see if we're public.
fixedNow := time.Now()
currentStatus := as.status.Load().(autoNATResult)
currentStatus := as.status.Load()

nextProbe := fixedNow
// Don't look for peers in the peer store more than once per second.
Expand Down Expand Up @@ -285,7 +285,7 @@ func (as *AmbientAutoNAT) scheduleProbe() time.Duration {

// Update the current status based on an observed result.
func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
currentStatus := as.status.Load().(autoNATResult)
currentStatus := as.status.Load()
if observation.Reachability == network.ReachabilityPublic {
log.Debugf("NAT status is public")
changed := false
Expand All @@ -306,7 +306,7 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
if currentStatus.address == nil || !observation.address.Equal(currentStatus.address) {
changed = true
}
as.status.Store(observation)
as.status.Store(&observation)
}
if observation.address != nil && changed {
as.emitStatus()
Expand All @@ -319,15 +319,15 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
} else {
// we are flipping our NATStatus, so confidence drops to 0
as.confidence = 0
as.status.Store(observation)
as.status.Store(&observation)
if as.service != nil {
as.service.Disable()
}
as.emitStatus()
}
} else if as.confidence < 3 {
as.confidence++
as.status.Store(observation)
as.status.Store(&observation)
if currentStatus.Reachability != network.ReachabilityPrivate {
as.emitStatus()
}
Expand All @@ -337,7 +337,7 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
as.confidence--
} else {
log.Debugf("NAT status is unknown")
as.status.Store(autoNATResult{network.ReachabilityUnknown, nil})
as.status.Store(&autoNATResult{network.ReachabilityUnknown, nil})
if currentStatus.Reachability != network.ReachabilityUnknown {
if as.service != nil {
as.service.Enable()
Expand Down
10 changes: 6 additions & 4 deletions p2p/net/connmgr/decay.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type decayer struct {
knownTags map[string]*decayingTag

// lastTick stores the last time the decayer ticked. Guarded by atomic.
lastTick atomic.Value
lastTick atomic.Pointer[time.Time]

// bumpTagCh queues bump commands to be processed by the loop.
bumpTagCh chan bumpCmd
Expand Down Expand Up @@ -89,7 +89,8 @@ func NewDecayer(cfg *DecayerCfg, mgr *BasicConnMgr) (*decayer, error) {
doneCh: make(chan struct{}),
}

d.lastTick.Store(d.clock.Now())
now := d.clock.Now()
d.lastTick.Store(&now)

// kick things off.
go d.process()
Expand All @@ -116,7 +117,7 @@ func (d *decayer) RegisterDecayingTag(name string, interval time.Duration, decay
"some precision may be lost", name, interval, d.cfg.Resolution)
}

lastTick := d.lastTick.Load().(time.Time)
lastTick := d.lastTick.Load()
tag := &decayingTag{
trkr: d,
name: name,
Expand Down Expand Up @@ -163,7 +164,8 @@ func (d *decayer) process() {
for {
select {
case now = <-ticker.C:
d.lastTick.Store(now)
nn := now
d.lastTick.Store(&nn)
Comment on lines +167 to +168
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not &now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now is declared outside the block and reassigned at line 219. storing a pointer to now would cause it to use this value of 219 sometimes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!


d.tagsMu.Lock()
for _, tag := range d.knownTags {
Expand Down
12 changes: 7 additions & 5 deletions p2p/net/mock/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type stream struct {

writeErr error

protocol atomic.Value
protocol atomic.Pointer[protocol.ID]
stat network.Stats
}

Expand Down Expand Up @@ -92,17 +92,19 @@ func (s *stream) ID() string {
}

func (s *stream) Protocol() protocol.ID {
// Ignore type error. It means that the protocol is unset.
p, _ := s.protocol.Load().(protocol.ID)
return p
p := s.protocol.Load()
if p == nil {
return ""
}
return *p
}

func (s *stream) Stat() network.Stats {
return s.stat
}

func (s *stream) SetProtocol(proto protocol.ID) error {
s.protocol.Store(proto)
s.protocol.Store(&proto)
return nil
}

Expand Down
11 changes: 7 additions & 4 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ type Swarm struct {
maResolver *madns.Resolver

// stream handlers
streamh atomic.Value
streamh atomic.Pointer[network.StreamHandler]

// dialing helpers
dsync *dialSync
Expand Down Expand Up @@ -347,13 +347,16 @@ func (s *Swarm) Peerstore() peerstore.Peerstore {

// SetStreamHandler assigns the handler for new streams.
func (s *Swarm) SetStreamHandler(handler network.StreamHandler) {
s.streamh.Store(handler)
s.streamh.Store(&handler)
}

// StreamHandler gets the handler for new streams.
func (s *Swarm) StreamHandler() network.StreamHandler {
handler, _ := s.streamh.Load().(network.StreamHandler)
return handler
handler := s.streamh.Load()
if handler == nil {
return nil
}
return *handler
}

// NewStream creates a new stream on any available connection to peer, dialing
Expand Down
12 changes: 7 additions & 5 deletions p2p/net/swarm/swarm_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Stream struct {

closeOnce sync.Once

protocol atomic.Value
protocol atomic.Pointer[protocol.ID]

stat network.Stats
}
Expand Down Expand Up @@ -108,9 +108,11 @@ func (s *Stream) remove() {

// Protocol returns the protocol negotiated on this stream (if set).
func (s *Stream) Protocol() protocol.ID {
// Ignore type error. It means that the protocol is unset.
p, _ := s.protocol.Load().(protocol.ID)
return p
p := s.protocol.Load()
if p == nil {
return ""
}
return *p
}

// SetProtocol sets the protocol for this stream.
Expand All @@ -123,7 +125,7 @@ func (s *Stream) SetProtocol(p protocol.ID) error {
return err
}

s.protocol.Store(p)
s.protocol.Store(&p)
return nil
}

Expand Down