Skip to content

Commit

Permalink
Fix data corruption (shard.go) (allegro#119)
Browse files Browse the repository at this point in the history
This PR contains a testcase which demonstrates corruption (intermittently). Sometimes leading to `panic`, and sometimes leading to data corruption of values. 

I thought that fixing the datarace suggested in allegro#117 would solve it, but it seems I was wrong about that, there's some other underlying bug. I'll push a commit on top if I find it.
  • Loading branch information
holiman authored and cristaloleg committed Feb 18, 2019
1 parent 270c79f commit 84c0185
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 8 deletions.
63 changes: 63 additions & 0 deletions bigcache_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package bigcache

import (
"bytes"
"fmt"
"math/rand"
"runtime"
"sync"
"testing"
Expand Down Expand Up @@ -358,6 +360,67 @@ func TestCacheDel(t *testing.T) {
assert.Len(t, cachedValue, 0)
}

// TestCacheDelRandomly does simultaneous deletes, puts and gets, to check for corruption errors.
func TestCacheDelRandomly(t *testing.T) {
t.Parallel()
c := Config{
Shards: 1,
LifeWindow: time.Second,
CleanWindow: 0,
MaxEntriesInWindow: 10,
MaxEntrySize: 10,
Verbose: true,
Hasher: newDefaultHasher(),
HardMaxCacheSize: 1,
Logger: DefaultLogger(),
}
//c.Hasher = hashStub(5)
cache, _ := NewBigCache(c)
var wg sync.WaitGroup
var ntest = 800000
wg.Add(1)
go func() {
for i := 0; i < ntest; i++ {
r := uint8(rand.Int())
key := fmt.Sprintf("thekey%d", r)

cache.Delete(key)
}
wg.Done()
}()
wg.Add(1)
go func() {
val := make([]byte, 1024)
for i := 0; i < ntest; i++ {
r := byte(rand.Int())
key := fmt.Sprintf("thekey%d", r)

for j := 0; j < len(val); j++ {
val[j] = r
}
cache.Set(key, val)
}
wg.Done()
}()
wg.Add(1)
go func() {
val := make([]byte, 1024)
for i := 0; i < ntest; i++ {
r := byte(rand.Int())
key := fmt.Sprintf("thekey%d", r)

for j := 0; j < len(val); j++ {
val[j] = r
}
if got, err := cache.Get(key); err == nil && !bytes.Equal(got, val) {
t.Errorf("got %s ->\n %x\n expected:\n %x\n ", key, got, val)
}
}
wg.Done()
}()
wg.Wait()
}

func TestCacheReset(t *testing.T) {
t.Parallel()

Expand Down
34 changes: 31 additions & 3 deletions queue/bytes_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ const (
minimumEmptyBlobSize = 32 + headerEntrySize
)

var (
errEmptyQueue = &queueError{"Empty queue"}
errInvalidIndex = &queueError{"Index must be greater than zero. Invalid index."}
errIndexOutOfBounds = &queueError{"Index out of range"}
)

// BytesQueue is a non-thread safe queue type of fifo based on bytes array.
// For every push operation index of entry is returned. It can be used to read the entry later
type BytesQueue struct {
Expand Down Expand Up @@ -162,6 +168,11 @@ func (q *BytesQueue) Get(index int) ([]byte, error) {
return data, err
}

// CheckGet checks if an entry can be read from index
func (q *BytesQueue) CheckGet(index int) error {
return q.peekCheckErr(index)
}

// Capacity returns number of allocated bytes for queue
func (q *BytesQueue) Capacity() int {
return q.capacity
Expand All @@ -177,18 +188,35 @@ func (e *queueError) Error() string {
return e.message
}

// peekCheckErr is identical to peek, but does not actually return any data
func (q *BytesQueue) peekCheckErr(index int) error {

if q.count == 0 {
return errEmptyQueue
}

if index <= 0 {
return errInvalidIndex
}

if index+headerEntrySize >= len(q.array) {
return errIndexOutOfBounds
}
return nil
}

func (q *BytesQueue) peek(index int) ([]byte, int, error) {

if q.count == 0 {
return nil, 0, &queueError{"Empty queue"}
return nil, 0, errEmptyQueue
}

if index <= 0 {
return nil, 0, &queueError{"Index must be grater than zero. Invalid index."}
return nil, 0, errInvalidIndex
}

if index+headerEntrySize >= len(q.array) {
return nil, 0, &queueError{"Index out of range"}
return nil, 0, errIndexOutOfBounds
}

blockSize := int(binary.LittleEndian.Uint32(q.array[index : index+headerEntrySize]))
Expand Down
13 changes: 11 additions & 2 deletions queue/bytes_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,19 @@ func TestPeek(t *testing.T) {

// when
read, err := queue.Peek()

err2 := queue.peekCheckErr(queue.head)
// then
assert.Equal(t, err, err2)
assert.EqualError(t, err, "Empty queue")
assert.Nil(t, read)

// when
queue.Push(entry)
read, err = queue.Peek()
err2 = queue.peekCheckErr(queue.head)

// then
assert.Equal(t, err, err2)
assert.NoError(t, err)
assert.Equal(t, pop(queue), read)
assert.Equal(t, entry, read)
Expand Down Expand Up @@ -286,10 +289,12 @@ func TestGetEntryFromInvalidIndex(t *testing.T) {

// when
result, err := queue.Get(0)
err2 := queue.CheckGet(0)

// then
assert.Equal(t, err, err2)
assert.Nil(t, result)
assert.EqualError(t, err, "Index must be grater than zero. Invalid index.")
assert.EqualError(t, err, "Index must be greater than zero. Invalid index.")
}

func TestGetEntryFromIndexOutOfRange(t *testing.T) {
Expand All @@ -301,8 +306,10 @@ func TestGetEntryFromIndexOutOfRange(t *testing.T) {

// when
result, err := queue.Get(42)
err2 := queue.CheckGet(42)

// then
assert.Equal(t, err, err2)
assert.Nil(t, result)
assert.EqualError(t, err, "Index out of range")
}
Expand All @@ -315,8 +322,10 @@ func TestGetEntryFromEmptyQueue(t *testing.T) {

// when
result, err := queue.Get(1)
err2 := queue.CheckGet(1)

// then
assert.Equal(t, err, err2)
assert.Nil(t, result)
assert.EqualError(t, err, "Empty queue")
}
Expand Down
26 changes: 23 additions & 3 deletions shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ func (s *cacheShard) get(key string, hashedKey uint64) ([]byte, error) {
s.collision()
return nil, ErrEntryNotFound
}
entry := readEntry(wrappedEntry)
s.lock.RUnlock()
s.hit()
return readEntry(wrappedEntry), nil
return entry, nil
}

func (s *cacheShard) set(key string, hashedKey uint64, entry []byte) error {
Expand Down Expand Up @@ -85,6 +86,7 @@ func (s *cacheShard) set(key string, hashedKey uint64, entry []byte) error {
}

func (s *cacheShard) del(key string, hashedKey uint64) error {
// Optimistic pre-check using only readlock
s.lock.RLock()
itemIndex := s.hashmap[hashedKey]

Expand All @@ -94,8 +96,7 @@ func (s *cacheShard) del(key string, hashedKey uint64) error {
return ErrEntryNotFound
}

wrappedEntry, err := s.entries.Get(int(itemIndex))
if err != nil {
if err := s.entries.CheckGet(int(itemIndex)); err != nil {
s.lock.RUnlock()
s.delmiss()
return err
Expand All @@ -104,6 +105,23 @@ func (s *cacheShard) del(key string, hashedKey uint64) error {

s.lock.Lock()
{
// After obtaining the writelock, we need to read the same again,
// since the data delivered earlier may be stale now
itemIndex = s.hashmap[hashedKey]

if itemIndex == 0 {
s.lock.Unlock()
s.delmiss()
return ErrEntryNotFound
}

wrappedEntry, err := s.entries.Get(int(itemIndex))
if err != nil {
s.lock.Unlock()
s.delmiss()
return err
}

delete(s.hashmap, hashedKey)
s.onRemove(wrappedEntry, Deleted)
resetKeyFromEntry(wrappedEntry)
Expand Down Expand Up @@ -136,6 +154,8 @@ func (s *cacheShard) cleanUp(currentTimestamp uint64) {
}

func (s *cacheShard) getOldestEntry() ([]byte, error) {
s.lock.RLock()
defer s.lock.RUnlock()
return s.entries.Peek()
}

Expand Down

0 comments on commit 84c0185

Please sign in to comment.