Skip to content

Commit

Permalink
internal/keyspan: add Value to Span
Browse files Browse the repository at this point in the history
Add a Value to the `Span` type, copying it to both fragments at a
fragmentation point. This will be used in the implementation of range
keys, which will be fragmented but carry a value.
  • Loading branch information
jbowens committed Nov 23, 2021
1 parent 3036ce0 commit 40f44c8
Show file tree
Hide file tree
Showing 16 changed files with 221 additions and 98 deletions.
4 changes: 2 additions & 2 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ func (b *Batch) newRangeDelIter(o *IterOptions) internalIterator {
// tombstones will remain valid, pointing into the old Batch.data. GC for
// the win.
for key, val := it.First(); key != nil; key, val = it.Next() {
frag.Add(*key, val)
frag.Add(keyspan.Span{Start: *key, End: val})
}
frag.Finish()
}
Expand Down Expand Up @@ -1153,7 +1153,7 @@ func newFlushableBatch(batch *Batch, comparer *Comparer) *flushableBatch {
index: -1,
}
for key, val := it.First(); key != nil; key, val = it.Next() {
frag.Add(*key, val)
frag.Add(keyspan.Span{Start: *key, End: val})
}
frag.Finish()
}
Expand Down
2 changes: 1 addition & 1 deletion compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2436,7 +2436,7 @@ func (d *DB) runCompaction(
// written later during `finishOutput()`. We add them to the
// `Fragmenter` now to make them visible to `compactionIter` so covered
// keys in the same snapshot stripe can be elided.
c.rangeDelFrag.Add(iter.cloneKey(*key), val)
c.rangeDelFrag.Add(keyspan.Span{Start: iter.cloneKey(*key), End: val})
continue
}
if tw == nil {
Expand Down
5 changes: 4 additions & 1 deletion compaction_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ func TestCompactionIter(t *testing.T) {
if iter.Valid() {
fmt.Fprintf(&b, "%s:%s\n", iter.Key(), iter.Value())
if iter.Key().Kind() == InternalKeyKindRangeDelete {
iter.rangeDelFrag.Add(iter.cloneKey(iter.Key()), iter.Value())
iter.rangeDelFrag.Add(keyspan.Span{
Start: iter.cloneKey(iter.Key()),
End: iter.Value(),
})
}
} else if err := iter.Error(); err != nil {
fmt.Fprintf(&b, "err=%v\n", err)
Expand Down
81 changes: 41 additions & 40 deletions internal/keyspan/fragmenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,23 +172,22 @@ func (f *Fragmenter) checkInvariants(buf []Span) {
//
// This process continues until there are no more fragments to flush.
//
// WARNING: the slices backing start.UserKey and end are retained after
// this method returns and should not be modified. This is safe for
// spans that are added from a memtable or batch. It is not safe for a
// range deletion span added from an sstable where the range-del block
// has been prefix compressed.
func (f *Fragmenter) Add(start base.InternalKey, end []byte) {
// WARNING: the slices backing start.UserKey and end are retained after this
// method returns and should not be modified. This is safe for spans that are
// added from a memtable or batch. It is not safe for a range deletion span
// added from an sstable where the range-del block has been prefix compressed.
func (f *Fragmenter) Add(s Span) {
if f.finished {
panic("pebble: span fragmenter already finished")
}
if f.flushedKey != nil {
switch c := f.Cmp(start.UserKey, f.flushedKey); {
switch c := f.Cmp(s.Start.UserKey, f.flushedKey); {
case c < 0:
panic(fmt.Sprintf("pebble: start key (%s) < flushed key (%s)",
f.Format(start.UserKey), f.Format(f.flushedKey)))
f.Format(s.Start.UserKey), f.Format(f.flushedKey)))
}
}
if f.Cmp(start.UserKey, end) >= 0 {
if f.Cmp(s.Start.UserKey, s.End) >= 0 {
// An empty span, we can ignore it.
return
}
Expand All @@ -200,29 +199,23 @@ func (f *Fragmenter) Add(start base.InternalKey, end []byte) {
if len(f.pending) > 0 {
// Since all of the pending spans have the same start key, we only need
// to compare against the first one.
switch c := f.Cmp(f.pending[0].Start.UserKey, start.UserKey); {
switch c := f.Cmp(f.pending[0].Start.UserKey, s.Start.UserKey); {
case c > 0:
panic(fmt.Sprintf("pebble: keys must be added in order: %s > %s",
f.pending[0].Start.Pretty(f.Format), start.Pretty(f.Format)))
f.pending[0].Start.Pretty(f.Format), s.Start.Pretty(f.Format)))
case c == 0:
// The new span has the same start key as the existing pending
// spans. Add it to the pending buffer.
f.pending = append(f.pending, Span{
Start: start,
End: end,
})
f.pending = append(f.pending, s)
return
}

// At this point we know that the new start key is greater than the pending
// spans start keys.
f.truncateAndFlush(start.UserKey)
f.truncateAndFlush(s.Start.UserKey)
}

f.pending = append(f.pending, Span{
Start: start,
End: end,
})
f.pending = append(f.pending, s)
}

// Covers returns true if the specified key is covered by one of the pending
Expand All @@ -243,12 +236,12 @@ func (f *Fragmenter) Covers(key base.InternalKey, snapshot uint64) bool {
}

seqNum := key.SeqNum()
for _, t := range f.pending {
if f.Cmp(key.UserKey, t.End) < 0 {
for _, s := range f.pending {
if f.Cmp(key.UserKey, s.End) < 0 {
// NB: A range deletion tombstone does not delete a point operation
// at the same sequence number, and broadly a span is not considered
// to cover a point operation at the same sequence number.
if t.Start.Visible(snapshot) && t.Start.SeqNum() > seqNum {
if s.Start.Visible(snapshot) && s.Start.SeqNum() > seqNum {
return true
}
}
Expand All @@ -261,9 +254,10 @@ func (f *Fragmenter) Empty() bool {
return f.finished || len(f.pending) == 0
}

// FlushTo flushes all of the fragments before key. Used during compaction to
// force emitting of spans which straddle an sstable boundary. Note that the
// emitted spans are not truncated to the specified key. Consider the scenario:
// FlushTo flushes all of the fragments with a start key <= key. Used during
// compaction to force emitting of spans which straddle an sstable boundary.
// Note that the emitted spans are not truncated to the specified key. Consider
// the scenario:
//
// a---------k#10
// f#8
Expand Down Expand Up @@ -319,13 +313,14 @@ func (f *Fragmenter) FlushTo(key []byte) {
// would become empty.
pending := f.pending
f.pending = f.pending[:0]
for _, t := range pending {
if f.Cmp(key, t.End) < 0 {
// t: a--+--e
for _, s := range pending {
if f.Cmp(key, s.End) < 0 {
// s: a--+--e
// new: c------
f.pending = append(f.pending, Span{
Start: base.MakeInternalKey(key, t.Start.SeqNum(), t.Start.Kind()),
End: t.End,
Start: base.MakeInternalKey(key, s.Start.SeqNum(), s.Start.Kind()),
End: s.End,
Value: s.Value,
})
}
}
Expand Down Expand Up @@ -407,21 +402,26 @@ func (f *Fragmenter) truncateAndFlush(key []byte) {
// pending and f.pending share the same underlying storage. As we iterate
// over pending we append to f.pending, but only one entry is appended in
// each iteration, after we have read the entry being overwritten.
for _, t := range pending {
if f.Cmp(key, t.End) < 0 {
// t: a--+--e
for _, s := range pending {
if f.Cmp(key, s.End) < 0 {
// s: a--+--e
// new: c------
if f.Cmp(t.Start.UserKey, key) < 0 {
done = append(done, Span{Start: t.Start, End: key})
if f.Cmp(s.Start.UserKey, key) < 0 {
done = append(done, Span{
Start: s.Start,
End: key,
Value: s.Value,
})
}
f.pending = append(f.pending, Span{
Start: base.MakeInternalKey(key, t.Start.SeqNum(), t.Start.Kind()),
End: t.End,
Start: base.MakeInternalKey(key, s.Start.SeqNum(), s.Start.Kind()),
End: s.End,
Value: s.Value,
})
} else {
// t: a-----e
// s: a-----e
// new: e----
done = append(done, t)
done = append(done, s)
}
}

Expand Down Expand Up @@ -469,6 +469,7 @@ func (f *Fragmenter) flush(buf []Span, lastKey []byte) {
f.flushBuf = append(f.flushBuf, Span{
Start: buf[i].Start,
End: split,
Value: buf[i].Value,
})
}

Expand Down
109 changes: 70 additions & 39 deletions internal/keyspan/fragmenter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,31 @@ import (
"github.com/stretchr/testify/require"
)

var tombstoneRe = regexp.MustCompile(`(\d+):\s*(\w+)-*(\w+)`)
var spanRe = regexp.MustCompile(`(\d+):\s*(\w+)-*(\w+)\w*([^\n]*)`)

func parseTombstone(t *testing.T, s string) Span {
m := tombstoneRe.FindStringSubmatch(s)
if len(m) != 4 {
t.Fatalf("expected 4 components, but found %d: %s", len(m), s)
func parseSpan(t *testing.T, s string, kind base.InternalKeyKind) Span {
m := spanRe.FindStringSubmatch(s)
if len(m) != 5 {
t.Fatalf("expected 5 components, but found %d: %s", len(m), s)
}
seqNum, err := strconv.Atoi(m[1])
require.NoError(t, err)
return Span{
Start: base.MakeInternalKey([]byte(m[2]), uint64(seqNum), base.InternalKeyKindRangeDelete),
Start: base.MakeInternalKey([]byte(m[2]), uint64(seqNum), kind),
End: []byte(m[3]),
Value: []byte(strings.TrimSpace(m[4])),
}
}

func buildTombstones(
t *testing.T, cmp base.Compare, formatKey base.FormatKey, s string,
func buildSpans(
t *testing.T, cmp base.Compare, formatKey base.FormatKey, s string, kind base.InternalKeyKind,
) []Span {
var tombstones []Span
var spans []Span
f := &Fragmenter{
Cmp: cmp,
Format: formatKey,
Emit: func(fragmented []Span) {
tombstones = append(tombstones, fragmented...)
spans = append(spans, fragmented...)
},
}
for _, line := range strings.Split(s, "\n") {
Expand All @@ -60,14 +61,13 @@ func buildTombstones(
continue
}

t := parseTombstone(t, line)
f.Add(t.Start, t.End)
f.Add(parseSpan(t, line, kind))
}
f.Finish()
return tombstones
return spans
}

func formatTombstones(tombstones []Span) string {
func formatSpans(spans []Span) string {
isLetter := func(b []byte) bool {
if len(b) != 1 {
return false
Expand All @@ -76,21 +76,25 @@ func formatTombstones(tombstones []Span) string {
}

var buf bytes.Buffer
for _, v := range tombstones {
if v.Empty() {
fmt.Fprintf(&buf, "<empty>\n")
continue
for _, v := range spans {
switch {
case v.Empty():
fmt.Fprintf(&buf, "<empty>")
case !isLetter(v.Start.UserKey) || !isLetter(v.End) || v.Start.UserKey[0] == v.End[0]:
fmt.Fprintf(&buf, "%d: %s-%s", v.Start.SeqNum(), v.Start.UserKey, v.End)
default:
fmt.Fprintf(&buf, "%d: %s%s%s%s",
v.Start.SeqNum(),
strings.Repeat(" ", int(v.Start.UserKey[0]-'a')),
v.Start.UserKey,
strings.Repeat("-", int(v.End[0]-v.Start.UserKey[0]-1)),
v.End)
}
if !isLetter(v.Start.UserKey) || !isLetter(v.End) || v.Start.UserKey[0] == v.End[0] {
fmt.Fprintf(&buf, "%d: %s-%s\n", v.Start.SeqNum(), v.Start.UserKey, v.End)
continue
if len(v.Value) > 0 {
buf.WriteString(strings.Repeat(" ", int('z'-v.End[0]+1)))
buf.WriteString(string(v.Value))
}
fmt.Fprintf(&buf, "%d: %s%s%s%s\n",
v.Start.SeqNum(),
strings.Repeat(" ", int(v.Start.UserKey[0]-'a')),
v.Start.UserKey,
strings.Repeat("-", int(v.End[0]-v.Start.UserKey[0]-1)),
v.End)
buf.WriteRune('\n')
}
return buf.String()
}
Expand All @@ -114,12 +118,12 @@ func TestFragmenter(t *testing.T) {
var iter base.InternalIterator

// Returns true if the specified <key,seq> pair is deleted at the specified
// read sequence number. Get ignores tombstones newer than the read sequence
// read sequence number. Get ignores spans newer than the read sequence
// number. This is a simple version of what full processing of range
// tombstones looks like.
deleted := func(key []byte, seq, readSeq uint64) bool {
tombstone := Get(cmp, iter, key, readSeq)
return tombstone.Covers(seq)
s := Get(cmp, iter, key, readSeq)
return s.Covers(seq)
}

datadriven.RunTest(t, "testdata/fragmenter", func(d *datadriven.TestData) string {
Expand All @@ -132,9 +136,9 @@ func TestFragmenter(t *testing.T) {
}
}()

tombstones := buildTombstones(t, cmp, fmtKey, d.Input)
iter = NewIter(cmp, tombstones)
return formatTombstones(tombstones)
spans := buildSpans(t, cmp, fmtKey, d.Input, base.InternalKeyKindRangeDelete)
iter = NewIter(cmp, spans)
return formatSpans(spans)
}()

case "get":
Expand Down Expand Up @@ -178,8 +182,8 @@ func TestFragmenterDeleted(t *testing.T) {
for _, line := range strings.Split(d.Input, "\n") {
switch {
case strings.HasPrefix(line, "add "):
t := parseTombstone(t, strings.TrimPrefix(line, "add "))
f.Add(t.Start, t.End)
t := parseSpan(t, strings.TrimPrefix(line, "add "), base.InternalKeyKindRangeDelete)
f.Add(t)
case strings.HasPrefix(line, "deleted "):
key := base.ParseInternalKey(strings.TrimPrefix(line, "deleted "))
func() {
Expand Down Expand Up @@ -214,8 +218,8 @@ func TestFragmenterFlushTo(t *testing.T) {
}
}()

tombstones := buildTombstones(t, cmp, fmtKey, d.Input)
return formatTombstones(tombstones)
spans := buildSpans(t, cmp, fmtKey, d.Input, base.InternalKeyKindRangeDelete)
return formatSpans(spans)
}()

default:
Expand All @@ -238,8 +242,35 @@ func TestFragmenterTruncateAndFlushTo(t *testing.T) {
}
}()

tombstones := buildTombstones(t, cmp, fmtKey, d.Input)
return formatTombstones(tombstones)
spans := buildSpans(t, cmp, fmtKey, d.Input, base.InternalKeyKindRangeDelete)
return formatSpans(spans)
}()

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
})
}

func TestFragmenter_Values(t *testing.T) {
cmp := base.DefaultComparer.Compare
fmtKey := base.DefaultComparer.FormatKey

datadriven.RunTest(t, "testdata/fragmenter_values", func(d *datadriven.TestData) string {
switch d.Cmd {
case "build":
return func() (result string) {
defer func() {
if r := recover(); r != nil {
result = fmt.Sprint(r)
}
}()

// TODO(jackson): Keys of kind InternalKeyKindRangeDelete don't
// have values. Update the call below when we have KindRangeSet,
// KindRangeUnset.
spans := buildSpans(t, cmp, fmtKey, d.Input, base.InternalKeyKindRangeDelete)
return formatSpans(spans)
}()

default:
Expand Down
Loading

0 comments on commit 40f44c8

Please sign in to comment.