Skip to content

Commit

Permalink
mvcc: Clone the key index for compaction and lock on each item
Browse files Browse the repository at this point in the history
For compaction, clone the original Btree for traversal purposes, so as to
not hold the lock for the duration of compaction. This allows read/write
throughput by not blocking when the index tree is large (> 1M entries).

mvcc: add comment for index compaction lock
mvcc: explicitly unlock store to do index compaction synchronously
mvcc: formatting index bench
mvcc: add release note for index compaction changes
mvcc: add license header
  • Loading branch information
braintreeps authored and gyuho committed Apr 18, 2018
1 parent d0847f4 commit f176427
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-3.4.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.3.0...v3.4.0) and [
- e.g. a node is removed from cluster, or [`raftpb.MsgProp` arrives at current leader while there is an ongoing leadership transfer](https://github.com/coreos/etcd/issues/8975).
- Add [`snapshot`](https://github.com/coreos/etcd/pull/9118) package for easier snapshot workflow (see [`godoc.org/github.com/etcd/snapshot`](https://godoc.org/github.com/coreos/etcd/snapshot) for more).
- Improve [functional tester](https://github.com/coreos/etcd/tree/master/functional) coverage: [proxy layer to run network fault tests in CI](https://github.com/coreos/etcd/pull/9081), [TLS is enabled both for server and client](https://github.com/coreos/etcd/pull/9534), [liveness mode](https://github.com/coreos/etcd/issues/9230), [shuffle test sequence](https://github.com/coreos/etcd/issues/9381), [membership reconfiguration failure cases](https://github.com/coreos/etcd/pull/9564), [disastrous quorum loss and snapshot recover from a seed member](https://github.com/coreos/etcd/pull/9565), [embedded etcd](https://github.com/coreos/etcd/pull/9572).
- Improve [index compaction blocking](https://github.com/coreos/etcd/pull/9511) by using a copy on write clone to avoid holding the lock for the traversal of the entire index.

### Breaking Changes

Expand Down
44 changes: 20 additions & 24 deletions mvcc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,27 +185,34 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {

func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
var emptyki []*keyIndex
if ti.lg != nil {
ti.lg.Info("compact tree index", zap.Int64("revision", rev))
} else {
plog.Printf("store.index: compact %d", rev)
}
// TODO: do not hold the lock for long time?
// This is probably OK. Compacting 10M keys takes O(10ms).
ti.Lock()
defer ti.Unlock()
ti.tree.Ascend(compactIndex(rev, available, &emptyki))
for _, ki := range emptyki {
item := ti.tree.Delete(ki)
if item == nil {
if ti.lg != nil {
ti.lg.Panic("failed to delete during compaction")
} else {
plog.Panic("store.index: unexpected delete failure during compaction")
clone := ti.tree.Clone()
ti.Unlock()

clone.Ascend(func(item btree.Item) bool {
keyi := item.(*keyIndex)
//Lock is needed here to prevent modification to the keyIndex while
//compaction is going on or revision added to empty before deletion
ti.Lock()
keyi.compact(rev, available)
if keyi.isEmpty() {
item := ti.tree.Delete(keyi)
if item == nil {
if ti.lg != nil {
ti.lg.Panic("failed to delete during compaction")
} else {
plog.Panic("store.index: unexpected delete failure during compaction")
}
}
}
}
ti.Unlock()
return true
})
return available
}

Expand All @@ -222,17 +229,6 @@ func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
return available
}

func compactIndex(rev int64, available map[revision]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool {
return func(i btree.Item) bool {
keyi := i.(*keyIndex)
keyi.compact(rev, available)
if keyi.isEmpty() {
*emptyki = append(*emptyki, keyi)
}
return true
}
}

func (ti *treeIndex) Equal(bi index) bool {
b := bi.(*treeIndex)

Expand Down
42 changes: 42 additions & 0 deletions mvcc/index_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mvcc

import (
"testing"

"go.uber.org/zap"
)

func BenchmarkIndexCompact1(b *testing.B) { benchmarkIndexCompact(b, 1) }
func BenchmarkIndexCompact100(b *testing.B) { benchmarkIndexCompact(b, 100) }
func BenchmarkIndexCompact10000(b *testing.B) { benchmarkIndexCompact(b, 10000) }
func BenchmarkIndexCompact100000(b *testing.B) { benchmarkIndexCompact(b, 100000) }
func BenchmarkIndexCompact1000000(b *testing.B) { benchmarkIndexCompact(b, 1000000) }

func benchmarkIndexCompact(b *testing.B, size int) {
log := zap.NewNop()
kvindex := newTreeIndex(log)

bytesN := 64
keys := createBytesSlice(bytesN, size)
for i := 1; i < size; i++ {
kvindex.Put(keys[i], revision{main: int64(i), sub: int64(i)})
}
b.ResetTimer()
for i := 1; i < b.N; i++ {
kvindex.Compact(int64(i))
}
}
9 changes: 6 additions & 3 deletions mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,17 +217,18 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev

func (s *store) Compact(rev int64) (<-chan struct{}, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.revMu.Lock()
defer s.revMu.Unlock()

if rev <= s.compactMainRev {
ch := make(chan struct{})
f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
s.fifoSched.Schedule(f)
s.mu.Unlock()
s.revMu.Unlock()
return ch, ErrCompacted
}
if rev > s.currentRev {
s.mu.Unlock()
s.revMu.Unlock()
return nil, ErrFutureRev
}

Expand All @@ -245,6 +246,8 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
// ensure that desired compaction is persisted
s.b.ForceCommit()

s.mu.Unlock()
s.revMu.Unlock()
keep := s.kvindex.Compact(rev)
ch := make(chan struct{})
var j = func(ctx context.Context) {
Expand Down

0 comments on commit f176427

Please sign in to comment.