Skip to content

Commit

Permalink
etcdserver: check ScheduledCompactKeyName and FinishedCompactKeyName
Browse files Browse the repository at this point in the history
before writing hash to hashstore. If they do not match, then it means this compaction is interrupted and its hash value is invalid. In such cases, we won't write the hash values to the hashstore, and avoids the incorrect corruption alarm. See etcd-io#15919.

Also fix some typos and reorder the functions to improve readability.

Signed-off-by: caojiamingalan <[email protected]>
  • Loading branch information
CaojiamingAlan committed May 31, 2023
1 parent 22ecbc0 commit 6dd5b8e
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 14 deletions.
24 changes: 19 additions & 5 deletions server/storage/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,16 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
return nil, compactMainRev, nil
}

func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
// checkPrevCompactionCompleted checks whether the previous scheduled compaction is completed.
func (s *store) checkPrevCompactionCompleted() bool {
s.b.ReadTx().Lock()
defer s.b.ReadTx().Unlock()
scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(s.b.ReadTx())
finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(s.b.ReadTx())
return scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound
}

func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error) {
ch := make(chan struct{})
j := schedule.NewJob("kvstore_compact", func(ctx context.Context) {
if ctx.Err() != nil {
Expand All @@ -238,7 +247,11 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
s.compactBarrier(context.TODO(), ch)
return
}
s.hashes.Store(hash)
// Only store the hash value is the previous hash is completed, i.e. this compaction
// hashes every revision from last compaction. For more details, see #15919.
if prevCompactionCompleted {
s.hashes.Store(hash)
}
close(ch)
})

Expand All @@ -248,17 +261,18 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
}

func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
if err != nil {
return ch, err
}

return s.compact(traceutil.TODO(), rev, prevCompactRev)
return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted)
}

func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
s.mu.Lock()

prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
trace.Step("check and update compact revision")
if err != nil {
Expand All @@ -267,7 +281,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
}
s.mu.Unlock()

return s.compact(trace, rev, prevCompactRev)
return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted)
}

func (s *store) Commit() {
Expand Down
18 changes: 9 additions & 9 deletions server/storage/mvcc/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,7 @@ import (
"go.etcd.io/etcd/server/v3/storage/schema"
)

func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedComact int64, found bool) {
_, finishedCompactBytes := tx.UnsafeRange(schema.Meta, schema.FinishedCompactKeyName, nil, 0)
if len(finishedCompactBytes) != 0 {
return bytesToRev(finishedCompactBytes[0]).main, true
}
return 0, false
}

func UnsafeReadScheduledCompact(tx backend.ReadTx) (scheduledComact int64, found bool) {
func UnsafeReadScheduledCompact(tx backend.ReadTx) (scheduledCompact int64, found bool) {
_, scheduledCompactBytes := tx.UnsafeRange(schema.Meta, schema.ScheduledCompactKeyName, nil, 0)
if len(scheduledCompactBytes) != 0 {
return bytesToRev(scheduledCompactBytes[0]).main, true
Expand All @@ -47,6 +39,14 @@ func UnsafeSetScheduledCompact(tx backend.BatchTx, value int64) {
tx.UnsafePut(schema.Meta, schema.ScheduledCompactKeyName, rbytes)
}

func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedCompact int64, found bool) {
_, finishedCompactBytes := tx.UnsafeRange(schema.Meta, schema.FinishedCompactKeyName, nil, 0)
if len(finishedCompactBytes) != 0 {
return bytesToRev(finishedCompactBytes[0]).main, true
}
return 0, false
}

func SetFinishedCompact(tx backend.BatchTx, value int64) {
tx.LockInsideApply()
defer tx.Unlock()
Expand Down

0 comments on commit 6dd5b8e

Please sign in to comment.