diff --git a/api.go b/api.go
index aa704b4d5..4b34b4d07 100644
--- a/api.go
+++ b/api.go
@@ -4,7 +4,6 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"os"
 	"strconv"
 	"sync"
 	"sync/atomic"
@@ -315,6 +314,9 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
 	if err != nil {
 		return fmt.Errorf("failed to list snapshots: %v", err)
 	}
+
+	logger := conf.getOrCreateLogger()
+
 	for _, snapshot := range snapshots {
 		var source io.ReadCloser
 		_, source, err = snaps.Open(snapshot.ID)
@@ -330,9 +332,18 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
 		// server instance. If the same process will eventually become a Raft peer
 		// then it will call NewRaft and restore again from disk then which will
 		// report metrics.
-		err = fsm.Restore(source)
+		snapLogger := logger.With(
+			"id", snapshot.ID,
+			"last-index", snapshot.Index,
+			"last-term", snapshot.Term,
+			"size-in-bytes", snapshot.Size,
+		)
+		crc := newCountingReadCloser(source)
+		monitor := startSnapshotRestoreMonitor(snapLogger, crc, snapshot.Size, false)
+		err = fsm.Restore(crc)
 		// Close the source after the restore has completed
 		source.Close()
+		monitor.StopAndWait()
 		if err != nil {
 			// Same here, skip and try the next one.
 			continue
@@ -463,20 +474,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
 	}
 
 	// Ensure we have a LogOutput.
-	var logger hclog.Logger
-	if conf.Logger != nil {
-		logger = conf.Logger
-	} else {
-		if conf.LogOutput == nil {
-			conf.LogOutput = os.Stderr
-		}
-
-		logger = hclog.New(&hclog.LoggerOptions{
-			Name:   "raft",
-			Level:  hclog.LevelFromString(conf.LogLevel),
-			Output: conf.LogOutput,
-		})
-	}
+	logger := conf.getOrCreateLogger()
 
 	// Try to restore the current term.
 	currentTerm, err := stable.GetUint64(keyCurrentTerm)
@@ -600,21 +598,8 @@ func (r *Raft) restoreSnapshot() error {
 
 	// Try to load in order of newest to oldest
 	for _, snapshot := range snapshots {
-		if !r.config().NoSnapshotRestoreOnStart {
-			_, source, err := r.snapshots.Open(snapshot.ID)
-			if err != nil {
-				r.logger.Error("failed to open snapshot", "id", snapshot.ID, "error", err)
-				continue
-			}
-
-			if err := fsmRestoreAndMeasure(r.fsm, source); err != nil {
-				source.Close()
-				r.logger.Error("failed to restore snapshot", "id", snapshot.ID, "error", err)
-				continue
-			}
-			source.Close()
-
-			r.logger.Info("restored from snapshot", "id", snapshot.ID)
+		if !r.tryRestoreSingleSnapshot(snapshot) {
+			continue
 		}
 
 		// Update the lastApplied so we don't replay old logs
@@ -650,6 +635,41 @@ func (r *Raft) restoreSnapshot() error {
 	return nil
 }
 
+// tryRestoreSingleSnapshot attempts to restore the FSM from one snapshot. It
+// returns false if the snapshot could not be opened or restored, and true
+// otherwise, including when NoSnapshotRestoreOnStart skips the restore.
+func (r *Raft) tryRestoreSingleSnapshot(snapshot *SnapshotMeta) bool {
+	if r.config().NoSnapshotRestoreOnStart {
+		return true
+	}
+
+	snapLogger := r.logger.With(
+		"id", snapshot.ID,
+		"last-index", snapshot.Index,
+		"last-term", snapshot.Term,
+		"size-in-bytes", snapshot.Size,
+	)
+
+	snapLogger.Info("starting restore from snapshot")
+
+	_, source, err := r.snapshots.Open(snapshot.ID)
+	if err != nil {
+		snapLogger.Error("failed to open snapshot", "error", err)
+		return false
+	}
+
+	if err := fsmRestoreAndMeasure(snapLogger, r.fsm, source, snapshot.Size); err != nil {
+		source.Close()
+		snapLogger.Error("failed to restore snapshot", "error", err)
+		return false
+	}
+	source.Close()
+
+	snapLogger.Info("restored from snapshot")
+
+	return true
+}
+
 func (r *Raft) config() Config {
 	return r.conf.Load().(Config)
 }
diff --git a/config.go b/config.go
index 148b27624..ef1f7adaa 100644
--- a/config.go
+++ b/config.go
@@ -3,6 +3,7 @@ package raft
 import (
 	"fmt"
 	"io"
+	"os"
 	"time"
 
 	"github.com/hashicorp/go-hclog"
@@ -222,6 +223,24 @@ type Config struct {
 	skipStartup bool
 }
 
+// getOrCreateLogger returns conf.Logger when one is set. Otherwise it builds a
+// new "raft" logger at conf.LogLevel that writes to conf.LogOutput, first
+// defaulting conf.LogOutput to os.Stderr when it is nil.
+func (conf *Config) getOrCreateLogger() hclog.Logger {
+	if conf.Logger != nil {
+		return conf.Logger
+	}
+	if conf.LogOutput == nil {
+		conf.LogOutput = os.Stderr
+	}
+
+	return hclog.New(&hclog.LoggerOptions{
+		Name:   "raft",
+		Level:  hclog.LevelFromString(conf.LogLevel),
+		Output: conf.LogOutput,
+	})
+}
+
 // ReloadableConfig is the subset of Config that may be reconfigured during
 // runtime using raft.ReloadConfig. We choose to duplicate fields over embedding
 // or accepting a Config but only using specific fields to keep the API clear.
diff --git a/fsm.go b/fsm.go
index 50e993d24..487cb4b78 100644
--- a/fsm.go
+++ b/fsm.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/armon/go-metrics"
+	hclog "github.com/hashicorp/go-hclog"
 )
 
 // FSM is implemented by clients to make use of the replicated log.
@@ -184,8 +185,15 @@ func (r *Raft) runFSM() {
 		}
 		defer source.Close()
 
+		snapLogger := r.logger.With(
+			"id", req.ID,
+			"last-index", meta.Index,
+			"last-term", meta.Term,
+			"size-in-bytes", meta.Size,
+		)
+
 		// Attempt to restore
-		if err := fsmRestoreAndMeasure(r.fsm, source); err != nil {
+		if err := fsmRestoreAndMeasure(snapLogger, r.fsm, source, meta.Size); err != nil {
 			req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err))
 			return
 		}
@@ -241,13 +249,21 @@ func (r *Raft) runFSM() {
 // fsmRestoreAndMeasure wraps the Restore call on an FSM to consistently measure
 // and report timing metrics. The caller is still responsible for calling Close
 // on the source in all cases.
-func fsmRestoreAndMeasure(fsm FSM, source io.ReadCloser) error {
+// Restore progress is logged through logger while the call is running.
+func fsmRestoreAndMeasure(logger hclog.Logger, fsm FSM, source io.ReadCloser, snapshotSize int64) error {
 	start := time.Now()
-	if err := fsm.Restore(source); err != nil {
+
+	crc := newCountingReadCloser(source)
+
+	monitor := startSnapshotRestoreMonitor(logger, crc, snapshotSize, false)
+	defer monitor.StopAndWait()
+
+	if err := fsm.Restore(crc); err != nil {
 		return err
 	}
 	metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)
 	metrics.SetGauge([]string{"raft", "fsm", "lastRestoreDuration"}, float32(time.Since(start).Milliseconds()))
+
 	return nil
 }
 
diff --git a/progress.go b/progress.go
new file mode 100644
index 000000000..5bdccf178
--- /dev/null
+++ b/progress.go
@@ -0,0 +1,166 @@
+package raft
+
+import (
+	"context"
+	"io"
+	"sync"
+	"time"
+
+	hclog "github.com/hashicorp/go-hclog"
+)
+
+const (
+	// snapshotRestoreMonitorInterval is how often a running monitor logs
+	// progress.
+	snapshotRestoreMonitorInterval = 10 * time.Second
+)
+
+// snapshotRestoreMonitor logs, from a background goroutine, how many bytes
+// have been read through cr so far. size is the expected total used for the
+// percentage, and networkTransfer selects the network-transfer log message.
+type snapshotRestoreMonitor struct {
+	logger          hclog.Logger
+	cr              CountingReader
+	size            int64
+	networkTransfer bool
+
+	once   sync.Once
+	cancel func()
+	doneCh chan struct{}
+}
+
+// startSnapshotRestoreMonitor starts a goroutine that logs the progress of cr
+// every snapshotRestoreMonitorInterval. The caller must call StopAndWait on
+// the returned monitor to stop that goroutine.
+func startSnapshotRestoreMonitor(
+	logger hclog.Logger,
+	cr CountingReader,
+	size int64,
+	networkTransfer bool,
+) *snapshotRestoreMonitor {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	m := &snapshotRestoreMonitor{
+		logger:          logger,
+		cr:              cr,
+		size:            size,
+		networkTransfer: networkTransfer,
+		cancel:          cancel,
+		doneCh:          make(chan struct{}),
+	}
+	go m.run(ctx)
+	return m
+}
+
+// run logs progress on every tick until ctx is cancelled, then closes doneCh.
+// If no tick fired before cancellation it logs once on the way out, so every
+// monitored operation produces at least one progress line.
+func (m *snapshotRestoreMonitor) run(ctx context.Context) {
+	defer close(m.doneCh)
+
+	ticker := time.NewTicker(snapshotRestoreMonitorInterval)
+	defer ticker.Stop()
+
+	ranOnce := false
+	for {
+		select {
+		case <-ctx.Done():
+			if !ranOnce {
+				m.runOnce()
+			}
+			return
+		case <-ticker.C:
+			m.runOnce()
+			ranOnce = true
+		}
+	}
+}
+
+// runOnce logs a single progress line: the bytes read so far and, when the
+// expected size is positive, the percentage of that size.
+func (m *snapshotRestoreMonitor) runOnce() {
+	readBytes := m.cr.Count()
+
+	message := "snapshot restore progress"
+	if m.networkTransfer {
+		message = "snapshot network transfer progress"
+	}
+
+	// Dividing by a non-positive size yields NaN, Inf or a negative value,
+	// none of which is a meaningful percentage, so report only the byte count.
+	if m.size <= 0 {
+		m.logger.Info(message, "read-bytes", readBytes)
+		return
+	}
+
+	// Convert before multiplying so that 100*readBytes cannot overflow int64.
+	pct := 100 * float64(readBytes) / float64(m.size)
+
+	m.logger.Info(message,
+		"read-bytes", readBytes,
+		"percent-complete", hclog.Fmt("%0.2f%%", pct),
+	)
+}
+
+// StopAndWait cancels the monitor and blocks until its goroutine has exited.
+// It is safe to call more than once; only the first call does any work.
+func (m *snapshotRestoreMonitor) StopAndWait() {
+	m.once.Do(func() {
+		m.cancel()
+		<-m.doneCh
+	})
+}
+
+// CountingReader is an io.Reader that also reports the total number of bytes
+// read through it so far.
+type CountingReader interface {
+	io.Reader
+	Count() int64
+}
+
+// countingReader wraps reader and keeps a running total of the bytes returned
+// by Read. mu guards bytes so that Count may be called from another goroutine
+// while a Read is in progress.
+type countingReader struct {
+	reader io.Reader
+
+	mu    sync.Mutex
+	bytes int64
+}
+
+// Read reads from the wrapped reader and adds the bytes returned to the total.
+func (r *countingReader) Read(p []byte) (n int, err error) {
+	n, err = r.reader.Read(p)
+	r.mu.Lock()
+	r.bytes += int64(n)
+	r.mu.Unlock()
+	return n, err
+}
+
+// Count returns the total number of bytes read so far.
+func (r *countingReader) Count() int64 {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	return r.bytes
+}
+
+// newCountingReader returns a countingReader wrapping r.
+func newCountingReader(r io.Reader) *countingReader {
+	return &countingReader{reader: r}
+}
+
+// countingReadCloser is a countingReader that also exposes the Close method
+// of the wrapped io.ReadCloser.
+type countingReadCloser struct {
+	*countingReader
+	io.Closer
+}
+
+// newCountingReadCloser returns a countingReadCloser that counts reads from rc
+// and closes rc when closed.
+func newCountingReadCloser(rc io.ReadCloser) *countingReadCloser {
+	return &countingReadCloser{
+		countingReader: newCountingReader(rc),
+		Closer:         rc,
+	}
+}
diff --git a/raft.go b/raft.go
index 8b7d5b0ef..8ed676b1b 100644
--- a/raft.go
+++ b/raft.go
@@ -1608,8 +1608,14 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
 		return
 	}
 
+	// Separately track the progress of streaming a snapshot over the network
+	// because this too can take a long time.
+	countingRPCReader := newCountingReader(rpc.Reader)
+
 	// Spill the remote snapshot to disk
-	n, err := io.Copy(sink, rpc.Reader)
+	transferMonitor := startSnapshotRestoreMonitor(r.logger, countingRPCReader, req.Size, true)
+	n, err := io.Copy(sink, countingRPCReader)
+	transferMonitor.StopAndWait()
 	if err != nil {
 		sink.Cancel()
 		r.logger.Error("failed to copy snapshot", "error", err)
diff --git a/raft_test.go b/raft_test.go
index 096da6918..b25aa0c78 100644
--- a/raft_test.go
+++ b/raft_test.go
@@ -1,6 +1,7 @@
 package raft
 
 import (
+	"bufio"
 	"bytes"
 	"fmt"
 	"io/ioutil"
@@ -988,6 +989,111 @@ func TestRaft_SnapshotRestore(t *testing.T) {
 	}
 }
 
+func TestRaft_SnapshotRestore_Progress(t *testing.T) {
+	// Make the cluster
+	conf := inmemConfig(t)
+	conf.TrailingLogs = 10
+	c := MakeCluster(1, t, conf)
+	defer c.Close()
+
+	// Commit a lot of things
+	leader := c.Leader()
+	var future Future
+	for i := 0; i < 100; i++ {
+		future = leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0)
+	}
+
+	// Wait for the last future to apply
+	if err := future.Error(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Take a snapshot
+	snapFuture := leader.Snapshot()
+	if err := snapFuture.Error(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Check for snapshot
+	snaps, _ := leader.snapshots.List()
+	if len(snaps) != 1 {
+		t.Fatalf("should have a snapshot")
+	}
+	snap := snaps[0]
+
+	// Logs should be trimmed
+	if idx, _ := leader.logs.FirstIndex(); idx != snap.Index-conf.TrailingLogs+1 {
+		t.Fatalf("should trim logs to %d: but is %d", snap.Index-conf.TrailingLogs+1, idx)
+	}
+
+	// Shutdown
+	shutdown := leader.Shutdown()
+	if err := shutdown.Error(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Restart the Raft
+	r := leader
+	// Can't just reuse the old transport as it will be closed
+	_, trans2 := NewInmemTransport(r.trans.LocalAddr())
+	cfg := r.config()
+
+	// Intercept logs and look for specific log messages.
+	var logbuf lockedBytesBuffer
+	cfg.Logger = hclog.New(&hclog.LoggerOptions{
+		Name:   "test",
+		Level:  hclog.Info,
+		Output: &logbuf,
+	})
+	r, err := NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	c.rafts[0] = r
+
+	// We should have restored from the snapshot!
+	if last := r.getLastApplied(); last != snap.Index {
+		t.Fatalf("bad last index: %d, expecting %d", last, snap.Index)
+	}
+
+	{
+		scan := bufio.NewScanner(strings.NewReader(logbuf.String()))
+		found := false
+		for scan.Scan() {
+			line := scan.Text()
+			if strings.Contains(line, "snapshot restore progress") && strings.Contains(line, "percent-complete=100.00%") {
+				found = true
+				break
+			}
+		}
+		if err := scan.Err(); err != nil {
+			t.Fatalf("failed to scan captured log output: %v", err)
+		}
+		if !found {
+			t.Fatalf("could not find a log line indicating that snapshot restore progress was being logged")
+		}
+	}
+}
+
+// lockedBytesBuffer is a bytes.Buffer guarded by a mutex, so the logger can
+// write to it from other goroutines while the test reads it.
+type lockedBytesBuffer struct {
+	mu  sync.Mutex
+	buf bytes.Buffer
+}
+
+func (b *lockedBytesBuffer) Write(p []byte) (n int, err error) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	return b.buf.Write(p)
+}
+
+func (b *lockedBytesBuffer) String() string {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	return b.buf.String()
+}
+
 // TODO: Need a test that has a previous format Snapshot and check that it can
 // be read/installed on the new code.
 