Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix state.commit is out of range on restart #11888

Merged
merged 8 commits into from
May 15, 2020
58 changes: 52 additions & 6 deletions etcdserver/api/snap/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"

Expand All @@ -30,6 +31,7 @@ import (
"go.etcd.io/etcd/v3/pkg/pbutil"
"go.etcd.io/etcd/v3/raft"
"go.etcd.io/etcd/v3/raft/raftpb"
"go.etcd.io/etcd/v3/wal/walpb"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -102,21 +104,37 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
return nil
}

// Load returns the newest snapshot.
func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
return s.loadMatching(func(*raftpb.Snapshot) bool { return true })
}

// LoadNewestAvailable loads the newest snapshot available that is in walSnaps.
func (s *Snapshotter) LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error) {
return s.loadMatching(func(snapshot *raftpb.Snapshot) bool {
m := snapshot.Metadata
for i := len(walSnaps) - 1; i >= 0; i-- {
if m.Term == walSnaps[i].Term && m.Index == walSnaps[i].Index {
return true
}
}
return false
})
}

// loadMatching returns the newest snapshot where matchFn returns true.
func (s *Snapshotter) loadMatching(matchFn func(*raftpb.Snapshot) bool) (*raftpb.Snapshot, error) {
names, err := s.snapNames()
if err != nil {
return nil, err
}
var snap *raftpb.Snapshot
for _, name := range names {
if snap, err = loadSnap(s.lg, s.dir, name); err == nil {
break
if snap, err = loadSnap(s.lg, s.dir, name); err == nil && matchFn(snap) {
return snap, nil
}
}
if err != nil {
return nil, ErrNoSnapshot
}
return snap, nil
return nil, ErrNoSnapshot
}

func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, error) {
Expand Down Expand Up @@ -248,3 +266,31 @@ func (s *Snapshotter) cleanupSnapdir(filenames []string) error {
}
return nil
}

func (s *Snapshotter) ReleaseSnapDBs(snap raftpb.Snapshot) error {
dir, err := os.Open(s.dir)
if err != nil {
return err
}
defer dir.Close()
filenames, err := dir.Readdirnames(-1)
if err != nil {
return err
}
for _, filename := range filenames {
if strings.HasSuffix(filename, ".snap.db") {
hexIndex := strings.TrimSuffix(filepath.Base(filename), ".snap.db")
index, err := strconv.ParseUint(hexIndex, 16, 64)
if err != nil {
return fmt.Errorf("failed to parse index from .snap.db filename '%s': %w", filename, err)
Copy link
Contributor

@tedyu tedyu May 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.snap.db seems to be redundant since it is suffix of filename

Also, we should continue the loop and try to release other files

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Submitted #11899

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't just call it .snap since there are already different files with that suffix. But the purpose of this PR isn't to rethink the file suffix anyway...

Continuing if we hit an error sounds fine. This is merged now though, so that would need a separate PR.

}
if index < snap.Metadata.Index {
s.lg.Info("found orphaned .snap.db file; deleting", zap.String("path", filename))
if rmErr := os.Remove(filepath.Join(s.dir, filename)); rmErr != nil && !os.IsNotExist(rmErr) {
return fmt.Errorf("failed to remove orphaned defragmentation file %s: %v", filename, rmErr)
}
}
}
}
return nil
}
87 changes: 82 additions & 5 deletions etcdserver/api/snap/snapshotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package snap

import (
"fmt"
"go.etcd.io/etcd/v3/pkg/fileutil"
"hash/crc32"
"io/ioutil"
"os"
Expand All @@ -24,6 +25,8 @@ import (
"testing"

"go.etcd.io/etcd/v3/raft/raftpb"
"go.etcd.io/etcd/v3/wal/walpb"

"go.uber.org/zap"
)

Expand Down Expand Up @@ -166,12 +169,47 @@ func TestLoadNewestSnap(t *testing.T) {
t.Fatal(err)
}

g, err := ss.Load()
if err != nil {
t.Errorf("err = %v, want nil", err)
cases := []struct {
name string
availableWalSnaps []walpb.Snapshot
expected *raftpb.Snapshot
}{
{
name: "load-newest",
expected: &newSnap,
},
{
name: "loadnewestavailable-newest",
availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}, {Index: 5, Term: 1}},
expected: &newSnap,
},
{
name: "loadnewestavailable-newest-unsorted",
availableWalSnaps: []walpb.Snapshot{{Index: 5, Term: 1}, {Index: 1, Term: 1}, {Index: 0, Term: 0}},
expected: &newSnap,
},
{
name: "loadnewestavailable-previous",
availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}},
expected: testSnap,
},
}
if !reflect.DeepEqual(g, &newSnap) {
t.Errorf("snap = %#v, want %#v", g, &newSnap)
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
var err error
var g *raftpb.Snapshot
if tc.availableWalSnaps != nil {
g, err = ss.LoadNewestAvailable(tc.availableWalSnaps)
} else {
g, err = ss.Load()
}
if err != nil {
t.Errorf("err = %v, want nil", err)
}
if !reflect.DeepEqual(g, tc.expected) {
t.Errorf("snap = %#v, want %#v", g, tc.expected)
}
})
}
}

