Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid rewriting fields.idx unnecessarily (#21592) #21610

Merged
merged 7 commits into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ This release adds an embedded SQLite database for storing metadata required by t
1. [21522](https://github.com/influxdata/influxdb/pull/21522): Replace telemetry file name with slug for `ttf`, `woff`, and `eot` files.
1. [21540](https://github.com/influxdata/influxdb/pull/21540): Enable use of absolute path for `--upgrade-log` when running `influxd upgrade` on Windows.
1. [21545](https://github.com/influxdata/influxdb/pull/21545): Make InfluxQL meta queries respect query timeouts.
1. [21610](https://github.com/influxdata/influxdb/pull/21610): Avoid rewriting `fields.idx` unnecessarily.

## v2.0.6 [2021-04-29]

Expand Down
1 change: 1 addition & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,7 @@ func (e *Engine) Close() error {
e.mu.Lock()
defer e.mu.Unlock()
e.done = nil // Ensures that the channel will not be closed again.
e.fieldset.Close()

if err := e.FileStore.Close(); err != nil {
return err
Expand Down
200 changes: 136 additions & 64 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"regexp"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1641,26 +1640,29 @@ type MeasurementFieldSet struct {
mu sync.RWMutex
fields map[string]*MeasurementFields
// path is the location to persist field sets
path string
// ephemeral counters for updating the file on disk
memoryVersion uint64
writtenVersion uint64
path string
writer *MeasurementFieldSetWriter
}

// NewMeasurementFieldSet returns a new instance of MeasurementFieldSet.
func NewMeasurementFieldSet(path string) (*MeasurementFieldSet, error) {
// MaxCombinedWrites is the queue length passed to SetMeasurementFieldSetWriter.
const MaxCombinedWrites = 100
fs := &MeasurementFieldSet{
fields: make(map[string]*MeasurementFields),
path: path,
memoryVersion: 0,
writtenVersion: 0,
fields: make(map[string]*MeasurementFields),
path: path,
}

fs.SetMeasurementFieldSetWriter(MaxCombinedWrites)
// If there is a load error, return the error and an empty set so
// it can be rebuilt manually.
return fs, fs.load()
}

// Close closes the writer attached to the field set. Calling it on a nil
// receiver, or on a field set that has no writer, does nothing.
func (fs *MeasurementFieldSet) Close() {
	if fs == nil || fs.writer == nil {
		return
	}
	fs.writer.Close()
}

// Bytes estimates the memory footprint of this MeasurementFieldSet, in bytes.
func (fs *MeasurementFieldSet) Bytes() int {
var b int
Expand Down Expand Up @@ -1738,83 +1740,153 @@ func (fs *MeasurementFieldSet) IsEmpty() bool {
return len(fs.fields) == 0
}

func (fs *MeasurementFieldSet) Save() (err error) {
// current version
var v uint64
// Is the MeasurementFieldSet empty?
isEmpty := false
// marshaled MeasurementFieldSet
// errorChannel is a send-only channel on which the result of a save is
// reported back to the requester.
type errorChannel chan<- error

b, err := func() ([]byte, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.memoryVersion += 1
v = fs.memoryVersion
// If no fields left, remove the fields index file
if len(fs.fields) == 0 {
isEmpty = true
if err := os.RemoveAll(fs.path); err != nil {
return nil, err
} else {
fs.writtenVersion = fs.memoryVersion
return nil, nil
}
}
return fs.marshalMeasurementFieldSetNoLock()
}()
// writeRequest is a single request to persist the field set.
type writeRequest struct {
// done receives the error from the write (nil on success) and is then closed.
done errorChannel
}

if err != nil {
return err
} else if isEmpty {
return nil
}
// MeasurementFieldSetWriter queues save requests for a MeasurementFieldSet
// and tracks the background goroutine that services them.
type MeasurementFieldSetWriter struct {
// wg is incremented when the background writer is started and waited on in Close.
wg sync.WaitGroup
// writeRequests is the queue of pending save requests; closing it ends the
// background writer's loop.
writeRequests chan writeRequest
}

// Write the new index to a temp file and rename when it's sync'd
// if it is still the most recent memoryVersion of the MeasurementFields
path := fs.path + "." + strconv.FormatUint(v, 10) + ".tmp"
// SetMeasurementFieldSetWriter installs a writer whose request queue can
// buffer queueLength pending saves, and starts the background goroutine
// (saveWriter) that services the queue. fs.mu is held while the writer is
// installed.
func (fs *MeasurementFieldSet) SetMeasurementFieldSetWriter(queueLength int) {
	fs.mu.Lock()
	defer fs.mu.Unlock()

	w := &MeasurementFieldSetWriter{
		writeRequests: make(chan writeRequest, queueLength),
	}
	// Account for the goroutine before it is launched so Close can wait on it.
	w.wg.Add(1)
	fs.writer = w
	go fs.saveWriter()
}

fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
if err != nil {
return err
// Close closes the request queue, which ends the background writer's loop,
// and then waits for that goroutine to finish. Calling Close on a nil writer
// is a no-op.
func (w *MeasurementFieldSetWriter) Close() {
	// Guard the whole body: waiting on w.wg with a nil receiver would
	// dereference nil and panic, so the nil check must cover the Wait too.
	if w == nil {
		return
	}
	close(w.writeRequests)
	w.wg.Wait()
}

if _, err := fd.Write(fieldsIndexMagicNumber); err != nil {
return err
}
// Save asks the field set's writer to persist the field set and returns the
// error reported for that request.
func (fs *MeasurementFieldSet) Save() error {
	w := fs.writer
	return w.RequestSave()
}

if _, err := fd.Write(b); err != nil {
return err
// RequestSave enqueues a save request and blocks until a result is delivered
// on the request's channel, returning that result.
func (w *MeasurementFieldSetWriter) RequestSave() error {
	result := make(chan error)
	w.writeRequests <- writeRequest{done: result}
	return <-result
}

// saveWriter is the loop run in the goroutine started by
// SetMeasurementFieldSetWriter. It blocks until a save request arrives, passes
// it to writeToFile, and returns once the request channel is closed and empty.
func (fs *MeasurementFieldSet) saveWriter() {
	defer fs.writer.wg.Done()
	for {
		// Block until someone modifies the MeasurementFieldSet and
		// it needs to be written to disk.
		req, ok := <-fs.writer.writeRequests
		if !ok {
			return
		}
		fs.writeToFile(req)
	}
}

if err = fd.Sync(); err != nil {
return err
// writeToFile: Write the new index to a temp file and rename when it's sync'd
func (fs *MeasurementFieldSet) writeToFile(first writeRequest) {
var err error
// Put the errorChannel on which we blocked into a slice to allow more invocations
// to share the return code from the file write
errorChannels := []errorChannel{first.done}
defer func() {
for _, c := range errorChannels {
c <- err
close(c)
}
}()
// Do some blocking IO operations before marshalling the in-memory index
// to allow other changes to it to be queued up and be captured in one
// write operation, in case we are under heavy field creation load
path := fs.path + ".tmp"

// Open the temp file
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
if err != nil {
return
}
// Ensure temp file is cleaned up
defer func() {
if e := os.RemoveAll(path); err == nil {
err = e
}
}()
isEmpty, err := func() (isEmpty bool, err error) {
// ensure temp file closed before rename (for Windows)
defer func() {
if e := fd.Close(); err == nil {
err = e
}
}()
if _, err = fd.Write(fieldsIndexMagicNumber); err != nil {
return true, err
}

//close file handle before renaming to support Windows
if err = fd.Close(); err != nil {
return err
// Read all the pending new field and measurement write requests
// that will be captured in the marshaling of the in-memory copy
for {
select {
case ec := <-fs.writer.writeRequests:
errorChannels = append(errorChannels, ec.done)
continue
default:
}
break
}
// Lock, copy, and marshal the in-memory index
b, err := fs.marshalMeasurementFieldSet()
if err != nil {
return true, err
}
if b == nil {
// No fields, file removed, all done
return true, nil
}
if _, err := fd.Write(b); err != nil {
return true, err
}
return false, fd.Sync()
}()
if err != nil || isEmpty {
return
}
err = fs.renameFile(path)
}

// marshalMeasurementFieldSet takes fs.mu and, when there are no fields,
// removes the fields index file at fs.path and returns (nil, nil).
// Otherwise it returns the result of marshalMeasurementFieldSetNoLock,
// which is what gets written to disk.
func (fs *MeasurementFieldSet) marshalMeasurementFieldSet() ([]byte, error) {
fs.mu.Lock()
defer fs.mu.Unlock()

// Check if a later modification and save of fields has superseded ours
// If so, we are successfully done! We were beaten by a later call
// to this function
if fs.writtenVersion > v {
return nil
if len(fs.fields) == 0 {
// If no fields left, remove the fields index file
if err := os.RemoveAll(fs.path); err != nil {
return nil, err
} else {
return nil, nil
}
}
return fs.marshalMeasurementFieldSetNoLock()
}

// renameFile renames the file at path to fs.path while holding fs.mu, then
// calls file.SyncDir on the directory containing fs.path. It returns the
// first error encountered, or nil.
func (fs *MeasurementFieldSet) renameFile(path string) error {
fs.mu.Lock()
defer fs.mu.Unlock()

if err := file.RenameFile(path, fs.path); err != nil {
return err
}

if err = file.SyncDir(filepath.Dir(fs.path)); err != nil {
if err := file.SyncDir(filepath.Dir(fs.path)); err != nil {
return err
}
// Update the written version to the current version
fs.writtenVersion = v

return nil
}

Expand Down
Loading