Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save and load block data using IPFS #374

Merged
merged 7 commits into from
Jun 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions blockchain/v0/reactor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v0

import (
"context"
"fmt"
"reflect"
"time"
Expand Down Expand Up @@ -178,7 +179,10 @@ func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest,
src p2p.Peer) (queued bool) {

block := bcR.store.LoadBlock(msg.Height)
block, err := bcR.store.LoadBlock(context.TODO(), msg.Height)
if err != nil {
panic(err)
}
if block != nil {
bl, err := block.ToProto()
if err != nil {
Expand Down Expand Up @@ -418,11 +422,14 @@ FOR_LOOP:
bcR.pool.PopRequest()

// TODO: batch saves so we dont persist to disk every block
bcR.store.SaveBlock(first, firstParts, second.LastCommit)
err := bcR.store.SaveBlock(context.TODO(), first, firstParts, second.LastCommit)
if err != nil {
// an error is only returned if something with the local IPFS blockstore is seriously wrong
panic(err)
}

// TODO: same thing for app - but we would need a way to get the hash
// without persisting the state.
var err error
state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first)
if err != nil {
// TODO This is bad, are we zombie?
Expand Down
20 changes: 14 additions & 6 deletions blockchain/v0/reactor_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package v0

import (
"context"
"crypto/sha256"
"fmt"
"os"
"sort"
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -69,10 +69,9 @@ func newBlockchainReactor(
panic(fmt.Errorf("error start app: %w", err))
}

blockDB := memdb.NewDB()
stateDB := memdb.NewDB()
stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(blockDB, mdutils.Mock())
blockStore := store.MockBlockStore(nil)

state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc)
if err != nil {
Expand Down Expand Up @@ -100,7 +99,10 @@ func newBlockchainReactor(
lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil)
if blockHeight > 1 {
lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1)
lastBlock := blockStore.LoadBlock(blockHeight - 1)
lastBlock, err := blockStore.LoadBlock(context.TODO(), blockHeight-1)
if err != nil {
panic(err)
}

vote, err := types.MakeVote(
lastBlock.Header.Height,
Expand All @@ -127,7 +129,10 @@ func newBlockchainReactor(
panic(fmt.Errorf("error apply block: %w", err))
}

blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
err := blockStore.SaveBlock(context.TODO(), thisBlock, thisParts, lastCommit)
if err != nil {
panic(err)
}
}

bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
Expand Down Expand Up @@ -184,7 +189,10 @@ func TestNoBlockResponse(t *testing.T) {
assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height())

for _, tt := range tests {
block := reactorPairs[1].reactor.store.LoadBlock(tt.height)
block, err := reactorPairs[1].reactor.store.LoadBlock(context.TODO(), tt.height)
if err != nil {
panic(err)
}
if tt.existent {
assert.True(t, block != nil)
} else {
Expand Down
9 changes: 6 additions & 3 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/ipfs/go-blockservice"
offline "github.com/ipfs/go-ipfs-exchange-offline"
"github.com/ipfs/go-merkledag"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -57,8 +59,9 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
app.InitChain(abci.RequestInitChain{Validators: vals})

blockDB := memdb.NewDB()
dag := mdutils.Mock()
blockStore := store.NewBlockStore(blockDB, dag)
bs := ipfs.MockBlockStore()
dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
blockStore := store.NewBlockStore(blockDB, bs, log.TestingLogger())

// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
Expand Down
17 changes: 10 additions & 7 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (
"time"

"github.com/go-kit/kit/log/term"
"github.com/ipfs/go-blockservice"
offline "github.com/ipfs/go-ipfs-exchange-offline"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -356,18 +359,17 @@ func subscribeToVoter(cs *State, addr []byte) <-chan tmpubsub.Message {

func newState(state sm.State, pv types.PrivValidator, app abci.Application, ipfsDagAPI format.DAGService) *State {
config := cfg.ResetTestRoot("consensus_state_test")
return newStateWithConfig(config, state, pv, app, ipfsDagAPI)
return newStateWithConfig(config, state, pv, app)
}

func newStateWithConfig(
thisConfig *cfg.Config,
state sm.State,
pv types.PrivValidator,
app abci.Application,
ipfsDagAPI format.DAGService,
) *State {
blockDB := memdb.NewDB()
return newStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB, ipfsDagAPI)
return newStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB)
}

func newStateWithConfigAndBlockStore(
Expand All @@ -376,10 +378,11 @@ func newStateWithConfigAndBlockStore(
pv types.PrivValidator,
app abci.Application,
blockDB dbm.DB,
dag format.DAGService,
) *State {
// Get BlockStore
blockStore := store.NewBlockStore(blockDB, dag)
bs := ipfs.MockBlockStore()
dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
blockStore := store.NewBlockStore(blockDB, bs, log.TestingLogger())

// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
Expand Down Expand Up @@ -708,7 +711,7 @@ func randConsensusNet(
vals := types.TM2PB.ValidatorUpdates(state.Validators)
app.InitChain(abci.RequestInitChain{Validators: vals})

css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB, mdutils.Mock())
css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB)
css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
}
Expand Down Expand Up @@ -771,7 +774,7 @@ func randConsensusNetWithPeers(
app.InitChain(abci.RequestInitChain{Validators: vals})
// sm.SaveState(stateDB,state) //height 1's validatorsInfo already saved in LoadStateFromDBOrGenesisDoc above

css[i] = newStateWithConfig(thisConfig, state, privVal, app, mdutils.Mock())
css[i] = newStateWithConfig(thisConfig, state, privVal, app)
css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
}
Expand Down
11 changes: 5 additions & 6 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -30,7 +29,7 @@ func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) {

config.Consensus.CreateEmptyBlocks = false
state, privVals := randGenesisState(1, false, 10)
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock())
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication())
assertMempool(cs.txNotifier).EnableTxsAvailable()
height, round := cs.Height, cs.Round
newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock)
Expand All @@ -50,7 +49,7 @@ func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) {

config.Consensus.CreateEmptyBlocksInterval = ensureTimeout
state, privVals := randGenesisState(1, false, 10)
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock())
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication())

assertMempool(cs.txNotifier).EnableTxsAvailable()

Expand All @@ -68,7 +67,7 @@ func TestMempoolProgressInHigherRound(t *testing.T) {

config.Consensus.CreateEmptyBlocks = false
state, privVals := randGenesisState(1, false, 10)
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock())
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication())

