Skip to content

Commit

Permalink
feat: check for uncommitted WRR segments during startup (#25560)
Browse files Browse the repository at this point in the history
Check for uncommitted WRR segments during startup and abort startup
if found.

Closes: #25559
(cherry picked from commit 037c6af)
  • Loading branch information
gwossum authored Nov 18, 2024
1 parent 941a41b commit 78e1d77
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 0 deletions.
88 changes: 88 additions & 0 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -32,6 +33,8 @@ import (
)

var (
// ErrIncompatibleWAL is returned if incompatible WAL files are detected.
ErrIncompatibleWAL = errors.New("incompatible WAL format")
// ErrShardNotFound is returned when trying to get a non existing shard.
ErrShardNotFound = fmt.Errorf("shard not found")
// ErrStoreClosed is returned when trying to use a closed Store.
Expand Down Expand Up @@ -308,6 +311,76 @@ func (s *Store) Open(ctx context.Context) error {
return nil
}

const (
wrrFileExtension = "wrr"
wrrPrefixVersioned = "_v"
wrrSnapshotExtension = "snapshot"
)

// generateWRRSegmentFileGlob generates a glob to find all .wrr and related files in a
// WAL directory.
func generateWRRSegmentFileGlob() string {
return fmt.Sprintf("%s*.%s*", wrrPrefixVersioned, wrrFileExtension)
}

// checkUncommittedWRR determines if there are any uncommitted WRR files found in shardWALPath.
// shardWALPath is the path to a single shard's WAL, not the overall WAL path.
// If no uncommitted WRR files are found, then nil is returned. Otherwise, an error indicating
// the names of uncommitted WRR files is returned. The error returned contains the full context
// and does not require additional information.
func checkUncommittedWRR(shardWALPath string) error {
// It is OK if there are .wrr files as long as they are committed. Committed .wrr files will
// have a .wrr.snapshot newer than the .wrr file. If there is no .wrr.snapshot file newer
// than a given .wrr file, then that .wrr file is uncommitted and we should return an error
// indicating possible data loss due to an in-place conversion of an incompatible WAL format.
// Note that newness for .wrr and .wrr.snapshot files is determined lexically by the name,
// and not the ctime or mtime of the files.

unfilteredNames, err := filepath.Glob(filepath.Join(shardWALPath, generateWRRSegmentFileGlob()))
if err != nil {
return fmt.Errorf("error finding WRR files in %q: %w", shardWALPath, err)
}
snapshotExt := fmt.Sprintf(".%s.%s", wrrFileExtension, wrrSnapshotExtension)

// Strip out files that are not .wal or .wal.snapshot, given the glob pattern
// could include false positives, such as foo.wally or foo.wal.snapshotted
names := make([]string, 0, len(unfilteredNames))
for _, name := range unfilteredNames {
if strings.HasSuffix(name, wrrFileExtension) || strings.HasSuffix(name, snapshotExt) {
names = append(names, name)
}
}

sort.Strings(names)

// Find the last snapshot and collect the files after it
for i := len(names) - 1; i >= 0; i-- {
if strings.HasSuffix(names[i], snapshotExt) {
names = names[i+1:]
break
}
}

// names now contains a list of uncommitted WRR files.
if len(names) > 0 {
return fmt.Errorf("%w: uncommitted WRR files found: %v", ErrIncompatibleWAL, names)
}

return nil
}

// checkWALCompatibility ensures that an uncommitted WAL segments in an incompatible
// format are not present. shardWALPath is the path to a single shard's WAL, not the
// overall WAL path. A ErrIncompatibleWAL error with further details is returned if
// an incompatible WAL with unflushed segments is found, The error returned contains
// the full context and does not require additional information.
func checkWALCompatibility(shardWALPath string) error {
// There is one known incompatible WAL format, the .wrr format. Finding these is a problem
// if they are uncommitted. OSS can not read .wrr WAL files, so any uncommitted data in them
// will be lost.
return checkUncommittedWRR(shardWALPath)
}

// generateTrailingPath returns the last part of a shard path or WAL path
// based on the shardID, db, and rp.
func (s *Store) generateTrailingPath(shardID uint64, db, rp string) string {
Expand Down Expand Up @@ -507,6 +580,21 @@ func (s *Store) loadShards(ctx context.Context) error {
}
}

// Verify no incompatible WAL files. Do this before starting to load shards to fail early if found.
// All shards are scanned instead of stopping at just the first one so that the admin will see
// all the problematic shards.
if s.EngineOptions.WALEnabled {
var errs []error
for _, sh := range shards {
if err := checkWALCompatibility(s.generateWALPath(sh.id, sh.db, sh.rp)); err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return errors.Join(errs...)
}
}

// Do the actual work of loading shards.
shardResC := make(chan *shardResponse, len(shards))
pendingShardCount := 0
Expand Down
117 changes: 117 additions & 0 deletions tsdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/deep"
"github.com/influxdata/influxdb/v2/pkg/slices"
"github.com/influxdata/influxdb/v2/pkg/snowflake"
"github.com/influxdata/influxdb/v2/predicate"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxql"
Expand Down Expand Up @@ -886,6 +887,122 @@ func TestStore_FlushWALOnClose(t *testing.T) {
}
}

func TestStore_WRRSegments(t *testing.T) {
// Check if uncommitted WRR segments are identified and cause opening the store to abort.
for _, index := range tsdb.RegisteredIndexes() {
t.Run("TestStore_WRRSegments_"+index, func(t *testing.T) {
idGen := snowflake.New(0)
generateWRRSegmentName := func() string {
return fmt.Sprintf("_v01_%020d.wrr", idGen.Next())
}
createFile := func(t *testing.T, fn string) {
t.Helper()
require.NoError(t, os.WriteFile(fn, nil, 0666))
}
createWRR := func(t *testing.T, path string) string {
t.Helper()
fn := filepath.Join(path, generateWRRSegmentName())
createFile(t, fn)
return fn
}
generateWRRSnapshotName := func() string {
return generateWRRSegmentName() + ".snapshot"
}
createWRRSnapshot := func(t *testing.T, path string) string {
t.Helper()
fn := filepath.Join(path, generateWRRSnapshotName())
createFile(t, fn)
return fn
}
checkWRRError := func(t *testing.T, err error, wrrs ...[]string) {
t.Helper()
require.ErrorIs(t, err, tsdb.ErrIncompatibleWAL)
require.ErrorContains(t, err, "incompatible WAL format: uncommitted WRR files found")
// We don't know the exact order of the errors if there are multiple shards with
// uncommitted WRRs, but this will insure that all of them are included in the error
// message.
for _, w := range wrrs {
if len(w) > 0 {
require.ErrorContains(t, err, fmt.Sprintf("%v", w))
}
}
}

s := MustOpenStore(t, index, WithWALFlushOnShutdown(true))
defer s.Close()

// Create shard #0 with data.
s.MustCreateShardWithData("db0", "rp0", 0,
`cpu,host=serverA value=1 0`,
`cpu,host=serverA value=2 10`,
`cpu,host=serverB value=3 20`,
)

// Create shard #1 with data.
s.MustCreateShardWithData("db0", "rp0", 1,
`cpu,host=serverA value=1 30`,
`cpu,host=serverC value=3 60`,
)

sh0WALPath := filepath.Join(s.walPath, "db0", "rp0", "0")
require.DirExists(t, sh0WALPath)
sh1WALPath := filepath.Join(s.walPath, "db0", "rp0", "1")
require.DirExists(t, sh1WALPath)

// No WRR segments, no error
require.NoError(t, s.Reopen(t))

// 1 uncommitted WRR segment in shard 0
var sh0Uncommitted, sh1Uncommitted []string
checkReopen := func(t *testing.T) {
t.Helper()
allUncommitted := [][]string{sh0Uncommitted, sh1Uncommitted}
var hasUncommitted bool
for _, u := range allUncommitted {
if len(u) > 0 {
hasUncommitted = true
}
}

if hasUncommitted {
checkWRRError(t, s.Reopen(t), allUncommitted...)
} else {
require.NoError(t, s.Reopen(t))
}
}
sh0Uncommitted = append(sh0Uncommitted, createWRR(t, sh0WALPath))
checkReopen(t)

// 2 uncommitted WRR segments in shard 0
sh0Uncommitted = append(sh0Uncommitted, createWRR(t, sh0WALPath))
checkReopen(t)

// 2 uncommitted WR segments in shard 0, 1 in shard 1
sh1Uncommitted = append(sh1Uncommitted, createWRR(t, sh1WALPath))
checkReopen(t)

// No uncommitted WRR in shard 0, 1 in shard 1
createWRRSnapshot(t, sh0WALPath)
sh0Uncommitted = nil
checkReopen(t)

// No uncommitted WRR segments
createWRRSnapshot(t, sh1WALPath)
sh1Uncommitted = nil
checkReopen(t)

// Add 1 uncommitted to shard 1
sh1Uncommitted = append(sh1Uncommitted, createWRR(t, sh1WALPath))
checkReopen(t)

// No uncommitted WRR segments
createWRRSnapshot(t, sh1WALPath)
sh1Uncommitted = nil
checkReopen(t)
})
}
}

// Test new reader blocking.
func TestStore_NewReadersBlocked(t *testing.T) {
//t.Parallel()
Expand Down

0 comments on commit 78e1d77

Please sign in to comment.