Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi: Use atomic types in unexported modules. #3053

Merged
merged 1 commit into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/decred/dcrd

go 1.17
go 1.19

require (
github.com/davecgh/go-spew v1.1.1
Expand Down
10 changes: 5 additions & 5 deletions internal/blockchain/indexers/indexsubscriber.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2022 The Decred developers
// Copyright (c) 2021-2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -122,7 +122,7 @@ func (s *IndexSubscription) stop() error {

// IndexSubscriber subscribes clients for index updates.
type IndexSubscriber struct {
subscribers uint32 // update atomically.
subscribers atomic.Uint32

c chan IndexNtfn
subscriptions map[string]*IndexSubscription
Expand Down Expand Up @@ -172,7 +172,7 @@ func (s *IndexSubscriber) Subscribe(index Indexer, prerequisite string) (*IndexS
}

prereq.dependent = sub
atomic.AddUint32(&s.subscribers, 1)
s.subscribers.Add(1)

return sub, nil
}
Expand All @@ -183,14 +183,14 @@ func (s *IndexSubscriber) Subscribe(index Indexer, prerequisite string) (*IndexS
s.subscriptions[sub.id] = sub
s.mtx.Unlock()

atomic.AddUint32(&s.subscribers, 1)
s.subscribers.Add(1)

return sub, nil
}

// Notify relays an index notification to subscribed indexes for processing.
func (s *IndexSubscriber) Notify(ntfn *IndexNtfn) {
subscribers := atomic.LoadUint32(&s.subscribers)
subscribers := s.subscribers.Load()

// Only relay notifications when there are subscribed indexes
// to be notified.
Expand Down
9 changes: 4 additions & 5 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,7 @@ type orphanTx struct {
// and relayed to other peers. It is safe for concurrent access from multiple
// peers.
type TxPool struct {
// The following variables must only be used atomically.
lastUpdated int64 // last time pool was updated.
lastUpdated atomic.Int64 // last time pool was updated.

mtx sync.RWMutex
cfg Config
Expand Down Expand Up @@ -821,7 +820,7 @@ func (mp *TxPool) removeTransaction(tx *dcrutil.Tx, removeRedeemers bool) {

delete(mp.pool, *txHash)

atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix())
mp.lastUpdated.Store(time.Now().Unix())

// Inform associated fee estimator that the transaction has been removed
// from the mempool
Expand Down Expand Up @@ -909,7 +908,7 @@ func (mp *TxPool) addTransaction(utxoView *blockchain.UtxoViewpoint, txDesc *TxD
for _, txIn := range msgTx.TxIn {
mp.outpoints[txIn.PreviousOutPoint] = txDesc
}
atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix())
mp.lastUpdated.Store(time.Now().Unix())

// Add unconfirmed exists address index entries associated with the
// transaction if enabled.
Expand Down Expand Up @@ -2283,7 +2282,7 @@ func (mp *TxPool) miningDescs() []*mining.TxDesc {
//
// This function is safe for concurrent access.
func (mp *TxPool) LastUpdated() time.Time {
return time.Unix(atomic.LoadInt64(&mp.lastUpdated), 0)
return time.Unix(mp.lastUpdated.Load(), 0)
}

// MiningView returns a slice of mining descriptors for all the transactions
Expand Down
27 changes: 14 additions & 13 deletions internal/mining/cpuminer/cpuminer.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) 2014-2016 The btcsuite developers
// Copyright (c) 2015-2022 The Decred developers
// Copyright (c) 2015-2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -55,8 +55,8 @@ var (
// speedStats houses tracking information used to monitor the hashing speed of
// the CPU miner.
type speedStats struct {
totalHashes uint64 // atomic
elapsedMicros uint64 // atomic
totalHashes atomic.Uint64
elapsedMicros atomic.Uint64
}

// Config is a descriptor containing the CPU miner configuration.
Expand Down Expand Up @@ -112,7 +112,7 @@ type Config struct {
// workers which means it will be idle. The number of worker goroutines for the
// normal mining mode can be set via the SetNumWorkers method.
type CPUMiner struct {
numWorkers uint32 // update atomically
numWorkers atomic.Uint32

sync.Mutex
g *mining.BgBlkTmplGenerator
Expand Down Expand Up @@ -165,8 +165,8 @@ out:
hashesPerSec = 0
m.Lock()
for _, stats := range m.speedStats {
totalHashes := atomic.SwapUint64(&stats.totalHashes, 0)
elapsedMicros := atomic.SwapUint64(&stats.elapsedMicros, 0)
totalHashes := stats.totalHashes.Swap(0)
elapsedMicros := stats.elapsedMicros.Swap(0)
elapsedSecs := (elapsedMicros / 1000000)
if totalHashes == 0 || elapsedSecs == 0 {
continue
Expand Down Expand Up @@ -285,9 +285,9 @@ func (m *CPUMiner) solveBlock(ctx context.Context, header *wire.BlockHeader, sta
hashesCompleted := uint64(0)
start := time.Now()
updateSpeedStats := func() {
atomic.AddUint64(&stats.totalHashes, hashesCompleted)
stats.totalHashes.Add(hashesCompleted)
elapsedMicros := time.Since(start).Microseconds()
atomic.AddUint64(&stats.elapsedMicros, uint64(elapsedMicros))
stats.elapsedMicros.Add(uint64(elapsedMicros))

hashesCompleted = 0
start = time.Now()
Expand Down Expand Up @@ -552,7 +552,7 @@ out:
// Update the number of running workers.
case <-m.updateNumWorkers:
numRunning := uint32(len(runningWorkers))
numWorkers := atomic.LoadUint32(&m.numWorkers)
numWorkers := m.numWorkers.Load()

// No change.
if numWorkers == numRunning {
Expand Down Expand Up @@ -678,7 +678,7 @@ func (m *CPUMiner) SetNumWorkers(numWorkers int32) {
} else if targetNumWorkers > MaxNumWorkers {
targetNumWorkers = MaxNumWorkers
}
atomic.StoreUint32(&m.numWorkers, targetNumWorkers)
m.numWorkers.Store(targetNumWorkers)

// Set the normal mining state accordingly.
if targetNumWorkers != 0 {
Expand All @@ -699,7 +699,7 @@ func (m *CPUMiner) SetNumWorkers(numWorkers int32) {
//
// This function is safe for concurrent access.
func (m *CPUMiner) NumWorkers() int32 {
return int32(atomic.LoadUint32(&m.numWorkers))
return int32(m.numWorkers.Load())
}

// GenerateNBlocks generates the requested number of blocks in the discrete
Expand Down Expand Up @@ -826,14 +826,15 @@ out:
//
// See the documentation for CPUMiner type for more details.
func New(cfg *Config) *CPUMiner {
return &CPUMiner{
miner := &CPUMiner{
g: cfg.BgBlkTmplGenerator,
cfg: cfg,
numWorkers: defaultNumWorkers,
updateNumWorkers: make(chan struct{}),
queryHashesPerSec: make(chan float64),
speedStats: make(map[uint64]*speedStats),
minedOnParents: make(map[chainhash.Hash]uint8),
quit: make(chan struct{}),
}
miner.numWorkers.Store(defaultNumWorkers)
davecgh marked this conversation as resolved.
Show resolved Hide resolved
return miner
}
10 changes: 5 additions & 5 deletions internal/mining/mining_harness_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2022 The Decred developers
// Copyright (c) 2020-2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -263,7 +263,7 @@ type fakeTxSource struct {
votes map[chainhash.Hash][]VoteDesc
tspends map[chainhash.Hash]*dcrutil.Tx
miningView *TxMiningView
lastUpdated int64
lastUpdated atomic.Int64
}

// isTransactionInTxSource returns whether or not the passed transaction exists
Expand All @@ -289,7 +289,7 @@ func (p *fakeTxSource) isTransactionStaged(hash *chainhash.Hash) bool {
// LastUpdated returns the last time a transaction was added to or removed from
// the fake tx source.
func (p *fakeTxSource) LastUpdated() time.Time {
return time.Unix(atomic.LoadInt64(&p.lastUpdated), 0)
return time.Unix(p.lastUpdated.Load(), 0)
}

// HaveTransaction returns whether or not the passed transaction hash exists in
Expand Down Expand Up @@ -508,7 +508,7 @@ func (p *fakeTxSource) addTransaction(tx *dcrutil.Tx, txType stake.TxType, heigh
for _, txIn := range msgTx.TxIn {
p.outpoints[txIn.PreviousOutPoint] = tx
}
atomic.StoreInt64(&p.lastUpdated, time.Now().Unix())
p.lastUpdated.Store(time.Now().Unix())
}

// insertVote inserts a vote into the map of block votes.
Expand Down Expand Up @@ -607,7 +607,7 @@ func (p *fakeTxSource) RemoveTransaction(tx *dcrutil.Tx, removeRedeemers,

delete(p.pool, *txHash)

atomic.StoreInt64(&p.lastUpdated, time.Now().Unix())
p.lastUpdated.Store(time.Now().Unix())

// Stop tracking if it's a tspend.
delete(p.tspends, *txHash)
Expand Down
11 changes: 5 additions & 6 deletions internal/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Copyright (c) 2015-2022 The Decred developers
// Copyright (c) 2015-2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -4913,8 +4913,7 @@ func handleVersion(_ context.Context, _ *Server, _ interface{}) (interface{}, er

// Server provides a concurrent safe RPC server to a chain server.
type Server struct {
// atomic
numClients int32
numClients atomic.Int32

cfg Config
hmac hash.Hash
Expand Down Expand Up @@ -5097,7 +5096,7 @@ func (s *Server) NotifyWinningTickets(wtnd *WinningTicketsNtfnData) {
//
// This function is safe for concurrent access.
func (s *Server) limitConnections(w http.ResponseWriter, remoteAddr string) bool {
if int(atomic.LoadInt32(&s.numClients)+1) > s.cfg.RPCMaxClients {
if int(s.numClients.Load()+1) > s.cfg.RPCMaxClients {
log.Infof("Max RPC clients exceeded [%d] - "+
"disconnecting client %s", s.cfg.RPCMaxClients,
remoteAddr)
Expand All @@ -5114,7 +5113,7 @@ func (s *Server) limitConnections(w http.ResponseWriter, remoteAddr string) bool
//
// This function is safe for concurrent access.
func (s *Server) incrementClients() {
atomic.AddInt32(&s.numClients, 1)
s.numClients.Add(1)
}

// decrementClients subtracts one from the number of connected RPC clients.
Expand All @@ -5123,7 +5122,7 @@ func (s *Server) incrementClients() {
//
// This function is safe for concurrent access.
func (s *Server) decrementClients() {
atomic.AddInt32(&s.numClients, -1)
s.numClients.Add(-1)
}

// authMAC calculates the MAC (currently HMAC-SHA256) of an Authorization
Expand Down
13 changes: 6 additions & 7 deletions internal/rpcserver/rpcwebsocket.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Copyright (c) 2015-2022 The Decred developers
// Copyright (c) 2015-2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -1262,8 +1262,7 @@ type wsResponse struct {
// subsystems can't block. Ultimately, all messages are sent via the
// outHandler.
type wsClient struct {
// The following variables must only be used atomically.
disconnected int32 // Websocket client disconnected?
disconnected atomic.Bool // Websocket client disconnected?

sync.Mutex

Expand Down Expand Up @@ -1309,7 +1308,7 @@ type wsClient struct {
func (c *wsClient) shouldLogReadError(err error) bool {
// No logging when the client is being forcibly disconnected from the server
// side.
if atomic.LoadInt32(&c.disconnected) != 0 {
if c.disconnected.Load() {
return false
}

Expand All @@ -1327,7 +1326,7 @@ func (c *wsClient) shouldLogReadError(err error) bool {
// must be run as a goroutine.
func (c *wsClient) inHandler(ctx context.Context) {
out:
for atomic.LoadInt32(&c.disconnected) == 0 {
for !c.disconnected.Load() {
_, msg, err := c.conn.ReadMessage()
if err != nil {
// Log the error if it's not due to disconnecting.
Expand Down Expand Up @@ -1916,13 +1915,13 @@ func (c *wsClient) QueueNotification(marshalledJSON []byte) error {

// Disconnected returns whether or not the websocket client is disconnected.
func (c *wsClient) Disconnected() bool {
return atomic.LoadInt32(&c.disconnected) > 0
return c.disconnected.Load()
}

// Disconnect disconnects the websocket client.
func (c *wsClient) Disconnect() {
// Nothing to do if already disconnected.
if atomic.AddInt32(&c.disconnected, 1) != 1 {
if !c.disconnected.CompareAndSwap(false, true) {
return
}

Expand Down
21 changes: 9 additions & 12 deletions server.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Copyright (c) 2015-2022 The Decred developers
// Copyright (c) 2015-2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -466,11 +466,9 @@ func (ps *peerState) ResolveLocalAddress(netType addrmgr.NetAddressType, addrMgr
// server provides a Decred server for handling communications to and from
// Decred peers.
type server struct {
// The following variables must only be used atomically.
// Putting the uint64s first makes them 64-bit aligned for 32-bit systems.
bytesReceived uint64 // Total bytes received from all peers since start.
bytesSent uint64 // Total bytes sent by all peers since start.
shutdown int32
bytesReceived atomic.Uint64 // Total bytes received from all peers since start.
bytesSent atomic.Uint64 // Total bytes sent by all peers since start.
shutdown atomic.Bool

// minKnownWork houses the minimum known work from the associated network
// params converted to a uint256 so the conversion only needs to be
Expand Down Expand Up @@ -1693,7 +1691,7 @@ func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool {
}

// Ignore new peers if we're shutting down.
if atomic.LoadInt32(&s.shutdown) != 0 {
if s.shutdown.Load() {
srvrLog.Infof("New peer %s ignored - server is shutting down", sp)
sp.Disconnect()
return false
Expand Down Expand Up @@ -2472,20 +2470,19 @@ func (s *server) AddedNodeInfo() []*serverPeer {
// AddBytesSent adds the passed number of bytes to the total bytes sent counter
// for the server. It is safe for concurrent access.
func (s *server) AddBytesSent(bytesSent uint64) {
atomic.AddUint64(&s.bytesSent, bytesSent)
s.bytesSent.Add(bytesSent)
}

// AddBytesReceived adds the passed number of bytes to the total bytes received
// counter for the server. It is safe for concurrent access.
func (s *server) AddBytesReceived(bytesReceived uint64) {
atomic.AddUint64(&s.bytesReceived, bytesReceived)
s.bytesReceived.Add(bytesReceived)
}

// NetTotals returns the sum of all bytes received and sent across the network
// for all peers. It is safe for concurrent access.
func (s *server) NetTotals() (uint64, uint64) {
return atomic.LoadUint64(&s.bytesReceived),
atomic.LoadUint64(&s.bytesSent)
return s.bytesReceived.Load(), s.bytesSent.Load()
}

// notifiedWinningTickets returns whether or not the winning tickets
Expand Down Expand Up @@ -3111,7 +3108,7 @@ func (s *server) Run(ctx context.Context) {

// Wait until the server is signalled to shutdown.
<-ctx.Done()
atomic.AddInt32(&s.shutdown, 1)
s.shutdown.Store(true)

srvrLog.Warnf("Server shutting down")

Expand Down