From c2149c5dfc623b990facf945ecfa43b5193a811c Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 22 Nov 2017 14:57:07 -0800 Subject: [PATCH] etcdserver: CheckInitialHashKV when "InitialCorruptCheck==true" etcdserver: only compare hash values if any It's possible that peer has higher revision than local node. In such case, hashes will still be different on requested revision, but peer's header revision is greater. etcdserver: count mismatch only when compact revisions are same Signed-off-by: Gyu-Ho Lee --- etcdserver/corrupt.go | 103 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 85 insertions(+), 18 deletions(-) diff --git a/etcdserver/corrupt.go b/etcdserver/corrupt.go index 035c62f041c7..f397c2ca10f4 100644 --- a/etcdserver/corrupt.go +++ b/etcdserver/corrupt.go @@ -16,14 +16,61 @@ package etcdserver import ( "context" + "fmt" "time" "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/mvcc" "github.com/coreos/etcd/pkg/types" ) +// CheckInitialHashKV compares initial hash values with its peers +// before serving any peer/client traffic. Only mismatch when hashes +// are different at requested revision, with same compact revision. +func (s *EtcdServer) CheckInitialHashKV() error { + if !s.Cfg.InitialCorruptCheck { + return nil + } + + plog.Infof("%s starting initial corruption check with timeout %v...", s.ID(), s.Cfg.ReqTimeout()) + h, rev, crev, err := s.kv.HashByRev(0) + if err != nil { + return fmt.Errorf("%s failed to fetch hash (%v)", s.ID(), err) + } + peers := s.getPeerHashKVs(rev) + mismatch := 0 + for _, p := range peers { + if p.resp != nil { + peerID := types.ID(p.resp.Header.MemberId) + if h != p.resp.Hash { + if crev == p.resp.CompactRevision { + plog.Errorf("%s's hash %d != %s's hash %d (revision %d, peer revision %d, compact revision %d)", s.ID(), h, peerID, p.resp.Hash, rev, p.resp.Header.Revision, crev) + mismatch++ + } else { + plog.Warningf("%s cannot check hash since the compact reversion is different at revision %d (compact revision %d, peer %s compact revision %d)", s.ID(), rev, crev, peerID, p.resp.CompactRevision) + } + } + continue + } + if p.err != nil { + switch p.err { + case rpctypes.ErrFutureRev: + plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: peer is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error()) + case rpctypes.ErrCompacted: + plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: local node is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error()) + } + } + } + if mismatch > 0 { + return fmt.Errorf("%s found data inconsistency with peers", s.ID()) + } + + plog.Infof("%s succeeded on initial corruption checking: no corruption", s.ID()) + return nil +} + func (s *EtcdServer) monitorKVHash() { t := s.Cfg.CorruptCheckTime if t == 0 { @@ -50,7 +97,7 @@ func (s *EtcdServer) checkHashKV() error { if err != nil { plog.Fatalf("failed to hash kv store (%v)", err) } - resps := s.getPeerHashKVs(rev) + peers := s.getPeerHashKVs(rev) ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) err = s.linearizableReadNotify(ctx) @@ -86,24 +133,27 @@ func (s *EtcdServer) checkHashKV() error { mismatch(uint64(s.ID())) } - for _, resp := range resps { - id := resp.Header.MemberId + for _, p := range peers { + if p.resp == nil { + continue + } + id := p.resp.Header.MemberId // leader expects follower's latest revision less than or equal to leader's - if resp.Header.Revision > rev2 { + if p.resp.Header.Revision > rev2 { plog.Warningf( "revision %d from member %v, expected at most %d", - resp.Header.Revision, + p.resp.Header.Revision, types.ID(id), rev2) mismatch(id) } // leader expects follower's latest compact revision less than or equal to leader's - if resp.CompactRevision > crev2 { + if p.resp.CompactRevision > crev2 { plog.Warningf( "compact revision %d from member %v, expected at most %d", - resp.CompactRevision, + p.resp.CompactRevision, types.ID(id), crev2, ) @@ -111,10 +161,10 @@ func (s *EtcdServer) checkHashKV() error { } // follower's compact revision is leader's old one, then hashes must match - if resp.CompactRevision == crev && resp.Hash != h { + if p.resp.CompactRevision == crev && p.resp.Hash != h { plog.Warningf( "hash %d at revision %d from member %v, expected hash %d", - resp.Hash, + p.resp.Hash, rev, types.ID(id), h, @@ -125,36 +175,53 @@ func (s *EtcdServer) checkHashKV() error { return nil } -func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*clientv3.HashKVResponse) { - for _, m := range s.cluster.Members() { +type peerHashKVResp struct { + resp *clientv3.HashKVResponse + err error + eps []string +} + +func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) { + // TODO: handle the case when "s.cluster.Members" have not + // been populated (e.g. no snapshot to load from disk) + mbs := s.cluster.Members() + pURLs := make([][]string, len(mbs)) + for _, m := range mbs { if m.ID == s.ID() { continue } + pURLs = append(pURLs, m.PeerURLs) + } + for _, purls := range pURLs { + if len(purls) == 0 { + continue + } cli, cerr := clientv3.New(clientv3.Config{ DialTimeout: s.Cfg.ReqTimeout(), - Endpoints: m.PeerURLs, + Endpoints: purls, }) if cerr != nil { - plog.Warningf("%s failed to create client to peer %s for hash checking (%q)", s.ID(), types.ID(m.ID), cerr.Error()) + plog.Warningf("%s failed to create client to peer %q for hash checking (%q)", s.ID(), purls, cerr.Error()) continue } respsLen := len(resps) for _, c := range cli.Endpoints() { ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) - resp, herr := cli.HashKV(ctx, c, rev) + var resp *clientv3.HashKVResponse + resp, cerr = cli.HashKV(ctx, c, rev) cancel() - if herr == nil { - cerr = herr - resps = append(resps, resp) + if cerr == nil { + resps = append(resps, &peerHashKVResp{resp: resp}) break } + plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), cerr.Error(), c, rev) } cli.Close() if respsLen == len(resps) { - plog.Warningf("%s failed to hash kv for peer %s (%v)", s.ID(), types.ID(m.ID), cerr) + resps = append(resps, &peerHashKVResp{err: cerr, eps: purls}) } } return resps