fix(tsm1): fix data race and validation in cache ring (#20843)

Co-authored-by: Yun Zhao <[email protected]>
Co-authored-by: Sam Arnold <[email protected]>
3 people authored Mar 9, 2021
1 parent 009e434 commit 9266b6c
Showing 5 changed files with 72 additions and 163 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -27,6 +27,7 @@ the endpoint has been removed. Use the `/metrics` endpoint to collect system sta
1. [20837](https://github.com/influxdata/influxdb/pull/20837): Fix use-after-free bug in series ID iterator. Thanks @foobar!
1. [20834](https://github.com/influxdata/influxdb/pull/20834): Fix InfluxDB port in Flux function UI examples. Thanks @sunjincheng121!
1. [20833](https://github.com/influxdata/influxdb/pull/20833): Fix Single Stat graphs with thresholds crashing on negative values.
1. [20843](https://github.com/influxdata/influxdb/pull/20843): Fix data race in TSM cache. Thanks @StoneYunZhao!

## v2.0.4 [2021-02-08]
----------------------
34 changes: 0 additions & 34 deletions tsdb/engine/tsm1/cache.go
@@ -166,7 +166,6 @@ const (
type storer interface {
entry(key []byte) *entry // Get an entry by its key.
write(key []byte, values Values) (bool, error) // Write an entry to the store.
add(key []byte, entry *entry) // Add a new entry to the store.
remove(key []byte) // Remove an entry from the store.
keys(sorted bool) [][]byte // Return an optionally sorted slice of entry keys.
apply(f func([]byte, *entry) error) error // Apply f to all entries in the store in parallel.
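For orientation, here is a minimal map-backed sketch of the two central methods above: write, which reports whether the key is new, and keys, which optionally sorts. The types are deliberately simplified (float64 slices stand in for the real entry/Values types, and the real interface has further methods that are collapsed in this view), so treat it as an illustrative sketch rather than repository code:

package main

import (
	"fmt"
	"sort"
	"sync"
)

// simpleStore mirrors the shape of the storer methods above, with
// float64 slices standing in for the tsm1 Values type.
type simpleStore struct {
	mu   sync.RWMutex
	data map[string][]float64
}

func newSimpleStore() *simpleStore {
	return &simpleStore{data: make(map[string][]float64)}
}

// write appends values for key and reports whether the key was newly created,
// which is what lets the caller account for the extra key bytes.
func (s *simpleStore) write(key []byte, values []float64) (bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	_, existed := s.data[string(key)]
	s.data[string(key)] = append(s.data[string(key)], values...)
	return !existed, nil
}

// keys returns every stored key, sorted on request.
func (s *simpleStore) keys(sorted bool) [][]byte {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make([][]byte, 0, len(s.data))
	for k := range s.data {
		out = append(out, []byte(k))
	}
	if sorted {
		sort.Slice(out, func(i, j int) bool { return string(out[i]) < string(out[j]) })
	}
	return out
}

func main() {
	s := newSimpleStore()
	isNew, _ := s.write([]byte("cpu"), []float64{1.0, 2.0})
	fmt.Println(isNew, len(s.keys(true))) // true 1
}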
@@ -280,38 +279,6 @@ func (c *Cache) Free() {
c.mu.Unlock()
}

// Write writes the set of values for the key to the cache. This function is goroutine-safe.
// It returns an error if the cache will exceed its max size by adding the new values.
func (c *Cache) Write(key []byte, values []Value) error {
c.init()
addedSize := uint64(Values(values).Size())

// Enough room in the cache?
limit := c.maxSize
n := c.Size() + addedSize

if limit > 0 && n > limit {
atomic.AddInt64(&c.stats.WriteErr, 1)
return ErrCacheMemorySizeLimitExceeded(n, limit)
}

newKey, err := c.store.write(key, values)
if err != nil {
atomic.AddInt64(&c.stats.WriteErr, 1)
return err
}

if newKey {
addedSize += uint64(len(key))
}
// Update the cache size and the memory size stat.
c.increaseSize(addedSize)
c.updateMemSize(int64(addedSize))
atomic.AddInt64(&c.stats.WriteOK, 1)

return nil
}

// WriteMulti writes the map of keys and associated values to the cache. This
// function is goroutine-safe. It returns an error if the cache will exceed
// its max size by adding the new values. The write attempts to write as many
@@ -833,7 +800,6 @@ type emptyStore struct{}

func (e emptyStore) entry(key []byte) *entry { return nil }
func (e emptyStore) write(key []byte, values Values) (bool, error) { return false, nil }
func (e emptyStore) add(key []byte, entry *entry) {}
func (e emptyStore) remove(key []byte) {}
func (e emptyStore) keys(sorted bool) [][]byte { return nil }
func (e emptyStore) apply(f func([]byte, *entry) error) error { return nil }
52 changes: 5 additions & 47 deletions tsdb/engine/tsm1/cache_test.go
@@ -18,6 +18,11 @@ import (
"github.com/golang/snappy"
)

// Convenience method for testing.
func (c *Cache) Write(key []byte, values []Value) error {
return c.WriteMulti(map[string][]Value{string(key): values})
}
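With the production Write method removed, writes funnel through WriteMulti; a minimal usage sketch in the style of this test file, assuming only the in-package identifiers visible in this diff (NewCache, NewValue, Value, WriteMulti) and illustrative keys and sizes:

func writeMultiSketch() error {
	c := NewCache(1024) // maximum cache size in bytes, as elsewhere in this file
	values := map[string][]Value{
		"foo": {NewValue(1, 1.0), NewValue(2, 2.0)},
		"bar": {NewValue(1, 3.0)},
	}
	// WriteMulti enforces the cache size limit and surfaces field type
	// conflicts for every key in a single call.
	return c.WriteMulti(values)
}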

func TestCache_NewCache(t *testing.T) {
c := NewCache(100)
if c == nil {
@@ -35,51 +40,6 @@ func TestCache_NewCache(t *testing.T) {
}
}

func TestCache_CacheWrite(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, 2.0)
v2 := NewValue(3, 3.0)
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

c := NewCache(3 * valuesSize)

if err := c.Write([]byte("foo"), values); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
}
if err := c.Write([]byte("bar"), values); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
}
if n := c.Size(); n != 2*valuesSize+6 {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", 2*valuesSize, n)
}

if exp, keys := [][]byte{[]byte("bar"), []byte("foo")}, c.Keys(); !reflect.DeepEqual(keys, exp) {
t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys)
}
}

func TestCache_CacheWrite_TypeConflict(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, int(64))
values := Values{v0, v1}
valuesSize := v0.Size() + v1.Size()

c := NewCache(uint64(2 * valuesSize))

if err := c.Write([]byte("foo"), values[:1]); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
}

if err := c.Write([]byte("foo"), values[1:]); err == nil {
t.Fatalf("expected field type conflict")
}

if exp, got := uint64(v0.Size())+3, c.Size(); exp != got {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", exp, got)
}
}

func TestCache_CacheWriteMulti(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, 2.0)
@@ -873,7 +833,6 @@ func mustMarshalEntry(entry WALEntry) (WalEntryType, []byte) {
type TestStore struct {
entryf func(key []byte) *entry
writef func(key []byte, values Values) (bool, error)
addf func(key []byte, entry *entry)
removef func(key []byte)
keysf func(sorted bool) [][]byte
applyf func(f func([]byte, *entry) error) error
@@ -886,7 +845,6 @@ type TestStore struct {
func NewTestStore() *TestStore { return &TestStore{} }
func (s *TestStore) entry(key []byte) *entry { return s.entryf(key) }
func (s *TestStore) write(key []byte, values Values) (bool, error) { return s.writef(key, values) }
func (s *TestStore) add(key []byte, entry *entry) { s.addf(key, entry) }
func (s *TestStore) remove(key []byte) { s.removef(key) }
func (s *TestStore) keys(sorted bool) [][]byte { return s.keysf(sorted) }
func (s *TestStore) apply(f func([]byte, *entry) error) error { return s.applyf(f) }
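TestStore is a stub whose behaviour is injected per test through its function fields; a hedged usage sketch (the field and method names follow this diff, the test body itself is invented):

func testStoreUsageSketch(t *testing.T) {
	s := NewTestStore()
	// Stub write so that every key looks new and every write succeeds.
	s.writef = func(key []byte, values Values) (bool, error) {
		return true, nil
	}
	if newKey, err := s.write([]byte("foo"), nil); err != nil || !newKey {
		t.Fatalf("unexpected result from stubbed write: %v, %v", newKey, err)
	}
}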
41 changes: 10 additions & 31 deletions tsdb/engine/tsm1/ring.go
@@ -3,7 +3,6 @@ package tsm1
import (
"fmt"
"sync"
"sync/atomic"

"github.com/cespare/xxhash"
"github.com/influxdata/influxdb/v2/pkg/bytesutil"
@@ -20,24 +19,18 @@ const partitions = 16
// ring is implemented as a crude hash ring, in so much that you can have
// variable numbers of members in the ring, and the appropriate member for a
// given series key can always consistently be found. Unlike a true hash ring
// though, this ring is not resizeable—there must be at most 256 members in the
// though, this ring is not resizeable—there must be at most 16 members in the
// ring, and the number of members must always be a power of 2.
//
// ring works as follows: Each member of the ring contains a single store, which
// contains a map of series keys to entries. A ring always has 256 partitions,
// contains a map of series keys to entries. A ring always has 16 partitions,
// and a member takes up one or more of these partitions (depending on how many
// members are specified to be in the ring)
//
// To determine the partition that a series key should be added to, the series
// key is hashed and the first 8 bits are used as an index to the ring.
//
type ring struct {
// Number of keys within the ring. This is used to provide a hint for
// allocating the return values in keys(). It will not be perfectly accurate
// since it doesn't consider adding duplicate keys, or trying to remove non-
// existent keys.
keysHint int64

// The unique set of partitions in the ring.
// len(partitions) <= len(continuum)
partitions []*partition
@@ -47,11 +40,14 @@ type ring struct {
// power of 2, and for performance reasons should be larger than the number of
// cores on the host. The supported set of values for n is:
//
// {1, 2, 4, 8, 16, 32, 64, 128, 256}.
// {1, 2, 4, 8, 16}.
//
func newring(n int) (*ring, error) {
if n <= 0 || n > partitions {
return nil, fmt.Errorf("invalid number of paritions: %d", n)
return nil, fmt.Errorf("invalid number of partitions: %d", n)
}
if n&(n-1) != 0 {
return nil, fmt.Errorf("partitions %d is not a power of two", n)
}

r := ring{
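The n&(n-1) guard added above is the usual bit trick for detecting powers of two; a standalone illustration (an editor's sketch, not part of the commit):

package main

import "fmt"

// isPowerOfTwo reports whether n has exactly one bit set; for such n,
// n&(n-1) clears that bit and the result is zero.
func isPowerOfTwo(n int) bool {
	return n > 0 && n&(n-1) == 0
}

func main() {
	for _, n := range []int{1, 2, 3, 8, 12, 16} {
		fmt.Println(n, isPowerOfTwo(n)) // 1 true, 2 true, 3 false, 8 true, 12 false, 16 true
	}
}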
@@ -78,7 +74,6 @@ func (r *ring) reset() {
for _, partition := range r.partitions {
partition.reset()
}
r.keysHint = 0
}

// getPartition retrieves the hash ring partition associated with the provided
@@ -100,25 +95,16 @@ func (r *ring) write(key []byte, values Values) (bool, error) {
return r.getPartition(key).write(key, values)
}
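The package comment earlier describes picking a partition by hashing the series key; since the getPartition body is collapsed in this view, the following standalone sketch assumes a modulo reduction of the 64-bit xxhash digest (github.com/cespare/xxhash, imported above) onto the 16 partitions, with made-up series keys:

package main

import (
	"fmt"

	"github.com/cespare/xxhash"
)

const partitions = 16

// partitionFor sketches how a series key could map onto the fixed 16-slot ring.
func partitionFor(key []byte) int {
	return int(xxhash.Sum64(key) % partitions)
}

func main() {
	for _, k := range []string{"cpu,host=a#!~#value", "mem,host=b#!~#used"} {
		fmt.Printf("%-22s -> partition %d\n", k, partitionFor([]byte(k)))
	}
}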

// add adds an entry to the ring.
func (r *ring) add(key []byte, entry *entry) {
r.getPartition(key).add(key, entry)
atomic.AddInt64(&r.keysHint, 1)
}

// remove deletes the entry for the given key.
// remove is safe for use by multiple goroutines.
func (r *ring) remove(key []byte) {
r.getPartition(key).remove(key)
if r.keysHint > 0 {
atomic.AddInt64(&r.keysHint, -1)
}
}
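The deleted keysHint bookkeeping above appears to be where the reported race lived: remove paired a plain r.keysHint > 0 read with atomic updates made from other goroutines. A minimal standalone reproduction of that pattern (an illustration, not code from the commit; run it with go run -race to see the report):

package main

import (
	"sync"
	"sync/atomic"
)

func main() {
	var hint int64
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); atomic.AddInt64(&hint, 1) }()
	go func() {
		defer wg.Done()
		// Plain read of a counter another goroutine updates atomically; the race
		// detector flags the pair. This commit drops the counter entirely, and
		// atomic.LoadInt64(&hint) would also have silenced it.
		if hint > 0 {
			atomic.AddInt64(&hint, -1)
		}
	}()
	wg.Wait()
}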

// keys returns all the keys from all partitions in the hash ring. The returned
// keys will be in order if sorted is true.
func (r *ring) keys(sorted bool) [][]byte {
keys := make([][]byte, 0, atomic.LoadInt64(&r.keysHint))
keys := make([][]byte, 0)
for _, p := range r.partitions {
keys = append(keys, p.keys()...)
}
@@ -129,6 +115,8 @@ func (r *ring) keys(sorted bool) [][]byte {
return keys
}

// count returns the number of values in the ring.
// count is not accurate since it doesn't take a read lock when iterating over partitions.
func (r *ring) count() int {
var n int
for _, p := range r.partitions {
@@ -202,7 +190,6 @@ func (r *ring) applySerial(f func([]byte, *entry) error) error {
}

func (r *ring) split(n int) []storer {
var keys int
storers := make([]storer, n)
for i := 0; i < n; i++ {
storers[i], _ = newring(len(r.partitions))
@@ -211,7 +198,6 @@ func (r *ring) split(n int) []storer {
for i, p := range r.partitions {
r := storers[i%n].(*ring)
r.partitions[i] = p
keys += len(p.store)
}
return storers
}
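split hands partition i to sub-ring i%n, so the 16 partitions are distributed round-robin across the requested stores; a quick standalone illustration of that assignment (an editor's sketch):

package main

import "fmt"

func main() {
	const partitions, n = 16, 4
	// Mirrors storers[i%n]: partition i lands in sub-ring i%n.
	for i := 0; i < partitions; i++ {
		fmt.Printf("partition %2d -> sub-ring %d\n", i, i%n)
	}
}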
@@ -261,13 +247,6 @@ func (p *partition) write(key []byte, values Values) (bool, error) {
return true, nil
}

// add adds a new entry for key to the partition.
func (p *partition) add(key []byte, entry *entry) {
p.mu.Lock()
p.store[string(key)] = entry
p.mu.Unlock()
}

// remove deletes the entry associated with the provided key.
// remove is safe for use by multiple goroutines.
func (p *partition) remove(key []byte) {