From 1bed452e11685c08630b13e1d457f1690e50eb45 Mon Sep 17 00:00:00 2001 From: evan-forbes Date: Sat, 19 Jun 2021 15:47:38 -0500 Subject: [PATCH 1/7] change the blockstore interface --- blockchain/v0/reactor.go | 14 +++++-- blockchain/v0/reactor_test.go | 16 ++++++-- consensus/replay.go | 14 +++++-- consensus/replay_test.go | 23 ++++++----- consensus/state.go | 5 ++- evidence/pool_test.go | 6 ++- rpc/core/blocks.go | 17 ++++++-- rpc/core/blocks_test.go | 22 ++++++---- rpc/core/tx.go | 8 +++- state/services.go | 8 ++-- store/store.go | 24 +++++++---- store/store_test.go | 75 +++++++++++++++++++++++++---------- 12 files changed, 168 insertions(+), 64 deletions(-) diff --git a/blockchain/v0/reactor.go b/blockchain/v0/reactor.go index 598ed701bf..3beb9e18d9 100644 --- a/blockchain/v0/reactor.go +++ b/blockchain/v0/reactor.go @@ -1,6 +1,7 @@ package v0 import ( + "context" "fmt" "reflect" "time" @@ -178,7 +179,11 @@ func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest, src p2p.Peer) (queued bool) { - block := bcR.store.LoadBlock(msg.Height) + block, err := bcR.store.LoadBlock(context.TODO(), msg.Height) + if err != nil { + // todo(evan): handle this error + panic(err) + } if block != nil { bl, err := block.ToProto() if err != nil { @@ -418,11 +423,14 @@ FOR_LOOP: bcR.pool.PopRequest() // TODO: batch saves so we dont persist to disk every block - bcR.store.SaveBlock(first, firstParts, second.LastCommit) + err := bcR.store.SaveBlock(context.TODO(), first, firstParts, second.LastCommit) + if err != nil { + // todo(evan): don't panic + panic(err) + } // TODO: same thing for app - but we would need a way to get the hash // without persisting the state. - var err error state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first) if err != nil { // TODO This is bad, are we zombie? diff --git a/blockchain/v0/reactor_test.go b/blockchain/v0/reactor_test.go index 218caf0b43..62bcafe0a3 100644 --- a/blockchain/v0/reactor_test.go +++ b/blockchain/v0/reactor_test.go @@ -1,6 +1,7 @@ package v0 import ( + "context" "crypto/sha256" "fmt" "os" @@ -100,7 +101,10 @@ func newBlockchainReactor( lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil) if blockHeight > 1 { lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1) - lastBlock := blockStore.LoadBlock(blockHeight - 1) + lastBlock, err := blockStore.LoadBlock(context.TODO(), blockHeight-1) + if err != nil { + panic(err) + } vote, err := types.MakeVote( lastBlock.Header.Height, @@ -127,7 +131,10 @@ func newBlockchainReactor( panic(fmt.Errorf("error apply block: %w", err)) } - blockStore.SaveBlock(thisBlock, thisParts, lastCommit) + err := blockStore.SaveBlock(context.TODO(), thisBlock, thisParts, lastCommit) + if err != nil { + panic(err) + } } bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) @@ -184,7 +191,10 @@ func TestNoBlockResponse(t *testing.T) { assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height()) for _, tt := range tests { - block := reactorPairs[1].reactor.store.LoadBlock(tt.height) + block, err := reactorPairs[1].reactor.store.LoadBlock(context.TODO(), tt.height) + if err != nil { + panic(err) + } if tt.existent { assert.True(t, block != nil) } else { diff --git a/consensus/replay.go b/consensus/replay.go index beb0d70039..4bb48ec8ac 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -463,7 +463,11 @@ func (h *Handshaker) replayBlocks( } for i := firstBlock; i <= finalBlock; i++ { h.logger.Info("Applying block", "height", i) - block := h.store.LoadBlock(i) + block, err := h.store.LoadBlock(context.TODO(), i) + if err != nil { + // todo(evan): handle error + panic(err) + } // Extra check to ensure the app was not changed in a way it shouldn't have. if len(appHash) > 0 { assertAppHashEqualsOneFromBlock(appHash, block) @@ -492,7 +496,12 @@ func (h *Handshaker) replayBlocks( // ApplyBlock on the proxyApp with the last block. func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) { - block := h.store.LoadBlock(height) + block, err := h.store.LoadBlock(context.TODO(), height) + if err != nil { + // todo(evan): handle/bubble this error + panic(err) + // return sm.State{}, err + } meta := h.store.LoadBlockMeta(height) // Use stubs for both mempool and evidence pool since no transactions nor @@ -500,7 +509,6 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}) blockExec.SetEventBus(h.eventBus) - var err error state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block) if err != nil { return sm.State{}, err diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 781aaa38d9..abcbbbaf10 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -548,7 +548,9 @@ func TestSimulateValidatorsChange(t *testing.T) { sim.Chain = make([]*types.Block, 0) sim.Commits = make([]*types.Commit, 0) for i := 1; i <= numBlocks; i++ { - sim.Chain = append(sim.Chain, css[0].blockStore.LoadBlock(int64(i))) + blck, err := css[0].blockStore.LoadBlock(context.TODO(), int64(i)) + require.NoError(t, err) + sim.Chain = append(sim.Chain, blck) sim.Commits = append(sim.Commits, css[0].blockStore.LoadBlockCommit(int64(i))) } } @@ -1195,13 +1197,15 @@ func newMockBlockStore(config *cfg.Config, params tmproto.ConsensusParams) *mock return &mockBlockStore{config, params, nil, nil, 0, mdutils.Mock()} } -func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) } -func (bs *mockBlockStore) Base() int64 { return bs.base } -func (bs *mockBlockStore) Size() int64 { return bs.Height() - bs.Base() + 1 } -func (bs *mockBlockStore) LoadBaseMeta() *types.BlockMeta { return bs.LoadBlockMeta(bs.base) } -func (bs *mockBlockStore) LoadBlock(height int64) *types.Block { return bs.chain[height-1] } -func (bs *mockBlockStore) LoadBlockByHash(hash []byte) *types.Block { - return bs.chain[int64(len(bs.chain))-1] +func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) } +func (bs *mockBlockStore) Base() int64 { return bs.base } +func (bs *mockBlockStore) Size() int64 { return bs.Height() - bs.Base() + 1 } +func (bs *mockBlockStore) LoadBaseMeta() *types.BlockMeta { return bs.LoadBlockMeta(bs.base) } +func (bs *mockBlockStore) LoadBlock(ctx context.Context, height int64) (*types.Block, error) { + return bs.chain[height-1], nil +} +func (bs *mockBlockStore) LoadBlockByHash(ctx context.Context, hash []byte) (*types.Block, error) { + return bs.chain[int64(len(bs.chain))-1], nil } func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { block := bs.chain[height-1] @@ -1211,7 +1215,8 @@ func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { } } func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil } -func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { +func (bs *mockBlockStore) SaveBlock(ctx context.Context, block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) error { + return nil } func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit { return bs.commits[height-1] diff --git a/consensus/state.go b/consensus/state.go index f70b1c7b06..0fd9cb8142 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1585,7 +1585,10 @@ func (cs *State) finalizeCommit(height int64) { // but may differ from the LastCommit included in the next block precommits := cs.Votes.Precommits(cs.CommitRound) seenCommit := precommits.MakeCommit() - cs.blockStore.SaveBlock(block, blockParts, seenCommit) + err := cs.blockStore.SaveBlock(cs.proposalCtx, block, blockParts, seenCommit) + if err != nil { + panic(err) + } } else { // Happens during replay if we already saved the block but didn't commit cs.Logger.Info("Calling finalizeCommit on already stored block", "height", block.Height) diff --git a/evidence/pool_test.go b/evidence/pool_test.go index 0c5a64a51f..8494956dd7 100644 --- a/evidence/pool_test.go +++ b/evidence/pool_test.go @@ -1,6 +1,7 @@ package evidence_test import ( + "context" "os" "testing" "time" @@ -408,7 +409,10 @@ func initializeBlockStore(db dbm.DB, state sm.State, valAddr []byte) *store.Bloc partSet := block.MakePartSet(parts) seenCommit := makeCommit(i, valAddr) - blockStore.SaveBlock(block, partSet, seenCommit) + err := blockStore.SaveBlock(context.TODO(), block, partSet, seenCommit) + if err != nil { + panic(err) + } } return blockStore diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index 6799a0aa36..06d411a6a3 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -89,7 +89,10 @@ func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) return nil, err } - block := env.BlockStore.LoadBlock(height) + block, err := env.BlockStore.LoadBlock(ctx.Context(), height) + if err != nil { + return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: block}, err + } blockMeta := env.BlockStore.LoadBlockMeta(height) if blockMeta == nil { return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: block}, nil @@ -100,7 +103,10 @@ func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) // BlockByHash gets block by hash. // More: https://docs.tendermint.com/master/rpc/#/Info/block_by_hash func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error) { - block := env.BlockStore.LoadBlockByHash(hash) + block, err := env.BlockStore.LoadBlockByHash(ctx.Context(), hash) + if err != nil { + return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: nil}, err + } if block == nil { return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: nil}, nil } @@ -147,7 +153,12 @@ func DataAvailabilityHeader(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.Re // depends on either: // - https://github.com/lazyledger/lazyledger-core/pull/312, or // - https://github.com/lazyledger/lazyledger-core/pull/218 - block := env.BlockStore.LoadBlock(height) + block, err := env.BlockStore.LoadBlock(ctx.Context(), height) + if err != nil { + return &ctypes.ResultDataAvailabilityHeader{ + DataAvailabilityHeader: types.DataAvailabilityHeader{}, + }, err + } _ = block.Hash() dah := block.DataAvailabilityHeader return &ctypes.ResultDataAvailabilityHeader{ diff --git a/rpc/core/blocks_test.go b/rpc/core/blocks_test.go index 996c83c43e..86f12f2aa2 100644 --- a/rpc/core/blocks_test.go +++ b/rpc/core/blocks_test.go @@ -1,6 +1,7 @@ package core import ( + "context" "fmt" "testing" @@ -118,16 +119,21 @@ type mockBlockStore struct { height int64 } -func (mockBlockStore) Base() int64 { return 1 } -func (store mockBlockStore) Height() int64 { return store.height } -func (store mockBlockStore) Size() int64 { return store.height } -func (mockBlockStore) LoadBaseMeta() *types.BlockMeta { return nil } -func (mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { return nil } -func (mockBlockStore) LoadBlock(height int64) *types.Block { return nil } -func (mockBlockStore) LoadBlockByHash(hash []byte) *types.Block { return nil } +func (mockBlockStore) Base() int64 { return 1 } +func (store mockBlockStore) Height() int64 { return store.height } +func (store mockBlockStore) Size() int64 { return store.height } +func (mockBlockStore) LoadBaseMeta() *types.BlockMeta { return nil } +func (mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { return nil } +func (mockBlockStore) LoadBlock(ctx context.Context, height int64) (*types.Block, error) { + return nil, nil +} +func (mockBlockStore) LoadBlockByHash(ctx context.Context, hash []byte) (*types.Block, error) { + return nil, nil +} func (mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil } func (mockBlockStore) LoadBlockCommit(height int64) *types.Commit { return nil } func (mockBlockStore) LoadSeenCommit(height int64) *types.Commit { return nil } func (mockBlockStore) PruneBlocks(height int64) (uint64, error) { return 0, nil } -func (mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { +func (mockBlockStore) SaveBlock(ctx context.Context, block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) error { + return nil } diff --git a/rpc/core/tx.go b/rpc/core/tx.go index fc125ec161..c479ebe40a 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -37,7 +37,8 @@ func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error var proof types.TxProof if prove { - block := env.BlockStore.LoadBlock(height) + block, err := env.BlockStore.LoadBlock(ctx.Context(), height) + return nil, fmt.Errorf("tx (%X) not found: %w", hash, err) proof = block.Data.Txs.Proof(int(index)) // XXX: overflow on 32-bit machines } @@ -107,7 +108,10 @@ func TxSearch(ctx *rpctypes.Context, query string, prove bool, pagePtr, perPageP var proof types.TxProof if prove { - block := env.BlockStore.LoadBlock(r.Height) + block, err := env.BlockStore.LoadBlock(ctx.Context(), r.Height) + if err != nil { + return nil, err + } proof = block.Data.Txs.Proof(int(r.Index)) // XXX: overflow on 32-bit machines } diff --git a/state/services.go b/state/services.go index eef7dc854d..45aba30329 100644 --- a/state/services.go +++ b/state/services.go @@ -1,6 +1,8 @@ package state import ( + "context" + "github.com/lazyledger/lazyledger-core/types" ) @@ -20,13 +22,13 @@ type BlockStore interface { LoadBaseMeta() *types.BlockMeta LoadBlockMeta(height int64) *types.BlockMeta - LoadBlock(height int64) *types.Block + LoadBlock(ctx context.Context, height int64) (*types.Block, error) - SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) + SaveBlock(ctx context.Context, block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) error PruneBlocks(height int64) (uint64, error) - LoadBlockByHash(hash []byte) *types.Block + LoadBlockByHash(ctx context.Context, hash []byte) (*types.Block, error) LoadBlockPart(height int64, index int) *types.Part LoadBlockCommit(height int64) *types.Commit diff --git a/store/store.go b/store/store.go index 92190baac2..33b5a0da06 100644 --- a/store/store.go +++ b/store/store.go @@ -1,6 +1,7 @@ package store import ( + "context" "fmt" "strconv" @@ -94,10 +95,10 @@ func (bs *BlockStore) LoadBaseMeta() *types.BlockMeta { // LoadBlock returns the block with the given height. // If no block is found for that height, it returns nil. -func (bs *BlockStore) LoadBlock(height int64) *types.Block { +func (bs *BlockStore) LoadBlock(ctx context.Context, height int64) (*types.Block, error) { var blockMeta = bs.LoadBlockMeta(height) if blockMeta == nil { - return nil + return nil, nil } pbb := new(tmproto.Block) @@ -107,7 +108,7 @@ func (bs *BlockStore) LoadBlock(height int64) *types.Block { // If the part is missing (e.g. since it has been deleted after we // loaded the block meta) we consider the whole block to be missing. if part == nil { - return nil + return nil, nil } buf = append(buf, part.Bytes...) } @@ -123,19 +124,19 @@ func (bs *BlockStore) LoadBlock(height int64) *types.Block { panic(fmt.Errorf("error from proto block: %w", err)) } - return block + return block, nil } // LoadBlockByHash returns the block with the given hash. // If no block is found for that hash, it returns nil. // Panics if it fails to parse height associated with the given hash. -func (bs *BlockStore) LoadBlockByHash(hash []byte) *types.Block { +func (bs *BlockStore) LoadBlockByHash(ctx context.Context, hash []byte) (*types.Block, error) { bz, err := bs.db.Get(calcBlockHashKey(hash)) if err != nil { panic(err) } if len(bz) == 0 { - return nil + return nil, nil } s := string(bz) @@ -144,7 +145,7 @@ func (bs *BlockStore) LoadBlockByHash(hash []byte) *types.Block { if err != nil { panic(fmt.Sprintf("failed to extract height from %s: %v", s, err)) } - return bs.LoadBlock(height) + return bs.LoadBlock(ctx, height) } // LoadBlockPart returns the Part at the given index @@ -332,7 +333,12 @@ func (bs *BlockStore) PruneBlocks(height int64) (uint64, error) { // If all the nodes restart after committing a block, // we need this to reload the precommits to catch-up nodes to the // most recent height. Otherwise they'd stall at H-1. -func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { +func (bs *BlockStore) SaveBlock( + ctx context.Context, + block *types.Block, + blockParts *types.PartSet, + seenCommit *types.Commit, +) error { if block == nil { panic("BlockStore can only save a non-nil block") } @@ -398,6 +404,8 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s // Save new BlockStoreState descriptor. This also flushes the database. bs.saveState() + + return nil } func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part) { diff --git a/store/store_test.go b/store/store_test.go index c7130e9bec..d71354ee4a 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -2,6 +2,7 @@ package store import ( "bytes" + "context" "crypto/sha256" "fmt" "os" @@ -172,10 +173,12 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) { require.Equal(t, bs.Base(), int64(0), "initially the base should be zero") require.Equal(t, bs.Height(), int64(0), "initially the height should be zero") + ctx := context.TODO() + // check there are no blocks at various heights noBlockHeights := []int64{0, -1, 100, 1000, 2} for i, height := range noBlockHeights { - if g := bs.LoadBlock(height); g != nil { + if g, _ := bs.LoadBlock(ctx, height); g != nil { t.Errorf("#%d: height(%d) got a block; want nil", i, height) } } @@ -184,13 +187,14 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) { block := makeBlock(bs.Height()+1, state, new(types.Commit)) validPartSet := block.MakePartSet(2) seenCommit := makeTestCommit(10, tmtime.Now()) - bs.SaveBlock(block, partSet, seenCommit) + err := bs.SaveBlock(ctx, block, partSet, seenCommit) + require.NoError(t, err) require.EqualValues(t, 1, bs.Base(), "expecting the new height to be changed") require.EqualValues(t, block.Header.Height, bs.Height(), "expecting the new height to be changed") incompletePartSet := types.NewPartSetFromHeader(types.PartSetHeader{Total: 2}) uncontiguousPartSet := types.NewPartSetFromHeader(types.PartSetHeader{Total: 0}) - _, err := uncontiguousPartSet.AddPart(part2) + _, err = uncontiguousPartSet.AddPart(part2) require.Error(t, err) header1 := types.Header{ @@ -305,16 +309,20 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) { bs, db := freshBlockStore() // SaveBlock res, err, panicErr := doFn(func() (interface{}, error) { - bs.SaveBlock(tuple.block, tuple.parts, tuple.seenCommit) + err := bs.SaveBlock(ctx, tuple.block, tuple.parts, tuple.seenCommit) if tuple.block == nil { return nil, nil } + if err != nil { + return nil, err + } if tuple.corruptBlockInDB { err := db.Set(calcBlockMetaKey(tuple.block.Height), []byte("block-bogus")) require.NoError(t, err) } - bBlock := bs.LoadBlock(tuple.block.Height) + bBlock, err := bs.LoadBlock(ctx, tuple.block.Height) + require.NoError(t, err) bBlockMeta := bs.LoadBlockMeta(tuple.block.Height) if tuple.eraseSeenCommitInDB { @@ -387,7 +395,8 @@ func TestLoadBaseMeta(t *testing.T) { block := makeBlock(h, state, new(types.Commit)) partSet := block.MakePartSet(2) seenCommit := makeTestCommit(h, tmtime.Now()) - bs.SaveBlock(block, partSet, seenCommit) + err := bs.SaveBlock(context.TODO(), block, partSet, seenCommit) + require.NoError(t, err) } _, err = bs.PruneBlocks(4) @@ -443,6 +452,8 @@ func TestPruneBlocks(t *testing.T) { assert.EqualValues(t, 0, bs.Height()) assert.EqualValues(t, 0, bs.Size()) + ctx := context.TODO() + // pruning an empty store should error, even when pruning to 0 _, err = bs.PruneBlocks(1) require.Error(t, err) @@ -455,14 +466,16 @@ func TestPruneBlocks(t *testing.T) { block := makeBlock(h, state, new(types.Commit)) partSet := block.MakePartSet(2) seenCommit := makeTestCommit(h, tmtime.Now()) - bs.SaveBlock(block, partSet, seenCommit) + err := bs.SaveBlock(ctx, block, partSet, seenCommit) + require.NoError(t, err) } assert.EqualValues(t, 1, bs.Base()) assert.EqualValues(t, 1500, bs.Height()) assert.EqualValues(t, 1500, bs.Size()) - prunedBlock := bs.LoadBlock(1199) + prunedBlock, err := bs.LoadBlock(ctx, 1199) + require.NoError(t, err) // Check that basic pruning works pruned, err := bs.PruneBlocks(1200) @@ -476,18 +489,29 @@ func TestPruneBlocks(t *testing.T) { Height: 1500, }, LoadBlockStoreState(db)) - require.NotNil(t, bs.LoadBlock(1200)) - require.Nil(t, bs.LoadBlock(1199)) - require.Nil(t, bs.LoadBlockByHash(prunedBlock.Hash())) + b, err := bs.LoadBlock(ctx, 1200) + require.NotNil(t, b) + require.NoError(t, err) + b, err = bs.LoadBlock(ctx, 1199) + require.NoError(t, err) + require.Nil(t, b) + b, err = bs.LoadBlockByHash(ctx, prunedBlock.Hash()) + require.Nil(t, b) + require.NoError(t, err) + require.Nil(t, bs.LoadBlockCommit(1199)) require.Nil(t, bs.LoadBlockMeta(1199)) require.Nil(t, bs.LoadBlockPart(1199, 1)) for i := int64(1); i < 1200; i++ { - require.Nil(t, bs.LoadBlock(i)) + b, err := bs.LoadBlock(ctx, i) + require.Nil(t, b) + require.NoError(t, err) } for i := int64(1200); i <= 1500; i++ { - require.NotNil(t, bs.LoadBlock(i)) + b, err := bs.LoadBlock(ctx, i) + require.NotNil(t, b) + require.NoError(t, err) } // Pruning below the current base should error @@ -513,9 +537,15 @@ func TestPruneBlocks(t *testing.T) { pruned, err = bs.PruneBlocks(1500) require.NoError(t, err) assert.EqualValues(t, 200, pruned) - assert.Nil(t, bs.LoadBlock(1499)) - assert.NotNil(t, bs.LoadBlock(1500)) - assert.Nil(t, bs.LoadBlock(1501)) + b, err = bs.LoadBlock(ctx, 1499) + assert.Nil(t, b) + require.NoError(t, err) + b, err = bs.LoadBlock(ctx, 1500) + assert.NotNil(t, b) + require.NoError(t, err) + b, err = bs.LoadBlock(ctx, 1501) + assert.Nil(t, b) + require.NoError(t, err) } func TestLoadBlockMeta(t *testing.T) { @@ -561,6 +591,7 @@ func TestLoadBlockMeta(t *testing.T) { } func TestBlockFetchAtHeight(t *testing.T) { + ctx := context.TODO() state, bs, cleanup := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer))) defer cleanup() require.Equal(t, bs.Height(), int64(0), "initially the height should be zero") @@ -568,10 +599,12 @@ func TestBlockFetchAtHeight(t *testing.T) { partSet := block.MakePartSet(2) seenCommit := makeTestCommit(10, tmtime.Now()) - bs.SaveBlock(block, partSet, seenCommit) + err := bs.SaveBlock(ctx, block, partSet, seenCommit) + require.NoError(t, err) require.Equal(t, bs.Height(), block.Header.Height, "expecting the new height to be changed") - blockAtHeight := bs.LoadBlock(bs.Height()) + blockAtHeight, err := bs.LoadBlock(ctx, bs.Height()) + require.NoError(t, err) b1, err := block.ToProto() require.NoError(t, err) b2, err := blockAtHeight.ToProto() @@ -582,9 +615,11 @@ func TestBlockFetchAtHeight(t *testing.T) { require.Equal(t, block.Hash(), blockAtHeight.Hash(), "expecting a successful load of the last saved block") - blockAtHeightPlus1 := bs.LoadBlock(bs.Height() + 1) + blockAtHeightPlus1, err := bs.LoadBlock(ctx, bs.Height()+1) + require.NoError(t, err) require.Nil(t, blockAtHeightPlus1, "expecting an unsuccessful load of Height()+1") - blockAtHeightPlus2 := bs.LoadBlock(bs.Height() + 2) + blockAtHeightPlus2, err := bs.LoadBlock(ctx, bs.Height()+2) + require.NoError(t, err) require.Nil(t, blockAtHeightPlus2, "expecting an unsuccessful load of Height()+2") } From ea347347b4a080f73e3cbbff7b8c5ff5c557c5fe Mon Sep 17 00:00:00 2001 From: evan-forbes Date: Sat, 19 Jun 2021 16:35:55 -0500 Subject: [PATCH 2/7] update NewBlockstore to accept blockstore.Blockstore --- blockchain/v0/reactor_test.go | 4 +--- consensus/byzantine_test.go | 6 ++---- consensus/common_test.go | 2 +- consensus/reactor_test.go | 4 +--- consensus/replay_file.go | 2 +- consensus/wal_test.go | 2 +- evidence/pool_test.go | 3 +-- go.mod | 1 + ipfs/mock.go | 7 +++++++ node/node.go | 2 +- node/node_test.go | 3 +-- store/mock.go | 21 +++++++++++++++++++++ store/store.go | 21 ++++++++++++++++----- store/store_test.go | 15 +++++++-------- 14 files changed, 62 insertions(+), 31 deletions(-) create mode 100644 store/mock.go diff --git a/blockchain/v0/reactor_test.go b/blockchain/v0/reactor_test.go index 62bcafe0a3..b87e2853fe 100644 --- a/blockchain/v0/reactor_test.go +++ b/blockchain/v0/reactor_test.go @@ -9,7 +9,6 @@ import ( "testing" "time" - mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -70,10 +69,9 @@ func newBlockchainReactor( panic(fmt.Errorf("error start app: %w", err)) } - blockDB := memdb.NewDB() stateDB := memdb.NewDB() stateStore := sm.NewStore(stateDB) - blockStore := store.NewBlockStore(blockDB, mdutils.Mock()) + blockStore := store.MockBlockStore(nil) state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc) if err != nil { diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 0436b8bfef..0ae68d9025 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -56,9 +56,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { vals := types.TM2PB.ValidatorUpdates(state.Validators) app.InitChain(abci.RequestInitChain{Validators: vals}) - blockDB := memdb.NewDB() - dag := mdutils.Mock() - blockStore := store.NewBlockStore(blockDB, dag) + blockStore := store.MockBlockStore(nil) // one for mempool, one for consensus mtx := new(tmsync.Mutex) @@ -81,7 +79,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { // Make State blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool) cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, - mempool, dag, ipfs.MockRouting(), evpool) + mempool, mdutils.Mock(), ipfs.MockRouting(), evpool) cs.SetLogger(cs.Logger) // set private validator pv := privVals[i] diff --git a/consensus/common_test.go b/consensus/common_test.go index e3372552fd..daaa6bbea0 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -379,7 +379,7 @@ func newStateWithConfigAndBlockStore( dag format.DAGService, ) *State { // Get BlockStore - blockStore := store.NewBlockStore(blockDB, dag) + blockStore := store.MockBlockStore(blockDB) // one for mempool, one for consensus mtx := new(tmsync.Mutex) diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 2ccffd67b8..0ed1251ea7 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -155,10 +155,8 @@ func TestReactorWithEvidence(t *testing.T) { // duplicate code from: // css[i] = newStateWithConfig(thisConfig, state, privVals[i], app) - blockDB := memdb.NewDB() dag := mdutils.Mock() - blockStore := store.NewBlockStore(blockDB, dag) - + blockStore := store.MockBlockStore(nil) // one for mempool, one for consensus mtx := new(tmsync.Mutex) proxyAppConnMem := abcicli.NewLocalClient(mtx, app) diff --git a/consensus/replay_file.go b/consensus/replay_file.go index e70f611170..3a13cbb1da 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -291,7 +291,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo tmos.Exit(err.Error()) } dag := mdutils.Mock() - blockStore := store.NewBlockStore(blockStoreDB, dag) + blockStore := store.MockBlockStore(blockStoreDB) // Get State stateDB, err := badgerdb.NewDB("state", config.DBDir()) diff --git a/consensus/wal_test.go b/consensus/wal_test.go index 8cc83bd054..65fdaf7a7b 100644 --- a/consensus/wal_test.go +++ b/consensus/wal_test.go @@ -311,7 +311,7 @@ func walGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { t.Error(err) } dag := mdutils.Mock() - blockStore := store.NewBlockStore(blockStoreDB, dag) + blockStore := store.MockBlockStore(blockStoreDB) proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app)) proxyApp.SetLogger(logger.With("module", "proxy")) diff --git a/evidence/pool_test.go b/evidence/pool_test.go index 8494956dd7..8fb837afde 100644 --- a/evidence/pool_test.go +++ b/evidence/pool_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -397,7 +396,7 @@ func initializeValidatorState(privVal types.PrivValidator, height int64) sm.Stor // initializeBlockStore creates a block storage and populates it w/ a dummy // block at +height+. func initializeBlockStore(db dbm.DB, state sm.State, valAddr []byte) *store.BlockStore { - blockStore := store.NewBlockStore(db, mdutils.Mock()) + blockStore := store.MockBlockStore(db) for i := int64(1); i <= state.LastBlockHeight; i++ { lastCommit := makeCommit(i-1, valAddr) diff --git a/go.mod b/go.mod index 659e7006fd..96353c2460 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/ipfs/go-ipfs-api v0.2.0 github.com/ipfs/go-ipfs-blockstore v0.1.4 github.com/ipfs/go-ipfs-config v0.11.0 + github.com/ipfs/go-ipfs-exchange-offline v0.0.1 github.com/ipfs/go-ipfs-routing v0.1.0 github.com/ipfs/go-ipld-format v0.2.0 github.com/ipfs/go-merkledag v0.3.2 diff --git a/ipfs/mock.go b/ipfs/mock.go index 9b0a04a678..bd7ac327a2 100644 --- a/ipfs/mock.go +++ b/ipfs/mock.go @@ -3,6 +3,9 @@ package ipfs import ( "context" + ds "github.com/ipfs/go-datastore" + ds_sync "github.com/ipfs/go-datastore/sync" + blockstore "github.com/ipfs/go-ipfs-blockstore" nilrouting "github.com/ipfs/go-ipfs-routing/none" "github.com/ipfs/go-ipfs/core" coremock "github.com/ipfs/go-ipfs/core/mock" @@ -43,3 +46,7 @@ func MockRouting() routing.Routing { croute, _ := nilrouting.ConstructNilRouting(context.TODO(), nil, nil, nil) return croute } + +func MockBlockStore() blockstore.Blockstore { + return blockstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) +} diff --git a/node/node.go b/node/node.go index 89fc09c8c9..72b5d64aa4 100644 --- a/node/node.go +++ b/node/node.go @@ -690,7 +690,7 @@ func NewNode(config *cfg.Config, return nil, err } - blockStore := store.NewBlockStore(blockStoreDB, ipfsNode.DAG) + blockStore := store.NewBlockStore(blockStoreDB, ipfsNode.Blockstore, logger) // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state, // and replays any blocks as necessary to sync tendermint with the app. diff --git a/node/node_test.go b/node/node_test.go index b2d4d560a2..92b00fd3f3 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -10,7 +10,6 @@ import ( "testing" "time" - mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -284,7 +283,7 @@ func TestCreateProposalBlock(t *testing.T) { // Make EvidencePool evidenceDB := memdb.NewDB() - blockStore := store.NewBlockStore(memdb.NewDB(), mdutils.Mock()) + blockStore := store.MockBlockStore(nil) evidencePool, err := evidence.NewPool(evidenceDB, stateStore, blockStore) require.NoError(t, err) evidencePool.SetLogger(logger) diff --git a/store/mock.go b/store/mock.go new file mode 100644 index 0000000000..7514d9fc5a --- /dev/null +++ b/store/mock.go @@ -0,0 +1,21 @@ +package store + +import ( + "github.com/lazyledger/lazyledger-core/ipfs" + dbm "github.com/lazyledger/lazyledger-core/libs/db" + "github.com/lazyledger/lazyledger-core/libs/db/memdb" + "github.com/lazyledger/lazyledger-core/libs/log" +) + +// MockBlockStore returns a mocked blockstore. a nil db will result in a new in memory db +func MockBlockStore(db dbm.DB) *BlockStore { + if db == nil { + db = memdb.NewDB() + } + + return NewBlockStore( + db, + ipfs.MockBlockStore(), + log.NewNopLogger(), + ) +} diff --git a/store/store.go b/store/store.go index 33b5a0da06..fec14d27c3 100644 --- a/store/store.go +++ b/store/store.go @@ -3,12 +3,19 @@ package store import ( "context" "fmt" + "strconv" "github.com/gogo/protobuf/proto" + "github.com/ipfs/go-blockservice" + blockstore "github.com/ipfs/go-ipfs-blockstore" + offline "github.com/ipfs/go-ipfs-exchange-offline" + format "github.com/ipfs/go-ipld-format" ipld "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-merkledag" dbm "github.com/lazyledger/lazyledger-core/libs/db" + "github.com/lazyledger/lazyledger-core/libs/log" tmsync "github.com/lazyledger/lazyledger-core/libs/sync" tmstore "github.com/lazyledger/lazyledger-core/proto/tendermint/store" tmproto "github.com/lazyledger/lazyledger-core/proto/tendermint/types" @@ -44,18 +51,22 @@ type BlockStore struct { base int64 height int64 + dag format.DAGService + logger log.Logger + ipfsDagAPI ipld.DAGService } // NewBlockStore returns a new BlockStore with the given DB, // initialized to the last height that was committed to the DB. -func NewBlockStore(db dbm.DB, dagAPI ipld.DAGService) *BlockStore { +func NewBlockStore(db dbm.DB, bstore blockstore.Blockstore, logger log.Logger) *BlockStore { bs := LoadBlockStoreState(db) return &BlockStore{ - base: bs.Base, - height: bs.Height, - db: db, - ipfsDagAPI: dagAPI, + base: bs.Base, + height: bs.Height, + db: db, + dag: merkledag.NewDAGService(blockservice.New(bstore, offline.Exchange(bstore))), + logger: logger, } } diff --git a/store/store_test.go b/store/store_test.go index d71354ee4a..c54b78937b 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -12,7 +12,6 @@ import ( "time" "github.com/gogo/protobuf/proto" - mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -75,7 +74,7 @@ func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore, cleanupFu if err != nil { panic(fmt.Errorf("error constructing state from genesis file: %w", err)) } - return state, NewBlockStore(blockDB, mdutils.Mock()), func() { os.RemoveAll(config.RootDir) } + return state, MockBlockStore(blockDB), func() { os.RemoveAll(config.RootDir) } } func TestLoadBlockStoreState(t *testing.T) { @@ -107,7 +106,7 @@ func TestNewBlockStore(t *testing.T) { bz, _ := proto.Marshal(&bss) err := db.Set(blockStoreKey, bz) require.NoError(t, err) - bs := NewBlockStore(db, mdutils.Mock()) + bs := MockBlockStore(db) require.Equal(t, int64(100), bs.Base(), "failed to properly parse blockstore") require.Equal(t, int64(10000), bs.Height(), "failed to properly parse blockstore") @@ -125,7 +124,7 @@ func TestNewBlockStore(t *testing.T) { _, _, panicErr := doFn(func() (interface{}, error) { err := db.Set(blockStoreKey, tt.data) require.NoError(t, err) - _ = NewBlockStore(db, mdutils.Mock()) + _ = MockBlockStore(db) return nil, nil }) require.NotNil(t, panicErr, "#%d panicCauser: %q expected a panic", i, tt.data) @@ -134,13 +133,13 @@ func TestNewBlockStore(t *testing.T) { err = db.Set(blockStoreKey, []byte{}) require.NoError(t, err) - bs = NewBlockStore(db, mdutils.Mock()) + bs = MockBlockStore(db) assert.Equal(t, bs.Height(), int64(0), "expecting empty bytes to be unmarshaled alright") } func freshBlockStore() (*BlockStore, dbm.DB) { db := memdb.NewDB() - return NewBlockStore(db, mdutils.Mock()), db + return MockBlockStore(db), db } var ( @@ -389,7 +388,7 @@ func TestLoadBaseMeta(t *testing.T) { stateStore := sm.NewStore(memdb.NewDB()) state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) require.NoError(t, err) - bs := NewBlockStore(memdb.NewDB(), mdutils.Mock()) + bs := MockBlockStore(nil) for h := int64(1); h <= 10; h++ { block := makeBlock(h, state, new(types.Commit)) @@ -447,7 +446,7 @@ func TestPruneBlocks(t *testing.T) { state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) require.NoError(t, err) db := memdb.NewDB() - bs := NewBlockStore(db, mdutils.Mock()) + bs := MockBlockStore(db) assert.EqualValues(t, 0, bs.Base()) assert.EqualValues(t, 0, bs.Height()) assert.EqualValues(t, 0, bs.Size()) From a220090c5fd4edb9db6964dd0fdb1a84f6428223 Mon Sep 17 00:00:00 2001 From: evan-forbes Date: Sat, 19 Jun 2021 16:44:14 -0500 Subject: [PATCH 3/7] fix typo causing rpc error --- rpc/core/tx.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rpc/core/tx.go b/rpc/core/tx.go index c479ebe40a..960dbe343d 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -38,7 +38,9 @@ func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error var proof types.TxProof if prove { block, err := env.BlockStore.LoadBlock(ctx.Context(), height) - return nil, fmt.Errorf("tx (%X) not found: %w", hash, err) + if err != nil { + return nil, err + } proof = block.Data.Txs.Proof(int(index)) // XXX: overflow on 32-bit machines } From 243e1eecd73c3b003bf2741016214075a61515da Mon Sep 17 00:00:00 2001 From: evan-forbes Date: Sat, 19 Jun 2021 19:44:38 -0500 Subject: [PATCH 4/7] load block data using the IPFS store --- store/store.go | 47 ++++++++++++++++++++++++----------------------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/store/store.go b/store/store.go index fec14d27c3..2e6c684695 100644 --- a/store/store.go +++ b/store/store.go @@ -3,6 +3,7 @@ package store import ( "context" "fmt" + "strings" "strconv" @@ -11,15 +12,17 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" offline "github.com/ipfs/go-ipfs-exchange-offline" format "github.com/ipfs/go-ipld-format" - ipld "github.com/ipfs/go-ipld-format" "github.com/ipfs/go-merkledag" + "github.com/lazyledger/lazyledger-core/ipfs" dbm "github.com/lazyledger/lazyledger-core/libs/db" "github.com/lazyledger/lazyledger-core/libs/log" tmsync "github.com/lazyledger/lazyledger-core/libs/sync" + "github.com/lazyledger/lazyledger-core/p2p/ipld" tmstore "github.com/lazyledger/lazyledger-core/proto/tendermint/store" tmproto "github.com/lazyledger/lazyledger-core/proto/tendermint/types" "github.com/lazyledger/lazyledger-core/types" + "github.com/lazyledger/rsmt2d" ) /* @@ -53,8 +56,6 @@ type BlockStore struct { dag format.DAGService logger log.Logger - - ipfsDagAPI ipld.DAGService } // NewBlockStore returns a new BlockStore with the given DB, @@ -107,35 +108,30 @@ func (bs *BlockStore) LoadBaseMeta() *types.BlockMeta { // LoadBlock returns the block with the given height. // If no block is found for that height, it returns nil. func (bs *BlockStore) LoadBlock(ctx context.Context, height int64) (*types.Block, error) { - var blockMeta = bs.LoadBlockMeta(height) + blockMeta := bs.LoadBlockMeta(height) if blockMeta == nil { return nil, nil } - pbb := new(tmproto.Block) - buf := []byte{} - for i := 0; i < int(blockMeta.BlockID.PartSetHeader.Total); i++ { - part := bs.LoadBlockPart(height, i) - // If the part is missing (e.g. since it has been deleted after we - // loaded the block meta) we consider the whole block to be missing. - if part == nil { - return nil, nil - } - buf = append(buf, part.Bytes...) - } - err := proto.Unmarshal(buf, pbb) + lastCommit := bs.LoadBlockCommit(height - 1) + + data, err := ipld.RetrieveBlockData(ctx, &blockMeta.DAHeader, bs.dag, rsmt2d.NewRSGF8Codec()) if err != nil { - // NOTE: The existence of meta should imply the existence of the - // block. So, make sure meta is only saved after blocks are saved. - panic(fmt.Sprintf("Error reading block: %v", err)) + if strings.Contains(err.Error(), format.ErrNotFound.Error()) { + return nil, fmt.Errorf("failure to retrieve block data from local ipfs store: %w", err) + } + bs.logger.Info("failure to retrieve block data", err) + return nil, err } - block, err := types.BlockFromProto(pbb) - if err != nil { - panic(fmt.Errorf("error from proto block: %w", err)) + block := types.Block{ + Header: blockMeta.Header, + Data: data, + DataAvailabilityHeader: blockMeta.DAHeader, + LastCommit: lastCommit, } - return block, nil + return &block, nil } // LoadBlockByHash returns the block with the given hash. @@ -373,6 +369,11 @@ func (bs *BlockStore) SaveBlock( bs.saveBlockPart(height, i, part) } + err := ipld.PutBlock(ctx, bs.dag, block, ipfs.MockRouting(), bs.logger) + if err != nil { + return err + } + // Save block meta blockMeta := types.NewBlockMeta(block, blockParts) pbm, err := blockMeta.ToProto() From 9bacf5480d836af5d7e1b562985b84904b09f389 Mon Sep 17 00:00:00 2001 From: evan-forbes Date: Sat, 19 Jun 2021 19:45:04 -0500 Subject: [PATCH 5/7] clean up the rest of the testing utility code --- consensus/byzantine_test.go | 11 ++++++++--- consensus/common_test.go | 17 ++++++++++------- consensus/mempool_test.go | 11 +++++------ consensus/replay_test.go | 9 ++++++--- consensus/state.go | 4 ++-- rpc/core/blocks_test.go | 7 ++++++- 6 files changed, 37 insertions(+), 22 deletions(-) diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 0ae68d9025..821467b98c 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -9,7 +9,9 @@ import ( "testing" "time" - mdutils "github.com/ipfs/go-merkledag/test" + "github.com/ipfs/go-blockservice" + offline "github.com/ipfs/go-ipfs-exchange-offline" + "github.com/ipfs/go-merkledag" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -56,7 +58,10 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { vals := types.TM2PB.ValidatorUpdates(state.Validators) app.InitChain(abci.RequestInitChain{Validators: vals}) - blockStore := store.MockBlockStore(nil) + blockDB := memdb.NewDB() + bs := ipfs.MockBlockStore() + dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) + blockStore := store.NewBlockStore(blockDB, bs, log.TestingLogger()) // one for mempool, one for consensus mtx := new(tmsync.Mutex) @@ -79,7 +84,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { // Make State blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool) cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, - mempool, mdutils.Mock(), ipfs.MockRouting(), evpool) + mempool, dag, ipfs.MockRouting(), evpool) cs.SetLogger(cs.Logger) // set private validator pv := privVals[i] diff --git a/consensus/common_test.go b/consensus/common_test.go index daaa6bbea0..2f41c2eb4e 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -14,7 +14,10 @@ import ( "time" "github.com/go-kit/kit/log/term" + "github.com/ipfs/go-blockservice" + offline "github.com/ipfs/go-ipfs-exchange-offline" format "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-merkledag" mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/require" @@ -356,7 +359,7 @@ func subscribeToVoter(cs *State, addr []byte) <-chan tmpubsub.Message { func newState(state sm.State, pv types.PrivValidator, app abci.Application, ipfsDagAPI format.DAGService) *State { config := cfg.ResetTestRoot("consensus_state_test") - return newStateWithConfig(config, state, pv, app, ipfsDagAPI) + return newStateWithConfig(config, state, pv, app) } func newStateWithConfig( @@ -364,10 +367,9 @@ func newStateWithConfig( state sm.State, pv types.PrivValidator, app abci.Application, - ipfsDagAPI format.DAGService, ) *State { blockDB := memdb.NewDB() - return newStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB, ipfsDagAPI) + return newStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB) } func newStateWithConfigAndBlockStore( @@ -376,10 +378,11 @@ func newStateWithConfigAndBlockStore( pv types.PrivValidator, app abci.Application, blockDB dbm.DB, - dag format.DAGService, ) *State { // Get BlockStore - blockStore := store.MockBlockStore(blockDB) + bs := ipfs.MockBlockStore() + dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) + blockStore := store.NewBlockStore(blockDB, bs, log.TestingLogger()) // one for mempool, one for consensus mtx := new(tmsync.Mutex) @@ -708,7 +711,7 @@ func randConsensusNet( vals := types.TM2PB.ValidatorUpdates(state.Validators) app.InitChain(abci.RequestInitChain{Validators: vals}) - css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB, mdutils.Mock()) + css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB) css[i].SetTimeoutTicker(tickerFunc()) css[i].SetLogger(logger.With("validator", i, "module", "consensus")) } @@ -771,7 +774,7 @@ func randConsensusNetWithPeers( app.InitChain(abci.RequestInitChain{Validators: vals}) // sm.SaveState(stateDB,state) //height 1's validatorsInfo already saved in LoadStateFromDBOrGenesisDoc above - css[i] = newStateWithConfig(thisConfig, state, privVal, app, mdutils.Mock()) + css[i] = newStateWithConfig(thisConfig, state, privVal, app) css[i].SetTimeoutTicker(tickerFunc()) css[i].SetLogger(logger.With("validator", i, "module", "consensus")) } diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 01a8dbdfd9..0d7690b066 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -30,7 +29,7 @@ func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) { config.Consensus.CreateEmptyBlocks = false state, privVals := randGenesisState(1, false, 10) - cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock()) + cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication()) assertMempool(cs.txNotifier).EnableTxsAvailable() height, round := cs.Height, cs.Round newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock) @@ -50,7 +49,7 @@ func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) { config.Consensus.CreateEmptyBlocksInterval = ensureTimeout state, privVals := randGenesisState(1, false, 10) - cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock()) + cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication()) assertMempool(cs.txNotifier).EnableTxsAvailable() @@ -68,7 +67,7 @@ func TestMempoolProgressInHigherRound(t *testing.T) { config.Consensus.CreateEmptyBlocks = false state, privVals := randGenesisState(1, false, 10) - cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock()) + cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication()) assertMempool(cs.txNotifier).EnableTxsAvailable() height, round := cs.Height, cs.Round @@ -118,7 +117,7 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) { blockDB := memdb.NewDB() stateStore := sm.NewStore(blockDB) - cs := newStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockDB, mdutils.Mock()) + cs := newStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockDB) err := stateStore.Save(state) require.NoError(t, err) newBlockHeaderCh := subscribe(cs.eventBus, types.EventQueryNewBlockHeader) @@ -144,7 +143,7 @@ func TestMempoolRmBadTx(t *testing.T) { blockDB := memdb.NewDB() stateStore := sm.NewStore(blockDB) - cs := newStateWithConfigAndBlockStore(config, state, privVals[0], app, blockDB, mdutils.Mock()) + cs := newStateWithConfigAndBlockStore(config, state, privVals[0], app, blockDB) err := stateStore.Save(state) require.NoError(t, err) diff --git a/consensus/replay_test.go b/consensus/replay_test.go index abcbbbaf10..8b367ae961 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -79,7 +79,6 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Confi privValidator, kvstore.NewApplication(), blockDB, - mdutils.Mock(), ) cs.SetLogger(logger) @@ -174,7 +173,6 @@ LOOP: privValidator, kvstore.NewApplication(), blockDB, - mdutils.Mock(), ) cs.SetLogger(logger) @@ -1215,7 +1213,12 @@ func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { } } func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil } -func (bs *mockBlockStore) SaveBlock(ctx context.Context, block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) error { +func (bs *mockBlockStore) SaveBlock( + ctx context.Context, + block *types.Block, + blockParts *types.PartSet, + seenCommit *types.Commit, +) error { return nil } func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit { diff --git a/consensus/state.go b/consensus/state.go index 0fd9cb8142..80a39330b1 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1139,7 +1139,7 @@ func (cs *State) defaultDecideProposal(height int64, round int32) { // the provide timeout could still be larger than just the time between // two consecutive proposals. // - // cs.proposalCancel() + cs.proposalCancel() } cs.proposalCtx, cs.proposalCancel = context.WithCancel(context.TODO()) go func(ctx context.Context) { @@ -1585,7 +1585,7 @@ func (cs *State) finalizeCommit(height int64) { // but may differ from the LastCommit included in the next block precommits := cs.Votes.Precommits(cs.CommitRound) seenCommit := precommits.MakeCommit() - err := cs.blockStore.SaveBlock(cs.proposalCtx, block, blockParts, seenCommit) + err := cs.blockStore.SaveBlock(context.TODO(), block, blockParts, seenCommit) if err != nil { panic(err) } diff --git a/rpc/core/blocks_test.go b/rpc/core/blocks_test.go index 86f12f2aa2..8ae560ff16 100644 --- a/rpc/core/blocks_test.go +++ b/rpc/core/blocks_test.go @@ -134,6 +134,11 @@ func (mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { retur func (mockBlockStore) LoadBlockCommit(height int64) *types.Commit { return nil } func (mockBlockStore) LoadSeenCommit(height int64) *types.Commit { return nil } func (mockBlockStore) PruneBlocks(height int64) (uint64, error) { return 0, nil } -func (mockBlockStore) SaveBlock(ctx context.Context, block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) error { +func (mockBlockStore) SaveBlock( + ctx context.Context, + block *types.Block, + blockParts *types.PartSet, + seenCommit *types.Commit, +) error { return nil } From befb3c68febbdab9fa9090516c587c95aa51b895 Mon Sep 17 00:00:00 2001 From: evan-forbes Date: Sat, 19 Jun 2021 19:46:10 -0500 Subject: [PATCH 6/7] linter --- consensus/state.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consensus/state.go b/consensus/state.go index 80a39330b1..90112e15a7 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1122,7 +1122,7 @@ func (cs *State) defaultDecideProposal(height int64, round int32) { } // cancel ctx for previous proposal block to ensure block putting/providing does not queues up - if cs.proposalCancel != nil { //nolint:staticcheck + if cs.proposalCancel != nil { // FIXME(ismail): below commented out cancel tries to prevent block putting // and providing no to queue up endlessly. // But in a real network proposers should have enough time in between. From 21a18b243857f1d80ce8f47067a4b863cf2c2ca4 Mon Sep 17 00:00:00 2001 From: evan-forbes Date: Wed, 23 Jun 2021 11:46:01 -0500 Subject: [PATCH 7/7] handle errors and fix todos --- blockchain/v0/reactor.go | 3 +-- consensus/replay.go | 7 ++----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/blockchain/v0/reactor.go b/blockchain/v0/reactor.go index 3beb9e18d9..1748026352 100644 --- a/blockchain/v0/reactor.go +++ b/blockchain/v0/reactor.go @@ -181,7 +181,6 @@ func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest, block, err := bcR.store.LoadBlock(context.TODO(), msg.Height) if err != nil { - // todo(evan): handle this error panic(err) } if block != nil { @@ -425,7 +424,7 @@ FOR_LOOP: // TODO: batch saves so we dont persist to disk every block err := bcR.store.SaveBlock(context.TODO(), first, firstParts, second.LastCommit) if err != nil { - // todo(evan): don't panic + // an error is only returned if something with the local IPFS blockstore is seriously wrong panic(err) } diff --git a/consensus/replay.go b/consensus/replay.go index 4bb48ec8ac..7eca15b7e5 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -465,8 +465,7 @@ func (h *Handshaker) replayBlocks( h.logger.Info("Applying block", "height", i) block, err := h.store.LoadBlock(context.TODO(), i) if err != nil { - // todo(evan): handle error - panic(err) + return nil, err } // Extra check to ensure the app was not changed in a way it shouldn't have. if len(appHash) > 0 { @@ -498,9 +497,7 @@ func (h *Handshaker) replayBlocks( func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) { block, err := h.store.LoadBlock(context.TODO(), height) if err != nil { - // todo(evan): handle/bubble this error - panic(err) - // return sm.State{}, err + return sm.State{}, err } meta := h.store.LoadBlockMeta(height)