From 40acb17283ebfe1c49b804a786464b93c2213a97 Mon Sep 17 00:00:00 2001 From: Evan Forbes <42654277+evan-forbes@users.noreply.github.com> Date: Thu, 27 May 2021 17:08:02 -0500 Subject: [PATCH] Add the ipfs dag api object in Blockstore (#356) * add the ipfs object to the blockstore and remove if from the state object * fix linter's complaints * increase TestReactorSelectiveBroadcast sleep times * increase ensureTimeout to 4 seconds * only use the dag api object instead of the entire ipfs api object * increase TestNodeSetPrivValIPC timeout to 400ms * increase time waited for TestReactorsGossipNoCommittedEvidence again * increase TestNodeSetPrivValIPC timeout again * timeout increase * cleanup remainging mocks * try insane timeout for TestNodeSetPrivValIPC * increase the failing precommit timeout * more cleanup * remove the unused ipfsAPI from the node * try a test node that doesn't use the full mocked ipfs node * implement and use a dag only api provider * revert crazy timeout * simplify dag only mock * remove accidental file * try to make TestReactorsGossipNoCommittedEvidence less flaky * use ipld alias instead of format * remove access to the IPFS dag from the blockstore and add it back to consensus state * change api provider to only use the dag instead of the core api object * change alias to ipld in node package * increase timeouts for TestWALTruncate and timeoutWaitGroup for CI --- blockchain/v0/reactor_test.go | 3 ++- cmd/tendermint/commands/light.go | 6 +++--- config/config.go | 2 +- consensus/byzantine_test.go | 5 +++-- consensus/common_test.go | 21 ++++++++++-------- consensus/mempool_test.go | 11 +++++----- consensus/reactor_test.go | 7 +++--- consensus/replay_file.go | 6 +++--- consensus/replay_test.go | 20 +++++++++-------- consensus/state_test.go | 7 +++--- consensus/wal_test.go | 8 +++---- evidence/pool_test.go | 3 ++- evidence/reactor_test.go | 10 ++++----- go.mod | 2 +- ipfs/embedded.go | 4 ++-- ipfs/mock.go | 30 +++++++++++--------------- ipfs/provider.go | 2 +- light/client.go | 10 ++++----- node/node.go | 37 +++++++++++++++++--------------- node/node_test.go | 5 +++-- store/store.go | 12 +++++++---- store/store_test.go | 15 +++++++------ 22 files changed, 121 insertions(+), 105 deletions(-) diff --git a/blockchain/v0/reactor_test.go b/blockchain/v0/reactor_test.go index ffa842cc33..218caf0b43 100644 --- a/blockchain/v0/reactor_test.go +++ b/blockchain/v0/reactor_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -71,7 +72,7 @@ func newBlockchainReactor( blockDB := memdb.NewDB() stateDB := memdb.NewDB() stateStore := sm.NewStore(stateDB) - blockStore := store.NewBlockStore(blockDB) + blockStore := store.NewBlockStore(blockDB, mdutils.Mock()) state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc) if err != nil { diff --git a/cmd/tendermint/commands/light.go b/cmd/tendermint/commands/light.go index 57332b7f0d..a1501e2b10 100644 --- a/cmd/tendermint/commands/light.go +++ b/cmd/tendermint/commands/light.go @@ -186,12 +186,12 @@ func runProxy(cmd *cobra.Command, args []string) error { cfg.RootDir = dir // TODO(ismail): share badger instance apiProvider := ipfs.Embedded(true, cfg, logger) - var coreAPI coreiface.CoreAPI - coreAPI, ipfsCloser, err = apiProvider() + var dag coreiface.APIDagService + dag, ipfsCloser, err = apiProvider() if err != nil { return fmt.Errorf("could not start ipfs API: %w", err) } - options = append(options, light.DataAvailabilitySampling(numSamples, coreAPI)) + options = append(options, light.DataAvailabilitySampling(numSamples, dag)) case sequential: options = append(options, light.SequentialVerification()) default: diff --git a/config/config.go b/config/config.go index cd1a6c2776..90bbc90c39 100644 --- a/config/config.go +++ b/config/config.go @@ -847,7 +847,7 @@ func TestConsensusConfig() *ConsensusConfig { cfg.TimeoutProposeDelta = 20 * time.Millisecond cfg.TimeoutPrevote = 80 * time.Millisecond cfg.TimeoutPrevoteDelta = 20 * time.Millisecond - cfg.TimeoutPrecommit = 80 * time.Millisecond + cfg.TimeoutPrecommit = 160 * time.Millisecond cfg.TimeoutPrecommitDelta = 20 * time.Millisecond // NOTE: when modifying, make sure to update time_iota_ms (testGenesisFmt) in toml.go cfg.TimeoutCommit = 80 * time.Millisecond diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index b3e3e25356..6876dbebba 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -56,7 +56,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { app.InitChain(abci.RequestInitChain{Validators: vals}) blockDB := memdb.NewDB() - blockStore := store.NewBlockStore(blockDB) + dag := mdutils.Mock() + blockStore := store.NewBlockStore(blockDB, dag) // one for mempool, one for consensus mtx := new(tmsync.Mutex) @@ -78,7 +79,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { // Make State blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool) - cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, mdutils.Mock(), evpool) + cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, dag, evpool) cs.SetLogger(cs.Logger) // set private validator pv := privVals[i] diff --git a/consensus/common_test.go b/consensus/common_test.go index d79654cc20..6120de4e85 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -14,6 +14,7 @@ import ( "time" "github.com/go-kit/kit/log/term" + format "github.com/ipfs/go-ipld-format" mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/require" @@ -52,7 +53,7 @@ type cleanupFunc func() var ( config *cfg.Config // NOTE: must be reset for each _test.go file consensusReplayConfig *cfg.Config - ensureTimeout = 2 * time.Second + ensureTimeout = 4 * time.Second ) func ensureDir(dir string, mode os.FileMode) { @@ -352,9 +353,9 @@ func subscribeToVoter(cs *State, addr []byte) <-chan tmpubsub.Message { //------------------------------------------------------------------------------- // consensus states -func newState(state sm.State, pv types.PrivValidator, app abci.Application) *State { +func newState(state sm.State, pv types.PrivValidator, app abci.Application, ipfsDagAPI format.DAGService) *State { config := cfg.ResetTestRoot("consensus_state_test") - return newStateWithConfig(config, state, pv, app) + return newStateWithConfig(config, state, pv, app, ipfsDagAPI) } func newStateWithConfig( @@ -362,9 +363,10 @@ func newStateWithConfig( state sm.State, pv types.PrivValidator, app abci.Application, + ipfsDagAPI format.DAGService, ) *State { blockDB := memdb.NewDB() - return newStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB) + return newStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB, ipfsDagAPI) } func newStateWithConfigAndBlockStore( @@ -373,9 +375,10 @@ func newStateWithConfigAndBlockStore( pv types.PrivValidator, app abci.Application, blockDB dbm.DB, + dag format.DAGService, ) *State { // Get BlockStore - blockStore := store.NewBlockStore(blockDB) + blockStore := store.NewBlockStore(blockDB, dag) // one for mempool, one for consensus mtx := new(tmsync.Mutex) @@ -399,7 +402,7 @@ func newStateWithConfigAndBlockStore( } blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool) - cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, mdutils.Mock(), evpool) + cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, dag, evpool) cs.SetLogger(log.TestingLogger().With("module", "consensus")) cs.SetPrivValidator(pv) @@ -431,7 +434,7 @@ func randState(nValidators int) (*State, []*validatorStub) { vss := make([]*validatorStub, nValidators) - cs := newState(state, privVals[0], counter.NewApplication(true)) + cs := newState(state, privVals[0], counter.NewApplication(true), mdutils.Mock()) for i := 0; i < nValidators; i++ { vss[i] = newValidatorStub(privVals[i], int32(i)) @@ -704,7 +707,7 @@ func randConsensusNet( vals := types.TM2PB.ValidatorUpdates(state.Validators) app.InitChain(abci.RequestInitChain{Validators: vals}) - css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB) + css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB, mdutils.Mock()) css[i].SetTimeoutTicker(tickerFunc()) css[i].SetLogger(logger.With("validator", i, "module", "consensus")) } @@ -767,7 +770,7 @@ func randConsensusNetWithPeers( app.InitChain(abci.RequestInitChain{Validators: vals}) // sm.SaveState(stateDB,state) //height 1's validatorsInfo already saved in LoadStateFromDBOrGenesisDoc above - css[i] = newStateWithConfig(thisConfig, state, privVal, app) + css[i] = newStateWithConfig(thisConfig, state, privVal, app, mdutils.Mock()) css[i].SetTimeoutTicker(tickerFunc()) css[i].SetLogger(logger.With("validator", i, "module", "consensus")) } diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 0d7690b066..01a8dbdfd9 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -29,7 +30,7 @@ func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) { config.Consensus.CreateEmptyBlocks = false state, privVals := randGenesisState(1, false, 10) - cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication()) + cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock()) assertMempool(cs.txNotifier).EnableTxsAvailable() height, round := cs.Height, cs.Round newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock) @@ -49,7 +50,7 @@ func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) { config.Consensus.CreateEmptyBlocksInterval = ensureTimeout state, privVals := randGenesisState(1, false, 10) - cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication()) + cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock()) assertMempool(cs.txNotifier).EnableTxsAvailable() @@ -67,7 +68,7 @@ func TestMempoolProgressInHigherRound(t *testing.T) { config.Consensus.CreateEmptyBlocks = false state, privVals := randGenesisState(1, false, 10) - cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication()) + cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock()) assertMempool(cs.txNotifier).EnableTxsAvailable() height, round := cs.Height, cs.Round @@ -117,7 +118,7 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) { blockDB := memdb.NewDB() stateStore := sm.NewStore(blockDB) - cs := newStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockDB) + cs := newStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockDB, mdutils.Mock()) err := stateStore.Save(state) require.NoError(t, err) newBlockHeaderCh := subscribe(cs.eventBus, types.EventQueryNewBlockHeader) @@ -143,7 +144,7 @@ func TestMempoolRmBadTx(t *testing.T) { blockDB := memdb.NewDB() stateStore := sm.NewStore(blockDB) - cs := newStateWithConfigAndBlockStore(config, state, privVals[0], app, blockDB) + cs := newStateWithConfigAndBlockStore(config, state, privVals[0], app, blockDB, mdutils.Mock()) err := stateStore.Save(state) require.NoError(t, err) diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 1997b602dd..25b090835a 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -155,7 +155,8 @@ func TestReactorWithEvidence(t *testing.T) { // css[i] = newStateWithConfig(thisConfig, state, privVals[i], app) blockDB := memdb.NewDB() - blockStore := store.NewBlockStore(blockDB) + dag := mdutils.Mock() + blockStore := store.NewBlockStore(blockDB, dag) // one for mempool, one for consensus mtx := new(tmsync.Mutex) @@ -183,7 +184,7 @@ func TestReactorWithEvidence(t *testing.T) { // Make State blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool) - cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, mdutils.Mock(), evpool2) + cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, dag, evpool2) cs.SetLogger(log.TestingLogger().With("module", "consensus")) cs.SetPrivValidator(pv) @@ -670,7 +671,7 @@ func timeoutWaitGroup(t *testing.T, n int, f func(int), css []*State) { // we're running many nodes in-process, possibly in in a virtual machine, // and spewing debug messages - making a block could take a while, - timeout := time.Minute * 4 + timeout := time.Minute * 8 select { case <-done: diff --git a/consensus/replay_file.go b/consensus/replay_file.go index a4efc88539..6e2bb44ee1 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -11,7 +11,6 @@ import ( "strings" mdutils "github.com/ipfs/go-merkledag/test" - cfg "github.com/lazyledger/lazyledger-core/config" "github.com/lazyledger/lazyledger-core/libs/db/badgerdb" "github.com/lazyledger/lazyledger-core/libs/log" @@ -290,7 +289,8 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo if err != nil { tmos.Exit(err.Error()) } - blockStore := store.NewBlockStore(blockStoreDB) + dag := mdutils.Mock() + blockStore := store.NewBlockStore(blockStoreDB, dag) // Get State stateDB, err := badgerdb.NewDB("state", config.DBDir()) @@ -331,7 +331,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) consensusState := NewState(csConfig, state.Copy(), blockExec, - blockStore, mempool, mdutils.Mock(), evpool) + blockStore, mempool, dag, evpool) consensusState.SetEventBus(eventBus) return consensusState diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 4bd2e97e19..781aaa38d9 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -14,6 +14,7 @@ import ( "time" "github.com/gogo/protobuf/proto" + format "github.com/ipfs/go-ipld-format" mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -78,6 +79,7 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Confi privValidator, kvstore.NewApplication(), blockDB, + mdutils.Mock(), ) cs.SetLogger(logger) @@ -130,9 +132,7 @@ func TestWALCrash(t *testing.T) { heightToStop int64 }{ {"empty block", - func(stateDB dbm.DB, cs *State, ctx context.Context) { - cs.dag = mdutils.Mock() - }, + func(stateDB dbm.DB, cs *State, ctx context.Context) {}, 1}, {"many non-empty blocks", func(stateDB dbm.DB, cs *State, ctx context.Context) { @@ -174,6 +174,7 @@ LOOP: privValidator, kvstore.NewApplication(), blockDB, + mdutils.Mock(), ) cs.SetLogger(logger) @@ -1181,16 +1182,17 @@ func stateAndStore( // mock block store type mockBlockStore struct { - config *cfg.Config - params tmproto.ConsensusParams - chain []*types.Block - commits []*types.Commit - base int64 + config *cfg.Config + params tmproto.ConsensusParams + chain []*types.Block + commits []*types.Commit + base int64 + ipfsDagAPI format.DAGService } // TODO: NewBlockStore(db.NewMemDB) ... func newMockBlockStore(config *cfg.Config, params tmproto.ConsensusParams) *mockBlockStore { - return &mockBlockStore{config, params, nil, nil, 0} + return &mockBlockStore{config, params, nil, nil, 0, mdutils.Mock()} } func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) } diff --git a/consensus/state_test.go b/consensus/state_test.go index a8454f4b45..e713917dd5 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -639,7 +640,7 @@ func TestStateLockPOLRelock(t *testing.T) { signAddVotes(cs1, tmproto.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4) // before we timeout to the new round set the new proposal - cs2 := newState(cs1.state, vs2, counter.NewApplication(true)) + cs2 := newState(cs1.state, vs2, counter.NewApplication(true), mdutils.Mock()) prop, propBlock := decideProposal(cs2, vs2, vs2.Height, vs2.Round+1) if prop == nil || propBlock == nil { t.Fatal("Failed to create proposal block with vs2") @@ -825,7 +826,7 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) { signAddVotes(cs1, tmproto.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4) // before we timeout to the new round set the new proposal - cs2 := newState(cs1.state, vs2, counter.NewApplication(true)) + cs2 := newState(cs1.state, vs2, counter.NewApplication(true), mdutils.Mock()) prop, propBlock := decideProposal(cs2, vs2, vs2.Height, vs2.Round+1) if prop == nil || propBlock == nil { t.Fatal("Failed to create proposal block with vs2") @@ -869,7 +870,7 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) { signAddVotes(cs1, tmproto.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4) // before we timeout to the new round set the new proposal - cs3 := newState(cs1.state, vs3, counter.NewApplication(true)) + cs3 := newState(cs1.state, vs3, counter.NewApplication(true), mdutils.Mock()) prop, propBlock = decideProposal(cs3, vs3, vs3.Height, vs3.Round+1) if prop == nil || propBlock == nil { t.Fatal("Failed to create proposal block with vs2") diff --git a/consensus/wal_test.go b/consensus/wal_test.go index 718af8a1e9..3dc0170219 100644 --- a/consensus/wal_test.go +++ b/consensus/wal_test.go @@ -67,7 +67,7 @@ func TestWALTruncate(t *testing.T) { err = walGenerateNBlocks(t, wal.Group(), 60) require.NoError(t, err) - time.Sleep(1 * time.Millisecond) // wait groupCheckDuration, make sure RotateFile run + time.Sleep(5 * time.Millisecond) // wait groupCheckDuration, make sure RotateFile run if err := wal.FlushAndSync(); err != nil { t.Error(err) @@ -311,8 +311,8 @@ func walGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { if err = stateStore.Save(state); err != nil { t.Error(err) } - - blockStore := store.NewBlockStore(blockStoreDB) + dag := mdutils.Mock() + blockStore := store.NewBlockStore(blockStoreDB, dag) proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app)) proxyApp.SetLogger(logger.With("module", "proxy")) @@ -339,7 +339,7 @@ func walGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { evpool := sm.EmptyEvidencePool{} blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) require.NoError(t, err) - consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, mdutils.Mock(), evpool) + consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, dag, evpool) consensusState.SetLogger(logger) consensusState.SetEventBus(eventBus) if privValidator != nil && privValidator != (*privval.FilePV)(nil) { diff --git a/evidence/pool_test.go b/evidence/pool_test.go index 2f8af65eb6..0c5a64a51f 100644 --- a/evidence/pool_test.go +++ b/evidence/pool_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -395,7 +396,7 @@ func initializeValidatorState(privVal types.PrivValidator, height int64) sm.Stor // initializeBlockStore creates a block storage and populates it w/ a dummy // block at +height+. func initializeBlockStore(db dbm.DB, state sm.State, valAddr []byte) *store.BlockStore { - blockStore := store.NewBlockStore(db) + blockStore := store.NewBlockStore(db, mdutils.Mock()) for i := int64(1); i <= state.LastBlockHeight; i++ { lastCommit := makeCommit(i-1, valAddr) diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go index f533621534..703d542284 100644 --- a/evidence/reactor_test.go +++ b/evidence/reactor_test.go @@ -130,7 +130,7 @@ func TestReactorsGossipNoCommittedEvidence(t *testing.T) { pools[0].Update(state, evList) require.EqualValues(t, uint32(0), pools[0].Size()) - time.Sleep(100 * time.Millisecond) + time.Sleep(200 * time.Millisecond) peer := reactors[0].Switch.Peers().List()[0] ps := peerState{height - 2} @@ -141,7 +141,7 @@ func TestReactorsGossipNoCommittedEvidence(t *testing.T) { peer.Set(types.PeerStateKey, ps) // wait to see that no evidence comes through - time.Sleep(300 * time.Millisecond) + time.Sleep(600 * time.Millisecond) // the second pool should not have received any evidence because it has already been committed assert.Equal(t, uint32(0), pools[1].Size(), "second reactor should not have received evidence") @@ -157,7 +157,7 @@ func TestReactorsGossipNoCommittedEvidence(t *testing.T) { } // wait to see that only one evidence is sent - time.Sleep(300 * time.Millisecond) + time.Sleep(600 * time.Millisecond) // the second pool should only have received the first evidence because it is behind peerEv, _ := pools[1].PendingEvidence(10000) @@ -178,9 +178,9 @@ func TestReactorsGossipNoCommittedEvidence(t *testing.T) { peer.Set(types.PeerStateKey, ps) // wait to see that only two evidence is sent - time.Sleep(300 * time.Millisecond) + time.Sleep(1800 * time.Millisecond) - peerEv, _ = pools[1].PendingEvidence(1000) + peerEv, _ = pools[1].PendingEvidence(2000) assert.EqualValues(t, []types.Evidence{evList[0], evList[1]}, peerEv) } diff --git a/go.mod b/go.mod index 8af38a9f43..28cafdfbcb 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/ipfs/go-ipfs-api v0.2.0 github.com/ipfs/go-ipfs-config v0.11.0 github.com/ipfs/go-ipld-format v0.2.0 - github.com/ipfs/go-merkledag v0.3.2 // indirect + github.com/ipfs/go-merkledag v0.3.2 github.com/ipfs/go-path v0.0.9 // indirect github.com/ipfs/go-verifcid v0.0.1 github.com/ipfs/interface-go-ipfs-core v0.4.0 diff --git a/ipfs/embedded.go b/ipfs/embedded.go index 27f0fc14ec..d318f3200f 100644 --- a/ipfs/embedded.go +++ b/ipfs/embedded.go @@ -26,7 +26,7 @@ import ( // Embedded is the provider that embeds IPFS node within the same process. // It also returns closable for graceful node shutdown. func Embedded(init bool, cfg *Config, logger log.Logger) APIProvider { - return func() (coreiface.CoreAPI, io.Closer, error) { + return func() (coreiface.APIDagService, io.Closer, error) { path := cfg.Path() defer os.Setenv(ipfscfg.EnvDir, path) @@ -87,7 +87,7 @@ func Embedded(init bool, cfg *Config, logger log.Logger) APIProvider { } logger.Info("Successfully created embedded IPFS node", "ipfs-repo", path) - return api, node, nil + return api.Dag(), node, nil } } diff --git a/ipfs/mock.go b/ipfs/mock.go index 1be81c7517..e7e7282c62 100644 --- a/ipfs/mock.go +++ b/ipfs/mock.go @@ -3,28 +3,24 @@ package ipfs import ( "io" - "github.com/ipfs/go-ipfs/core/coreapi" - coremock "github.com/ipfs/go-ipfs/core/mock" + ipld "github.com/ipfs/go-ipld-format" + mdutils "github.com/ipfs/go-merkledag/test" coreiface "github.com/ipfs/interface-go-ipfs-core" - - "github.com/lazyledger/lazyledger-core/ipfs/plugin" ) // Mock provides simple mock IPFS API useful for testing func Mock() APIProvider { - return func() (coreiface.CoreAPI, io.Closer, error) { - plugin.EnableNMT() - - nd, err := coremock.NewMockNode() - if err != nil { - return nil, nil, err - } - - api, err := coreapi.NewCoreAPI(nd) - if err != nil { - return nil, nil, err - } + return func() (coreiface.APIDagService, io.Closer, error) { + dom := dagOnlyMock{mdutils.Mock()} - return api, nd, nil + return dom, dom, nil } } + +type dagOnlyMock struct { + ipld.DAGService +} + +func (dom dagOnlyMock) Dag() coreiface.APIDagService { return dom } +func (dagOnlyMock) Close() error { return nil } +func (dom dagOnlyMock) Pinning() ipld.NodeAdder { return dom } diff --git a/ipfs/provider.go b/ipfs/provider.go index 9266350c19..d7575526ae 100644 --- a/ipfs/provider.go +++ b/ipfs/provider.go @@ -7,4 +7,4 @@ import ( ) // APIProvider allows customizable IPFS core APIs. -type APIProvider func() (coreiface.CoreAPI, io.Closer, error) +type APIProvider func() (coreiface.APIDagService, io.Closer, error) diff --git a/light/client.go b/light/client.go index 06f9d2360b..d0a38a2c18 100644 --- a/light/client.go +++ b/light/client.go @@ -71,12 +71,12 @@ func SkippingVerification(trustLevel tmmath.Fraction) Option { } } -func DataAvailabilitySampling(numSamples uint32, ipfsAPI coreiface.CoreAPI) Option { +func DataAvailabilitySampling(numSamples uint32, ipfsAPI coreiface.APIDagService) Option { return func(c *Client) { c.verificationMode = dataAvailabilitySampling c.numSamples = numSamples - c.ipfsCoreAPI = ipfsAPI - c.dag = merkledag.NewSession(context.TODO(), ipfsAPI.Dag()) + c.dag = ipfsAPI + c.sessionDAG = merkledag.NewSession(context.TODO(), ipfsAPI) } } @@ -157,8 +157,8 @@ type Client struct { logger log.Logger - ipfsCoreAPI coreiface.CoreAPI - dag format.NodeGetter + dag coreiface.APIDagService + sessionDAG format.NodeGetter } // NewClient returns a new light client. It returns an error if it fails to diff --git a/node/node.go b/node/node.go index 8b20c67d1f..00ed08bf98 100644 --- a/node/node.go +++ b/node/node.go @@ -12,8 +12,8 @@ import ( "strings" "time" - format "github.com/ipfs/go-ipld-format" - ipface "github.com/ipfs/interface-go-ipfs-core" + ipld "github.com/ipfs/go-ipld-format" + iface "github.com/ipfs/interface-go-ipfs-core" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/cors" @@ -216,17 +216,20 @@ type Node struct { indexerService *txindex.IndexerService prometheusSrv *http.Server - ipfsAPI ipface.CoreAPI ipfsClose io.Closer } -func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { +func initDBs( + config *cfg.Config, + dbProvider DBProvider, + dag iface.APIDagService, +) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { var blockStoreDB dbm.DB blockStoreDB, err = dbProvider(&DBContext{"blockstore", config}) if err != nil { return } - blockStore = store.NewBlockStore(blockStoreDB) + blockStore = store.NewBlockStore(blockStoreDB, dag) stateDB, err = dbProvider(&DBContext{"state", config}) if err != nil { @@ -386,7 +389,8 @@ func createBlockchainReactor(config *cfg.Config, return bcReactor, nil } -func createConsensusReactor(config *cfg.Config, +func createConsensusReactor( + config *cfg.Config, state sm.State, blockExec *sm.BlockExecutor, blockStore sm.BlockStore, @@ -396,7 +400,7 @@ func createConsensusReactor(config *cfg.Config, csMetrics *cs.Metrics, waitSync bool, eventBus *types.EventBus, - dag format.DAGService, + dag ipld.DAGService, consensusLogger log.Logger) (*cs.Reactor, *cs.State) { consensusState := cs.NewState( @@ -639,7 +643,12 @@ func NewNode(config *cfg.Config, logger log.Logger, options ...Option) (*Node, error) { - blockStore, stateDB, err := initDBs(config, dbProvider) + dag, ipfsclose, err := ipfsProvider() + if err != nil { + return nil, err + } + + blockStore, stateDB, err := initDBs(config, dbProvider, dag) if err != nil { return nil, err } @@ -738,11 +747,6 @@ func NewNode(config *cfg.Config, sm.BlockExecutorWithMetrics(smMetrics), ) - ipfs, ipfsclose, err := ipfsProvider() - if err != nil { - return nil, err - } - // Make BlockchainReactor. Don't start fast sync if we're doing a state sync first. bcReactor, err := createBlockchainReactor(config, state, blockExec, blockStore, fastSync && !stateSync, logger) if err != nil { @@ -758,7 +762,7 @@ func NewNode(config *cfg.Config, } consensusReactor, consensusState := createConsensusReactor( config, state, blockExec, blockStore, mempool, evidencePool, - privValidator, csMetrics, stateSync || fastSync, eventBus, ipfs.Dag(), consensusLogger, + privValidator, csMetrics, stateSync || fastSync, eventBus, dag, consensusLogger, ) // Set up state sync reactor, and schedule a sync if requested. @@ -859,7 +863,6 @@ func NewNode(config *cfg.Config, txIndexer: txIndexer, indexerService: indexerService, eventBus: eventBus, - ipfsAPI: ipfs, ipfsClose: ipfsclose, } node.BaseService = *service.NewBaseService(logger, "Node", node) @@ -1410,8 +1413,8 @@ func createAndStartPrivValidatorSocketClient( } const ( - retries = 50 // 50 * 100ms = 5s total - timeout = 100 * time.Millisecond + retries = 50 // 50 * 200ms = 10s total + timeout = 200 * time.Millisecond ) pvscWithRetries := privval.NewRetrySignerClient(pvsc, retries, timeout) diff --git a/node/node_test.go b/node/node_test.go index 8fde86ae87..b2d4d560a2 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -218,7 +219,7 @@ func TestNodeSetPrivValIPC(t *testing.T) { log.TestingLogger(), dialer, ) - privval.SignerDialerEndpointTimeoutReadWrite(100 * time.Millisecond)(dialerEndpoint) + privval.SignerDialerEndpointTimeoutReadWrite(400 * time.Millisecond)(dialerEndpoint) pvsc := privval.NewSignerServer( dialerEndpoint, @@ -283,7 +284,7 @@ func TestCreateProposalBlock(t *testing.T) { // Make EvidencePool evidenceDB := memdb.NewDB() - blockStore := store.NewBlockStore(memdb.NewDB()) + blockStore := store.NewBlockStore(memdb.NewDB(), mdutils.Mock()) evidencePool, err := evidence.NewPool(evidenceDB, stateStore, blockStore) require.NoError(t, err) evidencePool.SetLogger(logger) diff --git a/store/store.go b/store/store.go index fa5dca3421..33e86b95fc 100644 --- a/store/store.go +++ b/store/store.go @@ -5,6 +5,7 @@ import ( "strconv" "github.com/gogo/protobuf/proto" + ipld "github.com/ipfs/go-ipld-format" dbm "github.com/lazyledger/lazyledger-core/libs/db" tmsync "github.com/lazyledger/lazyledger-core/libs/sync" @@ -41,16 +42,19 @@ type BlockStore struct { mtx tmsync.RWMutex base int64 height int64 + + ipfsDagAPI ipld.DAGService } // NewBlockStore returns a new BlockStore with the given DB, // initialized to the last height that was committed to the DB. -func NewBlockStore(db dbm.DB) *BlockStore { +func NewBlockStore(db dbm.DB, dagAPI ipld.DAGService) *BlockStore { bs := LoadBlockStoreState(db) return &BlockStore{ - base: bs.Base, - height: bs.Height, - db: db, + base: bs.Base, + height: bs.Height, + db: db, + ipfsDagAPI: dagAPI, } } diff --git a/store/store_test.go b/store/store_test.go index 1f45a018ec..f290510ff0 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/gogo/protobuf/proto" + mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -73,7 +74,7 @@ func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore, cleanupFu if err != nil { panic(fmt.Errorf("error constructing state from genesis file: %w", err)) } - return state, NewBlockStore(blockDB), func() { os.RemoveAll(config.RootDir) } + return state, NewBlockStore(blockDB, mdutils.Mock()), func() { os.RemoveAll(config.RootDir) } } func TestLoadBlockStoreState(t *testing.T) { @@ -105,7 +106,7 @@ func TestNewBlockStore(t *testing.T) { bz, _ := proto.Marshal(&bss) err := db.Set(blockStoreKey, bz) require.NoError(t, err) - bs := NewBlockStore(db) + bs := NewBlockStore(db, mdutils.Mock()) require.Equal(t, int64(100), bs.Base(), "failed to properly parse blockstore") require.Equal(t, int64(10000), bs.Height(), "failed to properly parse blockstore") @@ -123,7 +124,7 @@ func TestNewBlockStore(t *testing.T) { _, _, panicErr := doFn(func() (interface{}, error) { err := db.Set(blockStoreKey, tt.data) require.NoError(t, err) - _ = NewBlockStore(db) + _ = NewBlockStore(db, mdutils.Mock()) return nil, nil }) require.NotNil(t, panicErr, "#%d panicCauser: %q expected a panic", i, tt.data) @@ -132,13 +133,13 @@ func TestNewBlockStore(t *testing.T) { err = db.Set(blockStoreKey, []byte{}) require.NoError(t, err) - bs = NewBlockStore(db) + bs = NewBlockStore(db, mdutils.Mock()) assert.Equal(t, bs.Height(), int64(0), "expecting empty bytes to be unmarshaled alright") } func freshBlockStore() (*BlockStore, dbm.DB) { db := memdb.NewDB() - return NewBlockStore(db), db + return NewBlockStore(db, mdutils.Mock()), db } var ( @@ -380,7 +381,7 @@ func TestLoadBaseMeta(t *testing.T) { stateStore := sm.NewStore(memdb.NewDB()) state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) require.NoError(t, err) - bs := NewBlockStore(memdb.NewDB()) + bs := NewBlockStore(memdb.NewDB(), mdutils.Mock()) for h := int64(1); h <= 10; h++ { block := makeBlock(h, state, new(types.Commit)) @@ -437,7 +438,7 @@ func TestPruneBlocks(t *testing.T) { state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) require.NoError(t, err) db := memdb.NewDB() - bs := NewBlockStore(db) + bs := NewBlockStore(db, mdutils.Mock()) assert.EqualValues(t, 0, bs.Base()) assert.EqualValues(t, 0, bs.Height()) assert.EqualValues(t, 0, bs.Size())