diff --git a/beater/beater.go b/beater/beater.go
index 396d42699a6..d3b20efdb18 100644
--- a/beater/beater.go
+++ b/beater/beater.go
@@ -33,6 +33,7 @@ import (
 	"github.com/elastic/beats/v7/libbeat/monitoring"
 	"github.com/elastic/go-ucfg"
 
+	"github.com/dustin/go-humanize"
 	"github.com/pkg/errors"
 	"go.elastic.co/apm"
 	"go.elastic.co/apm/module/apmhttp"
@@ -55,6 +56,7 @@ import (
 	"github.com/elastic/apm-server/kibana"
 	logs "github.com/elastic/apm-server/log"
 	"github.com/elastic/apm-server/model"
+	"github.com/elastic/apm-server/model/modelindexer"
 	"github.com/elastic/apm-server/model/modelprocessor"
 	"github.com/elastic/apm-server/publish"
 	"github.com/elastic/apm-server/sampling"
@@ -535,6 +537,10 @@ func (s *serverRunner) run(listener net.Listener) error {
 	runServer = s.wrapRunServerWithPreprocessors(runServer)
 
 	batchProcessor := make(modelprocessor.Chained, 0, 3)
+	finalBatchProcessor, err := s.newFinalBatchProcessor(publisher)
+	if err != nil {
+		return err
+	}
 	if !s.config.Sampling.KeepUnsampled {
 		// The server has been configured to discard unsampled
 		// transactions. Make sure this is done just before calling
@@ -545,7 +551,7 @@ func (s *serverRunner) run(listener net.Listener) error {
 	}
 	batchProcessor = append(batchProcessor,
 		modelprocessor.DroppedSpansStatsDiscarder{},
-		s.newFinalBatchProcessor(publisher),
+		finalBatchProcessor,
 	)
 
 	g.Go(func() error {
@@ -642,8 +648,76 @@ func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client
 }
 
 // newFinalBatchProcessor returns the final model.BatchProcessor that publishes events.
-func (s *serverRunner) newFinalBatchProcessor(p *publish.Publisher) model.BatchProcessor {
-	return p
+func (s *serverRunner) newFinalBatchProcessor(p *publish.Publisher) (model.BatchProcessor, error) {
+	esOutputConfig := elasticsearchOutputConfig(s.beat)
+	if esOutputConfig == nil || !s.config.DataStreams.Enabled {
+		return p, nil
+	}
+
+	// Add `output.elasticsearch.experimental` config. If this is true and
+	// data streams are enabled, we'll use the model indexer processor.
+	var esConfig struct {
+		*elasticsearch.Config
+		Experimental  bool          `config:"experimental"`
+		FlushBytes    string        `config:"flush_bytes"`
+		FlushInterval time.Duration `config:"flush_interval"`
+	}
+	esConfig.FlushInterval = time.Second
+	esConfig.Config = elasticsearch.DefaultConfig()
+	if err := esOutputConfig.Unpack(&esConfig); err != nil {
+		return nil, err
+	}
+	if err := esOutputConfig.Unpack(&esConfig.Config); err != nil {
+		return nil, err
+	}
+	if !esConfig.Experimental {
+		return p, nil
+	}
+
+	s.logger.Info("using experimental model indexer")
+	var flushBytes int
+	if esConfig.FlushBytes != "" {
+		b, err := humanize.ParseBytes(esConfig.FlushBytes)
+		if err != nil {
+			return nil, fmt.Errorf("failed to parse output.elasticsearch.flush_bytes: %w", err)
+		}
+		if b == 0 {
+			return nil, fmt.Errorf(
+				"output.elasticsearch.flush_bytes must be positive, got %s (%d)",
+				esConfig.FlushBytes, b,
+			)
+		}
+		flushBytes = int(b)
+	}
+	client, err := elasticsearch.NewClient(esConfig.Config)
+	if err != nil {
+		return nil, err
+	}
+	indexer, err := modelindexer.New(client, modelindexer.Config{
+		FlushBytes:    flushBytes,
+		FlushInterval: esConfig.FlushInterval,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	// Remove libbeat output counters, and install our own callback which uses the modelindexer stats.
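+	// The same "libbeat.output.events" keys (active, total, failed) are
+	// re-registered below, populated from the indexer's Stats, so existing
+	// monitoring consumers keep receiving event counts.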
+ monitoring.Default.Remove("libbeat.output.events") + monitoring.NewFunc(monitoring.Default, "libbeat.output.events", func(_ monitoring.Mode, v monitoring.Visitor) { + v.OnRegistryStart() + defer v.OnRegistryFinished() + stats := indexer.Stats() + v.OnKey("active") + v.OnInt(stats.Active) + v.OnKey("total") + v.OnInt(stats.Added) + v.OnKey("failed") + v.OnInt(stats.Failed) + }) + return indexer, nil } func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) RunServerFunc { diff --git a/beater/server_test.go b/beater/server_test.go index e82fb204170..c40052cad8c 100644 --- a/beater/server_test.go +++ b/beater/server_test.go @@ -32,6 +32,7 @@ import ( "path" "reflect" "runtime" + "strings" "sync" "testing" "time" @@ -772,6 +773,57 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) { assert.Equal(t, true, out["publish_ready"]) } +func TestServerExperimentalElasticsearchOutput(t *testing.T) { + bulkCh := make(chan *http.Request, 1) + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Elastic-Product", "Elasticsearch") + // We must send a valid JSON response for the libbeat + // elasticsearch client to send bulk requests. + fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`) + }) + mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) { + select { + case bulkCh <- r: + default: + } + }) + srv := httptest.NewServer(mux) + defer srv.Close() + + cfg := common.MustNewConfigFrom(map[string]interface{}{ + "data_streams.enabled": true, + "data_streams.wait_for_integration": false, + }) + var beatConfig beat.BeatConfig + err := beatConfig.Output.Unpack(common.MustNewConfigFrom(map[string]interface{}{ + "elasticsearch": map[string]interface{}{ + "hosts": []string{srv.URL}, + "experimental": true, + "flush_bytes": "1kb", // testdata is >1kb + }, + })) + require.NoError(t, err) + + beater, err := setupServer(t, cfg, &beatConfig, nil) + require.NoError(t, err) + + req := makeTransactionRequest(t, beater.baseURL) + req.Header.Add("Content-Type", "application/x-ndjson") + resp, err := beater.client.Do(req) + assert.NoError(t, err) + assert.Equal(t, http.StatusAccepted, resp.StatusCode) + resp.Body.Close() + + select { + case r := <-bulkCh: + userAgent := r.UserAgent() + assert.True(t, strings.HasPrefix(userAgent, "go-elasticsearch"), userAgent) + case <-time.After(10 * time.Second): + t.Fatal("timed out waiting for bulk request") + } +} + type chanClient struct { done chan struct{} Channel chan beat.Event diff --git a/cmd/root.go b/cmd/root.go index 789a8d2bf25..5bc751406a3 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -83,7 +83,7 @@ func DefaultSettings() instance.Settings { DefaultUsername: "apm_system", }, IndexManagement: idxmgmt.MakeDefaultSupporter, - Processing: processing.MakeDefaultObserverSupport(false), + Processing: processing.MakeDefaultSupport(false), ConfigOverrides: libbeatConfigOverrides(), } } diff --git a/model/modelindexer/bulk_indexer.go b/model/modelindexer/bulk_indexer.go new file mode 100644 index 00000000000..7390c8e1372 --- /dev/null +++ b/model/modelindexer/bulk_indexer.go @@ -0,0 +1,135 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package modelindexer
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"strconv"
+
+	"github.com/elastic/go-elasticsearch/v7/esapi"
+
+	"github.com/elastic/apm-server/elasticsearch"
+)
+
+// NOTE(axw) please avoid introducing apm-server specific details to this code;
+// it should eventually be removed, and either contributed to go-elasticsearch
+// or replaced by a new go-elasticsearch bulk indexing implementation.
+//
+// At the time of writing, the go-elasticsearch BulkIndexer implementation
+// sends all items to a channel, and multiple persistent worker goroutines
+// receive those items and independently fill up their own buffers. Each one
+// independently flushes when its buffer fills up, or when the flush
+// interval elapses. If there are many workers, then this may lead to sparse
+// bulk requests.
+//
+// We take a different approach, where we fill up one bulk request at a time.
+// When the buffer is filled up, or the flush interval elapses, we start a new
+// goroutine to send the request in the background, with a limit on the number
+// of concurrent bulk requests. This way we can ensure bulk requests have the
+// maximum possible size, based on configuration and throughput.
+
+type bulkIndexer struct {
+	client     elasticsearch.Client
+	itemsAdded int
+	buf        bytes.Buffer
+	aux        []byte
+}
+
+func newBulkIndexer(client elasticsearch.Client) *bulkIndexer {
+	return &bulkIndexer{client: client}
+}
+
+// Reset resets b, ready for a new request.
+func (b *bulkIndexer) Reset() {
+	b.itemsAdded = 0
+	b.buf.Reset()
+}
+
+// Items returns the number of buffered items.
+func (b *bulkIndexer) Items() int {
+	return b.itemsAdded
+}
+
+// Bytes returns the number of buffered bytes.
+func (b *bulkIndexer) Bytes() int {
+	return b.buf.Len()
+}
+
+// Add encodes an item in the buffer.
+func (b *bulkIndexer) Add(item elasticsearch.BulkIndexerItem) error {
+	b.writeMeta(item)
+	if _, err := b.buf.ReadFrom(item.Body); err != nil {
+		return err
+	}
+	b.buf.WriteRune('\n')
+	b.itemsAdded++
+	return nil
+}
+
+func (b *bulkIndexer) writeMeta(item elasticsearch.BulkIndexerItem) {
+	b.buf.WriteRune('{')
+	b.aux = strconv.AppendQuote(b.aux, item.Action)
+	b.buf.Write(b.aux)
+	b.aux = b.aux[:0]
+	b.buf.WriteRune(':')
+	b.buf.WriteRune('{')
+	if item.DocumentID != "" {
+		b.buf.WriteString(`"_id":`)
+		b.aux = strconv.AppendQuote(b.aux, item.DocumentID)
+		b.buf.Write(b.aux)
+		b.aux = b.aux[:0]
+	}
+	if item.Index != "" {
+		if item.DocumentID != "" {
+			b.buf.WriteRune(',')
+		}
+		b.buf.WriteString(`"_index":`)
+		b.aux = strconv.AppendQuote(b.aux, item.Index)
+		b.buf.Write(b.aux)
+		b.aux = b.aux[:0]
+	}
+	b.buf.WriteRune('}')
+	b.buf.WriteRune('}')
+	b.buf.WriteRune('\n')
+}
+
+// Flush executes a bulk request if there are any items buffered, and clears out the buffer.
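+// The response carries per-item results, which the caller uses to record
+// indexing failures.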
+func (b *bulkIndexer) Flush(ctx context.Context) (elasticsearch.BulkIndexerResponse, error) { + if b.itemsAdded == 0 { + return elasticsearch.BulkIndexerResponse{}, nil + } + + req := esapi.BulkRequest{Body: &b.buf} + res, err := req.Do(ctx, b.client) + if err != nil { + return elasticsearch.BulkIndexerResponse{}, err + } + defer res.Body.Close() + if res.IsError() { + return elasticsearch.BulkIndexerResponse{}, fmt.Errorf("flush failed: %s", res.String()) + } + + var resp elasticsearch.BulkIndexerResponse + if err := json.NewDecoder(res.Body).Decode(&resp); err != nil { + return resp, err + } + return resp, nil +} diff --git a/model/modelindexer/indexer.go b/model/modelindexer/indexer.go new file mode 100644 index 00000000000..84e6c767fb1 --- /dev/null +++ b/model/modelindexer/indexer.go @@ -0,0 +1,280 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package modelindexer + +import ( + "bytes" + "context" + "errors" + "io" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" + "github.com/elastic/beats/v7/libbeat/logp" + "golang.org/x/sync/errgroup" + + "github.com/elastic/apm-server/elasticsearch" + "github.com/elastic/apm-server/model" +) + +// ErrClosed is returned from methods of closed Indexers. +var ErrClosed = errors.New("model indexer closed") + +// Indexer is a model.BatchProcessor which bulk indexes events as Elasticsearch documents. +type Indexer struct { + eventsAdded int64 + eventsActive int64 + eventsFailed int64 + config Config + logger *logp.Logger + available chan *bulkIndexer + g errgroup.Group + + mu sync.RWMutex + closed bool + activeMu sync.Mutex + active *bulkIndexer + timer *time.Timer +} + +// Config holds configuration for Indexer. +type Config struct { + // MaxRequests holds the maximum number of bulk index requests to execute concurrently. + // The maximum memory usage of Indexer is thus approximately MaxRequests*FlushBytes. + // + // If MaxRequests is zero, the default of 10 will be used. + MaxRequests int + + // FlushBytes holds the flush threshold in bytes. + // + // If FlushBytes is zero, the default of 5MB will be used. + FlushBytes int + + // FlushInterval holds the flush threshold as a duration. + // + // If FlushInterval is zero, the default of 30 seconds will be used. + FlushInterval time.Duration +} + +// New returns a new Indexer that indexes events directly into data streams. 
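+// Callers must call Close when the indexer is no longer needed, to flush
+// any remaining buffered events.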
+func New(client elasticsearch.Client, cfg Config) (*Indexer, error) { + logger := logp.NewLogger("modelindexer") + if cfg.MaxRequests == 0 { + cfg.MaxRequests = 10 + } + if cfg.FlushBytes <= 0 { + cfg.FlushBytes = 5 * 1024 * 1024 + } + if cfg.FlushInterval <= 0 { + cfg.FlushInterval = 30 * time.Second + } + available := make(chan *bulkIndexer, cfg.MaxRequests) + for i := 0; i < cfg.MaxRequests; i++ { + available <- newBulkIndexer(client) + } + return &Indexer{config: cfg, logger: logger, available: available}, nil +} + +// Close closes the indexer, first flushing any queued events. +// +// Close returns an error if any flush attempts during the indexer's +// lifetime returned an error. +func (i *Indexer) Close() error { + i.mu.Lock() + defer i.mu.Unlock() + if !i.closed { + i.closed = true + i.activeMu.Lock() + defer i.activeMu.Unlock() + if i.active != nil { + i.flushActiveLocked() + } + } + return i.g.Wait() +} + +// Stats returns the bulk indexing stats. +func (i *Indexer) Stats() Stats { + return Stats{ + Added: atomic.LoadInt64(&i.eventsAdded), + Active: atomic.LoadInt64(&i.eventsActive), + Failed: atomic.LoadInt64(&i.eventsFailed), + } +} + +// ProcessBatch creates a document for each event in batch, and adds them to the +// Elasticsearch bulk indexer. +// +// If the indexer has been closed, ProcessBatch returns ErrClosed. +func (i *Indexer) ProcessBatch(ctx context.Context, batch *model.Batch) error { + i.mu.RLock() + defer i.mu.RUnlock() + if i.closed { + return ErrClosed + } + for _, event := range *batch { + if err := i.processEvent(ctx, &event); err != nil { + return err + } + } + return nil +} + +func (i *Indexer) processEvent(ctx context.Context, event *model.APMEvent) error { + r := getPooledReader() + beatEvent := event.BeatEvent(ctx) + if err := r.encoder.AddRaw(&beatEvent); err != nil { + return err + } + + r.indexBuilder.WriteString(event.DataStream.Type) + r.indexBuilder.WriteByte('-') + r.indexBuilder.WriteString(event.DataStream.Dataset) + r.indexBuilder.WriteByte('-') + r.indexBuilder.WriteString(event.DataStream.Namespace) + index := r.indexBuilder.String() + + i.activeMu.Lock() + defer i.activeMu.Unlock() + if i.active == nil { + select { + case <-ctx.Done(): + return ctx.Err() + case i.active = <-i.available: + } + if i.timer == nil { + i.timer = time.AfterFunc( + i.config.FlushInterval, + i.flushActive, + ) + } else { + i.timer.Reset(i.config.FlushInterval) + } + } + + if err := i.active.Add(elasticsearch.BulkIndexerItem{ + Index: index, + Action: "create", + Body: r, + }); err != nil { + return err + } + atomic.AddInt64(&i.eventsAdded, 1) + atomic.AddInt64(&i.eventsActive, 1) + + if i.active.Bytes() >= i.config.FlushBytes { + if i.timer.Stop() { + i.flushActiveLocked() + } + } + return nil +} + +func (i *Indexer) flushActive() { + i.activeMu.Lock() + defer i.activeMu.Unlock() + i.flushActiveLocked() +} + +func (i *Indexer) flushActiveLocked() { + bulkIndexer := i.active + i.active = nil + i.g.Go(func() error { + err := i.flush(bulkIndexer) + bulkIndexer.Reset() + i.available <- bulkIndexer + return err + }) +} + +func (i *Indexer) flush(bulkIndexer *bulkIndexer) error { + n := bulkIndexer.Items() + if n == 0 { + return nil + } + defer atomic.AddInt64(&i.eventsActive, -int64(n)) + resp, err := bulkIndexer.Flush(context.Background()) + if err != nil { + atomic.AddInt64(&i.eventsFailed, int64(n)) + return err + } + var eventsFailed int64 + for _, item := range resp.Items { + for _, info := range item { + if info.Error.Type != "" || info.Status > 201 { + 
eventsFailed++ + i.logger.Errorf( + "failed to index event (%s): %s", + info.Error.Type, info.Error.Reason, + ) + } + } + } + if eventsFailed > 0 { + atomic.AddInt64(&i.eventsFailed, eventsFailed) + } + return nil +} + +var pool sync.Pool + +type pooledReader struct { + buf bytes.Buffer + indexBuilder strings.Builder + encoder encoder +} + +func getPooledReader() *pooledReader { + if r, ok := pool.Get().(*pooledReader); ok { + return r + } + r := &pooledReader{} + r.encoder = eslegclient.NewJSONEncoder(&r.buf, false) + return r +} + +func (r *pooledReader) Read(p []byte) (int, error) { + n, err := r.buf.Read(p) + if err == io.EOF { + // Release the reader back into the pool after it has been consumed. + r.indexBuilder.Reset() + r.encoder.Reset() + pool.Put(r) + } + return n, err +} + +type encoder interface { + AddRaw(interface{}) error + Reset() +} + +// Stats holds bulk indexing statistics. +type Stats struct { + // Active holds the active number of items waiting in the indexer's queue. + Active int64 + + // Added holds the number of items added to the indexer. + Added int64 + + // Failed holds the number of indexing operations that failed. + Failed int64 +} diff --git a/model/modelindexer/indexer_integration_test.go b/model/modelindexer/indexer_integration_test.go new file mode 100644 index 00000000000..83fd71091c8 --- /dev/null +++ b/model/modelindexer/indexer_integration_test.go @@ -0,0 +1,90 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package modelindexer_test
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/elastic/go-elasticsearch/v7/esapi"
+
+	"github.com/elastic/apm-server/elasticsearch"
+	"github.com/elastic/apm-server/model"
+	"github.com/elastic/apm-server/model/modelindexer"
+)
+
+func TestModelIndexerIntegration(t *testing.T) {
+	switch strings.ToLower(os.Getenv("INTEGRATION_TESTS")) {
+	case "1", "true":
+	default:
+		t.Skip("Skipping integration test, export INTEGRATION_TESTS=1 to run")
+	}
+
+	config := elasticsearch.DefaultConfig()
+	config.Username = "admin"
+	config.Password = "changeme"
+	client, err := elasticsearch.NewClient(config)
+	require.NoError(t, err)
+	indexer, err := modelindexer.New(client, modelindexer.Config{FlushInterval: time.Second})
+	require.NoError(t, err)
+	defer indexer.Close()
+
+	dataStream := model.DataStream{Type: "logs", Dataset: "apm_server", Namespace: "testing"}
+	index := fmt.Sprintf("%s-%s-%s", dataStream.Type, dataStream.Dataset, dataStream.Namespace)
+
+	deleteIndex := func() {
+		resp, err := esapi.IndicesDeleteDataStreamRequest{Name: []string{index}}.Do(context.Background(), client)
+		require.NoError(t, err)
+		defer resp.Body.Close()
+	}
+	deleteIndex()
+	defer deleteIndex()
+
+	const N = 100
+	for i := 0; i < N; i++ {
+		batch := model.Batch{model.APMEvent{Timestamp: time.Now(), DataStream: dataStream}}
+		err := indexer.ProcessBatch(context.Background(), &batch)
+		require.NoError(t, err)
+	}
+
+	// Closing the indexer flushes enqueued events.
+	err = indexer.Close()
+	require.NoError(t, err)
+
+	// Check that docs are indexed.
+	resp, err := esapi.IndicesRefreshRequest{Index: []string{index}}.Do(context.Background(), client)
+	require.NoError(t, err)
+	resp.Body.Close()
+
+	var result struct {
+		Count int
+	}
+	resp, err = esapi.CountRequest{Index: []string{index}}.Do(context.Background(), client)
+	require.NoError(t, err)
+	defer resp.Body.Close()
+	err = json.NewDecoder(resp.Body).Decode(&result)
+	require.NoError(t, err)
+	assert.Equal(t, N, result.Count)
+}
diff --git a/model/modelindexer/indexer_test.go b/model/modelindexer/indexer_test.go
new file mode 100644
index 00000000000..55696be2266
--- /dev/null
+++ b/model/modelindexer/indexer_test.go
@@ -0,0 +1,260 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +package modelindexer_test + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/go-elasticsearch/v7/esutil" + + "github.com/elastic/apm-server/elasticsearch" + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/model/modelindexer" +) + +func TestModelIndexer(t *testing.T) { + var indexed int64 + client := newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + scanner := bufio.NewScanner(r.Body) + var result elasticsearch.BulkIndexerResponse + for scanner.Scan() { + action := make(map[string]interface{}) + if err := json.NewDecoder(strings.NewReader(scanner.Text())).Decode(&action); err != nil { + panic(err) + } + var actionType string + for actionType = range action { + } + if !scanner.Scan() { + panic("expected source") + } + + item := esutil.BulkIndexerResponseItem{Status: http.StatusCreated} + if len(result.Items) == 0 { + // Respond with an error for the first item. This will be recorded + // as a failure in indexing stats. + result.HasErrors = true + item.Status = http.StatusInternalServerError + } + result.Items = append(result.Items, map[string]esutil.BulkIndexerResponseItem{actionType: item}) + + if scanner.Scan() && scanner.Text() != "" { + // Both the libbeat event encoder and bulk indexer add an empty line. + panic("expected empty line") + } + } + atomic.AddInt64(&indexed, int64(len(result.Items))) + json.NewEncoder(w).Encode(result) + }) + indexer, err := modelindexer.New(client, modelindexer.Config{FlushInterval: time.Minute}) + require.NoError(t, err) + defer indexer.Close() + + const N = 100 + for i := 0; i < N; i++ { + batch := model.Batch{model.APMEvent{Timestamp: time.Now(), DataStream: model.DataStream{ + Type: "logs", + Dataset: "apm_server", + Namespace: "testing", + }}} + err := indexer.ProcessBatch(context.Background(), &batch) + require.NoError(t, err) + } + assert.Equal(t, modelindexer.Stats{Added: N, Active: N}, indexer.Stats()) + + // Closing the indexer flushes enqueued events. 
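+	// The mock handler rejected the first item with a 500, so exactly one
+	// event should be reported as failed after the flush.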
+ err = indexer.Close() + require.NoError(t, err) + assert.Equal(t, modelindexer.Stats{ + Added: N, + Active: 0, + Failed: 1, + }, indexer.Stats()) +} + +func TestModelIndexerFlushInterval(t *testing.T) { + requests := make(chan struct{}, 1) + client := newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + select { + case <-r.Context().Done(): + case requests <- struct{}{}: + } + }) + indexer, err := modelindexer.New(client, modelindexer.Config{ + // Default flush bytes is 5MB + FlushInterval: time.Millisecond, + }) + require.NoError(t, err) + defer indexer.Close() + + select { + case <-requests: + t.Fatal("unexpected request, no events buffered") + case <-time.After(50 * time.Millisecond): + } + + batch := model.Batch{model.APMEvent{Timestamp: time.Now(), DataStream: model.DataStream{ + Type: "logs", + Dataset: "apm_server", + Namespace: "testing", + }}} + err = indexer.ProcessBatch(context.Background(), &batch) + require.NoError(t, err) + + select { + case <-requests: + case <-time.After(10 * time.Second): + t.Fatal("timed out waiting for request, flush interval elapsed") + } +} + +func TestModelIndexerFlushBytes(t *testing.T) { + requests := make(chan struct{}, 1) + client := newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + select { + case requests <- struct{}{}: + default: + } + }) + indexer, err := modelindexer.New(client, modelindexer.Config{ + FlushBytes: 1024, + // Default flush interval is 30 seconds + }) + require.NoError(t, err) + defer indexer.Close() + + batch := model.Batch{model.APMEvent{Timestamp: time.Now(), DataStream: model.DataStream{ + Type: "logs", + Dataset: "apm_server", + Namespace: "testing", + }}} + err = indexer.ProcessBatch(context.Background(), &batch) + require.NoError(t, err) + + select { + case <-requests: + t.Fatal("unexpected request, flush bytes not exceeded") + case <-time.After(50 * time.Millisecond): + } + + for i := 0; i < 100; i++ { + err = indexer.ProcessBatch(context.Background(), &batch) + require.NoError(t, err) + } + + select { + case <-requests: + case <-time.After(10 * time.Second): + t.Fatal("timed out waiting for request, flush bytes exceeded") + } +} + +func TestModelIndexerServerError(t *testing.T) { + client := newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + }) + indexer, err := modelindexer.New(client, modelindexer.Config{FlushInterval: time.Minute}) + require.NoError(t, err) + defer indexer.Close() + + batch := model.Batch{model.APMEvent{Timestamp: time.Now(), DataStream: model.DataStream{ + Type: "logs", + Dataset: "apm_server", + Namespace: "testing", + }}} + err = indexer.ProcessBatch(context.Background(), &batch) + require.NoError(t, err) + + // Closing the indexer flushes enqueued events. 
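+	// Close should surface the 500 response from the flush as an error.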
+ err = indexer.Close() + require.EqualError(t, err, "flush failed: [500 Internal Server Error] ") + assert.Equal(t, modelindexer.Stats{ + Added: 1, + Active: 0, + Failed: 1, + }, indexer.Stats()) +} + +func BenchmarkModelIndexer(b *testing.B) { + var indexed int64 + client := newMockElasticsearchClient(b, func(w http.ResponseWriter, r *http.Request) { + scanner := bufio.NewScanner(r.Body) + var n int64 + for scanner.Scan() { + if scanner.Scan() { + n++ + } + if scanner.Scan() && scanner.Text() != "" { + panic("expected empty line") + } + } + atomic.AddInt64(&indexed, n) + fmt.Fprintln(w, "{}") + }) + + indexer, err := modelindexer.New(client, modelindexer.Config{FlushInterval: time.Second}) + require.NoError(b, err) + defer indexer.Close() + + batch := model.Batch{ + model.APMEvent{ + Processor: model.TransactionProcessor, + Timestamp: time.Now(), + }, + } + for i := 0; i < b.N; i++ { + if err := indexer.ProcessBatch(context.Background(), &batch); err != nil { + b.Fatal(err) + } + } + + // Closing the indexer flushes enqueued events. + if err := indexer.Close(); err != nil { + b.Fatal(err) + } + assert.Equal(b, int64(b.N), indexed) +} + +func newMockElasticsearchClient(t testing.TB, bulkHandler http.HandlerFunc) elasticsearch.Client { + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Elastic-Product", "Elasticsearch") + fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`) + }) + mux.Handle("/_bulk", bulkHandler) + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + + config := elasticsearch.DefaultConfig() + config.Hosts = elasticsearch.Hosts{srv.URL} + client, err := elasticsearch.NewClient(config) + require.NoError(t, err) + return client +}
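
Usage sketch (reviewer note): the snippet below shows how the pieces added in this diff fit together, independent of the libbeat wiring in beater.go. It is a minimal sketch rather than part of the change; the Elasticsearch host and the flush thresholds are placeholder values, while all types and functions (`elasticsearch.NewClient`, `modelindexer.New`, `ProcessBatch`, `Close`, `Stats`) come from the code above.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/elastic/apm-server/elasticsearch"
	"github.com/elastic/apm-server/model"
	"github.com/elastic/apm-server/model/modelindexer"
)

func main() {
	// Placeholder host; in apm-server this comes from output.elasticsearch.hosts.
	config := elasticsearch.DefaultConfig()
	config.Hosts = elasticsearch.Hosts{"http://localhost:9200"}
	client, err := elasticsearch.NewClient(config)
	if err != nil {
		log.Fatal(err)
	}

	// Flush when 1MB of documents are buffered, or after 5 seconds,
	// whichever comes first (placeholder thresholds).
	indexer, err := modelindexer.New(client, modelindexer.Config{
		FlushBytes:    1024 * 1024,
		FlushInterval: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Each event is encoded as one bulk "create" action targeting the
	// data stream named <type>-<dataset>-<namespace>.
	batch := model.Batch{model.APMEvent{
		Timestamp:  time.Now(),
		DataStream: model.DataStream{Type: "logs", Dataset: "apm_server", Namespace: "testing"},
	}}
	if err := indexer.ProcessBatch(context.Background(), &batch); err != nil {
		log.Fatal(err)
	}

	// Close flushes anything still buffered and reports any flush errors.
	if err := indexer.Close(); err != nil {
		log.Fatal(err)
	}
	log.Printf("indexer stats: %+v", indexer.Stats())
}
```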