From 56766e5107b8e6da4d8eba10b58b7c2ab26fa205 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 21 Jun 2021 17:24:14 +0800 Subject: [PATCH] systemtest: remove TestDataStream*, fix Fleet test Remove the TestDataStreams tests, which test the server's internal/private/undocumneted configuration for enabling data streams. At least for now, this is only intended to be used with Fleet, so we should instead just rely on the Fleet system test. There was a bug in the Fleet test where apm-server would create "traces-apm-default" as a plain old index instead of a data stream in certain conditions: if the apm integration had previously been installed, then the traces-apm template would be deleted by CleanupElasticsearch. Because the package was already installed, creating the policy would not automatically reinstall it and add the missing template. We fix the Fleet issue by uninstalling the package during Fleet cleanup, after unenrolling agents. --- systemtest/apmservertest/filter.go | 21 ++- systemtest/apmservertest/server.go | 7 +- .../false.approved.json | 76 ---------- ...son => TestFleetIntegration.approved.json} | 4 +- systemtest/datastreams_test.go | 140 ------------------ systemtest/fleet_test.go | 52 ++++++- systemtest/fleettest/client.go | 11 ++ 7 files changed, 76 insertions(+), 235 deletions(-) delete mode 100644 systemtest/approvals/TestDataStreamsEnabled/false.approved.json rename systemtest/approvals/{TestDataStreamsEnabled/true.approved.json => TestFleetIntegration.approved.json} (94%) delete mode 100644 systemtest/datastreams_test.go diff --git a/systemtest/apmservertest/filter.go b/systemtest/apmservertest/filter.go index a851e1fd772..d41926a9394 100644 --- a/systemtest/apmservertest/filter.go +++ b/systemtest/apmservertest/filter.go @@ -29,7 +29,7 @@ import ( "go.elastic.co/fastjson" ) -// TODO(axw) move EventMetadata and filteringTransport to go.elastic.co/apmtest, +// TODO(axw) move EventMetadata and FilteringTransport to go.elastic.co/apmtest, // generalising filteringTransport to work with arbitrary base transports. To do // that we would need to dynamically check for optional interfaces supported by // the base transport, and create passthrough methods. @@ -47,14 +47,22 @@ type EventMetadataFilter interface { FilterEventMetadata(*EventMetadata) } -type filteringTransport struct { +// FilteringTransport is a transport for the APM Go agent which modifies events +// prior to sending them to the underlying transport. +type FilteringTransport struct { *transport.HTTPTransport filter EventMetadataFilter } +// NewFilteringTransport returns a new FilteringTransport that filters events +// using f, and sends them on to h. +func NewFilteringTransport(h *transport.HTTPTransport, f EventMetadataFilter) *FilteringTransport { + return &FilteringTransport{h, f} +} + // SendStream decodes metadata from reader, passes it through the filters, // and then sends the modified stream to the underlying transport. -func (t *filteringTransport) SendStream(ctx context.Context, stream io.Reader) error { +func (t *FilteringTransport) SendStream(ctx context.Context, stream io.Reader) error { zr, err := zlib.NewReader(stream) if err != nil { return err @@ -98,9 +106,12 @@ func (t *filteringTransport) SendStream(ctx context.Context, stream io.Reader) e return t.HTTPTransport.SendStream(ctx, &buf) } -type defaultMetadataFilter struct{} +// DefaultMetadataFilter implements EventMetadataFilter, setting some default values +// for fields that would otherwise by dynamically discovered. +type DefaultMetadataFilter struct{} -func (defaultMetadataFilter) FilterEventMetadata(m *EventMetadata) { +// FilterEventMetadata updates m with default values for dynamically discovered fields. +func (DefaultMetadataFilter) FilterEventMetadata(m *EventMetadata) { m.System.Platform = "minix" m.System.Architecture = "i386" m.System.Container = nil diff --git a/systemtest/apmservertest/server.go b/systemtest/apmservertest/server.go index 63264cb6d33..f180a92f97e 100644 --- a/systemtest/apmservertest/server.go +++ b/systemtest/apmservertest/server.go @@ -117,7 +117,7 @@ func NewServer(tb testing.TB, args ...string) *Server { func NewUnstartedServer(tb testing.TB, args ...string) *Server { return &Server{ Config: DefaultConfig(), - EventMetadataFilter: defaultMetadataFilter{}, + EventMetadataFilter: DefaultMetadataFilter{}, tb: tb, args: args, } @@ -469,10 +469,7 @@ func (s *Server) Tracer() *apm.Tracer { var transport transport.Transport = httpTransport if s.EventMetadataFilter != nil { - transport = &filteringTransport{ - HTTPTransport: httpTransport, - filter: s.EventMetadataFilter, - } + transport = NewFilteringTransport(httpTransport, s.EventMetadataFilter) } tracer, err := apm.NewTracerOptions(apm.TracerOptions{Transport: transport}) if err != nil { diff --git a/systemtest/approvals/TestDataStreamsEnabled/false.approved.json b/systemtest/approvals/TestDataStreamsEnabled/false.approved.json deleted file mode 100644 index 5b4a28ca9c8..00000000000 --- a/systemtest/approvals/TestDataStreamsEnabled/false.approved.json +++ /dev/null @@ -1,76 +0,0 @@ -{ - "events": [ - { - "@timestamp": "dynamic", - "agent": { - "name": "go", - "version": "0.0.0" - }, - "ecs": { - "version": "dynamic" - }, - "event": { - "ingested": "dynamic", - "outcome": "unknown" - }, - "host": { - "architecture": "i386", - "hostname": "beowulf", - "ip": "127.0.0.1", - "name": "beowulf", - "os": { - "platform": "minix" - } - }, - "observer": { - "ephemeral_id": "dynamic", - "hostname": "dynamic", - "id": "dynamic", - "type": "apm-server", - "version": "dynamic", - "version_major": "dynamic" - }, - "process": { - "pid": 1, - "title": "systemtest.test" - }, - "processor": { - "event": "transaction", - "name": "transaction" - }, - "service": { - "language": { - "name": "go", - "version": "2.0" - }, - "name": "systemtest", - "node": { - "name": "beowulf" - }, - "runtime": { - "name": "gc", - "version": "2.0" - } - }, - "timestamp": { - "us": "dynamic" - }, - "trace": { - "id": "dynamic" - }, - "transaction": { - "duration": { - "us": 1000000 - }, - "id": "dynamic", - "name": "name", - "sampled": true, - "span_count": { - "dropped": 0, - "started": 0 - }, - "type": "type" - } - } - ] -} diff --git a/systemtest/approvals/TestDataStreamsEnabled/true.approved.json b/systemtest/approvals/TestFleetIntegration.approved.json similarity index 94% rename from systemtest/approvals/TestDataStreamsEnabled/true.approved.json rename to systemtest/approvals/TestFleetIntegration.approved.json index 28ca86e8ed5..09739f5aa71 100644 --- a/systemtest/approvals/TestDataStreamsEnabled/true.approved.json +++ b/systemtest/approvals/TestFleetIntegration.approved.json @@ -13,12 +13,14 @@ "version": "dynamic" }, "event": { + "agent_id_status": "untrusted_user", + "ingested": "dynamic", "outcome": "unknown" }, "host": { "architecture": "i386", "hostname": "beowulf", - "ip": "127.0.0.1", + "ip": "10.11.12.13", "name": "beowulf", "os": { "platform": "minix" diff --git a/systemtest/datastreams_test.go b/systemtest/datastreams_test.go deleted file mode 100644 index 88aad971761..00000000000 --- a/systemtest/datastreams_test.go +++ /dev/null @@ -1,140 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package systemtest_test - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap/zapcore" - - "github.com/elastic/apm-server/systemtest" - "github.com/elastic/apm-server/systemtest/apmservertest" - "github.com/elastic/apm-server/systemtest/estest" -) - -func TestDataStreamsEnabled(t *testing.T) { - for _, enabled := range []bool{false, true} { - t.Run(fmt.Sprint(enabled), func(t *testing.T) { - systemtest.CleanupElasticsearch(t) - srv := apmservertest.NewUnstartedServer(t) - if enabled { - // Enable data streams. - srv.Config.DataStreams = &apmservertest.DataStreamsConfig{Enabled: true} - srv.Config.Setup = nil - - // Create a data stream index template. - resp, err := systemtest.Elasticsearch.Indices.PutIndexTemplate("apm-data-streams", strings.NewReader(fmt.Sprintf(`{ - "index_patterns": ["traces-apm*", "logs-apm*", "metrics-apm*"], - "data_stream": {}, - "priority": 200, - "template": {"settings": {"number_of_shards": 1, "refresh_interval": "250ms"}} - }`))) - require.NoError(t, err) - body, _ := ioutil.ReadAll(resp.Body) - require.False(t, resp.IsError(), string(body)) - - // Create an API Key which can write to traces-* etc. - // The default APM Server user can only write to apm-*. - // - // NOTE(axw) importantly, this API key lacks privileges - // to manage templates, pipelines, ILM, etc. Enabling - // data streams should disable all automatic setup. - resp, err = systemtest.Elasticsearch.Security.CreateAPIKey(strings.NewReader(fmt.Sprintf(`{ - "name": "%s", - "expiration": "1h", - "role_descriptors": { - "write-apm-data": { - "cluster": ["monitor"], - "index": [ - { - "names": ["traces-*", "metrics-*", "logs-*"], - "privileges": ["write", "create_index"] - } - ] - } - } - }`, t.Name()))) - require.NoError(t, err) - - var apiKeyResponse struct { - ID string - Name string - APIKey string `json:"api_key"` - } - require.NoError(t, json.NewDecoder(resp.Body).Decode(&apiKeyResponse)) - - // Use an API Key to mimic running under Fleet, with limited permissions. - srv.Config.Output.Elasticsearch.Username = "" - srv.Config.Output.Elasticsearch.Password = "" - srv.Config.Output.Elasticsearch.APIKey = fmt.Sprintf("%s:%s", apiKeyResponse.ID, apiKeyResponse.APIKey) - } - require.NoError(t, srv.Start()) - - tracer := srv.Tracer() - tx := tracer.StartTransaction("name", "type") - tx.Duration = time.Second - tx.End() - tracer.Flush(nil) - - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*,traces-apm*", estest.TermQuery{ - Field: "processor.event", Value: "transaction", - }) - systemtest.ApproveEvents( - t, t.Name(), result.Hits.Hits, - "@timestamp", "timestamp.us", - "trace.id", "transaction.id", - ) - - // Assert there are no unexpected warnings or errors. - for _, record := range srv.Logs.All() { - assert.Condition(t, func() bool { - if record.Level == zapcore.ErrorLevel { - return assert.Equal(t, "Started apm-server with data streams enabled but no active fleet management mode was specified", record.Message) - } - return record.Level < zapcore.WarnLevel - }, "%s: %s", record.Level, record.Message) - } - }) - } -} - -func TestDataStreamsSetupErrors(t *testing.T) { - cfg := apmservertest.DefaultConfig() - cfg.DataStreams = &apmservertest.DataStreamsConfig{Enabled: true} - cfgargs, err := cfg.Args() - require.NoError(t, err) - - test := func(args []string, expected string) { - args = append(args, cfgargs...) - cmd := apmservertest.ServerCommand("setup", args...) - out, err := cmd.CombinedOutput() - require.Error(t, err) - assert.Equal(t, "Exiting: "+expected+"\n", string(out)) - } - - test([]string{}, "index setup must be performed externally when using data streams, by installing the 'apm' integration package") - test([]string{"--index-management"}, "index setup must be performed externally when using data streams, by installing the 'apm' integration package") - test([]string{"--pipelines"}, "index pipeline setup must be performed externally when using data streams, by installing the 'apm' integration package") -} diff --git a/systemtest/fleet_test.go b/systemtest/fleet_test.go index 761bccaad3c..fa5a7afab7d 100644 --- a/systemtest/fleet_test.go +++ b/systemtest/fleet_test.go @@ -20,6 +20,7 @@ package systemtest_test import ( "context" "io/ioutil" + "net/http" "net/url" "testing" "time" @@ -31,6 +32,7 @@ import ( "go.elastic.co/apm/transport" "github.com/elastic/apm-server/systemtest" + "github.com/elastic/apm-server/systemtest/apmservertest" "github.com/elastic/apm-server/systemtest/fleettest" ) @@ -83,17 +85,36 @@ func TestFleetIntegration(t *testing.T) { require.NoError(t, err) // Elastic Agent has started apm-server. Connect to apm-server and send some data, - // and make sure it gets indexed into a data stream. - transport, err := transport.NewHTTPTransport() + // and make sure it gets indexed into a data stream. We override the transport to + // set known metadata. + httpTransport, err := transport.NewHTTPTransport() require.NoError(t, err) - transport.SetServerURL(&url.URL{Scheme: "http", Host: agent.Addrs["8200"]}) - tracer, err := apm.NewTracerOptions(apm.TracerOptions{Transport: transport}) + origTransport := httpTransport.Client.Transport + httpTransport.Client.Transport = roundTripperFunc(func(r *http.Request) (*http.Response, error) { + r.Header.Set("X-Real-Ip", "10.11.12.13") + return origTransport.RoundTrip(r) + }) + httpTransport.SetServerURL(&url.URL{Scheme: "http", Host: agent.Addrs["8200"]}) + tracer, err := apm.NewTracerOptions(apm.TracerOptions{ + Transport: apmservertest.NewFilteringTransport( + httpTransport, + apmservertest.DefaultMetadataFilter{}, + ), + }) require.NoError(t, err) defer tracer.Close() - tracer.StartTransaction("name", "type").End() + + tx := tracer.StartTransaction("name", "type") + tx.Duration = time.Second + tx.End() tracer.Flush(nil) - systemtest.Elasticsearch.ExpectDocs(t, "traces-*", nil) + result := systemtest.Elasticsearch.ExpectDocs(t, "traces-*", nil) + systemtest.ApproveEvents( + t, t.Name(), result.Hits.Hits, + "@timestamp", "timestamp.us", + "trace.id", "transaction.id", + ) } func TestFleetPackageNonMultiple(t *testing.T) { @@ -143,7 +164,16 @@ func initAPMIntegrationPackagePolicyInputs(t *testing.T, packagePolicy *fleettes } } -func getAPMIntegrationPackage(t *testing.T, fleet *fleettest.Client) *fleettest.Package { +func cleanupFleet(t testing.TB, fleet *fleettest.Client) { + cleanupFleetPolicies(t, fleet) + apmPackage := getAPMIntegrationPackage(t, fleet) + if apmPackage.Status == "installed" { + err := fleet.DeletePackage(apmPackage.Name, apmPackage.Version) + require.NoError(t, err) + } +} + +func getAPMIntegrationPackage(t testing.TB, fleet *fleettest.Client) *fleettest.Package { var apmPackage *fleettest.Package packages, err := fleet.ListPackages() require.NoError(t, err) @@ -161,7 +191,7 @@ func getAPMIntegrationPackage(t *testing.T, fleet *fleettest.Client) *fleettest. panic("unreachable") } -func cleanupFleet(t testing.TB, fleet *fleettest.Client) { +func cleanupFleetPolicies(t testing.TB, fleet *fleettest.Client) { apmAgentPolicies, err := fleet.AgentPolicies("ingest-agent-policies.name:apm_systemtest") require.NoError(t, err) if len(apmAgentPolicies) == 0 { @@ -187,3 +217,9 @@ func cleanupFleet(t testing.TB, fleet *fleettest.Client) { require.NoError(t, err) } } + +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { + return f(r) +} diff --git a/systemtest/fleettest/client.go b/systemtest/fleettest/client.go index 5034bd19f87..0f93351f2dd 100644 --- a/systemtest/fleettest/client.go +++ b/systemtest/fleettest/client.go @@ -239,6 +239,17 @@ func (c *Client) Package(name, version string) (*Package, error) { return &result.Response, nil } +// DeletePackage deletes (uninstalls) the package with the given name and version. +func (c *Client) DeletePackage(name, version string) error { + req := c.newFleetRequest("DELETE", "/epm/packages/"+name+"-"+version, nil) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + return consumeResponse(resp, nil) +} + // PackagePolicy returns information about the package policy with the given ID. func (c *Client) PackagePolicy(id string) (*PackagePolicy, error) { resp, err := http.Get(c.fleetURL + "/package_policies/" + id)