Skip to content

Commit

Permalink
change atomic.Value to atomic.Pointer (#2088)
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt authored Feb 14, 2023
1 parent a857d40 commit 3d9cc01
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 31 deletions.
26 changes: 13 additions & 13 deletions p2p/host/autonat/autonat.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type AmbientAutoNAT struct {
inboundConn chan network.Conn
observations chan autoNATResult
// status is an autoNATResult reflecting current status.
status atomic.Value
status atomic.Pointer[autoNATResult]
// Reflects the confidence on of the NATStatus being private, as a single
// dialback may fail for reasons unrelated to NAT.
// If it is <3, then multiple autoNAT peers may be contacted for dialback
Expand Down Expand Up @@ -117,7 +117,7 @@ func New(h host.Host, options ...Option) (AutoNAT, error) {
service: service,
recentProbes: make(map[peer.ID]time.Time),
}
as.status.Store(autoNATResult{network.ReachabilityUnknown, nil})
as.status.Store(&autoNATResult{network.ReachabilityUnknown, nil})

subscriber, err := as.host.EventBus().Subscribe(
[]any{new(event.EvtLocalAddressesUpdated), new(event.EvtPeerIdentificationCompleted)},
Expand All @@ -136,18 +136,18 @@ func New(h host.Host, options ...Option) (AutoNAT, error) {

// Status returns the AutoNAT observed reachability status.
func (as *AmbientAutoNAT) Status() network.Reachability {
s := as.status.Load().(autoNATResult)
s := as.status.Load()
return s.Reachability
}

func (as *AmbientAutoNAT) emitStatus() {
status := as.status.Load().(autoNATResult)
status := as.status.Load()
as.emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: status.Reachability})
}

// PublicAddr returns the publicly connectable Multiaddr of this node if one is known.
func (as *AmbientAutoNAT) PublicAddr() (ma.Multiaddr, error) {
s := as.status.Load().(autoNATResult)
s := as.status.Load()
if s.Reachability != network.ReachabilityPublic {
return nil, errors.New("NAT status is not public")
}
Expand Down Expand Up @@ -184,7 +184,7 @@ func (as *AmbientAutoNAT) background() {
// new inbound connection.
case conn := <-as.inboundConn:
localAddrs := as.host.Addrs()
ca := as.status.Load().(autoNATResult)
ca := as.status.Load()
if ca.address != nil {
localAddrs = append(localAddrs, ca.address)
}
Expand All @@ -204,7 +204,7 @@ func (as *AmbientAutoNAT) background() {
}
case event.EvtPeerIdentificationCompleted:
if s, err := as.host.Peerstore().SupportsProtocols(e.Peer, AutoNATProto); err == nil && len(s) > 0 {
currentStatus := as.status.Load().(autoNATResult)
currentStatus := as.status.Load()
if currentStatus.Reachability == network.ReachabilityUnknown {
as.tryProbe(e.Peer)
}
Expand Down Expand Up @@ -253,7 +253,7 @@ func (as *AmbientAutoNAT) scheduleProbe() time.Duration {
// * recent inbound connections (implying continued connectivity) should decrease the retry when public
// * recent inbound connections when not public mean we should try more actively to see if we're public.
fixedNow := time.Now()
currentStatus := as.status.Load().(autoNATResult)
currentStatus := as.status.Load()

nextProbe := fixedNow
// Don't look for peers in the peer store more than once per second.
Expand Down Expand Up @@ -285,7 +285,7 @@ func (as *AmbientAutoNAT) scheduleProbe() time.Duration {

// Update the current status based on an observed result.
func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
currentStatus := as.status.Load().(autoNATResult)
currentStatus := as.status.Load()
if observation.Reachability == network.ReachabilityPublic {
log.Debugf("NAT status is public")
changed := false
Expand All @@ -306,7 +306,7 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
if currentStatus.address == nil || !observation.address.Equal(currentStatus.address) {
changed = true
}
as.status.Store(observation)
as.status.Store(&observation)
}
if observation.address != nil && changed {
as.emitStatus()
Expand All @@ -319,15 +319,15 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
} else {
// we are flipping our NATStatus, so confidence drops to 0
as.confidence = 0
as.status.Store(observation)
as.status.Store(&observation)
if as.service != nil {
as.service.Disable()
}
as.emitStatus()
}
} else if as.confidence < 3 {
as.confidence++
as.status.Store(observation)
as.status.Store(&observation)
if currentStatus.Reachability != network.ReachabilityPrivate {
as.emitStatus()
}
Expand All @@ -337,7 +337,7 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
as.confidence--
} else {
log.Debugf("NAT status is unknown")
as.status.Store(autoNATResult{network.ReachabilityUnknown, nil})
as.status.Store(&autoNATResult{network.ReachabilityUnknown, nil})
if currentStatus.Reachability != network.ReachabilityUnknown {
if as.service != nil {
as.service.Enable()
Expand Down
10 changes: 6 additions & 4 deletions p2p/net/connmgr/decay.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type decayer struct {
knownTags map[string]*decayingTag

// lastTick stores the last time the decayer ticked. Guarded by atomic.
lastTick atomic.Value
lastTick atomic.Pointer[time.Time]

// bumpTagCh queues bump commands to be processed by the loop.
bumpTagCh chan bumpCmd
Expand Down Expand Up @@ -89,7 +89,8 @@ func NewDecayer(cfg *DecayerCfg, mgr *BasicConnMgr) (*decayer, error) {
doneCh: make(chan struct{}),
}

d.lastTick.Store(d.clock.Now())
now := d.clock.Now()
d.lastTick.Store(&now)

// kick things off.
go d.process()
Expand All @@ -116,7 +117,7 @@ func (d *decayer) RegisterDecayingTag(name string, interval time.Duration, decay
"some precision may be lost", name, interval, d.cfg.Resolution)
}

lastTick := d.lastTick.Load().(time.Time)
lastTick := d.lastTick.Load()
tag := &decayingTag{
trkr: d,
name: name,
Expand Down Expand Up @@ -163,7 +164,8 @@ func (d *decayer) process() {
for {
select {
case now = <-ticker.C:
d.lastTick.Store(now)
nn := now
d.lastTick.Store(&nn)

d.tagsMu.Lock()
for _, tag := range d.knownTags {
Expand Down
12 changes: 7 additions & 5 deletions p2p/net/mock/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type stream struct {

writeErr error

protocol atomic.Value
protocol atomic.Pointer[protocol.ID]
stat network.Stats
}

Expand Down Expand Up @@ -92,17 +92,19 @@ func (s *stream) ID() string {
}

func (s *stream) Protocol() protocol.ID {
// Ignore type error. It means that the protocol is unset.
p, _ := s.protocol.Load().(protocol.ID)
return p
p := s.protocol.Load()
if p == nil {
return ""
}
return *p
}

func (s *stream) Stat() network.Stats {
return s.stat
}

func (s *stream) SetProtocol(proto protocol.ID) error {
s.protocol.Store(proto)
s.protocol.Store(&proto)
return nil
}

Expand Down
11 changes: 7 additions & 4 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ type Swarm struct {
maResolver *madns.Resolver

// stream handlers
streamh atomic.Value
streamh atomic.Pointer[network.StreamHandler]

// dialing helpers
dsync *dialSync
Expand Down Expand Up @@ -347,13 +347,16 @@ func (s *Swarm) Peerstore() peerstore.Peerstore {

// SetStreamHandler assigns the handler for new streams.
func (s *Swarm) SetStreamHandler(handler network.StreamHandler) {
s.streamh.Store(handler)
s.streamh.Store(&handler)
}

// StreamHandler gets the handler for new streams.
func (s *Swarm) StreamHandler() network.StreamHandler {
handler, _ := s.streamh.Load().(network.StreamHandler)
return handler
handler := s.streamh.Load()
if handler == nil {
return nil
}
return *handler
}

// NewStream creates a new stream on any available connection to peer, dialing
Expand Down
12 changes: 7 additions & 5 deletions p2p/net/swarm/swarm_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Stream struct {

closeOnce sync.Once

protocol atomic.Value
protocol atomic.Pointer[protocol.ID]

stat network.Stats
}
Expand Down Expand Up @@ -108,9 +108,11 @@ func (s *Stream) remove() {

// Protocol returns the protocol negotiated on this stream (if set).
func (s *Stream) Protocol() protocol.ID {
// Ignore type error. It means that the protocol is unset.
p, _ := s.protocol.Load().(protocol.ID)
return p
p := s.protocol.Load()
if p == nil {
return ""
}
return *p
}

// SetProtocol sets the protocol for this stream.
Expand All @@ -123,7 +125,7 @@ func (s *Stream) SetProtocol(p protocol.ID) error {
return err
}

s.protocol.Store(p)
s.protocol.Store(&p)
return nil
}

Expand Down

0 comments on commit 3d9cc01

Please sign in to comment.