modelindexer: Scale active indexers based on load #9393

Merged 16 commits on Oct 26, 2022
Changes from 3 commits
83 changes: 80 additions & 3 deletions dev_docs/ARCHITECTURE.md
@@ -1,9 +1,86 @@
# Architecture of the APM Server

This document gives a high-level overview of the architecture of the main APM Server components.
The main purpose of the APM Server is to ingest data. It validates, enriches and transforms data, that it receives from a variety of APM agents, into a dedicated format and passes them on to an output, such as Elasticsearch.
The main purpose of the APM Server is to ingest data. It validates, enriches, and transforms data
received from a variety of APM agents into a dedicated format and passes it on to an output,
such as Elasticsearch.

## Ingest Flow
High level overview over incoming data and their flow through the APM Server until being passed on to the output publisher pipeline.

![](./docs/images/ingest-flow.png)
High-level overview of incoming data and its flow through the APM Server until it is passed on
to the output publisher pipeline.

![ingest-flow](../docs/images/ingest-flow.png)

## Modelindexer Architecture

When APM Server runs in managed mode, or uses the Elasticsearch output in standalone mode, it uses
a custom Elasticsearch output called `modelindexer`. This custom output differs from the previous
libbeat output in multiple ways, but its main difference is that it fills a local cache until it is full,
and then flushes the cache in the background and continues processing events.

From `8.0` until `8.5`, the _modelindexer_ processed the events synchronously and used mutexes for
synchronized writes to the cache. This worked well, but did not seem to scale on bigger instances with
more CPUs.

```mermaid
flowchart LR;
subgraph Goroutine
Flush;
end
AgentA & AgentB-->Handler;
subgraph Intake
Handler<-->|semaphore|Decode
Decode-->Batch;
end
subgraph ModelIndexer
Available-.->Active;
Batch-->Active;
Active<-->|mutex|Cache;
end
Cache-->|FullOrTimer|Flush;

Flush-->|bulk|ES[(Elasticsearch)];
Flush-->|done|Available;
```

From `8.6.0` onwards, the _modelindexer_ accepts events asynchronously and runs one or more _active indexers_,
which pull events from a local queue and (by default) compress them and write them to the local cache. This approach
has reduced locking, and the number of active indexers is automatically scaled up and down based on how full the
outgoing flushes are, with a hard limit on the number of indexers depending on the hardware.

```mermaid
flowchart LR;
subgraph Goroutine11
Flush1(Flush);
end
subgraph Goroutine22
Flush2(Flush);
end
AgentA & AgentB-->Handler;
subgraph Intake
Handler<-->|semaphore|Decode
Decode-->Batch;
end
subgraph ModelIndexer
Batch-->Buffer;
Available;
subgraph Goroutine1
Active1(Active);
Active1(Active)<-->Cache1(Cache);
Cache1(Cache)-->|FullOrTimer|Flush1(Flush);
end
subgraph Goroutine2
Active2(Active);
Active2(Active)<-->Cache2(Cache);
Cache2(Cache)-->|FullOrTimer|Flush2(Flush);
end
subgraph Channel
Buffer-->Active1(Active) & Active2(Active);
end
Available-.->Active1(Active) & Active2(Active);
end

Flush1(Flush) & Flush2(Flush)-->|bulk|ES[(Elasticsearch)];
Flush1(Flush) & Flush2(Flush)-->|done|Available;
```
10 changes: 10 additions & 0 deletions internal/beater/beater.go
@@ -543,6 +543,9 @@ func (s *Runner) newFinalBatchProcessor(
FlushBytes string `config:"flush_bytes"`
FlushInterval time.Duration `config:"flush_interval"`
MaxRequests int `config:"max_requests"`
Scaling struct {
Disabled bool `config:"disabled"`
} `config:"scaling"`
}
esConfig.FlushInterval = time.Second
esConfig.Config = elasticsearch.DefaultConfig()
@@ -568,6 +571,9 @@ func (s *Runner) newFinalBatchProcessor(
FlushInterval: esConfig.FlushInterval,
Tracer: tracer,
MaxRequests: esConfig.MaxRequests,
Scaling: modelindexer.ScalingConfig{
Disabled: esConfig.Scaling.Disabled,
},
})
if err != nil {
return nil, nil, err
@@ -618,6 +624,10 @@ func (s *Runner) newFinalBatchProcessor(
v.OnInt(stats.ActiveBulkRequests)
v.OnKey("completed")
v.OnInt(stats.BulkRequests)
v.OnKey("downscales")
v.OnInt(stats.DownScales)
v.OnKey("upscales")
v.OnInt(stats.UpScales)
})
return indexer, indexer.Close, nil
}
8 changes: 5 additions & 3 deletions internal/beater/server_test.go
@@ -616,9 +616,11 @@ func TestServerElasticsearchOutput(t *testing.T) {
assert.Equal(t, map[string]interface{}{
"elasticsearch": map[string]interface{}{
"bulk_requests": map[string]interface{}{
"active": int64(1),
"available": int64(24),
"completed": int64(0),
"active": int64(1),
"available": int64(49),
"completed": int64(0),
"downscales": int64(0),
"upscales": int64(0),
},
},
}, snapshot)