Skip to content

Commit

Permalink
modelindexer: introduce go-elasticsearch indexer
Browse files Browse the repository at this point in the history
Introduce an experimental option to index events
using go-elasticsearch, bypassing libbeat.
  • Loading branch information
axw committed Oct 7, 2021
1 parent 57695b7 commit 6f7fb2b
Show file tree
Hide file tree
Showing 9 changed files with 902 additions and 8 deletions.
74 changes: 71 additions & 3 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/go-ucfg"

"github.com/dustin/go-humanize"
"github.com/pkg/errors"
"go.elastic.co/apm"
"go.elastic.co/apm/module/apmhttp"
Expand All @@ -55,6 +56,7 @@ import (
"github.com/elastic/apm-server/kibana"
logs "github.com/elastic/apm-server/log"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/model/modelindexer"
"github.com/elastic/apm-server/model/modelprocessor"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/sampling"
Expand Down Expand Up @@ -535,6 +537,10 @@ func (s *serverRunner) run(listener net.Listener) error {
runServer = s.wrapRunServerWithPreprocessors(runServer)

batchProcessor := make(modelprocessor.Chained, 0, 3)
finalBatchProcessor, err := s.newFinalBatchProcessor(publisher)
if err != nil {
return err
}
if !s.config.Sampling.KeepUnsampled {
// The server has been configured to discard unsampled
// transactions. Make sure this is done just before calling
Expand All @@ -545,7 +551,7 @@ func (s *serverRunner) run(listener net.Listener) error {
}
batchProcessor = append(batchProcessor,
modelprocessor.DroppedSpansStatsDiscarder{},
s.newFinalBatchProcessor(publisher),
finalBatchProcessor,
)

g.Go(func() error {
Expand Down Expand Up @@ -642,8 +648,70 @@ func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client
}

// newFinalBatchProcessor returns the final model.BatchProcessor that publishes events.
func (s *serverRunner) newFinalBatchProcessor(p *publish.Publisher) model.BatchProcessor {
return p
func (s *serverRunner) newFinalBatchProcessor(p *publish.Publisher) (model.BatchProcessor, error) {
esOutputConfig := elasticsearchOutputConfig(s.beat)
if esOutputConfig == nil || !s.config.DataStreams.Enabled {
return p, nil
}

// Add `output.elasticsearch.experimental` config. If this is true and
// data streams are enabled, we'll use the model indexer processor.
var esConfig struct {
*elasticsearch.Config
Experimental bool `config:"experimental"`
FlushBytes string `config:"flush_bytes"`
FlushInterval time.Duration `config:"flush_interval"`
}
esConfig.FlushInterval = time.Second

if esOutputConfig != nil {
esConfig.Config = elasticsearch.DefaultConfig()
if err := esOutputConfig.Unpack(&esConfig); err != nil {
return nil, err
}
if err := esOutputConfig.Unpack(&esConfig.Config); err != nil {
return nil, err
}
}
if !esConfig.Experimental {
return p, nil
}

s.logger.Info("using experimental model indexer")
var flushBytes int
if esConfig.FlushBytes != "" {
b, err := humanize.ParseBytes(esConfig.FlushBytes)
if err != nil {
return nil, fmt.Errorf("failed to parse ")
}
flushBytes = int(b)
}
client, err := elasticsearch.NewClient(esConfig.Config)
if err != nil {
return nil, err
}
indexer, err := modelindexer.New(client, modelindexer.Config{
FlushBytes: flushBytes,
FlushInterval: esConfig.FlushInterval,
})
if err != nil {
return nil, err
}

// Remove libbeat output counters, and install our own callback which uses the modelindexer stats.
monitoring.Default.Remove("libbeat.output.events")
monitoring.NewFunc(monitoring.Default, "libbeat.output.events", func(_ monitoring.Mode, v monitoring.Visitor) {
v.OnRegistryStart()
defer v.OnRegistryFinished()
stats := indexer.Stats()
v.OnKey("active")
v.OnInt(stats.Active)
v.OnKey("total")
v.OnInt(stats.Added)
v.OnKey("failed")
v.OnInt(stats.Failed)
})
return indexer, nil
}

func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) RunServerFunc {
Expand Down
52 changes: 52 additions & 0 deletions beater/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"path"
"reflect"
"runtime"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -772,6 +773,57 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
assert.Equal(t, true, out["publish_ready"])
}

func TestServerExperimentalElasticsearchOutput(t *testing.T) {
bulkCh := make(chan *http.Request, 1)
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Elastic-Product", "Elasticsearch")
// We must send a valid JSON response for the libbeat
// elasticsearch client to send bulk requests.
fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`)
})
mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) {
select {
case bulkCh <- r:
default:
}
})
srv := httptest.NewServer(mux)
defer srv.Close()

cfg := common.MustNewConfigFrom(map[string]interface{}{
"data_streams.enabled": true,
"data_streams.wait_for_integration": false,
})
var beatConfig beat.BeatConfig
err := beatConfig.Output.Unpack(common.MustNewConfigFrom(map[string]interface{}{
"elasticsearch": map[string]interface{}{
"hosts": []string{srv.URL},
"experimental": true,
"flush_bytes": "1kb", // testdata is >1kb
},
}))
require.NoError(t, err)

beater, err := setupServer(t, cfg, &beatConfig, nil)
require.NoError(t, err)

req := makeTransactionRequest(t, beater.baseURL)
req.Header.Add("Content-Type", "application/x-ndjson")
resp, err := beater.client.Do(req)
assert.NoError(t, err)
assert.Equal(t, http.StatusAccepted, resp.StatusCode)
resp.Body.Close()

select {
case r := <-bulkCh:
userAgent := r.UserAgent()
assert.True(t, strings.HasPrefix(userAgent, "go-elasticsearch"), userAgent)
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for bulk request")
}
}

type chanClient struct {
done chan struct{}
Channel chan beat.Event
Expand Down
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ https://github.com/elastic/apm-server/compare/7.15\...master[View commits]
- Span documents now duplicate extended HTTP fields, which were previously only under `span.http.*`, under `http.*` {pull}6147[6147]
- We now record the direct network peer for incoming requests as `source.ip` and `source.port`; origin IP is recorded in `client.ip` {pull}6152[6152]
- We now collect span destination metrics for transactions with too many spans (for example due to transaction_max_spans or exit_span_min_duration) when collected and sent by APM agents {pull}6200[6200]
- `output.elasticsearch.experimental` can be used to enable a new, experimental Elasticsearch output using the go-elasticsearch client {pull}5970[5970]

[float]
==== Deprecated
Expand Down
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func DefaultSettings() instance.Settings {
DefaultUsername: "apm_system",
},
IndexManagement: idxmgmt.MakeDefaultSupporter,
Processing: processing.MakeDefaultObserverSupport(false),
Processing: processing.MakeDefaultSupport(false),
ConfigOverrides: libbeatConfigOverrides(),
}
}
Expand Down
135 changes: 135 additions & 0 deletions model/modelindexer/bulk_indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package modelindexer

import (
"bytes"
"context"
"encoding/json"
"fmt"
"strconv"

"github.com/elastic/go-elasticsearch/v7/esapi"

"github.com/elastic/apm-server/elasticsearch"
)

// NOTE(axw) please avoid introducing apm-server specific details to this code;
// it should eventually be removed, and either contributed to go-elasticsearch
// or replaced by a new go-elasticsearch bulk indexing implementation.
//
// At the time of writing, the go-elasticsearch BulkIndexer implementation
// sends all items to a channel, and multiple persistent worker goroutines will
// receive those items and independently fill up their own buffers. Each one
// will independently flush when their buffer is filled up, or when the flush
// interval elapses. If there are many workers, then this may lead to sparse
// bulk requests.
//
// We take a different approach, where we fill up one bulk request at a time.
// When the buffer is filled up, or the flush interval elapses, we start a new
// goroutine to send the request in the background, with a limit on the number
// of concurrent bulk requests. This way we can ensure bulk requests have the
// maximum possible size, based on configuration and throughput.

type bulkIndexer struct {
client elasticsearch.Client
itemsAdded int
buf bytes.Buffer
aux []byte
}

func newBulkIndexer(client elasticsearch.Client) *bulkIndexer {
return &bulkIndexer{client: client}
}

// BulkIndexer resets b, ready for a new request.
func (b *bulkIndexer) Reset() {
b.itemsAdded = 0
b.buf.Reset()
}

// Added returns the number of buffered items.
func (b *bulkIndexer) Items() int {
return b.itemsAdded
}

// Bytes returns the number of buffered bytes.
func (b *bulkIndexer) Bytes() int {
return b.buf.Len()
}

// Add encodes an item in the buffer.
func (b *bulkIndexer) Add(item elasticsearch.BulkIndexerItem) error {
b.writeMeta(item)
if _, err := b.buf.ReadFrom(item.Body); err != nil {
return err
}
b.buf.WriteRune('\n')
b.itemsAdded++
return nil
}

func (b *bulkIndexer) writeMeta(item elasticsearch.BulkIndexerItem) {
b.buf.WriteRune('{')
b.aux = strconv.AppendQuote(b.aux, item.Action)
b.buf.Write(b.aux)
b.aux = b.aux[:0]
b.buf.WriteRune(':')
b.buf.WriteRune('{')
if item.DocumentID != "" {
b.buf.WriteString(`"_id":`)
b.aux = strconv.AppendQuote(b.aux, item.DocumentID)
b.buf.Write(b.aux)
b.aux = b.aux[:0]
}
if item.Index != "" {
if item.DocumentID != "" {
b.buf.WriteRune(',')
}
b.buf.WriteString(`"_index":`)
b.aux = strconv.AppendQuote(b.aux, item.Index)
b.buf.Write(b.aux)
b.aux = b.aux[:0]
}
b.buf.WriteRune('}')
b.buf.WriteRune('}')
b.buf.WriteRune('\n')
}

// Flush executes a bulk request if there are any items buffered, and clears out the buffer.
func (b *bulkIndexer) Flush(ctx context.Context) (elasticsearch.BulkIndexerResponse, error) {
if b.itemsAdded == 0 {
return elasticsearch.BulkIndexerResponse{}, nil
}

req := esapi.BulkRequest{Body: &b.buf}
res, err := req.Do(ctx, b.client)
if err != nil {
return elasticsearch.BulkIndexerResponse{}, err
}
defer res.Body.Close()
if res.IsError() {
return elasticsearch.BulkIndexerResponse{}, fmt.Errorf("flush failed: %s", res.String())
}

var resp elasticsearch.BulkIndexerResponse
if err := json.NewDecoder(res.Body).Decode(&resp); err != nil {
return resp, err
}
return resp, nil
}
Loading

0 comments on commit 6f7fb2b

Please sign in to comment.