Skip to content

Commit

Permalink
Query cluster UUID on apm-server startup (#6591) (#6620)
Browse files Browse the repository at this point in the history
(cherry picked from commit ecf0df2)

Co-authored-by: stuart nelson <[email protected]>
  • Loading branch information
mergify[bot] and stuartnelson3 authored Nov 16, 2021
1 parent cc68d45 commit 59a43b1
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 0 deletions.
62 changes: 62 additions & 0 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package beater

import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -592,6 +593,12 @@ func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client
)
})
}

if s.config.DataStreams.Enabled {
preconditions = append(preconditions, func(ctx context.Context) error {
return queryClusterUUID(ctx, esOutputClient)
})
}
}

// When running standalone with data streams enabled, by default we will add
Expand Down Expand Up @@ -880,3 +887,58 @@ func (r *chanReloader) serve(ctx context.Context, reloader reload.Reloadable) er
}
}
}

// TODO: This is copying behavior from libbeat:
// https://github.com/elastic/beats/blob/b9ced47dba8bb55faa3b2b834fd6529d3c4d0919/libbeat/cmd/instance/beat.go#L927-L950
// Remove this when cluster_uuid no longer needs to be queried from ES.
func queryClusterUUID(ctx context.Context, esClient elasticsearch.Client) error {
stateRegistry := monitoring.GetNamespace("state").GetRegistry()
outputES := "outputs.elasticsearch"
// Running under elastic-agent, the callback linked above is not
// registered until later, meaning we need to check and instantiate the
// registries if they don't exist.
elasticsearchRegistry := stateRegistry.GetRegistry(outputES)
if elasticsearchRegistry == nil {
elasticsearchRegistry = stateRegistry.NewRegistry(outputES)
}

var (
s *monitoring.String
ok bool
)

clusterUUID := "cluster_uuid"
clusterUUIDRegVar := elasticsearchRegistry.Get(clusterUUID)
if clusterUUIDRegVar != nil {
s, ok = clusterUUIDRegVar.(*monitoring.String)
if !ok {
return fmt.Errorf("couldn't cast to String")
}
} else {
s = monitoring.NewString(elasticsearchRegistry, clusterUUID)
}

var response struct {
ClusterUUID string `json:"cluster_uuid"`
}

req, err := http.NewRequest("GET", "/", nil)
if err != nil {
return err
}
resp, err := esClient.Perform(req.WithContext(ctx))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode > 299 {
return fmt.Errorf("error querying cluster_uuid: status_code=%d", resp.StatusCode)
}
err = json.NewDecoder(resp.Body).Decode(&response)
if err != nil {
return err
}

s.Set(response.ClusterUUID)
return nil
}
60 changes: 60 additions & 0 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
package beater

import (
"bytes"
"compress/zlib"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
Expand All @@ -46,6 +49,7 @@ import (
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs"
)

Expand Down Expand Up @@ -343,3 +347,59 @@ func TestFleetStoreUsed(t *testing.T) {

assert.True(t, called)
}

func TestQueryClusterUUIDRegistriesExist(t *testing.T) {
stateRegistry := monitoring.GetNamespace("state").GetRegistry()
elasticsearchRegistry := stateRegistry.NewRegistry("outputs.elasticsearch")
monitoring.NewString(elasticsearchRegistry, "cluster_uuid")

ctx := context.Background()
clusterUUID := "abc123"
client := &mockClusterUUIDClient{ClusterUUID: clusterUUID}
err := queryClusterUUID(ctx, client)
require.NoError(t, err)

fs := monitoring.CollectFlatSnapshot(elasticsearchRegistry, monitoring.Full, false)
assert.Equal(t, clusterUUID, fs.Strings["cluster_uuid"])
}

func TestQueryClusterUUIDRegistriesDoNotExist(t *testing.T) {
ctx := context.Background()
clusterUUID := "abc123"
client := &mockClusterUUIDClient{ClusterUUID: clusterUUID}
err := queryClusterUUID(ctx, client)
require.NoError(t, err)

stateRegistry := monitoring.GetNamespace("state").GetRegistry()
elasticsearchRegistry := stateRegistry.GetRegistry("outputs.elasticsearch")
require.NotNil(t, elasticsearchRegistry)

fs := monitoring.CollectFlatSnapshot(elasticsearchRegistry, monitoring.Full, false)
assert.Equal(t, clusterUUID, fs.Strings["cluster_uuid"])
}

type mockClusterUUIDClient struct {
ClusterUUID string `json:"cluster_uuid"`
}

func (c *mockClusterUUIDClient) Perform(r *http.Request) (*http.Response, error) {
m, err := json.Marshal(c)
if err != nil {
return nil, err
}
resp := &http.Response{
StatusCode: 200,
Body: &mockReadCloser{bytes.NewReader(m)},
Request: r,
}
return resp, nil
}

func (c *mockClusterUUIDClient) NewBulkIndexer(_ elasticsearch.BulkIndexerConfig) (elasticsearch.BulkIndexer, error) {
return nil, nil
}

type mockReadCloser struct{ r io.Reader }

func (r *mockReadCloser) Read(p []byte) (int, error) { return r.r.Read(p) }
func (r *mockReadCloser) Close() error { return nil }
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ https://github.com/elastic/apm-server/compare/7.15\...master[View commits]
==== Bug fixes
- Use timestamp of original events for transaction/span metrics {pull}6311[6311]
- Populate the timestamp field for `agent_config` metricsets {pull}6382[6382]
- Query elasticsearch output `cluster_uuid` on startup {pull}6591[6591]

[float]
==== Intake API Changes
Expand Down

0 comments on commit 59a43b1

Please sign in to comment.