Skip to content

Commit

Permalink
add allocate in Receive calls (#9667)
Browse files Browse the repository at this point in the history
Co-authored-by: William Banfield <[email protected]>
  • Loading branch information
tnasu and williambanfield committed Jul 22, 2023
1 parent 0d3fd6e commit 9287c19
Show file tree
Hide file tree
Showing 19 changed files with 231 additions and 11 deletions.
2 changes: 1 addition & 1 deletion blockchain/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (bcR *BlockchainReactor) ReceiveEnvelope(e p2p.Envelope) {
}

func (bcR *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *ocbcproto.Message
msg := &ocbcproto.Message{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
Expand Down
22 changes: 22 additions & 0 deletions blockchain/v0/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

abci "github.com/tendermint/tendermint/abci/types"
bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
dbm "github.com/tendermint/tm-db"

ocabci "github.com/Finschia/ostracon/abci/types"
Expand Down Expand Up @@ -197,6 +199,26 @@ func TestNoBlockResponse(t *testing.T) {
}
}

func TestLegacyReactorReceiveBasic(t *testing.T) {
config = cfg.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
reactor := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 10,
config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize).reactor
peer := p2p.CreateRandomPeer(false)

reactor.InitPeer(peer)
reactor.AddPeer(peer)
m := &bcproto.StatusRequest{}
wm := m.Wrap()
msg, err := proto.Marshal(wm)
assert.NoError(t, err)

assert.NotPanics(t, func() {
reactor.Receive(BlockchainChannel, peer, msg)
})
}

// NOTE: This is too hard to test without
// an easy way to add test peer to switch
// or without significant refactoring of the module.
Expand Down
2 changes: 1 addition & 1 deletion blockchain/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func (bcR *BlockchainReactor) ReceiveEnvelope(e p2p.Envelope) {
}

func (bcR *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *bcproto.Message
msg := &bcproto.Message{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
Expand Down
21 changes: 21 additions & 0 deletions blockchain/v1/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
dbm "github.com/tendermint/tm-db"

Expand Down Expand Up @@ -352,6 +354,25 @@ outerFor:
assert.True(t, lastReactorPair.bcR.Switch.Peers().Size() < len(reactorPairs)-1)
}

func TestLegacyReactorReceiveBasic(t *testing.T) {
config = cfg.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
reactor := newBlockchainReactor(t, log.TestingLogger(), genDoc, privVals, 10)
peer := p2p.CreateRandomPeer(false)

reactor.InitPeer(peer)
reactor.AddPeer(peer)
m := &bcproto.StatusRequest{}
wm := m.Wrap()
msg, err := proto.Marshal(wm)
assert.NoError(t, err)

assert.NotPanics(t, func() {
reactor.Receive(BlockchainChannel, peer, msg)
})
}

//----------------------------------------------
// utility funcs

Expand Down
2 changes: 1 addition & 1 deletion blockchain/v2/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ func (r *BlockchainReactor) ReceiveEnvelope(e p2p.Envelope) {
}

func (r *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *bcproto.Message
msg := &bcproto.Message{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
Expand Down
28 changes: 28 additions & 0 deletions blockchain/v2/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,34 @@ func TestReactorHelperMode(t *testing.T) {
}
}

func TestLegacyReactorReceiveBasic(t *testing.T) {
config := cfg.ResetTestRoot("blockchain_reactor_v2_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(config.ChainID(), 1, false, 30)
params := testReactorParams{
logger: log.TestingLogger(),
genDoc: genDoc,
privVals: privVals,
startHeight: 20,
mockA: true,
}
reactor := newTestReactor(params)
mockSwitch := &mockSwitchIo{switchedToConsensus: false}
reactor.io = mockSwitch
peer := p2p.CreateRandomPeer(false)

reactor.InitPeer(peer)
reactor.AddPeer(peer)
m := &bcproto.StatusRequest{}
wm := m.Wrap()
msg, err := proto.Marshal(wm)
assert.NoError(t, err)

assert.NotPanics(t, func() {
reactor.Receive(BlockchainChannel, peer, msg)
})
}

func TestReactorSetSwitchNil(t *testing.T) {
config := cfg.ResetTestRoot("blockchain_reactor_v2_test")
defer os.RemoveAll(config.RootDir)
Expand Down
2 changes: 1 addition & 1 deletion consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}

func (conR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *tmcons.Message
msg := &tmcons.Message{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
Expand Down
32 changes: 31 additions & 1 deletion consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -259,7 +260,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
}, css)
}

func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) {
func TestLegacyReactorReceiveBasicIfAddPeerHasntBeenCalledYet(t *testing.T) {
N := 1
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
defer cleanup()
Expand All @@ -285,6 +286,35 @@ func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) {
})
}

func TestLegacyReactorReceiveBasic(t *testing.T) {
N := 1
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
defer cleanup()
reactors, _, eventBuses := startConsensusNet(t, css, N)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)

var (
reactor = reactors[0]
peer = p2pmock.NewPeer(nil)
)

reactor.InitPeer(peer)
v := &tmcons.HasVote{
Height: 1,
Round: 1,
Index: 1,
Type: tmproto.PrevoteType,
}
w := v.Wrap()
msg, err := proto.Marshal(w)
assert.NoError(t, err)

assert.NotPanics(t, func() {
reactor.Receive(StateChannel, peer, msg)
reactor.AddPeer(peer)
})
}

