Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

segregate chain and state blockstores #5695

Merged
merged 6 commits into from
Mar 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions blockstore/badger/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,7 @@ func Open(opts Options) (*Blockstore, error) {
return nil, fmt.Errorf("failed to open badger blockstore: %w", err)
}

bs := &Blockstore{
DB: db,
}

bs := &Blockstore{DB: db}
if p := opts.Prefix; p != "" {
bs.prefixing = true
bs.prefix = []byte(p)
Expand Down
8 changes: 4 additions & 4 deletions blockstore/badger/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func TestStorageKey(t *testing.T) {
require.Equal(t, k3, k2)
}

func newBlockstore(optsSupplier func(path string) Options) func(tb testing.TB) (bs blockstore.Blockstore, path string) {
return func(tb testing.TB) (bs blockstore.Blockstore, path string) {
func newBlockstore(optsSupplier func(path string) Options) func(tb testing.TB) (bs blockstore.BasicBlockstore, path string) {
return func(tb testing.TB) (bs blockstore.BasicBlockstore, path string) {
tb.Helper()

path, err := ioutil.TempDir("", "")
Expand All @@ -83,8 +83,8 @@ func newBlockstore(optsSupplier func(path string) Options) func(tb testing.TB) (
}
}

func openBlockstore(optsSupplier func(path string) Options) func(tb testing.TB, path string) (bs blockstore.Blockstore, err error) {
return func(tb testing.TB, path string) (bs blockstore.Blockstore, err error) {
func openBlockstore(optsSupplier func(path string) Options) func(tb testing.TB, path string) (bs blockstore.BasicBlockstore, err error) {
return func(tb testing.TB, path string) (bs blockstore.BasicBlockstore, err error) {
tb.Helper()
return Open(optsSupplier(path))
}
Expand Down
9 changes: 5 additions & 4 deletions blockstore/badger/blockstore_test_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,19 @@ import (
"strings"
"testing"

"github.com/filecoin-project/lotus/blockstore"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
u "github.com/ipfs/go-ipfs-util"

"github.com/filecoin-project/lotus/blockstore"

"github.com/stretchr/testify/require"
)

// TODO: move this to go-ipfs-blockstore.
type Suite struct {
NewBlockstore func(tb testing.TB) (bs blockstore.Blockstore, path string)
OpenBlockstore func(tb testing.TB, path string) (bs blockstore.Blockstore, err error)
NewBlockstore func(tb testing.TB) (bs blockstore.BasicBlockstore, path string)
OpenBlockstore func(tb testing.TB, path string) (bs blockstore.BasicBlockstore, err error)
}

func (s *Suite) RunTests(t *testing.T, prefix string) {
Expand Down Expand Up @@ -290,7 +291,7 @@ func (s *Suite) TestDelete(t *testing.T) {

}

func insertBlocks(t *testing.T, bs blockstore.Blockstore, count int) []cid.Cid {
func insertBlocks(t *testing.T, bs blockstore.BasicBlockstore, count int) []cid.Cid {
keys := make([]cid.Cid, count)
for i := 0; i < count; i++ {
block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
Expand Down
15 changes: 14 additions & 1 deletion blockstore/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@ import (
"github.com/ipfs/go-cid"
)

// UnwrapFallbackStore takes a blockstore, and returns the underlying blockstore
// if it was a FallbackStore. Otherwise, it just returns the supplied store
// unmodified.
func UnwrapFallbackStore(bs Blockstore) (Blockstore, bool) {
if fbs, ok := bs.(*FallbackStore); ok {
return fbs.Blockstore, true
}
return bs, false
}

// FallbackStore is a read-through store that queries another (potentially
// remote) source if the block is not found locally. If the block is found
// during the fallback, it stores it in the local store.
type FallbackStore struct {
Blockstore

Expand All @@ -30,7 +43,7 @@ func (fbs *FallbackStore) SetFallback(missFn func(context.Context, cid.Cid) (blo
}

func (fbs *FallbackStore) getFallback(c cid.Cid) (blocks.Block, error) {
log.Errorw("fallbackstore: Block not found locally, fetching from the network", "cid", c)
log.Warnf("fallbackstore: block not found locally, fetching from the network; cid: %s", c)
fbs.lk.RLock()
defer fbs.lk.RUnlock()

Expand Down
154 changes: 154 additions & 0 deletions blockstore/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package blockstore

import (
"time"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

//
// Currently unused, but kept in repo in case we introduce one of the candidate
// cache implementations (Freecache, Ristretto), both of which report these
// metrics.
//

// CacheMetricsEmitInterval is the interval at which metrics are emitted onto
// OpenCensus.
var CacheMetricsEmitInterval = 5 * time.Second

var (
CacheName, _ = tag.NewKey("cache_name")
)

// CacheMeasures groups all metrics emitted by the blockstore caches.
var CacheMeasures = struct {
HitRatio *stats.Float64Measure
Hits *stats.Int64Measure
Misses *stats.Int64Measure
Entries *stats.Int64Measure
QueriesServed *stats.Int64Measure
Adds *stats.Int64Measure
Updates *stats.Int64Measure
Evictions *stats.Int64Measure
CostAdded *stats.Int64Measure
CostEvicted *stats.Int64Measure
SetsDropped *stats.Int64Measure
SetsRejected *stats.Int64Measure
QueriesDropped *stats.Int64Measure
}{
HitRatio: stats.Float64("blockstore/cache/hit_ratio", "Hit ratio of blockstore cache", stats.UnitDimensionless),
Hits: stats.Int64("blockstore/cache/hits", "Total number of hits at blockstore cache", stats.UnitDimensionless),
Misses: stats.Int64("blockstore/cache/misses", "Total number of misses at blockstore cache", stats.UnitDimensionless),
Entries: stats.Int64("blockstore/cache/entry_count", "Total number of entries currently in the blockstore cache", stats.UnitDimensionless),
QueriesServed: stats.Int64("blockstore/cache/queries_served", "Total number of queries served by the blockstore cache", stats.UnitDimensionless),
Adds: stats.Int64("blockstore/cache/adds", "Total number of adds to blockstore cache", stats.UnitDimensionless),
Updates: stats.Int64("blockstore/cache/updates", "Total number of updates in blockstore cache", stats.UnitDimensionless),
Evictions: stats.Int64("blockstore/cache/evictions", "Total number of evictions from blockstore cache", stats.UnitDimensionless),
CostAdded: stats.Int64("blockstore/cache/cost_added", "Total cost (byte size) of entries added into blockstore cache", stats.UnitBytes),
CostEvicted: stats.Int64("blockstore/cache/cost_evicted", "Total cost (byte size) of entries evicted by blockstore cache", stats.UnitBytes),
SetsDropped: stats.Int64("blockstore/cache/sets_dropped", "Total number of sets dropped by blockstore cache", stats.UnitDimensionless),
SetsRejected: stats.Int64("blockstore/cache/sets_rejected", "Total number of sets rejected by blockstore cache", stats.UnitDimensionless),
QueriesDropped: stats.Int64("blockstore/cache/queries_dropped", "Total number of queries dropped by blockstore cache", stats.UnitDimensionless),
}

// CacheViews groups all cache-related default views.
var CacheViews = struct {
HitRatio *view.View
Hits *view.View
Misses *view.View
Entries *view.View
QueriesServed *view.View
Adds *view.View
Updates *view.View
Evictions *view.View
CostAdded *view.View
CostEvicted *view.View
SetsDropped *view.View
SetsRejected *view.View
QueriesDropped *view.View
}{
HitRatio: &view.View{
Measure: CacheMeasures.HitRatio,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
Hits: &view.View{
Measure: CacheMeasures.Hits,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
Misses: &view.View{
Measure: CacheMeasures.Misses,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
Entries: &view.View{
Measure: CacheMeasures.Entries,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
QueriesServed: &view.View{
Measure: CacheMeasures.QueriesServed,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
Adds: &view.View{
Measure: CacheMeasures.Adds,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
Updates: &view.View{
Measure: CacheMeasures.Updates,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
Evictions: &view.View{
Measure: CacheMeasures.Evictions,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
CostAdded: &view.View{
Measure: CacheMeasures.CostAdded,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
CostEvicted: &view.View{
Measure: CacheMeasures.CostEvicted,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
SetsDropped: &view.View{
Measure: CacheMeasures.SetsDropped,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
SetsRejected: &view.View{
Measure: CacheMeasures.SetsRejected,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
QueriesDropped: &view.View{
Measure: CacheMeasures.QueriesDropped,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
}

// DefaultViews exports all default views for this package.
var DefaultViews = []*view.View{
CacheViews.HitRatio,
CacheViews.Hits,
CacheViews.Misses,
CacheViews.Entries,
CacheViews.QueriesServed,
CacheViews.Adds,
CacheViews.Updates,
CacheViews.Evictions,
CacheViews.CostAdded,
CacheViews.CostEvicted,
CacheViews.SetsDropped,
CacheViews.SetsRejected,
CacheViews.QueriesDropped,
}
110 changes: 110 additions & 0 deletions blockstore/union.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package blockstore

import (
"context"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)

type unionBlockstore []Blockstore

// Union returns an unioned blockstore.
//
// * Reads return from the first blockstore that has the value, querying in the
// supplied order.
// * Writes (puts and deltes) are broadcast to all stores.
//
func Union(stores ...Blockstore) Blockstore {
return unionBlockstore(stores)
}

func (m unionBlockstore) Has(cid cid.Cid) (has bool, err error) {
for _, bs := range m {
if has, err = bs.Has(cid); has || err != nil {
break
}
}
return has, err
}

func (m unionBlockstore) Get(cid cid.Cid) (blk blocks.Block, err error) {
for _, bs := range m {
if blk, err = bs.Get(cid); err == nil || err != ErrNotFound {
break
}
}
return blk, err
}

func (m unionBlockstore) View(cid cid.Cid, callback func([]byte) error) (err error) {
for _, bs := range m {
if err = bs.View(cid, callback); err == nil || err != ErrNotFound {
break
}
}
return err
}

func (m unionBlockstore) GetSize(cid cid.Cid) (size int, err error) {
for _, bs := range m {
if size, err = bs.GetSize(cid); err == nil || err != ErrNotFound {
break
}
}
return size, err
}

func (m unionBlockstore) Put(block blocks.Block) (err error) {
for _, bs := range m {
if err = bs.Put(block); err != nil {
break
}
}
return err
}

func (m unionBlockstore) PutMany(blks []blocks.Block) (err error) {
for _, bs := range m {
if err = bs.PutMany(blks); err != nil {
break
}
}
return err
}

func (m unionBlockstore) DeleteBlock(cid cid.Cid) (err error) {
for _, bs := range m {
if err = bs.DeleteBlock(cid); err != nil {
break
}
}
return err
}

func (m unionBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
// this does not deduplicate; this interface needs to be revisited.
outCh := make(chan cid.Cid)

go func() {
defer close(outCh)

for _, bs := range m {
ch, err := bs.AllKeysChan(ctx)
if err != nil {
return
}
for cid := range ch {
outCh <- cid
}
}
}()

return outCh, nil
}

func (m unionBlockstore) HashOnRead(enabled bool) {
for _, bs := range m {
bs.HashOnRead(enabled)
}
}
Loading