Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve startup time of inmem index #9488

Merged
merged 1 commit into from
Feb 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 80 additions & 20 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,43 +725,76 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
return nil
}

keys := make([][]byte, 0, 10000)
fieldTypes := make([]influxql.DataType, 0, 10000)

if err := e.FileStore.WalkKeys(nil, func(key []byte, typ byte) error {
fieldType := BlockTypeToInfluxQLDataType(typ)
if fieldType == influxql.Unknown {
return fmt.Errorf("unknown block type: %v", typ)
}

if err := e.addToIndexFromKey(key, fieldType); err != nil {
return err
keys = append(keys, key)
fieldTypes = append(fieldTypes, fieldType)
if len(keys) == cap(keys) {
// Send batch of keys to the index.
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
return err
}

// Reset buffers.
keys, fieldTypes = keys[:0], fieldTypes[:0]
}

return nil
}); err != nil {
return err
}

if len(keys) > 0 {
// Add remaining partial batch from FileStore.
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
return err
}
keys, fieldTypes = keys[:0], fieldTypes[:0]
}

// load metadata from the Cache
if err := e.Cache.ApplyEntryFn(func(key []byte, entry *entry) error {
fieldType, err := entry.values.InfluxQLType()
if err != nil {
e.logger.Info("Error getting the data type of values for key",
zap.ByteString("key", key), zap.Error(err))
e.logger.Info("Error getting the data type of values for key", zap.ByteString("key", key), zap.Error(err))
}

if err := e.addToIndexFromKey(key, fieldType); err != nil {
return err
keys = append(keys, key)
fieldTypes = append(fieldTypes, fieldType)
if len(keys) == cap(keys) {
// Send batch of keys to the index.
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
return err
}

// Reset buffers.
keys, fieldTypes = keys[:0], fieldTypes[:0]
}
return nil
}); err != nil {
return err
}

if len(keys) > 0 {
// Add remaining partial batch from FileStore.
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
return err
}
}

// Save the field set index so we don't have to rebuild it next time
if err := e.fieldset.Save(); err != nil {
return err
}

e.traceLogger.Info("Meta data index for shard loaded",
zap.Uint64("id", shardID), zap.Duration("duration", time.Since(now)))
e.traceLogger.Info("Meta data index for shard loaded", zap.Uint64("id", shardID), zap.Duration("duration", time.Since(now)))
return nil
}

Expand Down Expand Up @@ -1013,14 +1046,32 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error {

// Merge and dedup all the series keys across each reader to reduce
// lock contention on the index.
keys := make([][]byte, 0, 10000)
fieldTypes := make([]influxql.DataType, 0, 10000)
merged := merge(readers...)
for v := range merged {
fieldType := BlockTypeToInfluxQLDataType(v.typ)
if fieldType == influxql.Unknown {
return fmt.Errorf("unknown block type: %v", v.typ)
}

if err := e.addToIndexFromKey(v.key, fieldType); err != nil {
keys = append(keys, v.key)
fieldTypes = append(fieldTypes, fieldType)

if len(keys) == cap(keys) {
// Send batch of keys to the index.
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
return err
}

// Reset buffers.
keys, fieldTypes = keys[:0], fieldTypes[:0]
}
}

if len(keys) > 0 {
// Add remaining partial batch.
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
return err
}
}
Expand Down Expand Up @@ -1086,25 +1137,34 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string, as
return tmp, nil
}

// addToIndexFromKey will pull the measurement name, series key, and field name from a composite key and add it to the
// database index and measurement fields
func (e *Engine) addToIndexFromKey(key []byte, fieldType influxql.DataType) error {
seriesKey, field := SeriesAndFieldFromCompositeKey(key)
name := tsdb.MeasurementFromSeriesKey(seriesKey)
// addToIndexFromKey will pull the measurement names, series keys, and field
// names from composite keys, and add them to the database index and measurement
// fields.
func (e *Engine) addToIndexFromKey(keys [][]byte, fieldTypes []influxql.DataType) error {
var field []byte
names := make([][]byte, 0, len(keys))
tags := make([]models.Tags, 0, len(keys))

for i := 0; i < len(keys); i++ {
// Replace tsm key format with index key format.
keys[i], field = SeriesAndFieldFromCompositeKey(keys[i])
name := tsdb.MeasurementFromSeriesKey(keys[i])
mf := e.fieldset.CreateFieldsIfNotExists(name)
if err := mf.CreateFieldIfNotExists(field, fieldTypes[i]); err != nil {
return err
}

mf := e.fieldset.CreateFieldsIfNotExists(name)
if err := mf.CreateFieldIfNotExists(field, fieldType); err != nil {
return err
names = append(names, name)
tags = append(tags, models.ParseTags(keys[i]))
}

tags := models.ParseTags(seriesKey)
// Build in-memory index, if necessary.
if e.index.Type() == inmem.IndexName {
if err := e.index.InitializeSeries(seriesKey, name, tags); err != nil {
if err := e.index.InitializeSeries(keys, names, tags); err != nil {
return err
}
} else {
if err := e.index.CreateSeriesIfNotExists(seriesKey, name, tags); err != nil {
if err := e.index.CreateSeriesListIfNotExists(keys, names, tags); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Index interface {
DropMeasurement(name []byte) error
ForEachMeasurementName(fn func(name []byte) error) error

InitializeSeries(key, name []byte, tags models.Tags) error
InitializeSeries(keys, names [][]byte, tags []models.Tags) error
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DropSeries(seriesID uint64, key []byte, cascade bool) error
Expand Down
6 changes: 3 additions & 3 deletions tsdb/index/inmem/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -1155,9 +1155,9 @@ func (idx *ShardIndex) SeriesN() int64 {
}

// InitializeSeries is called during start-up.
// This works the same as CreateSeriesIfNotExists except it ignore limit errors.
func (idx *ShardIndex) InitializeSeries(key, name []byte, tags models.Tags) error {
return idx.Index.CreateSeriesListIfNotExists(idx.id, idx.seriesIDSet, [][]byte{key}, [][]byte{name}, []models.Tags{tags}, &idx.opt, true)
// This works the same as CreateSeriesListIfNotExists except it ignore limit errors.
func (idx *ShardIndex) InitializeSeries(keys, names [][]byte, tags []models.Tags) error {
return idx.Index.CreateSeriesListIfNotExists(idx.id, idx.seriesIDSet, keys, names, tags, &idx.opt, true)
}

// CreateSeriesIfNotExists creates the provided series on the index if it is not
Expand Down
2 changes: 1 addition & 1 deletion tsdb/index/tsi1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) erro
}

// InitializeSeries is a no-op. This only applies to the in-memory index.
func (i *Index) InitializeSeries(key, name []byte, tags models.Tags) error {
func (i *Index) InitializeSeries(keys, names [][]byte, tags []models.Tags) error {
return nil
}

Expand Down