diff --git a/client/pkg/types/id.go b/client/pkg/types/id.go
index ae00388dde0..9a8429391ed 100644
--- a/client/pkg/types/id.go
+++ b/client/pkg/types/id.go
@@ -14,7 +14,10 @@
 
 package types
 
-import "strconv"
+import (
+	"bytes"
+	"strconv"
+)
 
 // ID represents a generic identifier which is canonically
 // stored as a uint64 but is typically represented as a
@@ -37,3 +40,17 @@ type IDSlice []ID
 func (p IDSlice) Len() int           { return len(p) }
 func (p IDSlice) Less(i, j int) bool { return uint64(p[i]) < uint64(p[j]) }
 func (p IDSlice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
+
+func (p IDSlice) String() string {
+	var b bytes.Buffer
+	if p.Len() > 0 {
+		b.WriteString(p[0].String())
+	}
+
+	for i := 1; i < p.Len(); i++ {
+		b.WriteString(",")
+		b.WriteString(p[i].String())
+	}
+
+	return b.String()
+}
diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go
index 7b72124e670..20e19fbb706 100644
--- a/server/etcdserver/corrupt.go
+++ b/server/etcdserver/corrupt.go
@@ -251,6 +251,17 @@ func (cm *corruptionChecker) PeriodicCheck() error {
 	return nil
 }
 
+// CompactHashCheck is based on the fact that 'compactions' are coordinated
+// between raft members and performed at the same revision. For each compacted
+// revision there is a KV store hash computed and saved for some time.
+//
+// This method communicates with peers to find a recent common revision across
+// members, and raises an alarm if 2 or more members at the same compact
+// revision have different hashes.
+//
+// We might miss the opportunity to perform the check if the compaction is
+// still ongoing on one of the members, or a member was unresponsive. In such
+// a situation the method still passes without raising an alarm.
 func (cm *corruptionChecker) CompactHashCheck() {
 	cm.lg.Info("starting compact hash check",
 		zap.String("local-member-id", cm.hasher.MemberId().String()),
@@ -258,57 +269,136 @@ func (cm *corruptionChecker) CompactHashCheck() {
 	)
 	hashes := cm.uncheckedRevisions()
 	// Assume that revisions are ordered from largest to smallest
-	for i, hash := range hashes {
+	for _, hash := range hashes {
 		peers := cm.hasher.PeerHashByRev(hash.Revision)
 		if len(peers) == 0 {
 			continue
 		}
-		peersChecked := 0
-		for _, p := range peers {
-			if p.resp == nil || p.resp.CompactRevision != hash.CompactRevision {
-				continue
-			}
+		if cm.checkPeerHashes(hash, peers) {
+			return
+		}
+	}
+	cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", len(hashes)))
+	return
+}
 
-			// follower's compact revision is leader's old one, then hashes must match
-			if p.resp.Hash != hash.Hash {
-				cm.hasher.TriggerCorruptAlarm(p.id)
-				cm.lg.Error("failed compaction hash check",
-					zap.Int64("revision", hash.Revision),
-					zap.Int64("leader-compact-revision", hash.CompactRevision),
-					zap.Uint32("leader-hash", hash.Hash),
-					zap.Int64("follower-compact-revision", p.resp.CompactRevision),
-					zap.Uint32("follower-hash", p.resp.Hash),
-					zap.String("follower-peer-id", p.id.String()),
-				)
-				return
-			}
-			peersChecked++
-			cm.lg.Info("successfully checked hash on follower",
-				zap.Int64("revision", hash.Revision),
-				zap.String("peer-id", p.id.String()),
-			)
+// checkPeerHashes compares the peers' hashes with the leader's hash and
+// raises alarms if corruption is detected. The returned bool indicates
+// whether the next hash needs to be checked:
+// true: successfully checked hash on whole cluster or raised alarms, so no need to check next hash
+// false: skipped some members, so need to check next hash
+func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers []*peerHashKVResp) bool {
+	leaderId := cm.hasher.MemberId()
+	hash2members := map[uint32]types.IDSlice{leaderHash.Hash: {leaderId}}
+
+	peersChecked := 0
+	// group all peers by hash
+	for _, peer := range peers {
+		skipped := false
+		reason := ""
+
+		if peer.resp == nil {
+			skipped = true
+			reason = "no response"
+		} else if peer.resp.CompactRevision != leaderHash.CompactRevision {
+			skipped = true
+			reason = fmt.Sprintf("the peer's CompactRevision %d doesn't match leader's CompactRevision %d",
+				peer.resp.CompactRevision, leaderHash.CompactRevision)
 		}
-		if len(peers) == peersChecked {
-			cm.lg.Info("successfully checked hash on whole cluster",
-				zap.Int("number-of-peers-checked", peersChecked),
-				zap.Int64("revision", hash.Revision),
-			)
-			cm.mux.Lock()
-			if hash.Revision > cm.latestRevisionChecked {
-				cm.latestRevisionChecked = hash.Revision
+		if skipped {
+			cm.lg.Warn("Skipped peer's hash", zap.Int("number-of-peers", len(peers)),
+				zap.String("leader-id", leaderId.String()),
+				zap.String("peer-id", peer.id.String()),
+				zap.String("reason", reason))
+			continue
+		}
+
+		peersChecked++
+		if ids, ok := hash2members[peer.resp.Hash]; !ok {
+			hash2members[peer.resp.Hash] = []types.ID{peer.id}
+		} else {
+			ids = append(ids, peer.id)
+			hash2members[peer.resp.Hash] = ids
+		}
+	}
+
+	// All members have the same CompactRevision and Hash.
+	if len(hash2members) == 1 {
+		return cm.handleConsistentHash(leaderHash, peersChecked, len(peers))
+	}
+
+	// Detected a hash mismatch.
+	// The first step is to figure out the majority with the same hash.
+	memberCnt := len(peers) + 1
+	quorum := memberCnt/2 + 1
+	quorumExist := false
+	for k, v := range hash2members {
+		if len(v) >= quorum {
+			quorumExist = true
+			// remove the majority, and we might raise alarms for the remaining members.
+			delete(hash2members, k)
+			break
+		}
+	}
+
+	if !quorumExist {
+		// If a quorum doesn't exist, we don't know which members' data are
+		// corrupted. In such a situation, we intentionally set the memberID
+		// to 0, which means the alarm affects the whole cluster.
+		cm.lg.Error("Detected compaction hash mismatch but cannot identify the corrupted members, so intentionally set the memberID as 0",
+			zap.String("leader-id", leaderId.String()),
+			zap.Int64("leader-revision", leaderHash.Revision),
+			zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
+			zap.Uint32("leader-hash", leaderHash.Hash),
+		)
+		cm.hasher.TriggerCorruptAlarm(0)
+	}
+
+	// Raise alarms for the remaining members if a quorum is present.
+	// But we should always generate an error log for debugging.
+	for k, v := range hash2members {
+		if quorumExist {
+			for _, pid := range v {
+				cm.hasher.TriggerCorruptAlarm(pid)
 			}
-			cm.mux.Unlock()
-			cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1))
-			return
 		}
-		cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers",
+
+		cm.lg.Error("Detected compaction hash mismatch",
+			zap.String("leader-id", leaderId.String()),
+			zap.Int64("leader-revision", leaderHash.Revision),
+			zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
+			zap.Uint32("leader-hash", leaderHash.Hash),
+			zap.Uint32("peer-hash", k),
+			zap.String("peer-ids", v.String()),
+			zap.Bool("quorum-exist", quorumExist),
+		)
+	}
+
+	return true
+}
+
+func (cm *corruptionChecker) handleConsistentHash(hash mvcc.KeyValueHash, peersChecked, peerCnt int) bool {
+	if peersChecked == peerCnt {
+		cm.lg.Info("successfully checked hash on whole cluster",
 			zap.Int("number-of-peers-checked", peersChecked),
-			zap.Int("number-of-peers", len(peers)),
 			zap.Int64("revision", hash.Revision),
+			zap.Int64("compactRevision", hash.CompactRevision),
 		)
+		cm.mux.Lock()
+		if hash.Revision > cm.latestRevisionChecked {
+			cm.latestRevisionChecked = hash.Revision
+		}
+		cm.mux.Unlock()
+		return true
 	}
-	cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", len(hashes)))
-	return
+	cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers",
+		zap.Int("number-of-peers-checked", peersChecked),
+		zap.Int("number-of-peers", peerCnt),
+		zap.Int64("revision", hash.Revision),
+		zap.Int64("compactRevision", hash.CompactRevision),
+	)
+	// The only case in which the next hash needs to be checked
+	return false
 }
 
 func (cm *corruptionChecker) uncheckedRevisions() []mvcc.KeyValueHash {
diff --git a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go
index 93942051c50..dd876ca2333 100644
--- a/server/etcdserver/corrupt_test.go
+++ b/server/etcdserver/corrupt_test.go
@@ -251,7 +251,7 @@ func TestCompactHashCheck(t *testing.T) {
 				hashes:     []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}},
 				peerHashes: []*peerHashKVResp{{err: fmt.Errorf("failed getting hash")}},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
 		},
 		{
 			name: "Peer returned different compaction revision is skipped",
@@ -259,15 +259,112 @@ func TestCompactHashCheck(t *testing.T) {
 				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1}, {Revision: 2, CompactRevision: 2}},
 				peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{CompactRevision: 3}}},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
 		},
 		{
-			name: "Peer returned same compaction revision but different hash triggers alarm",
+			name: "Etcd can identify two corrupted members in 5 member cluster",
 			hasher: fakeHasher{
-				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
-				peerHashes: []*peerHashKVResp{{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}}},
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 44}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 7}},
+					{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 8}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(44)", "TriggerCorruptAlarm(45)"},
+			expectCorrupt: true,
+		},
+		{
+			name: "Etcd checks next hash when one member is unresponsive in 3 member cluster",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{err: fmt.Errorf("failed getting hash")},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
+			expectCorrupt: false,
+		},
+		{
+			name: "Etcd can identify single corrupted member in 3 member cluster",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(43)"},
+			expectCorrupt: true,
+		},
+		{
+			name: "Etcd can identify single corrupted member in 5 member cluster",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 44}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(44)"},
+			expectCorrupt: true,
+		},
+		{
+			name: "Etcd triggers corrupted alarm on whole cluster if in 3 member cluster one member is down and one member corrupted",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{err: fmt.Errorf("failed getting hash")},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(0)"},
+			expectCorrupt: true,
+		},
+		{
+			name: "Etcd triggers corrupted alarm on whole cluster if no quorum in 5 member cluster",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 44}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 46}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 4}},
+					{peerInfo: peerInfo{id: 47}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(0)"},
+			expectCorrupt: true,
+		},
+		{
+			name: "Etcd can identify corrupted member in 5 member cluster even if one member is down",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{err: fmt.Errorf("failed getting hash")},
+					{peerInfo: peerInfo{id: 44}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(44)"},
+			expectCorrupt: true,
+		},
+		{
+			name: "Etcd can identify that leader is corrupted",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+				},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "TriggerCorruptAlarm(42)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(1)"},
 			expectCorrupt: true,
 		},
 		{
@@ -276,7 +373,7 @@ func TestCompactHashCheck(t *testing.T) {
 				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 1}},
 				peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}}},
 			},
-			expectActions:             []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)"},
+			expectActions:             []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()"},
 			expectLastRevisionChecked: 2,
 		},
 		{
@@ -288,7 +385,7 @@ func TestCompactHashCheck(t *testing.T) {
 				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 2, Hash: 1}},
 				peerHashes: []*peerHashKVResp{
 					{err: fmt.Errorf("failed getting hash")},
 				},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)", "MemberId()"},
 		},
 	}
 	for _, tc := range tcs {
diff --git a/tests/integration/corrupt_test.go b/tests/integration/corrupt_test.go
index ee2474a2371..cd695c5782e 100644
--- a/tests/integration/corrupt_test.go
+++ b/tests/integration/corrupt_test.go
@@ -93,30 +93,13 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) {
 	time.Sleep(50 * time.Millisecond)
 	leader := clus.WaitLeader(t)
 
-	// Get sorted member IDs
-	members, err := cc.MemberList(ctx)
-	assert.NoError(t, err, "error on member list %v")
-
-	// NOTE: If the corrupted member has been elected as leader, the
-	// alarm will show the smaller member.
-	var expectedID = uint64(clus.Members[0].ID())
-	if leader == 0 {
-		for _, m := range members.Members {
-			if m.Name != clus.Members[0].Name {
-				expectedID = m.ID
-				break
-			}
-		}
-
-	}
-
 	err = clus.Members[leader].Server.CorruptionChecker().PeriodicCheck()
 	assert.NoError(t, err, "error on periodic check")
 	time.Sleep(50 * time.Millisecond)
 
 	alarmResponse, err := cc.AlarmList(ctx)
 	assert.NoError(t, err, "error on alarm list")
-	assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: expectedID}}, alarmResponse.Alarms)
+	assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: uint64(clus.Members[0].ID())}}, alarmResponse.Alarms)
 }
 
 func TestCompactHashCheck(t *testing.T) {
@@ -186,26 +169,64 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
 	time.Sleep(50 * time.Millisecond)
 	leader := clus.WaitLeader(t)
 
-	// Get sorted member IDs
-	members, err := cc.MemberList(ctx)
-	assert.NoError(t, err, "error on member list %v")
-
-	// NOTE: If the corrupted member has been elected as leader, the
-	// alarm will show the smaller member.
-	var expectedID = uint64(clus.Members[0].ID())
-	if leader == 0 {
-		for _, m := range members.Members {
-			if m.Name != clus.Members[0].Name {
-				expectedID = m.ID
-				break
-			}
-		}
+	clus.Members[leader].Server.CorruptionChecker().CompactHashCheck()
+	time.Sleep(50 * time.Millisecond)
+	alarmResponse, err := cc.AlarmList(ctx)
+	assert.NoError(t, err, "error on alarm list")
+	assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: uint64(clus.Members[0].ID())}}, alarmResponse.Alarms)
+}
+
+func TestCompactHashCheckDetectMultipleCorruption(t *testing.T) {
+	integration.BeforeTest(t)
+
+	clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 5})
+	defer clus.Terminate(t)
+
+	cc, err := clus.ClusterClient(t)
+	require.NoError(t, err)
+	ctx := context.Background()
+
+	for i := 0; i < 10; i++ {
+		_, err := cc.Put(ctx, testutil.PickKey(int64(i)), fmt.Sprint(i))
+		assert.NoError(t, err, "error on put")
 	}
 
+	clus.Members[0].Server.CorruptionChecker().CompactHashCheck()
+	clus.Members[0].Stop(t)
+	clus.Members[1].Server.CorruptionChecker().CompactHashCheck()
+	clus.Members[1].Stop(t)
+	clus.WaitLeader(t)
+
+	err = testutil.CorruptBBolt(clus.Members[0].BackendPath())
+	require.NoError(t, err)
+	err = testutil.CorruptBBolt(clus.Members[1].BackendPath())
+	require.NoError(t, err)
+
+	err = clus.Members[0].Restart(t)
+	require.NoError(t, err)
+	err = clus.Members[1].Restart(t)
+	require.NoError(t, err)
+
+	_, err = cc.Compact(ctx, 5)
+	require.NoError(t, err)
+	time.Sleep(50 * time.Millisecond)
+
+	leader := clus.WaitLeader(t)
 	clus.Members[leader].Server.CorruptionChecker().CompactHashCheck()
 	time.Sleep(50 * time.Millisecond)
 
 	alarmResponse, err := cc.AlarmList(ctx)
 	assert.NoError(t, err, "error on alarm list")
-	assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: expectedID}}, alarmResponse.Alarms)
+
+	expectedAlarmMap := map[uint64]etcdserverpb.AlarmType{
+		uint64(clus.Members[0].ID()): etcdserverpb.AlarmType_CORRUPT,
+		uint64(clus.Members[1].ID()): etcdserverpb.AlarmType_CORRUPT,
+	}
+
+	actualAlarmMap := make(map[uint64]etcdserverpb.AlarmType)
+	for _, alarm := range alarmResponse.Alarms {
+		actualAlarmMap[alarm.MemberID] = alarm.Alarm
+	}
+
+	require.Equal(t, expectedAlarmMap, actualAlarmMap)
 }
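Reviewer note (not part of the patch): the heart of checkPeerHashes is the quorum rule, which groups the leader and its peers by the hash each reported at the common compact revision, blames only the members outside a group that reaches memberCount/2+1, and falls back to alarming member ID 0 when no group reaches quorum. The standalone Go sketch below illustrates that rule in isolation; the memberHash type and classify function are hypothetical names used for illustration, not etcd APIs, and the local member ID 1 mirrors the fake hasher used in the unit tests.

package main

import "fmt"

// memberHash pairs a member ID with the KV-store hash it reported at a
// shared compact revision (hypothetical helper type, not an etcd API).
type memberHash struct {
	id   uint64
	hash uint32
}

// classify groups members by reported hash. If one group reaches quorum
// (len(members)/2 + 1), every member outside that group is returned as a
// suspect and ok is true. If no group reaches quorum, ok is false, which
// corresponds to etcd alarming member ID 0 (the whole cluster).
func classify(members []memberHash) (suspects []uint64, ok bool) {
	groups := map[uint32][]uint64{}
	for _, m := range members {
		groups[m.hash] = append(groups[m.hash], m.id)
	}
	quorum := len(members)/2 + 1
	for h, ids := range groups {
		if len(ids) < quorum {
			continue
		}
		// A majority agrees on hash h; everyone else is suspect.
		for otherHash, otherIDs := range groups {
			if otherHash != h {
				suspects = append(suspects, otherIDs...)
			}
		}
		return suspects, true
	}
	return nil, false
}

func main() {
	// Mirrors the "two corrupted members in 5 member cluster" unit test:
	// local member 1 and peers 42, 43 agree on hash 2; 44 and 45 do not.
	cluster := []memberHash{{1, 2}, {42, 2}, {43, 2}, {44, 7}, {45, 8}}
	suspects, ok := classify(cluster)
	fmt.Println(suspects, ok) // e.g. [44 45] true (suspect order depends on map iteration)
}

Run against that 5-member example, the sketch reports members 44 and 45 as suspects, matching the TriggerCorruptAlarm(44) and TriggerCorruptAlarm(45) actions expected by the new test case; with no majority group it returns ok == false, matching the TriggerCorruptAlarm(0) cases.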