Merge pull request #14828 from ahrtr/identify_corrupted_member_20221123
Identify corrupted member depending on quorum
ahrtr authored Nov 28, 2022
2 parents 6a156bd + d545d60 commit cf171fd
Showing 4 changed files with 306 additions and 81 deletions.
19 changes: 18 additions & 1 deletion client/pkg/types/id.go
@@ -14,7 +14,10 @@
 
 package types
 
-import "strconv"
+import (
+	"bytes"
+	"strconv"
+)
 
 // ID represents a generic identifier which is canonically
 // stored as a uint64 but is typically represented as a
@@ -37,3 +40,17 @@ type IDSlice []ID
 func (p IDSlice) Len() int           { return len(p) }
 func (p IDSlice) Less(i, j int) bool { return uint64(p[i]) < uint64(p[j]) }
 func (p IDSlice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
+
+func (p IDSlice) String() string {
+	var b bytes.Buffer
+	if p.Len() > 0 {
+		b.WriteString(p[0].String())
+	}
+
+	for i := 1; i < p.Len(); i++ {
+		b.WriteString(",")
+		b.WriteString(p[i].String())
+	}
+
+	return b.String()
+}
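For reference, the new IDSlice.String method joins the hexadecimal form of each ID with commas. A minimal usage sketch follows (not part of the commit; it assumes the go.etcd.io/etcd/client/pkg/v3 module path and uses made-up IDs):

package main

import (
	"fmt"

	"go.etcd.io/etcd/client/pkg/v3/types"
)

func main() {
	// Made-up member IDs; ID.String() renders each one as lowercase hex.
	ids := types.IDSlice{0x2a, 0x2b, 0x2c}
	fmt.Println(ids.String()) // prints: 2a,2b,2c
}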
168 changes: 129 additions & 39 deletions server/etcdserver/corrupt.go
@@ -251,64 +251,154 @@ func (cm *corruptionChecker) PeriodicCheck() error {
 	return nil
 }
 
+// CompactHashCheck is based on the fact that 'compactions' are coordinated
+// between raft members and performed at the same revision. For each compacted
+// revision there is KV store hash computed and saved for some time.
+//
+// This method communicates with peers to find a recent common revision across
+// members, and raises alarm if 2 or more members at the same compact revision
+// have different hashes.
+//
+// We might miss opportunity to perform the check if the compaction is still
+// ongoing on one of the members or it was unresponsive. In such situation the
+// method still passes without raising alarm.
 func (cm *corruptionChecker) CompactHashCheck() {
 	cm.lg.Info("starting compact hash check",
 		zap.String("local-member-id", cm.hasher.MemberId().String()),
 		zap.Duration("timeout", cm.hasher.ReqTimeout()),
 	)
 	hashes := cm.uncheckedRevisions()
 	// Assume that revisions are ordered from largest to smallest
-	for i, hash := range hashes {
+	for _, hash := range hashes {
 		peers := cm.hasher.PeerHashByRev(hash.Revision)
 		if len(peers) == 0 {
 			continue
 		}
-		peersChecked := 0
-		for _, p := range peers {
-			if p.resp == nil || p.resp.CompactRevision != hash.CompactRevision {
-				continue
-			}
-			// follower's compact revision is leader's old one, then hashes must match
-			if p.resp.Hash != hash.Hash {
-				cm.hasher.TriggerCorruptAlarm(p.id)
-				cm.lg.Error("failed compaction hash check",
-					zap.Int64("revision", hash.Revision),
-					zap.Int64("leader-compact-revision", hash.CompactRevision),
-					zap.Uint32("leader-hash", hash.Hash),
-					zap.Int64("follower-compact-revision", p.resp.CompactRevision),
-					zap.Uint32("follower-hash", p.resp.Hash),
-					zap.String("follower-peer-id", p.id.String()),
-				)
-				return
-			}
-			peersChecked++
-			cm.lg.Info("successfully checked hash on follower",
-				zap.Int64("revision", hash.Revision),
-				zap.String("peer-id", p.id.String()),
-			)
-		}
-		if len(peers) == peersChecked {
-			cm.lg.Info("successfully checked hash on whole cluster",
-				zap.Int("number-of-peers-checked", peersChecked),
-				zap.Int64("revision", hash.Revision),
-			)
-			cm.mux.Lock()
-			if hash.Revision > cm.latestRevisionChecked {
-				cm.latestRevisionChecked = hash.Revision
-			}
-			cm.mux.Unlock()
-			cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1))
-			return
-		}
-		cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers",
+		if cm.checkPeerHashes(hash, peers) {
+			return
+		}
 	}
 	cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", len(hashes)))
 	return
 }
 
+// check peers hash and raise alarms if detected corruption.
+// return a bool indicate whether to check next hash.
+//
+// true: successfully checked hash on whole cluster or raised alarms, so no need to check next hash
+// false: skipped some members, so need to check next hash
+func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers []*peerHashKVResp) bool {
+	leaderId := cm.hasher.MemberId()
+	hash2members := map[uint32]types.IDSlice{leaderHash.Hash: {leaderId}}
+
+	peersChecked := 0
+	// group all peers by hash
+	for _, peer := range peers {
+		skipped := false
+		reason := ""
+
+		if peer.resp == nil {
+			skipped = true
+			reason = "no response"
+		} else if peer.resp.CompactRevision != leaderHash.CompactRevision {
+			skipped = true
+			reason = fmt.Sprintf("the peer's CompactRevision %d doesn't match leader's CompactRevision %d",
+				peer.resp.CompactRevision, leaderHash.CompactRevision)
+		}
+
+		if skipped {
+			cm.lg.Warn("Skipped peer's hash", zap.Int("number-of-peers", len(peers)),
+				zap.String("leader-id", leaderId.String()),
+				zap.String("peer-id", peer.id.String()),
+				zap.String("reason", reason))
+			continue
+		}
+
+		peersChecked++
+		if ids, ok := hash2members[peer.resp.Hash]; !ok {
+			hash2members[peer.resp.Hash] = []types.ID{peer.id}
+		} else {
+			ids = append(ids, peer.id)
+			hash2members[peer.resp.Hash] = ids
+		}
+	}
+
+	// All members have the same CompactRevision and Hash.
+	if len(hash2members) == 1 {
+		return cm.handleConsistentHash(leaderHash, peersChecked, len(peers))
+	}
+
+	// Detected hashes mismatch
+	// The first step is to figure out the majority with the same hash.
+	memberCnt := len(peers) + 1
+	quorum := memberCnt/2 + 1
+	quorumExist := false
+	for k, v := range hash2members {
+		if len(v) >= quorum {
+			quorumExist = true
+			// remove the majority, and we might raise alarms for the left members.
+			delete(hash2members, k)
+			break
+		}
+	}
+
+	if !quorumExist {
+		// If quorum doesn't exist, we don't know which members data are
+		// corrupted. In such situation, we intentionally set the memberID
+		// as 0, it means it affects the whole cluster.
+		cm.lg.Error("Detected compaction hash mismatch but cannot identify the corrupted members, so intentionally set the memberID as 0",
+			zap.String("leader-id", leaderId.String()),
+			zap.Int64("leader-revision", leaderHash.Revision),
+			zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
+			zap.Uint32("leader-hash", leaderHash.Hash),
+		)
+		cm.hasher.TriggerCorruptAlarm(0)
+	}
+
+	// Raise alarm for the left members if the quorum is present.
+	// But we should always generate error log for debugging.
+	for k, v := range hash2members {
+		if quorumExist {
+			for _, pid := range v {
+				cm.hasher.TriggerCorruptAlarm(pid)
+			}
+		}
+
+		cm.lg.Error("Detected compaction hash mismatch",
+			zap.String("leader-id", leaderId.String()),
+			zap.Int64("leader-revision", leaderHash.Revision),
+			zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
+			zap.Uint32("leader-hash", leaderHash.Hash),
+			zap.Uint32("peer-hash", k),
+			zap.String("peer-ids", v.String()),
+			zap.Bool("quorum-exist", quorumExist),
+		)
+	}
+
+	return true
+}
+
+func (cm *corruptionChecker) handleConsistentHash(hash mvcc.KeyValueHash, peersChecked, peerCnt int) bool {
+	if peersChecked == peerCnt {
+		cm.lg.Info("successfully checked hash on whole cluster",
+			zap.Int("number-of-peers-checked", peersChecked),
+			zap.Int("number-of-peers", peerCnt),
+			zap.Int64("revision", hash.Revision),
+			zap.Int64("compactRevision", hash.CompactRevision),
+		)
+		cm.mux.Lock()
+		if hash.Revision > cm.latestRevisionChecked {
+			cm.latestRevisionChecked = hash.Revision
+		}
+		cm.mux.Unlock()
+		return true
+	}
+	cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers",
+		zap.Int("number-of-peers-checked", peersChecked),
+		zap.Int("number-of-peers", peerCnt),
+		zap.Int64("revision", hash.Revision),
+		zap.Int64("compactRevision", hash.CompactRevision),
+	)
+	// The only case which needs to check next hash
+	return false
+}
 
 func (cm *corruptionChecker) uncheckedRevisions() []mvcc.KeyValueHash {
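The new check above groups members (leader included) by the hash they reported for the same compact revision, looks for a group of at least memberCnt/2 + 1 members, and alarms only the members outside that group; when no group reaches quorum it alarms member ID 0, meaning the whole cluster is suspect. Below is a self-contained sketch of that selection, using made-up member IDs and hash values rather than anything taken from this change:

package main

import "fmt"

func main() {
	// hash -> members reporting that hash for the same compact revision.
	// The leader (ID 1) is counted like any other member. Values are made up
	// and mimic a 5-member cluster with one member reporting a different hash.
	hash2members := map[uint32][]uint64{
		2: {1, 42, 43, 45},
		7: {44},
	}

	memberCnt := 5
	quorum := memberCnt/2 + 1 // 3 for a 5-member cluster

	quorumExist := false
	for hash, members := range hash2members {
		if len(members) >= quorum {
			quorumExist = true
			// Drop the majority; whatever remains disagrees with the quorum.
			delete(hash2members, hash)
			break
		}
	}

	if !quorumExist {
		fmt.Println("no quorum: alarm member ID 0 (whole cluster)")
		return
	}
	for hash, members := range hash2members {
		fmt.Printf("hash %d disagrees with the quorum: alarm members %v\n", hash, members)
	}
}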
113 changes: 105 additions & 8 deletions server/etcdserver/corrupt_test.go
@@ -251,23 +251,120 @@ func TestCompactHashCheck(t *testing.T) {
 				hashes:     []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}},
 				peerHashes: []*peerHashKVResp{{err: fmt.Errorf("failed getting hash")}},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
 		},
 		{
 			name: "Peer returned different compaction revision is skipped",
 			hasher: fakeHasher{
 				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1}, {Revision: 2, CompactRevision: 2}},
 				peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{CompactRevision: 3}}},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
 		},
 		{
-			name: "Peer returned same compaction revision but different hash triggers alarm",
+			name: "Etcd can identify two corrupted members in 5 member cluster",
 			hasher: fakeHasher{
-				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
-				peerHashes: []*peerHashKVResp{{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}}},
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 44}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 7}},
+					{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 8}},
+				},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "TriggerCorruptAlarm(42)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(44)", "TriggerCorruptAlarm(45)"},
 			expectCorrupt: true,
 		},
+		{
+			name: "Etcd checks next hash when one member is unresponsive in 3 member cluster",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{err: fmt.Errorf("failed getting hash")},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
+			expectCorrupt: false,
+		},
+		{
+			name: "Etcd can identify single corrupted member in 3 member cluster",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(43)"},
+			expectCorrupt: true,
+		},
+		{
+			name: "Etcd can identify single corrupted member in 5 member cluster",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 44}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(44)"},
+			expectCorrupt: true,
+		},
+		{
+			name: "Etcd triggers corrupted alarm on whole cluster if in 3 member cluster one member is down and one member corrupted",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{err: fmt.Errorf("failed getting hash")},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(0)"},
+			expectCorrupt: true,
+		},
+		{
+			name: "Etcd triggers corrupted alarm on whole cluster if no quorum in 5 member cluster",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 44}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 46}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 4}},
+					{peerInfo: peerInfo{id: 47}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(0)"},
+			expectCorrupt: true,
+		},
+		{
+			name: "Etcd can identify corrupted member in 5 member cluster even if one member is down",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{err: fmt.Errorf("failed getting hash")},
+					{peerInfo: peerInfo{id: 44}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(44)"},
+			expectCorrupt: true,
+		},
+		{
+			name: "Etcd can identify that leader is corrupted",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(1)"},
+			expectCorrupt: true,
+		},
 		{
@@ -276,7 +373,7 @@ func TestCompactHashCheck(t *testing.T) {
 				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 1}},
 				peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}}},
 			},
-			expectActions:             []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)"},
+			expectActions:             []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()"},
 			expectLastRevisionChecked: 2,
 		},
 		{
@@ -288,7 +385,7 @@ func TestCompactHashCheck(t *testing.T) {
 					{err: fmt.Errorf("failed getting hash")},
 				},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)", "MemberId()"},
 		},
 	}
 	for _, tc := range tcs {
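As a cross-check of the "no quorum" fixture above: the leader plus six responding peers gives memberCnt = 7 and quorum = 7/2 + 1 = 4, while the largest hash group has only three members, so the check falls back to alarming member ID 0. A tiny sketch of that arithmetic (group sizes copied from the fixture, everything else made up; note that in the real check memberCnt also counts peers that did not respond):

package main

import "fmt"

func main() {
	// hash -> number of members reporting it; the leader is counted under hash 2.
	groupSizes := map[uint32]int{2: 3, 3: 3, 4: 1}

	memberCnt := 0
	for _, n := range groupSizes {
		memberCnt += n
	}
	quorum := memberCnt/2 + 1

	quorumExist := false
	for _, n := range groupSizes {
		if n >= quorum {
			quorumExist = true
		}
	}
	fmt.Printf("members=%d quorum=%d quorumExist=%v\n", memberCnt, quorum, quorumExist)
	// Prints: members=7 quorum=4 quorumExist=false, so the alarm targets member ID 0.
}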