Skip to content

Commit

Permalink
Merge pull request hashicorp#465 from hashicorp/dnephin/fix-panic-in-…
Browse files Browse the repository at this point in the history
…decodePeers

Handle installSnapshot decodePeers error without a panic
  • Loading branch information
dnephin authored May 4, 2021
2 parents 06cc4ed + e59f65d commit cfb599d
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 17 deletions.
9 changes: 7 additions & 2 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,9 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
r.logger.Error("failed to get log", "index", index, "error", err)
panic(err)
}
r.processConfigurationLogEntry(&entry)
if err := r.processConfigurationLogEntry(&entry); err != nil {
return nil, err
}
}
r.logger.Info("initial configuration",
"index", r.configurations.latestIndex,
Expand Down Expand Up @@ -627,7 +629,10 @@ func (r *Raft) restoreSnapshot() error {
conf = snapshot.Configuration
index = snapshot.ConfigurationIndex
} else {
conf = decodePeers(snapshot.Peers, r.trans)
var err error
if conf, err = decodePeers(snapshot.Peers, r.trans); err != nil {
return err
}
index = snapshot.Index
}
r.setCommittedConfiguration(conf, index)
Expand Down
10 changes: 4 additions & 6 deletions configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,11 @@ func encodePeers(configuration Configuration, trans Transport) []byte {
// decodePeers is used to deserialize an old list of peers into a Configuration.
// This is here for backwards compatibility with old log entries and snapshots;
// it should be removed eventually.
func decodePeers(buf []byte, trans Transport) Configuration {
func decodePeers(buf []byte, trans Transport) (Configuration, error) {
// Decode the buffer first.
var encPeers [][]byte
if err := decodeMsgPack(buf, &encPeers); err != nil {
panic(fmt.Errorf("failed to decode peers: %v", err))
return Configuration{}, fmt.Errorf("failed to decode peers: %v", err)
}

// Deserialize each peer.
Expand All @@ -333,13 +333,11 @@ func decodePeers(buf []byte, trans Transport) Configuration {
servers = append(servers, Server{
Suffrage: Voter,
ID: ServerID(p),
Address: ServerAddress(p),
Address: p,
})
}

return Configuration{
Servers: servers,
}
return Configuration{Servers: servers}, nil
}

// EncodeConfiguration serializes a Configuration using MsgPack, or panics on
Expand Down
5 changes: 4 additions & 1 deletion configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"reflect"
"strings"
"testing"

"github.com/stretchr/testify/require"
)

var sampleConfiguration = Configuration{
Expand Down Expand Up @@ -300,7 +302,8 @@ func TestConfiguration_encodeDecodePeers(t *testing.T) {
buf := encodePeers(configuration, trans)

// Decode from old format, as if reading an old log entry.
decoded := decodePeers(buf, trans)
decoded, err := decodePeers(buf, trans)
require.NoError(t, err)
if !reflect.DeepEqual(configuration, decoded) {
t.Fatalf("mismatch %v %v", configuration, decoded)
}
Expand Down
32 changes: 24 additions & 8 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,7 @@ func (r *Raft) liveBootstrap(configuration Configuration) error {
}
r.setCurrentTerm(1)
r.setLastLog(entry.Index, entry.Term)
r.processConfigurationLogEntry(&entry)
return nil
return r.processConfigurationLogEntry(&entry)
}

// runCandidate runs the FSM for a candidate.
Expand Down Expand Up @@ -1383,7 +1382,13 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {

// Handle any new configuration changes
for _, newEntry := range newEntries {
r.processConfigurationLogEntry(newEntry)
if err := r.processConfigurationLogEntry(newEntry); err != nil {
r.logger.Warn("failed to append entry",
"index", newEntry.Index,
"error", err)
rpcErr = err
return
}
}

// Update the lastLog
Expand Down Expand Up @@ -1415,14 +1420,21 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
// processConfigurationLogEntry takes a log entry and updates the latest
// configuration if the entry results in a new configuration. This must only be
// called from the main thread, or from NewRaft() before any threads have begun.
func (r *Raft) processConfigurationLogEntry(entry *Log) {
if entry.Type == LogConfiguration {
func (r *Raft) processConfigurationLogEntry(entry *Log) error {
switch entry.Type {
case LogConfiguration:
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
r.setLatestConfiguration(DecodeConfiguration(entry.Data), entry.Index)
} else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated {

case LogAddPeerDeprecated, LogRemovePeerDeprecated:
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
r.setLatestConfiguration(decodePeers(entry.Data, r.trans), entry.Index)
conf, err := decodePeers(entry.Data, r.trans)
if err != nil {
return err
}
r.setLatestConfiguration(conf, entry.Index)
}
return nil
}

// requestVote is invoked when we get an request vote RPC call.
Expand Down Expand Up @@ -1574,7 +1586,11 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
reqConfiguration = DecodeConfiguration(req.Configuration)
reqConfigurationIndex = req.ConfigurationIndex
} else {
reqConfiguration = decodePeers(req.Peers, r.trans)
reqConfiguration, rpcErr = decodePeers(req.Peers, r.trans)
if rpcErr != nil {
r.logger.Error("failed to install snapshot", "error", rpcErr)
return
}
reqConfigurationIndex = req.LastLogIndex
}
version := getSnapshotVersion(r.protocolVersion)
Expand Down
22 changes: 22 additions & 0 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"testing"
"time"

"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -2316,3 +2317,24 @@ func TestRaft_ReloadConfigValidates(t *testing.T) {
//
// Storage errors handled properly.
// Commit index updated properly.

func TestRaft_InstallSnapshot_InvalidPeers(t *testing.T) {
_, transport := NewInmemTransport("")
r := &Raft{
trans: transport,
logger: hclog.New(nil),
}

req := &InstallSnapshotRequest{
Peers: []byte("invalid msgpack"),
}
chResp := make(chan RPCResponse, 1)
rpc := RPC{
Reader: new(bytes.Buffer),
RespChan: chResp,
}
r.installSnapshot(rpc, req)
resp := <-chResp
require.Error(t, resp.Error)
require.Contains(t, resp.Error.Error(), "failed to decode peers")
}

0 comments on commit cfb599d

Please sign in to comment.