Skip to content

Commit

Permalink
systemtest: remove TestDataStream*, fix Fleet test
Browse files Browse the repository at this point in the history
Remove the TestDataStreams tests, which test the
server's internal/private/undocumneted configuration
for enabling data streams. At least for now, this is
only intended to be used with Fleet, so we should
instead just rely on the Fleet system test.

There was a bug in the Fleet test where apm-server
would create "traces-apm-default" as a plain old
index instead of a data stream in certain conditions:
if the apm integration had previously been installed,
then the traces-apm template would be deleted by
CleanupElasticsearch. Because the package was already
installed, creating the policy would not automatically
reinstall it and add the missing template.

We fix the Fleet issue by uninstalling the package
during Fleet cleanup, after unenrolling agents.
  • Loading branch information
axw committed Jun 21, 2021
1 parent ab082c3 commit 6275b9d
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 232 deletions.
21 changes: 16 additions & 5 deletions systemtest/apmservertest/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"go.elastic.co/fastjson"
)

// TODO(axw) move EventMetadata and filteringTransport to go.elastic.co/apmtest,
// TODO(axw) move EventMetadata and FilteringTransport to go.elastic.co/apmtest,
// generalising filteringTransport to work with arbitrary base transports. To do
// that we would need to dynamically check for optional interfaces supported by
// the base transport, and create passthrough methods.
Expand All @@ -47,14 +47,22 @@ type EventMetadataFilter interface {
FilterEventMetadata(*EventMetadata)
}

