Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
corylanou committed May 10, 2016
1 parent 10db0aa commit f415cf8
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 2 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ With this release InfluxDB is moving to Go v1.6.
- [#6534](https://github.com/influxdata/influxdb/pull/6534): Move to Go v1.6.2 (over Go v1.4.3)
- [#6522](https://github.com/influxdata/influxdb/pull/6522): Dump TSM files to line protocol
- [#6585](https://github.com/influxdata/influxdb/pull/6585): Parallelize iterators
- [#6502](https://github.com/influxdata/influxdb/pull/6502): Add ability to copy shard via rpc calls. Remove deprecated copier service..
- [#6502](https://github.com/influxdata/influxdb/pull/6502): Add ability to copy shard via rpc calls. Remove deprecated copier service.
- [#6593](https://github.com/influxdata/influxdb/pull/6593): Add ability to create snapshots of shards.

### Bugfixes

Expand Down
1 change: 1 addition & 0 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Engine interface {
DeleteMeasurement(name string, seriesKeys []string) error
SeriesCount() (n int, err error)
MeasurementFields(measurement string) *MeasurementFields
CreateSnapshot() (string, error)

// Format will return the format for the engine
Format() EngineFormat
Expand Down
28 changes: 28 additions & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"archive/tar"
"fmt"
"io"
"io/ioutil"
"log"
"math"
"os"
Expand Down Expand Up @@ -578,6 +579,19 @@ func (e *Engine) WriteSnapshot() error {
return e.writeSnapshotAndCommit(closedFiles, snapshot, compactor)
}

// CreateSnapshot will create a temp directory that holds
// temporary hardlinks to the underylyng shard files
func (e *Engine) CreateSnapshot() (string, error) {
if err := e.WriteSnapshot(); err != nil {
return "", nil
}

e.mu.RLock()
defer e.mu.RUnlock()

return e.FileStore.CreateSnapshot()
}

// writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments
func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache, compactor *Compactor) (err error) {

Expand Down Expand Up @@ -797,6 +811,20 @@ func (e *Engine) cleanup() error {
return fmt.Errorf("error removing temp compaction files: %v", err)
}
}

allfiles, err := ioutil.ReadDir(e.path)
if err != nil {
return err
}
for _, f := range allfiles {
// Check to see if there are any `.tmp` directories that were left over from failed shard snapshots
if f.IsDir() && strings.HasSuffix(f.Name(), ".tmp") {
if err := os.Remove(f.Name()); err != nil {
return fmt.Errorf("error removing tmp snapshot directory %q: %s", f.Name(), err)
}
}
}

return nil
}

Expand Down
60 changes: 59 additions & 1 deletion tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"expvar"
"fmt"
"io"
"io/ioutil"
"log"
"math"
"os"
Expand All @@ -20,7 +21,7 @@ import (

type TSMFile interface {
// Path returns the underlying file path for the TSMFile. If the file
// has not be written or loaded from disk, the zero value is returne.
// has not be written or loaded from disk, the zero value is returned.
Path() string

// Read returns all the values in the block where time t resides
Expand Down Expand Up @@ -113,6 +114,8 @@ type FileStore struct {
traceLogging bool

statMap *expvar.Map

currentTempDirID int
}

type FileStat struct {
Expand Down Expand Up @@ -294,6 +297,24 @@ func (f *FileStore) Open() error {
return nil
}

// find the current max ID for temp directories
tmpfiles, err := ioutil.ReadDir(f.dir)
if err != nil {
return err
}
for _, fi := range tmpfiles {
if fi.IsDir() && strings.HasSuffix(fi.Name(), ".tmp") {
ss := strings.Split(filepath.Base(fi.Name()), ".")
if len(ss) == 2 {
if i, err := strconv.Atoi(ss[0]); err != nil {
if i > f.currentTempDirID {
f.currentTempDirID = i
}
}
}
}
}

files, err := filepath.Glob(filepath.Join(f.dir, fmt.Sprintf("*.%s", TSMFileExtension)))
if err != nil {
return err
Expand Down Expand Up @@ -589,6 +610,43 @@ func (f *FileStore) locations(key string, t int64, ascending bool) []*location {
return locations
}

// CreateSnapshot will create hardlinks for all tsm and tombstone files
// in the path provided
func (f *FileStore) CreateSnapshot() (string, error) {
files := f.Files()

f.mu.Lock()
f.currentTempDirID += 1
f.mu.Unlock()

f.mu.RLock()
defer f.mu.RUnlock()

// get a tmp directory name
tmpPath := fmt.Sprintf("%s/%d.tmp", f.dir, f.currentTempDirID)
err := os.Mkdir(tmpPath, 0777)
if err != nil {
return "", nil
}

for _, tsmf := range files {
newpath := filepath.Join(tmpPath, filepath.Base(tsmf.Path()))
if err := os.Link(tsmf.Path(), newpath); err != nil {
return "", fmt.Errorf("error creating tsm hard link: %q", err)
}
// Check for tombstones and link those as well
for _, tf := range tsmf.TombstoneFiles() {
tfpath := filepath.Join(f.dir, tf.Path)
newpath := filepath.Join(tmpPath, filepath.Base(tf.Path))
if err := os.Link(tfpath, newpath); err != nil {
return "", fmt.Errorf("error creating tombstone hard link: %q", err)
}
}
}

return tmpPath, nil
}

// ParseTSMFileName parses the generation and sequence from a TSM file name.
func ParseTSMFileName(name string) (int, int, error) {
base := filepath.Base(name)
Expand Down
53 changes: 53 additions & 0 deletions tsdb/engine/tsm1/file_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1538,7 +1538,60 @@ func TestFileStore_Stats(t *testing.T) {
if got, exp := len(stats), 3; got != exp {
t.Fatalf("file count mismatch: got %v, exp %v", got, exp)
}
}

func TestFileStore_CreateSnapshot(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)

// Setup 3 files
data := []keyValues{
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 1.0)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(1, 2.0)}},
keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, 3.0)}},
}

files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}

fs.Add(files...)

// Create a tombstone
if err := fs.DeleteRange([]string{"cpu"}, 1, 1); err != nil {
t.Fatalf("unexpected error delete range: %v", err)
}

s, e := fs.CreateSnapshot()
if e != nil {
t.Fatal(e)
}
t.Logf("temp file for hard links: %q", s)

tfs, e := ioutil.ReadDir(s)
if e != nil {
t.Fatal(e)
}
if len(tfs) == 0 {
t.Fatal("no files found")
}

for _, f := range fs.Files() {
p := filepath.Join(s, filepath.Base(f.Path()))
t.Logf("checking for existence of hard link %q", p)
if _, err := os.Stat(p); os.IsNotExist(err) {
t.Fatalf("unable to find file %q", p)
}
for _, tf := range f.TombstoneFiles() {
p := filepath.Join(s, filepath.Base(tf.Path))
t.Logf("checking for existence of hard link %q", p)
if _, err := os.Stat(p); os.IsNotExist(err) {
t.Fatalf("unable to find file %q", p)
}
}
}
}

func newFileDir(dir string, values ...keyValues) ([]string, error) {
Expand Down
8 changes: 8 additions & 0 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,14 @@ func (s *Shard) Restore(r io.Reader, basePath string) error {
return s.Open()
}

// CreateSnapshot will return a path to a temp directory
// containing hard links to the underlying shard files
func (s *Shard) CreateSnapshot() (string, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.engine.CreateSnapshot()
}

// Shards represents a sortable list of shards.
type Shards []*Shard

Expand Down
11 changes: 11 additions & 0 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,17 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
return nil
}

// CreateShardSnapShot will create a hard link to the underlying shard and return a path
// The caller is responsible for cleaning up (removing) the file path returned
func (s *Store) CreateShardSnapshot(id uint64) (string, error) {
sh := s.Shard(id)
if sh == nil {
return "", ErrShardNotFound
}

return sh.CreateSnapshot()
}

// DeleteShard removes a shard from disk.
func (s *Store) DeleteShard(shardID uint64) error {
s.mu.Lock()
Expand Down
23 changes: 23 additions & 0 deletions tsdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,29 @@ func TestStore_DeleteShard(t *testing.T) {
}
}

// Ensure the store can create a snapshot to a shard.
func TestStore_CreateShardSnapShot(t *testing.T) {
s := MustOpenStore()
defer s.Close()

// Create a new shard and verify that it exists.
if err := s.CreateShard("db0", "rp0", 1); err != nil {
t.Fatal(err)
} else if sh := s.Shard(1); sh == nil {
t.Fatalf("expected shard")
} else if di := s.DatabaseIndex("db0"); di == nil {
t.Errorf("expected database index")
}

dir, e := s.CreateShardSnapshot(1)
if e != nil {
t.Fatal(e)
}
if dir == "" {
t.Fatal("empty directory name")
}
}

// Ensure the store reports an error when it can't open a database directory.
func TestStore_Open_InvalidDatabaseFile(t *testing.T) {
s := NewStore()
Expand Down

0 comments on commit f415cf8

Please sign in to comment.