Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
fix(network): add mutex to avoid data race
Browse files Browse the repository at this point in the history
Adds a mutex to the network receiver implementation to help avoid a data race
  • Loading branch information
hannahhoward committed May 20, 2019
1 parent a32fa8a commit 916eb69
Showing 1 changed file with 28 additions and 4 deletions.
32 changes: 28 additions & 4 deletions network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -47,7 +48,8 @@ type impl struct {
routing routing.ContentRouting

// inbound messages from the network are forwarded to the receiver
receiver Receiver
receiverLk sync.RWMutex
receiver Receiver

stats Stats
}
Expand Down Expand Up @@ -135,7 +137,9 @@ func (bsnet *impl) SendMessage(
}

func (bsnet *impl) SetDelegate(r Receiver) {
bsnet.receiverLk.Lock()
bsnet.receiver = r
bsnet.receiverLk.Unlock()
}

func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
Expand Down Expand Up @@ -172,18 +176,26 @@ func (bsnet *impl) Provide(ctx context.Context, k cid.Cid) error {
func (bsnet *impl) handleNewStream(s inet.Stream) {
defer s.Close()

bsnet.receiverLk.RLock()
if bsnet.receiver == nil {
bsnet.receiverLk.RUnlock()
s.Reset()
return
}
bsnet.receiverLk.RUnlock()

reader := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
for {
received, err := bsmsg.FromPBReader(reader)
if err != nil {
if err != io.EOF {
s.Reset()
go bsnet.receiver.ReceiveError(err)

go func(err error) {
bsnet.receiverLk.RLock()
bsnet.receiver.ReceiveError(err)
bsnet.receiverLk.RUnlock()
}(err)
log.Debugf("bitswap net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
}
return
Expand All @@ -192,7 +204,9 @@ func (bsnet *impl) handleNewStream(s inet.Stream) {
p := s.Conn().RemotePeer()
ctx := context.Background()
log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer())
bsnet.receiverLk.RLock()
bsnet.receiver.ReceiveMessage(ctx, p, received)
bsnet.receiverLk.RUnlock()
atomic.AddUint64(&bsnet.stats.MessagesRecvd, 1)
}
}
Expand All @@ -215,11 +229,21 @@ func (nn *netNotifiee) impl() *impl {
}

func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
nn.impl().receiver.PeerConnected(v.RemotePeer())
bsnet := nn.impl()
bsnet.receiverLk.RLock()
if bsnet.receiver != nil {
bsnet.receiver.PeerConnected(v.RemotePeer())
}
bsnet.receiverLk.RUnlock()
}

func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
nn.impl().receiver.PeerDisconnected(v.RemotePeer())
bsnet := nn.impl()
bsnet.receiverLk.RLock()
if bsnet.receiver != nil {
bsnet.receiver.PeerDisconnected(v.RemotePeer())
}
bsnet.receiverLk.RUnlock()
}

func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {}
Expand Down

0 comments on commit 916eb69

Please sign in to comment.