Snapshot restore progress #490

Merged
merged 8 commits on Feb 11, 2022
Changes from 6 commits
79 changes: 48 additions & 31 deletions api.go
@@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"io"
"os"
"strconv"
"sync"
"sync/atomic"
@@ -315,6 +314,9 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
if err != nil {
return fmt.Errorf("failed to list snapshots: %v", err)
}

logger := conf.getOrCreateLogger()

for _, snapshot := range snapshots {
var source io.ReadCloser
_, source, err = snaps.Open(snapshot.ID)
@@ -330,9 +332,18 @@
// server instance. If the same process will eventually become a Raft peer
// then it will call NewRaft and restore again from disk then which will
// report metrics.
err = fsm.Restore(source)
snapLogger := logger.With(
"id", snapshot.ID,
"last-index", snapshot.Index,
"last-term", snapshot.Term,
"size-in-bytes", snapshot.Size,
)
crc := newCountingReadCloser(source)
monitor := startSnapshotRestoreMonitor(snapLogger, crc, snapshot.Size, false)
err = fsm.Restore(crc)
// Close the source after the restore has completed
source.Close()
monitor.StopAndWait()
if err != nil {
// Same here, skip and try the next one.
continue
@@ -463,20 +474,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
}

// Ensure we have a LogOutput.
var logger hclog.Logger
if conf.Logger != nil {
logger = conf.Logger
} else {
if conf.LogOutput == nil {
conf.LogOutput = os.Stderr
}

logger = hclog.New(&hclog.LoggerOptions{
Name: "raft",
Level: hclog.LevelFromString(conf.LogLevel),
Output: conf.LogOutput,
})
}
logger := conf.getOrCreateLogger()

// Try to restore the current term.
currentTerm, err := stable.GetUint64(keyCurrentTerm)
@@ -600,21 +598,8 @@ func (r *Raft) restoreSnapshot() error {

// Try to load in order of newest to oldest
for _, snapshot := range snapshots {
if !r.config().NoSnapshotRestoreOnStart {
_, source, err := r.snapshots.Open(snapshot.ID)
if err != nil {
r.logger.Error("failed to open snapshot", "id", snapshot.ID, "error", err)
continue
}

if err := fsmRestoreAndMeasure(r.fsm, source); err != nil {
source.Close()
r.logger.Error("failed to restore snapshot", "id", snapshot.ID, "error", err)
continue
}
source.Close()

r.logger.Info("restored from snapshot", "id", snapshot.ID)
if success := r.tryRestoreSingleSnapshot(snapshot); !success {
continue
}

// Update the lastApplied so we don't replay old logs
@@ -650,6 +635,38 @@
return nil
}

func (r *Raft) tryRestoreSingleSnapshot(snapshot *SnapshotMeta) bool {
Contributor:

Wouldn't it be better to make tryRestoreSingleSnapshot return an error, create the logger outside of it, and log the error when it's returned?

Member Author:
Maybe? I wanted to have a derived logger here (Logger.With()) so that all of the progress logs and the errors got the same set of bonus KV data logged.

Given that, I'd end up creating the logger outside of this method to pass in for progress in fsmRestoreAndMeasure, but returning an error that is immediately logged by the caller, which seemed differently strange.

Also the two current logs are slightly different today:

snapLogger.Error("failed to open snapshot", "error", err)
snapLogger.Error("failed to restore snapshot", "error", err)

and are carried over from the existing code. If these were changed to fmt.Errorf return values there'd be a slight change of output as we'd end up having to do snapLogger.Error(err.Error()) instead, and we'd lose the "error" hclog attribute.

I don't mind inverting that logic if you think it is warranted.
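
For illustration only, here is a rough sketch of the error-returning shape under discussion. It is not part of this change; it reuses the types and helpers from the diff (Raft, SnapshotMeta, fsmRestoreAndMeasure) and shows the trade-off described above: the caller ends up logging the returned error without the structured "error" attribute unless the derived snapLogger is also handed back.

func (r *Raft) tryRestoreSingleSnapshot(snapshot *SnapshotMeta) error {
	// Hypothetical variant: report failure via an error instead of a bool.
	if r.config().NoSnapshotRestoreOnStart {
		return nil
	}

	// The derived logger still has to live here so the progress logs emitted
	// by fsmRestoreAndMeasure carry the per-snapshot context.
	snapLogger := r.logger.With(
		"id", snapshot.ID,
		"last-index", snapshot.Index,
		"last-term", snapshot.Term,
		"size-in-bytes", snapshot.Size,
	)
	snapLogger.Info("starting restore from snapshot")

	_, source, err := r.snapshots.Open(snapshot.ID)
	if err != nil {
		return fmt.Errorf("failed to open snapshot %v: %w", snapshot.ID, err)
	}
	defer source.Close()

	if err := fsmRestoreAndMeasure(snapLogger, r.fsm, source, snapshot.Size); err != nil {
		return fmt.Errorf("failed to restore snapshot %v: %w", snapshot.ID, err)
	}

	snapLogger.Info("restored from snapshot")
	return nil
}

// The caller would then do something like:
//
//	if err := r.tryRestoreSingleSnapshot(snapshot); err != nil {
//		r.logger.Error(err.Error()) // note: no structured "error" attribute
//		continue
//	}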

if r.config().NoSnapshotRestoreOnStart {
return true
}

snapLogger := r.logger.With(
"id", snapshot.ID,
"last-index", snapshot.Index,
"last-term", snapshot.Term,
"size-in-bytes", snapshot.Size,
)

snapLogger.Info("starting restore from snapshot")

_, source, err := r.snapshots.Open(snapshot.ID)
if err != nil {
snapLogger.Error("failed to open snapshot", "error", err)
return false
}

if err := fsmRestoreAndMeasure(snapLogger, r.fsm, source, snapshot.Size); err != nil {
source.Close()
snapLogger.Error("failed to restore snapshot", "error", err)
return false
}
source.Close()

snapLogger.Info("restored from snapshot")

return true
}

func (r *Raft) config() Config {
return r.conf.Load().(Config)
}
16 changes: 16 additions & 0 deletions config.go
@@ -3,6 +3,7 @@ package raft
import (
"fmt"
"io"
"os"
"time"

"github.com/hashicorp/go-hclog"
@@ -222,6 +223,21 @@ type Config struct {
skipStartup bool
}

func (conf *Config) getOrCreateLogger() hclog.Logger {
if conf.Logger != nil {
return conf.Logger
}
if conf.LogOutput == nil {
conf.LogOutput = os.Stderr
}

return hclog.New(&hclog.LoggerOptions{
Name: "raft",
Level: hclog.LevelFromString(conf.LogLevel),
Output: conf.LogOutput,
})
}

// ReloadableConfig is the subset of Config that may be reconfigured during
// runtime using raft.ReloadConfig. We choose to duplicate fields over embedding
// or accepting a Config but only using specific fields to keep the API clear.
21 changes: 18 additions & 3 deletions fsm.go
@@ -6,6 +6,7 @@ import (
"time"

"github.com/armon/go-metrics"
hclog "github.com/hashicorp/go-hclog"
)

// FSM is implemented by clients to make use of the replicated log.
@@ -184,8 +185,15 @@ func (r *Raft) runFSM() {
}
defer source.Close()

snapLogger := r.logger.With(
"id", req.ID,
"last-index", meta.Index,
"last-term", meta.Term,
"size-in-bytes", meta.Size,
)

// Attempt to restore
if err := fsmRestoreAndMeasure(r.fsm, source); err != nil {
if err := fsmRestoreAndMeasure(snapLogger, r.fsm, source, meta.Size); err != nil {
req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err))
return
}
@@ -241,13 +249,20 @@
// fsmRestoreAndMeasure wraps the Restore call on an FSM to consistently measure
// and report timing metrics. The caller is still responsible for calling Close
// on the source in all cases.
func fsmRestoreAndMeasure(fsm FSM, source io.ReadCloser) error {
func fsmRestoreAndMeasure(logger hclog.Logger, fsm FSM, source io.ReadCloser, snapshotSize int64) error {
start := time.Now()
if err := fsm.Restore(source); err != nil {

crc := newCountingReadCloser(source)

monitor := startSnapshotRestoreMonitor(logger, crc, snapshotSize, false)
defer monitor.StopAndWait()

if err := fsm.Restore(crc); err != nil {
return err
}
metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)
metrics.SetGauge([]string{"raft", "fsm", "lastRestoreDuration"},
float32(time.Since(start).Milliseconds()))

return nil
}
160 changes: 160 additions & 0 deletions progress.go
@@ -0,0 +1,160 @@
package raft

import (
"context"
"io"
"sync"
"time"

hclog "github.com/hashicorp/go-hclog"
)

const (
snapshotRestoreMonitorInterval = 10 * time.Second
)

type snapshotRestoreMonitor struct {
logger hclog.Logger
cr CountingReader
size int64
networkTransfer bool

once sync.Once
cancel func()
doneCh chan struct{}
}

func startSnapshotRestoreMonitor(
logger hclog.Logger,
cr CountingReader,
size int64,
networkTransfer bool,
) *snapshotRestoreMonitor {
ctx, cancel := context.WithCancel(context.Background())

m := &snapshotRestoreMonitor{
logger: logger,
cr: cr,
size: size,
networkTransfer: networkTransfer,
cancel: cancel,
doneCh: make(chan struct{}),
}
go m.run(ctx)
return m
}

func (m *snapshotRestoreMonitor) run(ctx context.Context) {
defer close(m.doneCh)

ticker := time.NewTicker(snapshotRestoreMonitorInterval)
defer ticker.Stop()

ranOnce := false
for {
select {
case <-ctx.Done():
if !ranOnce {
m.runOnce()
}
return
case <-ticker.C:
m.runOnce()
ranOnce = true
}
}
}

func (m *snapshotRestoreMonitor) runOnce() {
readBytes := m.cr.Count()
pct := float64(100*readBytes) / float64(m.size)

message := "snapshot restore progress"
if m.networkTransfer {
message = "snapshot network transfer progress"
}

m.logger.Info(message,
"read-bytes", readBytes,
"percent-complete", hclog.Fmt("%0.2f%%", pct),
)
}

func (m *snapshotRestoreMonitor) StopAndWait() {
m.once.Do(func() {
m.cancel()
<-m.doneCh
})
}

func reportProgress(
ctx context.Context,
interval time.Duration,
reportOnceFn func(last bool),
) <-chan struct{} {
done := make(chan struct{})

go func() {
defer func() {
reportOnceFn(true)
close(done)
}()

ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}

reportOnceFn(false)
}
}()

return done
}

type CountingReader interface {
io.Reader
Count() int64
}

type countingReader struct {
reader io.Reader

mu sync.Mutex
bytes int64
}

func (r *countingReader) Read(p []byte) (n int, err error) {
n, err = r.reader.Read(p)
r.mu.Lock()
r.bytes += int64(n)
r.mu.Unlock()
return n, err
}

func (r *countingReader) Count() int64 {
r.mu.Lock()
defer r.mu.Unlock()
return r.bytes
}

func newCountingReader(r io.Reader) *countingReader {
return &countingReader{reader: r}
}

type countingReadCloser struct {
*countingReader
io.Closer
}

func newCountingReadCloser(rc io.ReadCloser) *countingReadCloser {
return &countingReadCloser{
countingReader: newCountingReader(rc),
Closer: rc,
}
}
8 changes: 7 additions & 1 deletion raft.go
@@ -1608,8 +1608,14 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
return
}

// Separately track the progress of streaming a snapshot over the network
// because this too can take a long time.
countingRPCReader := newCountingReader(rpc.Reader)

// Spill the remote snapshot to disk
n, err := io.Copy(sink, rpc.Reader)
transferMonitor := startSnapshotRestoreMonitor(r.logger, countingRPCReader, req.Size, true)
n, err := io.Copy(sink, countingRPCReader)
transferMonitor.StopAndWait()
if err != nil {
sink.Cancel()
r.logger.Error("failed to copy snapshot", "error", err)