Skip to content

Commit

Permalink
systemtest: remove TestDataStream*, fix Fleet test (#5503)
Browse files Browse the repository at this point in the history
Remove the TestDataStreams tests, which test the
server's internal/private/undocumneted configuration
for enabling data streams. At least for now, this is
only intended to be used with Fleet, so we should
instead just rely on the Fleet system test.

There was a bug in the Fleet test where apm-server
would create "traces-apm-default" as a plain old
index instead of a data stream in certain conditions:
if the apm integration had previously been installed,
then the traces-apm template would be deleted by
CleanupElasticsearch. Because the package was already
installed, creating the policy would not automatically
reinstall it and add the missing template.

We fix the Fleet issue by uninstalling the package
during Fleet cleanup, after unenrolling agents.

(cherry picked from commit 9f7d9b6)
  • Loading branch information
axw authored and mergify-bot committed Jun 22, 2021
1 parent d8b9bb8 commit 9160ca2
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 235 deletions.
21 changes: 16 additions & 5 deletions systemtest/apmservertest/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"go.elastic.co/fastjson"
)

// TODO(axw) move EventMetadata and filteringTransport to go.elastic.co/apmtest,
// TODO(axw) move EventMetadata and FilteringTransport to go.elastic.co/apmtest,
// generalising filteringTransport to work with arbitrary base transports. To do
// that we would need to dynamically check for optional interfaces supported by
// the base transport, and create passthrough methods.
Expand All @@ -47,14 +47,22 @@ type EventMetadataFilter interface {
FilterEventMetadata(*EventMetadata)
}

