Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix state.commit is out of range on restart #11888

Merged
merged 8 commits into from
May 15, 2020
29 changes: 23 additions & 6 deletions etcdserver/api/snap/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.etcd.io/etcd/v3/pkg/pbutil"
"go.etcd.io/etcd/v3/raft"
"go.etcd.io/etcd/v3/raft/raftpb"
"go.etcd.io/etcd/v3/wal/walpb"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -102,21 +103,37 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
return nil
}

// Load returns the newest snapshot.
func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
return s.loadMatching(func(*raftpb.Snapshot) bool { return true })
}

// LoadNewestAvailable loads the newest snapshot available that is in walSnaps.
func (s *Snapshotter) LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error) {
return s.loadMatching(func(snapshot *raftpb.Snapshot) bool {
m := snapshot.Metadata
for i := len(walSnaps) - 1; i >= 0; i-- {
if m.Term == walSnaps[i].Term && m.Index == walSnaps[i].Index {
return true
}
}
return false
})
}

// loadMatching returns the newest snapshot where matchFn returns true.
func (s *Snapshotter) loadMatching(matchFn func(*raftpb.Snapshot) bool) (*raftpb.Snapshot, error) {
names, err := s.snapNames()
if err != nil {
return nil, err
}
var snap *raftpb.Snapshot
for _, name := range names {
if snap, err = loadSnap(s.lg, s.dir, name); err == nil {
break
if snap, err = loadSnap(s.lg, s.dir, name); err == nil && matchFn(snap) {
return snap, nil
}
}
if err != nil {
return nil, ErrNoSnapshot
}
return snap, nil
return nil, ErrNoSnapshot
}

func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, error) {
Expand Down
47 changes: 42 additions & 5 deletions etcdserver/api/snap/snapshotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"testing"

"go.etcd.io/etcd/v3/raft/raftpb"
"go.etcd.io/etcd/v3/wal/walpb"

"go.uber.org/zap"
)

Expand Down Expand Up @@ -166,12 +168,47 @@ func TestLoadNewestSnap(t *testing.T) {
t.Fatal(err)
}

g, err := ss.Load()
if err != nil {
t.Errorf("err = %v, want nil", err)
cases := []struct {
name string
availableWalSnaps []walpb.Snapshot
expected *raftpb.Snapshot
}{
{
name: "load-newest",
expected: &newSnap,
},
{
name: "loadnewestavailable-newest",
availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}, {Index: 5, Term: 1}},
expected: &newSnap,
},
{
name: "loadnewestavailable-newest-unsorted",
availableWalSnaps: []walpb.Snapshot{{Index: 5, Term: 1}, {Index: 1, Term: 1}, {Index: 0, Term: 0}},
expected: &newSnap,
},
{
name: "loadnewestavailable-previous",
availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}},
expected: testSnap,
},
}
if !reflect.DeepEqual(g, &newSnap) {
t.Errorf("snap = %#v, want %#v", g, &newSnap)
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
var err error
var g *raftpb.Snapshot
if tc.availableWalSnaps != nil {
g, err = ss.LoadNewestAvailable(tc.availableWalSnaps)
} else {
g, err = ss.Load()
}
if err != nil {
t.Errorf("err = %v, want nil", err)
}
if !reflect.DeepEqual(g, tc.expected) {
t.Errorf("snap = %#v, want %#v", g, tc.expected)
}
})
}
}

Expand Down
26 changes: 22 additions & 4 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,16 @@ func (r *raftNode) start(rh *raftReadyHandler) {
r.transport.Send(r.processMessages(rd.Messages))
}

// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
// ensure that recovery after a snapshot restore is possible.
if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
}
// gofail: var raftAfterSaveSnap struct{}
}

