diff --git a/beater/beater.go b/beater/beater.go index 003f730d818..e344f44f86c 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -592,7 +592,6 @@ func (s *serverRunner) newFinalBatchProcessor(p *publish.Publisher) (model.Batch // TODO(axw) FlushBytes should accept suffixed strings like APM agents, e.g. "5mb". FlushBytes int `config:"flush_bytes"` FlushInterval time.Duration `config:"flush_interval"` - NumWorkers int `config:"num_workers"` } esConfig.FlushInterval = time.Second @@ -617,7 +616,6 @@ func (s *serverRunner) newFinalBatchProcessor(p *publish.Publisher) (model.Batch indexer, err := modelindexer.New(client, modelindexer.Config{ FlushBytes: esConfig.FlushBytes, FlushInterval: esConfig.FlushInterval, - NumWorkers: esConfig.NumWorkers, }) if err != nil { return nil, err @@ -628,10 +626,7 @@ func (s *serverRunner) newFinalBatchProcessor(p *publish.Publisher) (model.Batch monitoring.NewFunc(monitoring.Default, "libbeat.output.events", func(_ monitoring.Mode, v monitoring.Visitor) { v.OnRegistryStart() defer v.OnRegistryFinished() - stats, err := indexer.Stats() - if err != nil { - return - } + stats := indexer.Stats() v.OnKey("active") v.OnInt(stats.Active) v.OnKey("total") diff --git a/model/modelindexer/bulk_indexer.go b/model/modelindexer/bulk_indexer.go new file mode 100644 index 00000000000..a95c4b7c6f9 --- /dev/null +++ b/model/modelindexer/bulk_indexer.go @@ -0,0 +1,117 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package modelindexer + +import ( + "bytes" + "context" + "fmt" + "strconv" + + "github.com/elastic/go-elasticsearch/v7/esapi" + + "github.com/elastic/apm-server/elasticsearch" +) + +type bulkIndexer struct { + client elasticsearch.Client + itemsAdded int + buf bytes.Buffer + aux []byte +} + +func newBulkIndexer(client elasticsearch.Client) *bulkIndexer { + return &bulkIndexer{client: client} +} + +// Reset resets b, ready for a new request. +func (b *bulkIndexer) Reset() { + b.itemsAdded = 0 + b.buf.Reset() +} + +// Items returns the number of buffered items. +func (b *bulkIndexer) Items() int { + return b.itemsAdded +} + +// Bytes returns the number of buffered bytes. +func (b *bulkIndexer) Bytes() int { + return b.buf.Len() +} + +// Add encodes an item in the buffer. 
+func (b *bulkIndexer) Add(item elasticsearch.BulkIndexerItem) error { + b.writeMeta(item) + if _, err := b.buf.ReadFrom(item.Body); err != nil { + return err + } + b.buf.WriteRune('\n') + b.itemsAdded++ + return nil +} + +func (b *bulkIndexer) writeMeta(item elasticsearch.BulkIndexerItem) { + b.buf.WriteRune('{') + b.aux = strconv.AppendQuote(b.aux, item.Action) + b.buf.Write(b.aux) + b.aux = b.aux[:0] + b.buf.WriteRune(':') + b.buf.WriteRune('{') + if item.DocumentID != "" { + b.buf.WriteString(`"_id":`) + b.aux = strconv.AppendQuote(b.aux, item.DocumentID) + b.buf.Write(b.aux) + b.aux = b.aux[:0] + } + if item.Index != "" { + if item.DocumentID != "" { + b.buf.WriteRune(',') + } + b.buf.WriteString(`"_index":`) + b.aux = strconv.AppendQuote(b.aux, item.Index) + b.buf.Write(b.aux) + b.aux = b.aux[:0] + } + b.buf.WriteRune('}') + b.buf.WriteRune('}') + b.buf.WriteRune('\n') +} + +func (b *bulkIndexer) Flush(ctx context.Context) (elasticsearch.BulkIndexerResponse, error) { + if b.itemsAdded == 0 { + return elasticsearch.BulkIndexerResponse{}, nil + } + + req := esapi.BulkRequest{Body: &b.buf} + res, err := req.Do(ctx, b.client) + if err != nil { + return elasticsearch.BulkIndexerResponse{}, err + } + defer res.Body.Close() + if res.IsError() { + return elasticsearch.BulkIndexerResponse{}, fmt.Errorf("flush failed: %s", res.String()) + } + + // TODO(axw) getting empty response body, what's up with that? 
+ var resp elasticsearch.BulkIndexerResponse + //if err := json.NewDecoder(res.Body).Decode(&resp); err != nil { + // return resp, err + //} + return resp, nil +} diff --git a/model/modelindexer/indexer.go b/model/modelindexer/indexer.go index 88a647b0e28..b022e82e45e 100644 --- a/model/modelindexer/indexer.go +++ b/model/modelindexer/indexer.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/logp" + "golang.org/x/sync/errgroup" "github.com/elastic/apm-server/elasticsearch" "github.com/elastic/apm-server/model" @@ -39,26 +40,27 @@ var ErrClosed = errors.New("model indexer closed") // Indexer is a model.BatchProcessor which bulk indexes events as Elasticsearch documents. type Indexer struct { - eventsAdded int64 - logger *logp.Logger + eventsAdded int64 + eventsActive int64 + config Config + logger *logp.Logger + available chan *bulkIndexer + g errgroup.Group - mu sync.RWMutex - bulkIndexer elasticsearch.BulkIndexer - closeError error + mu sync.RWMutex + closed bool + activeMu sync.Mutex + active *bulkIndexer + timer *time.Timer } // Config holds configuration for Indexer. type Config struct { - // NumWorkers holds the number of goroutines to use for bulk indexing documents. + // MaxRequests holds the maximum number of bulk index requests to execute concurrently. + // The maximum memory usage of Indexer is thus approximately MaxRequests*FlushBytes. // - // If NumWorkers is zero, the default of runtime.NumCPU() will be used. - // - // TODO(axw) instead of pooled bulk indexer workers, consider extending - // go-elasticsearch with a sequential bulk indexer, and then use a bounded - // semaphore approach for creating a new goroutine & bulk indexer when - // flushing another one. By sending events to a single indexer at a time - // we can avoid having many sparse bulk requests. - NumWorkers int + // If MaxRequests is zero, the default of 10 will be used. 
+ MaxRequests int // FlushBytes holds the flush threshold in bytes. // @@ -74,48 +76,43 @@ type Config struct { // New returns a new Indexer that indexes events directly into data streams. func New(client elasticsearch.Client, cfg Config) (*Indexer, error) { logger := logp.NewLogger("modelindexer") - bulkIndexer, err := client.NewBulkIndexer(elasticsearch.BulkIndexerConfig{ - NumWorkers: cfg.NumWorkers, - FlushBytes: cfg.FlushBytes, - FlushInterval: cfg.FlushInterval, - OnError: func(ctx context.Context, err error) { - logger.Errorf("bulk indexing error: %s", err) - }, - }) - if err != nil { - return nil, err + if cfg.MaxRequests == 0 { + cfg.MaxRequests = 10 + } + if cfg.FlushBytes <= 0 { + cfg.FlushBytes = 5 * 1024 * 1024 + } + if cfg.FlushInterval <= 0 { + cfg.FlushInterval = 30 * time.Second } - return &Indexer{bulkIndexer: bulkIndexer, logger: logger}, nil + available := make(chan *bulkIndexer, cfg.MaxRequests) + for i := 0; i < cfg.MaxRequests; i++ { + available <- newBulkIndexer(client) + } + return &Indexer{config: cfg, logger: logger, available: available}, nil } // Close closes the indexer, first flushing any queued events. -// -// If Close has previously been called, Close will return the result -// of the previous call. -func (i *Indexer) Close(ctx context.Context) error { +func (i *Indexer) Close() error { i.mu.Lock() defer i.mu.Unlock() - indexer := i.bulkIndexer - if indexer != nil { - i.bulkIndexer = nil - i.closeError = indexer.Close(ctx) + if !i.closed { + i.closed = true + if i.active != nil { + if err := i.flush(i.active); err != nil { + return err + } + } } - return i.closeError + return i.g.Wait() } // Stats returns the bulk indexing stats. -// -// If the indexer has been closed, Stats returns ErrClosed. 
-func (i *Indexer) Stats() (Stats, error) { - i.mu.RLock() - defer i.mu.RUnlock() - if i.bulkIndexer == nil { - return Stats{}, ErrClosed +func (i *Indexer) Stats() Stats { + return Stats{ + Added: atomic.LoadInt64(&i.eventsAdded), + Active: atomic.LoadInt64(&i.eventsActive), } - bulkIndexerStats := i.bulkIndexer.Stats() - stats := Stats{Added: atomic.LoadInt64(&i.eventsAdded)} - stats.Active = stats.Added - int64(bulkIndexerStats.NumFailed+bulkIndexerStats.NumFlushed) - return stats, nil } // ProcessBatch creates a document for each event in batch, and adds them to the @@ -125,7 +122,7 @@ func (i *Indexer) Stats() (Stats, error) { func (i *Indexer) ProcessBatch(ctx context.Context, batch *model.Batch) error { i.mu.RLock() defer i.mu.RUnlock() - if i.bulkIndexer == nil { + if i.closed { return ErrClosed } for _, event := range *batch { @@ -150,33 +147,78 @@ func (i *Indexer) processEvent(ctx context.Context, event *model.APMEvent) error r.indexBuilder.WriteString(event.DataStream.Namespace) index := r.indexBuilder.String() - item := elasticsearch.BulkIndexerItem{ + i.activeMu.Lock() + defer i.activeMu.Unlock() + if i.active == nil { + select { + case <-ctx.Done(): + return ctx.Err() + case i.active = <-i.available: + } + if i.timer == nil { + i.timer = time.AfterFunc( + i.config.FlushInterval, + i.flushActive, + ) + } else { + i.timer.Reset(i.config.FlushInterval) + } + } + + if err := i.active.Add(elasticsearch.BulkIndexerItem{ Index: index, Action: "create", Body: r, - // TODO(axw) consider adding an option to go-elasticsearch - // to not copy the body when OnFailure is set. We don't care - // about the item body, we just care about the index name. 
- OnFailure: func( - ctx context.Context, - item elasticsearch.BulkIndexerItem, - resp elasticsearch.BulkIndexerResponseItem, - err error, - ) { - logger := i.logger - if err != nil { - logger = logger.With(logp.Error(err)) - } - logger.Errorf( - "failed to index event into %s (%s): %s", - index, resp.Error.Type, resp.Error.Reason, - ) - }, + }); err != nil { + return err } atomic.AddInt64(&i.eventsAdded, 1) - if err := i.bulkIndexer.Add(ctx, item); err != nil { - atomic.AddInt64(&i.eventsAdded, -1) + atomic.AddInt64(&i.eventsActive, 1) + + if i.active.Bytes() >= i.config.FlushBytes { + if i.timer.Stop() { + i.flushActiveLocked() + } + } + return nil +} + +func (i *Indexer) flushActive() { + i.activeMu.Lock() + defer i.activeMu.Unlock() + i.flushActiveLocked() +} + +func (i *Indexer) flushActiveLocked() { + bulkIndexer := i.active + i.active = nil + i.g.Go(func() error { + err := i.flush(bulkIndexer) + bulkIndexer.Reset() + i.available <- bulkIndexer return err + }) +} + +func (i *Indexer) flush(bulkIndexer *bulkIndexer) error { + n := bulkIndexer.Items() + if n == 0 { + return nil + } + defer atomic.AddInt64(&i.eventsActive, -int64(n)) + resp, err := bulkIndexer.Flush(context.Background()) + if err != nil { + return err + } + for _, item := range resp.Items { + for _, info := range item { + if info.Error.Type != "" || info.Status > 201 { + i.logger.Errorf( + "failed to index event (%s): %s", + info.Error.Type, info.Error.Reason, + ) + } + } } return nil } diff --git a/model/modelindexer/indexer_test.go b/model/modelindexer/indexer_test.go index 814af0440a0..d72e4569175 100644 --- a/model/modelindexer/indexer_test.go +++ b/model/modelindexer/indexer_test.go @@ -48,7 +48,7 @@ func BenchmarkModelIndexer(b *testing.B) { client, err := elasticsearch.NewClient(config) require.NoError(b, err) indexer, err := modelindexer.New(client, modelindexer.Config{FlushInterval: time.Second}) - defer indexer.Close(context.Background()) + defer indexer.Close() batch := model.Batch{ 
model.APMEvent{ @@ -63,7 +63,7 @@ func BenchmarkModelIndexer(b *testing.B) { } // Closing the indexer flushes enqueued events. - if err := indexer.Close(context.Background()); err != nil { + if err := indexer.Close(); err != nil { b.Fatal(err) } assert.Equal(b, int64(b.N), indexed)