type filteringTransport struct {
// FilteringTransport is a transport for the APM Go agent which modifies events
// prior to sending them to the underlying transport.
type FilteringTransport struct {
*transport.HTTPTransport
filter EventMetadataFilter
}

// NewFilteringTransport returns a new FilteringTransport that filters events
// using f, and sends them on to h.
func NewFilteringTransport(h *transport.HTTPTransport, f EventMetadataFilter) *FilteringTransport {
return &FilteringTransport{h, f}
}

// SendStream decodes metadata from reader, passes it through the filters,
// and then sends the modified stream to the underlying transport.
func (t *filteringTransport) SendStream(ctx context.Context, stream io.Reader) error {
func (t *FilteringTransport) SendStream(ctx context.Context, stream io.Reader) error {
zr, err := zlib.NewReader(stream)
if err != nil {
return err
Expand Down Expand Up @@ -98,9 +106,12 @@ func (t *filteringTransport) SendStream(ctx context.Context, stream io.Reader) e
return t.HTTPTransport.SendStream(ctx, &buf)
}

type defaultMetadataFilter struct{}
// DefaultMetadataFilter implements EventMetadataFilter, setting some default values
// for fields that would otherwise by dynamically discovered.
type DefaultMetadataFilter struct{}

func (defaultMetadataFilter) FilterEventMetadata(m *EventMetadata) {
// FilterEventMetadata updates m with default values for dynamically discovered fields.
func (DefaultMetadataFilter) FilterEventMetadata(m *EventMetadata) {
m.System.Platform = "minix"
m.System.Architecture = "i386"
m.System.Container = nil
Expand Down
7 changes: 2 additions & 5 deletions systemtest/apmservertest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func NewServer(tb testing.TB, args ...string) *Server {
func NewUnstartedServer(tb testing.TB, args ...string) *Server {
return &Server{
Config: DefaultConfig(),
EventMetadataFilter: defaultMetadataFilter{},
EventMetadataFilter: DefaultMetadataFilter{},
tb: tb,
args: args,
}
Expand Down Expand Up @@ -469,10 +469,7 @@ func (s *Server) Tracer() *apm.Tracer {

var transport transport.Transport = httpTransport
if s.EventMetadataFilter != nil {
transport = &filteringTransport{
HTTPTransport: httpTransport,
filter: s.EventMetadataFilter,
}
transport = NewFilteringTransport(httpTransport, s.EventMetadataFilter)
}
tracer, err := apm.NewTracerOptions(apm.TracerOptions{Transport: transport})
if err != nil {
Expand Down
76 changes: 0 additions & 76 deletions systemtest/approvals/TestDataStreamsEnabled/false.approved.json

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
"version": "dynamic"
},
"event": {
"agent_id_status": "untrusted_user",
"ingested": "dynamic",
"outcome": "unknown"
},
"host": {
"architecture": "i386",
"hostname": "beowulf",
"ip": "127.0.0.1",
"ip": "10.11.12.13",
"name": "beowulf",
"os": {
"platform": "minix"
Expand Down
140 changes: 0 additions & 140 deletions systemtest/datastreams_test.go

This file was deleted.

52 changes: 44 additions & 8 deletions systemtest/fleet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package systemtest_test
import (
"context"
"io/ioutil"
"net/http"
"net/url"
"testing"
"time"
Expand All @@ -31,6 +32,7 @@ import (
"go.elastic.co/apm/transport"

"github.com/elastic/apm-server/systemtest"
"github.com/elastic/apm-server/systemtest/apmservertest"
"github.com/elastic/apm-server/systemtest/fleettest"
)

Expand Down Expand Up @@ -83,17 +85,36 @@ func TestFleetIntegration(t *testing.T) {
require.NoError(t, err)

// Elastic Agent has started apm-server. Connect to apm-server and send some data,
// and make sure it gets indexed into a data stream.
transport, err := transport.NewHTTPTransport()
// and make sure it gets indexed into a data stream. We override the transport to
// set known metadata.
httpTransport, err := transport.NewHTTPTransport()
require.NoError(t, err)
transport.SetServerURL(&url.URL{Scheme: "http", Host: agent.Addrs["8200"]})
tracer, err := apm.NewTracerOptions(apm.TracerOptions{Transport: transport})
origTransport := httpTransport.Client.Transport
httpTransport.Client.Transport = roundTripperFunc(func(r *http.Request) (*http.Response, error) {
r.Header.Set("X-Real-Ip", "10.11.12.13")
return origTransport.RoundTrip(r)
})
httpTransport.SetServerURL(&url.URL{Scheme: "http", Host: agent.Addrs["8200"]})
tracer, err := apm.NewTracerOptions(apm.TracerOptions{
Transport: apmservertest.NewFilteringTransport(
httpTransport,
apmservertest.DefaultMetadataFilter{},
),
})
require.NoError(t, err)
defer tracer.Close()
tracer.StartTransaction("name", "type").End()

tx := tracer.StartTransaction("name", "type")
tx.Duration = time.Second
tx.End()
tracer.Flush(nil)

systemtest.Elasticsearch.ExpectDocs(t, "traces-*", nil)
result := systemtest.Elasticsearch.ExpectDocs(t, "traces-*", nil)
systemtest.ApproveEvents(
t, t.Name(), result.Hits.Hits,
"@timestamp", "timestamp.us",
"trace.id", "transaction.id",
)
}

func TestFleetPackageNonMultiple(t *testing.T) {
Expand Down Expand Up @@ -143,7 +164,16 @@ func initAPMIntegrationPackagePolicyInputs(t *testing.T, packagePolicy *fleettes
}
}

func getAPMIntegrationPackage(t *testing.T, fleet *fleettest.Client) *fleettest.Package {
func cleanupFleet(t testing.TB, fleet *fleettest.Client) {
cleanupFleetPolicies(t, fleet)
apmPackage := getAPMIntegrationPackage(t, fleet)
if apmPackage.Status == "installed" {
err := fleet.DeletePackage(apmPackage.Name, apmPackage.Version)
require.NoError(t, err)
}
}

func getAPMIntegrationPackage(t testing.TB, fleet *fleettest.Client) *fleettest.Package {
var apmPackage *fleettest.Package
packages, err := fleet.ListPackages()
require.NoError(t, err)
Expand All @@ -161,7 +191,7 @@ func getAPMIntegrationPackage(t *testing.T, fleet *fleettest.Client) *fleettest.
panic("unreachable")
}

func cleanupFleet(t testing.TB, fleet *fleettest.Client) {
func cleanupFleetPolicies(t testing.TB, fleet *fleettest.Client) {
apmAgentPolicies, err := fleet.AgentPolicies("ingest-agent-policies.name:apm_systemtest")
require.NoError(t, err)
if len(apmAgentPolicies) == 0 {
Expand All @@ -187,3 +217,9 @@ func cleanupFleet(t testing.TB, fleet *fleettest.Client) {
require.NoError(t, err)
}
}

type roundTripperFunc func(*http.Request) (*http.Response, error)

func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return f(r)
}
Loading

0 comments on commit 9160ca2

Please sign in to comment.