modelindexer: Scale active indexers based on load #9393

Merged · 16 commits · Oct 26, 2022
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
@@ -29,3 +29,4 @@ Set error.id for OpenTelemetry exception span events {pull}9372[9372]
- OpenTelemetry GRPC Spans from the Javascript API/SDK/Instrumentations are now correctly transformed into transactions with type=`request`
- Improve Elasticsearch output performance, particularly when compression is enabled (default) {pull}9318[9318]
- Java attacher support for macOS {pull}9413[9413]
- Improve Elasticsearch output performance on instances with more than 6 cores {pull}9393[9393]
81 changes: 78 additions & 3 deletions dev_docs/ARCHITECTURE.md
@@ -1,9 +1,84 @@
# Architecture of the APM Server

This document gives a high-level overview of the architecture of the main APM Server components.
The main purpose of the APM Server is to ingest data. It validates, enriches and transforms data
received from a variety of APM agents into a dedicated format, and passes it on to an output,
such as Elasticsearch.

## Ingest Flow
A high-level overview of incoming data and its flow through the APM Server until it is passed on
to the output publisher pipeline.

![ingest-flow](../docs/images/ingest-flow.png)

## Modelindexer Architecture

The APM Server uses a custom Elasticsearch output called `modelindexer`. It fills a local cache
until the cache is full, then flushes it in the background and continues processing events.

From `8.0` until `8.5`, the _modelindexer_ processed events synchronously, using a mutex to
serialize writes to the cache. This worked well, but did not seem to scale on bigger instances
with more CPUs, presumably because every intake goroutine contends on the same lock (see the
sketch after the diagram below).

```mermaid
flowchart LR;
subgraph Goroutine
Flush;
end
AgentA & AgentB-->Handler;
subgraph Intake
Handler<-->|semaphore|Decode
Decode-->Batch;
end
subgraph ModelIndexer
Available-.->Active;
Batch-->Active;
Active<-->|mutex|Cache;
end
Cache-->|FullOrTimer|Flush;

Flush-->|bulk|ES[(Elasticsearch)];
Flush-->|done|Available;
```
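
For illustration, here is a minimal Go sketch of the pre-`8.6` design. It is not the actual
implementation: `syncIndexer`, `flushSize`, and `bulkSend` are hypothetical names, and
compression, error handling, and the real Elasticsearch client are omitted.

```go
package indexer

import (
	"bytes"
	"sync"
	"time"
)

// syncIndexer sketches the 8.0–8.5 design: all producers append to one
// shared cache, serialized by a single mutex.
type syncIndexer struct {
	mu        sync.Mutex
	cache     bytes.Buffer
	flushSize int
}

// add is called synchronously from every intake goroutine; with many CPUs,
// all of them contend on the same lock.
func (s *syncIndexer) add(doc []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.cache.Write(doc)
	if s.cache.Len() >= s.flushSize {
		s.flushLocked()
	}
}

// run flushes on a timer so a partially filled cache is not held forever.
func (s *syncIndexer) run(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		s.mu.Lock()
		if s.cache.Len() > 0 {
			s.flushLocked()
		}
		s.mu.Unlock()
	}
}

// flushLocked snapshots the cache and sends it in the background; the
// caller must hold s.mu.
func (s *syncIndexer) flushLocked() {
	docs := make([]byte, s.cache.Len())
	copy(docs, s.cache.Bytes())
	s.cache.Reset()
	go bulkSend(docs) // stand-in for an Elasticsearch _bulk request
}

func bulkSend(docs []byte) { _ = docs }
```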

From `8.6.0` onwards, the _modelindexer_ accepts events asynchronously and runs one or more
_active indexers_, which pull events from a local queue and (by default) compress them and write
them to their own cache. This reduces lock contention, and the number of active indexers is
automatically scaled up and down based on how full the outgoing flushes are, with a hard limit on
the number of indexers derived from the available hardware (see the sketch after the diagram
below).

```mermaid
flowchart LR;
subgraph Goroutine11
Flush1(Flush);
end
subgraph Goroutine22
Flush2(Flush);
end
AgentA & AgentB-->Handler;
subgraph Intake
Handler<-->|semaphore|Decode
Decode-->Batch;
end
subgraph ModelIndexer
Batch-->Buffer;
Available;
subgraph Goroutine1
Active1(Active);
Active1(Active)<-->Cache1(Cache);
Cache1(Cache)-->|FullOrTimer|Flush1(Flush);
end
subgraph Goroutine2
Active2(Active);
Active2(Active)<-->Cache2(Cache);
Cache2(Cache)-->|FullOrTimer|Flush2(Flush);
end
subgraph Channel
Buffer-->Active1(Active) & Active2(Active);
end
Available-.->Active1(Active) & Active2(Active);
end

Flush1(Flush) & Flush2(Flush)-->|bulk|ES[(Elasticsearch)];
Flush1(Flush) & Flush2(Flush)-->|done|Available;
```
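
A matching sketch of the `8.6.0` design, under the same caveats: `asyncIndexer`, `runActive`, and
`maybeScale` are hypothetical names, and the real scaling logic watches a window of recent flushes
rather than reacting to a single one as this sketch does.

```go
package indexer

import (
	"bytes"
	"sync/atomic"
	"time"
)

// asyncIndexer sketches the 8.6+ design: producers only write to a channel,
// and each active indexer goroutine owns a private cache.
type asyncIndexer struct {
	buffer    chan []byte // bounded queue filled by the intake handlers
	active    atomic.Int64
	maxActive int64 // hard limit, derived from the hardware in the real code
}

// add no longer takes a lock; the channel is the only shared structure.
func (a *asyncIndexer) add(doc []byte) {
	a.buffer <- doc
}

// runActive is one active indexer: it drains the queue into its own cache
// and flushes when the cache is full or the flush interval elapses.
func (a *asyncIndexer) runActive(flushSize int, interval time.Duration) {
	var cache bytes.Buffer
	timer := time.NewTimer(interval)
	defer timer.Stop()
	for {
		select {
		case doc := <-a.buffer:
			cache.Write(doc)
			if cache.Len() >= flushSize {
				a.flush(&cache, true)
			}
		case <-timer.C:
			if cache.Len() > 0 {
				a.flush(&cache, false)
			}
			timer.Reset(interval)
		}
	}
}

func (a *asyncIndexer) flush(cache *bytes.Buffer, full bool) {
	// Stand-in for a compressed Elasticsearch _bulk request.
	cache.Reset()
	a.maybeScale(full)
}

// maybeScale sketches the autoscaling decision: full flushes mean the
// current indexers are saturated, so add one (up to maxActive); timer-driven
// flushes mean one indexer can be retired.
func (a *asyncIndexer) maybeScale(lastFlushFull bool) {
	n := a.active.Load()
	switch {
	case lastFlushFull && n < a.maxActive:
		a.active.Add(1)
		go a.runActive(1<<20, time.Second) // hypothetical defaults
	case !lastFlushFull && n > 1:
		a.active.Add(-1) // the real code signals a goroutine to drain and exit
	}
}
```

The important difference is the hot path: `add` only writes to a channel, so intake goroutines on
large machines no longer serialize on one mutex, and throughput can grow by adding consumer
goroutines rather than by fighting over a shared cache.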
20 changes: 16 additions & 4 deletions internal/beater/beater.go
@@ -547,6 +547,9 @@ func (s *Runner) newFinalBatchProcessor(
		FlushBytes    string        `config:"flush_bytes"`
		FlushInterval time.Duration `config:"flush_interval"`
		MaxRequests   int           `config:"max_requests"`
		Scaling       struct {
			Enabled *bool `config:"enabled"`
		} `config:"autoscaling"`
	}
	esConfig.FlushInterval = time.Second
	esConfig.Config = elasticsearch.DefaultConfig()
@@ -566,12 +569,17 @@
	if err != nil {
		return nil, nil, err
	}
	var scalingCfg modelindexer.ScalingConfig
	if enabled := esConfig.Scaling.Enabled; enabled != nil {
		scalingCfg.Disabled = !*enabled
	}
	indexer, err := modelindexer.New(client, modelindexer.Config{
		CompressionLevel: esConfig.CompressionLevel,
		FlushBytes:       flushBytes,
		FlushInterval:    esConfig.FlushInterval,
		Tracer:           tracer,
		MaxRequests:      esConfig.MaxRequests,
		Scaling:          scalingCfg,
	})
	if err != nil {
		return nil, nil, err
@@ -612,15 +620,19 @@
		v.OnInt(indexer.Stats().Added)
	})
	monitoring.Default.Remove("output")
	monitoring.NewFunc(monitoring.Default, "output.elasticsearch.indexers", func(_ monitoring.Mode, v monitoring.Visitor) {
		v.OnRegistryStart()
		defer v.OnRegistryFinished()
		stats := indexer.Stats()
		v.OnKey("active")
		v.OnInt(stats.ActiveBulkRequests)
		v.OnKey("available")
		v.OnInt(stats.AvailableBulkRequests)
		v.OnKey("created")
		v.OnInt(stats.IndexersCreated)
		v.OnKey("destroyed")
		v.OnInt(stats.IndexersDestroyed)
		v.OnKey("requests_completed")
		v.OnInt(stats.BulkRequests)
	})
	return indexer, indexer.Close, nil
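Judging from the `config:"autoscaling"` struct tag above, scaling should be controllable from the
Elasticsearch output section of `apm-server.yml`. The key is not documented in this PR, so the
following snippet is an educated guess rather than confirmed syntax:

```yaml
# Hypothetical apm-server.yml snippet, inferred from the struct tags in the
# diff above. Scaling is on by default (Enabled is nil); an explicit false
# maps to modelindexer.ScalingConfig{Disabled: true}.
output.elasticsearch:
  hosts: ["https://localhost:9200"]
  autoscaling:
    enabled: false
```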
10 changes: 6 additions & 4 deletions internal/beater/server_test.go
@@ -615,10 +615,12 @@ func TestServerElasticsearchOutput(t *testing.T) {
	snapshot = monitoring.CollectStructSnapshot(monitoring.Default.GetRegistry("output"), monitoring.Full, false)
	assert.Equal(t, map[string]interface{}{
		"elasticsearch": map[string]interface{}{
			"indexers": map[string]interface{}{
				"active":             int64(1),
				"available":          int64(49),
				"requests_completed": int64(0),
				"destroyed":          int64(0),
				"created":            int64(0),
			},
		},
	}, snapshot)