Skip to content

Commit

Permalink
snap, etcdserver: tighten up snapshot path handling
Browse files Browse the repository at this point in the history
Computing the snapshot file path is error prone; snapshot recovery was
constructing file paths missing a path separator so the snapshot
would never be loaded. Instead, refactor the backend path handling
to use helper functions where possible.
  • Loading branch information
Anthony Romano committed May 11, 2017
1 parent 066062a commit f6cd4d4
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 162 deletions.
81 changes: 81 additions & 0 deletions etcdserver/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
"fmt"
"os"
"time"

"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
)

// newBackend creates a backend using the server's backend path,
// permitting extra mmap headroom when a non-default quota is set.
func newBackend(cfg *ServerConfig) backend.Backend {
	bcfg := backend.DefaultBackendConfig()
	bcfg.Path = cfg.backendPath()
	quota := cfg.QuotaBackendBytes
	if quota > 0 && quota != DefaultQuotaBytes {
		// permit 10% excess over quota for disarm
		bcfg.MmapSize = uint64(quota + quota/10)
	}
	return backend.New(bcfg)
}

// openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
func openSnapshotBackend(cfg *ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
	// Locate the snapshot's db file by its raft index.
	dbpath, err := ss.DBFilePath(snapshot.Metadata.Index)
	if err != nil {
		return nil, fmt.Errorf("database snapshot file path error: %v", err)
	}
	// Promote the snapshot db to be the live backend db.
	err = os.Rename(dbpath, cfg.backendPath())
	if err != nil {
		return nil, fmt.Errorf("rename snapshot file error: %v", err)
	}
	return openBackend(cfg), nil
}

// openBackend returns a backend using the current etcd db.
func openBackend(cfg *ServerConfig) backend.Backend {
	dbpath := cfg.backendPath()
	bec := make(chan backend.Backend)
	// Open asynchronously; backend.New blocks while another process
	// holds the file lock on the db.
	go func() { bec <- newBackend(cfg) }()
	select {
	case be := <-bec:
		return be
	case <-time.After(time.Second):
		// Opening is taking suspiciously long; warn, then keep waiting.
		plog.Warningf("another etcd process is using %q and holds the file lock.", dbpath)
		plog.Warningf("waiting for it to exit before starting...")
	}
	return <-bec
}

// recoverSnapshotBackend recovers the DB from a snapshot in case etcd crashes
// before updating the backend db after persisting raft snapshot to disk,
// violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg *ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
	var cIndex consistentIndex
	kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex)
	defer kv.Close()
	// Current backend already covers this snapshot's index; keep using it.
	if snapshot.Metadata.Index <= kv.ConsistentIndex() {
		return oldbe, nil
	}
	// Backend is behind the raft snapshot; discard it and adopt the snapshot db.
	oldbe.Close()
	return openSnapshotBackend(cfg, snap.New(cfg.SnapDir()), snapshot)
}
2 changes: 2 additions & 0 deletions etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,5 @@ func (c *ServerConfig) bootstrapTimeout() time.Duration {
}
return time.Second
}