type filteringTransport struct {
// FilteringTransport is a transport for the APM Go agent which modifies events
// prior to sending them to the underlying transport.
type FilteringTransport struct {
*transport.HTTPTransport
filter EventMetadataFilter
}

// NewFilteringTransport returns a new FilteringTransport that filters events
// using f, and sends them on to h.
func NewFilteringTransport(h *transport.HTTPTransport, f EventMetadataFilter) *FilteringTransport {
return &FilteringTransport{h, f}
}

// SendStream decodes metadata from reader, passes it through the filters,
// and then sends the modified stream to the underlying transport.
func (t *filteringTransport) SendStream(ctx context.Context, stream io.Reader) error {
func (t *FilteringTransport) SendStream(ctx context.Context, stream io.Reader) error {
zr, err := zlib.NewReader(stream)
if err != nil {
return err
Expand Down Expand Up @@ -98,9 +106,12 @@ func (t *filteringTransport) SendStream(ctx context.Context, stream io.Reader) e
return t.HTTPTransport.SendStream(ctx, &buf)
}

type defaultMetadataFilter struct{}
// DefaultMetadataFilter implements EventMetadataFilter, setting some default values
// for fields that would otherwise by dynamically discovered.
type DefaultMetadataFilter struct{}

func (defaultMetadataFilter) FilterEventMetadata(m *EventMetadata) {
// FilterEventMetadata updates m with default values for dynamically discovered fields.
func (DefaultMetadataFilter) FilterEventMetadata(m *EventMetadata) {
m.System.Platform = "minix"
m.System.Architecture = "i386"
m.System.Container = nil
Expand Down
7 changes: 2 additions & 5 deletions systemtest/apmservertest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func NewServer(tb testing.TB, args ...string) *Server {
func NewUnstartedServer(tb testing.TB, args ...string) *Server {
return &Server{
Config: DefaultConfig(),
EventMetadataFilter: defaultMetadataFilter{},
EventMetadataFilter: DefaultMetadataFilter{},
tb: tb,
args: args,
}
Expand Down Expand Up @@ -469,10 +469,7 @@ func (s *Server) Tracer() *apm.Tracer {

var transport transport.Transport = httpTransport
if s.EventMetadataFilter != nil {
transport = &filteringTransport{
HTTPTransport: httpTransport,
filter: s.EventMetadataFilter,
}
transport = NewFilteringTransport(httpTransport, s.EventMetadataFilter)
}
tracer, err := apm.NewTracerOptions(apm.TracerOptions{Transport: transport})
if err != nil {
Expand Down
76 changes: 0 additions & 76 deletions systemtest/approvals/TestDataStreamsEnabled/false.approved.json

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
"version": "dynamic"
},
"event": {
"agent_id_status": "untrusted_user",
"ingested": "dynamic",
"outcome": "unknown"
},
"host": {
"architecture": "i386",
"hostname": "beowulf",
"ip": "127.0.0.1",
"ip": "172.20.0.1",
"name": "beowulf",
"os": {
"platform": "minix"
Expand Down
140 changes: 0 additions & 140 deletions systemtest/datastreams_test.go

This file was deleted.

33 changes: 28 additions & 5 deletions systemtest/fleet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.elastic.co/apm/transport"

"github.com/elastic/apm-server/systemtest"
"github.com/elastic/apm-server/systemtest/apmservertest"
"github.com/elastic/apm-server/systemtest/fleettest"
)

Expand Down Expand Up @@ -87,13 +88,26 @@ func TestFleetIntegration(t *testing.T) {
transport, err := transport.NewHTTPTransport()
require.NoError(t, err)
transport.SetServerURL(&url.URL{Scheme: "http", Host: agent.Addrs["8200"]})
tracer, err := apm.NewTracerOptions(apm.TracerOptions{Transport: transport})
tracer, err := apm.NewTracerOptions(apm.TracerOptions{
Transport: apmservertest.NewFilteringTransport(
transport,
apmservertest.DefaultMetadataFilter{},
),
})
require.NoError(t, err)
defer tracer.Close()
tracer.StartTransaction("name", "type").End()

tx := tracer.StartTransaction("name", "type")
tx.Duration = time.Second
tx.End()
tracer.Flush(nil)

systemtest.Elasticsearch.ExpectDocs(t, "traces-*", nil)
result := systemtest.Elasticsearch.ExpectDocs(t, "traces-*", nil)
systemtest.ApproveEvents(
t, t.Name(), result.Hits.Hits,
"@timestamp", "timestamp.us",
"trace.id", "transaction.id",
)
}

func TestFleetPackageNonMultiple(t *testing.T) {
Expand Down Expand Up @@ -143,7 +157,16 @@ func initAPMIntegrationPackagePolicyInputs(t *testing.T, packagePolicy *fleettes
}
}

func getAPMIntegrationPackage(t *testing.T, fleet *fleettest.Client) *fleettest.Package {
func cleanupFleet(t testing.TB, fleet *fleettest.Client) {
cleanupFleetPolicies(t, fleet)
apmPackage := getAPMIntegrationPackage(t, fleet)
if apmPackage.Status == "installed" {
err := fleet.DeletePackage(apmPackage.Name, apmPackage.Version)
require.NoError(t, err)
}
}

func getAPMIntegrationPackage(t testing.TB, fleet *fleettest.Client) *fleettest.Package {
var apmPackage *fleettest.Package
packages, err := fleet.ListPackages()
require.NoError(t, err)
Expand All @@ -161,7 +184,7 @@ func getAPMIntegrationPackage(t *testing.T, fleet *fleettest.Client) *fleettest.
panic("unreachable")
}

func cleanupFleet(t testing.TB, fleet *fleettest.Client) {
func cleanupFleetPolicies(t testing.TB, fleet *fleettest.Client) {
apmAgentPolicies, err := fleet.AgentPolicies("ingest-agent-policies.name:apm_systemtest")
require.NoError(t, err)
if len(apmAgentPolicies) == 0 {
Expand Down
11 changes: 11 additions & 0 deletions systemtest/fleettest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,17 @@ func (c *Client) Package(name, version string) (*Package, error) {
return &result.Response, nil
}

// DeletePackage deletes (uninstalls) the package with the given name and version.
func (c *Client) DeletePackage(name, version string) error {
req := c.newFleetRequest("DELETE", "/epm/packages/"+name+"-"+version, nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
return consumeResponse(resp, nil)
}

// PackagePolicy returns information about the package policy with the given ID.
func (c *Client) PackagePolicy(id string) (*PackagePolicy, error) {
resp, err := http.Get(c.fleetURL + "/package_policies/" + id)
Expand Down

0 comments on commit 6275b9d

Please sign in to comment.