Skip to content

Commit

Permalink
Merge pull request #4992 from filecoin-project/feat/splitstore
Browse files Browse the repository at this point in the history
hot/cold blockstore segregation (aka. splitstore)
  • Loading branch information
magik6k authored Mar 8, 2021
2 parents b130462 + 51ed4c7 commit 6591af9
Show file tree
Hide file tree
Showing 27 changed files with 2,499 additions and 20 deletions.
57 changes: 57 additions & 0 deletions blockstore/badger/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,25 @@ func (b *Blockstore) Close() error {
return b.DB.Close()
}

// CollectGarbage runs garbage collection on the value log
func (b *Blockstore) CollectGarbage() error {
if atomic.LoadInt64(&b.state) != stateOpen {
return ErrBlockstoreClosed
}

var err error
for err == nil {
err = b.DB.RunValueLogGC(0.125)
}

if err == badger.ErrNoRewrite {
// not really an error in this case
return nil
}

return err
}

// View implements blockstore.Viewer, which leverages zero-copy read-only
// access to values.
func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
Expand Down Expand Up @@ -318,6 +337,44 @@ func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
})
}

func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
if atomic.LoadInt64(&b.state) != stateOpen {
return ErrBlockstoreClosed
}

batch := b.DB.NewWriteBatch()
defer batch.Cancel()

// toReturn tracks the byte slices to return to the pool, if we're using key
// prefixing. we can't return each slice to the pool after each Set, because
// badger holds on to the slice.
var toReturn [][]byte
if b.prefixing {
toReturn = make([][]byte, 0, len(cids))
defer func() {
for _, b := range toReturn {
KeyPool.Put(b)
}
}()
}

for _, cid := range cids {
k, pooled := b.PooledStorageKey(cid)
if pooled {
toReturn = append(toReturn, k)
}
if err := batch.Delete(k); err != nil {
return err
}
}

err := batch.Flush()
if err != nil {
err = fmt.Errorf("failed to delete blocks from badger blockstore: %w", err)
}
return err
}

