Fix the test case that often fails (#319)
* (fix) TestByzantinePrevoteEquivocation: Remove the lazy proposer and set a shorter timeout

* (fix) Improve logging for TestByzantinePrevoteEquivocation

* (fix) TestWALCrash: Don't start the `crash` state once the stop height is reached, and improve logging
tnasu authored Sep 27, 2021
1 parent 6bcf753 commit 624b74c
Showing 4 changed files with 116 additions and 29 deletions.
18 changes: 15 additions & 3 deletions consensus/byzantine_test.go
@@ -84,7 +84,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(cs.Logger)
// set private validator
pv := privVals[i]
cs.SetPrivValidator(pv)
@@ -130,7 +129,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// make connected switches and start all reactors
p2p.MakeConnectedSwitches(config.P2P, nValidators, func(i int, s *p2p.Switch, c *config2.P2PConfig) *p2p.Switch {
s.AddReactor("CONSENSUS", reactors[i])
s.SetLogger(reactors[i].conS.Logger.With("module", "p2p"))
s.SetLogger(log.NewNopLogger().With("module", "p2p")) // Switch log is noisy for this test
return s
}, p2p.Connect2Switches)

@@ -164,6 +163,17 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
}

//
// Remove the lazy proposer:
// `VerifyAggregatedSignature` cannot accept the line below being injected after `lazyProposer.LastCommit.MakeCommit()`:
// `commit.Signatures[len(commit.Signatures)-1] = types.NewCommitSigAbsent()`
//
// If the test fails because of this, each node logs the following messages:
// `prevote step: ProposalBlock is invalid`
// `err="wrong aggregated signature: `
// `failed to verify the aggregated hashes by 1 public keys`
//
/*
// introducing a lazy proposer means that the time of the block committed is different to the
// timestamp that the other nodes have. This tests to ensure that the evidence that finally gets
// proposed will have a valid timestamp
@@ -235,6 +245,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
}
*/

// start the consensus reactors
for i := 0; i < nValidators; i++ {
s := reactors[i].conS.GetState()
@@ -280,7 +292,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
assert.Equal(t, prevoteHeight, ev.Height())
}
}
case <-time.After(30 * time.Second): // XXX 20 second is short time, so we changed to 30 second
case <-time.After(10 * time.Second): // XXX 20 seconds is too much time, so we changed it to 10 seconds
for i, reactor := range reactors {
t.Logf("Consensus Reactor %d\n%v", i, reactor)
}
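For context on the lazy-proposer block commented out above: with aggregated (BLS) signatures, the commit's aggregated signature is built over the exact set of present CommitSigs, so overwriting one of them with `types.NewCommitSigAbsent()` afterwards leaves the verifier with a different signer set than the aggregate was produced from. A minimal self-contained sketch of that mismatch, using toy types rather than the Ostracon API:

package main

import "fmt"

// Toy stand-ins for the real types: an "aggregate" here just records how many
// validators it was built over, and a commitSig is either present or absent.
type aggregateSig struct{ signerCount int }
type commitSig struct{ absent bool }

// verify succeeds only when the number of present CommitSigs still matches the
// signer set the aggregate was produced over.
func verify(agg aggregateSig, sigs []commitSig) bool {
	present := 0
	for _, s := range sigs {
		if !s.absent {
			present++
		}
	}
	return present == agg.signerCount
}

func main() {
	agg := aggregateSig{signerCount: 4} // what MakeCommit aggregated over
	sigs := make([]commitSig, 4)        // all four signatures present
	fmt.Println(verify(agg, sigs))      // true

	// Blanking the last CommitSig afterwards (what the removed test code did)
	// makes verification fail, matching the logged
	// "failed to verify the aggregated hashes by 1 public keys".
	sigs[len(sigs)-1] = commitSig{absent: true}
	fmt.Println(verify(agg, sigs)) // false
}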
47 changes: 43 additions & 4 deletions consensus/common_test.go
@@ -404,6 +404,17 @@ func newStateWithConfigAndBlockStore(
pv types.PrivValidator,
app abci.Application,
blockDB dbm.DB,
) *State {
return newStateWithConfigAndBlockStoreWithLoggers(thisConfig, state, pv, app, blockDB, DefaultTestLoggers())
}

func newStateWithConfigAndBlockStoreWithLoggers(
thisConfig *cfg.Config,
state sm.State,
pv types.PrivValidator,
app abci.Application,
blockDB dbm.DB,
loggers TestLoggers,
) *State {
// Get BlockStore
blockStore := store.NewBlockStore(blockDB)
@@ -415,7 +426,7 @@ func newStateWithConfigAndBlockStore(

// Make Mempool
mempool := mempl.NewCListMempool(thisConfig.Mempool, proxyAppConnMem, 0)
mempool.SetLogger(log.TestingLogger().With("module", "mempool"))
mempool.SetLogger(loggers.memLogger.With("module", "mempool"))
if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
}
@@ -429,13 +440,13 @@ func newStateWithConfigAndBlockStore(
panic(err)
}

blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
blockExec := sm.NewBlockExecutor(stateStore, loggers.execLogger, proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetLogger(loggers.csLogger.With("module", "consensus"))
cs.SetPrivValidator(pv)

eventBus := types.NewEventBus()
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
eventBus.SetLogger(loggers.eventLogger.With("module", "events"))
err := eventBus.Start()
if err != nil {
panic(err)
@@ -761,6 +772,34 @@ func ensureNewEventOnChannel(ch <-chan tmpubsub.Message) {
//-------------------------------------------------------------------------------
// consensus nets

type TestLoggers struct {
memLogger log.Logger
evLogger log.Logger
execLogger log.Logger
csLogger log.Logger
eventLogger log.Logger
}

func NewTestLoggers(memLogger, evLogger, execLogger, csLogger, eventLogger log.Logger) TestLoggers {
return TestLoggers{
memLogger: memLogger,
evLogger: evLogger,
execLogger: execLogger,
csLogger: csLogger,
eventLogger: eventLogger,
}
}

func DefaultTestLoggers() TestLoggers {
return NewTestLoggers(
log.TestingLogger(), log.TestingLogger(), log.TestingLogger(), log.TestingLogger(), log.TestingLogger())
}

func NopTestLoggers() TestLoggers {
return NewTestLoggers(
log.NewNopLogger(), log.NewNopLogger(), log.NewNopLogger(), log.NewNopLogger(), log.NewNopLogger())
}

// consensusLogger is a TestingLogger which uses a different
// color for each validator ("validator" key must exist).
func consensusLogger() log.Logger {
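The TestLoggers helpers added above let a test keep output from the modules it cares about while silencing the rest. A short usage sketch inside the consensus test package (the config, state, pv, app, and blockDB values are placeholders for whatever the test already has):

// Keep consensus and executor output visible, drop mempool/evidence/event noise.
loggers := NewTestLoggers(
	log.NewNopLogger().With("module", "mempool"),
	log.NewNopLogger().With("module", "evidence"),
	log.TestingLogger().With("module", "executor"),
	log.TestingLogger().With("module", "consensus"),
	log.NewNopLogger().With("module", "event"),
)
cs := newStateWithConfigAndBlockStoreWithLoggers(config, state, pv, app, blockDB, loggers)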
68 changes: 48 additions & 20 deletions consensus/replay_test.go
@@ -66,29 +66,33 @@ func TestMain(m *testing.M) {
// and which ones we need the wal for - then we'd also be able to only flush the
// wal writer when we need to, instead of with every message.

func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Config,
lastBlockHeight int64, blockDB dbm.DB, stateStore sm.Store) {
logger := log.TestingLogger()
func startNewStateAndWaitForBlock(t *testing.T, i int, consensusReplayConfig *cfg.Config,
blockDB dbm.DB, stateStore sm.Store) {
logger := log.TestingLogger().With("attr", "make block", "i", i)
state, _ := stateStore.LoadFromDBOrGenesisFile(consensusReplayConfig.GenesisFile())
privValidator := loadPrivValidator(consensusReplayConfig)
cs := newStateWithConfigAndBlockStore(
cs := newStateWithConfigAndBlockStoreWithLoggers(
consensusReplayConfig,
state,
privValidator,
kvstore.NewApplication(),
blockDB,
NewTestLoggers(
log.NewNopLogger().With("module", "mempool"),
log.NewNopLogger().With("module", "evidence"),
logger.With("module", "executor"),
logger.With("module", "consensus"),
log.NewNopLogger().With("module", "event")),
)
cs.SetLogger(logger)

bytes, _ := ioutil.ReadFile(cs.config.WalFile())
t.Logf("====== WAL: \n\r%X\n", bytes)

err := cs.Start()
require.NoError(t, err)
defer func() {
if err := cs.Stop(); err != nil {
t.Error(err)
}
// Wait for the WAL to close after the remaining messages have been written to it
cs.Wait()
}()

// This is just a signal that we haven't halted; it's not something contained
@@ -98,7 +102,9 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Confi
newBlockSub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err)
select {
case <-newBlockSub.Out():
case msg := <-newBlockSub.Out():
height := msg.Data().(types.EventDataNewBlock).Block.Height
t.Logf("Make Block.Height[%d]", height)
case <-newBlockSub.Cancelled():
t.Fatal("newBlockSub was cancelled")
case <-time.After(10 * time.Second): // XXX 120 seconds is too much time, so we changed it to 10 seconds
@@ -155,29 +161,34 @@ func TestWALCrash(t *testing.T) {
func crashWALandCheckLiveness(t *testing.T, consensusReplayConfig *cfg.Config,
initFn func(dbm.DB, *State, context.Context), heightToStop int64) {
walPanicked := make(chan error)
crashingWal := &crashingWAL{panicCh: walPanicked, heightToStop: heightToStop}
crashingWal := &crashingWAL{t: t, panicCh: walPanicked, heightToStop: heightToStop}

i := 1
LOOP:
for {
t.Logf("====== LOOP %d\n", i)

// create consensus state from a clean slate
logger := log.NewNopLogger()
blockDB := memdb.NewDB()
stateDB := blockDB
stateStore := sm.NewStore(stateDB)
state, err := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile())
require.NoError(t, err)
privValidator := loadPrivValidator(consensusReplayConfig)
cs := newStateWithConfigAndBlockStore(
logger := log.TestingLogger().With("attr", "crash wal", "i", i)
cs := newStateWithConfigAndBlockStoreWithLoggers(
consensusReplayConfig,
state,
privValidator,
kvstore.NewApplication(),
blockDB,
NewTestLoggers(
log.NewNopLogger().With("module", "mempool"),
log.NewNopLogger().With("module", "evidence"),
logger.With("module", "executor"),
logger.With("module", "consensus"),
log.NewNopLogger().With("module", "event")),
)
cs.SetLogger(logger)

// start sending transactions
ctx, cancel := context.WithCancel(context.Background())
@@ -200,18 +211,18 @@ LOOP:
err = cs.Start()
require.NoError(t, err)

i++

select {
case err := <-walPanicked:
t.Logf("WAL panicked: %v", err)

// make sure we can make blocks after a crash
startNewStateAndWaitForBlock(t, consensusReplayConfig, cs.Height, blockDB, stateStore)

// stop consensus state and transactions sender (initFn)
cs.Stop() //nolint:errcheck // Logging this error causes failure
cancel()
// For safety: nobody else stops the WAL, and it sometimes keeps writing otherwise.
cs.wal.Stop()

// make sure we can make blocks after a crash
startNewStateAndWaitForBlock(t, i, consensusReplayConfig, blockDB, stateStore)

// if we reached the required height, exit
if _, ok := err.(ReachedHeightToStopError); ok {
@@ -220,13 +231,16 @@ LOOP:
case <-time.After(10 * time.Second):
t.Fatal("WAL did not panic for 10 seconds (check the log)")
}

i++
}
}

// crashingWAL is a WAL which crashes or rather simulates a crash during Save
// (before and after). It remembers a message for which we last panicked
// (lastPanickedForMsgIndex), so we don't panic for it in subsequent iterations.
type crashingWAL struct {
t *testing.T
next WAL
panicCh chan error
heightToStop int64
@@ -260,15 +274,29 @@ func (e ReachedHeightToStopError) Error() string {
// exiting the cs.receiveRoutine.
func (w *crashingWAL) Write(m WALMessage) error {
if endMsg, ok := m.(EndHeightMessage); ok {
if endMsg.Height == w.heightToStop {
if endMsg.Height >= w.heightToStop {
w.t.Logf("Reached[%d] WAL message[%T], Height[%d]", w.msgIndex, m, endMsg.Height)
w.panicCh <- ReachedHeightToStopError{endMsg.Height}
runtime.Goexit()
return nil
}

w.t.Logf("Not-Reached[%d] WAL message[%T], Height[%d]", w.msgIndex, m, endMsg.Height)
w.msgIndex++
return w.next.Write(m)
}

if mi, ok := m.(msgInfo); ok {
if pm, ok := mi.Msg.(*ProposalMessage); ok {
w.t.Logf("Skipped[%d] WAL message[%T]:[%T]:[%v]", w.msgIndex, m, mi.Msg, pm.Proposal.Type)
} else if vm, ok := mi.Msg.(*VoteMessage); ok {
w.t.Logf("Skipped[%d] WAL message[%T]:[%T]:[%v]", w.msgIndex, m, mi.Msg, vm.Vote.Type)
} else {
w.t.Logf("Skipped[%d] WAL message[%T]:[%T]", w.msgIndex, m, mi.Msg)
}
} else {
w.t.Logf("Skipped[%d] WAL message[%T]", w.msgIndex, m)
}

if w.msgIndex > w.lastPanickedForMsgIndex {
w.lastPanickedForMsgIndex = w.msgIndex
_, file, line, _ := runtime.Caller(1)
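The per-message-type logging added to crashingWAL.Write could also be expressed as a nested type switch; a sketch under the same assumptions about the message types, with behaviour identical to the if/else chain above:

switch msg := m.(type) {
case msgInfo:
	switch inner := msg.Msg.(type) {
	case *ProposalMessage:
		w.t.Logf("Skipped[%d] WAL message[%T]:[%T]:[%v]", w.msgIndex, msg, inner, inner.Proposal.Type)
	case *VoteMessage:
		w.t.Logf("Skipped[%d] WAL message[%T]:[%T]:[%v]", w.msgIndex, msg, inner, inner.Vote.Type)
	default:
		w.t.Logf("Skipped[%d] WAL message[%T]:[%T]", w.msgIndex, msg, inner)
	}
default:
	w.t.Logf("Skipped[%d] WAL message[%T]", w.msgIndex, msg)
}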
12 changes: 10 additions & 2 deletions p2p/test_util.go
@@ -43,7 +43,11 @@ func CreateRandomPeer(outbound bool) Peer {
mconn: &conn.MConnection{},
metrics: NopMetrics(),
}
p.SetLogger(log.TestingLogger().With("peer", addr))
if p.Logger == nil {
p.SetLogger(log.TestingLogger().With("peer", addr))
} else {
p.SetLogger(p.Logger.With("peer", addr))
}
return p
}

@@ -200,7 +204,11 @@ func MakeSwitch(

// TODO: let the config be passed in?
sw := initSwitch(i, NewSwitch(cfg, t, opts...), cfg) // receive buffer size is all 1000 in test
sw.SetLogger(log.TestingLogger().With("switch", i))
if sw.Logger == nil {
sw.SetLogger(log.TestingLogger().With("switch", i))
} else {
sw.SetLogger(sw.Logger.With("switch", i))
}
sw.SetNodeKey(&nodeKey)

ni := nodeInfo.(DefaultNodeInfo)
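Both nil-checks in test_util.go follow the same pattern: reuse a logger that was already set on the component, otherwise fall back to the testing logger. A hypothetical helper (not part of this commit) that captures it:

// loggerOrDefault returns the existing logger with the extra key/values
// attached, or a fresh testing logger when none has been set yet.
func loggerOrDefault(existing log.Logger, keyvals ...interface{}) log.Logger {
	if existing == nil {
		return log.TestingLogger().With(keyvals...)
	}
	return existing.With(keyvals...)
}

// The two call sites above would then read:
//   p.SetLogger(loggerOrDefault(p.Logger, "peer", addr))
//   sw.SetLogger(loggerOrDefault(sw.Logger, "switch", i))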
