Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(tsdb): minimize lock contention when adding new fields or measurements #21228

Merged
merged 4 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ The prefix used for Prometheus metrics from the query controller has changed fro
1. [20925](https://github.com/influxdata/influxdb/pull/20925): Fix parse error in UI for tag filters containing regex meta characters.
1. [21042](https://github.com/influxdata/influxdb/pull/21042): Prevent concurrent access panic when gathering bolt metrics.
1. [21127](https://github.com/influxdata/influxdb/pull/21127): Fix race condition in Flux controller shutdown.
1. [21228](https://github.com/influxdata/influxdb/pull/21228): Reduce lock contention when adding new fields and measurements.

## v2.0.4 [2021-02-08]

Expand Down
124 changes: 82 additions & 42 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"regexp"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -716,11 +717,7 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error
}
}

if len(fieldsToCreate) > 0 {
return engine.MeasurementFieldSet().Save()
}

return nil
return engine.MeasurementFieldSet().Save()
}

// DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive)
Expand Down Expand Up @@ -1643,16 +1640,20 @@ func (m *MeasurementFields) ForEachField(fn func(name string, typ influxql.DataT
type MeasurementFieldSet struct {
mu sync.RWMutex
fields map[string]*MeasurementFields

// path is the location to persist field sets
path string
// ephemeral counters for updating the file on disk
memoryVersion uint64
writtenVersion uint64
}

// NewMeasurementFieldSet returns a new instance of MeasurementFieldSet.
func NewMeasurementFieldSet(path string) (*MeasurementFieldSet, error) {
fs := &MeasurementFieldSet{
fields: make(map[string]*MeasurementFields),
path: path,
fields: make(map[string]*MeasurementFields),
path: path,
memoryVersion: 0,
writtenVersion: 0,
}

// If there is a load error, return the error and an empty set so
Expand Down Expand Up @@ -1737,21 +1738,41 @@ func (fs *MeasurementFieldSet) IsEmpty() bool {
return len(fs.fields) == 0
}

func (fs *MeasurementFieldSet) Save() error {
fs.mu.Lock()
defer fs.mu.Unlock()

return fs.saveNoLock()
}
func (fs *MeasurementFieldSet) Save() (err error) {
// current version
var v uint64
// Is the MeasurementFieldSet empty?
isEmpty := false
// marshaled MeasurementFieldSet

b, err := func() ([]byte, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.memoryVersion += 1
v = fs.memoryVersion
// If no fields left, remove the fields index file
if len(fs.fields) == 0 {
isEmpty = true
if err := os.RemoveAll(fs.path); err != nil {
return nil, err
} else {
fs.writtenVersion = fs.memoryVersion
return nil, nil
}
}
return fs.marshalMeasurementFieldSetNoLock()
}()

func (fs *MeasurementFieldSet) saveNoLock() error {
// No fields left, remove the fields index file
if len(fs.fields) == 0 {
return os.RemoveAll(fs.path)
if err != nil {
return err
} else if isEmpty {
return nil
}

// Write the new index to a temp file and rename when it's sync'd
path := fs.path + ".tmp"
// if it is still the most recent memoryVersion of the MeasurementFields
path := fs.path + "." + strconv.FormatUint(v, 10) + ".tmp"

fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
if err != nil {
return err
Expand All @@ -1762,28 +1783,6 @@ func (fs *MeasurementFieldSet) saveNoLock() error {
return err
}

pb := internal.MeasurementFieldSet{
Measurements: make([]*internal.MeasurementFields, 0, len(fs.fields)),
}
for name, mf := range fs.fields {
fs := &internal.MeasurementFields{
Name: []byte(name),
Fields: make([]*internal.Field, 0, mf.FieldN()),
}

mf.ForEachField(func(field string, typ influxql.DataType) bool {
fs.Fields = append(fs.Fields, &internal.Field{Name: []byte(field), Type: int32(typ)})
return true
})

pb.Measurements = append(pb.Measurements, fs)
}

b, err := proto.Marshal(&pb)
if err != nil {
return err
}

if _, err := fd.Write(b); err != nil {
return err
}
Expand All @@ -1797,11 +1796,52 @@ func (fs *MeasurementFieldSet) saveNoLock() error {
return err
}

fs.mu.Lock()
defer fs.mu.Unlock()

// Check if a later modification and save of fields has superseded ours
// If so, we are successfully done! We were beaten by a later call
// to this function
if fs.writtenVersion > v {
return nil
}

if err := file.RenameFile(path, fs.path); err != nil {
return err
}

return file.SyncDir(filepath.Dir(fs.path))
if err = file.SyncDir(filepath.Dir(fs.path)); err != nil {
return err
}
// Update the written version to the current version
fs.writtenVersion = v
return nil
}

func (fs *MeasurementFieldSet) marshalMeasurementFieldSetNoLock() (marshalled []byte, err error) {
pb := internal.MeasurementFieldSet{
Measurements: make([]*internal.MeasurementFields, 0, len(fs.fields)),
}

for name, mf := range fs.fields {
imf := &internal.MeasurementFields{
Name: []byte(name),
Fields: make([]*internal.Field, 0, mf.FieldN()),
}

mf.ForEachField(func(field string, typ influxql.DataType) bool {
imf.Fields = append(imf.Fields, &internal.Field{Name: []byte(field), Type: int32(typ)})
return true
})

pb.Measurements = append(pb.Measurements, imf)
}
b, err := proto.Marshal(&pb)
if err != nil {
return nil, err
} else {
return b, nil
}
}

func (fs *MeasurementFieldSet) load() error {
Expand Down
67 changes: 67 additions & 0 deletions tsdb/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1653,6 +1653,73 @@ func TestMeasurementFieldSet_InvalidFormat(t *testing.T) {
}
}

func TestMeasurementFieldSet_ConcurrentSave(t *testing.T) {
var iterations int
dir, cleanup := MustTempDir()
defer cleanup()

if testing.Short() {
iterations = 50
} else {
iterations = 200
}

mt := []string{"cpu", "dpu", "epu", "fpu"}
ft := make([][]string, len(mt), len(mt))
for mi, m := range mt {
ft[mi] = make([]string, iterations, iterations)
for i := 0; i < iterations; i += 1 {
ft[mi][i] = fmt.Sprintf("%s_%s_%d", m, "value", i)
}
}

path := filepath.Join(dir, "fields.idx")
mfs, err := tsdb.NewMeasurementFieldSet(path)
if err != nil {
t.Fatalf("NewMeasurementFieldSet error: %v", err)
}
var wg sync.WaitGroup

wg.Add(len(ft))
for i, fs := range ft {
go testFieldMaker(t, &wg, mfs, mt[i], fs)
}
wg.Wait()

mfs2, err := tsdb.NewMeasurementFieldSet(path)
if err != nil {
t.Fatalf("NewMeasurementFieldSet error: %v", err)
}
for i, fs := range ft {
mf := mfs.Fields([]byte(mt[i]))
mf2 := mfs2.Fields([]byte(mt[i]))
for _, f := range fs {
if mf2.Field(f) == nil {
t.Fatalf("Created field not found on reloaded MeasurementFieldSet %s", f)
}
if mf.Field(f) == nil {
t.Fatalf("Created field not found in original MeasureMentFieldSet: %s", f)
}
}
}

}

func testFieldMaker(t *testing.T, wg *sync.WaitGroup, mf *tsdb.MeasurementFieldSet, measurement string, fieldNames []string) {
defer wg.Done()
fields := mf.CreateFieldsIfNotExists([]byte(measurement))
for _, fieldName := range fieldNames {
if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
t.Errorf("create field error: %v", err)
return
}
if err := mf.Save(); err != nil {
t.Errorf("save error: %v", err)
return
}
}
}

func BenchmarkWritePoints_NewSeries_1K(b *testing.B) { benchmarkWritePoints(b, 38, 3, 3, 1) }
func BenchmarkWritePoints_NewSeries_100K(b *testing.B) { benchmarkWritePoints(b, 32, 5, 5, 1) }
func BenchmarkWritePoints_NewSeries_250K(b *testing.B) { benchmarkWritePoints(b, 80, 5, 5, 1) }
Expand Down