diff --git a/tsdb/store.go b/tsdb/store.go index ab4b3283ecc..b19ad8466b3 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -12,6 +12,7 @@ import ( "runtime" "sort" "strconv" + "strings" "sync" "time" @@ -32,6 +33,8 @@ import ( ) var ( + // ErrIncompatibleWAL is returned if incompatible WAL files are detected. + ErrIncompatibleWAL = errors.New("incompatible WAL format") // ErrShardNotFound is returned when trying to get a non existing shard. ErrShardNotFound = fmt.Errorf("shard not found") // ErrStoreClosed is returned when trying to use a closed Store. @@ -308,6 +311,76 @@ func (s *Store) Open(ctx context.Context) error { return nil } +const ( + wrrFileExtension = "wrr" + wrrPrefixVersioned = "_v" + wrrSnapshotExtension = "snapshot" +) + +// generateWRRSegmentFileGlob generates a glob to find all .wrr and related files in a +// WAL directory. +func generateWRRSegmentFileGlob() string { + return fmt.Sprintf("%s*.%s*", wrrPrefixVersioned, wrrFileExtension) +} + +// checkUncommittedWRR determines if there are any uncommitted WRR files found in shardWALPath. +// shardWALPath is the path to a single shard's WAL, not the overall WAL path. +// If no uncommitted WRR files are found, then nil is returned. Otherwise, an error indicating +// the names of uncommitted WRR files is returned. The error returned contains the full context +// and does not require additional information. +func checkUncommittedWRR(shardWALPath string) error { + // It is OK if there are .wrr files as long as they are committed. Committed .wrr files will + // have a .wrr.snapshot newer than the .wrr file. If there is no .wrr.snapshot file newer + // than a given .wrr file, then that .wrr file is uncommitted and we should return an error + // indicating possible data loss due to an in-place conversion of an incompatible WAL format. + // Note that newness for .wrr and .wrr.snapshot files is determined lexically by the name, + // and not the ctime or mtime of the files. + + unfilteredNames, err := filepath.Glob(filepath.Join(shardWALPath, generateWRRSegmentFileGlob())) + if err != nil { + return fmt.Errorf("error finding WRR files in %q: %w", shardWALPath, err) + } + snapshotExt := fmt.Sprintf(".%s.%s", wrrFileExtension, wrrSnapshotExtension) + + // Strip out files that are not .wal or .wal.snapshot, given the glob pattern + // could include false positives, such as foo.wally or foo.wal.snapshotted + names := make([]string, 0, len(unfilteredNames)) + for _, name := range unfilteredNames { + if strings.HasSuffix(name, wrrFileExtension) || strings.HasSuffix(name, snapshotExt) { + names = append(names, name) + } + } + + sort.Strings(names) + + // Find the last snapshot and collect the files after it + for i := len(names) - 1; i >= 0; i-- { + if strings.HasSuffix(names[i], snapshotExt) { + names = names[i+1:] + break + } + } + + // names now contains a list of uncommitted WRR files. + if len(names) > 0 { + return fmt.Errorf("%w: uncommitted WRR files found: %v", ErrIncompatibleWAL, names) + } + + return nil +} + +// checkWALCompatibility ensures that an uncommitted WAL segments in an incompatible +// format are not present. shardWALPath is the path to a single shard's WAL, not the +// overall WAL path. A ErrIncompatibleWAL error with further details is returned if +// an incompatible WAL with unflushed segments is found, The error returned contains +// the full context and does not require additional information. +func checkWALCompatibility(shardWALPath string) error { + // There is one known incompatible WAL format, the .wrr format. Finding these is a problem + // if they are uncommitted. OSS can not read .wrr WAL files, so any uncommitted data in them + // will be lost. + return checkUncommittedWRR(shardWALPath) +} + // generateTrailingPath returns the last part of a shard path or WAL path // based on the shardID, db, and rp. func (s *Store) generateTrailingPath(shardID uint64, db, rp string) string { @@ -507,6 +580,21 @@ func (s *Store) loadShards(ctx context.Context) error { } } + // Verify no incompatible WAL files. Do this before starting to load shards to fail early if found. + // All shards are scanned instead of stopping at just the first one so that the admin will see + // all the problematic shards. + if s.EngineOptions.WALEnabled { + var errs []error + for _, sh := range shards { + if err := checkWALCompatibility(s.generateWALPath(sh.id, sh.db, sh.rp)); err != nil { + errs = append(errs, err) + } + } + if len(errs) > 0 { + return errors.Join(errs...) + } + } + // Do the actual work of loading shards. shardResC := make(chan *shardResponse, len(shards)) pendingShardCount := 0 diff --git a/tsdb/store_test.go b/tsdb/store_test.go index d19351be672..4a721e2c32e 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -25,6 +25,7 @@ import ( "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/pkg/deep" "github.com/influxdata/influxdb/v2/pkg/slices" + "github.com/influxdata/influxdb/v2/pkg/snowflake" "github.com/influxdata/influxdb/v2/predicate" "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxql" @@ -886,6 +887,122 @@ func TestStore_FlushWALOnClose(t *testing.T) { } } +func TestStore_WRRSegments(t *testing.T) { + // Check if uncommitted WRR segments are identified and cause opening the store to abort. + for _, index := range tsdb.RegisteredIndexes() { + t.Run("TestStore_WRRSegments_"+index, func(t *testing.T) { + idGen := snowflake.New(0) + generateWRRSegmentName := func() string { + return fmt.Sprintf("_v01_%020d.wrr", idGen.Next()) + } + createFile := func(t *testing.T, fn string) { + t.Helper() + require.NoError(t, os.WriteFile(fn, nil, 0666)) + } + createWRR := func(t *testing.T, path string) string { + t.Helper() + fn := filepath.Join(path, generateWRRSegmentName()) + createFile(t, fn) + return fn + } + generateWRRSnapshotName := func() string { + return generateWRRSegmentName() + ".snapshot" + } + createWRRSnapshot := func(t *testing.T, path string) string { + t.Helper() + fn := filepath.Join(path, generateWRRSnapshotName()) + createFile(t, fn) + return fn + } + checkWRRError := func(t *testing.T, err error, wrrs ...[]string) { + t.Helper() + require.ErrorIs(t, err, tsdb.ErrIncompatibleWAL) + require.ErrorContains(t, err, "incompatible WAL format: uncommitted WRR files found") + // We don't know the exact order of the errors if there are multiple shards with + // uncommitted WRRs, but this will insure that all of them are included in the error + // message. + for _, w := range wrrs { + if len(w) > 0 { + require.ErrorContains(t, err, fmt.Sprintf("%v", w)) + } + } + } + + s := MustOpenStore(t, index, WithWALFlushOnShutdown(true)) + defer s.Close() + + // Create shard #0 with data. + s.MustCreateShardWithData("db0", "rp0", 0, + `cpu,host=serverA value=1 0`, + `cpu,host=serverA value=2 10`, + `cpu,host=serverB value=3 20`, + ) + + // Create shard #1 with data. + s.MustCreateShardWithData("db0", "rp0", 1, + `cpu,host=serverA value=1 30`, + `cpu,host=serverC value=3 60`, + ) + + sh0WALPath := filepath.Join(s.walPath, "db0", "rp0", "0") + require.DirExists(t, sh0WALPath) + sh1WALPath := filepath.Join(s.walPath, "db0", "rp0", "1") + require.DirExists(t, sh1WALPath) + + // No WRR segments, no error + require.NoError(t, s.Reopen(t)) + + // 1 uncommitted WRR segment in shard 0 + var sh0Uncommitted, sh1Uncommitted []string + checkReopen := func(t *testing.T) { + t.Helper() + allUncommitted := [][]string{sh0Uncommitted, sh1Uncommitted} + var hasUncommitted bool + for _, u := range allUncommitted { + if len(u) > 0 { + hasUncommitted = true + } + } + + if hasUncommitted { + checkWRRError(t, s.Reopen(t), allUncommitted...) + } else { + require.NoError(t, s.Reopen(t)) + } + } + sh0Uncommitted = append(sh0Uncommitted, createWRR(t, sh0WALPath)) + checkReopen(t) + + // 2 uncommitted WRR segments in shard 0 + sh0Uncommitted = append(sh0Uncommitted, createWRR(t, sh0WALPath)) + checkReopen(t) + + // 2 uncommitted WR segments in shard 0, 1 in shard 1 + sh1Uncommitted = append(sh1Uncommitted, createWRR(t, sh1WALPath)) + checkReopen(t) + + // No uncommitted WRR in shard 0, 1 in shard 1 + createWRRSnapshot(t, sh0WALPath) + sh0Uncommitted = nil + checkReopen(t) + + // No uncommitted WRR segments + createWRRSnapshot(t, sh1WALPath) + sh1Uncommitted = nil + checkReopen(t) + + // Add 1 uncommitted to shard 1 + sh1Uncommitted = append(sh1Uncommitted, createWRR(t, sh1WALPath)) + checkReopen(t) + + // No uncommitted WRR segments + createWRRSnapshot(t, sh1WALPath) + sh1Uncommitted = nil + checkReopen(t) + }) + } +} + // Test new reader blocking. func TestStore_NewReadersBlocked(t *testing.T) { //t.Parallel()