Skip to content

Commit

Permalink
model/modelindexer: create our own bulk indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
axw committed Oct 5, 2021
1 parent ccadbc8 commit 266196b
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 75 deletions.
7 changes: 1 addition & 6 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,6 @@ func (s *serverRunner) newFinalBatchProcessor(p *publish.Publisher) (model.Batch
// TODO(axw) FlushBytes should accept suffixed strings like APM agents, e.g. "5mb".
FlushBytes int `config:"flush_bytes"`
FlushInterval time.Duration `config:"flush_interval"`
NumWorkers int `config:"num_workers"`
}
esConfig.FlushInterval = time.Second

Expand All @@ -617,7 +616,6 @@ func (s *serverRunner) newFinalBatchProcessor(p *publish.Publisher) (model.Batch
indexer, err := modelindexer.New(client, modelindexer.Config{
FlushBytes: esConfig.FlushBytes,
FlushInterval: esConfig.FlushInterval,
NumWorkers: esConfig.NumWorkers,
})
if err != nil {
return nil, err
Expand All @@ -628,10 +626,7 @@ func (s *serverRunner) newFinalBatchProcessor(p *publish.Publisher) (model.Batch
monitoring.NewFunc(monitoring.Default, "libbeat.output.events", func(_ monitoring.Mode, v monitoring.Visitor) {
v.OnRegistryStart()
defer v.OnRegistryFinished()
stats, err := indexer.Stats()
if err != nil {
return
}
stats := indexer.Stats()
v.OnKey("active")
v.OnInt(stats.Active)
v.OnKey("total")
Expand Down
117 changes: 117 additions & 0 deletions model/modelindexer/bulk_indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package modelindexer

import (
"bytes"
"context"
"fmt"
"strconv"

"github.com/elastic/go-elasticsearch/v7/esapi"

"github.com/elastic/apm-server/elasticsearch"
)

type bulkIndexer struct {
client elasticsearch.Client
itemsAdded int
buf bytes.Buffer
aux []byte
}

func newBulkIndexer(client elasticsearch.Client) *bulkIndexer {
return &bulkIndexer{client: client}
}

// BulkIndexer resets b, ready for a new request.
func (b *bulkIndexer) Reset() {
b.itemsAdded = 0
b.buf.Reset()
}

// Added returns the number of buffered items.
func (b *bulkIndexer) Items() int {
return b.itemsAdded
}

// Bytes returns the number of buffered bytes.
func (b *bulkIndexer) Bytes() int {
return b.buf.Len()
}

// Add encodes an item in the buffer.
func (b *bulkIndexer) Add(item elasticsearch.BulkIndexerItem) error {
b.writeMeta(item)
if _, err := b.buf.ReadFrom(item.Body); err != nil {
return err
}
b.buf.WriteRune('\n')
b.itemsAdded++
return nil
}

func (b *bulkIndexer) writeMeta(item elasticsearch.BulkIndexerItem) {
b.buf.WriteRune('{')
b.aux = strconv.AppendQuote(b.aux, item.Action)
b.buf.Write(b.aux)
b.aux = b.aux[:0]
b.buf.WriteRune(':')
b.buf.WriteRune('{')
if item.DocumentID != "" {
b.buf.WriteString(`"_id":`)
b.aux = strconv.AppendQuote(b.aux, item.DocumentID)
b.buf.Write(b.aux)
b.aux = b.aux[:0]
}
if item.Index != "" {
if item.DocumentID != "" {
b.buf.WriteRune(',')
}
b.buf.WriteString(`"_index":`)
b.aux = strconv.AppendQuote(b.aux, item.Index)
b.buf.Write(b.aux)
b.aux = b.aux[:0]
}
b.buf.WriteRune('}')
b.buf.WriteRune('}')
b.buf.WriteRune('\n')
}

func (b *bulkIndexer) Flush(ctx context.Context) (elasticsearch.BulkIndexerResponse, error) {
if b.itemsAdded == 0 {
return elasticsearch.BulkIndexerResponse{}, nil
}

req := esapi.BulkRequest{Body: &b.buf}
res, err := req.Do(ctx, b.client)
if err != nil {
return elasticsearch.BulkIndexerResponse{}, err
}
defer res.Body.Close()
if res.IsError() {
return elasticsearch.BulkIndexerResponse{}, fmt.Errorf("flush failed: %s", res.String())
}

// TODO(axw) getting empty response body, what's up with that?
var resp elasticsearch.BulkIndexerResponse
//if err := json.NewDecoder(res.Body).Decode(&resp); err != nil {
// return resp, err
//}
return resp, nil
}
176 changes: 109 additions & 67 deletions model/modelindexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/logp"
"golang.org/x/sync/errgroup"

"github.com/elastic/apm-server/elasticsearch"
"github.com/elastic/apm-server/model"
Expand All @@ -39,26 +40,27 @@ var ErrClosed = errors.New("model indexer closed")

// Indexer is a model.BatchProcessor which bulk indexes events as Elasticsearch documents.
type Indexer struct {
eventsAdded int64
logger *logp.Logger
eventsAdded int64
eventsActive int64
config Config
logger *logp.Logger
available chan *bulkIndexer
g errgroup.Group

mu sync.RWMutex
bulkIndexer elasticsearch.BulkIndexer
closeError error
mu sync.RWMutex
closed bool
activeMu sync.Mutex
active *bulkIndexer
timer *time.Timer
}

// Config holds configuration for Indexer.
type Config struct {
// NumWorkers holds the number of goroutines to use for bulk indexing documents.
// MaxRequests holds the maximum number of bulk index requests to execute concurrently.
// The maximum memory usage of Indexer is thus approximately MaxRequests*FlushBytes.
//
// If NumWorkers is zero, the default of runtime.NumCPU() will be used.
//
// TODO(axw) instead of pooled bulk indexer workers, consider extending
// go-elasticsearch with a sequential bulk indexer, and then use a bounded
// semaphore approach for creating a new goroutine & bulk indexer when
// flushing another one. By sending events to a single indexer at a time
// we can avoid having many sparse bulk requests.
NumWorkers int
// If MaxRequests is zero, the default of 10 will be used.
MaxRequests int

// FlushBytes holds the flush threshold in bytes.
//
Expand All @@ -74,48 +76,43 @@ type Config struct {
// New returns a new Indexer that indexes events directly into data streams.
func New(client elasticsearch.Client, cfg Config) (*Indexer, error) {
logger := logp.NewLogger("modelindexer")
bulkIndexer, err := client.NewBulkIndexer(elasticsearch.BulkIndexerConfig{
NumWorkers: cfg.NumWorkers,
FlushBytes: cfg.FlushBytes,
FlushInterval: cfg.FlushInterval,
OnError: func(ctx context.Context, err error) {
logger.Errorf("bulk indexing error: %s", err)
},
})
if err != nil {
return nil, err
if cfg.MaxRequests == 0 {
cfg.MaxRequests = 10
}
if cfg.FlushBytes <= 0 {
cfg.FlushBytes = 5 * 1024 * 1024
}
if cfg.FlushInterval <= 0 {
cfg.FlushInterval = 30 * time.Second
}
return &Indexer{bulkIndexer: bulkIndexer, logger: logger}, nil
available := make(chan *bulkIndexer, cfg.MaxRequests)
for i := 0; i < cfg.MaxRequests; i++ {
available <- newBulkIndexer(client)
}
return &Indexer{config: cfg, logger: logger, available: available}, nil
}

// Close closes the indexer, first flushing any queued events.
//
// If Close has previously been called, Close will return the result
// of the previous call.
func (i *Indexer) Close(ctx context.Context) error {
func (i *Indexer) Close() error {
i.mu.Lock()
defer i.mu.Unlock()
indexer := i.bulkIndexer
if indexer != nil {
i.bulkIndexer = nil
i.closeError = indexer.Close(ctx)
if !i.closed {
i.closed = true
if i.active != nil {
if err := i.flush(i.active); err != nil {
return err
}
}
}
return i.closeError
return i.g.Wait()
}

// Stats returns the bulk indexing stats.
//
// If the indexer has been closed, Stats returns ErrClosed.
func (i *Indexer) Stats() (Stats, error) {
i.mu.RLock()
defer i.mu.RUnlock()
if i.bulkIndexer == nil {
return Stats{}, ErrClosed
func (i *Indexer) Stats() Stats {
return Stats{
Added: atomic.LoadInt64(&i.eventsAdded),
Active: atomic.LoadInt64(&i.eventsActive),
}
bulkIndexerStats := i.bulkIndexer.Stats()
stats := Stats{Added: atomic.LoadInt64(&i.eventsAdded)}
stats.Active = stats.Added - int64(bulkIndexerStats.NumFailed+bulkIndexerStats.NumFlushed)
return stats, nil
}

// ProcessBatch creates a document for each event in batch, and adds them to the
Expand All @@ -125,7 +122,7 @@ func (i *Indexer) Stats() (Stats, error) {
func (i *Indexer) ProcessBatch(ctx context.Context, batch *model.Batch) error {
i.mu.RLock()
defer i.mu.RUnlock()
if i.bulkIndexer == nil {
if i.closed {
return ErrClosed
}
for _, event := range *batch {
Expand All @@ -150,33 +147,78 @@ func (i *Indexer) processEvent(ctx context.Context, event *model.APMEvent) error
r.indexBuilder.WriteString(event.DataStream.Namespace)
index := r.indexBuilder.String()

item := elasticsearch.BulkIndexerItem{
i.activeMu.Lock()
defer i.activeMu.Unlock()
if i.active == nil {
select {
case <-ctx.Done():
return ctx.Err()
case i.active = <-i.available:
}
if i.timer == nil {
i.timer = time.AfterFunc(
i.config.FlushInterval,
i.flushActive,
)
} else {
i.timer.Reset(i.config.FlushInterval)
}
}

if err := i.active.Add(elasticsearch.BulkIndexerItem{
Index: index,
Action: "create",
Body: r,
// TODO(axw) consider adding an option to go-elasticsearch
// to not copy the body when OnFailure is set. We don't care
// about the item body, we just care about the index name.
OnFailure: func(
ctx context.Context,
item elasticsearch.BulkIndexerItem,
resp elasticsearch.BulkIndexerResponseItem,
err error,
) {
logger := i.logger
if err != nil {
logger = logger.With(logp.Error(err))
}
logger.Errorf(
"failed to index event into %s (%s): %s",
index, resp.Error.Type, resp.Error.Reason,
)
},
}); err != nil {
return err
}
atomic.AddInt64(&i.eventsAdded, 1)
if err := i.bulkIndexer.Add(ctx, item); err != nil {
atomic.AddInt64(&i.eventsAdded, -1)
atomic.AddInt64(&i.eventsActive, 1)

if i.active.Bytes() >= i.config.FlushBytes {
if i.timer.Stop() {
i.flushActiveLocked()
}
}
return nil
}

func (i *Indexer) flushActive() {
i.activeMu.Lock()
defer i.activeMu.Unlock()
i.flushActiveLocked()
}

func (i *Indexer) flushActiveLocked() {
bulkIndexer := i.active
i.active = nil
i.g.Go(func() error {
err := i.flush(bulkIndexer)
bulkIndexer.Reset()
i.available <- bulkIndexer
return err
})
}

func (i *Indexer) flush(bulkIndexer *bulkIndexer) error {
n := bulkIndexer.Items()
if n == 0 {
return nil
}
defer atomic.AddInt64(&i.eventsActive, -int64(n))
resp, err := bulkIndexer.Flush(context.Background())
if err != nil {
return err
}
for _, item := range resp.Items {
for _, info := range item {
if info.Error.Type != "" || info.Status > 201 {
i.logger.Errorf(
"failed to index event (%s): %s",
info.Error.Type, info.Error.Reason,
)
}
}
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions model/modelindexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func BenchmarkModelIndexer(b *testing.B) {
client, err := elasticsearch.NewClient(config)
require.NoError(b, err)
indexer, err := modelindexer.New(client, modelindexer.Config{FlushInterval: time.Second})
defer indexer.Close(context.Background())
defer indexer.Close()

batch := model.Batch{
model.APMEvent{
Expand All @@ -63,7 +63,7 @@ func BenchmarkModelIndexer(b *testing.B) {
}

// Closing the indexer flushes enqueued events.
if err := indexer.Close(context.Background()); err != nil {
if err := indexer.Close(); err != nil {
b.Fatal(err)
}
assert.Equal(b, int64(b.N), indexed)
Expand Down

0 comments on commit 266196b

Please sign in to comment.