func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) {
N := 1
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
Expand Down
2 changes: 1 addition & 1 deletion evidence/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (evR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}

func (evR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *tmproto.EvidenceList
msg := &tmproto.EvidenceList{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
Expand Down
28 changes: 28 additions & 0 deletions evidence/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/fortytw2/leaktest"
"github.com/go-kit/log/term"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -375,6 +376,33 @@ func exampleVote(t byte) *types.Vote {
ValidatorIndex: 56789,
}
}
func TestLegacyReactorReceiveBasic(t *testing.T) {
config := cfg.TestConfig()
N := 1

stateDBs := make([]sm.Store, N)
val := types.NewMockPV()
stateDBs[0] = initializeValidatorState(val, 1)

reactors, _ := makeAndConnectReactorsAndPools(config, stateDBs)

var (
reactor = reactors[0]
peer = &p2pmocks.Peer{}
)
quitChan := make(<-chan struct{})
peer.On("Quit").Return(quitChan)

reactor.InitPeer(peer)
reactor.AddPeer(peer)
e := &tmproto.EvidenceList{}
msg, err := proto.Marshal(e)
assert.NoError(t, err)

assert.NotPanics(t, func() {
reactor.Receive(evidence.EvidenceChannel, peer, msg)
})
}

// nolint:lll //ignore line length for tests
func TestEvidenceVectors(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion mempool/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}

func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *protomem.Message
msg := &protomem.Message{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
Expand Down
26 changes: 26 additions & 0 deletions mempool/v0/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/fortytw2/leaktest"
"github.com/go-kit/log/term"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -300,6 +301,31 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) {
}
}

func TestLegacyReactorReceiveBasic(t *testing.T) {
config := cfg.TestConfig()
const N = 1
reactors := makeAndConnectReactors(config, N)
var (
reactor = reactors[0]
peer = mock.NewPeer(nil)
)
defer func() {
err := reactor.Stop()
assert.NoError(t, err)
}()

reactor.InitPeer(peer)
reactor.AddPeer(peer)
m := &memproto.Txs{}
wm := m.Wrap()
msg, err := proto.Marshal(wm)
assert.NoError(t, err)

assert.NotPanics(t, func() {
reactor.Receive(mempool.MempoolChannel, peer, msg)
})
}

// mempoolLogger is a TestingLogger which uses a different
// color for each validator ("validator" key must exist).
func mempoolLogger() log.Logger {
Expand Down
2 changes: 1 addition & 1 deletion mempool/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}

func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *protomem.Message
msg := &protomem.Message{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
Expand Down
31 changes: 31 additions & 0 deletions mempool/v1/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"time"

"github.com/go-kit/log/term"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/tendermint/tendermint/abci/example/kvstore"
"github.com/tendermint/tendermint/p2p/mock"

cfg "github.com/tendermint/tendermint/config"

Expand Down Expand Up @@ -95,6 +97,35 @@ func TestMempoolVectors(t *testing.T) {
}
}

func TestLegacyReactorReceiveBasic(t *testing.T) {
config := cfg.TestConfig()
// if there were more than two reactors, the order of transactions could not be
// asserted in waitForTxsOnReactors (due to transactions gossiping). If we
// replace Connect2Switches (full mesh) with a func, which connects first
// reactor to others and nothing else, this test should also pass with >2 reactors.
const N = 1
reactors := makeAndConnectReactors(config, N)
var (
reactor = reactors[0]
peer = mock.NewPeer(nil)
)
defer func() {
err := reactor.Stop()
assert.NoError(t, err)
}()

reactor.InitPeer(peer)
reactor.AddPeer(peer)
m := &memproto.Txs{}
wm := m.Wrap()
msg, err := proto.Marshal(wm)
assert.NoError(t, err)

assert.NotPanics(t, func() {
reactor.Receive(mempool.MempoolChannel, peer, msg)
})
}

func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor {
reactors := make([]*Reactor, n)
logger := mempoolLogger()
Expand Down
2 changes: 1 addition & 1 deletion p2p/pex/pex_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (r *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}

func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *tmp2p.Message
msg := &tmp2p.Message{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
Expand Down
16 changes: 16 additions & 0 deletions p2p/pex/pex_reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,22 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
assert.Equal(t, size, book.Size())
}

func TestLegacyReactorReceiveBasic(t *testing.T) {
pexR, _ := createReactor(&ReactorConfig{})
peer := p2p.CreateRandomPeer(false)

pexR.InitPeer(peer)
pexR.AddPeer(peer)
m := &tmp2p.PexAddrs{}
wm := m.Wrap()
msg, err := proto.Marshal(wm)
assert.NoError(t, err)

assert.NotPanics(t, func() {
pexR.Receive(PexChannel, peer, msg)
})
}

func TestPEXReactorDialPeer(t *testing.T) {
pexR, book := createReactor(&ReactorConfig{})
defer teardownReactor(book)
Expand Down
Loading

0 comments on commit 9287c19

Please sign in to comment.