Merge pull request #37064 from ajwerner/backport19.1-37055
release-19.1: storage: drop raftentry.Cache data in applySnapshot
ajwerner authored Apr 24, 2019
2 parents 7109d29 + 0071acd commit 25dd36f
Showing 3 changed files with 79 additions and 22 deletions.
pkg/storage/raftentry/cache.go (25 changes: 19 additions & 6 deletions)
@@ -96,7 +96,7 @@ type partition struct {
next, prev *partition // accessed under Cache.mu
}

-var partitionSize = int32(unsafe.Sizeof(partition{}))
+const partitionSize = int32(unsafe.Sizeof(partition{}))

// rangeCache represents the interface that the partition uses.
// It is never explicitly used but a new implementation to replace ringBuf must
@@ -131,6 +131,16 @@ func (c *Cache) Metrics() Metrics {
return c.metrics
}

+// Drop drops all cached entries associated with the specified range.
+func (c *Cache) Drop(id roachpb.RangeID) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	p := c.getPartLocked(id, false /* create */, false /* recordUse */)
+	if p != nil {
+		c.updateGauges(c.evictPartitionLocked(p))
+	}
+}

// Add inserts ents into the cache. If truncate is true, the method also removes
// all entries with indices equal to or greater than the indices of the entries
// provided. ents is expected to consist of entries with a contiguous sequence
@@ -269,14 +279,17 @@ func (c *Cache) getPartLocked(id roachpb.RangeID, create, recordUse bool) *parti
func (c *Cache) evictLocked(toAdd int32) {
bytes := c.addBytes(toAdd)
for bytes > c.maxBytes && len(c.parts) > 0 {
-		p := c.lru.remove(c.lru.back())
-		pBytes, pEntries := p.evict()
-		c.addEntries(-1 * pEntries)
-		bytes = c.addBytes(-1 * pBytes)
-		delete(c.parts, p.id)
+		bytes, _ = c.evictPartitionLocked(c.lru.back())
}
}

+func (c *Cache) evictPartitionLocked(p *partition) (updatedBytes, updatedEntries int32) {
+	delete(c.parts, p.id)
+	c.lru.remove(p)
+	pBytes, pEntries := p.evict()
+	return c.addBytes(-1 * pBytes), c.addEntries(-1 * pEntries)
+}

// recordUpdate adjusts the partition and cache bookkeeping to account for the
// changes which actually occurred in an update relative to the guess made
// before the update.
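For orientation, here is a minimal usage sketch contrasting the new Drop with the existing Clear, mirroring how the tests below drive the cache. The import paths and entry values are illustrative assumptions, not part of this change:

package main

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/raftentry"
	"go.etcd.io/etcd/raft/raftpb"
)

func main() {
	// Byte budget for the whole cache; eviction happens per-range, one
	// partition at a time, in LRU order.
	c := raftentry.NewCache(1 << 20)

	ents := []raftpb.Entry{{Index: 5, Term: 1}, {Index: 6, Term: 1}}
	c.Add(roachpb.RangeID(1), ents, true /* truncate */)
	c.Add(roachpb.RangeID(2), ents, true /* truncate */)

	// Clear removes the cached entries but, per the test comment below, keeps
	// the range's partition struct (and its fixed byte overhead) in the cache.
	c.Clear(roachpb.RangeID(1), ents[len(ents)-1].Index+1)

	// Drop evicts the range's partition entirely, which is what applySnapshot
	// now uses to discard all cached entries for the range at once.
	c.Drop(roachpb.RangeID(2))
}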
pkg/storage/raftentry/cache_test.go (73 changes: 57 additions & 16 deletions)
@@ -193,6 +193,28 @@ func TestAddAndTruncate(t *testing.T) {
verifyMetrics(t, c, 4, 36+int64(partitionSize))
}

+func TestDrop(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	const (
+		r1 roachpb.RangeID = 1
+		r2 roachpb.RangeID = 2
+
+		sizeOf9Entries = 81
+		partitionSize  = int64(sizeOf9Entries + partitionSize)
+	)
+	c := NewCache(1 << 10)
+	ents1 := addEntries(c, r1, 1, 10)
+	verifyGet(t, c, r1, 1, 10, ents1, 10, false)
+	verifyMetrics(t, c, 9, partitionSize)
+	ents2 := addEntries(c, r2, 1, 10)
+	verifyGet(t, c, r2, 1, 10, ents2, 10, false)
+	verifyMetrics(t, c, 18, 2*partitionSize)
+	c.Drop(r1)
+	verifyMetrics(t, c, 9, partitionSize)
+	c.Drop(r2)
+	verifyMetrics(t, c, 0, 0)
+}

func TestCacheLaterEntries(t *testing.T) {
c := NewCache(1000)
rangeID := roachpb.RangeID(1)
@@ -337,7 +359,6 @@ func TestConcurrentEvictions(t *testing.T) {
}
c.Clear(r, data[len(data)-1].Index+1)
}
-
verifyMetrics(t, c, 0, int64(len(c.parts))*int64(partitionSize))
}

@@ -428,24 +449,44 @@ func TestEntryCacheEviction(t *testing.T) {
func TestConcurrentUpdates(t *testing.T) {
defer leaktest.AfterTest(t)()
c := NewCache(10000)
+	const r1 roachpb.RangeID = 1
	ents := []raftpb.Entry{newEntry(20, 35), newEntry(21, 35)}
-	var wg sync.WaitGroup
-	// NB: N is chosen based on the race detector's limit of 8128 goroutines.
-	const N = 8000
-	wg.Add(N)
-	for i := 0; i < N; i++ {
-		go func(i int) {
-			if i%2 == 1 {
-				c.Add(1, ents, true)
-			} else {
-				c.Clear(1, 22)
+	// Test using both Clear and Drop to remove the added entries.
+	for _, clearMethod := range []struct {
+		name  string
+		clear func()
+	}{
+		{"drop", func() { c.Drop(r1) }},
+		{"clear", func() { c.Clear(r1, ents[len(ents)-1].Index+1) }},
+	} {
+		t.Run(clearMethod.name, func(t *testing.T) {
+			// NB: N is chosen based on the race detector's limit of 8128 goroutines.
+			const N = 8000
+			var wg sync.WaitGroup
+			wg.Add(N)
+			for i := 0; i < N; i++ {
+				go func(i int) {
+					if i%2 == 1 {
+						c.Add(r1, ents, true)
+					} else {
+						clearMethod.clear()
+					}
+					wg.Done()
+				}(i)
			}
-			wg.Done()
-		}(i)
+			wg.Wait()
+			clearMethod.clear()
+			// Clear does not evict the partition struct itself so we expect the cache
+			// to have a partition's initial byte size when using Clear and nothing
+			// when using Drop.
+			switch clearMethod.name {
+			case "drop":
+				verifyMetrics(t, c, 0, 0)
+			case "clear":
+				verifyMetrics(t, c, 0, int64(initialSize.bytes()))
+			}
+		})
	}
-	wg.Wait()
-	c.Clear(1, 22)
-	verifyMetrics(t, c, 0, int64(initialSize.bytes()))
}

func TestPartitionList(t *testing.T) {
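To exercise the new and updated cache tests locally (including the race-detector scenario that the N = 8000 comment refers to), something along these lines should work from a checkout of the repository, assuming a standard Go test setup for this package; the exact invocation may differ depending on the repo's build tooling:

go test ./pkg/storage/raftentry -run 'TestDrop|TestConcurrentUpdates' -race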
pkg/storage/replica_raftstorage.go (3 changes: 3 additions & 0 deletions)
@@ -856,6 +856,9 @@ func (r *Replica) applySnapshot(
if err := clearRangeData(ctx, s.Desc, r.store.Engine(), batch, true /* destroyData */); err != nil {
return err
}
+	// Clear the cached raft log entries to ensure that old or uncommitted
+	// entries don't impact the in-memory state.
+	r.store.raftEntryCache.Drop(r.RangeID)
stats.clear = timeutil.Now()

// Write the snapshot into the range.
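The ordering here matters: the old persisted range data is cleared first, then the in-memory entry cache for the range is dropped, and only then is the snapshot written. A simplified sketch of that invalidation pattern, using hypothetical helper names (and assuming the roachpb import) rather than the actual Replica code:

// entryCache captures the only operation this sketch needs from the cache.
type entryCache interface {
	Drop(id roachpb.RangeID)
}

// applySnapshotSketch illustrates the invariant: whenever a range's persisted
// state is replaced wholesale by a snapshot, any per-range in-memory cache
// derived from the old state must be invalidated as well.
func applySnapshotSketch(id roachpb.RangeID, cache entryCache, clearOldData, writeSnapshot func() error) error {
	if err := clearOldData(); err != nil { // analogous to clearRangeData above
		return err
	}
	cache.Drop(id)         // discard cached raft log entries from the old state
	return writeSnapshot() // then ingest the snapshot contents
}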