assertMempool(cs.txNotifier).EnableTxsAvailable()
height, round := cs.Height, cs.Round
Expand Down Expand Up @@ -118,7 +117,7 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) {
blockDB := memdb.NewDB()
stateStore := sm.NewStore(blockDB)

cs := newStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockDB, mdutils.Mock())
cs := newStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockDB)
err := stateStore.Save(state)
require.NoError(t, err)
newBlockHeaderCh := subscribe(cs.eventBus, types.EventQueryNewBlockHeader)
Expand All @@ -144,7 +143,7 @@ func TestMempoolRmBadTx(t *testing.T) {
blockDB := memdb.NewDB()

stateStore := sm.NewStore(blockDB)
cs := newStateWithConfigAndBlockStore(config, state, privVals[0], app, blockDB, mdutils.Mock())
cs := newStateWithConfigAndBlockStore(config, state, privVals[0], app, blockDB)
err := stateStore.Save(state)
require.NoError(t, err)

Expand Down
4 changes: 1 addition & 3 deletions consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,8 @@ func TestReactorWithEvidence(t *testing.T) {
// duplicate code from:
// css[i] = newStateWithConfig(thisConfig, state, privVals[i], app)

blockDB := memdb.NewDB()
dag := mdutils.Mock()
blockStore := store.NewBlockStore(blockDB, dag)

blockStore := store.MockBlockStore(nil)
// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
Expand Down
11 changes: 8 additions & 3 deletions consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,10 @@ func (h *Handshaker) replayBlocks(
}
for i := firstBlock; i <= finalBlock; i++ {
h.logger.Info("Applying block", "height", i)
block := h.store.LoadBlock(i)
block, err := h.store.LoadBlock(context.TODO(), i)
if err != nil {
return nil, err
}
// Extra check to ensure the app was not changed in a way it shouldn't have.
if len(appHash) > 0 {
assertAppHashEqualsOneFromBlock(appHash, block)
Expand Down Expand Up @@ -492,15 +495,17 @@ func (h *Handshaker) replayBlocks(

// ApplyBlock on the proxyApp with the last block.
func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) {
block := h.store.LoadBlock(height)
block, err := h.store.LoadBlock(context.TODO(), height)
if err != nil {
return sm.State{}, err
}
meta := h.store.LoadBlockMeta(height)

// Use stubs for both mempool and evidence pool since no transactions nor
// evidence are needed here - block already exists.
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{})
blockExec.SetEventBus(h.eventBus)

var err error
state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block)
if err != nil {
return sm.State{}, err
Expand Down
2 changes: 1 addition & 1 deletion consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
tmos.Exit(err.Error())
}
dag := mdutils.Mock()
blockStore := store.NewBlockStore(blockStoreDB, dag)
blockStore := store.MockBlockStore(blockStoreDB)

// Get State
stateDB, err := badgerdb.NewDB("state", config.DBDir())
Expand Down
30 changes: 19 additions & 11 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Confi
privValidator,
kvstore.NewApplication(),
blockDB,
mdutils.Mock(),
)
cs.SetLogger(logger)

Expand Down Expand Up @@ -174,7 +173,6 @@ LOOP:
privValidator,
kvstore.NewApplication(),
blockDB,
mdutils.Mock(),
)
cs.SetLogger(logger)

Expand Down Expand Up @@ -548,7 +546,9 @@ func TestSimulateValidatorsChange(t *testing.T) {
sim.Chain = make([]*types.Block, 0)
sim.Commits = make([]*types.Commit, 0)
for i := 1; i <= numBlocks; i++ {
sim.Chain = append(sim.Chain, css[0].blockStore.LoadBlock(int64(i)))
blck, err := css[0].blockStore.LoadBlock(context.TODO(), int64(i))
require.NoError(t, err)
sim.Chain = append(sim.Chain, blck)
sim.Commits = append(sim.Commits, css[0].blockStore.LoadBlockCommit(int64(i)))
}
}
Expand Down Expand Up @@ -1195,13 +1195,15 @@ func newMockBlockStore(config *cfg.Config, params tmproto.ConsensusParams) *mock
return &mockBlockStore{config, params, nil, nil, 0, mdutils.Mock()}
}

