diff --git a/systemtest/apmservertest/filter.go b/systemtest/apmservertest/filter.go index a851e1fd772..d41926a9394 100644 --- a/systemtest/apmservertest/filter.go +++ b/systemtest/apmservertest/filter.go @@ -29,7 +29,7 @@ import ( "go.elastic.co/fastjson" ) -// TODO(axw) move EventMetadata and filteringTransport to go.elastic.co/apmtest, +// TODO(axw) move EventMetadata and FilteringTransport to go.elastic.co/apmtest, // generalising filteringTransport to work with arbitrary base transports. To do // that we would need to dynamically check for optional interfaces supported by // the base transport, and create passthrough methods. @@ -47,14 +47,22 @@ type EventMetadataFilter interface { FilterEventMetadata(*EventMetadata) } -type filteringTransport struct { +// FilteringTransport is a transport for the APM Go agent which modifies events +// prior to sending them to the underlying transport. +type FilteringTransport struct { *transport.HTTPTransport filter EventMetadataFilter } +// NewFilteringTransport returns a new FilteringTransport that filters events +// using f, and sends them on to h. +func NewFilteringTransport(h *transport.HTTPTransport, f EventMetadataFilter) *FilteringTransport { + return &FilteringTransport{h, f} +} + // SendStream decodes metadata from reader, passes it through the filters, // and then sends the modified stream to the underlying transport. -func (t *filteringTransport) SendStream(ctx context.Context, stream io.Reader) error { +func (t *FilteringTransport) SendStream(ctx context.Context, stream io.Reader) error { zr, err := zlib.NewReader(stream) if err != nil { return err @@ -98,9 +106,12 @@ func (t *filteringTransport) SendStream(ctx context.Context, stream io.Reader) e return t.HTTPTransport.SendStream(ctx, &buf) } -type defaultMetadataFilter struct{} +// DefaultMetadataFilter implements EventMetadataFilter, setting some default values +// for fields that would otherwise by dynamically discovered. +type DefaultMetadataFilter struct{} -func (defaultMetadataFilter) FilterEventMetadata(m *EventMetadata) { +// FilterEventMetadata updates m with default values for dynamically discovered fields. +func (DefaultMetadataFilter) FilterEventMetadata(m *EventMetadata) { m.System.Platform = "minix" m.System.Architecture = "i386" m.System.Container = nil diff --git a/systemtest/apmservertest/server.go b/systemtest/apmservertest/server.go index 63264cb6d33..f180a92f97e 100644 --- a/systemtest/apmservertest/server.go +++ b/systemtest/apmservertest/server.go @@ -117,7 +117,7 @@ func NewServer(tb testing.TB, args ...string) *Server { func NewUnstartedServer(tb testing.TB, args ...string) *Server { return &Server{ Config: DefaultConfig(), - EventMetadataFilter: defaultMetadataFilter{}, + EventMetadataFilter: DefaultMetadataFilter{}, tb: tb, args: args, } @@ -469,10 +469,7 @@ func (s *Server) Tracer() *apm.Tracer { var transport transport.Transport = httpTransport if s.EventMetadataFilter != nil { - transport = &filteringTransport{ - HTTPTransport: httpTransport, - filter: s.EventMetadataFilter, - } + transport = NewFilteringTransport(httpTransport, s.EventMetadataFilter) } tracer, err := apm.NewTracerOptions(apm.TracerOptions{Transport: transport}) if err != nil { diff --git a/systemtest/approvals/TestDataStreamsEnabled/false.approved.json b/systemtest/approvals/TestDataStreamsEnabled/false.approved.json deleted file mode 100644 index 5b4a28ca9c8..00000000000 --- a/systemtest/approvals/TestDataStreamsEnabled/false.approved.json +++ /dev/null @@ -1,76 +0,0 @@ -{ - "events": [ - { - "@timestamp": "dynamic", - "agent": { - "name": "go", - "version": "0.0.0" - }, - "ecs": { - "version": "dynamic" - }, - "event": { - "ingested": "dynamic", - "outcome": "unknown" - }, - "host": { - "architecture": "i386", - "hostname": "beowulf", - "ip": "127.0.0.1", - "name": "beowulf", - "os": { - "platform": "minix" - } - }, - "observer": { - "ephemeral_id": "dynamic", - "hostname": "dynamic", - "id": "dynamic", - "type": "apm-server", - "version": "dynamic", - "version_major": "dynamic" - }, - "process": { - "pid": 1, - "title": "systemtest.test" - }, - "processor": { - "event": "transaction", - "name": "transaction" - }, - "service": { - "language": { - "name": "go", - "version": "2.0" - }, - "name": "systemtest", - "node": { - "name": "beowulf" - }, - "runtime": { - "name": "gc", - "version": "2.0" - } - }, - "timestamp": { - "us": "dynamic" - }, - "trace": { - "id": "dynamic" - }, - "transaction": { - "duration": { - "us": 1000000 - }, - "id": "dynamic", - "name": "name", - "sampled": true, - "span_count": { - "dropped": 0, - "started": 0 - }, - "type": "type" - } - } - ] -} diff --git a/systemtest/approvals/TestDataStreamsEnabled/true.approved.json b/systemtest/approvals/TestFleetIntegration.approved.json similarity index 94% rename from systemtest/approvals/TestDataStreamsEnabled/true.approved.json rename to systemtest/approvals/TestFleetIntegration.approved.json index 28ca86e8ed5..09739f5aa71 100644 --- a/systemtest/approvals/TestDataStreamsEnabled/true.approved.json +++ b/systemtest/approvals/TestFleetIntegration.approved.json @@ -13,12 +13,14 @@ "version": "dynamic" }, "event": { + "agent_id_status": "untrusted_user", + "ingested": "dynamic", "outcome": "unknown" }, "host": { "architecture": "i386", "hostname": "beowulf", - "ip": "127.0.0.1", + "ip": "10.11.12.13", "name": "beowulf", "os": { "platform": "minix" diff --git a/systemtest/datastreams_test.go b/systemtest/datastreams_test.go deleted file mode 100644 index 88aad971761..00000000000 --- a/systemtest/datastreams_test.go +++ /dev/null @@ -1,140 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package systemtest_test - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap/zapcore" - - "github.com/elastic/apm-server/systemtest" - "github.com/elastic/apm-server/systemtest/apmservertest" - "github.com/elastic/apm-server/systemtest/estest" -) - -func TestDataStreamsEnabled(t *testing.T) { - for _, enabled := range []bool{false, true} { - t.Run(fmt.Sprint(enabled), func(t *testing.T) { - systemtest.CleanupElasticsearch(t) - srv := apmservertest.NewUnstartedServer(t) - if enabled { - // Enable data streams. - srv.Config.DataStreams = &apmservertest.DataStreamsConfig{Enabled: true} - srv.Config.Setup = nil - - // Create a data stream index template. - resp, err := systemtest.Elasticsearch.Indices.PutIndexTemplate("apm-data-streams", strings.NewReader(fmt.Sprintf(`{ - "index_patterns": ["traces-apm*", "logs-apm*", "metrics-apm*"], - "data_stream": {}, - "priority": 200, - "template": {"settings": {"number_of_shards": 1, "refresh_interval": "250ms"}} - }`))) - require.NoError(t, err) - body, _ := ioutil.ReadAll(resp.Body) - require.False(t, resp.IsError(), string(body)) - - // Create an API Key which can write to traces-* etc. - // The default APM Server user can only write to apm-*. - // - // NOTE(axw) importantly, this API key lacks privileges - // to manage templates, pipelines, ILM, etc. Enabling - // data streams should disable all automatic setup. - resp, err = systemtest.Elasticsearch.Security.CreateAPIKey(strings.NewReader(fmt.Sprintf(`{ - "name": "%s", - "expiration": "1h", - "role_descriptors": { - "write-apm-data": { - "cluster": ["monitor"], - "index": [ - { - "names": ["traces-*", "metrics-*", "logs-*"], - "privileges": ["write", "create_index"] - } - ] - } - } - }`, t.Name()))) - require.NoError(t, err) - - var apiKeyResponse struct { - ID string - Name string - APIKey string `json:"api_key"` - } - require.NoError(t, json.NewDecoder(resp.Body).Decode(&apiKeyResponse)) - - // Use an API Key to mimic running under Fleet, with limited permissions. - srv.Config.Output.Elasticsearch.Username = "" - srv.Config.Output.Elasticsearch.Password = "" - srv.Config.Output.Elasticsearch.APIKey = fmt.Sprintf("%s:%s", apiKeyResponse.ID, apiKeyResponse.APIKey) - } - require.NoError(t, srv.Start()) - - tracer := srv.Tracer() - tx := tracer.StartTransaction("name", "type") - tx.Duration = time.Second - tx.End() - tracer.Flush(nil) - - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*,traces-apm*", estest.TermQuery{ - Field: "processor.event", Value: "transaction", - }) - systemtest.ApproveEvents( - t, t.Name(), result.Hits.Hits, - "@timestamp", "timestamp.us", - "trace.id", "transaction.id", - ) - - // Assert there are no unexpected warnings or errors. - for _, record := range srv.Logs.All() { - assert.Condition(t, func() bool { - if record.Level == zapcore.ErrorLevel { - return assert.Equal(t, "Started apm-server with data streams enabled but no active fleet management mode was specified", record.Message) - } - return record.Level < zapcore.WarnLevel - }, "%s: %s", record.Level, record.Message) - } - }) - } -} - -func TestDataStreamsSetupErrors(t *testing.T) { - cfg := apmservertest.DefaultConfig() - cfg.DataStreams = &apmservertest.DataStreamsConfig{Enabled: true} - cfgargs, err := cfg.Args() - require.NoError(t, err) - - test := func(args []string, expected string) { - args = append(args, cfgargs...) - cmd := apmservertest.ServerCommand("setup", args...) - out, err := cmd.CombinedOutput() - require.Error(t, err) - assert.Equal(t, "Exiting: "+expected+"\n", string(out)) - } - - test([]string{}, "index setup must be performed externally when using data streams, by installing the 'apm' integration package") - test([]string{"--index-management"}, "index setup must be performed externally when using data streams, by installing the 'apm' integration package") - test([]string{"--pipelines"}, "index pipeline setup must be performed externally when using data streams, by installing the 'apm' integration package") -} diff --git a/systemtest/fleet_test.go b/systemtest/fleet_test.go index 761bccaad3c..fa5a7afab7d 100644 --- a/systemtest/fleet_test.go +++ b/systemtest/fleet_test.go @@ -20,6 +20,7 @@ package systemtest_test import ( "context" "io/ioutil" + "net/http" "net/url" "testing" "time" @@ -31,6 +32,7 @@ import ( "go.elastic.co/apm/transport" "github.com/elastic/apm-server/systemtest" + "github.com/elastic/apm-server/systemtest/apmservertest" "github.com/elastic/apm-server/systemtest/fleettest" ) @@ -83,17 +85,36 @@ func TestFleetIntegration(t *testing.T) { require.NoError(t, err) // Elastic Agent has started apm-server. Connect to apm-server and send some data, - // and make sure it gets indexed into a data stream. - transport, err := transport.NewHTTPTransport() + // and make sure it gets indexed into a data stream. We override the transport to + // set known metadata. + httpTransport, err := transport.NewHTTPTransport() require.NoError(t, err) - transport.SetServerURL(&url.URL{Scheme: "http", Host: agent.Addrs["8200"]}) - tracer, err := apm.NewTracerOptions(apm.TracerOptions{Transport: transport}) + origTransport := httpTransport.Client.Transport + httpTransport.Client.Transport = roundTripperFunc(func(r *http.Request) (*http.Response, error) { + r.Header.Set("X-Real-Ip", "10.11.12.13") + return origTransport.RoundTrip(r) + }) + httpTransport.SetServerURL(&url.URL{Scheme: "http", Host: agent.Addrs["8200"]}) + tracer, err := apm.NewTracerOptions(apm.TracerOptions{ + Transport: apmservertest.NewFilteringTransport( + httpTransport, + apmservertest.DefaultMetadataFilter{}, + ), + }) require.NoError(t, err) defer tracer.Close() - tracer.StartTransaction("name", "type").End() + + tx := tracer.StartTransaction("name", "type") + tx.Duration = time.Second + tx.End() tracer.Flush(nil) - systemtest.Elasticsearch.ExpectDocs(t, "traces-*", nil) + result := systemtest.Elasticsearch.ExpectDocs(t, "traces-*", nil) + systemtest.ApproveEvents( + t, t.Name(), result.Hits.Hits, + "@timestamp", "timestamp.us", + "trace.id", "transaction.id", + ) } func TestFleetPackageNonMultiple(t *testing.T) { @@ -143,7 +164,16 @@ func initAPMIntegrationPackagePolicyInputs(t *testing.T, packagePolicy *fleettes } } -func getAPMIntegrationPackage(t *testing.T, fleet *fleettest.Client) *fleettest.Package { +func cleanupFleet(t testing.TB, fleet *fleettest.Client) { + cleanupFleetPolicies(t, fleet) + apmPackage := getAPMIntegrationPackage(t, fleet) + if apmPackage.Status == "installed" { + err := fleet.DeletePackage(apmPackage.Name, apmPackage.Version) + require.NoError(t, err) + } +} + +func getAPMIntegrationPackage(t testing.TB, fleet *fleettest.Client) *fleettest.Package { var apmPackage *fleettest.Package packages, err := fleet.ListPackages() require.NoError(t, err) @@ -161,7 +191,7 @@ func getAPMIntegrationPackage(t *testing.T, fleet *fleettest.Client) *fleettest. panic("unreachable") } -func cleanupFleet(t testing.TB, fleet *fleettest.Client) { +func cleanupFleetPolicies(t testing.TB, fleet *fleettest.Client) { apmAgentPolicies, err := fleet.AgentPolicies("ingest-agent-policies.name:apm_systemtest") require.NoError(t, err) if len(apmAgentPolicies) == 0 { @@ -187,3 +217,9 @@ func cleanupFleet(t testing.TB, fleet *fleettest.Client) { require.NoError(t, err) } } + +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { + return f(r) +} diff --git a/systemtest/fleettest/client.go b/systemtest/fleettest/client.go index 5034bd19f87..0f93351f2dd 100644 --- a/systemtest/fleettest/client.go +++ b/systemtest/fleettest/client.go @@ -239,6 +239,17 @@ func (c *Client) Package(name, version string) (*Package, error) { return &result.Response, nil } +// DeletePackage deletes (uninstalls) the package with the given name and version. +func (c *Client) DeletePackage(name, version string) error { + req := c.newFleetRequest("DELETE", "/epm/packages/"+name+"-"+version, nil) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + return consumeResponse(resp, nil) +} + // PackagePolicy returns information about the package policy with the given ID. func (c *Client) PackagePolicy(id string) (*PackagePolicy, error) { resp, err := http.Get(c.fleetURL + "/package_policies/" + id)