diff --git a/store/pebble/pebble.go b/store/pebble/pebble.go index bd7f0b0..e680845 100644 --- a/store/pebble/pebble.go +++ b/store/pebble/pebble.go @@ -85,7 +85,10 @@ func (s *store) Get(mh multihash.Multihash) ([]indexer.Value, bool, error) { if err != nil { return nil, false, err } - var values []indexer.Value + + // Optimistically set the capacity of values slice to the number of value-keys + // in order to reduce the append footprint in the loop below. + values := make([]indexer.Value, 0, len(vks)) for _, vk := range vks { vs, vCloser, err := s.db.Get(vk) if err == pebble.ErrNotFound { diff --git a/store/pebble/vk_merger.go b/store/pebble/vk_merger.go index 3a51bb2..315a810 100644 --- a/store/pebble/vk_merger.go +++ b/store/pebble/vk_merger.go @@ -18,8 +18,8 @@ var ( ) type valueKeysValueMerger struct { - merge [][]byte - delete [][]byte + merges [][]byte + deletes map[string]struct{} reverse bool codec indexer.BinaryValueCodec } @@ -45,28 +45,36 @@ func (v *valueKeysValueMerger) MergeNewer(value []byte) error { prefix, dvk := key(value).stripMergeDelete() switch prefix { case mergeDeleteKeyPrefix: - v.delete = append(v.delete, dvk) + v.addToDeletes(dvk) case valueKeyPrefix: - if !v.exists(value) { - v.merge = append(v.merge, value) - } + v.addToMerges(value) default: - // The given value is marshalled value-keys; decode it and populate merge values. - vks, err := v.codec.UnmarshalValueKeys(value) - if err != nil { - return err - } + return v.mergeMarshalled(value) + } + return nil +} - // Grow v.merge capacity if it is less than the upper bound for new length. - // This is to reduce footprint of append called in a loop. - maxLen := len(v.merge) + len(vks) - if cap(v.merge) < maxLen { - v.merge = append(make([][]byte, 0, maxLen), v.merge...) - } - for _, vk := range vks { - if !v.exists(vk) { - v.merge = append(v.merge, vk) - } +// mergeMarshalled extracts value-keys by unmarshalling the given value and adds them to merges. +// This function recursively unmarshalls value-keys to gracefully correct previous behaviour +// of this merger where in certain scenarios already marshalled value-keys may have been +// re-marshalled. This assures that any such records are opportunistically unmarshalled and +// de-duplicated whenever they are read or changed. +// +// See: https://github.com/filecoin-project/go-indexer-core/issues/94 +func (v *valueKeysValueMerger) mergeMarshalled(value []byte) error { + // The given value is marshalled value-keys; decode it and populate merge values. + vks, err := v.codec.UnmarshalValueKeys(value) + if err != nil { + return err + } + // Attempt to grow the capacity of v.merge to reduce append footprint loop below. + v.merges = maybeGrow(v.merges, len(vks)) + for _, vk := range vks { + // Recursively merge the value key to accommodate previous behaviour of the value-key + // merger, where the value-keys may have been marshalled multiple times. + // Recursion here will gracefully and opportunistically correct any such cases. + if err := v.MergeNewer(vk); err != nil { + return err } } return nil @@ -78,18 +86,16 @@ func (v *valueKeysValueMerger) MergeOlder(value []byte) error { } func (v *valueKeysValueMerger) Finish(_ bool) ([]byte, io.Closer, error) { - for _, dvk := range v.delete { - v.deleteValueKey(dvk) - } - if len(v.merge) == 0 { + v.prune() + if len(v.merges) == 0 { return nil, nil, nil } if v.reverse { - for one, other := 0, len(v.merge)-1; one < other; one, other = one+1, other-1 { - v.merge[one], v.merge[other] = v.merge[other], v.merge[one] + for one, other := 0, len(v.merges)-1; one < other; one, other = one+1, other-1 { + v.merges[one], v.merges[other] = v.merges[other], v.merges[one] } } - b, err := v.codec.MarshalValueKeys(v.merge) + b, err := v.codec.MarshalValueKeys(v.merges) return b, nil, err } @@ -98,21 +104,59 @@ func (v *valueKeysValueMerger) DeletableFinish(includesBase bool) ([]byte, bool, return b, len(b) == 0, c, err } -func (v *valueKeysValueMerger) deleteValueKey(value []byte) { - b := v.merge[:0] - for _, x := range v.merge { - if !bytes.Equal(x, value) { - b = append(b, x) +// prune removes value-keys that are present in deletes from merges +func (v *valueKeysValueMerger) prune() { + pruned := v.merges[:0] + for _, x := range v.merges { + if _, ok := v.deletes[string(x)]; !ok { + pruned = append(pruned, x) } } - v.merge = b + v.merges = pruned } +// addToMerges checks whether the given value exists and if not adds it to the list of merges. +func (v *valueKeysValueMerger) addToMerges(value []byte) { + if !v.exists(value) { + dst := make([]byte, len(value)) + copy(dst, value) + v.merges = append(v.merges, dst) + } +} + +// exists checks whether the given value is already present, either pending merge or deletion. func (v *valueKeysValueMerger) exists(value []byte) bool { - for _, x := range v.merge { + if _, pendingDelete := v.deletes[string(value)]; pendingDelete { + return true + } + for _, x := range v.merges { if bytes.Equal(x, value) { return true } } return false } + +// addToMerges checks whether the given value exists and if not adds it to the list of deletes. +func (v *valueKeysValueMerger) addToDeletes(value []byte) { + if v.deletes == nil { + // Lazily instantiate the deletes map since deletions are far less common than merges. + v.deletes = make(map[string]struct{}) + } + v.deletes[string(value)] = struct{}{} +} + +// maybeGrow grows the capacity of the given slice if necessary, such that it can fit n more +// elements and returns the resulting slice. +func maybeGrow(s [][]byte, n int) [][]byte { + const growthFactor = 2 + l := len(s) + switch { + case n <= cap(s)-l: + return s + case l == 0: + return make([][]byte, 0, n*growthFactor) + default: + return append(make([][]byte, 0, (l+n)*growthFactor), s...) + } +} diff --git a/store/pebble/vk_merger_test.go b/store/pebble/vk_merger_test.go index 7880278..9f932bc 100644 --- a/store/pebble/vk_merger_test.go +++ b/store/pebble/vk_merger_test.go @@ -10,14 +10,33 @@ import ( "github.com/multiformats/go-multihash" ) +var ( + value1 = indexer.Value{ProviderID: "fish", ContextID: []byte("1")} + value2 = indexer.Value{ProviderID: "in", ContextID: []byte("2")} + value3 = indexer.Value{ProviderID: "dasea", ContextID: []byte("3")} +) + func TestValueKeysMerger_IsAssociative(t *testing.T) { - key := []byte("fish") - a := []byte("A") - b := []byte("B") - c := []byte("C") + bk := newBlake3Keyer(10) + k, err := bk.multihashKey(multihash.Multihash("fish")) + if err != nil { + t.Fatal() + } + a, err := bk.valueKey(value1, false) + if err != nil { + t.Fatal() + } + b, err := bk.valueKey(value2, false) + if err != nil { + t.Fatal() + } + c, err := bk.valueKey(value3, false) + if err != nil { + t.Fatal() + } subject := newValueKeysMerger() - oneMerge, err := subject.Merge(key, a) + oneMerge, err := subject.Merge(k, a) if err != nil { t.Fatal(err) } @@ -32,7 +51,7 @@ func TestValueKeysMerger_IsAssociative(t *testing.T) { t.Fatal(err) } - anotherMerge, err := subject.Merge(key, c) + anotherMerge, err := subject.Merge(k, c) if err != nil { t.Fatal(err) } @@ -54,9 +73,6 @@ func TestValueKeysMerger_IsAssociative(t *testing.T) { func TestValueKeysValueMerger_DeleteKeyRemovesValueKeys(t *testing.T) { mh := multihash.Multihash("lobster") bk := newBlake3Keyer(10) - value1 := indexer.Value{ProviderID: "fish", ContextID: []byte("1")} - value2 := indexer.Value{ProviderID: "in", ContextID: []byte("2")} - value3 := indexer.Value{ProviderID: "dasea", ContextID: []byte("3")} vk1, err := bk.valueKey(value1, false) if err != nil { @@ -109,3 +125,115 @@ func TestValueKeysValueMerger_DeleteKeyRemovesValueKeys(t *testing.T) { t.Fatalf("expected %v but got %v", wantVKs, gotVKs) } } + +func TestValueKeysValueMerger_RepeatedlyMarshalledValueKeys(t *testing.T) { + + bk := newBlake3Keyer(10) + mh := multihash.Multihash("lobster") + k, err := bk.multihashKey(mh) + if err != nil { + t.Fatal() + } + + vk1, err := bk.valueKey(value1, false) + if err != nil { + t.Fatal(err) + } + vk2, err := bk.valueKey(value2, false) + if err != nil { + t.Fatal(err) + } + vk3, err := bk.valueKey(value3, false) + if err != nil { + t.Fatal(err) + } + + // Repeatedly marshall the marshalled value + want, err := indexer.BinaryValueCodec{}.MarshalValueKeys([][]byte{vk1, vk2, vk3}) + if err != nil { + t.Fatal(err) + } + mvk2, err := indexer.BinaryValueCodec{}.MarshalValueKeys([][]byte{want}) + if err != nil { + t.Fatal(err) + } + mvk3, err := indexer.BinaryValueCodec{}.MarshalValueKeys([][]byte{mvk2}) + if err != nil { + t.Fatal(err) + } + mvk4, err := indexer.BinaryValueCodec{}.MarshalValueKeys([][]byte{mvk3}) + if err != nil { + t.Fatal(err) + } + + t.Run("four nested initial", func(t *testing.T) { + subject := newValueKeysMerger() + m, err := subject.Merge(k, mvk4) + if err != nil { + t.Fatal(err) + } + got, _, err := m.Finish(false) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(want, got) { + t.Fatal() + } + }) + t.Run("mix nested newer", func(t *testing.T) { + subject := newValueKeysMerger() + m, err := subject.Merge(k, vk1) + if err != nil { + t.Fatal(err) + } + if err := m.MergeNewer(vk2); err != nil { + t.Fatal(err) + } + if err := m.MergeNewer(mvk3); err != nil { + t.Fatal(err) + } + got, _, err := m.Finish(false) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(want, got) { + t.Fatal() + } + }) + + reverse, err := indexer.BinaryValueCodec{}.MarshalValueKeys([][]byte{vk3, vk2, vk1}) + if err != nil { + t.Fatal(err) + } + rmvk2, err := indexer.BinaryValueCodec{}.MarshalValueKeys([][]byte{reverse}) + if err != nil { + t.Fatal(err) + } + + t.Run("mix nested older", func(t *testing.T) { + subject := newValueKeysMerger() + m, err := subject.Merge(k, vk3) + if err != nil { + t.Fatal(err) + } + if err := m.MergeOlder(rmvk2); err != nil { + t.Fatal(err) + } + if err := m.MergeOlder(mvk3); err != nil { + t.Fatal(err) + } + if err := m.MergeOlder(vk1); err != nil { + t.Fatal(err) + } + if err := m.MergeOlder(vk2); err != nil { + t.Fatal(err) + } + got, _, err := m.Finish(false) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(want, got) { + t.Fatal() + } + }) +}