Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gracefully recover from nested value-key marshalling #97

Merged
merged 1 commit into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion store/pebble/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ func (s *store) Get(mh multihash.Multihash) ([]indexer.Value, bool, error) {
if err != nil {
return nil, false, err
}
var values []indexer.Value

// Optimistically set the capacity of values slice to the number of value-keys
// in order to reduce the append footprint in the loop below.
values := make([]indexer.Value, 0, len(vks))
for _, vk := range vks {
vs, vCloser, err := s.db.Get(vk)
if err == pebble.ErrNotFound {
Expand Down
114 changes: 79 additions & 35 deletions store/pebble/vk_merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ var (
)

type valueKeysValueMerger struct {
merge [][]byte
delete [][]byte
merges [][]byte
deletes map[string]struct{}
reverse bool
codec indexer.BinaryValueCodec
}
Expand All @@ -45,28 +45,36 @@ func (v *valueKeysValueMerger) MergeNewer(value []byte) error {
prefix, dvk := key(value).stripMergeDelete()
switch prefix {
case mergeDeleteKeyPrefix:
v.delete = append(v.delete, dvk)
v.addToDeletes(dvk)
case valueKeyPrefix:
if !v.exists(value) {
v.merge = append(v.merge, value)
}
v.addToMerges(value)
default:
// The given value is marshalled value-keys; decode it and populate merge values.
vks, err := v.codec.UnmarshalValueKeys(value)
if err != nil {
return err
}
return v.mergeMarshalled(value)
}
return nil
}

// Grow v.merge capacity if it is less than the upper bound for new length.
// This is to reduce footprint of append called in a loop.
maxLen := len(v.merge) + len(vks)
if cap(v.merge) < maxLen {
v.merge = append(make([][]byte, 0, maxLen), v.merge...)
}
for _, vk := range vks {
if !v.exists(vk) {
v.merge = append(v.merge, vk)
}
// mergeMarshalled extracts value-keys by unmarshalling the given value and adds them to merges.
// This function recursively unmarshalls value-keys to gracefully correct previous behaviour
// of this merger where in certain scenarios already marshalled value-keys may have been
// re-marshalled. This assures that any such records are opportunistically unmarshalled and
// de-duplicated whenever they are read or changed.
//
// See: https://github.com/filecoin-project/go-indexer-core/issues/94
func (v *valueKeysValueMerger) mergeMarshalled(value []byte) error {
// The given value is marshalled value-keys; decode it and populate merge values.
vks, err := v.codec.UnmarshalValueKeys(value)
if err != nil {
return err
}
// Attempt to grow the capacity of v.merges to reduce the append footprint in the loop below.
v.merges = maybeGrow(v.merges, len(vks))
for _, vk := range vks {
// Recursively merge the value key to accommodate previous behaviour of the value-key
// merger, where the value-keys may have been marshalled multiple times.
// Recursion here will gracefully and opportunistically correct any such cases.
if err := v.MergeNewer(vk); err != nil {
return err
}
}
return nil
Expand All @@ -78,18 +86,16 @@ func (v *valueKeysValueMerger) MergeOlder(value []byte) error {
}

func (v *valueKeysValueMerger) Finish(_ bool) ([]byte, io.Closer, error) {
for _, dvk := range v.delete {
v.deleteValueKey(dvk)
}
if len(v.merge) == 0 {
v.prune()
if len(v.merges) == 0 {
return nil, nil, nil
}
if v.reverse {
for one, other := 0, len(v.merge)-1; one < other; one, other = one+1, other-1 {
v.merge[one], v.merge[other] = v.merge[other], v.merge[one]
for one, other := 0, len(v.merges)-1; one < other; one, other = one+1, other-1 {
v.merges[one], v.merges[other] = v.merges[other], v.merges[one]
}
}
b, err := v.codec.MarshalValueKeys(v.merge)
b, err := v.codec.MarshalValueKeys(v.merges)
return b, nil, err
}

Expand All @@ -98,21 +104,59 @@ func (v *valueKeysValueMerger) DeletableFinish(includesBase bool) ([]byte, bool,
return b, len(b) == 0, c, err
}

func (v *valueKeysValueMerger) deleteValueKey(value []byte) {
b := v.merge[:0]
for _, x := range v.merge {
if !bytes.Equal(x, value) {
b = append(b, x)
// prune removes value-keys that are present in deletes from merges
func (v *valueKeysValueMerger) prune() {
pruned := v.merges[:0]
for _, x := range v.merges {
if _, ok := v.deletes[string(x)]; !ok {
pruned = append(pruned, x)
}
}
v.merge = b
v.merges = pruned
}

// addToMerges appends value to merges unless exists already reports it as present.
//
// The appended element is a fresh copy of value rather than the caller's slice, so the
// stored value-key does not share a backing array with the argument.
// NOTE(review): presumably the caller may reuse or release the buffer after the call — confirm.
func (v *valueKeysValueMerger) addToMerges(value []byte) {
	if v.exists(value) {
		return
	}
	owned := append(make([]byte, 0, len(value)), value...)
	v.merges = append(v.merges, owned)
}

// exists checks whether the given value is already present, either pending merge or deletion.
func (v *valueKeysValueMerger) exists(value []byte) bool {
for _, x := range v.merge {
if _, pendingDelete := v.deletes[string(value)]; pendingDelete {
return true
}
for _, x := range v.merges {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How many elements do you expect to be in v.merges? Would it make sense to have an additional hashtable to avoid full scan?

Copy link
Member Author

@masih masih Oct 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for not using a map to store merges is that we need to maintain the insertion order. Iteration order over a map in Go is non-deterministic, I'm afraid.

if bytes.Equal(x, value) {
return true
}
}
return false
}

// addToDeletes records the given value in the set of deletes.
//
// No existence check is performed: the set is keyed by the string form of value, so
// recording the same value more than once leaves a single entry.
func (v *valueKeysValueMerger) addToDeletes(value []byte) {
	if v.deletes == nil {
		// The set is created lazily, on the first deletion, since deletions are far
		// less common than merges.
		v.deletes = map[string]struct{}{string(value): {}}
		return
	}
	v.deletes[string(value)] = struct{}{}
}

// maybeGrow returns a slice holding the same elements as s that has room for at least n
// more elements without reallocating.
//
// When s already has enough spare capacity it is returned untouched. Otherwise the
// elements are copied into a new backing array whose capacity is twice the required
// length, len(s)+n, so that subsequent growth requests are less likely to reallocate.
func maybeGrow(s [][]byte, n int) [][]byte {
	const growthFactor = 2
	if spare := cap(s) - len(s); spare >= n {
		return s
	}
	grown := make([][]byte, len(s), (len(s)+n)*growthFactor)
	copy(grown, s)
	return grown
}
146 changes: 137 additions & 9 deletions store/pebble/vk_merger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,33 @@ import (
"github.com/multiformats/go-multihash"
)

// Fixture values shared by the merger tests in this file. Each has a distinct
// ProviderID and ContextID, so the value-keys derived from them via
// blake3Keyer.valueKey are presumably distinct as well — confirm against the keyer.
var (
value1 = indexer.Value{ProviderID: "fish", ContextID: []byte("1")}
value2 = indexer.Value{ProviderID: "in", ContextID: []byte("2")}
value3 = indexer.Value{ProviderID: "dasea", ContextID: []byte("3")}
)

func TestValueKeysMerger_IsAssociative(t *testing.T) {
key := []byte("fish")
a := []byte("A")
b := []byte("B")
c := []byte("C")
bk := newBlake3Keyer(10)
k, err := bk.multihashKey(multihash.Multihash("fish"))
if err != nil {
t.Fatal()
}
a, err := bk.valueKey(value1, false)
if err != nil {
t.Fatal()
}
b, err := bk.valueKey(value2, false)
if err != nil {
t.Fatal()
}
c, err := bk.valueKey(value3, false)
if err != nil {
t.Fatal()
}

subject := newValueKeysMerger()
oneMerge, err := subject.Merge(key, a)
oneMerge, err := subject.Merge(k, a)
if err != nil {
t.Fatal(err)
}
Expand All @@ -32,7 +51,7 @@ func TestValueKeysMerger_IsAssociative(t *testing.T) {
t.Fatal(err)
}

anotherMerge, err := subject.Merge(key, c)
anotherMerge, err := subject.Merge(k, c)
if err != nil {
t.Fatal(err)
}
Expand All @@ -54,9 +73,6 @@ func TestValueKeysMerger_IsAssociative(t *testing.T) {
func TestValueKeysValueMerger_DeleteKeyRemovesValueKeys(t *testing.T) {
mh := multihash.Multihash("lobster")
bk := newBlake3Keyer(10)
value1 := indexer.Value{ProviderID: "fish", ContextID: []byte("1")}
value2 := indexer.Value{ProviderID: "in", ContextID: []byte("2")}
value3 := indexer.Value{ProviderID: "dasea", ContextID: []byte("3")}

vk1, err := bk.valueKey(value1, false)
if err != nil {
Expand Down Expand Up @@ -109,3 +125,115 @@ func TestValueKeysValueMerger_DeleteKeyRemovesValueKeys(t *testing.T) {
t.Fatalf("expected %v but got %v", wantVKs, gotVKs)
}
}

// TestValueKeysValueMerger_RepeatedlyMarshalledValueKeys checks that value-keys which have
// been marshalled several times over are unwrapped by the merger, so that Finish yields the
// three value-keys marshalled exactly once. Nested records are supplied as the initial
// operand, as a newer operand, and as older operands.
func TestValueKeysValueMerger_RepeatedlyMarshalledValueKeys(t *testing.T) {
	bk := newBlake3Keyer(10)
	mh := multihash.Multihash("lobster")
	k, err := bk.multihashKey(mh)
	if err != nil {
		t.Fatal(err)
	}

	vk1, err := bk.valueKey(value1, false)
	if err != nil {
		t.Fatal(err)
	}
	vk2, err := bk.valueKey(value2, false)
	if err != nil {
		t.Fatal(err)
	}
	vk3, err := bk.valueKey(value3, false)
	if err != nil {
		t.Fatal(err)
	}

	// marshal wraps the given entries in one layer of value-keys marshalling and fails
	// the test on error.
	codec := indexer.BinaryValueCodec{}
	marshal := func(vks ...[]byte) []byte {
		t.Helper()
		b, err := codec.MarshalValueKeys(vks)
		if err != nil {
			t.Fatal(err)
		}
		return b
	}

	// want is the singly marshalled form. Each mvkN re-marshals the previous result, so
	// mvkN carries N layers of marshalling around the same three value-keys.
	want := marshal(vk1, vk2, vk3)
	mvk2 := marshal(want)
	mvk3 := marshal(mvk2)
	mvk4 := marshal(mvk3)

	t.Run("four nested initial", func(t *testing.T) {
		subject := newValueKeysMerger()
		m, err := subject.Merge(k, mvk4)
		if err != nil {
			t.Fatal(err)
		}
		got, _, err := m.Finish(false)
		if err != nil {
			t.Fatal(err)
		}
		if !bytes.Equal(want, got) {
			t.Fatalf("expected %v but got %v", want, got)
		}
	})

	t.Run("mix nested newer", func(t *testing.T) {
		subject := newValueKeysMerger()
		m, err := subject.Merge(k, vk1)
		if err != nil {
			t.Fatal(err)
		}
		// Plain value-key first, then a triply marshalled record that also contains
		// the value-keys already merged.
		for _, operand := range [][]byte{vk2, mvk3} {
			if err := m.MergeNewer(operand); err != nil {
				t.Fatal(err)
			}
		}
		got, _, err := m.Finish(false)
		if err != nil {
			t.Fatal(err)
		}
		if !bytes.Equal(want, got) {
			t.Fatalf("expected %v but got %v", want, got)
		}
	})

	// The same value-keys in reverse order, marshalled twice.
	reverse := marshal(vk3, vk2, vk1)
	rmvk2 := marshal(reverse)

	t.Run("mix nested older", func(t *testing.T) {
		subject := newValueKeysMerger()
		m, err := subject.Merge(k, vk3)
		if err != nil {
			t.Fatal(err)
		}
		// A mix of nested records and plain value-keys, all supplied as older operands.
		for _, operand := range [][]byte{rmvk2, mvk3, vk1, vk2} {
			if err := m.MergeOlder(operand); err != nil {
				t.Fatal(err)
			}
		}
		got, _, err := m.Finish(false)
		if err != nil {
			t.Fatal(err)
		}
		if !bytes.Equal(want, got) {
			t.Fatalf("expected %v but got %v", want, got)
		}
	})
}