Expand Down Expand Up @@ -229,3 +267,42 @@ func TestAllSnapshotBroken(t *testing.T) {
t.Errorf("err = %v, want %v", err, ErrNoSnapshot)
}
}

func TestReleaseSnapDBs(t *testing.T) {
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)

snapIndices := []uint64{100, 200, 300, 400}
for _, index := range snapIndices {
filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index))
if err := ioutil.WriteFile(filename, []byte("snap file\n"), 0644); err != nil {
t.Fatal(err)
}
}

ss := New(zap.NewExample(), dir)

if err := ss.ReleaseSnapDBs(raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 300}}); err != nil {
t.Fatal(err)
}

deleted := []uint64{100, 200}
for _, index := range deleted {
filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index))
if fileutil.Exist(filename) {
t.Errorf("expected %s (index: %d) to be deleted, but it still exists", filename, index)
}
}

retained := []uint64{300, 400}
for _, index := range retained {
filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index))
if !fileutil.Exist(filename) {
t.Errorf("expected %s (index: %d) to be retained, but it no longer exists", filename, index)
}
}
}
27 changes: 23 additions & 4 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,16 @@ func (r *raftNode) start(rh *raftReadyHandler) {
r.transport.Send(r.processMessages(rd.Messages))
}

// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
// ensure that recovery after a snapshot restore is possible.
if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
}
// gofail: var raftAfterSaveSnap struct{}
}

// gofail: var raftBeforeSave struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
Expand All @@ -237,17 +247,26 @@ func (r *raftNode) start(rh *raftReadyHandler) {
// gofail: var raftAfterSave struct{}

if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
// Force WAL to fsync its hard state before Release() releases
// old data from the WAL. Otherwise could get an error like:
// panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
jpbetz marked this conversation as resolved.
Show resolved Hide resolved
// See https://github.com/etcd-io/etcd/issues/10219 for more details.
if err := r.storage.Sync(); err != nil {
r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
}

// etcdserver now claim the snapshot has been persisted onto the disk
notifyc <- struct{}{}

// gofail: var raftAfterSaveSnap struct{}
// gofail: var raftBeforeApplySnap struct{}
r.raftStorage.ApplySnapshot(rd.Snapshot)
r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
// gofail: var raftAfterApplySnap struct{}

if err := r.storage.Release(rd.Snapshot); err != nil {
r.lg.Fatal("failed to release Raft wal", zap.Error(err))
}
// gofail: var raftAfterWALRelease struct{}
}

r.raftStorage.Append(rd.Entries)
Expand Down
28 changes: 20 additions & 8 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ import (
"sync/atomic"
"time"

"github.com/coreos/go-semver/semver"
humanize "github.com/dustin/go-humanize"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"go.etcd.io/etcd/v3/auth"
"go.etcd.io/etcd/v3/etcdserver/api"
"go.etcd.io/etcd/v3/etcdserver/api/membership"
Expand Down Expand Up @@ -59,11 +64,6 @@ import (
"go.etcd.io/etcd/v3/raft/raftpb"
"go.etcd.io/etcd/v3/version"
"go.etcd.io/etcd/v3/wal"

"github.com/coreos/go-semver/semver"
humanize "github.com/dustin/go-humanize"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -414,10 +414,19 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
zap.String("wal-dir", cfg.WALDir()),
)
}
snapshot, err = ss.Load()

// Find a snapshot to start/restart a raft node
walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
if err != nil {
return nil, err
}
// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
// wal log entries
snapshot, err := ss.LoadNewestAvailable(walSnaps)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change created a local snapshot variable. Now the snapshot declared on line 337 is always passed empty to validation on line 535.

if err != nil && err != snap.ErrNoSnapshot {
return nil, err
}

if snapshot != nil {
if err = st.Recovery(snapshot.Data); err != nil {
cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
Expand Down Expand Up @@ -2150,11 +2159,14 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
}
lg.Panic("failed to create snapshot", zap.Error(err))
}
// SaveSnap saves the snapshot and releases the locked wal files
// to the snapshot index.
// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
if err = s.r.storage.SaveSnap(snap); err != nil {
lg.Panic("failed to save snapshot", zap.Error(err))
}
if err = s.r.storage.Release(snap); err != nil {
lg.Panic("failed to release wal", zap.Error(err))
}

lg.Info(
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
Expand Down
Loading