Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Fix stream race tests #1208

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
4d65aea
swarm/network/stream: newStreamerTester cleanup only if err is nil
janos Feb 8, 2019
73d5c97
swarm/network/stream: raise newStreamerTester waitForPeers timeout
janos Feb 8, 2019
d233560
swarm/network/stream: fix data races in GetPeerSubscriptions
janos Feb 8, 2019
edfee9c
swarm/storage: prevent data race on LDBStore.batchesC
janos Feb 8, 2019
91f8735
swarm/network/stream: fix TestGetSubscriptionsRPC data race
janos Feb 8, 2019
11d9441
swarm/network/stream: correctly use Simulation.Run callback
janos Feb 8, 2019
e411147
swarm/network: protect addrCountC in Kademlia.AddrCountC function
janos Feb 11, 2019
080f4c2
p2p/simulations: fix a deadlock calling getRandomNode with lock
janos Feb 11, 2019
b151dc2
swarm/network/stream: terminate disconnect goruotines in tests
janos Feb 11, 2019
4f5807b
swarm/network/stream: reduce memory consumption when testing data races
janos Feb 11, 2019
dee2145
swarm/network/stream: add watchDisconnections helper function
janos Feb 12, 2019
c3f2368
swarm/network/stream: add concurrent counter for tests
janos Feb 12, 2019
baed029
swarm/network/stream: rename race/norace test files and use const
janos Feb 12, 2019
1467d2b
swarm/network/stream: remove watchSim and its panic
janos Feb 12, 2019
3710a72
swarm/network/stream: pass context in watchDisconnections
janos Feb 12, 2019
714d871
swarm/network/stream: add concurrent safe bool for watchDisconnections
janos Feb 12, 2019
d8f29cf
swarm/storage: fix LDBStore.batchesC data race by not closing it
janos Feb 13, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions swarm/network/stream/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste
// temp datadir
datadir, err := ioutil.TempDir("", "streamer")
if err != nil {
return nil, nil, nil, func() {}, err
return nil, nil, nil, nil, err
}
removeDataDir := func() {
os.RemoveAll(datadir)
Expand All @@ -163,12 +163,14 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste

localStore, err := storage.NewTestLocalStoreForAddr(params)
if err != nil {
return nil, nil, nil, removeDataDir, err
removeDataDir()
return nil, nil, nil, nil, err
}

netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, nil, removeDataDir, err
removeDataDir()
return nil, nil, nil, nil, err
}

delivery := NewDelivery(to, netStore)
Expand All @@ -180,8 +182,9 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste
}
protocolTester := p2ptest.NewProtocolTester(addr.ID(), 1, streamer.runProtocol)

err = waitForPeers(streamer, 1*time.Second, 1)
err = waitForPeers(streamer, 10*time.Second, 1)
if err != nil {
teardown()
return nil, nil, nil, nil, errors.New("timeout: peer is not created")
}

Expand Down
8 changes: 4 additions & 4 deletions swarm/network/stream/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func TestStreamerRetrieveRequest(t *testing.T) {
Syncing: SyncingDisabled,
}
tester, streamer, _, teardown, err := newStreamerTester(regOpts)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

node := tester.Nodes[0]

Expand Down Expand Up @@ -100,10 +100,10 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
Retrieval: RetrievalEnabled,
Syncing: SyncingDisabled, //do no syncing
})
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

node := tester.Nodes[0]

Expand Down Expand Up @@ -172,10 +172,10 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
Retrieval: RetrievalEnabled,
Syncing: SyncingDisabled,
})
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

node := tester.Nodes[0]

Expand Down Expand Up @@ -362,10 +362,10 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
})
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
return &testClient{
Expand Down
8 changes: 4 additions & 4 deletions swarm/network/stream/lightnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) {
Syncing: SyncingDisabled,
}
tester, _, _, teardown, err := newStreamerTester(registryOptions)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

node := tester.Nodes[0]

Expand Down Expand Up @@ -68,10 +68,10 @@ func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) {
Syncing: SyncingDisabled,
}
tester, _, _, teardown, err := newStreamerTester(registryOptions)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

node := tester.Nodes[0]

Expand Down Expand Up @@ -112,10 +112,10 @@ func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) {
Syncing: SyncingRegisterOnly,
}
tester, _, _, teardown, err := newStreamerTester(registryOptions)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

node := tester.Nodes[0]

Expand Down Expand Up @@ -157,10 +157,10 @@ func TestLigthnodeRequestSubscriptionWithoutSync(t *testing.T) {
Syncing: SyncingDisabled,
}
tester, _, _, teardown, err := newStreamerTester(registryOptions)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

