Query cluster UUID on apm-server startup (#6591)
(cherry picked from commit ecf0df2)

# Conflicts:
#	changelogs/head.asciidoc
stuartnelson3 authored and mergify-bot committed Nov 15, 2021
1 parent 0c2205d commit 76ee939
Showing 3 changed files with 203 additions and 0 deletions.
62 changes: 62 additions & 0 deletions beater/beater.go
@@ -19,6 +19,7 @@ package beater

import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
@@ -625,6 +626,12 @@ func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client) error
				)
			})
		}

		if s.config.DataStreams.Enabled {
			preconditions = append(preconditions, func(ctx context.Context) error {
				return queryClusterUUID(ctx, esOutputClient)
			})
		}
	}

	// When running standalone with data streams enabled, by default we will add
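As a side note on the hunk above: queryClusterUUID is registered as one of the server's startup preconditions, each of which is a func(context.Context) error. The sketch below is illustrative only and is not code from this commit; the waitForPreconditions name, the retry interval, and the assumption that preconditions are retried until all succeed or the context is cancelled are mine, not the repository's. It uses only the standard library context and time packages.

func waitForPreconditions(ctx context.Context, interval time.Duration, preconditions []func(context.Context) error) error {
	// Illustrative sketch (assumption): keep re-running the preconditions
	// until every one returns nil, or give up when the context is cancelled.
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		ready := true
		for _, pre := range preconditions {
			if err := pre(ctx); err != nil {
				ready = false
				break
			}
		}
		if ready {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}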
@@ -1034,3 +1041,58 @@ func (r *chanReloader) serve(ctx context.Context, reloader reload.Reloadable) error
		}
	}
}

// TODO: This is copying behavior from libbeat:
// https://github.com/elastic/beats/blob/b9ced47dba8bb55faa3b2b834fd6529d3c4d0919/libbeat/cmd/instance/beat.go#L927-L950
// Remove this when cluster_uuid no longer needs to be queried from ES.
func queryClusterUUID(ctx context.Context, esClient elasticsearch.Client) error {
	stateRegistry := monitoring.GetNamespace("state").GetRegistry()
	outputES := "outputs.elasticsearch"
	// Running under elastic-agent, the callback linked above is not
	// registered until later, meaning we need to check and instantiate the
	// registries if they don't exist.
	elasticsearchRegistry := stateRegistry.GetRegistry(outputES)
	if elasticsearchRegistry == nil {
		elasticsearchRegistry = stateRegistry.NewRegistry(outputES)
	}

	var (
		s  *monitoring.String
		ok bool
	)

	clusterUUID := "cluster_uuid"
	clusterUUIDRegVar := elasticsearchRegistry.Get(clusterUUID)
	if clusterUUIDRegVar != nil {
		s, ok = clusterUUIDRegVar.(*monitoring.String)
		if !ok {
			return fmt.Errorf("couldn't cast to String")
		}
	} else {
		s = monitoring.NewString(elasticsearchRegistry, clusterUUID)
	}

	var response struct {
		ClusterUUID string `json:"cluster_uuid"`
	}

	req, err := http.NewRequest("GET", "/", nil)
	if err != nil {
		return err
	}
	resp, err := esClient.Perform(req.WithContext(ctx))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode > 299 {
		return fmt.Errorf("error querying cluster_uuid: status_code=%d", resp.StatusCode)
	}
	err = json.NewDecoder(resp.Body).Decode(&response)
	if err != nil {
		return err
	}

	s.Set(response.ClusterUUID)
	return nil
}
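For reference, queryClusterUUID decodes the response of Elasticsearch's root endpoint ("GET /"). A representative response body is sketched below; the values are invented, and only cluster_uuid is read by the decoder above.

// Illustrative only: a made-up example of the root ("GET /") response body.
// queryClusterUUID decodes just the cluster_uuid field and ignores the rest.
const exampleRootResponse = `{
  "name": "instance-0000000001",
  "cluster_name": "example-cluster",
  "cluster_uuid": "Ce7dGq1WTp6qbiDqsFxX0g",
  "version": { "number": "7.16.0" },
  "tagline": "You Know, for Search"
}`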
60 changes: 60 additions & 0 deletions beater/beater_test.go
@@ -18,10 +18,13 @@
package beater

import (
"bytes"
"compress/zlib"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
@@ -46,6 +49,7 @@ import (
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs"
)

@@ -346,3 +350,59 @@ func TestFleetStoreUsed(t *testing.T) {

	assert.True(t, called)
}

func TestQueryClusterUUIDRegistriesExist(t *testing.T) {
	stateRegistry := monitoring.GetNamespace("state").GetRegistry()
	elasticsearchRegistry := stateRegistry.NewRegistry("outputs.elasticsearch")
	monitoring.NewString(elasticsearchRegistry, "cluster_uuid")

	ctx := context.Background()
	clusterUUID := "abc123"
	client := &mockClusterUUIDClient{ClusterUUID: clusterUUID}
	err := queryClusterUUID(ctx, client)
	require.NoError(t, err)

	fs := monitoring.CollectFlatSnapshot(elasticsearchRegistry, monitoring.Full, false)
	assert.Equal(t, clusterUUID, fs.Strings["cluster_uuid"])
}

func TestQueryClusterUUIDRegistriesDoNotExist(t *testing.T) {
	ctx := context.Background()
	clusterUUID := "abc123"
	client := &mockClusterUUIDClient{ClusterUUID: clusterUUID}
	err := queryClusterUUID(ctx, client)
	require.NoError(t, err)

	stateRegistry := monitoring.GetNamespace("state").GetRegistry()
	elasticsearchRegistry := stateRegistry.GetRegistry("outputs.elasticsearch")
	require.NotNil(t, elasticsearchRegistry)

	fs := monitoring.CollectFlatSnapshot(elasticsearchRegistry, monitoring.Full, false)
	assert.Equal(t, clusterUUID, fs.Strings["cluster_uuid"])
}

type mockClusterUUIDClient struct {
	ClusterUUID string `json:"cluster_uuid"`
}

func (c *mockClusterUUIDClient) Perform(r *http.Request) (*http.Response, error) {
	m, err := json.Marshal(c)
	if err != nil {
		return nil, err
	}
	resp := &http.Response{
		StatusCode: 200,
		Body:       &mockReadCloser{bytes.NewReader(m)},
		Request:    r,
	}
	return resp, nil
}

func (c *mockClusterUUIDClient) NewBulkIndexer(_ elasticsearch.BulkIndexerConfig) (elasticsearch.BulkIndexer, error) {
	return nil, nil
}

type mockReadCloser struct{ r io.Reader }

func (r *mockReadCloser) Read(p []byte) (int, error) { return r.r.Read(p) }
func (r *mockReadCloser) Close() error               { return nil }
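Outside the tests above, the registered value can be read back from the "state" monitoring namespace the same way the assertions do. Below is a minimal sketch using the same libbeat monitoring calls that appear in this diff; the printClusterUUID helper and its output are illustrative, not part of the commit.

package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/monitoring"
)

// printClusterUUID is an illustrative helper: it reads the value that
// queryClusterUUID registers under state.outputs.elasticsearch.cluster_uuid.
func printClusterUUID() {
	stateRegistry := monitoring.GetNamespace("state").GetRegistry()
	reg := stateRegistry.GetRegistry("outputs.elasticsearch")
	if reg == nil {
		fmt.Println("cluster_uuid not registered yet")
		return
	}
	fs := monitoring.CollectFlatSnapshot(reg, monitoring.Full, false)
	fmt.Println("cluster_uuid:", fs.Strings["cluster_uuid"])
}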
81 changes: 81 additions & 0 deletions changelogs/head.asciidoc
@@ -0,0 +1,81 @@
[[release-notes-head]]
== APM version HEAD

https://github.com/elastic/apm-server/compare/7.15\...master[View commits]

[float]
==== Breaking Changes
- Removed unused stacktrace/frame monitoring counters {pull}5984[5984]
- Removed unused support for top-level metricsets and metricset tags for RUMv3 {pull}6065[6065]
- Removed `apm-server.mode` configuration, and "experimental" fields {pull}6086[6086]
- `transaction.sampled` is now only set for sampled transactions {pull}6066[6066]
- Unknown metrics are dropped when `transaction.*` or `span.*` are present in a metricset {pull}6111[6111]
- Removed `metricset.period` from service_destination metrics {pull}6111[6111]
- Removed `http.request.socket` fields {pull}6152[6152]
- Removed unused `transaction.duration.{count,sum.us}` metric fields {pull}6174[6174]
- experimental:["This breaking change applies to the experimental tail-based sampling feature."] Removed `apm-server.sampling.tail.storage_dir` config {pull}6236[6236]
- Removed `ProcessPending` self-instrumentation events {pull}6243[6243]
- experimental:["This breaking change applies to the experimental tail-based sampling feature."] Changed `apm-server.sampling.tail.events.*` metrics semantics {pull}6273[6273]
- Removed warm phase from default ILM policy {pull}6322[6322]
- Removed unused `transaction.breakdown.count` metric field {pull}6366[6366]
- Removed legacy Jaeger gRPC/HTTP endpoints {pull}6417[6417]
- Removed source map upload endpoint {pull}6447[6447]
- Removed unsupported libbeat `processors` configuration {pull}6474[6474]
- Removed `apm-server.aggregation.transactions.enabled` configuration option {pull}6495[6495]
- Removed `apm-server.aggregation.service_destinations.enabled` configuration option {pull}6503[6503]
- Removed `apm-server.sampling.keep_unsampled` configuration option {pull}6514[6514]
- Removed `apm-server.jaeger` configuration options {pull}6560[6560]
- Removed `apm-server.instrumentation` configuration options in favor of `instrumentation` {pull}6560[6560]
- Removed `apm-server.rum.{allowed_service,event_rate}` configuration option in favor of `apm-server.auth.anonymous.{allow_service,rate_limit}` {pull}6560[6560]
- Removed `apm-server.{api_key,secret_token}` configuration options in favor of `apm-server.auth.{api_key,secret_token}` {pull}6560[6560]
- Onboarding documents are no longer indexed {pull}6431[6431]
- Removed `apm-server.register.ingest.pipeline` and `output.elasticsearch.pipeline` configuration options {pull}6575[6575]
- Removed unused `span.start.us` field, and deprecated `span.http.*` fields {pull}6602[6602]
- Removed `logging.ecs` and `logging.json` config {pull}6613[6613]

[float]
==== Bug fixes
- Use timestamp of original events for transaction/span metrics {pull}6311[6311]
- Populate the timestamp field for `agent_config` metricsets {pull}6382[6382]
- Query elasticsearch output `cluster_uuid` on startup {pull}6591[6591]

[float]
==== Intake API Changes
- `faas`, `service.origin.*`, and `cloud.origin.*` were added to support function-as-a-service fields {pull}6161[6161]
- `context.message.routing_key` was added to the intake API {pull}6177[6177]
- `transaction.dropped_spans_stats` was added to the intake API {pull}6200[6200]
- Map incoming OTel spans from agent bridges to APM spans/transactions {pull}6308[6308]
- `transaction.name` was added to the error objects in the intake API {pull}6539[6539]

[float]
==== Added
- The `error.log.message` or `error.exception.message` field of errors will be copied to the ECS field `message` {pull}5974[5974]
- Define index sorting for internal metrics data stream {pull}6116[6116]
- Add histogram dynamic_template to app metrics data stream {pull}6043[6043]
- Index OpenTelemetry span events and Jaeger logs into a log data stream {pull}6122[6122]
- With `apm-server.data_streams.enabled` in standalone mode, the server now accepts and enqueues events while waiting for the integration to be installed {pull}6130[6130]
- HTTP server errors (e.g. TLS handshake errors) are now logged {pull}6141[6141]
- Span documents now duplicate extended HTTP fields, which were previously only under `span.http.*`, under `http.*` {pull}6147[6147]
- We now record the direct network peer for incoming requests as `source.ip` and `source.port`; origin IP is recorded in `client.ip` {pull}6152[6152]
- We now collect span destination metrics for transactions with too many spans (for example due to transaction_max_spans or exit_span_min_duration) when collected and sent by APM agents {pull}6200[6200]
- Add an experimental endpoint for AWS Kinesis Data Firehose {pull}6299[6299]
- `output.elasticsearch.experimental` can be used to enable a new, experimental Elasticsearch output using the go-elasticsearch client {pull}5970[5970]
- Transaction metrics now also group by `service.node.name`, `cloud.provider`, `cloud.region`, `cloud.availability_zone` {pull}6323[6323]
- Standalone apm-server can now fetch source maps uploaded to Kibana, when `apm-server.kibana` is configured {pull}6447[6447]
- Added support for gzip compression to the experimental Elasticsearch output {pull}6449[6449]
- Introduced a delete phase for all data streams. Traces, errors and logs are kept for 10 days, metrics are kept for 90 days {pull}6480[6480]
- Changed RUM traces to use a dedicated data stream (`traces-apm.rum`). RUM traces are kept for 90 days {pull}6480[6480]
- When `auth.anonymous.enabled` isn't specified and RUM is enabled (`rum.enabled:true`), `auth.anonymous.enabled` will be set to `true` {pull}6607[6607]

[float]
==== Deprecated
- Setting `service.version` as a span tag (Jaeger) or attribute (OTel) is deprecated; use tracer tags (Jaeger) and resource attributes (OTel) {pull}6131[6131]
- Setting up Elasticsearch templates, ILM policies, and pipelines directly with apm-server is now deprecated. Users should use the integration package {pull}6145[6145]
- `span.http.*` fields are deprecated, replaced by `http.*`, and will be removed in 8.0 {pull}6147[6147]
- Add deprecation warning for `sampling.keep_unsampled=true` {pull}6285[6285]
- `processors.*` config, which was never officially supported in apm-server, is now explicitly deprecated and will be removed in 8.0 {pull}6367[6367]
- Support for uploading source maps to APM Server is deprecated, and will be removed in 8.0. Users should use the new Kibana REST API in conjunction with the integration package {pull}6432[6432]

[float]
==== Licensing Changes
- Updated the `x-pack` source files license to the Elastic License 2.0 {pull}6524[6524]