// backendPath returns the file path of the backend database, "db" inside the snapshot directory.
func (c *ServerConfig) backendPath() string { return filepath.Join(c.SnapDir(), "db") }
48 changes: 14 additions & 34 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"net/http"
"os"
"path"
"path/filepath"
"regexp"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -76,7 +75,6 @@ const (
// (since it will timeout).
monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second

databaseFilename = "db"
// max number of in-flight snapshot messages etcdserver allows to have
// This number is more than enough for most clusters with 5 machines.
maxInFlightMsgSnap = 16
Expand Down Expand Up @@ -200,7 +198,8 @@ type EtcdServer struct {

cluster *membership.RaftCluster

store store.Store
store store.Store
snapshotter *snap.Snapshotter

applyV2 ApplierV2

Expand Down Expand Up @@ -271,10 +270,9 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
}
ss := snap.New(cfg.SnapDir())

bepath := filepath.Join(cfg.SnapDir(), databaseFilename)
bepath := cfg.backendPath()
beExist := fileutil.Exist(bepath)

be := openBackend(bepath, cfg.QuotaBackendBytes)
be := openBackend(cfg)

defer func() {
if err != nil {
Expand Down Expand Up @@ -372,9 +370,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
plog.Panicf("recovered store from snapshot error: %v", err)
}
plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)

be, err = checkAndRecoverDB(snapshot, be, cfg.QuotaBackendBytes, cfg.SnapDir())
if err != nil {
if be, err = recoverSnapshotBackend(cfg, be, *snapshot); err != nil {
plog.Panicf("recovering backend from snapshot error: %v", err)
}
}
Expand Down Expand Up @@ -408,11 +404,12 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {

heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
srv = &EtcdServer{
readych: make(chan struct{}),
Cfg: cfg,
snapCount: cfg.SnapCount,
errorc: make(chan error, 1),
store: st,
readych: make(chan struct{}),
Cfg: cfg,
snapCount: cfg.SnapCount,
errorc: make(chan error, 1),
store: st,
snapshotter: ss,
r: *newRaftNode(
raftNodeConfig{
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Expand Down Expand Up @@ -795,21 +792,14 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
apply.snapshot.Metadata.Index, ep.appliedi)
}

// wait for raftNode to persist snashot onto the disk
// wait for raftNode to persist snapshot onto the disk
<-apply.notifyc

snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index)
newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot)
if err != nil {
plog.Panicf("get database snapshot file path error: %v", err)
}

fn := filepath.Join(s.Cfg.SnapDir(), databaseFilename)
if err := os.Rename(snapfn, fn); err != nil {
plog.Panicf("rename snapshot file error: %v", err)
plog.Panic(err)
}

newbe := newBackend(fn, s.Cfg.QuotaBackendBytes)

// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
if s.lessor != nil {
Expand Down Expand Up @@ -1662,13 +1652,3 @@ func (s *EtcdServer) goAttach(f func()) {
f()
}()
}

func newBackend(path string, quotaBytes int64) backend.Backend {
bcfg := backend.DefaultBackendConfig()
bcfg.Path = path
if quotaBytes > 0 && quotaBytes != DefaultQuotaBytes {
// permit 10% excess over quota for disarm
bcfg.MmapSize = uint64(quotaBytes + quotaBytes/10)
}
return backend.New(bcfg)
}
83 changes: 40 additions & 43 deletions etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"testing"
"time"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/pkg/idutil"
"github.com/coreos/etcd/pkg/mock/mockstorage"
"github.com/coreos/etcd/pkg/mock/mockstore"
Expand All @@ -40,6 +42,7 @@ import (
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/store"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -964,13 +967,15 @@ func TestSnapshotOrdering(t *testing.T) {
t.Fatalf("couldn't open tempdir (%v)", err)
}
defer os.RemoveAll(testdir)
if err := os.MkdirAll(testdir+"/member/snap", 0755); err != nil {

snapdir := filepath.Join(testdir, "member", "snap")
if err := os.MkdirAll(snapdir, 0755); err != nil {
t.Fatalf("couldn't make snap dir (%v)", err)
}

rs := raft.NewMemoryStorage()
p := mockstorage.NewStorageRecorderStream(testdir)
tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
tr, snapDoneC := rafthttp.NewSnapTransporter(snapdir)
r := newRaftNode(raftNodeConfig{
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: n,
Expand All @@ -982,10 +987,11 @@ func TestSnapshotOrdering(t *testing.T) {
Cfg: &ServerConfig{
DataDir: testdir,
},
r: *r,
store: st,
cluster: cl,
SyncTicker: &time.Ticker{},
r: *r,
store: st,
snapshotter: snap.New(snapdir),
cluster: cl,
SyncTicker: &time.Ticker{},
}
s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}

Expand All @@ -997,40 +1003,30 @@ func TestSnapshotOrdering(t *testing.T) {
s.start()
defer s.Stop()

actionc := p.Chan()
n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
if ac := <-actionc; ac.Name != "Save" {
// MsgSnap triggers raftNode to call Save()
t.Fatalf("expect save() is called, but got %v", ac.Name)
}

// get the snapshot sent by the transport
snapMsg := <-snapDoneC

// Snapshot first triggers raftnode to persists the snapshot onto disk
// before renaming db snapshot file to db
snapMsg.Snapshot.Metadata.Index = 1
n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot}
var seenSaveSnap bool
timer := time.After(5 * time.Second)
for {
select {
case ac := <-actionc:
switch ac.Name {
// DBFilePath() is called immediately before snapshot renaming.
case "DBFilePath":
if !seenSaveSnap {
t.Fatalf("DBFilePath called before SaveSnap")
}
return
case "SaveSnap":
seenSaveSnap = true
default:
continue
}
case <-timer:
t.Fatalf("timeout waiting on actions")
}
go func() {
// get the snapshot sent by the transport
snapMsg := <-snapDoneC
// Snapshot first triggers raftnode to persists the snapshot onto disk
// before renaming db snapshot file to db
snapMsg.Snapshot.Metadata.Index = 1
n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot}
}()

if ac := <-p.Chan(); ac.Name != "Save" {
t.Fatalf("expected Save, got %+v", ac)
}
if ac := <-p.Chan(); ac.Name != "Save" {
t.Fatalf("expected Save, got %+v", ac)
}
// confirm snapshot file still present before calling SaveSnap
snapPath := filepath.Join(snapdir, fmt.Sprintf("%016x.snap.db", 1))
if !fileutil.Exist(snapPath) {
t.Fatalf("expected file %q, got missing", snapPath)
}
// unblock SaveSnapshot, etcdserver now permitted to move snapshot file
if ac := <-p.Chan(); ac.Name != "SaveSnap" {
t.Fatalf("expected SaveSnap, got %+v", ac)
}
}

Expand Down Expand Up @@ -1119,10 +1115,11 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
Cfg: &ServerConfig{
DataDir: testdir,
},
r: *r,
store: st,
cluster: cl,
SyncTicker: &time.Ticker{},
r: *r,
store: st,
snapshotter: snap.New(testdir),
cluster: cl,
SyncTicker: &time.Ticker{},
}
s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}

Expand Down
3 changes: 0 additions & 3 deletions etcdserver/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ type Storage interface {
Save(st raftpb.HardState, ents []raftpb.Entry) error
// SaveSnap function saves snapshot to the underlying stable storage.
SaveSnap(snap raftpb.Snapshot) error
// DBFilePath returns the file path of database snapshot saved with given
// id.
DBFilePath(id uint64) (string, error)
// Close closes the Storage and performs finalization.
Close() error
}
Expand Down
Loading

0 comments on commit f6cd4d4

Please sign in to comment.