func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) }
func (bs *mockBlockStore) Base() int64 { return bs.base }
func (bs *mockBlockStore) Size() int64 { return bs.Height() - bs.Base() + 1 }
func (bs *mockBlockStore) LoadBaseMeta() *types.BlockMeta { return bs.LoadBlockMeta(bs.base) }
func (bs *mockBlockStore) LoadBlock(height int64) *types.Block { return bs.chain[height-1] }
func (bs *mockBlockStore) LoadBlockByHash(hash []byte) *types.Block {
return bs.chain[int64(len(bs.chain))-1]
func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) }
func (bs *mockBlockStore) Base() int64 { return bs.base }
func (bs *mockBlockStore) Size() int64 { return bs.Height() - bs.Base() + 1 }
func (bs *mockBlockStore) LoadBaseMeta() *types.BlockMeta { return bs.LoadBlockMeta(bs.base) }
func (bs *mockBlockStore) LoadBlock(ctx context.Context, height int64) (*types.Block, error) {
return bs.chain[height-1], nil
}
func (bs *mockBlockStore) LoadBlockByHash(ctx context.Context, hash []byte) (*types.Block, error) {
return bs.chain[int64(len(bs.chain))-1], nil
}
func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
block := bs.chain[height-1]
Expand All @@ -1211,7 +1213,13 @@ func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
}
}
func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil }
func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
func (bs *mockBlockStore) SaveBlock(
ctx context.Context,
block *types.Block,
blockParts *types.PartSet,
seenCommit *types.Commit,
) error {
return nil
}
func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit {
return bs.commits[height-1]
Expand Down
9 changes: 6 additions & 3 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,7 @@ func (cs *State) defaultDecideProposal(height int64, round int32) {
}

// cancel ctx for previous proposal block to ensure block putting/providing does not queues up
if cs.proposalCancel != nil { //nolint:staticcheck
if cs.proposalCancel != nil {
// FIXME(ismail): below commented out cancel tries to prevent block putting
// and providing no to queue up endlessly.
// But in a real network proposers should have enough time in between.
Expand All @@ -1139,7 +1139,7 @@ func (cs *State) defaultDecideProposal(height int64, round int32) {
// the provide timeout could still be larger than just the time between
// two consecutive proposals.
//
// cs.proposalCancel()
cs.proposalCancel()
}
cs.proposalCtx, cs.proposalCancel = context.WithCancel(context.TODO())
go func(ctx context.Context) {
Expand Down Expand Up @@ -1585,7 +1585,10 @@ func (cs *State) finalizeCommit(height int64) {
// but may differ from the LastCommit included in the next block
precommits := cs.Votes.Precommits(cs.CommitRound)
seenCommit := precommits.MakeCommit()
cs.blockStore.SaveBlock(block, blockParts, seenCommit)
err := cs.blockStore.SaveBlock(context.TODO(), block, blockParts, seenCommit)
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
panic(err)
}
} else {
// Happens during replay if we already saved the block but didn't commit
cs.Logger.Info("Calling finalizeCommit on already stored block", "height", block.Height)
Expand Down
2 changes: 1 addition & 1 deletion consensus/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func walGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
t.Error(err)
}
dag := mdutils.Mock()
blockStore := store.NewBlockStore(blockStoreDB, dag)
blockStore := store.MockBlockStore(blockStoreDB)

proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app))
proxyApp.SetLogger(logger.With("module", "proxy"))
Expand Down
Loading