Skip to content

Commit

Permalink
fix(tsdb): allow backups during snapshotting, and don't leak tmp files (
Browse files Browse the repository at this point in the history
#20527)



Co-authored-by: davidby-influx <[email protected]>
  • Loading branch information
danxmoran and davidby-influx authored Jan 19, 2021
1 parent 4239d03 commit 743aef4
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 28 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ Replacement `tsi1` indexes will be automatically generated on startup for shards
1. [20489](https://github.com/influxdata/influxdb/pull/20489): Improve error message when opening BoltDB with unsupported file system options.
1. [20490](https://github.com/influxdata/influxdb/pull/20490): Fix silent failure to register CLI args as required.
1. [20522](https://github.com/influxdata/influxdb/pull/20522): Fix loading config when INFLUXD_CONFIG_PATH points to a `.yml` file.
1. [20527](https://github.com/influxdata/influxdb/pull/20527): Don't leak .tmp files while backing up shards.
1. [20527](https://github.com/influxdata/influxdb/pull/20527): Allow backups to complete while a snapshot is in progress.

## v2.0.3 [2020-12-14]

Expand Down
2 changes: 1 addition & 1 deletion tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Engine interface {

LoadMetadataIndex(shardID uint64, index Index) error

CreateSnapshot() (string, error)
CreateSnapshot(skipCacheOk bool) (string, error)
Backup(w io.Writer, basePath string, since time.Time) error
Export(w io.Writer, basePath string, start time.Time, end time.Time) error
Restore(r io.Reader, basePath string) error
Expand Down
39 changes: 19 additions & 20 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -908,25 +908,14 @@ func (e *Engine) Free() error {
// of the files in the archive. It will force a snapshot of the WAL first
// then perform the backup with a read lock against the file store. This means
// that new TSM files will not be able to be created in this shard while the
// backup is running. For shards that are still acively getting writes, this
// could cause the WAL to backup, increasing memory usage and evenutally rejecting writes.
// backup is running. For shards that are still actively getting writes, this
// could cause the WAL to backup, increasing memory usage and eventually rejecting writes.
func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
var err error
var path string
for i := 0; i < 3; i++ {
path, err = e.CreateSnapshot()
if err != nil {
switch err {
case ErrSnapshotInProgress:
backoff := time.Duration(math.Pow(32, float64(i))) * time.Millisecond
time.Sleep(backoff)
default:
return err
}
}
}
if err == ErrSnapshotInProgress {
e.logger.Warn("Snapshotter busy: Backup proceeding without snapshot contents.")
path, err = e.CreateSnapshot(true)
if err != nil {
return err
}
// Remove the temporary snapshot dir
defer os.RemoveAll(path)
Expand Down Expand Up @@ -990,7 +979,7 @@ func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo
}

func (e *Engine) Export(w io.Writer, basePath string, start time.Time, end time.Time) error {
path, err := e.CreateSnapshot()
path, err := e.CreateSnapshot(false)
if err != nil {
return err
}
Expand Down Expand Up @@ -1873,9 +1862,19 @@ func (e *Engine) WriteSnapshot() (err error) {
}

// CreateSnapshot will create a temp directory that holds
// temporary hardlinks to the underylyng shard files.
func (e *Engine) CreateSnapshot() (string, error) {
if err := e.WriteSnapshot(); err != nil {
// temporary hardlinks to the underlying shard files.
// skipCacheOk controls whether it is permissible to fail writing out
// in-memory cache data when a previous snapshot is in progress.
func (e *Engine) CreateSnapshot(skipCacheOk bool) (string, error) {
err := e.WriteSnapshot()
for i := 0; i < 3 && err == ErrSnapshotInProgress; i += 1 {
backoff := time.Duration(math.Pow(32, float64(i))) * time.Millisecond
time.Sleep(backoff)
err = e.WriteSnapshot()
}
if err == ErrSnapshotInProgress && skipCacheOk {
e.logger.Warn("Snapshotter busy: proceeding without cache contents")
} else if err != nil {
return "", err
}

Expand Down
102 changes: 102 additions & 0 deletions tsdb/engine/tsm1/engine_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package tsm1

import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"

"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/stretchr/testify/require"
)

func TestEngine_ConcurrentShardSnapshots(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "shard_test")
require.NoError(t, err, "error creating temporary directory")
defer os.RemoveAll(tmpDir)

tmpShard := filepath.Join(tmpDir, "shard")
tmpWal := filepath.Join(tmpDir, "wal")

sfile := NewSeriesFile(tmpDir)
defer sfile.Close()

opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{})

sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile, opts)
require.NoError(t, sh.Open(), "error opening shard")
defer sh.Close()

points := make([]models.Point, 0, 10000)
for i := 0; i < cap(points); i++ {
points = append(points, models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(int64(i), 0),
))
}
err = sh.WritePoints(points)
require.NoError(t, err)

engineInterface, err := sh.Engine()
require.NoError(t, err, "error retrieving shard engine")

// Get the struct underlying the interface. Not a recommended practice.
realEngineStruct, ok := (engineInterface).(*Engine)
if !ok {
t.Log("Engine type does not permit simulating Cache race conditions")
return
}
// fake a race condition in snapshotting the cache.
realEngineStruct.Cache.snapshotting = true
defer func() {
realEngineStruct.Cache.snapshotting = false
}()

snapshotFunc := func(skipCacheOk bool) {
if f, err := sh.CreateSnapshot(skipCacheOk); err == nil {
require.NoError(t, os.RemoveAll(f), "error cleaning up TestEngine_ConcurrentShardSnapshots")
} else if err == ErrSnapshotInProgress {
if skipCacheOk {
t.Fatalf("failing to ignore this error,: %s", err.Error())
}
} else {
t.Fatalf("error creating shard snapshot: %s", err.Error())
}
}

// Permit skipping cache in the snapshot
snapshotFunc(true)
// do not permit skipping the cache in the snapshot
snapshotFunc(false)
realEngineStruct.Cache.snapshotting = false
}

// NewSeriesFile returns a new instance of SeriesFile with a temporary file path.
func NewSeriesFile(tmpDir string) *tsdb.SeriesFile {
dir, err := ioutil.TempDir(tmpDir, "tsdb-series-file-")
if err != nil {
panic(err)
}
f := tsdb.NewSeriesFile(dir)
f.Logger = logger.New(os.Stdout)
if err := f.Open(); err != nil {
panic(err)
}
return f
}

type seriesIDSets []*tsdb.SeriesIDSet

func (a seriesIDSets) ForEach(f func(ids *tsdb.SeriesIDSet)) error {
for _, v := range a {
f(v)
}
return nil
}
4 changes: 2 additions & 2 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,12 +1133,12 @@ func (s *Shard) Import(r io.Reader, basePath string) error {

// CreateSnapshot will return a path to a temp directory
// containing hard links to the underlying shard files.
func (s *Shard) CreateSnapshot() (string, error) {
func (s *Shard) CreateSnapshot(skipCacheOk bool) (string, error) {
engine, err := s.Engine()
if err != nil {
return "", err
}
return engine.CreateSnapshot()
return engine.CreateSnapshot(skipCacheOk)
}

// ForEachMeasurementName iterates over each measurement in the shard.
Expand Down
4 changes: 2 additions & 2 deletions tsdb/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
}

_ = sh.WritePoints(points[:500])
if f, err := sh.CreateSnapshot(); err == nil {
if f, err := sh.CreateSnapshot(false); err == nil {
os.RemoveAll(f)
}

Expand All @@ -431,7 +431,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
}

_ = sh.WritePoints(points[500:])
if f, err := sh.CreateSnapshot(); err == nil {
if f, err := sh.CreateSnapshot(false); err == nil {
os.RemoveAll(f)
}
}
Expand Down
4 changes: 2 additions & 2 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,13 +629,13 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en

// CreateShardSnapShot will create a hard link to the underlying shard and return a path.
// The caller is responsible for cleaning up (removing) the file path returned.
func (s *Store) CreateShardSnapshot(id uint64) (string, error) {
func (s *Store) CreateShardSnapshot(id uint64, skipCacheOk bool) (string, error) {
sh := s.Shard(id)
if sh == nil {
return "", ErrShardNotFound
}

return sh.CreateSnapshot()
return sh.CreateSnapshot(skipCacheOk)
}

// SetShardEnabled enables or disables a shard for read and writes.
Expand Down
2 changes: 1 addition & 1 deletion tsdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func TestStore_CreateShardSnapShot(t *testing.T) {
t.Fatalf("expected shard")
}

dir, e := s.CreateShardSnapshot(1)
dir, e := s.CreateShardSnapshot(1, false)
if e != nil {
t.Fatal(e)
}
Expand Down

0 comments on commit 743aef4

Please sign in to comment.