Skip to content

Commit

Permalink
fix incorrect store usage; per-cache sizing.
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk committed Nov 13, 2020
1 parent a741335 commit fe01fb4
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 20 deletions.
6 changes: 3 additions & 3 deletions chain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
}

// Finally, flush.
return vm.Copy(context.TODO(), blockstore, syncer.store.StateBlockstore(), smroot)
return vm.Copy(context.TODO(), blockstore, syncer.store.ChainBlockstore(), smroot)
}

func (syncer *Syncer) LocalPeer() peer.ID {
Expand Down Expand Up @@ -1177,7 +1177,7 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
}

// Finally, flush.
return vm.Copy(ctx, tmpbs, syncer.store.StateBlockstore(), mrcid)
return vm.Copy(ctx, tmpbs, syncer.store.ChainBlockstore(), mrcid)
}

func (syncer *Syncer) verifyBlsAggregate(ctx context.Context, sig *crypto.Signature, msgs []cid.Cid, pubks [][]byte) error {
Expand Down Expand Up @@ -1564,7 +1564,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS
return err
}

if err := copyBlockstore(ctx, bs, syncer.store.StateBlockstore()); err != nil {
if err := copyBlockstore(ctx, bs, syncer.store.ChainBlockstore()); err != nil {
return xerrors.Errorf("message processing failed: %w", err)
}
}
Expand Down
16 changes: 11 additions & 5 deletions lib/blockstore/cache_freecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,28 @@ type FreecacheCachingBlockstore struct {
var _ Blockstore = (*FreecacheCachingBlockstore)(nil)
var _ Viewer = (*FreecacheCachingBlockstore)(nil)

func WrapFreecacheCache(ctx context.Context, name string, inner Blockstore) (*FreecacheCachingBlockstore, error) {
type FreecacheConfig struct {
Name string
BlockCapacity int
ExistsCapacity int
}

func WrapFreecacheCache(ctx context.Context, inner Blockstore, config FreecacheConfig) (*FreecacheCachingBlockstore, error) {
v, _ := inner.(Viewer)
c := &FreecacheCachingBlockstore{
blockCache: freecache.NewCache(1 << 28), // 512MiB.
existsCache: freecache.NewCache(1 << 26), // 64MiB.
blockCache: freecache.NewCache(config.BlockCapacity),
existsCache: freecache.NewCache(config.ExistsCapacity),
inner: inner,
viewer: v,
}

go func() {
blockCacheTag, err := tag.New(ctx, tag.Insert(CacheName, name+"_block_cache"))
blockCacheTag, err := tag.New(ctx, tag.Insert(CacheName, config.Name+"_block_cache"))
if err != nil {
log.Warnf("blockstore metrics: failed to instantiate block cache tag: %s", err)
return
}
existsCacheTag, err := tag.New(ctx, tag.Insert(CacheName, name+"_exists_cache"))
existsCacheTag, err := tag.New(ctx, tag.Insert(CacheName, config.Name+"_exists_cache"))
if err != nil {
log.Warnf("blockstore metrics: failed to instantiate exists cache tag: %s", err)
return
Expand Down
10 changes: 5 additions & 5 deletions node/impl/full/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ type ChainAPI struct {

Chain *store.ChainStore

// ExposedBlockstore is the global monolith blockstore that is safe to
// ServingBlockstore is the global monolith blockstore that is safe to
// expose externally. In the future, this will be segregated into two
// blockstores.
ExposedBlockstore dtypes.ExposedBlockstore
ServingBlockstore dtypes.ExposedBlockstore
}

func (m *ChainModule) ChainNotify(ctx context.Context) (<-chan []*api.HeadChange, error) {
Expand Down Expand Up @@ -232,15 +232,15 @@ func (m *ChainModule) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, er
}

func (a *ChainAPI) ChainDeleteObj(ctx context.Context, obj cid.Cid) error {
return a.ExposedBlockstore.DeleteBlock(obj)
return a.ServingBlockstore.DeleteBlock(obj)
}

func (m *ChainModule) ChainHasObj(ctx context.Context, obj cid.Cid) (bool, error) {
return m.ExposedBlockstore.Has(obj)
}

func (a *ChainAPI) ChainStatObj(ctx context.Context, obj cid.Cid, base cid.Cid) (api.ObjStat, error) {
bs := a.ExposedBlockstore
bs := a.ServingBlockstore
bsvc := blockservice.New(bs, offline.Exchange(bs))

dag := merkledag.NewDAGService(bsvc)
Expand Down Expand Up @@ -525,7 +525,7 @@ func (a *ChainAPI) ChainGetNode(ctx context.Context, p string) (*api.IpldObject,
return nil, xerrors.Errorf("parsing path: %w", err)
}

bs := a.Chain.StateBlockstore()
bs := a.ServingBlockstore
bsvc := blockservice.New(bs, offline.Exchange(bs))

dag := merkledag.NewDAGService(bsvc)
Expand Down
14 changes: 11 additions & 3 deletions node/modules/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ func BareMonolithBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.BareMono

// StateBlockstore returns the blockstore to use to store the state tree.
func StateBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.BareMonolithBlockstore) (dtypes.StateBlockstore, error) {
cbs, err := blockstore.WrapFreecacheCache(helpers.LifecycleCtx(mctx, lc), "state", bs)
sbs, err := blockstore.WrapFreecacheCache(helpers.LifecycleCtx(mctx, lc), bs, blockstore.FreecacheConfig{
Name: "state",
BlockCapacity: 1 << 28, // 256MiB.
ExistsCapacity: 1 << 25, // 32MiB.
})
if err != nil {
return nil, err
}
Expand All @@ -44,12 +48,16 @@ func StateBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.BareMon
if c, ok := bs.(io.Closer); ok {
lc.Append(closerStopHook(c))
}
return cbs, nil
return sbs, nil
}

// ChainBlockstore returns the blockstore to use for chain data (tipsets, blocks, messages).
func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.BareMonolithBlockstore) (dtypes.ChainBlockstore, error) {
cbs, err := blockstore.WrapFreecacheCache(helpers.LifecycleCtx(mctx, lc), "chain", bs)
cbs, err := blockstore.WrapFreecacheCache(helpers.LifecycleCtx(mctx, lc), bs, blockstore.FreecacheConfig{
Name: "chain",
BlockCapacity: 1 << 27, // 128MiB.
ExistsCapacity: 1 << 24, // 16MiB.
})
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions node/modules/testing/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (

var glog = logging.Logger("genesis")

func MakeGenesisMem(out io.Writer, template genesis.Template) func(bs dtypes.StateBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) modules.Genesis {
return func(bs dtypes.StateBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) modules.Genesis {
func MakeGenesisMem(out io.Writer, template genesis.Template) func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) modules.Genesis {
return func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) modules.Genesis {
return func() (*types.BlockHeader, error) {
glog.Warn("Generating new random genesis block, note that this SHOULD NOT happen unless you are setting up new network")
b, err := genesis2.MakeGenesisBlock(context.TODO(), j, bs, syscalls, template)
Expand All @@ -51,8 +51,8 @@ func MakeGenesisMem(out io.Writer, template genesis.Template) func(bs dtypes.Sta
}
}

func MakeGenesis(outFile, genesisTemplate string) func(bs dtypes.StateBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) modules.Genesis {
return func(bs dtypes.StateBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) modules.Genesis {
func MakeGenesis(outFile, genesisTemplate string) func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) modules.Genesis {
return func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) modules.Genesis {
return func() (*types.BlockHeader, error) {
glog.Warn("Generating new random genesis block, note that this SHOULD NOT happen unless you are setting up new network")
genesisTemplate, err := homedir.Expand(genesisTemplate)
Expand Down

0 comments on commit fe01fb4

Please sign in to comment.