Skip to content

Commit

Permalink
etcdserver: check ScheduledCompactKeyName and FinishedCompactKeyName
Browse files Browse the repository at this point in the history
before writing hash to hashstore. If they do not match, then it means this compaction is interrupted and its hash value is invalid. In such cases, we won't write the hash values to the hashstore, and avoids the incorrect corruption alarm. See etcd-io#15919.

Also fix some typos and reorder the functions to improve readability.

Signed-off-by: caojiamingalan <[email protected]>
  • Loading branch information
CaojiamingAlan committed May 31, 2023
1 parent 22ecbc0 commit 8601eed
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 16 deletions.
24 changes: 19 additions & 5 deletions server/storage/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,16 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
return nil, compactMainRev, nil
}

func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
// checkPrevCompactionCompleted checks whether the previous scheduled compaction is completed.
func (s *store) checkPrevCompactionCompleted() bool {
s.b.ReadTx().Lock()
defer s.b.ReadTx().Unlock()
scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(s.b.ReadTx())
finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(s.b.ReadTx())
return scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound
}

func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error) {
ch := make(chan struct{})
j := schedule.NewJob("kvstore_compact", func(ctx context.Context) {
if ctx.Err() != nil {
Expand All @@ -238,7 +247,11 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
s.compactBarrier(context.TODO(), ch)
return
}
s.hashes.Store(hash)
// Only store the hash value if the previous hash is completed, i.e. this compaction
// hashes every revision from last compaction. For more details, see #15919.
if prevCompactionCompleted {
s.hashes.Store(hash)
}
close(ch)
})

Expand All @@ -248,17 +261,18 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
}

func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
if err != nil {
return ch, err
}

return s.compact(traceutil.TODO(), rev, prevCompactRev)
return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted)
}

func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
s.mu.Lock()

prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
trace.Step("check and update compact revision")
if err != nil {
Expand All @@ -267,7 +281,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
}
s.mu.Unlock()

return s.compact(trace, rev, prevCompactRev)
return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted)
}

func (s *store) Commit() {
Expand Down
4 changes: 4 additions & 0 deletions server/storage/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ func TestStoreCompact(t *testing.T) {
fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
key1 := newTestKeyBytes(lg, revision{1, 0}, false)
key2 := newTestKeyBytes(lg, revision{2, 0}, false)
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}}

s.Compact(traceutil.TODO(), 3)
Expand All @@ -349,6 +351,8 @@ func TestStoreCompact(t *testing.T) {
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(4))
wact := []testutil.Action{
{Name: "range", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, []uint8(nil), int64(0)}},
{Name: "range", Params: []interface{}{schema.Meta, schema.FinishedCompactKeyName, []uint8(nil), int64(0)}},
{Name: "put", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "range", Params: []interface{}{schema.Key, make([]byte, 17), end, int64(10000)}},
{Name: "delete", Params: []interface{}{schema.Key, key2}},
Expand Down
18 changes: 9 additions & 9 deletions server/storage/mvcc/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,7 @@ import (
"go.etcd.io/etcd/server/v3/storage/schema"
)

func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedComact int64, found bool) {
_, finishedCompactBytes := tx.UnsafeRange(schema.Meta, schema.FinishedCompactKeyName, nil, 0)
if len(finishedCompactBytes) != 0 {
return bytesToRev(finishedCompactBytes[0]).main, true
}
return 0, false
}

func UnsafeReadScheduledCompact(tx backend.ReadTx) (scheduledComact int64, found bool) {
func UnsafeReadScheduledCompact(tx backend.ReadTx) (scheduledCompact int64, found bool) {
_, scheduledCompactBytes := tx.UnsafeRange(schema.Meta, schema.ScheduledCompactKeyName, nil, 0)
if len(scheduledCompactBytes) != 0 {
return bytesToRev(scheduledCompactBytes[0]).main, true
Expand All @@ -47,6 +39,14 @@ func UnsafeSetScheduledCompact(tx backend.BatchTx, value int64) {
tx.UnsafePut(schema.Meta, schema.ScheduledCompactKeyName, rbytes)
}

func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedCompact int64, found bool) {
_, finishedCompactBytes := tx.UnsafeRange(schema.Meta, schema.FinishedCompactKeyName, nil, 0)
if len(finishedCompactBytes) != 0 {
return bytesToRev(finishedCompactBytes[0]).main, true
}
return 0, false
}

func SetFinishedCompact(tx backend.BatchTx, value int64) {
tx.LockInsideApply()
defer tx.Unlock()
Expand Down
2 changes: 0 additions & 2 deletions tests/e2e/corrupt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
})

// Add a member to the cluster, assign it a very slow compaction speed, so that its compaction can be interrupted.
//
newConfig := *epc.Cfg
newConfig.CompactionBatchLimit = 1
newConfig.CompactionSleepInterval = 1 * time.Hour
Expand All @@ -231,7 +230,6 @@ func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsGRPC())
assert.NoError(t, err)
for i := 0; i < 10; i++ {
//use the same key
err := cc.Put(ctx, "key", fmt.Sprint(i), config.PutOptions{})
assert.NoError(t, err, "error on put")
}
Expand Down

0 comments on commit 8601eed

Please sign in to comment.