Skip to content

Commit

Permalink
mvcc: allows mvcc/backend to hash kv state by rev
Browse files Browse the repository at this point in the history
  • Loading branch information
fanminshi committed Jul 14, 2017
1 parent 5acba2b commit 663dec1
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 5 deletions.
17 changes: 15 additions & 2 deletions etcdserver/api/v3rpc/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,24 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
}

func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
h, compactRev, err := ms.kg.KV().HashByRev(r.Revision)
var (
h uint32
rev int64
compactRev int64
err error
)

if r.Revision == 0 {
h, rev, err = ms.kg.KV().Hash()
} else {
h, rev, compactRev, err = ms.kg.KV().HashByRev(r.Revision)
}

if err != nil {
return nil, togRPCError(err)
}
resp := &pb.HashResponse{Header: &pb.ResponseHeader{}, Hash: h, CompactRevision: compactRev}

resp := &pb.HashResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h, CompactRevision: compactRev}
ms.hdr.fill(resp.Header)
return resp, nil
}
Expand Down
34 changes: 34 additions & 0 deletions mvcc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type Backend interface {

Snapshot() Snapshot
Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
// HashBucket hashes key value pairs of the given bucket.
// the func f determines whether the key should be hashed or not.
HashBucket(bucket []byte, f func(key []byte) bool) (uint32, error)
// Size returns the current size of the backend.
Size() int64
Defrag() error
Expand Down Expand Up @@ -240,6 +243,37 @@ func (b *backend) Hash(ignores map[IgnoreKey]struct{}) (uint32, error) {
return h.Sum32(), nil
}

func (b *backend) HashBucket(bucket []byte, f func(rev []byte) bool) (hash uint32, err error) {
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

b.mu.RLock()
defer b.mu.RUnlock()

err = b.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
if b == nil {
return fmt.Errorf("bucket not found %v", string(bucket))
}
h.Write(bucket)
b.ForEach(func(k, v []byte) error {
if !f(k) {
return nil
}

h.Write(k)
h.Write(v)
return nil
})
return nil
})

if err != nil {
return 0, err
}

return h.Sum32(), nil
}

func (b *backend) Size() int64 {
return atomic.LoadInt64(&b.size)
}
Expand Down
6 changes: 4 additions & 2 deletions mvcc/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,12 @@ type KV interface {
// Write creates a write transaction.
Write() TxnWrite

// Hash retrieves the hash of KV state and revision.
// This method is designed for consistency checking purposes.
// Hash computes the hash of the KV's backend.
Hash() (hash uint32, revision int64, err error)

// HashByRev computes the hash of all MVCC keys up to a given revision.
HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error)

// Compact frees all superseded keys with revisions less than rev.
Compact(rev int64) (<-chan struct{}, error)

Expand Down
44 changes: 43 additions & 1 deletion mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,15 @@ type store struct {
fifoSched schedule.Scheduler

stopc chan struct{}

compactc chan struct{}
}

// NewStore returns a new store. It is useful to create a store inside
// mvcc pkg. It should only be used for testing externally.
func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store {
closedc := make(chan struct{})
close(closedc)
s := &store{
b: b,
ig: ig,
Expand All @@ -116,7 +120,8 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto
bytesBuf8: make([]byte, 8),
fifoSched: schedule.NewFIFOScheduler(),

stopc: make(chan struct{}),
stopc: make(chan struct{}),
compactc: closedc,
}
s.ReadView = &readView{s}
s.WriteView = &writeView{s}
Expand Down Expand Up @@ -160,6 +165,42 @@ func (s *store) Hash() (hash uint32, revision int64, err error) {
return h, s.currentRev, err
}

func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
for {
s.mu.RLock()
compactMainRev := s.compactMainRev
currentRev = s.currentRev
compactc := s.compactc
s.mu.RUnlock()
if rev < compactMainRev {
return 0, 0, compactMainRev, ErrCompacted
} else if rev > currentRev {
return 0, currentRev, 0, ErrFutureRev
}
select {
case <-compactc:
case <-s.stopc:
return 0, 0, 0, ErrClosed
}
s.mu.Lock()
if s.compactc == compactc {
defer s.mu.Unlock()
break
}
s.mu.Unlock()
}
s.b.ForceCommit()

upper := revision{main: rev + 1}
f := func(r []byte) bool {
return upper.GreaterThan(bytesToRev(r))
}

hash, err = s.b.HashBucket(keyBucketName, f)

return hash, s.currentRev, s.compactMainRev, err
}

func (s *store) Compact(rev int64) (<-chan struct{}, error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down Expand Up @@ -192,6 +233,7 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {

keep := s.kvindex.Compact(rev)
ch := make(chan struct{})
s.compactc = ch
var j = func(ctx context.Context) {
if ctx.Err() != nil {
s.compactBarrier(ctx, ch)
Expand Down
1 change: 1 addition & 0 deletions mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ type fakeBackend struct {
func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx }
func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx }
func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
func (b *fakeBackend) HashBucket([]byte, func([]byte) bool) (uint32, error) { return 0, nil }
func (b *fakeBackend) Size() int64 { return 0 }
func (b *fakeBackend) Snapshot() backend.Snapshot { return nil }
func (b *fakeBackend) ForceCommit() {}
Expand Down

0 comments on commit 663dec1

Please sign in to comment.