diff --git a/api.go b/api.go
index 03a99614e29..c6f947f2416 100644
--- a/api.go
+++ b/api.go
@@ -4,12 +4,13 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"log"
 	"os"
 	"strconv"
 	"sync"
 	"time"
 
+	"github.com/hashicorp/go-hclog"
+
 	"github.com/armon/go-metrics"
 )
@@ -103,7 +104,7 @@ type Raft struct {
 	localAddr ServerAddress
 
 	// Used for our logging
-	logger *log.Logger
+	logger hclog.Logger
 
 	// LogStore provides durable storage for logs
 	logs LogStore
@@ -394,14 +395,19 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
 	}
 
 	// Ensure we have a LogOutput.
-	var logger *log.Logger
+	var logger hclog.Logger
 	if conf.Logger != nil {
 		logger = conf.Logger
 	} else {
 		if conf.LogOutput == nil {
 			conf.LogOutput = os.Stderr
 		}
-		logger = log.New(conf.LogOutput, "", log.LstdFlags)
+
+		logger = hclog.New(&hclog.LoggerOptions{
+			Name:   "raft",
+			Level:  hclog.LevelFromString(conf.LogLevel),
+			Output: conf.LogOutput,
+		})
 	}
 
 	// Try to restore the current term.
@@ -487,14 +493,13 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
 	for index := snapshotIndex + 1; index <= lastLog.Index; index++ {
 		var entry Log
 		if err := r.logs.GetLog(index, &entry); err != nil {
-			r.logger.Printf("[ERR] raft: Failed to get log at %d: %v", index, err)
+			r.logger.Error(fmt.Sprintf("Failed to get log at %d: %v", index, err))
 			panic(err)
 		}
 		r.processConfigurationLogEntry(&entry)
 	}
-
-	r.logger.Printf("[INFO] raft: Initial configuration (index=%d): %+v",
-		r.configurations.latestIndex, r.configurations.latest.Servers)
+	r.logger.Info(fmt.Sprintf("Initial configuration (index=%d): %+v",
+		r.configurations.latestIndex, r.configurations.latest.Servers))
 
 	// Setup a heartbeat fast-path to avoid head-of-line
 	// blocking where possible. It MUST be safe for this
