From e59f65d60b9e3d48f311560aa7e99523647c53f7 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 30 Apr 2021 17:16:56 -0400 Subject: [PATCH] Handle installSnapshot decodePeers error without a panic decodePeers can be called by the installSnapshot RPC handler, which means that a panic could be caused by an external actor by sending malformed input. Instead handle the error and report it back via the RPC response. --- api.go | 9 +++++++-- configuration.go | 10 ++++------ configuration_test.go | 5 ++++- raft.go | 32 ++++++++++++++++++++++++-------- raft_test.go | 22 ++++++++++++++++++++++ 5 files changed, 61 insertions(+), 17 deletions(-) diff --git a/api.go b/api.go index 8e49e87f55c..9152cf6201a 100644 --- a/api.go +++ b/api.go @@ -564,7 +564,9 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna r.logger.Error("failed to get log", "index", index, "error", err) panic(err) } - r.processConfigurationLogEntry(&entry) + if err := r.processConfigurationLogEntry(&entry); err != nil { + return nil, err + } } r.logger.Info("initial configuration", "index", r.configurations.latestIndex, @@ -627,7 +629,10 @@ func (r *Raft) restoreSnapshot() error { conf = snapshot.Configuration index = snapshot.ConfigurationIndex } else { - conf = decodePeers(snapshot.Peers, r.trans) + var err error + if conf, err = decodePeers(snapshot.Peers, r.trans); err != nil { + return err + } index = snapshot.Index } r.setCommittedConfiguration(conf, index) diff --git a/configuration.go b/configuration.go index 5bd106d0cb3..5c663605859 100644 --- a/configuration.go +++ b/configuration.go @@ -319,11 +319,11 @@ func encodePeers(configuration Configuration, trans Transport) []byte { // decodePeers is used to deserialize an old list of peers into a Configuration. // This is here for backwards compatibility with old log entries and snapshots; // it should be removed eventually. -func decodePeers(buf []byte, trans Transport) Configuration { +func decodePeers(buf []byte, trans Transport) (Configuration, error) { // Decode the buffer first. var encPeers [][]byte if err := decodeMsgPack(buf, &encPeers); err != nil { - panic(fmt.Errorf("failed to decode peers: %v", err)) + return Configuration{}, fmt.Errorf("failed to decode peers: %v", err) } // Deserialize each peer. @@ -333,13 +333,11 @@ func decodePeers(buf []byte, trans Transport) Configuration { servers = append(servers, Server{ Suffrage: Voter, ID: ServerID(p), - Address: ServerAddress(p), + Address: p, }) } - return Configuration{ - Servers: servers, - } + return Configuration{Servers: servers}, nil } // EncodeConfiguration serializes a Configuration using MsgPack, or panics on diff --git a/configuration_test.go b/configuration_test.go index 32c17e3519d..edbe545b1f6 100644 --- a/configuration_test.go +++ b/configuration_test.go @@ -5,6 +5,8 @@ import ( "reflect" "strings" "testing" + + "github.com/stretchr/testify/require" ) var sampleConfiguration = Configuration{ @@ -300,7 +302,8 @@ func TestConfiguration_encodeDecodePeers(t *testing.T) { buf := encodePeers(configuration, trans) // Decode from old format, as if reading an old log entry. - decoded := decodePeers(buf, trans) + decoded, err := decodePeers(buf, trans) + require.NoError(t, err) if !reflect.DeepEqual(configuration, decoded) { t.Fatalf("mismatch %v %v", configuration, decoded) } diff --git a/raft.go b/raft.go index 1d3e8e71d31..9d6b6cac45b 100644 --- a/raft.go +++ b/raft.go @@ -244,8 +244,7 @@ func (r *Raft) liveBootstrap(configuration Configuration) error { } r.setCurrentTerm(1) r.setLastLog(entry.Index, entry.Term) - r.processConfigurationLogEntry(&entry) - return nil + return r.processConfigurationLogEntry(&entry) } // runCandidate runs the FSM for a candidate. @@ -1383,7 +1382,13 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { // Handle any new configuration changes for _, newEntry := range newEntries { - r.processConfigurationLogEntry(newEntry) + if err := r.processConfigurationLogEntry(newEntry); err != nil { + r.logger.Warn("failed to append entry", + "index", newEntry.Index, + "error", err) + rpcErr = err + return + } } // Update the lastLog @@ -1415,14 +1420,21 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { // processConfigurationLogEntry takes a log entry and updates the latest // configuration if the entry results in a new configuration. This must only be // called from the main thread, or from NewRaft() before any threads have begun. -func (r *Raft) processConfigurationLogEntry(entry *Log) { - if entry.Type == LogConfiguration { +func (r *Raft) processConfigurationLogEntry(entry *Log) error { + switch entry.Type { + case LogConfiguration: r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex) r.setLatestConfiguration(DecodeConfiguration(entry.Data), entry.Index) - } else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated { + + case LogAddPeerDeprecated, LogRemovePeerDeprecated: r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex) - r.setLatestConfiguration(decodePeers(entry.Data, r.trans), entry.Index) + conf, err := decodePeers(entry.Data, r.trans) + if err != nil { + return err + } + r.setLatestConfiguration(conf, entry.Index) } + return nil } // requestVote is invoked when we get an request vote RPC call. @@ -1574,7 +1586,11 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { reqConfiguration = DecodeConfiguration(req.Configuration) reqConfigurationIndex = req.ConfigurationIndex } else { - reqConfiguration = decodePeers(req.Peers, r.trans) + reqConfiguration, rpcErr = decodePeers(req.Peers, r.trans) + if rpcErr != nil { + r.logger.Error("failed to install snapshot", "error", rpcErr) + return + } reqConfigurationIndex = req.LastLogIndex } version := getSnapshotVersion(r.protocolVersion) diff --git a/raft_test.go b/raft_test.go index 30eb3b6989f..03837b31239 100644 --- a/raft_test.go +++ b/raft_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/require" ) @@ -2316,3 +2317,24 @@ func TestRaft_ReloadConfigValidates(t *testing.T) { // // Storage errors handled properly. // Commit index updated properly. + +func TestRaft_InstallSnapshot_InvalidPeers(t *testing.T) { + _, transport := NewInmemTransport("") + r := &Raft{ + trans: transport, + logger: hclog.New(nil), + } + + req := &InstallSnapshotRequest{ + Peers: []byte("invalid msgpack"), + } + chResp := make(chan RPCResponse, 1) + rpc := RPC{ + Reader: new(bytes.Buffer), + RespChan: chResp, + } + r.installSnapshot(rpc, req) + resp := <-chResp + require.Error(t, resp.Error) + require.Contains(t, resp.Error.Error(), "failed to decode peers") +}