// gofail: var raftBeforeSave struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
Expand All @@ -237,17 +247,25 @@ func (r *raftNode) start(rh *raftReadyHandler) {
// gofail: var raftAfterSave struct{}

if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
// Force WAL to fsync its hard state before Release() releases
// old data from the WAL. Otherwise could get an error like:
// panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
jpbetz marked this conversation as resolved.
Show resolved Hide resolved
if err := r.storage.Sync(); err != nil {
log.Fatal(err)
jpbetz marked this conversation as resolved.
Show resolved Hide resolved
}

// etcdserver now claim the snapshot has been persisted onto the disk
notifyc <- struct{}{}

// gofail: var raftAfterSaveSnap struct{}
// gofail: var raftBeforeApplySnap struct{}
r.raftStorage.ApplySnapshot(rd.Snapshot)
r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
// gofail: var raftAfterApplySnap struct{}

if err := r.storage.Release(rd.Snapshot); err != nil {
log.Fatal(err)
jpbetz marked this conversation as resolved.
Show resolved Hide resolved
}
// gofail: var raftAfterWALRelease struct{}
}

r.raftStorage.Append(rd.Entries)
Expand Down
28 changes: 20 additions & 8 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ import (
"sync/atomic"
"time"

"github.com/coreos/go-semver/semver"
humanize "github.com/dustin/go-humanize"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"go.etcd.io/etcd/v3/auth"
"go.etcd.io/etcd/v3/etcdserver/api"
"go.etcd.io/etcd/v3/etcdserver/api/membership"
Expand Down Expand Up @@ -59,11 +64,6 @@ import (
"go.etcd.io/etcd/v3/raft/raftpb"
"go.etcd.io/etcd/v3/version"
"go.etcd.io/etcd/v3/wal"

"github.com/coreos/go-semver/semver"
humanize "github.com/dustin/go-humanize"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -414,10 +414,19 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
zap.String("wal-dir", cfg.WALDir()),
)
}
snapshot, err = ss.Load()

// Find a snapshot to start/restart a raft node
walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
if err != nil {
return nil, err
}
// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
// wal log entries
snapshot, err := ss.LoadNewestAvailable(walSnaps)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change created a local snapshot variable. Now the snapshot declared on line 337 is always passed empty to validation on line 535.

if err != nil && err != snap.ErrNoSnapshot {
return nil, err
}

if snapshot != nil {
if err = st.Recovery(snapshot.Data); err != nil {
cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
Expand Down Expand Up @@ -2150,11 +2159,14 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
}
lg.Panic("failed to create snapshot", zap.Error(err))
}
// SaveSnap saves the snapshot and releases the locked wal files
// to the snapshot index.
// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
if err = s.r.storage.SaveSnap(snap); err != nil {
lg.Panic("failed to save snapshot", zap.Error(err))
}
if err = s.r.storage.Release(snap); err != nil {
lg.Panic("failed to release wal", zap.Error(err))
}

lg.Info(
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
Expand Down
44 changes: 33 additions & 11 deletions etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,15 +995,19 @@ func TestSnapshot(t *testing.T) {
ch := make(chan struct{}, 2)

go func() {
gaction, _ := p.Wait(1)
gaction, _ := p.Wait(2)
defer func() { ch <- struct{}{} }()

if len(gaction) != 1 {
t.Errorf("len(action) = %d, want 1", len(gaction))
if len(gaction) != 2 {
t.Fatalf("len(action) = %d, want 2", len(gaction))
}
if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
t.Errorf("action = %s, want SaveSnap", gaction[0])
}

if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "Release"}) {
t.Errorf("action = %s, want Release", gaction[1])
}
}()

go func() {
Expand Down Expand Up @@ -1087,20 +1091,32 @@ func TestSnapshotOrdering(t *testing.T) {
n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot}
}()

