From 2c06eb76d6b7afaf00159bd5c96ea60484de30f1 Mon Sep 17 00:00:00 2001 From: zenground0 Date: Mon, 14 Feb 2022 09:47:18 -0500 Subject: [PATCH 1/2] Improve MineBlocksMustPost use it in PaychAPI itest --- itests/kit/blockminer.go | 96 +++++++++++++++++++++------------------- itests/paych_api_test.go | 2 +- 2 files changed, 52 insertions(+), 46 deletions(-) diff --git a/itests/kit/blockminer.go b/itests/kit/blockminer.go index 91ddc2e26e0..cae3d26e35e 100644 --- a/itests/kit/blockminer.go +++ b/itests/kit/blockminer.go @@ -10,6 +10,7 @@ import ( "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/lotus/api" aminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/types" @@ -82,11 +83,49 @@ func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *type return } +func (bm *BlockMiner) forcePoSt(ctx context.Context, ts *types.TipSet, dlinfo *dline.Info) { + + tracker := newPartitionTracker(ctx, dlinfo.Index, bm) + if !tracker.done(bm.t) { // need to wait for post + bm.t.Logf("expect %d partitions proved but only see %d", len(tracker.partitions), tracker.count(bm.t)) + poolEvts, err := bm.miner.FullNode.MpoolSub(ctx) //subscribe before checking pending so we don't miss any events + require.NoError(bm.t, err) + + // First check pending messages we'll mine this epoch + msgs, err := bm.miner.FullNode.MpoolPending(ctx, types.EmptyTSK) + require.NoError(bm.t, err) + for _, msg := range msgs { + tracker.recordIfPost(bm.t, bm, msg) + } + + // post not yet in mpool, wait for it + if !tracker.done(bm.t) { + bm.t.Logf("post missing from mpool, block mining suspended until it arrives") + POOL: + for { + bm.t.Logf("mpool event wait loop at block height %d, ts: %s", ts.Height(), ts.Key()) + select { + case <-ctx.Done(): + return + case evt := <-poolEvts: + bm.t.Logf("pool event: %d", evt.Type) + if evt.Type == api.MpoolAdd { + bm.t.Logf("incoming message %v", evt.Message) + if tracker.recordIfPost(bm.t, bm, evt.Message) { + break POOL + } + } + } + } + bm.t.Logf("done waiting on mpool") + } + } +} + // Like MineBlocks but refuses to mine until the window post scheduler has wdpost messages in the mempool // and everything shuts down if a post fails. It also enforces that every block mined succeeds func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Duration) { - - time.Sleep(3 * time.Second) + time.Sleep(time.Second) // wrap context in a cancellable context. ctx, bm.cancel = context.WithCancel(ctx) @@ -94,8 +133,6 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur go func() { defer bm.wg.Done() - activeDeadlines := make(map[int]struct{}) - _ = activeDeadlines ts, err := bm.miner.FullNode.ChainHead(ctx) require.NoError(bm.t, err) wait := make(chan bool) @@ -103,7 +140,7 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur require.NoError(bm.t, err) // read current out curr := <-chg - require.Equal(bm.t, ts.Height(), curr[0].Val.Height()) + require.Equal(bm.t, ts.Height(), curr[0].Val.Height(), "failed sanity check: are multiple miners mining with must post?") for { select { case <-time.After(blocktime): @@ -111,52 +148,15 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur return } nulls := atomic.SwapInt64(&bm.nextNulls, 0) - require.Equal(bm.t, int64(0), nulls, "Injecting > 0 null blocks while `MustPost` mining is currently unsupported") // Wake up and figure out if we are at the end of an active deadline ts, err := bm.miner.FullNode.ChainHead(ctx) require.NoError(bm.t, err) - tsk := ts.Key() - dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, tsk) + dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, ts.Key()) require.NoError(bm.t, err) - if ts.Height()+1 == dlinfo.Last() { // Last epoch in dline, we need to check that miner has posted - - tracker := newPartitionTracker(ctx, dlinfo.Index, bm) - if !tracker.done(bm.t) { // need to wait for post - bm.t.Logf("expect %d partitions proved but only see %d", len(tracker.partitions), tracker.count(bm.t)) - poolEvts, err := bm.miner.FullNode.MpoolSub(ctx) - require.NoError(bm.t, err) - - // First check pending messages we'll mine this epoch - msgs, err := bm.miner.FullNode.MpoolPending(ctx, types.EmptyTSK) - require.NoError(bm.t, err) - for _, msg := range msgs { - tracker.recordIfPost(bm.t, bm, msg) - } - - // post not yet in mpool, wait for it - if !tracker.done(bm.t) { - bm.t.Logf("post missing from mpool, block mining suspended until it arrives") - POOL: - for { - bm.t.Logf("mpool event wait loop at block height %d, ts: %s", ts.Height(), ts.Key()) - select { - case <-ctx.Done(): - return - case evt := <-poolEvts: - bm.t.Logf("pool event: %d", evt.Type) - if evt.Type == api.MpoolAdd { - bm.t.Logf("incoming message %v", evt.Message) - if tracker.recordIfPost(bm.t, bm, evt.Message) { - break POOL - } - } - } - } - bm.t.Logf("done waiting on mpool") - } - } + if ts.Height()+1+abi.ChainEpoch(nulls) >= dlinfo.Last() { // Next block brings us past the last epoch in dline, we need to wait for miner to post + bm.forcePoSt(ctx, ts, dlinfo) } var target abi.ChainEpoch @@ -173,6 +173,12 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur Done: reportSuccessFn, }) success = <-wait + if !success { + // if we are mining a new null block and it brings us past deadline boundary we need to wait for miner to post + if ts.Height()+1+abi.ChainEpoch(nulls+i) >= dlinfo.Last() { + bm.forcePoSt(ctx, ts, dlinfo) + } + } } // Wait until it shows up on the given full nodes ChainHead diff --git a/itests/paych_api_test.go b/itests/paych_api_test.go index a07c499f9cf..7e135a9bea7 100644 --- a/itests/paych_api_test.go +++ b/itests/paych_api_test.go @@ -51,7 +51,7 @@ func TestPaymentChannelsAPI(t *testing.T) { Miner(&miner, &paymentCreator, kit.WithAllSubsystems()). Start(). InterconnectAll() - bms := ens.BeginMining(blockTime) + bms := ens.BeginMiningMustPost(blockTime) bm := bms[0] // send some funds to register the receiver From 977351f41911a12b81d0521c838e9155fa7c7f32 Mon Sep 17 00:00:00 2001 From: zenground0 Date: Mon, 14 Feb 2022 14:00:41 -0500 Subject: [PATCH 2/2] Fix from Magik to remove hanging behavior --- itests/kit/blockminer.go | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/itests/kit/blockminer.go b/itests/kit/blockminer.go index cae3d26e35e..a232d82e026 100644 --- a/itests/kit/blockminer.go +++ b/itests/kit/blockminer.go @@ -3,6 +3,7 @@ package kit import ( "bytes" "context" + "fmt" "sync" "sync/atomic" "testing" @@ -64,11 +65,10 @@ func (p *partitionTracker) done(t *testing.T) bool { return uint64(len(p.partitions)) == p.count(t) } -func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *types.SignedMessage) (ret bool) { +func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, msg *types.Message) (ret bool) { defer func() { ret = p.done(t) }() - msg := smsg.Message if !(msg.To == bm.miner.ActorAddr) { return } @@ -95,7 +95,26 @@ func (bm *BlockMiner) forcePoSt(ctx context.Context, ts *types.TipSet, dlinfo *d msgs, err := bm.miner.FullNode.MpoolPending(ctx, types.EmptyTSK) require.NoError(bm.t, err) for _, msg := range msgs { - tracker.recordIfPost(bm.t, bm, msg) + if tracker.recordIfPost(bm.t, bm, &msg.Message) { + fmt.Printf("found post in mempool pending\n") + } + } + + // Account for included but not yet executed messages + for _, bc := range ts.Cids() { + msgs, err := bm.miner.FullNode.ChainGetBlockMessages(ctx, bc) + require.NoError(bm.t, err) + for _, msg := range msgs.BlsMessages { + if tracker.recordIfPost(bm.t, bm, msg) { + fmt.Printf("found post in message of prev tipset\n") + } + + } + for _, msg := range msgs.SecpkMessages { + if tracker.recordIfPost(bm.t, bm, &msg.Message) { + fmt.Printf("found post in message of prev tipset\n") + } + } } // post not yet in mpool, wait for it @@ -111,7 +130,8 @@ func (bm *BlockMiner) forcePoSt(ctx context.Context, ts *types.TipSet, dlinfo *d bm.t.Logf("pool event: %d", evt.Type) if evt.Type == api.MpoolAdd { bm.t.Logf("incoming message %v", evt.Message) - if tracker.recordIfPost(bm.t, bm, evt.Message) { + if tracker.recordIfPost(bm.t, bm, &evt.Message.Message) { + fmt.Printf("found post in mempool evt\n") break POOL } }