@@ -514,7 +519,7 @@
 func (r *Raft) restoreSnapshot() error {
 	snapshots, err := r.snapshots.List()
 	if err != nil {
-		r.logger.Printf("[ERR] raft: Failed to list snapshots: %v", err)
+		r.logger.Error(fmt.Sprintf("Failed to list snapshots: %v", err))
 		return err
 	}
@@ -522,18 +527,18 @@
 	for _, snapshot := range snapshots {
 		_, source, err := r.snapshots.Open(snapshot.ID)
 		if err != nil {
-			r.logger.Printf("[ERR] raft: Failed to open snapshot %v: %v", snapshot.ID, err)
+			r.logger.Error(fmt.Sprintf("Failed to open snapshot %v: %v", snapshot.ID, err))
 			continue
 		}
 		defer source.Close()
 
 		if err := r.fsm.Restore(source); err != nil {
-			r.logger.Printf("[ERR] raft: Failed to restore snapshot %v: %v", snapshot.ID, err)
+			r.logger.Error(fmt.Sprintf("Failed to restore snapshot %v: %v", snapshot.ID, err))
 			continue
 		}
 
 		// Log success
-		r.logger.Printf("[INFO] raft: Restored from snapshot %v", snapshot.ID)
+		r.logger.Info(fmt.Sprintf("Restored from snapshot %v", snapshot.ID))
 
 		// Update the lastApplied so we don't replay old logs
 		r.setLastApplied(snapshot.Index)
@@ -955,7 +960,7 @@ func (r *Raft) Stats() map[string]string {
 
 	future := r.GetConfiguration()
 	if err := future.Error(); err != nil {
-		r.logger.Printf("[WARN] raft: could not get configuration for Stats: %v", err)
+		r.logger.Warn(fmt.Sprintf("could not get configuration for Stats: %v", err))
 	} else {
 		configuration := future.Configuration()
 		s["latest_configuration_index"] = toString(future.Index())
diff --git a/config.go b/config.go
index c1ce03ac22b..fe6c85a171e 100644
--- a/config.go
+++ b/config.go
@@ -3,8 +3,9 @@ package raft
 import (
 	"fmt"
 	"io"
-	"log"
 	"time"
+
+	"github.com/hashicorp/go-hclog"
 )
 
 // These are the versions of the protocol (which includes RPC messages as
@@ -190,9 +191,13 @@ type Config struct {
 	// Defaults to os.Stderr.
 	LogOutput io.Writer
 
-	// Logger is a user-provided logger. If nil, a logger writing to LogOutput
-	// is used.
-	Logger *log.Logger
+	// LogLevel represents a log level. If no matching string is specified,
+	// hclog.NoLevel is assumed.
+	LogLevel string
+
+	// Logger is a user-provided hc-log logger. If nil, a logger writing to
+	// LogOutput with LogLevel is used.
+	Logger hclog.Logger
 }
 
 // DefaultConfig returns a Config with usable defaults.
@@ -208,6 +213,7 @@ func DefaultConfig() *Config {
 		SnapshotInterval:   120 * time.Second,
 		SnapshotThreshold:  8192,
 		LeaderLeaseTimeout: 500 * time.Millisecond,
+		LogLevel:           "DEBUG",
 	}
 }
diff --git a/integ_test.go b/integ_test.go
index 1464437749b..4324308ff96 100644
--- a/integ_test.go
+++ b/integ_test.go
@@ -4,10 +4,11 @@ import (
 	"bytes"
 	"fmt"
 	"io/ioutil"
-	"log"
 	"os"
 	"testing"
 	"time"
+
+	"github.com/hashicorp/go-hclog"
 )
 
 // CheckInteg will skip a test if integration testing is not enabled.
@@ -30,7 +31,7 @@ type RaftEnv struct {
 	snapshot *FileSnapshotStore
 	trans    *NetworkTransport
 	raft     *Raft
-	logger   *log.Logger
+	logger   hclog.Logger
 }
 
 // Release shuts down and cleans up any stored data, its not restartable after this
@@ -42,7 +43,7 @@ func (r *RaftEnv) Release() {
 // Shutdown shuts down raft & transport, but keeps track of its data, its restartable
 // after a Shutdown() by calling Start()
 func (r *RaftEnv) Shutdown() {
-	r.logger.Printf("[WARN] Shutdown node at %v", r.raft.localAddr)
+	r.logger.Warn(fmt.Sprintf("Shutdown node at %v", r.raft.localAddr))
 	f := r.raft.Shutdown()
 	if err := f.Error(); err != nil {
 		panic(err)
 	}
@@ -57,7 +58,7 @@ func (r *RaftEnv) Restart(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}
 	r.trans = trans
-	r.logger.Printf("[INFO] Starting node at %v", trans.LocalAddr())
+	r.logger.Info(fmt.Sprintf("Starting node at %v", trans.LocalAddr()))
 	raft, err := NewRaft(r.conf, r.fsm, r.store, r.store, r.snapshot, r.trans)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -89,13 +90,15 @@ func MakeRaft(t *testing.T, conf *Config, bootstrap bool) *RaftEnv {
 		store:    stable,
 		snapshot: snap,
 		fsm:      &MockFSM{},
-		logger:   log.New(&testLoggerAdapter{t: t}, "", log.Lmicroseconds),
 	}
 	trans, err := NewTCPTransport("127.0.0.1:0", nil, 2, time.Second, nil)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	env.logger = log.New(os.Stdout, string(trans.LocalAddr())+" :", log.Lmicroseconds)
+
+	env.logger = hclog.New(&hclog.LoggerOptions{
+		Name: string(trans.LocalAddr()) + " :",
+	})
 	env.trans = trans
 	if bootstrap {
@@ -110,7 +113,7 @@ func MakeRaft(t *testing.T, conf *Config, bootstrap bool) *RaftEnv {
 			t.Fatalf("err: %v", err)
 		}
 	}
-	log.Printf("[INFO] Starting node at %v", trans.LocalAddr())
+	env.logger.Info(fmt.Sprintf("Starting node at %v", trans.LocalAddr()))
 	conf.Logger = env.logger
 	raft, err := NewRaft(conf, env.fsm, stable, stable, snap, trans)
 	if err != nil {
@@ -237,7 +240,7 @@ func TestRaft_Integ(t *testing.T) {
 		}
 		for _, f := range futures {
 			NoErr(WaitFuture(f, t), t)
-			leader.logger.Printf("[DEBUG] Applied at %d, size %d", f.Index(), sz)
+			leader.logger.Debug(fmt.Sprintf("Applied at %d, size %d", f.Index(), sz))
 		}
 		totalApplied += n
 	}
diff --git a/raft.go b/raft.go
index a6e0d72c01c..67b2ece85d3 100644
--- a/raft.go
+++ b/raft.go
@@ -145,7 +145,7 @@ func (r *Raft) run() {
 // runFollower runs the FSM for a follower.
 func (r *Raft) runFollower() {
 	didWarn := false
-	r.logger.Printf("[INFO] raft: %v entering Follower state (Leader: %q)", r, r.Leader())
+	r.logger.Info(fmt.Sprintf("%v entering Follower state (Leader: %q)", r, r.Leader()))
 	metrics.IncrCounter([]string{"raft", "state", "follower"}, 1)
 	heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)
 	for {
@@ -192,17 +192,17 @@
 			if r.configurations.latestIndex == 0 {
 				if !didWarn {
-					r.logger.Printf("[WARN] raft: no known peers, aborting election")
+					r.logger.Warn("no known peers, aborting election")
 					didWarn = true
 				}
 			} else if r.configurations.latestIndex == r.configurations.committedIndex &&
 				!hasVote(r.configurations.latest, r.localID) {
 				if !didWarn {
-					r.logger.Printf("[WARN] raft: not part of stable configuration, aborting election")
+					r.logger.Warn("not part of stable configuration, aborting election")
 					didWarn = true
 				}
 			} else {
-				r.logger.Printf(`[WARN] raft: Heartbeat timeout from %q reached, starting election`, lastLeader)
+				r.logger.Warn(fmt.Sprintf("Heartbeat timeout from %q reached, starting election", lastLeader))
 				metrics.IncrCounter([]string{"raft", "transition", "heartbeat_timeout"}, 1)
 				r.setState(Candidate)
 				return
@@ -238,8 +238,7 @@ func (r *Raft) liveBootstrap(configuration Configuration) error {
 
 // runCandidate runs the FSM for a candidate.
 func (r *Raft) runCandidate() {
-	r.logger.Printf("[INFO] raft: %v entering Candidate state in term %v",
-		r, r.getCurrentTerm()+1)
+	r.logger.Info(fmt.Sprintf("%v entering Candidate state in term %v", r, r.getCurrentTerm()+1))
 	metrics.IncrCounter([]string{"raft", "state", "candidate"}, 1)
 
 	// Start vote for us, and set a timeout
@@ -249,7 +248,7 @@
 	// Tally the votes, need a simple majority
 	grantedVotes := 0
 	votesNeeded := r.quorumSize()
-	r.logger.Printf("[DEBUG] raft: Votes needed: %d", votesNeeded)
+	r.logger.Debug(fmt.Sprintf("Votes needed: %d", votesNeeded))
 
 	for r.getState() == Candidate {
 		select {
@@ -259,7 +258,7 @@
 		case vote := <-voteCh:
 			// Check if the term is greater than ours, bail
 			if vote.Term > r.getCurrentTerm() {
-				r.logger.Printf("[DEBUG] raft: Newer term discovered, fallback to follower")
+				r.logger.Debug("Newer term discovered, fallback to follower")
 				r.setState(Follower)
 				r.setCurrentTerm(vote.Term)
 				return
@@ -268,13 +267,13 @@
 			// Check if the vote is granted
 			if vote.Granted {
 				grantedVotes++
-				r.logger.Printf("[DEBUG] raft: Vote granted from %s in term %v. Tally: %d",
-					vote.voterID, vote.Term, grantedVotes)
+				r.logger.Debug(fmt.Sprintf("Vote granted from %s in term %v. Tally: %d",
+					vote.voterID, vote.Term, grantedVotes))
 			}
 
 			// Check if we've become the leader
 			if grantedVotes >= votesNeeded {
-				r.logger.Printf("[INFO] raft: Election won. Tally: %d", grantedVotes)
+				r.logger.Info(fmt.Sprintf("Election won. Tally: %d", grantedVotes))
 				r.setState(Leader)
 				r.setLeader(r.localAddr)
 				return
@@ -306,7 +305,7 @@
 		case <-electionTimer:
 			// Election failed! Restart the election. We simply return,
 			// which will kick us back into runCandidate
-			r.logger.Printf("[WARN] raft: Election timeout reached, restarting election")
+			r.logger.Warn("Election timeout reached, restarting election")
 			return
 
 		case <-r.shutdownCh:
@@ -318,7 +317,7 @@
 
 // runLeader runs the FSM for a leader. Do the setup here and drop into
 // the leaderLoop for the hot loop.
 func (r *Raft) runLeader() {
-	r.logger.Printf("[INFO] raft: %v entering Leader state", r)
+	r.logger.Info(fmt.Sprintf("%v entering Leader state", r))
 	metrics.IncrCounter([]string{"raft", "state", "leader"}, 1)
 
 	// Notify that we are the leader
@@ -435,7 +434,7 @@ func (r *Raft) startStopReplication() {
 		}
 		inConfig[server.ID] = true
 		if _, ok := r.leaderState.replState[server.ID]; !ok {
-			r.logger.Printf("[INFO] raft: Added peer %v, starting replication", server.ID)
+			r.logger.Info(fmt.Sprintf("Added peer %v, starting replication", server.ID))
 			s := &followerReplication{
 				peer:       server,
 				commitment: r.leaderState.commitment,
@@ -460,7 +459,7 @@
 			continue
 		}
 		// Replicate up to lastIdx and stop
-		r.logger.Printf("[INFO] raft: Removed peer %v, stopping replication after %v", serverID, lastIdx)
+		r.logger.Info(fmt.Sprintf("Removed peer %v, stopping replication after %v", serverID, lastIdx))
 		repl.stopCh <- lastIdx
 		close(repl.stopCh)
 		delete(r.leaderState.replState, serverID)
@@ -550,10 +549,10 @@ func (r *Raft) leaderLoop() {
 
 			if stepDown {
 				if r.conf.ShutdownOnRemove {
-					r.logger.Printf("[INFO] raft: Removed ourself, shutting down")
+					r.logger.Info("Removed ourself, shutting down")
 					r.Shutdown()
 				} else {
-					r.logger.Printf("[INFO] raft: Removed ourself, transitioning to follower")
+					r.logger.Info("Removed ourself, transitioning to follower")
 					r.setState(Follower)
 				}
 			}
@@ -565,7 +564,7 @@
 
 			} else if v.votes < v.quorumSize {
 				// Early return, means there must be a new leader
-				r.logger.Printf("[WARN] raft: New leader elected, stepping down")
+				r.logger.Warn("New leader elected, stepping down")
 				r.setState(Follower)
 				delete(r.leaderState.notify, v)
 				for _, repl := range r.leaderState.replState {
@@ -685,9 +684,9 @@
 		} else {
 			// Log at least once at high value, then debug. Otherwise it gets very verbose.
 			if diff <= 3*r.conf.LeaderLeaseTimeout {
-				r.logger.Printf("[WARN] raft: Failed to contact %v in %v", peer, diff)
+				r.logger.Warn(fmt.Sprintf("Failed to contact %v in %v", peer, diff))
 			} else {
-				r.logger.Printf("[DEBUG] raft: Failed to contact %v in %v", peer, diff)
+				r.logger.Debug(fmt.Sprintf("Failed to contact %v in %v", peer, diff))
 			}
 		}
 		metrics.AddSample([]string{"raft", "leader", "lastContact"}, float32(diff/time.Millisecond))
@@ -696,7 +695,7 @@
 	// Verify we can contact a quorum
 	quorum := r.quorumSize()
 	if contacted < quorum {
-		r.logger.Printf("[WARN] raft: Failed to contact quorum of nodes, stepping down")
+		r.logger.Warn("Failed to contact quorum of nodes, stepping down")
 		r.setState(Follower)
 		metrics.IncrCounter([]string{"raft", "transition", "leader_lease_timeout"}, 1)
 	}
@@ -784,7 +783,7 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
 	if err := sink.Close(); err != nil {
 		return fmt.Errorf("failed to close snapshot: %v", err)
 	}
-	r.logger.Printf("[INFO] raft: Copied %d bytes to local snapshot", n)
+	r.logger.Info(fmt.Sprintf("Copied %d bytes to local snapshot", n))
 
 	// Restore the snapshot into the FSM. If this fails we are in a
 	// bad state so we panic to take ourselves out.
@@ -808,7 +807,7 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
 	r.setLastApplied(lastIndex)
 	r.setLastSnapshot(lastIndex, term)
 
-	r.logger.Printf("[INFO] raft: Restored user snapshot (index %d)", lastIndex)
+	r.logger.Info(fmt.Sprintf("Restored user snapshot (index %d)", lastIndex))
 	return nil
 }
@@ -822,8 +821,8 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
 		return
 	}
 
-	r.logger.Printf("[INFO] raft: Updating configuration with %s (%v, %v) to %+v",
-		future.req.command, future.req.serverID, future.req.serverAddress, configuration.Servers)
+	r.logger.Info(fmt.Sprintf("Updating configuration with %s (%v, %v) to %+v",
+		future.req.command, future.req.serverID, future.req.serverAddress, configuration.Servers))
 
 	// In pre-ID compatibility mode we translate all configuration changes
 	// in to an old remove peer message, which can handle all supported
@@ -876,7 +875,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
 
 	// Write the log entry locally
 	if err := r.logs.StoreLogs(logs); err != nil {
-		r.logger.Printf("[ERR] raft: Failed to commit logs: %v", err)
+		r.logger.Error(fmt.Sprintf("Failed to commit logs: %v", err))
 		for _, applyLog := range applyLogs {
 			applyLog.respond(err)
 		}
@@ -905,7 +904,7 @@ func (r *Raft) processLogs(index uint64, future *logFuture) {
 	// Reject logs we've applied already
 	lastApplied := r.getLastApplied()
 	if index <= lastApplied {
-		r.logger.Printf("[WARN] raft: Skipping application of old log: %d", index)
+		r.logger.Warn(fmt.Sprintf("Skipping application of old log: %d", index))
 		return
 	}
@@ -917,7 +916,7 @@
 		} else {
 			l := new(Log)
 			if err := r.logs.GetLog(idx, l); err != nil {
-				r.logger.Printf("[ERR] raft: Failed to get log at %d: %v", idx, err)
+				r.logger.Error(fmt.Sprintf("Failed to get log at %d: %v", idx, err))
 				panic(err)
 			}
 			r.processLog(l, nil)
@@ -981,7 +980,7 @@ func (r *Raft) processRPC(rpc RPC) {
 	case *InstallSnapshotRequest:
 		r.installSnapshot(rpc, cmd)
 	default:
-		r.logger.Printf("[ERR] raft: Got unexpected command: %#v", rpc.Command)
+		r.logger.Error(fmt.Sprintf("Got unexpected command: %#v", rpc.Command))
 		rpc.Respond(nil, fmt.Errorf("unexpected command"))
 	}
 }
@@ -1004,7 +1003,7 @@ func (r *Raft) processHeartbeat(rpc RPC) {
 	case *AppendEntriesRequest:
 		r.appendEntries(rpc, cmd)
 	default:
-		r.logger.Printf("[ERR] raft: Expected heartbeat, got command: %#v", rpc.Command)
+		r.logger.Error(fmt.Sprintf("Expected heartbeat, got command: %#v", rpc.Command))
 		rpc.Respond(nil, fmt.Errorf("unexpected command"))
 	}
 }
@@ -1054,8 +1053,8 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
 	} else {
 		var prevLog Log
 		if err := r.logs.GetLog(a.PrevLogEntry, &prevLog); err != nil {
-			r.logger.Printf("[WARN] raft: Failed to get previous log: %d %v (last: %d)",
-				a.PrevLogEntry, err, lastIdx)
+			r.logger.Warn(fmt.Sprintf("Failed to get previous log: %d %v (last: %d)",
+				a.PrevLogEntry, err, lastIdx))
 			resp.NoRetryBackoff = true
 			return
 		}
@@ -1063,8 +1062,8 @@
 		prevLogTerm = prevLog.Term
 	}
 
 	if a.PrevLogTerm != prevLogTerm {
-		r.logger.Printf("[WARN] raft: Previous log term mis-match: ours: %d remote: %d",
-			prevLogTerm, a.PrevLogTerm)
+		r.logger.Warn(fmt.Sprintf("Previous log term mis-match: ours: %d remote: %d",
+			prevLogTerm, a.PrevLogTerm))
 		resp.NoRetryBackoff = true
 		return
 	}
@@ -1084,14 +1083,14 @@
 			}
 			var storeEntry Log
 			if err := r.logs.GetLog(entry.Index, &storeEntry); err != nil {
-				r.logger.Printf("[WARN] raft: Failed to get log entry %d: %v",
-					entry.Index, err)
+				r.logger.Warn(fmt.Sprintf("Failed to get log entry %d: %v",
+					entry.Index, err))
 				return
 			}
 			if entry.Term != storeEntry.Term {
-				r.logger.Printf("[WARN] raft: Clearing log suffix from %d to %d", entry.Index, lastLogIdx)
+				r.logger.Warn(fmt.Sprintf("Clearing log suffix from %d to %d", entry.Index, lastLogIdx))
 				if err := r.logs.DeleteRange(entry.Index, lastLogIdx); err != nil {
-					r.logger.Printf("[ERR] raft: Failed to clear log suffix: %v", err)
+					r.logger.Error(fmt.Sprintf("Failed to clear log suffix: %v", err))
 					return
 				}
 				if entry.Index <= r.configurations.latestIndex {
@@ -1106,7 +1105,7 @@
 	if n := len(newEntries); n > 0 {
 		// Append the new entries
 		if err := r.logs.StoreLogs(newEntries); err != nil {
-			r.logger.Printf("[ERR] raft: Failed to append to logs: %v", err)
+			r.logger.Error(fmt.Sprintf("Failed to append to logs: %v", err))
 			// TODO: leaving r.getLastLog() in the wrong
 			// state if there was a truncation above
 			return
@@ -1186,8 +1185,8 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
 	// Check if we have an existing leader [who's not the candidate]
 	candidate := r.trans.DecodePeer(req.Candidate)
 	if leader := r.Leader(); leader != "" && leader != candidate {
-		r.logger.Printf("[WARN] raft: Rejecting vote request from %v since we have a leader: %v",
-			candidate, leader)
+		r.logger.Warn(fmt.Sprintf("Rejecting vote request from %v since we have a leader: %v",
+			candidate, leader))
 		return
 	}
@@ -1207,20 +1206,20 @@
 	// Check if we have voted yet
 	lastVoteTerm, err := r.stable.GetUint64(keyLastVoteTerm)
 	if err != nil && err.Error() != "not found" {
-		r.logger.Printf("[ERR] raft: Failed to get last vote term: %v", err)
+		r.logger.Error(fmt.Sprintf("Failed to get last vote term: %v", err))
 		return
 	}
 	lastVoteCandBytes, err := r.stable.Get(keyLastVoteCand)
 	if err != nil && err.Error() != "not found" {
-		r.logger.Printf("[ERR] raft: Failed to get last vote candidate: %v", err)
+		r.logger.Error(fmt.Sprintf("Failed to get last vote candidate: %v", err))
 		return
 	}
 
 	// Check if we've voted in this election before
 	if lastVoteTerm == req.Term && lastVoteCandBytes != nil {
-		r.logger.Printf("[INFO] raft: Duplicate RequestVote for same term: %d", req.Term)
+		r.logger.Info(fmt.Sprintf("Duplicate RequestVote for same term: %d", req.Term))
 		if bytes.Compare(lastVoteCandBytes, req.Candidate) == 0 {
-			r.logger.Printf("[WARN] raft: Duplicate RequestVote from candidate: %s", req.Candidate)
+			r.logger.Warn(fmt.Sprintf("Duplicate RequestVote from candidate: %s", req.Candidate))
 			resp.Granted = true
 		}
 		return
@@ -1229,20 +1228,20 @@
 	// Reject if their term is older
 	lastIdx, lastTerm := r.getLastEntry()
 	if lastTerm > req.LastLogTerm {
-		r.logger.Printf("[WARN] raft: Rejecting vote request from %v since our last term is greater (%d, %d)",
-			candidate, lastTerm, req.LastLogTerm)
+		r.logger.Warn(fmt.Sprintf("Rejecting vote request from %v since our last term is greater (%d, %d)",
+			candidate, lastTerm, req.LastLogTerm))
 		return
 	}
 
 	if lastTerm == req.LastLogTerm && lastIdx > req.LastLogIndex {
-		r.logger.Printf("[WARN] raft: Rejecting vote request from %v since our last index is greater (%d, %d)",
-			candidate, lastIdx, req.LastLogIndex)
+		r.logger.Warn(fmt.Sprintf("Rejecting vote request from %v since our last index is greater (%d, %d)",
+			candidate, lastIdx, req.LastLogIndex))
 		return
 	}
 
 	// Persist a vote for safety
 	if err := r.persistVote(req.Term, req.Candidate); err != nil {
-		r.logger.Printf("[ERR] raft: Failed to persist vote: %v", err)
+		r.logger.Error(fmt.Sprintf("Failed to persist vote: %v", err))
 		return
 	}
@@ -1277,7 +1276,8 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
 
 	// Ignore an older term
 	if req.Term < r.getCurrentTerm() {
-		r.logger.Printf("[INFO] raft: Ignoring installSnapshot request with older term of %d vs currentTerm %d", req.Term, r.getCurrentTerm())
+		r.logger.Info(fmt.Sprintf("Ignoring installSnapshot request with older term of %d vs currentTerm %d",
+			req.Term, r.getCurrentTerm()))
 		return
 	}
@@ -1306,7 +1306,7 @@
 	sink, err := r.snapshots.Create(version, req.LastLogIndex, req.LastLogTerm,
 		reqConfiguration, reqConfigurationIndex, r.trans)
 	if err != nil {
-		r.logger.Printf("[ERR] raft: Failed to create snapshot to install: %v", err)
+		r.logger.Error(fmt.Sprintf("Failed to create snapshot to install: %v", err))
 		rpcErr = fmt.Errorf("failed to create snapshot: %v", err)
 		return
 	}
@@ -1315,7 +1315,7 @@
 	n, err := io.Copy(sink, rpc.Reader)
 	if err != nil {
 		sink.Cancel()
-		r.logger.Printf("[ERR] raft: Failed to copy snapshot: %v", err)
+		r.logger.Error(fmt.Sprintf("Failed to copy snapshot: %v", err))
 		rpcErr = err
 		return
 	}
@@ -1323,18 +1323,18 @@
 	// Check that we received it all
 	if n != req.Size {
 		sink.Cancel()
-		r.logger.Printf("[ERR] raft: Failed to receive whole snapshot: %d / %d", n, req.Size)
+		r.logger.Error(fmt.Sprintf("Failed to receive whole snapshot: %d / %d", n, req.Size))
 		rpcErr = fmt.Errorf("short read")
 		return
 	}
 
 	// Finalize the snapshot
 	if err := sink.Close(); err != nil {
-		r.logger.Printf("[ERR] raft: Failed to finalize snapshot: %v", err)
+		r.logger.Error(fmt.Sprintf("Failed to finalize snapshot: %v", err))
 		rpcErr = err
 		return
 	}
-	r.logger.Printf("[INFO] raft: Copied %d bytes to local snapshot", n)
+	r.logger.Info(fmt.Sprintf("Copied %d bytes to local snapshot", n))
 
 	// Restore snapshot
 	future := &restoreFuture{ID: sink.ID()}
@@ -1348,7 +1348,7 @@
 
 	// Wait for the restore to happen
 	if err := future.Error(); err != nil {
-		r.logger.Printf("[ERR] raft: Failed to restore snapshot: %v", err)
+		r.logger.Error(fmt.Sprintf("Failed to restore snapshot: %v", err))
 		rpcErr = err
 		return
 	}
@@ -1367,10 +1367,10 @@
 
 	// Compact logs, continue even if this fails
 	if err := r.compactLogs(req.LastLogIndex); err != nil {
-		r.logger.Printf("[ERR] raft: Failed to compact logs: %v", err)
+		r.logger.Error(fmt.Sprintf("Failed to compact logs: %v", err))
 	}
 
-	r.logger.Printf("[INFO] raft: Installed remote snapshot")
+	r.logger.Info("Installed remote snapshot")
 	resp.Success = true
 	r.setLastContact()
 	return
@@ -1416,7 +1416,7 @@ func (r *Raft) electSelf() <-chan *voteResult {
 			resp := &voteResult{voterID: peer.ID}
 			err := r.trans.RequestVote(peer.ID, peer.Address, req, &resp.RequestVoteResponse)
 			if err != nil {
-				r.logger.Printf("[ERR] raft: Failed to make RequestVote RPC to %v: %v", peer, err)
+				r.logger.Error(fmt.Sprintf("Failed to make RequestVote RPC to %v: %v", peer, err))
 				resp.Term = req.Term
 				resp.Granted = false
 			}
@@ -1430,7 +1430,7 @@
 		if server.ID == r.localID {
 			// Persist a vote for ourselves
 			if err := r.persistVote(req.Term, req.Candidate); err != nil {
-				r.logger.Printf("[ERR] raft: Failed to persist vote : %v", err)
+				r.logger.Error(fmt.Sprintf("Failed to persist vote : %v", err))
 				return nil
 			}
 			// Include our own vote
diff --git a/raft_test.go b/raft_test.go
index 202d6268f0f..f2c73e3ce11 100644
--- a/raft_test.go
+++ b/raft_test.go
@@ -15,6 +15,7 @@
 	"testing"
 	"time"
 
+	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/go-msgpack/codec"
 )
@@ -75,7 +76,7 @@ func inmemConfig(t *testing.T) *Config {
 	conf.ElectionTimeout = 50 * time.Millisecond
 	conf.LeaderLeaseTimeout = 50 * time.Millisecond
 	conf.CommitTimeout = 5 * time.Millisecond
-	conf.Logger = newTestLogger(t)
+	conf.Logger = newTestLeveledLogger(t)
 	return conf
 }
@@ -112,6 +113,20 @@ func newTestLoggerWithPrefix(t *testing.T, prefix string) *log.Logger {
 	return log.New(&testLoggerAdapter{t: t, prefix: prefix}, "", log.Lmicroseconds)
 }
 
+func newTestLeveledLogger(t *testing.T) hclog.Logger {
+	return hclog.New(&hclog.LoggerOptions{
+		Name:   "",
+		Output: &testLoggerAdapter{t: t},
+	})
+}
+
+func newTestLeveledLoggerWithPrefix(t *testing.T, prefix string) hclog.Logger {
+	return hclog.New(&hclog.LoggerOptions{
+		Name:   prefix,
+		Output: &testLoggerAdapter{t: t, prefix: prefix},
+	})
+}
+
 type cluster struct {
 	dirs   []string
 	stores []*InmemStore
@@ -628,7 +643,7 @@ func makeCluster(n int, bootstrap bool, t *testing.T, conf *Config) *cluster {
 
 		peerConf := conf
 		peerConf.LocalID = configuration.Servers[i].ID
-		peerConf.Logger = newTestLoggerWithPrefix(t, string(configuration.Servers[i].ID))
+		peerConf.Logger = newTestLeveledLoggerWithPrefix(t, string(configuration.Servers[i].ID))
 
 		if bootstrap {
 			err := BootstrapCluster(peerConf, logs, store, snap, trans, configuration)
diff --git a/replication.go b/replication.go
index 574d9ed6965..1f5f1007f5a 100644
--- a/replication.go
+++ b/replication.go
@@ -138,9 +138,9 @@ RPC:
 			lastLogIdx, _ := r.getLastLog()
 			shouldStop = r.replicateTo(s, lastLogIdx)
 		// This is _not_ our heartbeat mechanism but is to ensure
-		// followers quickly learn the leader's commit index when 
-		// raft commits stop flowing naturally. The actual heartbeats 
-		// can't do this to keep them unblocked by disk IO on the 
+		// followers quickly learn the leader's commit index when
+		// raft commits stop flowing naturally. The actual heartbeats
+		// can't do this to keep them unblocked by disk IO on the
 		// follower. See https://github.com/hashicorp/raft/issues/282.
 		case <-randomTimeout(r.conf.CommitTimeout):
 			lastLogIdx, _ := r.getLastLog()
@@ -163,7 +163,7 @@ PIPELINE:
 	// to standard mode on failure.
 	if err := r.pipelineReplicate(s); err != nil {
 		if err != ErrPipelineReplicationNotSupported {
-			r.logger.Printf("[ERR] raft: Failed to start pipeline replication to %s: %s", s.peer, err)
+			r.logger.Error(fmt.Sprintf("Failed to start pipeline replication to %s: %s", s.peer, err))
 		}
 	}
 	goto RPC
@@ -196,7 +196,7 @@ START:
 	// Make the RPC call
 	start = time.Now()
 	if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
-		r.logger.Printf("[ERR] raft: Failed to AppendEntries to %v: %v", s.peer, err)
+		r.logger.Error(fmt.Sprintf("Failed to AppendEntries to %v: %v", s.peer, err))
 		s.failures++
 		return
 	}
@@ -226,7 +226,7 @@
 		} else {
 			s.failures++
 		}
-		r.logger.Printf("[WARN] raft: AppendEntries to %v rejected, sending older logs (next: %d)", s.peer, s.nextIndex)
+		r.logger.Warn(fmt.Sprintf("AppendEntries to %v rejected, sending older logs (next: %d)", s.peer, s.nextIndex))
 	}
 
 CHECK_MORE:
@@ -253,7 +253,7 @@ SEND_SNAP:
 	if stop, err := r.sendLatestSnapshot(s); stop {
 		return true
 	} else if err != nil {
-		r.logger.Printf("[ERR] raft: Failed to send snapshot to %v: %v", s.peer, err)
+		r.logger.Error(fmt.Sprintf("Failed to send snapshot to %v: %v", s.peer, err))
 		return
 	}
@@ -267,7 +267,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
 	// Get the snapshots
 	snapshots, err := r.snapshots.List()
 	if err != nil {
-		r.logger.Printf("[ERR] raft: Failed to list snapshots: %v", err)
+		r.logger.Error(fmt.Sprintf("Failed to list snapshots: %v", err))
 		return false, err
 	}
@@ -280,7 +280,7 @@
 	snapID := snapshots[0].ID
 	meta, snapshot, err := r.snapshots.Open(snapID)
 	if err != nil {
-		r.logger.Printf("[ERR] raft: Failed to open snapshot %v: %v", snapID, err)
+		r.logger.Error(fmt.Sprintf("Failed to open snapshot %v: %v", snapID, err))
 		return false, err
 	}
 	defer snapshot.Close()
@@ -303,7 +303,7 @@
 	start := time.Now()
 	var resp InstallSnapshotResponse
 	if err := r.trans.InstallSnapshot(s.peer.ID, s.peer.Address, &req, &resp, snapshot); err != nil {
-		r.logger.Printf("[ERR] raft: Failed to install snapshot %v: %v", snapID, err)
+		r.logger.Error(fmt.Sprintf("Failed to install snapshot %v: %v", snapID, err))
 		s.failures++
 		return false, err
 	}
@@ -331,7 +331,7 @@
 		s.notifyAll(true)
 	} else {
 		s.failures++
-		r.logger.Printf("[WARN] raft: InstallSnapshot to %v rejected", s.peer)
+		r.logger.Warn(fmt.Sprintf("InstallSnapshot to %v rejected", s.peer))
 	}
 	return false, nil
 }
@@ -358,7 +358,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
 
 	start := time.Now()
 	if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
-		r.logger.Printf("[ERR] raft: Failed to heartbeat to %v: %v", s.peer.Address, err)
+		r.logger.Error(fmt.Sprintf("Failed to heartbeat to %v: %v", s.peer.Address, err))
 		failures++
 		select {
 		case <-time.After(backoff(failureWait, failures, maxFailureScale)):
@@ -386,8 +386,8 @@ func (r *Raft) pipelineReplicate(s *followerReplication) error {
 	defer pipeline.Close()
 
 	// Log start and stop of pipeline
-	r.logger.Printf("[INFO] raft: pipelining replication to peer %v", s.peer)
-	defer r.logger.Printf("[INFO] raft: aborting pipeline replication to peer %v", s.peer)
+	r.logger.Info(fmt.Sprintf("pipelining replication to peer %v", s.peer))
+	defer r.logger.Info(fmt.Sprintf("aborting pipeline replication to peer %v", s.peer))
 
 	// Create a shutdown and finish channel
 	stopCh := make(chan struct{})
@@ -440,7 +440,7 @@ func (r *Raft) pipelineSend(s *followerReplication, p AppendPipeline, nextIdx *u
 
 	// Pipeline the append entries
 	if _, err := p.AppendEntries(req, new(AppendEntriesResponse)); err != nil {
-		r.logger.Printf("[ERR] raft: Failed to pipeline AppendEntries to %v: %v", s.peer, err)
+		r.logger.Error(fmt.Sprintf("Failed to pipeline AppendEntries to %v: %v", s.peer, err))
 		return true
 	}
@@ -516,8 +516,7 @@
 	} else {
 		var l Log
 		if err := r.logs.GetLog(nextIndex-1, &l); err != nil {
-			r.logger.Printf("[ERR] raft: Failed to get log at index %d: %v",
-				nextIndex-1, err)
+			r.logger.Error(fmt.Sprintf("Failed to get log at index %d: %v", nextIndex-1, err))
 			return err
 		}
@@ -536,7 +535,7 @@ func (r *Raft) setNewLogs(req *AppendEntriesRequest, nextIndex, lastIndex uint64
 	for i := nextIndex; i <= maxIndex; i++ {
 		oldLog := new(Log)
 		if err := r.logs.GetLog(i, oldLog); err != nil {
-			r.logger.Printf("[ERR] raft: Failed to get log at index %d: %v", i, err)
+			r.logger.Error(fmt.Sprintf("Failed to get log at index %d: %v", i, err))
 			return err
 		}
 		req.Entries = append(req.Entries, oldLog)
@@ -552,7 +551,7 @@ func appendStats(peer string, start time.Time, logs float32) {
 
 // handleStaleTerm is used when a follower indicates that we have a stale term.
 func (r *Raft) handleStaleTerm(s *followerReplication) {
-	r.logger.Printf("[ERR] raft: peer %v has newer term, stopping replication", s.peer)
+	r.logger.Error(fmt.Sprintf("peer %v has newer term, stopping replication", s.peer))
 	s.notifyAll(false) // No longer leader
 	asyncNotifyCh(s.stepDown)
 }
diff --git a/snapshot.go b/snapshot.go
index 5287ebc4183..2e0f77a5dd7 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -77,14 +77,14 @@ func (r *Raft) runSnapshots() {
 
 			// Trigger a snapshot
 			if _, err := r.takeSnapshot(); err != nil {
-				r.logger.Printf("[ERR] raft: Failed to take snapshot: %v", err)
+				r.logger.Error(fmt.Sprintf("Failed to take snapshot: %v", err))
 			}
 
 		case future := <-r.userSnapshotCh:
 			// User-triggered, run immediately
 			id, err := r.takeSnapshot()
 			if err != nil {
-				r.logger.Printf("[ERR] raft: Failed to take snapshot: %v", err)
+				r.logger.Error(fmt.Sprintf("Failed to take snapshot: %v", err))
 			} else {
 				future.opener = func() (*SnapshotMeta, io.ReadCloser, error) {
 					return r.snapshots.Open(id)
@@ -107,7 +107,7 @@ func (r *Raft) shouldSnapshot() bool {
 	// Check the last log index
 	lastIdx, err := r.logs.LastIndex()
 	if err != nil {
-		r.logger.Printf("[ERR] raft: Failed to get last log index: %v", err)
+		r.logger.Error(fmt.Sprintf("Failed to get last log index: %v", err))
 		return false
 	}
@@ -172,7 +172,7 @@ func (r *Raft) takeSnapshot() (string, error) {
 	}
 
 	// Create a new snapshot.
-	r.logger.Printf("[INFO] raft: Starting snapshot up to %d", snapReq.index)
+	r.logger.Info(fmt.Sprintf("Starting snapshot up to %d", snapReq.index))
 	start := time.Now()
 	version := getSnapshotVersion(r.protocolVersion)
 	sink, err := r.snapshots.Create(version, snapReq.index, snapReq.term, committed, committedIndex, r.trans)
@@ -202,7 +202,7 @@
 		return "", err
 	}
 
-	r.logger.Printf("[INFO] raft: Snapshot to %d complete", snapReq.index)
+	r.logger.Info(fmt.Sprintf("Snapshot to %d complete", snapReq.index))
 	return sink.ID(), nil
 }
@@ -229,7 +229,7 @@ func (r *Raft) compactLogs(snapIdx uint64) error {
 	maxLog := min(snapIdx, lastLogIdx-r.conf.TrailingLogs)
 
 	// Log this
-	r.logger.Printf("[INFO] raft: Compacting logs from %d to %d", minLog, maxLog)
+	r.logger.Info(fmt.Sprintf("Compacting logs from %d to %d", minLog, maxLog))
 
 	// Compact the logs
 	if err := r.logs.DeleteRange(minLog, maxLog); err != nil {