// AllKeysChan implements Blockstore.AllKeysChan.
func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
if atomic.LoadInt64(&b.state) != stateOpen {
Expand Down
33 changes: 31 additions & 2 deletions blockstore/blockstore.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package blockstore

import (
"github.com/ipfs/go-cid"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"

Expand All @@ -18,20 +18,38 @@ var ErrNotFound = blockstore.ErrNotFound
type Blockstore interface {
blockstore.Blockstore
blockstore.Viewer
BatchDeleter
}

// BasicBlockstore is an alias to the original IPFS Blockstore.
type BasicBlockstore = blockstore.Blockstore

type Viewer = blockstore.Viewer

type BatchDeleter interface {
DeleteMany(cids []cid.Cid) error
}

// WrapIDStore wraps the underlying blockstore in an "identity" blockstore.
// The ID store filters out all puts for blocks with CIDs using the "identity"
// hash function. It also extracts inlined blocks from CIDs using the identity
// hash function and returns them on get/has, ignoring the contents of the
// blockstore.
func WrapIDStore(bstore blockstore.Blockstore) Blockstore {
return blockstore.NewIdStore(bstore).(Blockstore)
if is, ok := bstore.(*idstore); ok {
// already wrapped
return is
}

if bs, ok := bstore.(Blockstore); ok {
// we need to wrap our own because we don't want to neuter the DeleteMany method
// the underlying blockstore has implemented an (efficient) DeleteMany
return NewIDStore(bs)
}

// The underlying blockstore does not implement DeleteMany, so we need to shim it.
// This is less efficient as it'll iterate and perform single deletes.
return NewIDStore(Adapt(bstore))
}

// FromDatastore creates a new blockstore backed by the given datastore.
Expand All @@ -53,6 +71,17 @@ func (a *adaptedBlockstore) View(cid cid.Cid, callback func([]byte) error) error
return callback(blk.RawData())
}

func (a *adaptedBlockstore) DeleteMany(cids []cid.Cid) error {
for _, cid := range cids {
err := a.DeleteBlock(cid)
if err != nil {
return err
}
}

return nil
}

// Adapt adapts a standard blockstore to a Lotus blockstore by
// enriching it with the extra methods that Lotus requires (e.g. View, Sync).
//
Expand Down
8 changes: 8 additions & 0 deletions blockstore/buffered.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ func (bs *BufferedBlockstore) DeleteBlock(c cid.Cid) error {
return bs.write.DeleteBlock(c)
}

func (bs *BufferedBlockstore) DeleteMany(cids []cid.Cid) error {
if err := bs.read.DeleteMany(cids); err != nil {
return err
}

return bs.write.DeleteMany(cids)
}

func (bs *BufferedBlockstore) View(c cid.Cid, callback func([]byte) error) error {
// both stores are viewable.
if err := bs.write.View(c, callback); err == ErrNotFound {
Expand Down
174 changes: 174 additions & 0 deletions blockstore/idstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package blockstore

import (
"context"
"io"

"golang.org/x/xerrors"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
)

var _ Blockstore = (*idstore)(nil)

type idstore struct {
bs Blockstore
}

func NewIDStore(bs Blockstore) Blockstore {
return &idstore{bs: bs}
}

func decodeCid(cid cid.Cid) (inline bool, data []byte, err error) {
if cid.Prefix().MhType != mh.IDENTITY {
return false, nil, nil
}

dmh, err := mh.Decode(cid.Hash())
if err != nil {
return false, nil, err
}

if dmh.Code == mh.IDENTITY {
return true, dmh.Digest, nil
}

return false, nil, err
}

func (b *idstore) Has(cid cid.Cid) (bool, error) {
inline, _, err := decodeCid(cid)
if err != nil {
return false, xerrors.Errorf("error decoding Cid: %w", err)
}

if inline {
return true, nil
}

return b.bs.Has(cid)
}

func (b *idstore) Get(cid cid.Cid) (blocks.Block, error) {
inline, data, err := decodeCid(cid)
if err != nil {
return nil, xerrors.Errorf("error decoding Cid: %w", err)
}

if inline {
return blocks.NewBlockWithCid(data, cid)
}

return b.bs.Get(cid)
}

func (b *idstore) GetSize(cid cid.Cid) (int, error) {
inline, data, err := decodeCid(cid)
if err != nil {
return 0, xerrors.Errorf("error decoding Cid: %w", err)
}

if inline {
return len(data), err
}

return b.bs.GetSize(cid)
}

func (b *idstore) View(cid cid.Cid, cb func([]byte) error) error {
inline, data, err := decodeCid(cid)
if err != nil {
return xerrors.Errorf("error decoding Cid: %w", err)
}

if inline {
return cb(data)
}

return b.bs.View(cid, cb)
}

func (b *idstore) Put(blk blocks.Block) error {
inline, _, err := decodeCid(blk.Cid())
if err != nil {
return xerrors.Errorf("error decoding Cid: %w", err)
}

if inline {
return nil
}

return b.bs.Put(blk)
}

func (b *idstore) PutMany(blks []blocks.Block) error {
toPut := make([]blocks.Block, 0, len(blks))
for _, blk := range blks {
inline, _, err := decodeCid(blk.Cid())
if err != nil {
return xerrors.Errorf("error decoding Cid: %w", err)
}

if inline {
continue
}
toPut = append(toPut, blk)
}

if len(toPut) > 0 {
return b.bs.PutMany(toPut)
}

return nil
}

func (b *idstore) DeleteBlock(cid cid.Cid) error {
inline, _, err := decodeCid(cid)
if err != nil {
return xerrors.Errorf("error decoding Cid: %w", err)
}

if inline {
return nil
}

return b.bs.DeleteBlock(cid)
}

func (b *idstore) DeleteMany(cids []cid.Cid) error {
toDelete := make([]cid.Cid, 0, len(cids))
for _, cid := range cids {
inline, _, err := decodeCid(cid)
if err != nil {
return xerrors.Errorf("error decoding Cid: %w", err)
}

if inline {
continue
}
toDelete = append(toDelete, cid)
}

if len(toDelete) > 0 {
return b.bs.DeleteMany(toDelete)
}

return nil
}

func (b *idstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return b.bs.AllKeysChan(ctx)
}

func (b *idstore) HashOnRead(enabled bool) {
b.bs.HashOnRead(enabled)
}

func (b *idstore) Close() error {
if c, ok := b.bs.(io.Closer); ok {
return c.Close()
}
return nil
}
7 changes: 7 additions & 0 deletions blockstore/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ func (m MemBlockstore) DeleteBlock(k cid.Cid) error {
return nil
}

func (m MemBlockstore) DeleteMany(ks []cid.Cid) error {
for _, k := range ks {
delete(m, k)
}
return nil
}

func (m MemBlockstore) Has(k cid.Cid) (bool, error) {
_, ok := m[k]
return ok, nil
Expand Down
38 changes: 38 additions & 0 deletions blockstore/splitstore/markset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package splitstore

import (
"path/filepath"

"golang.org/x/xerrors"

cid "github.com/ipfs/go-cid"
)

// MarkSet is a utility to keep track of seen CID, and later query for them.
//
// * If the expected dataset is large, it can be backed by a datastore (e.g. bbolt).
// * If a probabilistic result is acceptable, it can be backed by a bloom filter (default).
type MarkSet interface {
Mark(cid.Cid) error
Has(cid.Cid) (bool, error)
Close() error
}

// markBytes is deliberately a non-nil empty byte slice for serialization.
var markBytes = []byte{}

type MarkSetEnv interface {
Create(name string, sizeHint int64) (MarkSet, error)
Close() error
}

func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) {
switch mtype {
case "", "bloom":
return NewBloomMarkSetEnv()
case "bolt":
return NewBoltMarkSetEnv(filepath.Join(path, "markset.bolt"))
default:
return nil, xerrors.Errorf("unknown mark set type %s", mtype)
}
}
Loading

0 comments on commit 6591af9

Please sign in to comment.