From 76ee9392de257f3294bd133b6bd765e9703c9e55 Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Mon, 15 Nov 2021 17:05:53 +0100 Subject: [PATCH] Query cluster UUID on apm-server startup (#6591) (cherry picked from commit ecf0df25d07064e92830aec6d0e7605e67d90fe7) # Conflicts: # changelogs/head.asciidoc --- beater/beater.go | 62 ++++++++++++++++++++++++++++++ beater/beater_test.go | 60 +++++++++++++++++++++++++++++ changelogs/head.asciidoc | 81 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 203 insertions(+) create mode 100644 changelogs/head.asciidoc diff --git a/beater/beater.go b/beater/beater.go index 133721de418..bf7c444ffe3 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -19,6 +19,7 @@ package beater import ( "context" + "encoding/json" "fmt" "net" "net/http" @@ -625,6 +626,12 @@ func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client ) }) } + + if s.config.DataStreams.Enabled { + preconditions = append(preconditions, func(ctx context.Context) error { + return queryClusterUUID(ctx, esOutputClient) + }) + } } // When running standalone with data streams enabled, by default we will add @@ -1034,3 +1041,58 @@ func (r *chanReloader) serve(ctx context.Context, reloader reload.Reloadable) er } } } + +// TODO: This is copying behavior from libbeat: +// https://github.com/elastic/beats/blob/b9ced47dba8bb55faa3b2b834fd6529d3c4d0919/libbeat/cmd/instance/beat.go#L927-L950 +// Remove this when cluster_uuid no longer needs to be queried from ES. +func queryClusterUUID(ctx context.Context, esClient elasticsearch.Client) error { + stateRegistry := monitoring.GetNamespace("state").GetRegistry() + outputES := "outputs.elasticsearch" + // Running under elastic-agent, the callback linked above is not + // registered until later, meaning we need to check and instantiate the + // registries if they don't exist. + elasticsearchRegistry := stateRegistry.GetRegistry(outputES) + if elasticsearchRegistry == nil { + elasticsearchRegistry = stateRegistry.NewRegistry(outputES) + } + + var ( + s *monitoring.String + ok bool + ) + + clusterUUID := "cluster_uuid" + clusterUUIDRegVar := elasticsearchRegistry.Get(clusterUUID) + if clusterUUIDRegVar != nil { + s, ok = clusterUUIDRegVar.(*monitoring.String) + if !ok { + return fmt.Errorf("couldn't cast to String") + } + } else { + s = monitoring.NewString(elasticsearchRegistry, clusterUUID) + } + + var response struct { + ClusterUUID string `json:"cluster_uuid"` + } + + req, err := http.NewRequest("GET", "/", nil) + if err != nil { + return err + } + resp, err := esClient.Perform(req.WithContext(ctx)) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode > 299 { + return fmt.Errorf("error querying cluster_uuid: status_code=%d", resp.StatusCode) + } + err = json.NewDecoder(resp.Body).Decode(&response) + if err != nil { + return err + } + + s.Set(response.ClusterUUID) + return nil +} diff --git a/beater/beater_test.go b/beater/beater_test.go index 25192940751..665f9d0d2f6 100644 --- a/beater/beater_test.go +++ b/beater/beater_test.go @@ -18,10 +18,13 @@ package beater import ( + "bytes" "compress/zlib" "context" + "encoding/json" "errors" "fmt" + "io" "io/ioutil" "net" "net/http" @@ -46,6 +49,7 @@ import ( "github.com/elastic/beats/v7/libbeat/idxmgmt" "github.com/elastic/beats/v7/libbeat/instrumentation" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/outputs" ) @@ -346,3 +350,59 @@ func TestFleetStoreUsed(t *testing.T) { assert.True(t, called) } + +func TestQueryClusterUUIDRegistriesExist(t *testing.T) { + stateRegistry := monitoring.GetNamespace("state").GetRegistry() + elasticsearchRegistry := stateRegistry.NewRegistry("outputs.elasticsearch") + monitoring.NewString(elasticsearchRegistry, "cluster_uuid") + + ctx := context.Background() + clusterUUID := "abc123" + client := &mockClusterUUIDClient{ClusterUUID: clusterUUID} + err := queryClusterUUID(ctx, client) + require.NoError(t, err) + + fs := monitoring.CollectFlatSnapshot(elasticsearchRegistry, monitoring.Full, false) + assert.Equal(t, clusterUUID, fs.Strings["cluster_uuid"]) +} + +func TestQueryClusterUUIDRegistriesDoNotExist(t *testing.T) { + ctx := context.Background() + clusterUUID := "abc123" + client := &mockClusterUUIDClient{ClusterUUID: clusterUUID} + err := queryClusterUUID(ctx, client) + require.NoError(t, err) + + stateRegistry := monitoring.GetNamespace("state").GetRegistry() + elasticsearchRegistry := stateRegistry.GetRegistry("outputs.elasticsearch") + require.NotNil(t, elasticsearchRegistry) + + fs := monitoring.CollectFlatSnapshot(elasticsearchRegistry, monitoring.Full, false) + assert.Equal(t, clusterUUID, fs.Strings["cluster_uuid"]) +} + +type mockClusterUUIDClient struct { + ClusterUUID string `json:"cluster_uuid"` +} + +func (c *mockClusterUUIDClient) Perform(r *http.Request) (*http.Response, error) { + m, err := json.Marshal(c) + if err != nil { + return nil, err + } + resp := &http.Response{ + StatusCode: 200, + Body: &mockReadCloser{bytes.NewReader(m)}, + Request: r, + } + return resp, nil +} + +func (c *mockClusterUUIDClient) NewBulkIndexer(_ elasticsearch.BulkIndexerConfig) (elasticsearch.BulkIndexer, error) { + return nil, nil +} + +type mockReadCloser struct{ r io.Reader } + +func (r *mockReadCloser) Read(p []byte) (int, error) { return r.r.Read(p) } +func (r *mockReadCloser) Close() error { return nil } diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc new file mode 100644 index 00000000000..dd8dcb32db3 --- /dev/null +++ b/changelogs/head.asciidoc @@ -0,0 +1,81 @@ +[[release-notes-head]] +== APM version HEAD + +https://github.com/elastic/apm-server/compare/7.15\...master[View commits] + +[float] +==== Breaking Changes +- Removed unused stacktrace/frame monitoring counters {pull}5984[5984] +- Removed unused support for top-level metricsets and metricset tags for RUMv3 {pull}6065[6065] +- Removed `apm-server.mode` configuration, and "experimental" fields {pull}6086[6086] +- `transaction.sampled` is now only set for sampled transactions {pull}6066[6066] +- Unknown metrics are dropped when `transaction.*` or `span.*` are present in a metricset {pull}6111[6111] +- Removed `metricset.period` from service_destination metrics {pull}6111[6111] +- Removed `http.request.socket` fields {pull}6152[6152] +- Removed unused `transaction.duration.{count,sum.us}` metric fields {pull}6174[6174] +- experimental:["This breaking change applies to the experimental tail-based sampling feature."] Removed `apm-server.sampling.tail.storage_dir` config {pull}6236[6236] +- Removed `ProcessPending` self-instrumentation events {pull}6243[6243] +- experimental:["This breaking change applies to the experimental tail-based sampling feature."] Changed `apm-server.sampling.tail.events.*` metrics semantics {pull}6273[6273] +- Removed warm phase from default ILM policy {pull}6322[6322] +- Removed unused `transaction.breakdown.count` metric field {pull}6366[6366] +- Removed legacy Jaeger gRPC/HTTP endpoints {pull}6417[6417] +- Removed source map upload endpoint {pull}6447[6447] +- Removed unsupported libbeat `processors` configuration {pull}6474[6474] +- Removed `apm-server.aggregation.transactions.enabled` configuration option {pull}6495[6495] +- Removed `apm-server.aggregation.service_destinations.enabled` configuration option {pull}6503[6503] +- Removed `apm-server.sampling.keep_unsampled` configuration option {pull}6514[6514] +- Removed `apm-server.jaeger` configuration options {pull}6560[6560] +- Removed `apm-server.instrumentation` configuration options in favor of `instrumentation` {pull}6560[6560] +- Removed `apm-server.rum.{allowed_service,event_rate}` configuration option in favor of `apm-server.auth.anonymous.{allow_service,rate_limit}` {pull}6560[6560] +- Removed `apm-server.{api_key,secret_token}` configuration options in favor of `apm-server.auth.{api_key,secret_token}` {pull}6560[6560] +- Onboarding documents are no longer indexed {pull}6431[6431] +- Removed `apm-server.register.ingest.pipeline` and `output.elasticsearch.pipeline` configuration options {pull}6575[6575] +- Removed unused `span.start.us` field, and deprecated `span.http.*` fields {pull}6602[6602] +- Removed `logging.ecs` and `logging.json` config {pull}6613[6613] + +[float] +==== Bug fixes +- Use timestamp of original events for transaction/span metrics {pull}6311[6311] +- Populate the timestamp field for `agent_config` metricsets {pull}6382[6382] +- Query elasticsearch output `cluster_uuid` on startup {pull}6591[6591] + +[float] +==== Intake API Changes +- `faas`, `service.origin.*`, and `cloud.origin.*` added for supporting function as a service fields {pull}6161[6161] +- `context.message.routing_key` was added to the intake API {pull}6177[6177] +- `transaction.dropped_spans_stats` was added to the intake API {pull}6200[6200] +- map incoming OTel spans from agent bridges to apm spans/transactions {pull}6308[6308] +- `transaction.name` was added to the error objects in the intake API {pull}6539[6539] + +[float] +==== Added +- The `error.log.message` or `error.exception.message` field of errors will be copied to the ECS field `message` {pull}5974[5974] +- Define index sorting for internal metrics data stream {pull}6116[6116] +- Add histogram dynamic_template to app metrics data stream {pull}6043[6043] +- Index OpenTelemetry span events and Jaeger logs into a log data stream {pull}6122[6122] +- With `apm-server.data_streams.enabled` in standalone mode, the server now accepts and enqueues events while waiting for the integration to be installed {pull}6130[6130] +- HTTP server errors (e.g. TLS handshake errors) are now logged {pull}6141[6141] +- Span documents now duplicate extended HTTP fields, which were previously only under `span.http.*`, under `http.*` {pull}6147[6147] +- We now record the direct network peer for incoming requests as `source.ip` and `source.port`; origin IP is recorded in `client.ip` {pull}6152[6152] +- We now collect span destination metrics for transactions with too many spans (for example due to transaction_max_spans or exit_span_min_duration) when collected and sent by APM agents {pull}6200[6200] +- Add an experimental endpoint for AWS Kinesis Data Firehose {pull}6299[6299] +- `output.elasticsearch.experimental` can be used to enable a new, experimental Elasticsearch output using the go-elasticsearch client {pull}5970[5970] +- Transaction metrics now also group by `service.node.name`, `cloud.provider`, `cloud.region`, `cloud.availability_zone` {pull}6323[6323] +- Standalone apm-server can now fetch source maps uploaded to Kibana, when `apm-server.kibana` is configured {pull}6447[6447] +- Added support for gzip compression to the experimental Elasticsearch output {pull}6449[6449] +- Introduced a delete phase for all data streams. Traces, errors and logs are kept for 10 days, metrics are kept for 90 days {pull}6480[6480] +- Changed RUM traces to use a dedicated data stream (`traces-apm.rum`). RUM traces are kept for 90 days {pull}6480[6480] +- When `auth.anonymous.enabled` isn't specified and RUM is enabled (`rum.enabled:true`), `auth.anonymous.enabled` will be set to `true` {pull}6607[6607] + +[float] +==== Deprecated +- Setting `service.version` as a span tag (Jaeger) or attribute (OTel) is deprecated; use tracer tags (Jaeger) and resource attributes (OTel) {pull}6131[6131] +- Setting up Elasticsearch templates, ILM policies, and pipelines directly with apm-server is now deprecated. Users should use the integration package {pull}6145[6145] +- `span.http.*` fields are deprecated, replaced by `http.*`, and will be removed in 8.0 {pull}6147[6147] +- Add deprecation warning for `sampling.keep_unsampled=true` {pull}6285[6285] +- `processors.*` config, which was never officially supported in apm-server, is now explicitly deprecated and will be removed in 8.0 {pull}6367[6367] +- Support for uploading source maps to APM Server is deprecated, and will be removed in 8.0. Users should use the new Kibana REST API in conjunction with the integration package {pull}6432[6432] + +[float] +==== Licensing Changes +- Updated the `x-pack` source files license to the Elastic License 2.0 {pull}6524[6524]