Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query cluster UUID on apm-server startup #6591

Merged
merged 13 commits into from
Nov 15, 2021
49 changes: 49 additions & 0 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package beater

import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -616,6 +617,12 @@ func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client
)
})
}

if s.config.DataStreams.Enabled {
preconditions = append(preconditions, func(ctx context.Context) error {
return queryClusterUUID(ctx, esOutputClient)
})
}
}

// When running standalone with data streams enabled, by default we will add
Expand Down Expand Up @@ -969,3 +976,45 @@ func (r *chanReloader) serve(ctx context.Context, reloader reload.Reloadable) er
}
}
}

// TODO: This is copying behavior from libbeat:
// https://github.com/elastic/beats/blob/b9ced47dba8bb55faa3b2b834fd6529d3c4d0919/libbeat/cmd/instance/beat.go#L927-L950
// Remove this when cluster_uuid no longer needs to be queried from ES.
func queryClusterUUID(ctx context.Context, esClient elasticsearch.Client) error {
stateRegistry := monitoring.GetNamespace("state").GetRegistry()
outputES := "outputs.elasticsearch"
elasticsearchRegistry := stateRegistry.GetRegistry(outputES)
if elasticsearchRegistry == nil {
elasticsearchRegistry = stateRegistry.NewRegistry(outputES)
}
stuartnelson3 marked this conversation as resolved.
Show resolved Hide resolved
// TODO: How can I just update a registry value?
clusterUUID := "cluster_uuid"
if elasticsearchRegistry.Get(clusterUUID) != nil {
elasticsearchRegistry.Remove(clusterUUID)
}
clusterUUIDRegVar := monitoring.NewString(elasticsearchRegistry, clusterUUID)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked through the api but it wasn't immediately apparent to me how to update a value already existing in a registry.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a Get method on the monitoring.Registry which I think you can use?

Would the libbeat code have been called yet at this point? I'm wondering though if there's any risk that the libbeat code could kick in concurrently and panic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a Get method on the monitoring.Registry which I think you can use?

roger, I have to cast to *monitoring.String, but it appears to work (the return value is Var interface type, it seems, which only exposes a single Visit method).

Would the libbeat code have been called yet at this point? I'm wondering though if there's any risk that the libbeat code could kick in concurrently and panic.

The registry code is called synchronously, before *beater.Run, so it should always be set up. The Set method on the *monitoring.String could be called at the same time, but it has an internal mutex, so access should be fine.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, thanks. I see you removed the test guards, looks much safer to me now.


var response struct {
ClusterUUID string `json:"cluster_uuid"`
}

req, err := http.NewRequest("GET", "/", nil)
if err != nil {
return err
}
resp, err := esClient.Perform(req.WithContext(ctx))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode > 299 {
return err
}
err = json.NewDecoder(resp.Body).Decode(&response)
if err != nil {
return err
}

clusterUUIDRegVar.Set(response.ClusterUUID)
return nil
}
45 changes: 45 additions & 0 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
package beater

import (
"bytes"
"compress/zlib"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
Expand All @@ -46,6 +49,7 @@ import (
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs"
)

Expand Down Expand Up @@ -361,3 +365,44 @@ func Test_newDropLogsBeatProcessor(t *testing.T) {
require.NoError(t, err)
require.Nil(t, result)
}

func TestQueryClusterUUID(t *testing.T) {
ctx := context.Background()
clusterUUID := "abc123"
client := &mockClusterUUIDClient{ClusterUUID: clusterUUID}
err := queryClusterUUID(ctx, client)
require.NoError(t, err)

stateRegistry := monitoring.GetNamespace("state").GetRegistry()
elasticsearchRegistry := stateRegistry.GetRegistry("outputs.elasticsearch")
require.NotNil(t, elasticsearchRegistry)

fs := monitoring.CollectFlatSnapshot(elasticsearchRegistry, monitoring.Full, false)
assert.Equal(t, clusterUUID, fs.Strings["cluster_uuid"])
}

type mockClusterUUIDClient struct {
ClusterUUID string `json:"cluster_uuid"`
}

func (c *mockClusterUUIDClient) Perform(r *http.Request) (*http.Response, error) {
m, err := json.Marshal(c)
if err != nil {
return nil, err
}
resp := &http.Response{
StatusCode: 200,
Body: &mockReadCloser{bytes.NewReader(m)},
Request: r,
}
return resp, nil
}

func (c *mockClusterUUIDClient) NewBulkIndexer(_ elasticsearch.BulkIndexerConfig) (elasticsearch.BulkIndexer, error) {
return nil, nil
}

type mockReadCloser struct{ r io.Reader }

func (r *mockReadCloser) Read(p []byte) (int, error) { return r.r.Read(p) }
func (r *mockReadCloser) Close() error { return nil }
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ https://github.com/elastic/apm-server/compare/7.15\...master[View commits]
==== Bug fixes
- Use timestamp of original events for transaction/span metrics {pull}6311[6311]
- Populate the timestamp field for `agent_config` metricsets {pull}6382[6382]
- Query elasticsearch output `cluster_uuid` on startup {pull}6591[6591]

[float]
==== Intake API Changes
Expand Down