From 3795cc2bd20ef13fa256ac9c71de23bc3d3de42d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Sun, 28 Feb 2021 22:48:36 +0000 Subject: [PATCH 1/6] segregate chain and state blockstores. This paves the way for better object lifetime management. Concretely, it makes it possible to: - have different stores backing chain and state data. - having the same datastore library, but using different parameters. - attach different caching layers/policies to each class of data, e.g. sizing caches differently. - specifying different retention policies for chain and state data. This separation is important because: - access patterns/frequency of chain and state data are different. - state is derivable from chain, so one could never expunge the chain store, and only retain state objects reachable from the last finality in the state store. --- blockstore/badger/blockstore.go | 5 +- blockstore/badger/blockstore_test_suite.go | 9 +- blockstore/fallback.go | 15 +- blockstore/metrics.go | 154 ++++++++++++++++++ chain/gen/gen.go | 2 +- chain/gen/genesis/genesis.go | 2 +- chain/gen/genesis/miners.go | 2 +- chain/gen/mining.go | 2 +- chain/stmgr/call.go | 4 +- chain/stmgr/forks.go | 26 +-- chain/stmgr/forks_test.go | 2 +- chain/stmgr/read.go | 4 +- chain/stmgr/stmgr.go | 23 +-- chain/stmgr/utils.go | 32 ++-- chain/store/store.go | 140 ++++++++-------- chain/store/store_test.go | 2 +- chain/store/weight.go | 4 +- chain/sync.go | 10 +- cmd/lotus-bench/import.go | 31 ++-- cmd/lotus-shed/balances.go | 4 +- cmd/lotus-shed/datastore.go | 4 +- cmd/lotus-shed/export.go | 2 +- cmd/lotus-shed/import-car.go | 4 +- cmd/lotus-shed/pruning.go | 4 +- cmd/lotus/daemon.go | 2 +- documentation/en/architecture/architecture.md | 2 +- go.mod | 2 +- go.sum | 3 +- metrics/metrics.go | 62 +++---- node/builder.go | 11 +- node/impl/full/chain.go | 21 ++- node/impl/full/state.go | 52 +++--- node/modules/blockstore.go | 65 ++++++++ node/modules/chain.go | 116 ++----------- node/modules/client.go | 2 +- node/modules/dtypes/storage.go | 34 +++- node/modules/genesis.go | 73 +++++++++ node/modules/graphsync.go | 4 +- node/modules/ipfsclient.go | 2 +- node/repo/blockstore_opts.go | 4 - node/repo/fsrepo.go | 30 +++- node/repo/importmgr/mgr.go | 2 +- node/repo/interface.go | 12 +- node/repo/memrepo.go | 10 +- .../retrievalstoremgr/retrievalstoremgr.go | 4 +- 45 files changed, 630 insertions(+), 370 deletions(-) create mode 100644 blockstore/metrics.go create mode 100644 node/modules/blockstore.go create mode 100644 node/modules/genesis.go diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 47dbf98d362..22f9036e32b 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -110,10 +110,7 @@ func Open(opts Options) (*Blockstore, error) { return nil, fmt.Errorf("failed to open badger blockstore: %w", err) } - bs := &Blockstore{ - DB: db, - } - + bs := &Blockstore{DB: db} if p := opts.Prefix; p != "" { bs.prefixing = true bs.prefix = []byte(p) diff --git a/blockstore/badger/blockstore_test_suite.go b/blockstore/badger/blockstore_test_suite.go index ebf1be80afc..93be82ac87e 100644 --- a/blockstore/badger/blockstore_test_suite.go +++ b/blockstore/badger/blockstore_test_suite.go @@ -8,18 +8,19 @@ import ( "strings" "testing" - "github.com/filecoin-project/lotus/blockstore" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" u "github.com/ipfs/go-ipfs-util" + "github.com/filecoin-project/lotus/blockstore" + "github.com/stretchr/testify/require" ) // TODO: move this to go-ipfs-blockstore. type Suite struct { - NewBlockstore func(tb testing.TB) (bs blockstore.Blockstore, path string) - OpenBlockstore func(tb testing.TB, path string) (bs blockstore.Blockstore, err error) + NewBlockstore func(tb testing.TB) (bs blockstore.BasicBlockstore, path string) + OpenBlockstore func(tb testing.TB, path string) (bs blockstore.BasicBlockstore, err error) } func (s *Suite) RunTests(t *testing.T, prefix string) { @@ -290,7 +291,7 @@ func (s *Suite) TestDelete(t *testing.T) { } -func insertBlocks(t *testing.T, bs blockstore.Blockstore, count int) []cid.Cid { +func insertBlocks(t *testing.T, bs blockstore.BasicBlockstore, count int) []cid.Cid { keys := make([]cid.Cid, count) for i := 0; i < count; i++ { block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i))) diff --git a/blockstore/fallback.go b/blockstore/fallback.go index 3a71913d434..5f220f941bb 100644 --- a/blockstore/fallback.go +++ b/blockstore/fallback.go @@ -11,6 +11,19 @@ import ( "github.com/ipfs/go-cid" ) +// UnwrapFallbackStore takes a blockstore, and returns the underlying blockstore +// if it was a FallbackStore. Otherwise, it just returns the supplied store +// unmodified. +func UnwrapFallbackStore(bs Blockstore) (Blockstore, bool) { + if fbs, ok := bs.(*FallbackStore); ok { + return fbs.Blockstore, true + } + return bs, false +} + +// FallbackStore is a read-through store that queries another (potentially +// remote) source if the block is not found locally. If the block is found +// during the fallback, it stores it in the local store. type FallbackStore struct { Blockstore @@ -30,7 +43,7 @@ func (fbs *FallbackStore) SetFallback(missFn func(context.Context, cid.Cid) (blo } func (fbs *FallbackStore) getFallback(c cid.Cid) (blocks.Block, error) { - log.Errorw("fallbackstore: Block not found locally, fetching from the network", "cid", c) + log.Warnf("fallbackstore: block not found locally, fetching from the network; cid: %s", c) fbs.lk.RLock() defer fbs.lk.RUnlock() diff --git a/blockstore/metrics.go b/blockstore/metrics.go new file mode 100644 index 00000000000..737690a1106 --- /dev/null +++ b/blockstore/metrics.go @@ -0,0 +1,154 @@ +package blockstore + +import ( + "time" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +// +// Currently unused, but kept in repo in case we introduce one of the candidate +// cache implementations (Freecache, Ristretto), both of which report these +// metrics. +// + +// CacheMetricsEmitInterval is the interval at which metrics are emitted onto +// OpenCensus. +var CacheMetricsEmitInterval = 5 * time.Second + +var ( + CacheName, _ = tag.NewKey("cache_name") +) + +// CacheMeasures groups all metrics emitted by the blockstore caches. +var CacheMeasures = struct { + HitRatio *stats.Float64Measure + Hits *stats.Int64Measure + Misses *stats.Int64Measure + Entries *stats.Int64Measure + QueriesServed *stats.Int64Measure + Adds *stats.Int64Measure + Updates *stats.Int64Measure + Evictions *stats.Int64Measure + CostAdded *stats.Int64Measure + CostEvicted *stats.Int64Measure + SetsDropped *stats.Int64Measure + SetsRejected *stats.Int64Measure + QueriesDropped *stats.Int64Measure +}{ + HitRatio: stats.Float64("blockstore/cache/hit_ratio", "Hit ratio of blockstore cache", stats.UnitDimensionless), + Hits: stats.Int64("blockstore/cache/hits", "Total number of hits at blockstore cache", stats.UnitDimensionless), + Misses: stats.Int64("blockstore/cache/misses", "Total number of misses at blockstore cache", stats.UnitDimensionless), + Entries: stats.Int64("blockstore/cache/entry_count", "Total number of entries currently in the blockstore cache", stats.UnitDimensionless), + QueriesServed: stats.Int64("blockstore/cache/queries_served", "Total number of queries served by the blockstore cache", stats.UnitDimensionless), + Adds: stats.Int64("blockstore/cache/adds", "Total number of adds to blockstore cache", stats.UnitDimensionless), + Updates: stats.Int64("blockstore/cache/updates", "Total number of updates in blockstore cache", stats.UnitDimensionless), + Evictions: stats.Int64("blockstore/cache/evictions", "Total number of evictions from blockstore cache", stats.UnitDimensionless), + CostAdded: stats.Int64("blockstore/cache/cost_added", "Total cost (byte size) of entries added into blockstore cache", stats.UnitBytes), + CostEvicted: stats.Int64("blockstore/cache/cost_evicted", "Total cost (byte size) of entries evicted by blockstore cache", stats.UnitBytes), + SetsDropped: stats.Int64("blockstore/cache/sets_dropped", "Total number of sets dropped by blockstore cache", stats.UnitDimensionless), + SetsRejected: stats.Int64("blockstore/cache/sets_rejected", "Total number of sets rejected by blockstore cache", stats.UnitDimensionless), + QueriesDropped: stats.Int64("blockstore/cache/queries_dropped", "Total number of queries dropped by blockstore cache", stats.UnitDimensionless), +} + +// CacheViews groups all cache-related default views. +var CacheViews = struct { + HitRatio *view.View + Hits *view.View + Misses *view.View + Entries *view.View + QueriesServed *view.View + Adds *view.View + Updates *view.View + Evictions *view.View + CostAdded *view.View + CostEvicted *view.View + SetsDropped *view.View + SetsRejected *view.View + QueriesDropped *view.View +}{ + HitRatio: &view.View{ + Measure: CacheMeasures.HitRatio, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + Hits: &view.View{ + Measure: CacheMeasures.Hits, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + Misses: &view.View{ + Measure: CacheMeasures.Misses, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + Entries: &view.View{ + Measure: CacheMeasures.Entries, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + QueriesServed: &view.View{ + Measure: CacheMeasures.QueriesServed, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + Adds: &view.View{ + Measure: CacheMeasures.Adds, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + Updates: &view.View{ + Measure: CacheMeasures.Updates, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + Evictions: &view.View{ + Measure: CacheMeasures.Evictions, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + CostAdded: &view.View{ + Measure: CacheMeasures.CostAdded, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + CostEvicted: &view.View{ + Measure: CacheMeasures.CostEvicted, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + SetsDropped: &view.View{ + Measure: CacheMeasures.SetsDropped, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + SetsRejected: &view.View{ + Measure: CacheMeasures.SetsRejected, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + QueriesDropped: &view.View{ + Measure: CacheMeasures.QueriesDropped, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, +} + +// DefaultViews exports all default views for this package. +var DefaultViews = []*view.View{ + CacheViews.HitRatio, + CacheViews.Hits, + CacheViews.Misses, + CacheViews.Entries, + CacheViews.QueriesServed, + CacheViews.Adds, + CacheViews.Updates, + CacheViews.Evictions, + CacheViews.CostAdded, + CacheViews.CostEvicted, + CacheViews.SetsDropped, + CacheViews.SetsRejected, + CacheViews.QueriesDropped, +} diff --git a/chain/gen/gen.go b/chain/gen/gen.go index 18cbb64f7a6..d06c755fa34 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -125,7 +125,7 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) { return nil, xerrors.Errorf("failed to get metadata datastore: %w", err) } - bs, err := lr.Blockstore(context.TODO(), repo.BlockstoreChain) + bs, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore) if err != nil { return nil, err } diff --git a/chain/gen/genesis/genesis.go b/chain/gen/genesis/genesis.go index 92b4919c198..3a4e317a566 100644 --- a/chain/gen/genesis/genesis.go +++ b/chain/gen/genesis/genesis.go @@ -406,7 +406,7 @@ func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, stateroot ci StateBase: stateroot, Epoch: 0, Rand: &fakeRand{}, - Bstore: cs.Blockstore(), + Bstore: cs.StateBlockstore(), Syscalls: mkFakedSigSyscalls(cs.VMSys()), CircSupplyCalc: nil, NtwkVersion: genesisNetworkVersion, diff --git a/chain/gen/genesis/miners.go b/chain/gen/genesis/miners.go index 850c2f39ff0..297543886dd 100644 --- a/chain/gen/genesis/miners.go +++ b/chain/gen/genesis/miners.go @@ -70,7 +70,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sroot cid.Cid StateBase: sroot, Epoch: 0, Rand: &fakeRand{}, - Bstore: cs.Blockstore(), + Bstore: cs.StateBlockstore(), Syscalls: mkFakedSigSyscalls(cs.VMSys()), CircSupplyCalc: csc, NtwkVersion: genesisNetworkVersion, diff --git a/chain/gen/mining.go b/chain/gen/mining.go index 5de0fec0ed0..3c6a8987362 100644 --- a/chain/gen/mining.go +++ b/chain/gen/mining.go @@ -79,7 +79,7 @@ func MinerCreateBlock(ctx context.Context, sm *stmgr.StateManager, w api.WalletA } } - store := sm.ChainStore().Store(ctx) + store := sm.ChainStore().ActorStore(ctx) blsmsgroot, err := toArray(store, blsMsgCids) if err != nil { return nil, xerrors.Errorf("building bls amt: %w", err) diff --git a/chain/stmgr/call.go b/chain/stmgr/call.go index bb0f0e5ecd1..89f91b0b7ba 100644 --- a/chain/stmgr/call.go +++ b/chain/stmgr/call.go @@ -59,7 +59,7 @@ func (sm *StateManager) Call(ctx context.Context, msg *types.Message, ts *types. StateBase: bstate, Epoch: bheight, Rand: store.NewChainRand(sm.cs, ts.Cids()), - Bstore: sm.cs.Blockstore(), + Bstore: sm.cs.StateBlockstore(), Syscalls: sm.cs.VMSys(), CircSupplyCalc: sm.GetVMCirculatingSupply, NtwkVersion: sm.GetNtwkVersion, @@ -174,7 +174,7 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri StateBase: state, Epoch: ts.Height() + 1, Rand: r, - Bstore: sm.cs.Blockstore(), + Bstore: sm.cs.StateBlockstore(), Syscalls: sm.cs.VMSys(), CircSupplyCalc: sm.GetVMCirculatingSupply, NtwkVersion: sm.GetNtwkVersion, diff --git a/chain/stmgr/forks.go b/chain/stmgr/forks.go index 90dcaf72913..899397940d5 100644 --- a/chain/stmgr/forks.go +++ b/chain/stmgr/forks.go @@ -504,7 +504,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio } case builtin0.StorageMinerActorCodeID: var st miner0.State - if err := sm.ChainStore().Store(ctx).Get(ctx, act.Head, &st); err != nil { + if err := sm.ChainStore().ActorStore(ctx).Get(ctx, act.Head, &st); err != nil { return xerrors.Errorf("failed to load miner state: %w", err) } @@ -548,7 +548,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio return cid.Undef, xerrors.Errorf("failed to load power actor: %w", err) } - cst := cbor.NewCborStore(sm.ChainStore().Blockstore()) + cst := cbor.NewCborStore(sm.ChainStore().StateBlockstore()) if err := cst.Get(ctx, powAct.Head, &ps); err != nil { return cid.Undef, xerrors.Errorf("failed to get power actor state: %w", err) } @@ -582,7 +582,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio } case builtin0.StorageMinerActorCodeID: var st miner0.State - if err := sm.ChainStore().Store(ctx).Get(ctx, act.Head, &st); err != nil { + if err := sm.ChainStore().ActorStore(ctx).Get(ctx, act.Head, &st); err != nil { return xerrors.Errorf("failed to load miner state: %w", err) } @@ -591,7 +591,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio return xerrors.Errorf("failed to get miner info: %w", err) } - sectorsArr, err := adt0.AsArray(sm.ChainStore().Store(ctx), st.Sectors) + sectorsArr, err := adt0.AsArray(sm.ChainStore().ActorStore(ctx), st.Sectors) if err != nil { return xerrors.Errorf("failed to load sectors array: %w", err) } @@ -611,11 +611,11 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio lbact, err := lbtree.GetActor(addr) if err == nil { var lbst miner0.State - if err := sm.ChainStore().Store(ctx).Get(ctx, lbact.Head, &lbst); err != nil { + if err := sm.ChainStore().ActorStore(ctx).Get(ctx, lbact.Head, &lbst); err != nil { return xerrors.Errorf("failed to load miner state: %w", err) } - lbsectors, err := adt0.AsArray(sm.ChainStore().Store(ctx), lbst.Sectors) + lbsectors, err := adt0.AsArray(sm.ChainStore().ActorStore(ctx), lbst.Sectors) if err != nil { return xerrors.Errorf("failed to load lb sectors array: %w", err) } @@ -711,7 +711,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio } func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { - store := sm.cs.Store(ctx) + store := sm.cs.ActorStore(ctx) if build.UpgradeLiftoffHeight <= epoch { return cid.Undef, xerrors.Errorf("liftoff height must be beyond ignition height") @@ -767,7 +767,7 @@ func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb func UpgradeRefuel(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { - store := sm.cs.Store(ctx) + store := sm.cs.ActorStore(ctx) tree, err := sm.StateTree(root) if err != nil { return cid.Undef, xerrors.Errorf("getting state tree: %w", err) @@ -792,7 +792,7 @@ func UpgradeRefuel(ctx context.Context, sm *StateManager, _ MigrationCache, cb E } func UpgradeActorsV2(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { - buf := blockstore.NewTieredBstore(sm.cs.Blockstore(), blockstore.NewMemorySync()) + buf := blockstore.NewTieredBstore(sm.cs.StateBlockstore(), blockstore.NewMemorySync()) store := store.ActorStore(ctx, buf) info, err := store.Put(ctx, new(types.StateInfo0)) @@ -843,7 +843,7 @@ func UpgradeLiftoff(ctx context.Context, sm *StateManager, _ MigrationCache, cb return cid.Undef, xerrors.Errorf("getting state tree: %w", err) } - err = setNetworkName(ctx, sm.cs.Store(ctx), tree, "mainnet") + err = setNetworkName(ctx, sm.cs.ActorStore(ctx), tree, "mainnet") if err != nil { return cid.Undef, xerrors.Errorf("setting network name: %w", err) } @@ -852,7 +852,7 @@ func UpgradeLiftoff(ctx context.Context, sm *StateManager, _ MigrationCache, cb } func UpgradeCalico(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { - store := sm.cs.Store(ctx) + store := sm.cs.ActorStore(ctx) var stateRoot types.StateRoot if err := store.Get(ctx, root, &stateRoot); err != nil { return cid.Undef, xerrors.Errorf("failed to decode state root: %w", err) @@ -1009,7 +1009,7 @@ func upgradeActorsV3Common( root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet, config nv10.Config, ) (cid.Cid, error) { - buf := blockstore.NewTieredBstore(sm.cs.Blockstore(), blockstore.NewMemorySync()) + buf := blockstore.NewTieredBstore(sm.cs.StateBlockstore(), blockstore.NewMemorySync()) store := store.ActorStore(ctx, buf) // Load the state root. @@ -1239,7 +1239,7 @@ func resetGenesisMsigs0(ctx context.Context, sm *StateManager, store adt0.Store, return xerrors.Errorf("getting genesis tipset: %w", err) } - cst := cbor.NewCborStore(sm.cs.Blockstore()) + cst := cbor.NewCborStore(sm.cs.StateBlockstore()) genesisTree, err := state.LoadStateTree(cst, gts.ParentState()) if err != nil { return xerrors.Errorf("loading state tree: %w", err) diff --git a/chain/stmgr/forks_test.go b/chain/stmgr/forks_test.go index 95e7ef69900..e456dc436de 100644 --- a/chain/stmgr/forks_test.go +++ b/chain/stmgr/forks_test.go @@ -125,7 +125,7 @@ func TestForkHeightTriggers(t *testing.T) { Height: testForkHeight, Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback, root cid.Cid, height abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { - cst := ipldcbor.NewCborStore(sm.ChainStore().Blockstore()) + cst := ipldcbor.NewCborStore(sm.ChainStore().StateBlockstore()) st, err := sm.StateTree(root) if err != nil { diff --git a/chain/stmgr/read.go b/chain/stmgr/read.go index 9a9b8026576..3c7fb5d91e8 100644 --- a/chain/stmgr/read.go +++ b/chain/stmgr/read.go @@ -22,7 +22,7 @@ func (sm *StateManager) ParentStateTsk(tsk types.TipSetKey) (*state.StateTree, e } func (sm *StateManager) ParentState(ts *types.TipSet) (*state.StateTree, error) { - cst := cbor.NewCborStore(sm.cs.Blockstore()) + cst := cbor.NewCborStore(sm.cs.StateBlockstore()) state, err := state.LoadStateTree(cst, sm.parentState(ts)) if err != nil { return nil, xerrors.Errorf("load state tree: %w", err) @@ -32,7 +32,7 @@ func (sm *StateManager) ParentState(ts *types.TipSet) (*state.StateTree, error) } func (sm *StateManager) StateTree(st cid.Cid) (*state.StateTree, error) { - cst := cbor.NewCborStore(sm.cs.Blockstore()) + cst := cbor.NewCborStore(sm.cs.StateBlockstore()) state, err := state.LoadStateTree(cst, st) if err != nil { return nil, xerrors.Errorf("load state tree: %w", err) diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index 73088ba2a8d..d6c98428052 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -286,7 +286,7 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp StateBase: base, Epoch: epoch, Rand: r, - Bstore: sm.cs.Blockstore(), + Bstore: sm.cs.StateBlockstore(), Syscalls: sm.cs.VMSys(), CircSupplyCalc: sm.GetVMCirculatingSupply, NtwkVersion: sm.GetNtwkVersion, @@ -430,7 +430,8 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp return cid.Cid{}, cid.Cid{}, err } - rectarr := blockadt.MakeEmptyArray(sm.cs.Store(ctx)) + // XXX: Is the height correct? Or should it be epoch-1? + rectarr := blockadt.MakeEmptyArray(sm.cs.ActorStore(ctx)) for i, receipt := range receipts { if err := rectarr.Set(uint64(i), receipt); err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("failed to build receipts amt: %w", err) @@ -515,7 +516,7 @@ func (sm *StateManager) ResolveToKeyAddress(ctx context.Context, addr address.Ad ts = sm.cs.GetHeaviestTipSet() } - cst := cbor.NewCborStore(sm.cs.Blockstore()) + cst := cbor.NewCborStore(sm.cs.StateBlockstore()) // First try to resolve the actor in the parent state, so we don't have to compute anything. tree, err := state.LoadStateTree(cst, ts.ParentState()) @@ -556,7 +557,7 @@ func (sm *StateManager) GetBlsPublicKey(ctx context.Context, addr address.Addres } func (sm *StateManager) LookupID(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) { - cst := cbor.NewCborStore(sm.cs.Blockstore()) + cst := cbor.NewCborStore(sm.cs.StateBlockstore()) state, err := state.LoadStateTree(cst, sm.parentState(ts)) if err != nil { return address.Undef, xerrors.Errorf("load state tree: %w", err) @@ -882,7 +883,7 @@ func (sm *StateManager) MarketBalance(ctx context.Context, addr address.Address, return api.MarketBalance{}, err } - mstate, err := market.Load(sm.cs.Store(ctx), act) + mstate, err := market.Load(sm.cs.ActorStore(ctx), act) if err != nil { return api.MarketBalance{}, err } @@ -966,7 +967,7 @@ func (sm *StateManager) setupGenesisVestingSchedule(ctx context.Context) error { return xerrors.Errorf("getting genesis tipset state: %w", err) } - cst := cbor.NewCborStore(sm.cs.Blockstore()) + cst := cbor.NewCborStore(sm.cs.StateBlockstore()) sTree, err := state.LoadStateTree(cst, st) if err != nil { return xerrors.Errorf("loading state tree: %w", err) @@ -1325,7 +1326,7 @@ func (sm *StateManager) GetCirculatingSupply(ctx context.Context, height abi.Cha unCirc = big.Add(unCirc, actor.Balance) case a == market.Address: - mst, err := market.Load(sm.cs.Store(ctx), actor) + mst, err := market.Load(sm.cs.ActorStore(ctx), actor) if err != nil { return err } @@ -1342,7 +1343,7 @@ func (sm *StateManager) GetCirculatingSupply(ctx context.Context, height abi.Cha circ = big.Add(circ, actor.Balance) case builtin.IsStorageMinerActor(actor.Code): - mst, err := miner.Load(sm.cs.Store(ctx), actor) + mst, err := miner.Load(sm.cs.ActorStore(ctx), actor) if err != nil { return err } @@ -1359,7 +1360,7 @@ func (sm *StateManager) GetCirculatingSupply(ctx context.Context, height abi.Cha } case builtin.IsMultisigActor(actor.Code): - mst, err := multisig.Load(sm.cs.Store(ctx), actor) + mst, err := multisig.Load(sm.cs.ActorStore(ctx), actor) if err != nil { return err } @@ -1413,7 +1414,7 @@ func (sm *StateManager) GetPaychState(ctx context.Context, addr address.Address, return nil, nil, err } - actState, err := paych.Load(sm.cs.Store(ctx), act) + actState, err := paych.Load(sm.cs.ActorStore(ctx), act) if err != nil { return nil, nil, err } @@ -1431,7 +1432,7 @@ func (sm *StateManager) GetMarketState(ctx context.Context, ts *types.TipSet) (m return nil, err } - actState, err := market.Load(sm.cs.Store(ctx), act) + actState, err := market.Load(sm.cs.ActorStore(ctx), act) if err != nil { return nil, err } diff --git a/chain/stmgr/utils.go b/chain/stmgr/utils.go index 86bb3a6e093..947310c7569 100644 --- a/chain/stmgr/utils.go +++ b/chain/stmgr/utils.go @@ -48,7 +48,7 @@ func GetNetworkName(ctx context.Context, sm *StateManager, st cid.Cid) (dtypes.N if err != nil { return "", err } - ias, err := init_.Load(sm.cs.Store(ctx), act) + ias, err := init_.Load(sm.cs.ActorStore(ctx), act) if err != nil { return "", err } @@ -65,7 +65,7 @@ func GetMinerWorkerRaw(ctx context.Context, sm *StateManager, st cid.Cid, maddr if err != nil { return address.Undef, xerrors.Errorf("(get sset) failed to load miner actor: %w", err) } - mas, err := miner.Load(sm.cs.Store(ctx), act) + mas, err := miner.Load(sm.cs.ActorStore(ctx), act) if err != nil { return address.Undef, xerrors.Errorf("(get sset) failed to load miner actor state: %w", err) } @@ -75,7 +75,7 @@ func GetMinerWorkerRaw(ctx context.Context, sm *StateManager, st cid.Cid, maddr return address.Undef, xerrors.Errorf("failed to load actor info: %w", err) } - return vm.ResolveToKeyAddr(state, sm.cs.Store(ctx), info.Worker) + return vm.ResolveToKeyAddr(state, sm.cs.ActorStore(ctx), info.Worker) } func GetPower(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) (power.Claim, power.Claim, bool, error) { @@ -88,7 +88,7 @@ func GetPowerRaw(ctx context.Context, sm *StateManager, st cid.Cid, maddr addres return power.Claim{}, power.Claim{}, false, xerrors.Errorf("(get sset) failed to load power actor state: %w", err) } - pas, err := power.Load(sm.cs.Store(ctx), act) + pas, err := power.Load(sm.cs.ActorStore(ctx), act) if err != nil { return power.Claim{}, power.Claim{}, false, err } @@ -123,7 +123,7 @@ func PreCommitInfo(ctx context.Context, sm *StateManager, maddr address.Address, return nil, xerrors.Errorf("(get sset) failed to load miner actor: %w", err) } - mas, err := miner.Load(sm.cs.Store(ctx), act) + mas, err := miner.Load(sm.cs.ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("(get sset) failed to load miner actor state: %w", err) } @@ -137,7 +137,7 @@ func MinerSectorInfo(ctx context.Context, sm *StateManager, maddr address.Addres return nil, xerrors.Errorf("(get sset) failed to load miner actor: %w", err) } - mas, err := miner.Load(sm.cs.Store(ctx), act) + mas, err := miner.Load(sm.cs.ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("(get sset) failed to load miner actor state: %w", err) } @@ -151,7 +151,7 @@ func GetSectorsForWinningPoSt(ctx context.Context, nv network.Version, pv ffiwra return nil, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(sm.cs.Store(ctx), act) + mas, err := miner.Load(sm.cs.ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -249,7 +249,7 @@ func GetMinerSlashed(ctx context.Context, sm *StateManager, ts *types.TipSet, ma return false, xerrors.Errorf("failed to load power actor: %w", err) } - spas, err := power.Load(sm.cs.Store(ctx), act) + spas, err := power.Load(sm.cs.ActorStore(ctx), act) if err != nil { return false, xerrors.Errorf("failed to load power actor state: %w", err) } @@ -272,7 +272,7 @@ func GetStorageDeal(ctx context.Context, sm *StateManager, dealID abi.DealID, ts return nil, xerrors.Errorf("failed to load market actor: %w", err) } - state, err := market.Load(sm.cs.Store(ctx), act) + state, err := market.Load(sm.cs.ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load market actor state: %w", err) } @@ -320,7 +320,7 @@ func ListMinerActors(ctx context.Context, sm *StateManager, ts *types.TipSet) ([ return nil, xerrors.Errorf("failed to load power actor: %w", err) } - powState, err := power.Load(sm.cs.Store(ctx), act) + powState, err := power.Load(sm.cs.ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load power actor state: %w", err) } @@ -353,7 +353,7 @@ func ComputeState(ctx context.Context, sm *StateManager, height abi.ChainEpoch, StateBase: base, Epoch: height, Rand: r, - Bstore: sm.cs.Blockstore(), + Bstore: sm.cs.StateBlockstore(), Syscalls: sm.cs.VMSys(), CircSupplyCalc: sm.GetVMCirculatingSupply, NtwkVersion: sm.GetNtwkVersion, @@ -474,7 +474,7 @@ func MinerGetBaseInfo(ctx context.Context, sm *StateManager, bcs beacon.Schedule return nil, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(sm.cs.Store(ctx), act) + mas, err := miner.Load(sm.cs.ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -623,7 +623,7 @@ func minerHasMinPower(ctx context.Context, sm *StateManager, addr address.Addres return false, xerrors.Errorf("loading power actor state: %w", err) } - ps, err := power.Load(sm.cs.Store(ctx), pact) + ps, err := power.Load(sm.cs.ActorStore(ctx), pact) if err != nil { return false, err } @@ -654,7 +654,7 @@ func MinerEligibleToMine(ctx context.Context, sm *StateManager, addr address.Add return false, xerrors.Errorf("loading power actor state: %w", err) } - pstate, err := power.Load(sm.cs.Store(ctx), pact) + pstate, err := power.Load(sm.cs.ActorStore(ctx), pact) if err != nil { return false, err } @@ -664,7 +664,7 @@ func MinerEligibleToMine(ctx context.Context, sm *StateManager, addr address.Add return false, xerrors.Errorf("loading miner actor state: %w", err) } - mstate, err := miner.Load(sm.cs.Store(ctx), mact) + mstate, err := miner.Load(sm.cs.ActorStore(ctx), mact) if err != nil { return false, err } @@ -696,7 +696,7 @@ func MinerEligibleToMine(ctx context.Context, sm *StateManager, addr address.Add } func CheckTotalFIL(ctx context.Context, sm *StateManager, ts *types.TipSet) (abi.TokenAmount, error) { - str, err := state.LoadStateTree(sm.ChainStore().Store(ctx), ts.ParentState()) + str, err := state.LoadStateTree(sm.ChainStore().ActorStore(ctx), ts.ParentState()) if err != nil { return abi.TokenAmount{}, err } diff --git a/chain/store/store.go b/chain/store/store.go index 8cbd5da375c..1244995fcbb 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -107,11 +107,11 @@ type HeadChangeEvt struct { // 1. a tipset cache // 2. a block => messages references cache. type ChainStore struct { - bs bstore.Blockstore - localbs bstore.Blockstore - ds dstore.Batching + chainBlockstore bstore.Blockstore + stateBlockstore bstore.Blockstore + metadataDs dstore.Batching - localviewer bstore.Viewer + chainLocalBlockstore bstore.Blockstore heaviestLk sync.Mutex heaviest *types.TipSet @@ -139,30 +139,30 @@ type ChainStore struct { wg sync.WaitGroup } -// localbs is guaranteed to fail Get* if requested block isn't stored locally -func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore { - mmCache, _ := lru.NewARC(DefaultMsgMetaCacheSize) - tsCache, _ := lru.NewARC(DefaultTipSetCacheSize) +// chainLocalBlockstore is guaranteed to fail Get* if requested block isn't stored locally +func NewChainStore(chainBs bstore.Blockstore, stateBs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore { + c, _ := lru.NewARC(DefaultMsgMetaCacheSize) + tsc, _ := lru.NewARC(DefaultTipSetCacheSize) if j == nil { j = journal.NilJournal() } ctx, cancel := context.WithCancel(context.Background()) + // unwraps the fallback store in case one is configured. + // some methods _need_ to operate on a local blockstore only. + localbs, _ := bstore.UnwrapFallbackStore(chainBs) cs := &ChainStore{ - bs: bs, - localbs: localbs, - ds: ds, - bestTips: pubsub.New(64), - tipsets: make(map[abi.ChainEpoch][]cid.Cid), - mmCache: mmCache, - tsCache: tsCache, - vmcalls: vmcalls, - cancelFn: cancel, - journal: j, - } - - if v, ok := localbs.(bstore.Viewer); ok { - cs.localviewer = v + chainBlockstore: chainBs, + stateBlockstore: stateBs, + chainLocalBlockstore: localbs, + metadataDs: ds, + bestTips: pubsub.New(64), + tipsets: make(map[abi.ChainEpoch][]cid.Cid), + mmCache: c, + tsCache: tsc, + vmcalls: vmcalls, + cancelFn: cancel, + journal: j, } cs.evtTypes = [1]journal.EventType{ @@ -216,7 +216,7 @@ func (cs *ChainStore) Close() error { } func (cs *ChainStore) Load() error { - head, err := cs.ds.Get(chainHeadKey) + head, err := cs.metadataDs.Get(chainHeadKey) if err == dstore.ErrNotFound { log.Warn("no previous chain state found") return nil @@ -246,7 +246,7 @@ func (cs *ChainStore) writeHead(ts *types.TipSet) error { return xerrors.Errorf("failed to marshal tipset: %w", err) } - if err := cs.ds.Put(chainHeadKey, data); err != nil { + if err := cs.metadataDs.Put(chainHeadKey, data); err != nil { return xerrors.Errorf("failed to write chain head to datastore: %w", err) } @@ -306,13 +306,13 @@ func (cs *ChainStore) SubscribeHeadChanges(f ReorgNotifee) { func (cs *ChainStore) IsBlockValidated(ctx context.Context, blkid cid.Cid) (bool, error) { key := blockValidationCacheKeyPrefix.Instance(blkid.String()) - return cs.ds.Has(key) + return cs.metadataDs.Has(key) } func (cs *ChainStore) MarkBlockAsValidated(ctx context.Context, blkid cid.Cid) error { key := blockValidationCacheKeyPrefix.Instance(blkid.String()) - if err := cs.ds.Put(key, []byte{0}); err != nil { + if err := cs.metadataDs.Put(key, []byte{0}); err != nil { return xerrors.Errorf("cache block validation: %w", err) } @@ -322,7 +322,7 @@ func (cs *ChainStore) MarkBlockAsValidated(ctx context.Context, blkid cid.Cid) e func (cs *ChainStore) UnmarkBlockAsValidated(ctx context.Context, blkid cid.Cid) error { key := blockValidationCacheKeyPrefix.Instance(blkid.String()) - if err := cs.ds.Delete(key); err != nil { + if err := cs.metadataDs.Delete(key); err != nil { return xerrors.Errorf("removing from valid block cache: %w", err) } @@ -339,7 +339,7 @@ func (cs *ChainStore) SetGenesis(b *types.BlockHeader) error { return err } - return cs.ds.Put(dstore.NewKey("0"), b.Cid().Bytes()) + return cs.metadataDs.Put(dstore.NewKey("0"), b.Cid().Bytes()) } func (cs *ChainStore) PutTipSet(ctx context.Context, ts *types.TipSet) error { @@ -594,7 +594,7 @@ func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet) // FlushValidationCache removes all results of block validation from the // chain metadata store. Usually the first step after a new chain import. func (cs *ChainStore) FlushValidationCache() error { - return FlushValidationCache(cs.ds) + return FlushValidationCache(cs.metadataDs) } func FlushValidationCache(ds datastore.Batching) error { @@ -653,7 +653,7 @@ func (cs *ChainStore) SetHead(ts *types.TipSet) error { // Contains returns whether our BlockStore has all blocks in the supplied TipSet. func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) { for _, c := range ts.Cids() { - has, err := cs.bs.Has(c) + has, err := cs.chainBlockstore.Has(c) if err != nil { return false, err } @@ -668,16 +668,8 @@ func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) { // GetBlock fetches a BlockHeader with the supplied CID. It returns // blockstore.ErrNotFound if the block was not found in the BlockStore. func (cs *ChainStore) GetBlock(c cid.Cid) (*types.BlockHeader, error) { - if cs.localviewer == nil { - sb, err := cs.localbs.Get(c) - if err != nil { - return nil, err - } - return types.DecodeBlock(sb.RawData()) - } - var blk *types.BlockHeader - err := cs.localviewer.View(c, func(b []byte) (err error) { + err := cs.chainLocalBlockstore.View(c, func(b []byte) (err error) { blk, err = types.DecodeBlock(b) return err }) @@ -851,7 +843,7 @@ func (cs *ChainStore) PersistBlockHeaders(b ...*types.BlockHeader) error { end = len(b) } - err = multierr.Append(err, cs.bs.PutMany(sbs[start:end])) + err = multierr.Append(err, cs.chainLocalBlockstore.PutMany(sbs[start:end])) } return err @@ -875,7 +867,7 @@ func PutMessage(bs bstore.Blockstore, m storable) (cid.Cid, error) { } func (cs *ChainStore) PutMessage(m storable) (cid.Cid, error) { - return PutMessage(cs.bs, m) + return PutMessage(cs.chainBlockstore, m) } func (cs *ChainStore) expandTipset(b *types.BlockHeader) (*types.TipSet, error) { @@ -936,7 +928,7 @@ func (cs *ChainStore) AddBlock(ctx context.Context, b *types.BlockHeader) error } func (cs *ChainStore) GetGenesis() (*types.BlockHeader, error) { - data, err := cs.ds.Get(dstore.NewKey("0")) + data, err := cs.metadataDs.Get(dstore.NewKey("0")) if err != nil { return nil, err } @@ -962,17 +954,8 @@ func (cs *ChainStore) GetCMessage(c cid.Cid) (types.ChainMsg, error) { } func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) { - if cs.localviewer == nil { - sb, err := cs.localbs.Get(c) - if err != nil { - log.Errorf("get message get failed: %s: %s", c, err) - return nil, err - } - return types.DecodeMessage(sb.RawData()) - } - var msg *types.Message - err := cs.localviewer.View(c, func(b []byte) (err error) { + err := cs.chainLocalBlockstore.View(c, func(b []byte) (err error) { msg, err = types.DecodeMessage(b) return err }) @@ -980,17 +963,8 @@ func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) { } func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error) { - if cs.localviewer == nil { - sb, err := cs.localbs.Get(c) - if err != nil { - log.Errorf("get message get failed: %s: %s", c, err) - return nil, err - } - return types.DecodeSignedMessage(sb.RawData()) - } - var msg *types.SignedMessage - err := cs.localviewer.View(c, func(b []byte) (err error) { + err := cs.chainLocalBlockstore.View(c, func(b []byte) (err error) { msg, err = types.DecodeSignedMessage(b) return err }) @@ -1000,7 +974,7 @@ func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error) func (cs *ChainStore) readAMTCids(root cid.Cid) ([]cid.Cid, error) { ctx := context.TODO() // block headers use adt0, for now. - a, err := blockadt.AsArray(cs.Store(ctx), root) + a, err := blockadt.AsArray(cs.ActorStore(ctx), root) if err != nil { return nil, xerrors.Errorf("amt load: %w", err) } @@ -1124,7 +1098,7 @@ func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error) return mmcids.bls, mmcids.secpk, nil } - cst := cbor.NewCborStore(cs.localbs) + cst := cbor.NewCborStore(cs.chainLocalBlockstore) var msgmeta types.MsgMeta if err := cst.Get(context.TODO(), mmc, &msgmeta); err != nil { return nil, nil, xerrors.Errorf("failed to load msgmeta (%s): %w", mmc, err) @@ -1194,7 +1168,7 @@ func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.Message, func (cs *ChainStore) GetParentReceipt(b *types.BlockHeader, i int) (*types.MessageReceipt, error) { ctx := context.TODO() // block headers use adt0, for now. - a, err := blockadt.AsArray(cs.Store(ctx), b.ParentMessageReceipts) + a, err := blockadt.AsArray(cs.ActorStore(ctx), b.ParentMessageReceipts) if err != nil { return nil, xerrors.Errorf("amt load: %w", err) } @@ -1237,16 +1211,26 @@ func (cs *ChainStore) LoadSignedMessagesFromCids(cids []cid.Cid) ([]*types.Signe return msgs, nil } -func (cs *ChainStore) Blockstore() bstore.Blockstore { - return cs.bs +// ChainBlockstore returns the chain blockstore. Currently the chain and state +// // stores are both backed by the same physical store, albeit with different +// // caching policies, but in the future they will segregate. +func (cs *ChainStore) ChainBlockstore() bstore.Blockstore { + return cs.chainBlockstore +} + +// StateBlockstore returns the state blockstore. Currently the chain and state +// stores are both backed by the same physical store, albeit with different +// caching policies, but in the future they will segregate. +func (cs *ChainStore) StateBlockstore() bstore.Blockstore { + return cs.stateBlockstore } func ActorStore(ctx context.Context, bs bstore.Blockstore) adt.Store { return adt.WrapStore(ctx, cbor.NewCborStore(bs)) } -func (cs *ChainStore) Store(ctx context.Context) adt.Store { - return ActorStore(ctx, cs.bs) +func (cs *ChainStore) ActorStore(ctx context.Context) adt.Store { + return ActorStore(ctx, cs.stateBlockstore) } func (cs *ChainStore) VMSys() vm.SyscallBuilder { @@ -1444,8 +1428,8 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo return xerrors.Errorf("failed to write car header: %s", err) } - return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, func(c cid.Cid) error { - blk, err := cs.bs.Get(c) + return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, true, func(c cid.Cid) error { + blk, err := cs.chainBlockstore.Get(c) if err != nil { return xerrors.Errorf("writing object to car, bs.Get: %w", err) } @@ -1458,7 +1442,7 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo }) } -func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, cb func(cid.Cid) error) error { +func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs, skipMsgReceipts bool, cb func(cid.Cid) error) error { if ts == nil { ts = cs.GetHeaviestTipSet() } @@ -1478,7 +1462,7 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe return err } - data, err := cs.bs.Get(blk) + data, err := cs.chainBlockstore.Get(blk) if err != nil { return xerrors.Errorf("getting block: %w", err) } @@ -1498,7 +1482,7 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe var cids []cid.Cid if !skipOldMsgs || b.Height > ts.Height()-inclRecentRoots { if walked.Visit(b.Messages) { - mcids, err := recurseLinks(cs.bs, walked, b.Messages, []cid.Cid{b.Messages}) + mcids, err := recurseLinks(cs.chainBlockstore, walked, b.Messages, []cid.Cid{b.Messages}) if err != nil { return xerrors.Errorf("recursing messages failed: %w", err) } @@ -1519,13 +1503,17 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe if b.Height == 0 || b.Height > ts.Height()-inclRecentRoots { if walked.Visit(b.ParentStateRoot) { - cids, err := recurseLinks(cs.bs, walked, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot}) + cids, err := recurseLinks(cs.chainBlockstore, walked, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot}) if err != nil { return xerrors.Errorf("recursing genesis state failed: %w", err) } out = append(out, cids...) } + + if !skipMsgReceipts && walked.Visit(b.ParentMessageReceipts) { + out = append(out, b.ParentMessageReceipts) + } } for _, c := range out { @@ -1561,7 +1549,7 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe } func (cs *ChainStore) Import(r io.Reader) (*types.TipSet, error) { - header, err := car.LoadCar(cs.Blockstore(), r) + header, err := car.LoadCar(cs.StateBlockstore(), r) if err != nil { return nil, xerrors.Errorf("loadcar failed: %w", err) } diff --git a/chain/store/store_test.go b/chain/store/store_test.go index 9afe6ba7915..51e2e08d0c9 100644 --- a/chain/store/store_test.go +++ b/chain/store/store_test.go @@ -52,7 +52,7 @@ func BenchmarkGetRandomness(b *testing.B) { b.Fatal(err) } - bs, err := lr.Blockstore(context.TODO(), repo.BlockstoreChain) + bs, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore) if err != nil { b.Fatal(err) } diff --git a/chain/store/weight.go b/chain/store/weight.go index 9100df31547..42546d5e3d9 100644 --- a/chain/store/weight.go +++ b/chain/store/weight.go @@ -28,7 +28,7 @@ func (cs *ChainStore) Weight(ctx context.Context, ts *types.TipSet) (types.BigIn tpow := big2.Zero() { - cst := cbor.NewCborStore(cs.Blockstore()) + cst := cbor.NewCborStore(cs.StateBlockstore()) state, err := state.LoadStateTree(cst, ts.ParentState()) if err != nil { return types.NewInt(0), xerrors.Errorf("load state tree: %w", err) @@ -39,7 +39,7 @@ func (cs *ChainStore) Weight(ctx context.Context, ts *types.TipSet) (types.BigIn return types.NewInt(0), xerrors.Errorf("get power actor: %w", err) } - powState, err := power.Load(cs.Store(ctx), act) + powState, err := power.Load(cs.ActorStore(ctx), act) if err != nil { return types.NewInt(0), xerrors.Errorf("failed to load power actor state: %w", err) } diff --git a/chain/sync.go b/chain/sync.go index 1743a303322..88237eb5ab1 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -354,7 +354,7 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error { } // Finally, flush. - return vm.Copy(context.TODO(), blockstore, syncer.store.Blockstore(), smroot) + return vm.Copy(context.TODO(), blockstore, syncer.store.ChainBlockstore(), smroot) } func (syncer *Syncer) LocalPeer() peer.ID { @@ -640,7 +640,7 @@ func (syncer *Syncer) minerIsValid(ctx context.Context, maddr address.Address, b return xerrors.Errorf("failed to load power actor: %w", err) } - powState, err := power.Load(syncer.store.Store(ctx), act) + powState, err := power.Load(syncer.store.ActorStore(ctx), act) if err != nil { return xerrors.Errorf("failed to load power actor state: %w", err) } @@ -1055,7 +1055,7 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock return err } - st, err := state.LoadStateTree(syncer.store.Store(ctx), stateroot) + st, err := state.LoadStateTree(syncer.store.ActorStore(ctx), stateroot) if err != nil { return xerrors.Errorf("failed to load base state tree: %w", err) } @@ -1172,7 +1172,7 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock } // Finally, flush. - return vm.Copy(ctx, tmpbs, syncer.store.Blockstore(), mrcid) + return vm.Copy(ctx, tmpbs, syncer.store.ChainBlockstore(), mrcid) } func (syncer *Syncer) verifyBlsAggregate(ctx context.Context, sig *crypto.Signature, msgs []cid.Cid, pubks [][]byte) error { @@ -1574,7 +1574,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS return err } - if err := copyBlockstore(ctx, bs, syncer.store.Blockstore()); err != nil { + if err := copyBlockstore(ctx, bs, syncer.store.ChainBlockstore()); err != nil { return xerrors.Errorf("message processing failed: %w", err) } } diff --git a/cmd/lotus-bench/import.go b/cmd/lotus-bench/import.go index 1ded9b30a74..4b464bebeb1 100644 --- a/cmd/lotus-bench/import.go +++ b/cmd/lotus-bench/import.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/bloom" "github.com/ipfs/go-cid" - metricsi "github.com/ipfs/go-metrics-interface" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -204,7 +203,7 @@ var importBenchCmd = &cli.Command{ case cctx.Bool("use-native-badger"): log.Info("using native badger") var opts badgerbs.Options - if opts, err = repo.BadgerBlockstoreOptions(repo.BlockstoreChain, tdir, false); err != nil { + if opts, err = repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, tdir, false); err != nil { return err } opts.SyncWrites = false @@ -236,14 +235,6 @@ var importBenchCmd = &cli.Command{ defer c.Close() //nolint:errcheck } - ctx := metricsi.CtxScope(context.Background(), "lotus") - cacheOpts := blockstore.DefaultCacheOpts() - cacheOpts.HasBloomFilterSize = 0 - bs, err = blockstore.CachedBlockstore(ctx, bs, cacheOpts) - if err != nil { - return err - } - var verifier ffiwrapper.Verifier = ffiwrapper.ProofVerifier if cctx.IsSet("syscall-cache") { scds, err := badger.NewDatastore(cctx.String("syscall-cache"), &badger.DefaultOptions) @@ -267,6 +258,15 @@ var importBenchCmd = &cli.Command{ stm := stmgr.NewStateManager(cs) + var carFile *os.File + // open the CAR file if one is provided. + if path := cctx.String("car"); path != "" { + var err error + if carFile, err = os.Open(path); err != nil { + return xerrors.Errorf("failed to open provided CAR file: %w", err) + } + } + startTime := time.Now() // register a gauge that reports how long since the measurable @@ -308,18 +308,7 @@ var importBenchCmd = &cli.Command{ writeProfile("allocs") }() - var carFile *os.File - - // open the CAR file if one is provided. - if path := cctx.String("car"); path != "" { - var err error - if carFile, err = os.Open(path); err != nil { - return xerrors.Errorf("failed to open provided CAR file: %w", err) - } - } - var head *types.TipSet - // --- IMPORT --- if !cctx.Bool("no-import") { if cctx.Bool("global-profile") { diff --git a/cmd/lotus-shed/balances.go b/cmd/lotus-shed/balances.go index 140effb3d83..8c5bfefb8d6 100644 --- a/cmd/lotus-shed/balances.go +++ b/cmd/lotus-shed/balances.go @@ -175,7 +175,7 @@ var chainBalanceStateCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.BlockstoreChain) + bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } @@ -396,7 +396,7 @@ var chainPledgeCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.BlockstoreChain) + bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return xerrors.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/datastore.go b/cmd/lotus-shed/datastore.go index 1189b5a3a35..1086e8260bc 100644 --- a/cmd/lotus-shed/datastore.go +++ b/cmd/lotus-shed/datastore.go @@ -319,7 +319,7 @@ var datastoreRewriteCmd = &cli.Command{ ) // open the destination (to) store. - opts, err := repo.BadgerBlockstoreOptions(repo.BlockstoreChain, toPath, false) + opts, err := repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, toPath, false) if err != nil { return xerrors.Errorf("failed to get badger options: %w", err) } @@ -329,7 +329,7 @@ var datastoreRewriteCmd = &cli.Command{ } // open the source (from) store. - opts, err = repo.BadgerBlockstoreOptions(repo.BlockstoreChain, fromPath, true) + opts, err = repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, fromPath, true) if err != nil { return xerrors.Errorf("failed to get badger options: %w", err) } diff --git a/cmd/lotus-shed/export.go b/cmd/lotus-shed/export.go index 4820381b5c6..e711ba2bb05 100644 --- a/cmd/lotus-shed/export.go +++ b/cmd/lotus-shed/export.go @@ -72,7 +72,7 @@ var exportChainCmd = &cli.Command{ defer fi.Close() //nolint:errcheck - bs, err := lr.Blockstore(ctx, repo.BlockstoreChain) + bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/import-car.go b/cmd/lotus-shed/import-car.go index 7f3fa7c8953..4e465029f2d 100644 --- a/cmd/lotus-shed/import-car.go +++ b/cmd/lotus-shed/import-car.go @@ -47,7 +47,7 @@ var importCarCmd = &cli.Command{ return xerrors.Errorf("opening the car file: %w", err) } - bs, err := lr.Blockstore(ctx, repo.BlockstoreChain) + bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return err } @@ -118,7 +118,7 @@ var importObjectCmd = &cli.Command{ } defer lr.Close() //nolint:errcheck - bs, err := lr.Blockstore(ctx, repo.BlockstoreChain) + bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/pruning.go b/cmd/lotus-shed/pruning.go index aea548bbe3d..1afe76c4d38 100644 --- a/cmd/lotus-shed/pruning.go +++ b/cmd/lotus-shed/pruning.go @@ -131,7 +131,7 @@ var stateTreePruneCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.BlockstoreChain) + bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } @@ -191,7 +191,7 @@ var stateTreePruneCmd = &cli.Command{ rrLb := abi.ChainEpoch(cctx.Int64("keep-from-lookback")) - if err := cs.WalkSnapshot(ctx, ts, rrLb, true, func(c cid.Cid) error { + if err := cs.WalkSnapshot(ctx, ts, rrLb, true, true, func(c cid.Cid) error { if goodSet.Len()%20 == 0 { fmt.Printf("\renumerating keep set: %d ", goodSet.Len()) } diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 4226c33f775..5546ac37699 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -432,7 +432,7 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool) } defer lr.Close() //nolint:errcheck - bs, err := lr.Blockstore(ctx, repo.BlockstoreChain) + bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return xerrors.Errorf("failed to open blockstore: %w", err) } diff --git a/documentation/en/architecture/architecture.md b/documentation/en/architecture/architecture.md index 5a9eee3c275..64914d53996 100644 --- a/documentation/en/architecture/architecture.md +++ b/documentation/en/architecture/architecture.md @@ -311,7 +311,7 @@ FIXME: Maybe mention the `Batching` interface as the developer will stumble upon FIXME: IPFS blocks vs Filecoin blocks ideally happens before this / here -The [`Blockstore` interface](`github.com/filecoin-project/lotus/lib/blockstore.go`) structures the key-value pair +The [`Blockstore` interface](`github.com/filecoin-project/lotus/blockstore/blockstore.go`) structures the key-value pair into the CID format for the key and the [`Block` interface](`github.com/ipfs/go-block-format/blocks.go`) for the value. The `Block` value is just a raw string of bytes addressed by its hash, which is included in the CID key. diff --git a/go.mod b/go.mod index 93af360f8ef..5f11ea0a204 100644 --- a/go.mod +++ b/go.mod @@ -143,7 +143,7 @@ require ( go.uber.org/zap v1.16.0 golang.org/x/net v0.0.0-20201021035429-f5854403a974 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a - golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f + golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect diff --git a/go.sum b/go.sum index 4d0ecd0e479..c1e5494ee43 100644 --- a/go.sum +++ b/go.sum @@ -1754,8 +1754,9 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200812155832-6a926be9bd1d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200926100807-9d91bd62050c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/metrics/metrics.go b/metrics/metrics.go index 996fa95b90c..cb909d63919 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -9,10 +9,12 @@ import ( "go.opencensus.io/tag" rpcmetrics "github.com/filecoin-project/go-jsonrpc/metrics" + + "github.com/filecoin-project/lotus/blockstore" ) // Distribution -var defaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000) +var defaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 3000, 4000, 5000, 7500, 10000, 20000, 50000, 100000) // Global Tags var ( @@ -179,33 +181,37 @@ var ( ) // DefaultViews is an array of OpenCensus views for metric gathering purposes -var DefaultViews = append([]*view.View{ - InfoView, - ChainNodeHeightView, - ChainNodeHeightExpectedView, - ChainNodeWorkerHeightView, - BlockReceivedView, - BlockValidationFailureView, - BlockValidationSuccessView, - BlockValidationDurationView, - BlockDelayView, - MessagePublishedView, - MessageReceivedView, - MessageValidationFailureView, - MessageValidationSuccessView, - PeerCountView, - PubsubPublishMessageView, - PubsubDeliverMessageView, - PubsubRejectMessageView, - PubsubDuplicateMessageView, - PubsubRecvRPCView, - PubsubSendRPCView, - PubsubDropRPCView, - APIRequestDurationView, - VMFlushCopyCountView, - VMFlushCopyDurationView, -}, - rpcmetrics.DefaultViews...) +var DefaultViews = func() []*view.View { + views := []*view.View{ + InfoView, + ChainNodeHeightView, + ChainNodeHeightExpectedView, + ChainNodeWorkerHeightView, + BlockReceivedView, + BlockValidationFailureView, + BlockValidationSuccessView, + BlockValidationDurationView, + BlockDelayView, + MessagePublishedView, + MessageReceivedView, + MessageValidationFailureView, + MessageValidationSuccessView, + PeerCountView, + PubsubPublishMessageView, + PubsubDeliverMessageView, + PubsubRejectMessageView, + PubsubDuplicateMessageView, + PubsubRecvRPCView, + PubsubSendRPCView, + PubsubDropRPCView, + APIRequestDurationView, + VMFlushCopyCountView, + VMFlushCopyDurationView, + } + views = append(views, blockstore.DefaultViews...) + views = append(views, rpcmetrics.DefaultViews...) + return views +}() // SinceInMilliseconds returns the duration of time since the provide time as a float64. func SinceInMilliseconds(startTime time.Time) float64 { diff --git a/node/builder.go b/node/builder.go index 0766d934afe..b9f2e85bf57 100644 --- a/node/builder.go +++ b/node/builder.go @@ -145,7 +145,7 @@ const ( HeadMetricsKey SettlePaymentChannelsKey RunPeerTaggerKey - SetupFallbackBlockstoreKey + SetupFallbackBlockstoresKey SetApiEndpointKey @@ -590,12 +590,15 @@ func Repo(r repo.Repo) Option { Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing Override(new(dtypes.MetadataDS), modules.Datastore), - Override(new(dtypes.ChainRawBlockstore), modules.ChainRawBlockstore), - Override(new(dtypes.ChainBlockstore), From(new(dtypes.ChainRawBlockstore))), + Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore), + Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore), + Override(new(dtypes.StateBlockstore), modules.StateBlockstore), + Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))), If(os.Getenv("LOTUS_ENABLE_CHAINSTORE_FALLBACK") == "1", Override(new(dtypes.ChainBlockstore), modules.FallbackChainBlockstore), - Override(SetupFallbackBlockstoreKey, modules.SetupFallbackBlockstore), + Override(new(dtypes.StateBlockstore), modules.FallbackStateBlockstore), + Override(SetupFallbackBlockstoresKey, modules.InitFallbackBlockstores), ), Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr), diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go index 46467a358f5..25d366a87b0 100644 --- a/node/impl/full/chain.go +++ b/node/impl/full/chain.go @@ -35,6 +35,7 @@ import ( "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" + "github.com/filecoin-project/lotus/node/modules/dtypes" ) var log = logging.Logger("fullnode") @@ -57,6 +58,11 @@ type ChainModule struct { fx.In Chain *store.ChainStore + + // ExposedBlockstore is the global monolith blockstore that is safe to + // expose externally. In the future, this will be segregated into two + // blockstores. + ExposedBlockstore dtypes.ExposedBlockstore } var _ ChainModuleAPI = (*ChainModule)(nil) @@ -68,6 +74,11 @@ type ChainAPI struct { ChainModuleAPI Chain *store.ChainStore + + // ExposedBlockstore is the global monolith blockstore that is safe to + // expose externally. In the future, this will be segregated into two + // blockstores. + ExposedBlockstore dtypes.ExposedBlockstore } func (m *ChainModule) ChainNotify(ctx context.Context) (<-chan []*api.HeadChange, error) { @@ -212,7 +223,7 @@ func (m *ChainModule) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpo } func (m *ChainModule) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, error) { - blk, err := m.Chain.Blockstore().Get(obj) + blk, err := m.ExposedBlockstore.Get(obj) if err != nil { return nil, xerrors.Errorf("blockstore get: %w", err) } @@ -221,15 +232,15 @@ func (m *ChainModule) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, er } func (a *ChainAPI) ChainDeleteObj(ctx context.Context, obj cid.Cid) error { - return a.Chain.Blockstore().DeleteBlock(obj) + return a.ExposedBlockstore.DeleteBlock(obj) } func (m *ChainModule) ChainHasObj(ctx context.Context, obj cid.Cid) (bool, error) { - return m.Chain.Blockstore().Has(obj) + return m.ExposedBlockstore.Has(obj) } func (a *ChainAPI) ChainStatObj(ctx context.Context, obj cid.Cid, base cid.Cid) (api.ObjStat, error) { - bs := a.Chain.Blockstore() + bs := a.ExposedBlockstore bsvc := blockservice.New(bs, offline.Exchange(bs)) dag := merkledag.NewDAGService(bsvc) @@ -514,7 +525,7 @@ func (a *ChainAPI) ChainGetNode(ctx context.Context, p string) (*api.IpldObject, return nil, xerrors.Errorf("parsing path: %w", err) } - bs := a.Chain.Blockstore() + bs := a.ExposedBlockstore bsvc := blockservice.New(bs, offline.Exchange(bs)) dag := merkledag.NewDAGService(bsvc) diff --git a/node/impl/full/state.go b/node/impl/full/state.go index 0f5d16ab2a6..f09f484f719 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -97,7 +97,7 @@ func (a *StateAPI) StateMinerSectors(ctx context.Context, addr address.Address, return nil, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -111,7 +111,7 @@ func (a *StateAPI) StateMinerActiveSectors(ctx context.Context, maddr address.Ad return nil, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -135,7 +135,7 @@ func (m *StateModule) StateMinerInfo(ctx context.Context, actor address.Address, return miner.MinerInfo{}, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(m.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(m.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return miner.MinerInfo{}, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -153,7 +153,7 @@ func (a *StateAPI) StateMinerDeadlines(ctx context.Context, m address.Address, t return nil, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -192,7 +192,7 @@ func (a *StateAPI) StateMinerPartitions(ctx context.Context, m address.Address, return nil, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -253,7 +253,7 @@ func (m *StateModule) StateMinerProvingDeadline(ctx context.Context, addr addres return nil, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(m.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(m.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -272,7 +272,7 @@ func (a *StateAPI) StateMinerFaults(ctx context.Context, addr address.Address, t return bitfield.BitField{}, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return bitfield.BitField{}, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -329,7 +329,7 @@ func (a *StateAPI) StateMinerRecoveries(ctx context.Context, addr address.Addres return bitfield.BitField{}, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return bitfield.BitField{}, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -461,7 +461,7 @@ func (a *StateAPI) StateReadState(ctx context.Context, actor address.Address, ts return nil, xerrors.Errorf("getting actor: %w", err) } - blk, err := a.Chain.Blockstore().Get(act.Head) + blk, err := a.Chain.StateBlockstore().Get(act.Head) if err != nil { return nil, xerrors.Errorf("getting actor head: %w", err) } @@ -707,7 +707,7 @@ func (m *StateModule) StateMarketStorageDeal(ctx context.Context, dealId abi.Dea } func (a *StateAPI) StateChangedActors(ctx context.Context, old cid.Cid, new cid.Cid) (map[string]types.Actor, error) { - store := a.Chain.Store(ctx) + store := a.Chain.ActorStore(ctx) oldTree, err := state.LoadStateTree(store, old) if err != nil { @@ -727,7 +727,7 @@ func (a *StateAPI) StateMinerSectorCount(ctx context.Context, addr address.Addre if err != nil { return api.MinerSectors{}, err } - mas, err := miner.Load(a.Chain.Store(ctx), act) + mas, err := miner.Load(a.Chain.ActorStore(ctx), act) if err != nil { return api.MinerSectors{}, err } @@ -792,7 +792,7 @@ func (a *StateAPI) StateSectorExpiration(ctx context.Context, maddr address.Addr if err != nil { return nil, err } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return nil, err } @@ -804,7 +804,7 @@ func (a *StateAPI) StateSectorPartition(ctx context.Context, maddr address.Addre if err != nil { return nil, err } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return nil, err } @@ -890,7 +890,7 @@ func (m *StateModule) MsigGetAvailableBalance(ctx context.Context, addr address. if err != nil { return types.EmptyInt, xerrors.Errorf("failed to load multisig actor: %w", err) } - msas, err := multisig.Load(m.Chain.Store(ctx), act) + msas, err := multisig.Load(m.Chain.ActorStore(ctx), act) if err != nil { return types.EmptyInt, xerrors.Errorf("failed to load multisig actor state: %w", err) } @@ -912,7 +912,7 @@ func (a *StateAPI) MsigGetVestingSchedule(ctx context.Context, addr address.Addr return api.EmptyVesting, xerrors.Errorf("failed to load multisig actor: %w", err) } - msas, err := multisig.Load(a.Chain.Store(ctx), act) + msas, err := multisig.Load(a.Chain.ActorStore(ctx), act) if err != nil { return api.EmptyVesting, xerrors.Errorf("failed to load multisig actor state: %w", err) } @@ -961,7 +961,7 @@ func (m *StateModule) MsigGetVested(ctx context.Context, addr address.Address, s return types.EmptyInt, xerrors.Errorf("failed to load multisig actor at end epoch: %w", err) } - msas, err := multisig.Load(m.Chain.Store(ctx), act) + msas, err := multisig.Load(m.Chain.ActorStore(ctx), act) if err != nil { return types.EmptyInt, xerrors.Errorf("failed to load multisig actor state: %w", err) } @@ -989,7 +989,7 @@ func (m *StateModule) MsigGetPending(ctx context.Context, addr address.Address, if err != nil { return nil, xerrors.Errorf("failed to load multisig actor: %w", err) } - msas, err := multisig.Load(m.Chain.Store(ctx), act) + msas, err := multisig.Load(m.Chain.ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load multisig actor state: %w", err) } @@ -1032,7 +1032,7 @@ func (a *StateAPI) StateMinerPreCommitDepositForPower(ctx context.Context, maddr return types.EmptyInt, xerrors.Errorf("failed to get resolve size: %w", err) } - store := a.Chain.Store(ctx) + store := a.Chain.ActorStore(ctx) var sectorWeight abi.StoragePower if act, err := state.GetActor(market.Address); err != nil { @@ -1093,7 +1093,7 @@ func (a *StateAPI) StateMinerInitialPledgeCollateral(ctx context.Context, maddr return types.EmptyInt, xerrors.Errorf("failed to get resolve size: %w", err) } - store := a.Chain.Store(ctx) + store := a.Chain.ActorStore(ctx) var sectorWeight abi.StoragePower if act, err := state.GetActor(market.Address); err != nil { @@ -1164,7 +1164,7 @@ func (a *StateAPI) StateMinerAvailableBalance(ctx context.Context, maddr address return types.EmptyInt, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return types.EmptyInt, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -1193,7 +1193,7 @@ func (a *StateAPI) StateMinerSectorAllocated(ctx context.Context, maddr address. return false, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return false, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -1216,7 +1216,7 @@ func (a *StateAPI) StateVerifierStatus(ctx context.Context, addr address.Address return nil, err } - vrs, err := verifreg.Load(a.StateManager.ChainStore().Store(ctx), act) + vrs, err := verifreg.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load verified registry state: %w", err) } @@ -1247,7 +1247,7 @@ func (m *StateModule) StateVerifiedClientStatus(ctx context.Context, addr addres return nil, err } - vrs, err := verifreg.Load(m.StateManager.ChainStore().Store(ctx), act) + vrs, err := verifreg.Load(m.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load verified registry state: %w", err) } @@ -1269,7 +1269,7 @@ func (a *StateAPI) StateVerifiedRegistryRootKey(ctx context.Context, tsk types.T return address.Undef, err } - vst, err := verifreg.Load(a.StateManager.ChainStore().Store(ctx), vact) + vst, err := verifreg.Load(a.StateManager.ChainStore().ActorStore(ctx), vact) if err != nil { return address.Undef, err } @@ -1298,12 +1298,12 @@ func (m *StateModule) StateDealProviderCollateralBounds(ctx context.Context, siz return api.DealCollateralBounds{}, xerrors.Errorf("failed to load reward actor: %w", err) } - pst, err := power.Load(m.StateManager.ChainStore().Store(ctx), pact) + pst, err := power.Load(m.StateManager.ChainStore().ActorStore(ctx), pact) if err != nil { return api.DealCollateralBounds{}, xerrors.Errorf("failed to load power actor state: %w", err) } - rst, err := reward.Load(m.StateManager.ChainStore().Store(ctx), ract) + rst, err := reward.Load(m.StateManager.ChainStore().ActorStore(ctx), ract) if err != nil { return api.DealCollateralBounds{}, xerrors.Errorf("failed to load reward actor state: %w", err) } diff --git a/node/modules/blockstore.go b/node/modules/blockstore.go new file mode 100644 index 00000000000..5b1d2ee6376 --- /dev/null +++ b/node/modules/blockstore.go @@ -0,0 +1,65 @@ +package modules + +import ( + "context" + "io" + + bstore "github.com/ipfs/go-ipfs-blockstore" + "go.uber.org/fx" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/modules/helpers" + "github.com/filecoin-project/lotus/node/repo" +) + +// UniversalBlockstore returns a single universal blockstore that stores both +// chain data and state data. +func UniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.UniversalBlockstore, error) { + bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.UniversalBlockstore) + if err != nil { + return nil, err + } + if c, ok := bs.(io.Closer); ok { + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + return c.Close() + }, + }) + } + return bs, err +} + +// StateBlockstore is a hook to overlay caches for state objects, or in the +// future, to segregate the universal blockstore into different physical state +// and chain stores. +func StateBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.StateBlockstore, error) { + return bs, nil +} + +// ChainBlockstore is a hook to overlay caches for state objects, or in the +// future, to segregate the universal blockstore into different physical state +// and chain stores. +func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.ChainBlockstore, error) { + return bs, nil +} + +func FallbackChainBlockstore(cbs dtypes.ChainBlockstore) dtypes.ChainBlockstore { + return &blockstore.FallbackStore{Blockstore: cbs} +} + +func FallbackStateBlockstore(sbs dtypes.StateBlockstore) dtypes.StateBlockstore { + return &blockstore.FallbackStore{Blockstore: sbs} +} + +func InitFallbackBlockstores(cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, rem dtypes.ChainBitswap) error { + for _, bs := range []bstore.Blockstore{cbs, sbs} { + if fbs, ok := bs.(*blockstore.FallbackStore); ok { + fbs.SetFallback(rem.GetBlock) + continue + } + return xerrors.Errorf("expected a FallbackStore") + } + return nil +} diff --git a/node/modules/chain.go b/node/modules/chain.go index fcb5bea2168..029064b9769 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -1,25 +1,18 @@ package modules import ( - "bytes" "context" - "os" "time" "github.com/ipfs/go-bitswap" "github.com/ipfs/go-bitswap/network" "github.com/ipfs/go-blockservice" - "github.com/ipfs/go-datastore" - "github.com/ipld/go-car" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/routing" pubsub "github.com/libp2p/go-libp2p-pubsub" "go.uber.org/fx" "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" - "github.com/filecoin-project/lotus/journal" - "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" @@ -29,14 +22,15 @@ import ( "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" - "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" + "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" + "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" - "github.com/filecoin-project/lotus/node/repo" ) -func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ChainBlockstore) dtypes.ChainBitswap { +// ChainBitswap uses a blockstore that bypasses all caches. +func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ExposedBlockstore) dtypes.ChainBitswap { // prefix protocol for chain bitswap // (so bitswap uses /chain/ipfs/bitswap/1.0.0 internally for chain sync stuff) bitswapNetwork := network.NewFromIpfsHost(host, rt, network.Prefix("/chain")) @@ -60,6 +54,10 @@ func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt r return exch } +func ChainBlockService(bs dtypes.ExposedBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService { + return blockservice.New(bs, rem) +} + func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) { mpp := messagepool.NewProvider(sm, ps) mp, err := messagepool.New(mpp, ds, nn, j) @@ -74,43 +72,8 @@ func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds return mp, nil } -func ChainRawBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainRawBlockstore, error) { - bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.BlockstoreChain) - if err != nil { - return nil, err - } - - // TODO potentially replace this cached blockstore by a CBOR cache. - cbs, err := blockstore.CachedBlockstore(helpers.LifecycleCtx(mctx, lc), bs, blockstore.DefaultCacheOpts()) - if err != nil { - return nil, err - } - - return cbs, nil -} - -func ChainBlockService(bs dtypes.ChainRawBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService { - return blockservice.New(bs, rem) -} - -func FallbackChainBlockstore(rbs dtypes.ChainRawBlockstore) dtypes.ChainBlockstore { - return &blockstore.FallbackStore{ - Blockstore: rbs, - } -} - -func SetupFallbackBlockstore(cbs dtypes.ChainBlockstore, rem dtypes.ChainBitswap) error { - fbs, ok := cbs.(*blockstore.FallbackStore) - if !ok { - return xerrors.Errorf("expected a FallbackStore") - } - - fbs.SetFallback(rem.GetBlock) - return nil -} - -func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore { - chain := store.NewChainStore(bs, lbs, ds, syscalls, j) +func ChainStore(lc fx.Lifecycle, cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore { + chain := store.NewChainStore(cbs, sbs, ds, syscalls, j) if err := chain.Load(); err != nil { log.Warnf("loading chain state from disk: %s", err) @@ -125,65 +88,6 @@ func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, lbs dtypes.ChainRawB return chain } -func ErrorGenesis() Genesis { - return func() (header *types.BlockHeader, e error) { - return nil, xerrors.New("No genesis block provided, provide the file with 'lotus daemon --genesis=[genesis file]'") - } -} - -func LoadGenesis(genBytes []byte) func(dtypes.ChainBlockstore) Genesis { - return func(bs dtypes.ChainBlockstore) Genesis { - return func() (header *types.BlockHeader, e error) { - c, err := car.LoadCar(bs, bytes.NewReader(genBytes)) - if err != nil { - return nil, xerrors.Errorf("loading genesis car file failed: %w", err) - } - if len(c.Roots) != 1 { - return nil, xerrors.New("expected genesis file to have one root") - } - root, err := bs.Get(c.Roots[0]) - if err != nil { - return nil, err - } - - h, err := types.DecodeBlock(root.RawData()) - if err != nil { - return nil, xerrors.Errorf("decoding block failed: %w", err) - } - return h, nil - } - } -} - -func DoSetGenesis(_ dtypes.AfterGenesisSet) {} - -func SetGenesis(cs *store.ChainStore, g Genesis) (dtypes.AfterGenesisSet, error) { - genFromRepo, err := cs.GetGenesis() - if err == nil { - if os.Getenv("LOTUS_SKIP_GENESIS_CHECK") != "_yes_" { - expectedGenesis, err := g() - if err != nil { - return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting expected genesis failed: %w", err) - } - - if genFromRepo.Cid() != expectedGenesis.Cid() { - return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis in the repo is not the one expected by this version of Lotus!") - } - } - return dtypes.AfterGenesisSet{}, nil // already set, noop - } - if err != datastore.ErrNotFound { - return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting genesis block failed: %w", err) - } - - genesis, err := g() - if err != nil { - return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis func failed: %w", err) - } - - return dtypes.AfterGenesisSet{}, cs.SetGenesis(genesis) -} - func NetworkName(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, us stmgr.UpgradeSchedule, _ dtypes.AfterGenesisSet) (dtypes.NetworkName, error) { if !build.Devnet { return "testnetnet", nil diff --git a/node/modules/client.go b/node/modules/client.go index ede36b4c9b9..da6a4cd8315 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -83,7 +83,7 @@ func ClientMultiDatastore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.Locke ctx := helpers.LifecycleCtx(mctx, lc) ds, err := r.Datastore(ctx, "/client") if err != nil { - return nil, xerrors.Errorf("getting datastore out of reop: %w", err) + return nil, xerrors.Errorf("getting datastore out of repo: %w", err) } mds, err := multistore.NewMultiDstore(ds) diff --git a/node/modules/dtypes/storage.go b/node/modules/dtypes/storage.go index 87366647f5d..c6963e1e255 100644 --- a/node/modules/dtypes/storage.go +++ b/node/modules/dtypes/storage.go @@ -19,19 +19,41 @@ import ( "github.com/filecoin-project/lotus/node/repo/retrievalstoremgr" ) -// MetadataDS stores metadata -// dy default it's namespaced under /metadata in main repo datastore +// MetadataDS stores metadata. By default it's namespaced under /metadata in +// main repo datastore. type MetadataDS datastore.Batching -type ChainRawBlockstore blockstore.Blockstore -type ChainBlockstore blockstore.Blockstore // optionally bitswap backed +type ( + // UniversalBlockstore is the cold blockstore. + UniversalBlockstore blockstore.Blockstore + + // ChainBlockstore is a blockstore to store chain data (tipsets, blocks, + // messages). It is physically backed by the BareMonolithBlockstore, but it + // has a cache on top that is specially tuned for chain data access + // patterns. + ChainBlockstore blockstore.Blockstore + + // StateBlockstore is a blockstore to store state data (state tree). It is + // physically backed by the BareMonolithBlockstore, but it has a cache on + // top that is specially tuned for state data access patterns. + StateBlockstore blockstore.Blockstore + + // ExposedBlockstore is a blockstore that interfaces directly with the + // network or with users, from which queries are served, and where incoming + // data is deposited. For security reasons, this store is disconnected from + // any internal caches. If blocks are added to this store in a way that + // could render caches dirty (e.g. a block is added when an existence cache + // holds a 'false' for that block), the process should signal so by calling + // blockstore.AllCaches.Dirty(cid). + ExposedBlockstore blockstore.Blockstore +) type ChainBitswap exchange.Interface type ChainBlockService bserv.BlockService type ClientMultiDstore *multistore.MultiStore type ClientImportMgr *importmgr.Mgr -type ClientBlockstore blockstore.Blockstore +type ClientBlockstore blockstore.BasicBlockstore type ClientDealStore *statestore.StateStore type ClientRequestValidator *requestvalidation.UnifiedRequestValidator type ClientDatastore datastore.Batching @@ -50,6 +72,6 @@ type ProviderRequestValidator *requestvalidation.UnifiedRequestValidator type ProviderDataTransfer datatransfer.Manager type StagingDAG format.DAGService -type StagingBlockstore blockstore.Blockstore +type StagingBlockstore blockstore.BasicBlockstore type StagingGraphsync graphsync.GraphExchange type StagingMultiDstore *multistore.MultiStore diff --git a/node/modules/genesis.go b/node/modules/genesis.go new file mode 100644 index 00000000000..43443b125a8 --- /dev/null +++ b/node/modules/genesis.go @@ -0,0 +1,73 @@ +package modules + +import ( + "bytes" + "os" + + "github.com/ipfs/go-datastore" + "github.com/ipld/go-car" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/node/modules/dtypes" +) + +func ErrorGenesis() Genesis { + return func() (header *types.BlockHeader, e error) { + return nil, xerrors.New("No genesis block provided, provide the file with 'lotus daemon --genesis=[genesis file]'") + } +} + +func LoadGenesis(genBytes []byte) func(dtypes.ChainBlockstore) Genesis { + return func(bs dtypes.ChainBlockstore) Genesis { + return func() (header *types.BlockHeader, e error) { + c, err := car.LoadCar(bs, bytes.NewReader(genBytes)) + if err != nil { + return nil, xerrors.Errorf("loading genesis car file failed: %w", err) + } + if len(c.Roots) != 1 { + return nil, xerrors.New("expected genesis file to have one root") + } + root, err := bs.Get(c.Roots[0]) + if err != nil { + return nil, err + } + + h, err := types.DecodeBlock(root.RawData()) + if err != nil { + return nil, xerrors.Errorf("decoding block failed: %w", err) + } + return h, nil + } + } +} + +func DoSetGenesis(_ dtypes.AfterGenesisSet) {} + +func SetGenesis(cs *store.ChainStore, g Genesis) (dtypes.AfterGenesisSet, error) { + genFromRepo, err := cs.GetGenesis() + if err == nil { + if os.Getenv("LOTUS_SKIP_GENESIS_CHECK") != "_yes_" { + expectedGenesis, err := g() + if err != nil { + return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting expected genesis failed: %w", err) + } + + if genFromRepo.Cid() != expectedGenesis.Cid() { + return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis in the repo is not the one expected by this version of Lotus!") + } + } + return dtypes.AfterGenesisSet{}, nil // already set, noop + } + if err != datastore.ErrNotFound { + return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting genesis block failed: %w", err) + } + + genesis, err := g() + if err != nil { + return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis func failed: %w", err) + } + + return dtypes.AfterGenesisSet{}, cs.SetGenesis(genesis) +} diff --git a/node/modules/graphsync.go b/node/modules/graphsync.go index bbb039957e1..a7f62db76ce 100644 --- a/node/modules/graphsync.go +++ b/node/modules/graphsync.go @@ -14,8 +14,8 @@ import ( ) // Graphsync creates a graphsync instance from the given loader and storer -func Graphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) { +func Graphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ExposedBlockstore, h host.Host) (dtypes.Graphsync, error) { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ExposedBlockstore, h host.Host) (dtypes.Graphsync, error) { graphsyncNetwork := gsnet.NewFromLibp2pHost(h) loader := storeutil.LoaderForBlockstore(clientBs) storer := storeutil.StorerForBlockstore(clientBs) diff --git a/node/modules/ipfsclient.go b/node/modules/ipfsclient.go index 99fcc418066..24c5c96783e 100644 --- a/node/modules/ipfsclient.go +++ b/node/modules/ipfsclient.go @@ -18,7 +18,7 @@ import ( func IpfsClientBlockstore(ipfsMaddr string, onlineMode bool) func(helpers.MetricsCtx, fx.Lifecycle, dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, localStore dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) { var err error - var ipfsbs blockstore.Blockstore + var ipfsbs blockstore.BasicBlockstore if ipfsMaddr != "" { var ma multiaddr.Multiaddr ma, err = multiaddr.NewMultiaddr(ipfsMaddr) diff --git a/node/repo/blockstore_opts.go b/node/repo/blockstore_opts.go index 5c2c4b36702..1705217d304 100644 --- a/node/repo/blockstore_opts.go +++ b/node/repo/blockstore_opts.go @@ -5,10 +5,6 @@ import badgerbs "github.com/filecoin-project/lotus/blockstore/badger" // BadgerBlockstoreOptions returns the badger options to apply for the provided // domain. func BadgerBlockstoreOptions(domain BlockstoreDomain, path string, readonly bool) (badgerbs.Options, error) { - if domain != BlockstoreChain { - return badgerbs.Options{}, ErrInvalidBlockstoreDomain - } - opts := badgerbs.DefaultOptions(path) // Due to legacy usage of blockstore.Blockstore, over a datastore, all diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index 1aeaf9aa0ed..d96a5e64513 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -13,7 +13,7 @@ import ( "sync" "github.com/BurntSushi/toml" - "github.com/filecoin-project/lotus/blockstore" + "github.com/ipfs/go-datastore" fslock "github.com/ipfs/go-fs-lock" logging "github.com/ipfs/go-log/v2" @@ -22,7 +22,7 @@ import ( "github.com/multiformats/go-multiaddr" "golang.org/x/xerrors" - lblockstore "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/blockstore" badgerbs "github.com/filecoin-project/lotus/blockstore/badger" "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/stores" @@ -264,11 +264,18 @@ type fsLockedRepo struct { bs blockstore.Blockstore bsErr error bsOnce sync.Once + ssPath string + ssErr error + ssOnce sync.Once storageLk sync.Mutex configLk sync.Mutex } +func (fsr *fsLockedRepo) Readonly() bool { + return fsr.readonly +} + func (fsr *fsLockedRepo) Path() string { return fsr.path } @@ -301,7 +308,7 @@ func (fsr *fsLockedRepo) Close() error { // Blockstore returns a blockstore for the provided data domain. func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) { - if domain != BlockstoreChain { + if domain != UniversalBlockstore { return nil, ErrInvalidBlockstoreDomain } @@ -325,12 +332,27 @@ func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain fsr.bsErr = err return } - fsr.bs = lblockstore.WrapIDStore(bs) + fsr.bs = blockstore.WrapIDStore(bs) }) return fsr.bs, fsr.bsErr } +func (fsr *fsLockedRepo) SplitstorePath() (string, error) { + fsr.ssOnce.Do(func() { + path := fsr.join(filepath.Join(fsDatastore, "splitstore")) + + if err := os.MkdirAll(path, 0755); err != nil { + fsr.ssErr = err + return + } + + fsr.ssPath = path + }) + + return fsr.ssPath, fsr.ssErr +} + // join joins path elements with fsr.path func (fsr *fsLockedRepo) join(paths ...string) string { return filepath.Join(append([]string{fsr.path}, paths...)...) diff --git a/node/repo/importmgr/mgr.go b/node/repo/importmgr/mgr.go index 0108c8224f8..936d9b60662 100644 --- a/node/repo/importmgr/mgr.go +++ b/node/repo/importmgr/mgr.go @@ -16,7 +16,7 @@ type Mgr struct { mds *multistore.MultiStore ds datastore.Batching - Blockstore blockstore.Blockstore + Blockstore blockstore.BasicBlockstore } type Label string diff --git a/node/repo/interface.go b/node/repo/interface.go index 1dabc0bdae6..33979c8decc 100644 --- a/node/repo/interface.go +++ b/node/repo/interface.go @@ -4,10 +4,10 @@ import ( "context" "errors" - "github.com/filecoin-project/lotus/blockstore" "github.com/ipfs/go-datastore" "github.com/multiformats/go-multiaddr" + "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/stores" @@ -18,11 +18,11 @@ import ( type BlockstoreDomain string const ( - // BlockstoreChain represents the blockstore domain for chain data. + // UniversalBlockstore represents the blockstore domain for all data. // Right now, this includes chain objects (tipsets, blocks, messages), as // well as state. In the future, they may get segregated into different // domains. - BlockstoreChain = BlockstoreDomain("chain") + UniversalBlockstore = BlockstoreDomain("universal") ) var ( @@ -63,6 +63,9 @@ type LockedRepo interface { // the lifecycle. Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) + // SplitstorePath returns the path for the SplitStore + SplitstorePath() (string, error) + // Returns config in this repo Config() (interface{}, error) SetConfig(func(interface{})) error @@ -84,4 +87,7 @@ type LockedRepo interface { // Path returns absolute path of the repo Path() string + + // Readonly returns true if the repo is readonly + Readonly() bool } diff --git a/node/repo/memrepo.go b/node/repo/memrepo.go index bcbc239c0f6..00ea32b88b5 100644 --- a/node/repo/memrepo.go +++ b/node/repo/memrepo.go @@ -201,6 +201,10 @@ func (mem *MemRepo) Lock(t RepoType) (LockedRepo, error) { }, nil } +func (lmem *lockedMemRepo) Readonly() bool { + return false +} + func (lmem *lockedMemRepo) checkToken() error { lmem.RLock() defer lmem.RUnlock() @@ -246,12 +250,16 @@ func (lmem *lockedMemRepo) Datastore(_ context.Context, ns string) (datastore.Ba } func (lmem *lockedMemRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) { - if domain != BlockstoreChain { + if domain != UniversalBlockstore { return nil, ErrInvalidBlockstoreDomain } return lmem.mem.blockstore, nil } +func (lmem *lockedMemRepo) SplitstorePath() (string, error) { + return ioutil.TempDir("", "splitstore.*") +} + func (lmem *lockedMemRepo) ListDatastores(ns string) ([]int64, error) { return nil, nil } diff --git a/node/repo/retrievalstoremgr/retrievalstoremgr.go b/node/repo/retrievalstoremgr/retrievalstoremgr.go index 0f6c98e6b74..ba86ccee540 100644 --- a/node/repo/retrievalstoremgr/retrievalstoremgr.go +++ b/node/repo/retrievalstoremgr/retrievalstoremgr.go @@ -73,13 +73,13 @@ func (mrs *multiStoreRetrievalStore) DAGService() ipldformat.DAGService { // BlockstoreRetrievalStoreManager manages a single blockstore as if it were multiple stores type BlockstoreRetrievalStoreManager struct { - bs blockstore.Blockstore + bs blockstore.BasicBlockstore } var _ RetrievalStoreManager = &BlockstoreRetrievalStoreManager{} // NewBlockstoreRetrievalStoreManager returns a new blockstore based RetrievalStoreManager -func NewBlockstoreRetrievalStoreManager(bs blockstore.Blockstore) RetrievalStoreManager { +func NewBlockstoreRetrievalStoreManager(bs blockstore.BasicBlockstore) RetrievalStoreManager { return &BlockstoreRetrievalStoreManager{ bs: bs, } From b34b4e037437e8b36c95e465673b0efb5275fb2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Sun, 28 Feb 2021 23:02:36 +0000 Subject: [PATCH 2/6] fix test compilation error. --- blockstore/badger/blockstore_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/blockstore/badger/blockstore_test.go b/blockstore/badger/blockstore_test.go index ff6e31752b0..3221458d28f 100644 --- a/blockstore/badger/blockstore_test.go +++ b/blockstore/badger/blockstore_test.go @@ -61,8 +61,8 @@ func TestStorageKey(t *testing.T) { require.Equal(t, k3, k2) } -func newBlockstore(optsSupplier func(path string) Options) func(tb testing.TB) (bs blockstore.Blockstore, path string) { - return func(tb testing.TB) (bs blockstore.Blockstore, path string) { +func newBlockstore(optsSupplier func(path string) Options) func(tb testing.TB) (bs blockstore.BasicBlockstore, path string) { + return func(tb testing.TB) (bs blockstore.BasicBlockstore, path string) { tb.Helper() path, err := ioutil.TempDir("", "") @@ -83,8 +83,8 @@ func newBlockstore(optsSupplier func(path string) Options) func(tb testing.TB) ( } } -func openBlockstore(optsSupplier func(path string) Options) func(tb testing.TB, path string) (bs blockstore.Blockstore, err error) { - return func(tb testing.TB, path string) (bs blockstore.Blockstore, err error) { +func openBlockstore(optsSupplier func(path string) Options) func(tb testing.TB, path string) (bs blockstore.BasicBlockstore, err error) { + return func(tb testing.TB, path string) (bs blockstore.BasicBlockstore, err error) { tb.Helper() return Open(optsSupplier(path)) } From b1c348b4a742cc9a87214871ae45864614abacc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 2 Mar 2021 16:31:01 +0000 Subject: [PATCH 3/6] address review comments. --- chain/stmgr/stmgr.go | 1 - chain/store/store.go | 3 +-- chain/sync.go | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index d6c98428052..ffbe08474ff 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -430,7 +430,6 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp return cid.Cid{}, cid.Cid{}, err } - // XXX: Is the height correct? Or should it be epoch-1? rectarr := blockadt.MakeEmptyArray(sm.cs.ActorStore(ctx)) for i, receipt := range receipts { if err := rectarr.Set(uint64(i), receipt); err != nil { diff --git a/chain/store/store.go b/chain/store/store.go index 1244995fcbb..8230639786a 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -139,7 +139,6 @@ type ChainStore struct { wg sync.WaitGroup } -// chainLocalBlockstore is guaranteed to fail Get* if requested block isn't stored locally func NewChainStore(chainBs bstore.Blockstore, stateBs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore { c, _ := lru.NewARC(DefaultMsgMetaCacheSize) tsc, _ := lru.NewARC(DefaultTipSetCacheSize) @@ -1503,7 +1502,7 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe if b.Height == 0 || b.Height > ts.Height()-inclRecentRoots { if walked.Visit(b.ParentStateRoot) { - cids, err := recurseLinks(cs.chainBlockstore, walked, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot}) + cids, err := recurseLinks(cs.stateBlockstore, walked, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot}) if err != nil { return xerrors.Errorf("recursing genesis state failed: %w", err) } diff --git a/chain/sync.go b/chain/sync.go index 88237eb5ab1..96f337c4f46 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1172,7 +1172,7 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock } // Finally, flush. - return vm.Copy(ctx, tmpbs, syncer.store.ChainBlockstore(), mrcid) + return vm.Copy(ctx, tmpbs, syncer.store.StateBlockstore(), mrcid) } func (syncer *Syncer) verifyBlsAggregate(ctx context.Context, sig *crypto.Signature, msgs []cid.Cid, pubks [][]byte) error { From 2047a74958b39d292d6d3c5bcb8093344f9c33a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 2 Mar 2021 17:03:11 +0000 Subject: [PATCH 4/6] implement blockstore.Union, a union blockstore. The union blockstore takes a list of blockstores. It returns the first satisfying read, and broadcasts writes to all stores. It can be used for operations that require reading from any two blockstores, for example WalkSnapshot. --- blockstore/union.go | 108 +++++++++++++++++++++++++++++++++++++++ blockstore/union_test.go | 102 ++++++++++++++++++++++++++++++++++++ chain/store/store.go | 3 +- 3 files changed, 212 insertions(+), 1 deletion(-) create mode 100644 blockstore/union.go create mode 100644 blockstore/union_test.go diff --git a/blockstore/union.go b/blockstore/union.go new file mode 100644 index 00000000000..9573842a4f0 --- /dev/null +++ b/blockstore/union.go @@ -0,0 +1,108 @@ +package blockstore + +import ( + "context" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" +) + +type unionBlockstore []Blockstore + +// Union returns an unioned blockstore. +// +// * Reads return from the first blockstore that has the value, querying in the +// supplied order. +// * Writes (puts and deltes) are broadcast to all stores. +// +func Union(stores ...Blockstore) Blockstore { + return unionBlockstore(stores) +} + +func (m unionBlockstore) Has(cid cid.Cid) (has bool, err error) { + for _, bs := range m { + if has, err = bs.Has(cid); has || err != nil { + break + } + } + return has, err +} + +func (m unionBlockstore) Get(cid cid.Cid) (blk blocks.Block, err error) { + for _, bs := range m { + if blk, err = bs.Get(cid); err == nil || err != ErrNotFound { + break + } + } + return blk, err +} + +func (m unionBlockstore) View(cid cid.Cid, callback func([]byte) error) (err error) { + for _, bs := range m { + if err = bs.View(cid, callback); err == nil || err != ErrNotFound { + break + } + } + return err +} + +func (m unionBlockstore) GetSize(cid cid.Cid) (size int, err error) { + for _, bs := range m { + if size, err = bs.GetSize(cid); err == nil || err != ErrNotFound { + break + } + } + return size, err +} + +func (m unionBlockstore) Put(block blocks.Block) (err error) { + for _, bs := range m { + if err = bs.Put(block); err != nil { + break + } + } + return err +} + +func (m unionBlockstore) PutMany(blks []blocks.Block) (err error) { + for _, bs := range m { + if err = bs.PutMany(blks); err != nil { + break + } + } + return err +} + +func (m unionBlockstore) DeleteBlock(cid cid.Cid) (err error) { + for _, bs := range m { + if err = bs.DeleteBlock(cid); err != nil { + break + } + } + return err +} + +func (m unionBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + // this does not deduplicate; this interface needs to be revisited. + outCh := make(chan cid.Cid) + + go func() { + defer close(outCh) + + for _, bs := range m { + ch, err := bs.AllKeysChan(ctx) + if err != nil { + return + } + for cid := range ch { + outCh <- cid + } + } + }() + + return outCh, nil +} + +func (m unionBlockstore) HashOnRead(enabled bool) { + panic("implement me") +} diff --git a/blockstore/union_test.go b/blockstore/union_test.go new file mode 100644 index 00000000000..b6202689227 --- /dev/null +++ b/blockstore/union_test.go @@ -0,0 +1,102 @@ +package blockstore + +import ( + "context" + "testing" + + blocks "github.com/ipfs/go-block-format" + "github.com/stretchr/testify/require" +) + +var ( + b0 = blocks.NewBlock([]byte("abc")) + b1 = blocks.NewBlock([]byte("foo")) + b2 = blocks.NewBlock([]byte("bar")) +) + +func TestUnionBlockstore_Get(t *testing.T) { + m1 := NewMemory() + m2 := NewMemory() + + _ = m1.Put(b1) + _ = m2.Put(b2) + + u := Union(m1, m2) + + v1, err := u.Get(b1.Cid()) + require.NoError(t, err) + require.Equal(t, b1.RawData(), v1.RawData()) + + v2, err := u.Get(b2.Cid()) + require.NoError(t, err) + require.Equal(t, b2.RawData(), v2.RawData()) +} + +func TestUnionBlockstore_Put_PutMany_Delete_AllKeysChan(t *testing.T) { + m1 := NewMemory() + m2 := NewMemory() + + u := Union(m1, m2) + + err := u.Put(b0) + require.NoError(t, err) + + var has bool + + // write was broadcasted to all stores. + has, _ = m1.Has(b0.Cid()) + require.True(t, has) + + has, _ = m2.Has(b0.Cid()) + require.True(t, has) + + has, _ = u.Has(b0.Cid()) + require.True(t, has) + + // put many. + err = u.PutMany([]blocks.Block{b1, b2}) + require.NoError(t, err) + + // write was broadcasted to all stores. + has, _ = m1.Has(b1.Cid()) + require.True(t, has) + + has, _ = m1.Has(b2.Cid()) + require.True(t, has) + + has, _ = m2.Has(b1.Cid()) + require.True(t, has) + + has, _ = m2.Has(b2.Cid()) + require.True(t, has) + + // also in the union store. + has, _ = u.Has(b1.Cid()) + require.True(t, has) + + has, _ = u.Has(b2.Cid()) + require.True(t, has) + + // deleted from all stores. + err = u.DeleteBlock(b1.Cid()) + require.NoError(t, err) + + has, _ = u.Has(b1.Cid()) + require.False(t, has) + + has, _ = m1.Has(b1.Cid()) + require.False(t, has) + + has, _ = m2.Has(b1.Cid()) + require.False(t, has) + + // check that AllKeysChan returns b0 and b2, twice (once per backing store) + ch, err := u.AllKeysChan(context.Background()) + require.NoError(t, err) + + var i int + for range ch { + i++ + } + require.Equal(t, 4, i) +} diff --git a/chain/store/store.go b/chain/store/store.go index 8230639786a..6a3febcc86e 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -1427,8 +1427,9 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo return xerrors.Errorf("failed to write car header: %s", err) } + unionBs := bstore.Union(cs.stateBlockstore, cs.chainBlockstore) return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, true, func(c cid.Cid) error { - blk, err := cs.chainBlockstore.Get(c) + blk, err := unionBs.Get(c) if err != nil { return xerrors.Errorf("writing object to car, bs.Get: %w", err) } From 68b8e8e9cb75a1f7cdf13467887cfcf9ff60f8a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 2 Mar 2021 21:22:24 +0000 Subject: [PATCH 5/6] implement unionBlockstore#HashOnRead. --- blockstore/union.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/blockstore/union.go b/blockstore/union.go index 9573842a4f0..dfe5ea70c94 100644 --- a/blockstore/union.go +++ b/blockstore/union.go @@ -104,5 +104,7 @@ func (m unionBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error } func (m unionBlockstore) HashOnRead(enabled bool) { - panic("implement me") + for _, bs := range m { + bs.HashOnRead(enabled) + } } From 1ac0c9a926269c790950cdd79214c4c545aa896f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 2 Mar 2021 21:29:24 +0000 Subject: [PATCH 6/6] address review comments. --- chain/store/store.go | 5 +++++ chain/sync.go | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/chain/store/store.go b/chain/store/store.go index 6a3febcc86e..e0660495e5e 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -1549,6 +1549,11 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe } func (cs *ChainStore) Import(r io.Reader) (*types.TipSet, error) { + // TODO: writing only to the state blockstore is incorrect. + // At this time, both the state and chain blockstores are backed by the + // universal store. When we physically segregate the stores, we will need + // to route state objects to the state blockstore, and chain objects to + // the chain blockstore. header, err := car.LoadCar(cs.StateBlockstore(), r) if err != nil { return nil, xerrors.Errorf("loadcar failed: %w", err) diff --git a/chain/sync.go b/chain/sync.go index 96f337c4f46..88237eb5ab1 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1172,7 +1172,7 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock } // Finally, flush. - return vm.Copy(ctx, tmpbs, syncer.store.StateBlockstore(), mrcid) + return vm.Copy(ctx, tmpbs, syncer.store.ChainBlockstore(), mrcid) } func (syncer *Syncer) verifyBlsAggregate(ctx context.Context, sig *crypto.Signature, msgs []cid.Cid, pubks [][]byte) error {