[dbnode] Concurrent time series indexing within a single batch (#2146)
* Concurrent indexing.
notbdu authored Mar 5, 2020
1 parent 987db51 commit 7228d90
Showing 10 changed files with 462 additions and 79 deletions.
1 change: 1 addition & 0 deletions src/dbnode/storage/index.go
@@ -751,6 +751,7 @@ func (i *nsIndex) Flush(
if err != nil {
return err
}
defer builder.Close()

var evicted int
for _, block := range flushable {
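Note: the one-line addition above pairs the flush path's builder with a deferred Close, since the builder now owns worker goroutines (see builder.go below). A minimal sketch of that lifecycle, assuming the builder comes from builder.NewBuilderFromDocuments; the surrounding nsIndex.Flush wiring is hypothetical and not part of this hunk:

package flushsketch

import (
	"github.com/m3db/m3/src/m3ninx/index/segment/builder"
)

// flushSketch is a hypothetical helper, not the actual nsIndex.Flush code.
// It shows the acquire/defer-Close pattern the flush path now follows.
func flushSketch(opts builder.Options) error {
	b, err := builder.NewBuilderFromDocuments(opts)
	if err != nil {
		return err
	}
	// The builder starts per-shard index workers on construction, so it
	// must be closed once the flush is done to stop those goroutines.
	defer b.Close()

	// ... reset the builder per block, insert documents, build segments ...
	return nil
}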
26 changes: 16 additions & 10 deletions src/dbnode/storage/index/block.go
@@ -706,18 +706,24 @@ func (b *block) cleanupForegroundCompactWithLock() {
b.foregroundSegments = nil

// Free compactor resources.
if b.compact.foregroundCompactor == nil {
return
if b.compact.foregroundCompactor != nil {
if err := b.compact.foregroundCompactor.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) {
l.Error("error closing index block foreground compactor", zap.Error(err))
})
}
b.compact.foregroundCompactor = nil
}

if err := b.compact.foregroundCompactor.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) {
l.Error("error closing index block foreground compactor", zap.Error(err))
})
// Free segment builder resources.
if b.compact.segmentBuilder != nil {
if err := b.compact.segmentBuilder.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) {
l.Error("error closing index block segment builder", zap.Error(err))
})
}
b.compact.segmentBuilder = nil
}

b.compact.foregroundCompactor = nil
b.compact.segmentBuilder = nil
}

func (b *block) executorWithRLock() (search.Executor, error) {
@@ -1484,7 +1490,7 @@ func (b *block) writeBatchErrorInvalidState(state blockState) error {

// blockCompact has several lazily allocated compaction components.
type blockCompact struct {
segmentBuilder segment.DocumentsBuilder
segmentBuilder segment.CloseableDocumentsBuilder
foregroundCompactor *compaction.Compactor
backgroundCompactor *compaction.Compactor
compactingForeground bool
13 changes: 13 additions & 0 deletions src/m3ninx/index/batch.go
@@ -24,6 +24,7 @@ import (
"bytes"
"errors"
"fmt"
"sync"

"github.com/m3db/m3/src/m3ninx/doc"
)
@@ -79,6 +80,8 @@ func NewBatch(docs []doc.Document, opts ...BatchOption) Batch {
// BatchPartialError indicates an error was encountered inserting some documents in a batch.
// It is not safe for concurrent use.
type BatchPartialError struct {
sync.Mutex

errs []BatchError
}

@@ -138,6 +141,16 @@ func (e *BatchPartialError) Add(err BatchError) {
e.errs = append(e.errs, err)
}

// AddWithLock adds an error to e with a lock. Any nil errors are ignored.
func (e *BatchPartialError) AddWithLock(err BatchError) {
if err.Err == nil {
return
}
e.Lock()
e.errs = append(e.errs, err)
e.Unlock()
}

// Errs returns the errors with the indexes of the documents in the batch
// which were not indexed.
func (e *BatchPartialError) Errs() []BatchError {
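The new AddWithLock lets concurrent indexing workers record per-document failures on a shared BatchPartialError without racing on the errs slice. A small usage sketch, assuming a caller that validates documents in parallel; the worker loop below is illustrative, not code from this commit:

package batchsketch

import (
	"sync"

	"github.com/m3db/m3/src/m3ninx/doc"
	"github.com/m3db/m3/src/m3ninx/index"
)

// validateConcurrently is a hypothetical caller: each goroutine validates one
// document and records failures via AddWithLock, which serializes appends on
// the embedded mutex.
func validateConcurrently(docs []doc.Document) error {
	var wg sync.WaitGroup
	batchErr := index.NewBatchPartialError()
	for i, d := range docs {
		wg.Add(1)
		go func(idx int, d doc.Document) {
			defer wg.Done()
			if err := d.Validate(); err != nil {
				batchErr.AddWithLock(index.BatchError{Err: err, Idx: idx})
			}
		}(i, d)
	}
	wg.Wait()
	if !batchErr.IsEmpty() {
		return batchErr
	}
	return nil
}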
189 changes: 132 additions & 57 deletions src/m3ninx/index/segment/builder/builder.go
@@ -23,18 +23,43 @@ package builder
import (
"errors"
"fmt"
"sync"

"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/index"
"github.com/m3db/m3/src/m3ninx/index/segment"
"github.com/m3db/m3/src/m3ninx/postings"
"github.com/m3db/m3/src/m3ninx/util"

"github.com/cespare/xxhash"
)

var (
errDocNotFound = errors.New("doc not found")
errClosed = errors.New("builder closed")
)

const (
// Slightly buffer the work to avoid blocking main thread.
indexQueueSize = 2 << 9 // 1024
)

type indexJob struct {
wg *sync.WaitGroup

id postings.ID
field doc.Field

shard int
idx int
batchErr *index.BatchPartialError
}

type builderStatus struct {
sync.RWMutex
closed bool
}

type builder struct {
opts Options
newUUIDFn util.NewUUIDFn
@@ -44,29 +69,47 @@ type builder struct {
batchSizeOne index.Batch
docs []doc.Document
idSet *IDsMap
fields *fieldsMap
uniqueFields [][]byte
fields *shardedFieldsMap
uniqueFields [][][]byte

indexQueues []chan indexJob
status builderStatus
}

// NewBuilderFromDocuments returns a builder from documents, it is
// not thread safe and is optimized for insertion speed and a
// final build step when documents are indexed.
func NewBuilderFromDocuments(opts Options) (segment.DocumentsBuilder, error) {
return &builder{
func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, error) {
concurrency := opts.Concurrency()
b := &builder{
opts: opts,
newUUIDFn: opts.NewUUIDFn(),
batchSizeOne: index.Batch{
Docs: make([]doc.Document, 1),
AllowPartialUpdates: false,
Docs: make([]doc.Document, 1),
},
idSet: NewIDsMap(IDsMapOptions{
InitialSize: opts.InitialCapacity(),
}),
fields: newFieldsMap(fieldsMapOptions{
InitialSize: opts.InitialCapacity(),
}),
uniqueFields: make([][]byte, 0, opts.InitialCapacity()),
}, nil
uniqueFields: make([][][]byte, 0, concurrency),
indexQueues: make([]chan indexJob, 0, concurrency),
}

for i := 0; i < concurrency; i++ {
indexQueue := make(chan indexJob, indexQueueSize)
b.indexQueues = append(b.indexQueues, indexQueue)
go b.indexWorker(indexQueue)

// Give each shard a fraction of the configured initial capacity.
shardInitialCapacity := opts.InitialCapacity()
if shardInitialCapacity > 0 {
shardInitialCapacity /= concurrency
}
shardUniqueFields := make([][]byte, 0, shardInitialCapacity)
b.uniqueFields = append(b.uniqueFields, shardUniqueFields)
b.fields = newShardedFieldsMap(concurrency, shardInitialCapacity)
}

return b, nil
}

func (b *builder) Reset(offset postings.ID) {
@@ -83,15 +126,15 @@ func (b *builder) Reset(offset postings.ID) {
b.idSet.Reset()

// Keep fields around, just reset the terms set for each one.
for _, entry := range b.fields.Iter() {
entry.Value().reset()
}
b.fields.ResetTermsSets()

// Reset the unique fields slice
for i := range b.uniqueFields {
b.uniqueFields[i] = nil
for i, shardUniqueFields := range b.uniqueFields {
for i := range shardUniqueFields {
shardUniqueFields[i] = nil
}
b.uniqueFields[i] = shardUniqueFields[:0]
}
b.uniqueFields = b.uniqueFields[:0]
}

func (b *builder) Insert(d doc.Document) ([]byte, error) {
@@ -107,15 +150,20 @@ func (b *builder) Insert(d doc.Document) ([]byte, error) {
}

func (b *builder) InsertBatch(batch index.Batch) error {
b.status.RLock()
defer b.status.RUnlock()

if b.status.closed {
return errClosed
}

// NB(r): This is all kept in a single method to make the
// insertion path fast.
var wg sync.WaitGroup
batchErr := index.NewBatchPartialError()
for i, d := range batch.Docs {
// Validate doc
if err := d.Validate(); err != nil {
if !batch.AllowPartialUpdates {
return err
}
batchErr.Add(index.BatchError{Err: err, Idx: i})
continue
}
@@ -124,9 +172,6 @@ func (b *builder) InsertBatch(batch index.Batch) error {
if !d.HasID() {
id, err := b.newUUIDFn()
if err != nil {
if !batch.AllowPartialUpdates {
return err
}
batchErr.Add(index.BatchError{Err: err, Idx: i})
continue
}
@@ -139,9 +184,6 @@

// Avoid duplicates.
if _, ok := b.idSet.Get(d.ID); ok {
if !batch.AllowPartialUpdates {
return index.ErrDuplicateID
}
batchErr.Add(index.BatchError{Err: index.ErrDuplicateID, Idx: i})
continue
}
@@ -158,50 +200,73 @@

// Index the terms.
for _, f := range d.Fields {
if err := b.index(postings.ID(postingsListID), f); err != nil {
if !batch.AllowPartialUpdates {
return err
}
batchErr.Add(index.BatchError{Err: err, Idx: i})
}
b.index(&wg, postings.ID(postingsListID), f, i, batchErr)
}
if err := b.index(postings.ID(postingsListID), doc.Field{
b.index(&wg, postings.ID(postingsListID), doc.Field{
Name: doc.IDReservedFieldName,
Value: d.ID,
}); err != nil {
if !batch.AllowPartialUpdates {
return err
}
batchErr.Add(index.BatchError{Err: err, Idx: i})
}
}, i, batchErr)
}

// Wait for all the concurrent indexing jobs to finish.
wg.Wait()

if !batchErr.IsEmpty() {
return batchErr
}
return nil
}

func (b *builder) index(id postings.ID, f doc.Field) error {
terms, ok := b.fields.Get(f.Name)
if !ok {
terms = newTerms(b.opts)
b.fields.SetUnsafe(f.Name, terms, fieldsMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: true,
})
func (b *builder) index(
wg *sync.WaitGroup,
id postings.ID,
f doc.Field,
i int,
batchErr *index.BatchPartialError,
) {
wg.Add(1)
// NB(bodu): To avoid locking inside of the terms, we shard the work
// by field name.
shard := b.calculateShard(f.Name)
b.indexQueues[shard] <- indexJob{
wg: wg,
id: id,
field: f,
shard: shard,
idx: i,
batchErr: batchErr,
}
}

// If empty field, track insertion of this key into the fields
// collection for correct response when retrieving all fields.
newField := terms.size() == 0
if err := terms.post(f.Value, id); err != nil {
return err
}
if newField {
b.uniqueFields = append(b.uniqueFields, f.Name)
func (b *builder) indexWorker(indexQueue chan indexJob) {
for job := range indexQueue {
terms, ok := b.fields.ShardedGet(job.shard, job.field.Name)
if !ok {
// NB(bodu): Check again within the lock to make sure we aren't making concurrent map writes.
terms = newTerms(b.opts)
b.fields.ShardedSetUnsafe(job.shard, job.field.Name, terms, fieldsMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: true,
})
}

// If empty field, track insertion of this key into the fields
// collection for correct response when retrieving all fields.
newField := terms.size() == 0
// NB(bodu): Bulk of the cpu time during insertion is spent inside of terms.post().
err := terms.post(job.field.Value, job.id)
if err != nil {
job.batchErr.AddWithLock(index.BatchError{Err: err, Idx: job.idx})
}
if err == nil && newField {
b.uniqueFields[job.shard] = append(b.uniqueFields[job.shard], job.field.Name)
}
job.wg.Done()
}
return nil
}

func (b *builder) calculateShard(field []byte) int {
return int(xxhash.Sum64(field) % uint64(len(b.indexQueues)))
}

func (b *builder) AllDocs() (index.IDDocIterator, error) {
@@ -236,7 +301,7 @@ func (b *builder) Fields() (segment.FieldsIterator, error) {
}

func (b *builder) Terms(field []byte) (segment.TermsIterator, error) {
terms, ok := b.fields.Get(field)
terms, ok := b.fields.ShardedGet(b.calculateShard(field), field)
if !ok {
return nil, fmt.Errorf("field not found: %s", string(field))
}
@@ -247,3 +312,13 @@

return newTermsIter(terms.uniqueTerms), nil
}

func (b *builder) Close() error {
b.status.Lock()
defer b.status.Unlock()
for _, q := range b.indexQueues {
close(q)
}
b.status.closed = true
return nil
}
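
Taken together, the builder now hashes each field name with xxhash, routes the posting work onto a fixed per-shard queue, and lets exactly one worker goroutine own each shard's fields map, so no lock is needed around terms.post(); only the shared BatchPartialError is mutex-protected. A stripped-down, self-contained sketch of that sharded-queue pattern; the names and types here are illustrative, not the m3ninx ones:

package shardsketch

import (
	"sync"

	"github.com/cespare/xxhash"
)

// job carries one unit of index work plus the caller's WaitGroup.
type job struct {
	wg    *sync.WaitGroup
	field []byte
	value []byte
}

// shardedIndexer owns one queue and one terms map per worker; each map is
// only ever touched by the goroutine that owns its shard, so it needs no lock.
type shardedIndexer struct {
	queues []chan job
	terms  []map[string][]string
}

func newShardedIndexer(concurrency int) *shardedIndexer {
	s := &shardedIndexer{
		queues: make([]chan job, concurrency),
		terms:  make([]map[string][]string, concurrency),
	}
	for i := 0; i < concurrency; i++ {
		s.queues[i] = make(chan job, 1024) // buffered, like indexQueueSize above
		s.terms[i] = make(map[string][]string)
		go s.worker(i)
	}
	return s
}

// index hashes the field name to pick a shard and hands the work to the
// worker that owns it, mirroring builder.index/calculateShard above.
func (s *shardedIndexer) index(wg *sync.WaitGroup, field, value []byte) {
	wg.Add(1)
	shard := int(xxhash.Sum64(field) % uint64(len(s.queues)))
	s.queues[shard] <- job{wg: wg, field: field, value: value}
}

func (s *shardedIndexer) worker(shard int) {
	for j := range s.queues[shard] {
		name := string(j.field)
		s.terms[shard][name] = append(s.terms[shard][name], string(j.value))
		j.wg.Done()
	}
}

// shutdown stops the workers by closing their queues, like builder.Close above.
func (s *shardedIndexer) shutdown() {
	for _, q := range s.queues {
		close(q)
	}
}

Callers submit a whole batch and then wg.Wait() before inspecting errors, mirroring InsertBatch above; closing the queues, as Close does, is what lets the worker goroutines exit.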