Rollover to new TSM file when max blocks exceeded
Fixes #6406
jwilder committed May 17, 2016
1 parent a574e2e commit aa699c5
Showing 3 changed files with 106 additions and 4 deletions.
13 changes: 11 additions & 2 deletions tsdb/engine/tsm1/compact.go
@@ -508,7 +508,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([

		// We've hit the max file limit and there is more to write. Create a new file
		// and continue.
-		if err == errMaxFileExceeded {
+		if err == errMaxFileExceeded || err == ErrMaxBlocksExceeded {
			files = append(files, fileName)
			continue
		} else if err == ErrNoValues {
@@ -573,7 +573,16 @@ func (c *Compactor) write(path string, iter KeyIterator) (err error) {
		}

		// Write the key and value
-		if err := w.WriteBlock(key, minTime, maxTime, block); err != nil {
+		err = w.WriteBlock(key, minTime, maxTime, block)
+		if err == ErrMaxBlocksExceeded {
+			if err := w.WriteIndex(); err != nil {
+				return err
+			}

+			return ErrMaxBlocksExceeded
+		}

+		if err != nil {
			return err
		}

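To see the compact.go change in context: writeNewFiles already rolled over to a new TSM file when the current output file grew too large (errMaxFileExceeded); this commit makes a full per-key index (ErrMaxBlocksExceeded) take the same path, after write flushes the index of the file that just filled up. Below is a minimal, self-contained sketch of that rollover loop; writeOne, the file-name format, and the sentinel values here are hypothetical stand-ins, not the actual influxdb API.

package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinels standing in for tsm1's errMaxFileExceeded and
// ErrMaxBlocksExceeded.
var (
	errMaxFileExceeded   = errors.New("max file size exceeded")
	errMaxBlocksExceeded = errors.New("max blocks exceeded")
)

// writeOne is a stand-in for Compactor.write: it "writes" one output file and
// reports via a sentinel error whether more data remains for the next file.
func writeOne(name string, remaining *int) error {
	if *remaining > 1 {
		*remaining--
		// Every other rollover pretends the per-key index filled up.
		if *remaining%2 == 0 {
			return errMaxBlocksExceeded
		}
		return errMaxFileExceeded
	}
	*remaining = 0
	return nil
}

// writeNewFiles mirrors the rollover pattern from the diff: both sentinels
// mean "keep this file and start the next sequence number".
func writeNewFiles(generation int, remaining int) ([]string, error) {
	var files []string
	for sequence := 1; ; sequence++ {
		name := fmt.Sprintf("%09d-%09d.tsm", generation, sequence)
		err := writeOne(name, &remaining)

		if err == errMaxFileExceeded || err == errMaxBlocksExceeded {
			files = append(files, name)
			continue
		} else if err != nil {
			return nil, err
		}

		files = append(files, name)
		return files, nil
	}
}

func main() {
	files, err := writeNewFiles(1, 4)
	if err != nil {
		panic(err)
	}
	fmt.Println(files) // four files: three rollovers plus the final file
}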
89 changes: 87 additions & 2 deletions tsdb/engine/tsm1/compact_test.go
@@ -562,6 +562,85 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) {
	}
}

+// Ensures that a compaction will properly roll over to a new file when the
+// max number of blocks for a single key is exceeded
+func TestCompactor_CompactFull_MaxKeys(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping max keys compaction test")
+	}
+	dir := MustTempDir()
+	defer os.RemoveAll(dir)

+	// Write two files where the first contains a single key with the maximum
+	// number of full blocks that can fit in a TSM file
+	f1, f1Name := MustTSMWriter(dir, 1)
+	values := make([]tsm1.Value, 1000)
+	for i := 0; i < 65535; i++ {
+		values = values[:0]
+		for j := 0; j < 1000; j++ {
+			values = append(values, tsm1.NewValue(int64(i*1000+j), float64(j)))
+		}
+		if err := f1.Write("cpu,host=A#!~#value", values); err != nil {
+			t.Fatalf("write tsm f1: %v", err)
+		}
+	}
+	if err := f1.WriteIndex(); err != nil {
+		t.Fatalf("write index f1: %v", err)
+	}
+	f1.Close()

+	// Write a new file with 1 block that when compacted would exceed the max
+	// blocks
+	lastTimeStamp := values[len(values)-1].UnixNano()
+	values = values[:0]
+	f2, f2Name := MustTSMWriter(dir, 2)
+	for j := lastTimeStamp; j < lastTimeStamp+1000; j++ {
+		values = append(values, tsm1.NewValue(int64(j), float64(j)))
+	}
+	if err := f2.Write("cpu,host=A#!~#value", values); err != nil {
+		t.Fatalf("write tsm f2: %v", err)
+	}

+	if err := f2.WriteIndex(); err != nil {
+		t.Fatalf("write index f2: %v", err)
+	}
+	f2.Close()

+	compactor := &tsm1.Compactor{
+		Dir:       dir,
+		FileStore: &fakeFileStore{},
+	}

+	// Compact both files, should get 2 files back
+	files, err := compactor.CompactFull([]string{f1Name, f2Name})
+	if err != nil {
+		t.Fatalf("unexpected error writing snapshot: %v", err)
+	}

+	if got, exp := len(files), 2; got != exp {
+		t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
+	}

+	expGen, expSeq, err := tsm1.ParseTSMFileName(f2Name)
+	if err != nil {
+		t.Fatalf("unexpected error parsing file name: %v", err)
+	}
+	expSeq = expSeq + 1

+	gotGen, gotSeq, err := tsm1.ParseTSMFileName(files[0])
+	if err != nil {
+		t.Fatalf("unexpected error parsing file name: %v", err)
+	}

+	if gotGen != expGen {
+		t.Fatalf("wrong generation for new file: got %v, exp %v", gotGen, expGen)
+	}

+	if gotSeq != expSeq {
+		t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq)
+	}
+}

// Tests that a single TSM file can be read and iterated over
func TestTSMKeyIterator_Single(t *testing.T) {
	dir := MustTempDir()
@@ -1399,7 +1478,7 @@ func MustWALSegment(dir string, entries []tsm1.WALEntry) *tsm1.WALSegmentReader
	return tsm1.NewWALSegmentReader(f)
}

-func MustWriteTSM(dir string, gen int, values map[string][]tsm1.Value) string {
+func MustTSMWriter(dir string, gen int) (tsm1.TSMWriter, string) {
	f := MustTempFile(dir)
	oldName := f.Name()

@@ -1425,6 +1504,12 @@ func MustWriteTSM(dir string, gen int, values map[string][]tsm1.Value) string {
		panic(fmt.Sprintf("create TSM writer: %v", err))
	}

+	return w, newName
+}

+func MustWriteTSM(dir string, gen int, values map[string][]tsm1.Value) string {
+	w, name := MustTSMWriter(dir, gen)

	for k, v := range values {
		if err := w.Write(k, v); err != nil {
			panic(fmt.Sprintf("write TSM value: %v", err))
@@ -1439,7 +1524,7 @@ func MustWriteTSM(dir string, gen int, values map[string][]tsm1.Value) string {
		panic(fmt.Sprintf("write TSM close: %v", err))
	}

-	return newName
+	return name
}

func MustTSMReader(dir string, gen int, values map[string][]tsm1.Value) *tsm1.TSMReader {
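For scale, the new test's first fixture file writes 65,535 full blocks of 1,000 points to a single key, which is assumed here to be exactly the writer's per-key index capacity (the maxIndexEntries constant checked in writer.go below; its value is not shown in this diff), so the single extra block contributed by the second file forces the compaction to roll over into a second output file. A quick back-of-the-envelope check:

package main

import "fmt"

func main() {
	const (
		blocksPerKey   = 65535 // assumed per-key index entry limit (maxIndexEntries)
		pointsPerBlock = 1000  // values written per block in the test fixture
	)
	fmt.Println("points in f1:", blocksPerKey*pointsPerBlock) // 65535000
	fmt.Println("blocks after adding f2's block:", blocksPerKey+1) // one more than fits in a single file
}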
8 changes: 8 additions & 0 deletions tsdb/engine/tsm1/writer.go
@@ -102,6 +102,7 @@ var (
	ErrNoValues             = fmt.Errorf("no values written")
	ErrTSMClosed            = fmt.Errorf("tsm file closed")
	ErrMaxKeyLengthExceeded = fmt.Errorf("max key length exceeded")
+	ErrMaxBlocksExceeded    = fmt.Errorf("max blocks exceeded")
)

// TSMWriter writes TSM formatted key and values.
@@ -471,6 +472,9 @@ func (t *tsmWriter) Write(key string, values Values) error {
	return nil
}

+// WriteBlock writes a block for the given key and time range to the TSM file. If the write
+// exceeds max entries for a given key, ErrMaxBlocksExceeded is returned. This indicates
+// that the index is now full for this key and no future writes to this key will succeed.
func (t *tsmWriter) WriteBlock(key string, minTime, maxTime int64, block []byte) error {
	// Nothing to write
	if len(block) == 0 {
@@ -509,6 +513,10 @@ func (t *tsmWriter) WriteBlock(key string, minTime, maxTime int64, block []byte)
	// Increment file position pointer (checksum + block len)
	t.n += int64(n)

+	if len(t.index.Entries(key)) >= maxIndexEntries {
+		return ErrMaxBlocksExceeded
+	}

	return nil
}

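The new doc comment on WriteBlock spells out the contract the compactor now relies on: the block that triggers ErrMaxBlocksExceeded is still written and indexed, but the index for that key is full, so the caller must finalize the file (WriteIndex, then Close) and continue in a new one, which is what Compactor.write and writeNewFiles do above. A caller-side sketch of that contract, using a hypothetical stand-in writer rather than the real tsm1.TSMWriter:

package main

import (
	"errors"
	"fmt"
)

// errMaxBlocksExceeded stands in for tsm1.ErrMaxBlocksExceeded.
var errMaxBlocksExceeded = errors.New("max blocks exceeded")

// blockWriter is a hypothetical slice of the writer surface used here:
// WriteBlock accepts the block (and records its index entry) even when it
// reports that the index for the key is now full.
type blockWriter struct {
	entries int
	max     int
}

func (w *blockWriter) WriteBlock(key string, minTime, maxTime int64, block []byte) error {
	w.entries++ // the block is written and indexed...
	if w.entries >= w.max {
		return errMaxBlocksExceeded // ...but no further blocks for this key will fit
	}
	return nil
}

func (w *blockWriter) WriteIndex() error { return nil }
func (w *blockWriter) Close() error      { return nil }

func main() {
	blocks := make([][]byte, 7)
	files := 0

	w := &blockWriter{max: 3}
	for i, block := range blocks {
		err := w.WriteBlock("cpu,host=A#!~#value", int64(i*1000), int64(i*1000+999), block)
		if err == errMaxBlocksExceeded {
			// The block was accepted; finalize this file and roll to a new one.
			if err := w.WriteIndex(); err != nil {
				panic(err)
			}
			if err := w.Close(); err != nil {
				panic(err)
			}
			files++
			w = &blockWriter{max: 3}
			continue
		} else if err != nil {
			panic(err)
		}
	}

	// Finalize the last, partially filled file.
	if err := w.WriteIndex(); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}
	files++

	fmt.Println("wrote", files, "files for 7 blocks") // 3 files: 3 + 3 + 1 blocks
}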