node := tester.Nodes[0]

Expand Down
6 changes: 6 additions & 0 deletions swarm/network/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,16 +938,22 @@ It returns a map of node IDs with an array of string representations of Stream o
func (api *API) GetPeerSubscriptions() map[string][]string {
//create the empty map
pstreams := make(map[string][]string)

//iterate all streamer peers
api.streamer.peersMu.RLock()
frncmx marked this conversation as resolved.
Show resolved Hide resolved
defer api.streamer.peersMu.RUnlock()

for id, p := range api.streamer.peers {
var streams []string
//every peer has a map of stream servers
//every stream server represents a subscription
p.serverMu.RLock()
for s := range p.servers {
//append the string representation of the stream
//to the list for this peer
streams = append(streams, s.String())
}
p.serverMu.RUnlock()
//set the array of stream servers to the map
pstreams[id.String()] = streams
}
Expand Down
35 changes: 20 additions & 15 deletions swarm/network/stream/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ import (

func TestStreamerSubscribe(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

stream := NewStream("foo", "", true)
err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top)
Expand All @@ -55,10 +55,10 @@ func TestStreamerSubscribe(t *testing.T) {

func TestStreamerRequestSubscription(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

stream := NewStream("foo", "", false)
err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top)
Expand Down Expand Up @@ -146,10 +146,10 @@ func (self *testServer) Close() {

func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
return newTestClient(t), nil
Expand Down Expand Up @@ -239,10 +239,10 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {

func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

stream := NewStream("foo", "", false)

Expand Down Expand Up @@ -306,10 +306,10 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {

func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

stream := NewStream("foo", "", true)

Expand Down Expand Up @@ -372,10 +372,10 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {

func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 0), nil
Expand Down Expand Up @@ -416,10 +416,10 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {

func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

stream := NewStream("foo", "", true)

Expand Down Expand Up @@ -479,10 +479,10 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {

func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

stream := NewStream("foo", "", true)

Expand Down Expand Up @@ -544,10 +544,10 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {

func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

stream := NewStream("foo", "", true)

Expand Down Expand Up @@ -643,10 +643,10 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {

func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 10), nil
Expand Down Expand Up @@ -780,10 +780,10 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
Syncing: SyncingDisabled,
MaxPeerServers: maxPeerServers,
})
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 0), nil
Expand Down Expand Up @@ -854,10 +854,10 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
MaxPeerServers: maxPeerServers,
})
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 0), nil
Expand Down Expand Up @@ -940,10 +940,10 @@ func TestHasPriceImplementation(t *testing.T) {
Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
})
defer teardown()
if err != nil {
t.Fatal(err)
}
defer teardown()

if r.prices == nil {
t.Fatal("No prices implementation available for the stream protocol")
Expand Down Expand Up @@ -1293,7 +1293,9 @@ func TestGetSubscriptionsRPC(t *testing.T) {
t.Fatal("Context timed out")
}

lock.RLock()
frncmx marked this conversation as resolved.
Show resolved Hide resolved
log.Debug("Expected message count: ", "expectedMsgCount", expectedMsgCount)
lock.RUnlock()
//now iterate again, this time we call each node via RPC to get its subscriptions
realCount := 0
for _, node := range nodes {
Expand Down Expand Up @@ -1324,8 +1326,11 @@ func TestGetSubscriptionsRPC(t *testing.T) {
}
}
// every node is mutually subscribed to each other, so the actual count is half of it
if realCount/2 != expectedMsgCount {
return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, expectedMsgCount)
lock.RLock()
emc := expectedMsgCount
lock.RUnlock()
if realCount/2 != emc {
return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, emc)
}
return nil
})
Expand Down
4 changes: 3 additions & 1 deletion swarm/storage/ldbstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,12 +805,12 @@ func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error {
gcIdxKey := getGCIdxKey(&index)
gcIdxData := getGCIdxValue(&index, po, chunk.Address())
s.batch.Put(gcIdxKey, gcIdxData)
s.lock.Unlock()

select {
case s.batchesC <- struct{}{}:
default:
}
s.lock.Unlock()
frncmx marked this conversation as resolved.
Show resolved Hide resolved

select {
case <-batch.c:
Expand Down Expand Up @@ -1049,7 +1049,9 @@ func (s *LDBStore) Close() {
s.lock.Unlock()
// force writing out current batch
s.writeCurrentBatch()
s.lock.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why need?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is strange, but this is the data race: #1198 (comment).

close(s.batchesC)
s.lock.Unlock()
s.db.Close()
}

Expand Down