diff --git a/chronograf/etc/build.py b/chronograf/etc/build.py index a68fa7d8494..833500dbe05 100755 --- a/chronograf/etc/build.py +++ b/chronograf/etc/build.py @@ -11,6 +11,7 @@ import logging import argparse import json +import fs ################ #### Chronograf Variables diff --git a/chronograf/filestore/apps.go b/chronograf/filestore/apps.go index 9b912a04230..b489fa25521 100644 --- a/chronograf/filestore/apps.go +++ b/chronograf/filestore/apps.go @@ -9,6 +9,7 @@ import ( "path" "github.com/influxdata/influxdb/chronograf" + "github.com/influxdata/influxdb/pkg/fs" ) // AppExt is the the file extension searched for in the directory for layout files @@ -58,7 +59,7 @@ func loadFile(name string) (chronograf.Layout, error) { } func createLayout(file string, layout chronograf.Layout) error { - h, err := os.Create(file) + h, err := fs.CreateFile(file) if err != nil { return err } diff --git a/chronograf/filestore/dashboards.go b/chronograf/filestore/dashboards.go index b27e4ccb02d..d99fb5f1201 100644 --- a/chronograf/filestore/dashboards.go +++ b/chronograf/filestore/dashboards.go @@ -10,6 +10,7 @@ import ( "strconv" "github.com/influxdata/influxdb/chronograf" + "github.com/influxdata/influxdb/pkg/fs" ) // DashExt is the the file extension searched for in the directory for dashboard files @@ -56,7 +57,7 @@ func load(name string, resource interface{}) error { } func create(file string, resource interface{}) error { - h, err := os.Create(file) + h, err := fs.CreateFile(file) if err != nil { return err } diff --git a/cmd/influx_inspect/buildtsi/buildtsi.go b/cmd/influx_inspect/buildtsi/buildtsi.go index 97d6888dee0..bfdbd3d395d 100644 --- a/cmd/influx_inspect/buildtsi/buildtsi.go +++ b/cmd/influx_inspect/buildtsi/buildtsi.go @@ -17,6 +17,7 @@ import ( "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/fs" "github.com/influxdata/influxdb/storage" "github.com/influxdata/influxdb/storage/wal" "github.com/influxdata/influxdb/toml" @@ -327,7 +328,7 @@ func IndexShard(sfile *tsdb.SeriesFile, indexPath, dataDir, walDir string, maxLo // Rename TSI to standard path. log.Info("Moving tsi to permanent location") - return os.Rename(tmpPath, indexPath) + return fs.RenameFile(tmpPath, indexPath) } func IndexTSMFile(index *tsi1.Index, path string, batchSize int, log *zap.Logger, verboseLogging bool) error { diff --git a/cmd/influxd/internal/profile/profile.go b/cmd/influxd/internal/profile/profile.go index e240a8c6855..82594851658 100644 --- a/cmd/influxd/internal/profile/profile.go +++ b/cmd/influxd/internal/profile/profile.go @@ -5,6 +5,8 @@ import ( "os" "runtime" "runtime/pprof" + + "github.com/influxdata/influxdb/pkg/fs" ) type Config struct { @@ -32,7 +34,7 @@ func (c *Config) Start() func() { } if c.CPU != "" { - f, err := os.Create(c.CPU) + f, err := fs.CreateFile(c.CPU) if err != nil { log.Fatalf("cpuprofile: %v", err) } @@ -41,7 +43,7 @@ func (c *Config) Start() func() { } if c.Memory != "" { - f, err := os.Create(c.Memory) + f, err := fs.CreateFile(c.Memory) if err != nil { log.Fatalf("memprofile: %v", err) } diff --git a/pkg/file/file_unix.go b/pkg/file/file_unix.go deleted file mode 100644 index 6892e5e55a7..00000000000 --- a/pkg/file/file_unix.go +++ /dev/null @@ -1,35 +0,0 @@ -// +build !windows - -package file - -import ( - "os" - "syscall" -) - -func SyncDir(dirName string) error { - // fsync the dir to flush the rename - dir, err := os.OpenFile(dirName, os.O_RDONLY, os.ModeDir) - if err != nil { - return err - } - defer dir.Close() - - // While we're on unix, we may be running in a Docker container that is - // pointed at a Windows volume over samba. That doesn't support fsyncs - // on directories. This shows itself as an EINVAL, so we ignore that - // error. - err = dir.Sync() - if pe, ok := err.(*os.PathError); ok && pe.Err == syscall.EINVAL { - err = nil - } else if err != nil { - return err - } - - return dir.Close() -} - -// RenameFile will rename the source to target using os function. -func RenameFile(oldpath, newpath string) error { - return os.Rename(oldpath, newpath) -} diff --git a/pkg/file/file_windows.go b/pkg/file/file_windows.go deleted file mode 100644 index 97f31b062f1..00000000000 --- a/pkg/file/file_windows.go +++ /dev/null @@ -1,18 +0,0 @@ -package file - -import "os" - -func SyncDir(dirName string) error { - return nil -} - -// RenameFile will rename the source to target using os function. If target exists it will be removed before renaming. -func RenameFile(oldpath, newpath string) error { - if _, err := os.Stat(newpath); err == nil { - if err = os.Remove(newpath); nil != err { - return err - } - } - - return os.Rename(oldpath, newpath) -} diff --git a/pkg/fs/fs_test.go b/pkg/fs/fs_test.go new file mode 100644 index 00000000000..a29fea10d70 --- /dev/null +++ b/pkg/fs/fs_test.go @@ -0,0 +1,159 @@ +package fs_test + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/influxdata/influxdb/pkg/fs" +) + +func TestRenameFileWithReplacement(t *testing.T) { + // sample data for loading into files + sampleData1 := "this is some data" + sampleData2 := "we got some more data" + + t.Run("exists", func(t *testing.T) { + oldpath := MustCreateTempFile(t, sampleData1) + newpath := MustCreateTempFile(t, sampleData2) + defer MustRemoveAll(oldpath) + defer MustRemoveAll(newpath) + + oldContents := MustReadAllFile(oldpath) + newContents := MustReadAllFile(newpath) + + if got, exp := oldContents, sampleData1; got != exp { + t.Fatalf("got contents %q, expected %q", got, exp) + } else if got, exp := newContents, sampleData2; got != exp { + t.Fatalf("got contents %q, expected %q", got, exp) + } + + if err := fs.RenameFileWithReplacement(oldpath, newpath); err != nil { + t.Fatalf("ReplaceFileIfExists returned an error: %s", err) + } + + if err := fs.SyncDir(filepath.Dir(oldpath)); err != nil { + panic(err) + } + + // Contents of newpath will now be equivalent to oldpath' contents. + newContents = MustReadAllFile(newpath) + if newContents != oldContents { + t.Fatalf("contents for files differ: %q versus %q", newContents, oldContents) + } + + // oldpath will be removed. + if MustFileExists(oldpath) { + t.Fatalf("file %q still exists, but it shouldn't", oldpath) + } + }) + + t.Run("not exists", func(t *testing.T) { + oldpath := MustCreateTempFile(t, sampleData1) + defer MustRemoveAll(oldpath) + + oldContents := MustReadAllFile(oldpath) + if got, exp := oldContents, sampleData1; got != exp { + t.Fatalf("got contents %q, expected %q", got, exp) + } + + root := filepath.Dir(oldpath) + newpath := filepath.Join(root, "foo") + if err := fs.RenameFileWithReplacement(oldpath, newpath); err != nil { + t.Fatalf("ReplaceFileIfExists returned an error: %s", err) + } + + if err := fs.SyncDir(filepath.Dir(oldpath)); err != nil { + panic(err) + } + + // Contents of newpath will now be equivalent to oldpath's contents. + newContents := MustReadAllFile(newpath) + if newContents != oldContents { + t.Fatalf("contents for files differ: %q versus %q", newContents, oldContents) + } + + // oldpath will be removed. + if MustFileExists(oldpath) { + t.Fatalf("file %q still exists, but it shouldn't", oldpath) + } + }) +} + +func TestCreateFileWithReplacement(t *testing.T) { + path := MustCreateTempFile(t, "sample data") + defer MustRemoveAll(path) + + // should return an error if we CreateFile to the same path + _, err := fs.CreateFile(path) + if err == nil { + t.Fatalf("CreateFile did not return an error") + } + + // contents of the file should be intact + contents := MustReadAllFile(path) + if got, exp := contents, "sample data"; got != exp { + t.Fatalf("got contents %q, expected %q", got, exp) + } + + // running CreateFileWithReplacement on path should not return an error + if _, err = fs.CreateFileWithReplacement(path); err != nil { + t.Fatalf("CreateFileWithReplacement returned err: %v", err) + } + + // the file at path should now be empty + contents = MustReadAllFile(path) + if contents != "" { + t.Fatalf("expected file to be empty but got: %v", contents) + } + +} + +// CreateTempFileOrFail creates a temporary file returning the path to the file. +func MustCreateTempFile(t testing.TB, data string) string { + t.Helper() + + f, err := ioutil.TempFile("", "fs-test") + if err != nil { + t.Fatalf("failed to create temp file: %v", err) + } else if _, err := f.WriteString(data); err != nil { + t.Fatal(err) + } else if err := f.Close(); err != nil { + t.Fatal(err) + } + return f.Name() +} + +func MustRemoveAll(path string) { + if err := os.RemoveAll(path); err != nil { + panic(err) + } +} + +// MustFileExists determines if a file exists, panicking if any error +// (other than one associated with the file not existing) is returned. +func MustFileExists(path string) bool { + _, err := os.Stat(path) + if err == nil { + return true + } else if os.IsNotExist(err) { + return false + } + panic(err) +} + +// MustReadAllFile reads the contents of path, panicking if there is an error. +func MustReadAllFile(path string) string { + fd, err := os.Open(path) + if err != nil { + panic(err) + } + defer fd.Close() + + data, err := ioutil.ReadAll(fd) + if err != nil { + panic(err) + } + return string(data) +} diff --git a/pkg/fs/fs_unix.go b/pkg/fs/fs_unix.go new file mode 100644 index 00000000000..66bb1712499 --- /dev/null +++ b/pkg/fs/fs_unix.go @@ -0,0 +1,83 @@ +// +build !windows + +package fs + +import ( + "fmt" + "os" + "syscall" +) + +// A FileExistsError is returned when an operation cannot be completed due to a +// file already existing. +type FileExistsError struct { + path string +} + +func newFileExistsError(path string) FileExistsError { + return FileExistsError{path: path} +} + +func (e FileExistsError) Error() string { + return fmt.Sprintf("operation not allowed, file %q exists", e.path) +} + +// SyncDir flushes any file renames to the filesystem. +func SyncDir(dirName string) error { + // fsync the dir to flush the rename + dir, err := os.OpenFile(dirName, os.O_RDONLY, os.ModeDir) + if err != nil { + return err + } + defer dir.Close() + + // While we're on unix, we may be running in a Docker container that is + // pointed at a Windows volume over samba. That doesn't support fsyncs + // on directories. This shows itself as an EINVAL, so we ignore that + // error. + err = dir.Sync() + if pe, ok := err.(*os.PathError); ok && pe.Err == syscall.EINVAL { + err = nil + } else if err != nil { + return err + } + + return dir.Close() +} + +// RenameFileWithReplacement will replace any existing file at newpath with the contents +// of oldpath. +// +// If no file already exists at newpath, newpath will be created using the contents +// of oldpath. If this function returns successfully, the contents of newpath will +// be identical to oldpath, and oldpath will be removed. +func RenameFileWithReplacement(oldpath, newpath string) error { + return os.Rename(oldpath, newpath) +} + +// RenameFile renames oldpath to newpath, returning an error if newpath already +// exists. If this function returns successfully, the contents of newpath will +// be identical to oldpath, and oldpath will be removed. +func RenameFile(oldpath, newpath string) error { + if _, err := os.Stat(newpath); err == nil { + return newFileExistsError(newpath) + } + + return os.Rename(oldpath, newpath) +} + +// CreateFileWithReplacement will create a new file at any path, removing the +// contents of the old file +func CreateFileWithReplacement(newpath string) (*os.File, error) { + return os.Create(newpath) +} + +// CreateFile creates a new file at newpath, returning an error if newpath already +// exists +func CreateFile(newpath string) (*os.File, error) { + if _, err := os.Stat(newpath); err == nil { + return nil, newFileExistsError(newpath) + } + + return os.Create(newpath) +} diff --git a/pkg/fs/fs_windows.go b/pkg/fs/fs_windows.go new file mode 100644 index 00000000000..44e589b5430 --- /dev/null +++ b/pkg/fs/fs_windows.go @@ -0,0 +1,52 @@ +package fs + +import "os" + +func SyncDir(dirName string) error { + return nil +} + +// RenameFileWithReplacement will replace any existing file at newpath with the contents +// of oldpath. +// +// If no file already exists at newpath, newpath will be created using the contents +// of oldpath. If this function returns successfully, the contents of newpath will +// be identical to oldpath, and oldpath will be removed. +func RenameFileWithReplacement(oldpath, newpath string) error { + if _, err := os.Stat(newpath); err == nil { + if err = os.Remove(newpath); nil != err { + return err + } + } + + return os.Rename(oldpath, newpath) +} + +// RenameFile renames oldpath to newpath, returning an error if newpath already +// exists. If this function returns successfully, the contents of newpath will +// be identical to oldpath, and oldpath will be removed. +func RenameFile(oldpath, newpath string) error { + if _, err := os.Stat(newpath); err == nil { + // os.Rename on Windows will return an error if the file exists, but it's + // preferable to keep the errors the same across platforms. + return newFileExistsError(newpath) + } + + return os.Rename(oldpath, newpath) +} + +// CreateFileWithReplacement will create a new file at any path, removing the +// contents of the old file +func CreateFileWithReplacement(newpath string) (*os.File, error) { + return os.Create(newpath) +} + +// CreateFile creates a new file at newpath, returning an error if newpath already +// exists +func CreateFile(newpath string) (*os.File, error) { + if _, err := os.Stat(newpath); err == nil { + return nil, newFileExistsError(newpath) + } + + return os.Create(newpath) +} diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go index b7fbab2ad41..9ec13ec2bf0 100644 --- a/tsdb/series_partition.go +++ b/tsdb/series_partition.go @@ -14,6 +14,7 @@ import ( "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/fs" "github.com/influxdata/influxdb/pkg/rhh" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -740,7 +741,7 @@ func (c *SeriesPartitionCompactor) Compact(p *SeriesPartition) (time.Duration, e // Reopen index with new file. if err := p.index.Close(); err != nil { return err - } else if err := os.Rename(indexPath, index.path); err != nil { + } else if err := fs.RenameFileWithReplacement(indexPath, index.path); err != nil { return err } else if err := p.index.Open(); err != nil { return err @@ -817,7 +818,7 @@ func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN ui } // Open file handler. - f, err := os.Create(path) + f, err := fs.CreateFile(path) if err != nil { return err } diff --git a/tsdb/series_segment.go b/tsdb/series_segment.go index 16b32c5aa8c..b11838a726e 100644 --- a/tsdb/series_segment.go +++ b/tsdb/series_segment.go @@ -11,6 +11,7 @@ import ( "regexp" "strconv" + "github.com/influxdata/influxdb/pkg/fs" "github.com/influxdata/influxdb/pkg/mmap" ) @@ -58,7 +59,7 @@ func NewSeriesSegment(id uint16, path string) *SeriesSegment { // CreateSeriesSegment generates an empty segment at path. func CreateSeriesSegment(id uint16, path string) (*SeriesSegment, error) { // Generate segment in temp location. - f, err := os.Create(path + ".initializing") + f, err := fs.CreateFile(path + ".initializing") if err != nil { return nil, err } @@ -77,7 +78,7 @@ func CreateSeriesSegment(id uint16, path string) (*SeriesSegment, error) { } // Swap with target path. - if err := os.Rename(f.Name(), path); err != nil { + if err := fs.RenameFile(f.Name(), path); err != nil { return nil, err } diff --git a/tsdb/tsi1/log_file_test.go b/tsdb/tsi1/log_file_test.go index e0b5748ad0f..07b49e30409 100644 --- a/tsdb/tsi1/log_file_test.go +++ b/tsdb/tsi1/log_file_test.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/bloom" + "github.com/influxdata/influxdb/pkg/fs" "github.com/influxdata/influxdb/pkg/slices" "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb/tsi1" @@ -598,7 +599,7 @@ func MustStartCPUProfile(name string) { name = regexp.MustCompile(`\W+`).ReplaceAllString(name, "-") // Open file and start pprof. - f, err := os.Create(filepath.Join("/tmp", fmt.Sprintf("cpu-%s.pprof", name))) + f, err := fs.CreateFile(filepath.Join("/tmp", fmt.Sprintf("cpu-%s.pprof", name))) if err != nil { panic(err) } diff --git a/tsdb/tsi1/partition.go b/tsdb/tsi1/partition.go index 01abfb29b52..a26a5a863f6 100644 --- a/tsdb/tsi1/partition.go +++ b/tsdb/tsi1/partition.go @@ -18,6 +18,7 @@ import ( "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/pkg/bytesutil" + "github.com/influxdata/influxdb/pkg/fs" "github.com/influxdata/influxdb/pkg/lifecycle" "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxql" @@ -1030,7 +1031,7 @@ func (p *Partition) compactToLevel(files []*IndexFile, frefs lifecycle.Reference // Create new index file. path := filepath.Join(p.path, FormatIndexFileName(p.NextSequence(), level)) var f *os.File - if f, err = os.Create(path); err != nil { + if f, err = fs.CreateFile(path); err != nil { log.Error("Cannot create compaction files", zap.Error(err)) return } @@ -1197,7 +1198,7 @@ func (p *Partition) compactLogFile(ctx context.Context, logFile *LogFile, interr // Create new index file. path := filepath.Join(p.path, FormatIndexFileName(id, 1)) - f, err := os.Create(path) + f, err := fs.CreateFile(path) if err != nil { log.Error("Cannot create index file", zap.Error(err)) return diff --git a/tsdb/tsm1/compact_test.go b/tsdb/tsm1/compact_test.go index dbf08940f2a..f52fba433f7 100644 --- a/tsdb/tsm1/compact_test.go +++ b/tsdb/tsm1/compact_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/influxdata/influxdb/pkg/fs" "github.com/influxdata/influxdb/tsdb/cursors" "github.com/influxdata/influxdb/tsdb/tsm1" ) @@ -2917,7 +2918,7 @@ func MustTSMWriter(dir string, gen int) (tsm1.TSMWriter, string) { } newName := filepath.Join(filepath.Dir(oldName), tsm1.DefaultFormatFileName(gen, 1)+".tsm") - if err := os.Rename(oldName, newName); err != nil { + if err := fs.RenameFile(oldName, newName); err != nil { panic(fmt.Sprintf("create tsm file: %v", err)) } diff --git a/tsdb/tsm1/file_store.go b/tsdb/tsm1/file_store.go index b45bdd6fbbd..5a2ac986c03 100644 --- a/tsdb/tsm1/file_store.go +++ b/tsdb/tsm1/file_store.go @@ -18,7 +18,7 @@ import ( "time" "github.com/influxdata/influxdb/kit/tracing" - "github.com/influxdata/influxdb/pkg/file" + "github.com/influxdata/influxdb/pkg/fs" "github.com/influxdata/influxdb/pkg/limiter" "github.com/influxdata/influxdb/pkg/metrics" "github.com/influxdata/influxdb/query" @@ -663,7 +663,7 @@ func (f *FileStore) Open(ctx context.Context) error { // the file, and continue loading the shard without it. if err != nil { f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err)) - if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil { + if e := fs.RenameFile(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil { f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e)) readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %v", file.Name(), e)} return @@ -875,7 +875,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF if strings.HasSuffix(file, tsmTmpExt) { // The new TSM files have a tmp extension. First rename them. newName = file[:len(file)-4] - if err := os.Rename(file, newName); err != nil { + if err := fs.RenameFile(file, newName); err != nil { return err } } @@ -995,7 +995,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF } } - if err := file.SyncDir(f.dir); err != nil { + if err := fs.SyncDir(f.dir); err != nil { return err } diff --git a/tsdb/tsm1/file_store_test.go b/tsdb/tsm1/file_store_test.go index 68f722659e0..16546da6c83 100644 --- a/tsdb/tsm1/file_store_test.go +++ b/tsdb/tsm1/file_store_test.go @@ -14,6 +14,7 @@ import ( "time" "github.com/influxdata/influxdb/logger" + "github.com/influxdata/influxdb/pkg/fs" "github.com/influxdata/influxdb/tsdb/tsm1" ) @@ -2440,7 +2441,7 @@ func TestFileStore_Replace(t *testing.T) { // Replace requires assumes new files have a .tmp extension replacement := fmt.Sprintf("%s.%s", files[2], tsm1.TmpTSMFileExtension) - os.Rename(files[2], replacement) + fs.RenameFile(files[2], replacement) fs := tsm1.NewFileStore(dir) if err := fs.Open(context.Background()); err != nil { @@ -2640,25 +2641,25 @@ func TestFileStore_Stats(t *testing.T) { fatal(t, "creating test files", err) } - fs := tsm1.NewFileStore(dir) - if err := fs.Open(context.Background()); err != nil { + filestore := tsm1.NewFileStore(dir) + if err := filestore.Open(context.Background()); err != nil { fatal(t, "opening file store", err) } - defer fs.Close() + defer filestore.Close() - stats := fs.Stats() + stats := filestore.Stats() if got, exp := len(stats), 3; got != exp { t.Fatalf("file count mismatch: got %v, exp %v", got, exp) } // Another call should result in the same stats being returned. - if got, exp := fs.Stats(), stats; !reflect.DeepEqual(got, exp) { + if got, exp := filestore.Stats(), stats; !reflect.DeepEqual(got, exp) { t.Fatalf("got %v, exp %v", got, exp) } // Removing one of the files should invalidate the cache. - fs.Replace(files[0:1], nil) - if got, exp := len(fs.Stats()), 2; got != exp { + filestore.Replace(files[0:1], nil) + if got, exp := len(filestore.Stats()), 2; got != exp { t.Fatalf("file count mismatch: got %v, exp %v", got, exp) } @@ -2668,16 +2669,16 @@ func TestFileStore_Stats(t *testing.T) { }) replacement := fmt.Sprintf("%s.%s.%s", files[2], tsm1.TmpTSMFileExtension, tsm1.TSMFileExtension) // Assumes new files have a .tmp extension - if err := os.Rename(newFile, replacement); err != nil { + if err := fs.RenameFile(newFile, replacement); err != nil { t.Fatalf("rename: %v", err) } // Replace 3 w/ 1 - if err := fs.Replace(files, []string{replacement}); err != nil { + if err := filestore.Replace(files, []string{replacement}); err != nil { t.Fatalf("replace: %v", err) } var found bool - stats = fs.Stats() + stats = filestore.Stats() for _, stat := range stats { if strings.HasSuffix(stat.Path, fmt.Sprintf("%s.%s.%s", tsm1.TSMFileExtension, tsm1.TmpTSMFileExtension, tsm1.TSMFileExtension)) { found = true @@ -2693,8 +2694,8 @@ func TestFileStore_Stats(t *testing.T) { }) // Adding some files should invalidate the cache. - fs.Replace(nil, []string{newFile}) - if got, exp := len(fs.Stats()), 2; got != exp { + filestore.Replace(nil, []string{newFile}) + if got, exp := len(filestore.Stats()), 2; got != exp { t.Fatalf("file count mismatch: got %v, exp %v", got, exp) } } @@ -2880,7 +2881,7 @@ func newFileDir(dir string, values ...keyValues) ([]string, error) { return nil, err } newName := filepath.Join(filepath.Dir(f.Name()), tsm1.DefaultFormatFileName(id, 1)+".tsm") - if err := os.Rename(f.Name(), newName); err != nil { + if err := fs.RenameFile(f.Name(), newName); err != nil { return nil, err } id++ @@ -2915,7 +2916,7 @@ func newFiles(dir string, values ...keyValues) ([]string, error) { } newName := filepath.Join(filepath.Dir(f.Name()), tsm1.DefaultFormatFileName(id, 1)+".tsm") - if err := os.Rename(f.Name(), newName); err != nil { + if err := fs.RenameFile(f.Name(), newName); err != nil { return nil, err } id++ diff --git a/tsdb/tsm1/reader_mmap.go b/tsdb/tsm1/reader_mmap.go index f5b58d31960..2a8b384e06b 100644 --- a/tsdb/tsm1/reader_mmap.go +++ b/tsdb/tsm1/reader_mmap.go @@ -7,7 +7,7 @@ import ( "sync" "sync/atomic" - "github.com/influxdata/influxdb/pkg/file" + "github.com/influxdata/influxdb/pkg/fs" "go.uber.org/zap" ) @@ -123,7 +123,7 @@ func (m *mmapAccessor) rename(path string) error { m.mu.Lock() defer m.mu.Unlock() - if err := file.RenameFile(m._path, path); err != nil { + if err := fs.RenameFileWithReplacement(m._path, path); err != nil { return err } m._path = path diff --git a/tsdb/tsm1/tombstone.go b/tsdb/tsm1/tombstone.go index ec5b68a18dc..bd0b3861456 100644 --- a/tsdb/tsm1/tombstone.go +++ b/tsdb/tsm1/tombstone.go @@ -56,7 +56,7 @@ import ( "strings" "sync" - "github.com/influxdata/influxdb/pkg/file" + "github.com/influxdata/influxdb/pkg/fs" ) const ( @@ -414,11 +414,11 @@ func (t *Tombstoner) commit() error { return err } - if err := file.RenameFile(tmpFilename, t.tombstonePath()); err != nil { + if err := fs.RenameFileWithReplacement(tmpFilename, t.tombstonePath()); err != nil { return err } - if err := file.SyncDir(filepath.Dir(t.tombstonePath())); err != nil { + if err := fs.SyncDir(filepath.Dir(t.tombstonePath())); err != nil { return err } diff --git a/tsdb/tsm1/tombstone_test.go b/tsdb/tsm1/tombstone_test.go index 83531bd312e..bd756d8defe 100644 --- a/tsdb/tsm1/tombstone_test.go +++ b/tsdb/tsm1/tombstone_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/influxdata/influxdb/pkg/fs" "github.com/influxdata/influxdb/tsdb/tsm1" ) @@ -413,7 +414,7 @@ func TestTombstoner_Existing(t *testing.T) { } name := f.Name() + ".tombstone" - if err := os.Rename(f.Name(), name); err != nil { + if err := fs.RenameFile(f.Name(), name); err != nil { panic(err) } diff --git a/tsdb/tsm1/writer.go b/tsdb/tsm1/writer.go index d69a25344d5..35e8525da25 100644 --- a/tsdb/tsm1/writer.go +++ b/tsdb/tsm1/writer.go @@ -75,6 +75,7 @@ import ( "time" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/fs" ) const ( @@ -826,7 +827,7 @@ func (t *tsmWriter) writeStatsFile() error { return nil } - f, err := os.Create(StatsFilename(fw.Name())) + f, err := fs.CreateFile(StatsFilename(fw.Name())) if err != nil { return err }