if ac := <-p.Chan(); ac.Name != "Save" {
ac := <-p.Chan()
if ac.Name != "Save" {
t.Fatalf("expected Save, got %+v", ac)
}

if ac := <-p.Chan(); ac.Name != "SaveSnap" {
t.Fatalf("expected SaveSnap, got %+v", ac)
}

if ac := <-p.Chan(); ac.Name != "Save" {
t.Fatalf("expected Save, got %+v", ac)
}

// confirm snapshot file still present before calling SaveSnap
snapPath := filepath.Join(snapdir, fmt.Sprintf("%016x.snap.db", 1))
if !fileutil.Exist(snapPath) {
t.Fatalf("expected file %q, got missing", snapPath)
}

// unblock SaveSnapshot, etcdserver now permitted to move snapshot file
if ac := <-p.Chan(); ac.Name != "SaveSnap" {
t.Fatalf("expected SaveSnap, got %+v", ac)
if ac := <-p.Chan(); ac.Name != "Sync" {
t.Fatalf("expected Sync, got %+v", ac)
}

if ac := <-p.Chan(); ac.Name != "Release" {
t.Fatalf("expected Release, got %+v", ac)
}
}

Expand Down Expand Up @@ -1140,16 +1156,22 @@ func TestTriggerSnap(t *testing.T) {

donec := make(chan struct{})
go func() {
wcnt := 2 + snapc
wcnt := 3 + snapc
gaction, _ := p.Wait(wcnt)

// each operation is recorded as a Save
// (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap
// (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap + Release
if len(gaction) != wcnt {
t.Errorf("len(action) = %d, want %d", len(gaction), wcnt)
fmt.Println("gaction", gaction)
jpbetz marked this conversation as resolved.
Show resolved Hide resolved
t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
}

if !reflect.DeepEqual(gaction[wcnt-2], testutil.Action{Name: "SaveSnap"}) {
t.Errorf("action = %s, want SaveSnap", gaction[wcnt-2])
}
if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) {
t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1])

if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "Release"}) {
t.Errorf("action = %s, want Release", gaction[wcnt-1])
}
close(donec)
}()
Expand Down
26 changes: 19 additions & 7 deletions etcdserver/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ type Storage interface {
SaveSnap(snap raftpb.Snapshot) error
// Close closes the Storage and performs finalization.
Close() error
// Release releases the locked wal files older than the provided snapshot.
Release(snap raftpb.Snapshot) error
// Sync WAL
Sync() error
}

type storage struct {
Expand All @@ -47,24 +51,32 @@ func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
return &storage{w, s}
}

// SaveSnap saves the snapshot to disk and release the locked
// wal files since they will not be used.
// SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry.
func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
walsnap := walpb.Snapshot{
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
}
err := st.WAL.SaveSnapshot(walsnap)
if err != nil {
return err
}
err = st.Snapshotter.SaveSnap(snap)
// save the snapshot file before writing the snapshot to the wal.
// This makes it possible for the snapshot file to become orphaned, but prevents
// a WAL snapshot entry from having no corresponding snapshot file.
err := st.Snapshotter.SaveSnap(snap)
if err != nil {
return err
}
// gofail: var raftBeforeWALSaveSnaphot struct{}

return st.WAL.SaveSnapshot(walsnap)
}

// Release release the locks to the wal files that are older than the provided wal for the given snap.
func (st *storage) Release(snap raftpb.Snapshot) error {
return st.WAL.ReleaseLockTo(snap.Metadata.Index)
}

// readWAL reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
// after the position of the given snap in the WAL.
// The snap must have been previously saved to the WAL, or this call will panic.
jpbetz marked this conversation as resolved.
Show resolved Hide resolved
func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
var (
err error
Expand Down
12 changes: 12 additions & 0 deletions pkg/mock/mockstorage/storage_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,16 @@ func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
return nil
}

func (p *storageRecorder) Release(st raftpb.Snapshot) error {
if !raft.IsEmptySnap(st) {
p.Record(testutil.Action{Name: "Release"})
}
return nil
}

func (p *storageRecorder) Sync() error {
p.Record(testutil.Action{Name: "Sync"})
return nil
}

func (p *storageRecorder) Close() error { return nil }
Loading