diff --git a/CHANGELOG.md b/CHANGELOG.md
index c75f505e523..484850bb200 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,7 @@ This release adds an embedded SQLite database for storing metadata required by t
 1. [21635](https://github.com/influxdata/influxdb/pull/21635): Port `influxd inspect verify-seriesfile` to 2.x
 1. [21621](https://github.com/influxdata/influxdb/pull/21621): Add `storage-wal-max-concurrent-writes` config option to `influxd` to enable tuning memory pressure under heavy write load.
 1. [21621](https://github.com/influxdata/influxdb/pull/21621): Add `storage-wal-max-write-delay` config option to `influxd` to prevent deadlocks when the WAL is overloaded with concurrent writes.
+1. [21615](https://github.com/influxdata/influxdb/pull/21615): Ported the `influxd inspect verify-tsm` command from 1.x.
 
 ### Bug Fixes
 
diff --git a/cmd/influx_inspect/verify/tsm/verify.go b/cmd/influx_inspect/verify/tsm/verify.go
deleted file mode 100644
index 9a565c3ce7e..00000000000
--- a/cmd/influx_inspect/verify/tsm/verify.go
+++ /dev/null
@@ -1,232 +0,0 @@
-// Package tsm verifies integrity of TSM files.
-package tsm
-
-import (
-	"flag"
-	"fmt"
-	"hash/crc32"
-	"io"
-	"os"
-	"path/filepath"
-	"text/tabwriter"
-	"time"
-	"unicode/utf8"
-
-	"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
-	"github.com/pkg/errors"
-)
-
-// Command represents the program execution for "influx_inspect verify".
-type Command struct {
-	Stderr io.Writer
-	Stdout io.Writer
-}
-
-// NewCommand returns a new instance of Command.
-func NewCommand() *Command {
-	return &Command{
-		Stderr: os.Stderr,
-		Stdout: os.Stdout,
-	}
-}
-
-// Run executes the command.
-func (cmd *Command) Run(args ...string) error {
-	var path string
-	fs := flag.NewFlagSet("verify", flag.ExitOnError)
-	fs.StringVar(&path, "dir", os.Getenv("HOME")+"/.influxdb", "Root storage path. [$HOME/.influxdb]")
-
-	var checkUTF8 bool
-	fs.BoolVar(&checkUTF8, "check-utf8", false, "Verify series keys are valid UTF-8")
-
-	fs.SetOutput(cmd.Stdout)
-	fs.Usage = cmd.printUsage
-
-	if err := fs.Parse(args); err != nil {
-		return err
-	}
-
-	dataPath := filepath.Join(path, "data")
-	tw := tabwriter.NewWriter(cmd.Stdout, 16, 8, 0, '\t', 0)
-
-	var runner verifier
-	if checkUTF8 {
-		runner = &verifyUTF8{}
-	} else {
-		runner = &verifyChecksums{}
-	}
-	err := runner.Run(tw, dataPath)
-	tw.Flush()
-	return err
-}
-
-// printUsage prints the usage message to STDERR.
-func (cmd *Command) printUsage() {
-	usage := fmt.Sprintf(`Verifies the integrity of TSM files.
-
-Usage: influx_inspect verify [flags]
-
-    -dir
-            The root storage path.
-            Must be changed if you are using a non-default storage directory.
-            Defaults to "%[1]s/.influxdb".
-    -check-utf8
-            Verify series keys are valid UTF-8.
-            This check skips verification of block checksums.
- `, os.Getenv("HOME"))
-
-	fmt.Fprint(cmd.Stdout, usage)
-}
-
-type verifyTSM struct {
-	files []string
-	f     string
-	start time.Time
-	err   error
-}
-
-func (v *verifyTSM) loadFiles(dataPath string) error {
-	err := filepath.Walk(dataPath, func(path string, f os.FileInfo, err error) error {
-		if err != nil {
-			return err
-		}
-		if filepath.Ext(path) == "."+tsm1.TSMFileExtension {
-			v.files = append(v.files, path)
-		}
-		return nil
-	})
-
-	if err != nil {
-		return errors.Wrap(err, "could not load storage files (use -dir for custom storage root)")
-	}
-
-	return nil
-}
-
-func (v *verifyTSM) Next() bool {
-	if len(v.files) == 0 {
-		return false
-	}
-
-	v.f, v.files = v.files[0], v.files[1:]
-	return true
-}
-
-func (v *verifyTSM) TSMReader() (string, *tsm1.TSMReader) {
-	file, err := os.OpenFile(v.f, os.O_RDONLY, 0600)
-	if err != nil {
-		v.err = err
-		return "", nil
-	}
-
-	reader, err := tsm1.NewTSMReader(file)
-	if err != nil {
-		file.Close()
-		v.err = err
-		return "", nil
-	}
-
-	return v.f, reader
-}
-
-func (v *verifyTSM) Start() {
-	v.start = time.Now()
-}
-
-func (v *verifyTSM) Elapsed() time.Duration {
-	return time.Since(v.start)
-}
-
-type verifyChecksums struct {
-	verifyTSM
-	totalErrors int
-	total       int
-}
-
-func (v *verifyChecksums) Run(w io.Writer, dataPath string) error {
-	if err := v.loadFiles(dataPath); err != nil {
-		return err
-	}
-
-	v.Start()
-
-	for v.Next() {
-		f, reader := v.TSMReader()
-		if reader == nil {
-			break
-		}
-
-		blockItr := reader.BlockIterator()
-		fileErrors := 0
-		count := 0
-		for blockItr.Next() {
-			v.total++
-			key, _, _, _, checksum, buf, err := blockItr.Read()
-			if err != nil {
-				v.totalErrors++
-				fileErrors++
-				fmt.Fprintf(w, "%s: could not get checksum for key %v block %d due to error: %q\n", f, key, count, err)
-			} else if expected := crc32.ChecksumIEEE(buf); checksum != expected {
-				v.totalErrors++
-				fileErrors++
-				fmt.Fprintf(w, "%s: got %d but expected %d for key %v, block %d\n", f, checksum, expected, key, count)
-			}
-			count++
-		}
-		if fileErrors == 0 {
-			fmt.Fprintf(w, "%s: healthy\n", f)
-		}
-		reader.Close()
-	}
-
-	fmt.Fprintf(w, "Broken Blocks: %d / %d, in %vs\n", v.totalErrors, v.total, v.Elapsed().Seconds())
-
-	return v.err
-}
-
-type verifyUTF8 struct {
-	verifyTSM
-	totalErrors int
-	total       int
-}
-
-func (v *verifyUTF8) Run(w io.Writer, dataPath string) error {
-	if err := v.loadFiles(dataPath); err != nil {
-		return err
-	}
-
-	v.Start()
-
-	for v.Next() {
-		f, reader := v.TSMReader()
-		if reader == nil {
-			break
-		}
-
-		n := reader.KeyCount()
-		fileErrors := 0
-		v.total += n
-		for i := 0; i < n; i++ {
-			key, _ := reader.KeyAt(i)
-			if !utf8.Valid(key) {
-				v.totalErrors++
-				fileErrors++
-				fmt.Fprintf(w, "%s: key #%d is not valid UTF-8\n", f, i)
-			}
-		}
-		if fileErrors == 0 {
-			fmt.Fprintf(w, "%s: healthy\n", f)
-		}
-	}
-
-	fmt.Fprintf(w, "Invalid Keys: %d / %d, in %vs\n", v.totalErrors, v.total, v.Elapsed().Seconds())
-	if v.totalErrors > 0 && v.err == nil {
-		v.err = errors.New("check-utf8: failed")
-	}
-
-	return v.err
-}
-
-type verifier interface {
-	Run(w io.Writer, dataPath string) error
-}
diff --git a/cmd/influx_inspect/verify/tsm/verify_test.go b/cmd/influx_inspect/verify/tsm/verify_test.go
deleted file mode 100644
index 7b6a3911bcc..00000000000
--- a/cmd/influx_inspect/verify/tsm/verify_test.go
+++ /dev/null
@@ -1,3 +0,0 @@
-package tsm_test
-
-// TODO: write some tests
diff --git a/cmd/influxd/inspect/inspect.go b/cmd/influxd/inspect/inspect.go
index bf6cd43f35f..f35bd866d7e 100644
--- a/cmd/influxd/inspect/inspect.go
+++ b/cmd/influxd/inspect/inspect.go
@@ -22,6 +22,7 @@ func NewCommand(v *viper.Viper) (*cobra.Command, error) {
 	}
 	base.AddCommand(exportLp)
 	base.AddCommand(NewExportIndexCommand())
+	base.AddCommand(NewTSMVerifyCommand())
 	base.AddCommand(NewVerifySeriesfileCommand())
 
 
diff --git a/cmd/influxd/inspect/tsm_verify.go b/cmd/influxd/inspect/tsm_verify.go
new file mode 100644
index 00000000000..984e92966f4
--- /dev/null
+++ b/cmd/influxd/inspect/tsm_verify.go
@@ -0,0 +1,249 @@
+package inspect
+
+import (
+	"fmt"
+	"hash/crc32"
+	"os"
+	"path/filepath"
+	"time"
+	"unicode/utf8"
+
+	"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
+	"github.com/pkg/errors"
+	"github.com/spf13/cobra"
+)
+
+// verifier runs one integrity check over the TSM files found beneath dataPath
+// and reports its findings through cmd's writers.
+type verifier interface {
+	Run(cmd *cobra.Command, dataPath string, verbose bool) error
+}
+
+// verifyTSM holds the state shared by every check: the queue of files still to
+// be visited, the file currently being checked, and the time the run started.
+type verifyTSM struct {
+	files []string
+	f     string
+	start time.Time
+}
+
+// verifyUTF8 checks that every series key is valid UTF-8.
+type verifyUTF8 struct {
+	verifyTSM
+	totalErrors int
+	total       int
+}
+
+// verifyChecksums checks every block against its stored CRC-32 checksum.
+type verifyChecksums struct {
+	verifyTSM
+	totalErrors int
+	total       int
+}
+
+// NewTSMVerifyCommand returns the "verify-tsm" subcommand. By default it
+// verifies block checksums; with --check-utf8 it validates series keys instead.
+func NewTSMVerifyCommand() *cobra.Command {
+	var checkUTF8 bool
+	var dir string
+	var verbose bool
+
+	cmd := &cobra.Command{
+		Use:   `verify-tsm`,
+		Short: `Verifies the integrity of TSM files`,
+		Args:  cobra.NoArgs,
+		RunE: func(cmd *cobra.Command, args []string) error {
+			var runner verifier
+			if checkUTF8 {
+				runner = &verifyUTF8{}
+			} else {
+				runner = &verifyChecksums{}
+			}
+			return runner.Run(cmd, dir, verbose)
+		},
+	}
+	cmd.Flags().StringVar(&dir, "dir", filepath.Join(os.Getenv("HOME"), ".influxdbv2"), "Root storage path.")
+	cmd.Flags().BoolVar(&checkUTF8, "check-utf8", false, "Verify series keys are valid UTF-8. This check skips verification of block checksums.")
+	cmd.Flags().BoolVar(&verbose, "verbose", false, "Enable verbose logging")
+	return cmd
+}
+
+// Run validates the series keys of every TSM file beneath dataPath. It stops at
+// the first file that cannot be opened and returns an error when any key is
+// not valid UTF-8.
+func (v *verifyUTF8) Run(cmd *cobra.Command, dataPath string, verbose bool) error {
+	if err := v.loadFiles(dataPath); err != nil {
+		return err
+	}
+
+	v.Start()
+
+	for v.Next() {
+		if err := v.verifyFile(cmd, verbose); err != nil {
+			return err
+		}
+	}
+
+	// The summary is the command's result, so it goes to the regular output
+	// writer (the one SetOut configures) rather than the error writer.
+	cmd.Printf("Invalid Keys: %d / %d, in %vs\n", v.totalErrors, v.total, v.Elapsed().Seconds())
+	if v.totalErrors > 0 {
+		return errors.New("check-utf8: failed")
+	}
+
+	return nil
+}
+
+// verifyFile checks the keys of the current file. It is a separate function so
+// that the deferred close runs once per file instead of piling up until Run
+// returns.
+func (v *verifyUTF8) verifyFile(cmd *cobra.Command, verbose bool) error {
+	reader, closer, err := v.TSMReader()
+	if closer != nil {
+		defer closer()
+	}
+	if err != nil {
+		return err
+	}
+
+	n := reader.KeyCount()
+	fileErrors := 0
+	v.total += n
+	for i := 0; i < n; i++ {
+		key, _ := reader.KeyAt(i)
+		if !utf8.Valid(key) {
+			v.totalErrors++
+			fileErrors++
+			if verbose {
+				cmd.PrintErrf("%s: key #%d is not valid UTF-8\n", v.f, i)
+			}
+		}
+	}
+	if fileErrors == 0 && verbose {
+		cmd.Printf("%s: healthy\n", v.f)
+	}
+	return nil
+}
+
+// Run verifies the checksum of every block in every TSM file beneath dataPath.
+// It stops at the first file that cannot be opened. Broken blocks are reported
+// in the output only; they do not make Run return an error.
+func (v *verifyChecksums) Run(cmd *cobra.Command, dataPath string, verbose bool) error {
+	if err := v.loadFiles(dataPath); err != nil {
+		return err
+	}
+
+	v.Start()
+
+	for v.Next() {
+		if err := v.verifyFile(cmd, verbose); err != nil {
+			return err
+		}
+	}
+
+	// The summary goes to the regular output writer, like any command result.
+	cmd.Printf("Broken Blocks: %d / %d, in %vs\n", v.totalErrors, v.total, v.Elapsed().Seconds())
+
+	return nil
+}
+
+// verifyFile checks every block of the current file. Keeping it separate from
+// Run lets the deferred close fire as soon as the file has been processed.
+func (v *verifyChecksums) verifyFile(cmd *cobra.Command, verbose bool) error {
+	reader, closer, err := v.TSMReader()
+	if closer != nil {
+		defer closer()
+	}
+	if err != nil {
+		return err
+	}
+
+	blockItr := reader.BlockIterator()
+	fileErrors := 0
+	count := 0
+	for blockItr.Next() {
+		v.total++
+		key, _, _, _, checksum, buf, err := blockItr.Read()
+		if err != nil {
+			v.totalErrors++
+			fileErrors++
+			if verbose {
+				cmd.PrintErrf("%s: could not get checksum for key %v block %d due to error: %q\n", v.f, key, count, err)
+			}
+		} else if expected := crc32.ChecksumIEEE(buf); checksum != expected {
+			v.totalErrors++
+			fileErrors++
+			if verbose {
+				cmd.PrintErrf("%s: got %d but expected %d for key %v, block %d\n", v.f, checksum, expected, key, count)
+			}
+		}
+		count++
+	}
+	if fileErrors == 0 && verbose {
+		cmd.Printf("%s: healthy\n", v.f)
+	}
+	return nil
+}
+
+// loadFiles walks dataPath and queues every file with the TSM extension.
+func (v *verifyTSM) loadFiles(dataPath string) error {
+	err := filepath.Walk(dataPath, func(path string, f os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		if filepath.Ext(path) == "."+tsm1.TSMFileExtension {
+			v.files = append(v.files, path)
+		}
+		return nil
+	})
+
+	if err != nil {
+		return fmt.Errorf("could not load storage files (use --dir for custom storage root): %w", err)
+	}
+
+	return nil
+}
+
+// Next advances to the next queued file and reports whether there was one.
+func (v *verifyTSM) Next() bool {
+	if len(v.files) == 0 {
+		return false
+	}
+
+	v.f, v.files = v.files[0], v.files[1:]
+	return true
+}
+
+// TSMReader opens the current file and wraps it in a TSM reader. The returned
+// closer is non-nil whenever the file was opened, including when building the
+// reader failed, so callers must call it even when the error is non-nil.
+func (v *verifyTSM) TSMReader() (*tsm1.TSMReader, func(), error) {
+	file, err := os.OpenFile(v.f, os.O_RDONLY, 0600)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	reader, err := tsm1.NewTSMReader(file)
+	if err != nil {
+		closer := func() {
+			file.Close()
+		}
+		return nil, closer, err
+	}
+
+	closer := func() {
+		file.Close()
+		reader.Close()
+	}
+	return reader, closer, nil
+}
+
+// Start records the time the run began.
+func (v *verifyTSM) Start() {
+	v.start = time.Now()
+}
+
+// Elapsed returns the time passed since Start was called.
+func (v *verifyTSM) Elapsed() time.Duration {
+	return time.Since(v.start)
+}
diff --git a/cmd/influxd/inspect/tsm_verify_test.go b/cmd/influxd/inspect/tsm_verify_test.go
new file mode 100644
index 00000000000..3f9ac9c208d
--- /dev/null
+++ b/cmd/influxd/inspect/tsm_verify_test.go
@@ -0,0 +1,123 @@
+package inspect
+
+import (
+	"bytes"
+	"encoding/binary"
+	"io/ioutil"
+	"os"
+	"testing"
+
+	"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
+	"github.com/stretchr/testify/require"
+)
+
+func TestInvalidChecksum(t *testing.T) {
+	path := newChecksumTest(t, true)
+	defer os.RemoveAll(path)
+
+	verify := NewTSMVerifyCommand()
+	b := bytes.NewBufferString("")
+	verify.SetOut(b)
+	verify.SetArgs([]string{"--dir", path})
+	require.NoError(t, verify.Execute())
+
+	out, err := ioutil.ReadAll(b)
+	require.NoError(t, err)
+	require.Contains(t, string(out), "Broken Blocks: 1 / 1")
+}
+
+func TestValidChecksum(t *testing.T) {
+	path := newChecksumTest(t, false)
+	defer os.RemoveAll(path)
+
+	verify := NewTSMVerifyCommand()
+	b := bytes.NewBufferString("")
+	verify.SetOut(b)
+	verify.SetArgs([]string{"--dir", path})
+	require.NoError(t, verify.Execute())
+
+	out, err := ioutil.ReadAll(b)
+	require.NoError(t, err)
+	require.Contains(t, string(out), "Broken Blocks: 0 / 1")
+}
+
+func TestInvalidUTF8(t *testing.T) {
+	path := newUTFTest(t, true)
+	defer os.RemoveAll(path)
+
+	verify := NewTSMVerifyCommand()
+	verify.SetOut(bytes.NewBufferString(""))
+	verify.SetArgs([]string{"--dir", path, "--check-utf8"})
+	require.Error(t, verify.Execute())
+}
+
+func TestValidUTF8(t *testing.T) {
+	path := newUTFTest(t, false)
+	defer os.RemoveAll(path)
+
+	verify := NewTSMVerifyCommand()
+	b := bytes.NewBufferString("")
+	verify.SetOut(b)
+	verify.SetArgs([]string{"--dir", path, "--check-utf8"})
+	require.NoError(t, verify.Execute())
+
+	out, err := ioutil.ReadAll(b)
+	require.NoError(t, err)
+	require.Contains(t, string(out), "Invalid Keys: 0 / 1")
+}
+
+func newUTFTest(t *testing.T, withError bool) string {
+	t.Helper()
+
+	dir, err := ioutil.TempDir("", "verify-tsm")
+	require.NoError(t, err)
+
+	f, err := ioutil.TempFile(dir, "verifytsmtest*"+"."+tsm1.TSMFileExtension)
+	require.NoError(t, err)
+
+	w, err := tsm1.NewTSMWriter(f)
+	require.NoError(t, err)
+	defer w.Close()
+
+	values := []tsm1.Value{tsm1.NewValue(0, 1.0)}
+	require.NoError(t, w.Write([]byte("cpu"), values))
+
+	if withError {
+		require.NoError(t, binary.Write(f, binary.BigEndian, []byte("foobar\n")))
+	}
+
+	require.NoError(t, w.WriteIndex())
+
+	return dir
+}
+
+func newChecksumTest(t *testing.T, withError bool) string {
+	t.Helper()
+
+	dir, err := ioutil.TempDir("", "verify-tsm")
+	require.NoError(t, err)
+
+	f, err := ioutil.TempFile(dir, "verifytsmtest*"+"."+tsm1.TSMFileExtension)
+	require.NoError(t, err)
+
+	w, err := tsm1.NewTSMWriter(f)
+	require.NoError(t, err)
+
+	values := []tsm1.Value{tsm1.NewValue(0, "entry")}
+	require.NoError(t, w.Write([]byte("cpu"), values))
+
+	require.NoError(t, w.WriteIndex())
+	w.Close()
+
+	if withError {
+		fh, err := os.OpenFile(f.Name(), os.O_RDWR, 0)
+		require.NoError(t, err)
+		defer fh.Close()
+
+		written, err := fh.WriteAt([]byte("foob"), 5)
+		require.Equal(t, 4, written)
+		require.NoError(t, err)
+	}
+
+	return dir
+}