Skip to content

Commit

Permalink
fix(share/eds)!: use separate badger for inverted index (celestiaorg#…
Browse files Browse the repository at this point in the history
…2517)

Having same badger instance for dagstore shards and inverted index has proven to have hangup on Put operation for 1-10min. During tests it has shown, that splitting badger into 2 instances solves the hangup issue.

Also once celestiaorg#2479 is implemented and inverted index will needs to be removed. Separate badger will be much easier for users migration.
  • Loading branch information
walldiss committed Aug 3, 2023
1 parent be6e0c4 commit d26d047
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 20 deletions.
26 changes: 22 additions & 4 deletions share/eds/inverted_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import (
"github.com/filecoin-project/dagstore/index"
"github.com/filecoin-project/dagstore/shard"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/multiformats/go-multihash"

dsbadger "github.com/celestiaorg/go-ds-badger4"
)

const invertedIndexPath = "/inverted_index/"

// simpleInvertedIndex is an inverted index that only stores a single shard key per multihash. Its
// implementation is modified from the default upstream implementation in dagstore/index.
type simpleInvertedIndex struct {
Expand All @@ -21,10 +24,21 @@ type simpleInvertedIndex struct {
// newSimpleInvertedIndex returns a new inverted index that only stores a single shard key per
// multihash. This is because we use badger as a storage backend, so updates are expensive, and we
// don't care which shard is used to serve a cid.
func newSimpleInvertedIndex(dts ds.Batching) *simpleInvertedIndex {
return &simpleInvertedIndex{
ds: namespace.Wrap(dts, ds.NewKey("/inverted/index")),
func newSimpleInvertedIndex(storePath string) (*simpleInvertedIndex, error) {
opts := dsbadger.DefaultOptions // this should be copied
// turn off value log GC
opts.GcInterval = 0
// 20 compactors show to have no hangups on put operation up to 40k blocks with eds size 128.
opts.NumCompactors = 20
// use minimum amount of NumLevelZeroTables to trigger L0 compaction faster
opts.NumLevelZeroTables = 1

ds, err := dsbadger.NewDatastore(storePath+invertedIndexPath, &opts)
if err != nil {
return nil, fmt.Errorf("can't open Badger Datastore: %w", err)
}

return &simpleInvertedIndex{ds: ds}, nil
}

func (s *simpleInvertedIndex) AddMultihashesForShard(
Expand Down Expand Up @@ -75,3 +89,7 @@ func (s *simpleInvertedIndex) GetShardsForMultihash(ctx context.Context, mh mult

return []shard.Key{shardKey}, nil
}

func (s *simpleInvertedIndex) close() error {
return s.ds.Close()
}
9 changes: 4 additions & 5 deletions share/eds/inverted_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"testing"

"github.com/filecoin-project/dagstore/shard"
"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -37,11 +35,12 @@ func TestMultihashesForShard(t *testing.T) {
}

mi := &mockIterator{mhs: mhs}
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
invertedIndex := newSimpleInvertedIndex(ds)
path := t.TempDir()
invertedIndex, err := newSimpleInvertedIndex(path)
require.NoError(t, err)

// 1. Add all 3 multihashes to shard1
err := invertedIndex.AddMultihashesForShard(ctx, mi, shard.KeyFromString("shard1"))
err = invertedIndex.AddMultihashesForShard(ctx, mi, shard.KeyFromString("shard1"))
require.NoError(t, err)
shardKeys, err := invertedIndex.GetShardsForMultihash(ctx, mhs[0])
require.NoError(t, err)
Expand Down
28 changes: 17 additions & 11 deletions share/eds/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ type Store struct {
cache *blockstoreCache
bs bstore.Blockstore

topIdx index.Inverted
carIdx index.FullIndexRepo
carIdx index.FullIndexRepo
invertedIdx *simpleInvertedIndex

basepath string
gcInterval time.Duration
Expand All @@ -83,14 +83,17 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
return nil, fmt.Errorf("failed to create index repository: %w", err)
}

invertedRepo := newSimpleInvertedIndex(ds)
invertedIdx, err := newSimpleInvertedIndex(basepath)
if err != nil {
return nil, fmt.Errorf("failed to create index: %w", err)
}
dagStore, err := dagstore.NewDAGStore(
dagstore.Config{
TransientsDir: basepath + transientsPath,
IndexRepo: fsRepo,
Datastore: ds,
MountRegistry: r,
TopLevelIndex: invertedRepo,
TopLevelIndex: invertedIdx,
},
)
if err != nil {
Expand All @@ -103,13 +106,13 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
}

store := &Store{
basepath: basepath,
dgstr: dagStore,
topIdx: invertedRepo,
carIdx: fsRepo,
gcInterval: defaultGCInterval,
mounts: r,
cache: cache,
basepath: basepath,
dgstr: dagStore,
carIdx: fsRepo,
invertedIdx: invertedIdx,
gcInterval: defaultGCInterval,
mounts: r,
cache: cache,
}
store.bs = newBlockstore(store, cache)
return store, nil
Expand Down Expand Up @@ -137,6 +140,9 @@ func (s *Store) Start(ctx context.Context) error {
// Stop stops the underlying DAGStore.
func (s *Store) Stop(context.Context) error {
defer s.cancel()
if err := s.invertedIdx.close(); err != nil {
return err
}
return s.dgstr.Close()
}

Expand Down

0 comments on commit d26d047

Please sign in to comment.