Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

systemtest: remove TestDataStream*, fix Fleet test #5503

Merged
merged 2 commits into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions systemtest/apmservertest/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"go.elastic.co/fastjson"
)

// TODO(axw) move EventMetadata and filteringTransport to go.elastic.co/apmtest,
// TODO(axw) move EventMetadata and FilteringTransport to go.elastic.co/apmtest,
// generalising filteringTransport to work with arbitrary base transports. To do
// that we would need to dynamically check for optional interfaces supported by
// the base transport, and create passthrough methods.
Expand All @@ -47,14 +47,22 @@ type EventMetadataFilter interface {
FilterEventMetadata(*EventMetadata)
}

type filteringTransport struct {
// FilteringTransport is a transport for the APM Go agent which modifies events
// prior to sending them to the underlying transport.
type FilteringTransport struct {
*transport.HTTPTransport
filter EventMetadataFilter
}

// NewFilteringTransport returns a new FilteringTransport that filters events
// using f, and sends them on to h.
func NewFilteringTransport(h *transport.HTTPTransport, f EventMetadataFilter) *FilteringTransport {
return &FilteringTransport{h, f}
}

// SendStream decodes metadata from reader, passes it through the filters,
// and then sends the modified stream to the underlying transport.
func (t *filteringTransport) SendStream(ctx context.Context, stream io.Reader) error {
func (t *FilteringTransport) SendStream(ctx context.Context, stream io.Reader) error {
zr, err := zlib.NewReader(stream)
if err != nil {
return err
Expand Down Expand Up @@ -98,9 +106,12 @@ func (t *filteringTransport) SendStream(ctx context.Context, stream io.Reader) e
return t.HTTPTransport.SendStream(ctx, &buf)
}

type defaultMetadataFilter struct{}
// DefaultMetadataFilter implements EventMetadataFilter, setting some default values
// for fields that would otherwise by dynamically discovered.
type DefaultMetadataFilter struct{}

func (defaultMetadataFilter) FilterEventMetadata(m *EventMetadata) {
// FilterEventMetadata updates m with default values for dynamically discovered fields.
func (DefaultMetadataFilter) FilterEventMetadata(m *EventMetadata) {
m.System.Platform = "minix"
m.System.Architecture = "i386"
m.System.Container = nil
Expand Down
7 changes: 2 additions & 5 deletions systemtest/apmservertest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func NewServer(tb testing.TB, args ...string) *Server {
func NewUnstartedServer(tb testing.TB, args ...string) *Server {
return &Server{
Config: DefaultConfig(),
EventMetadataFilter: defaultMetadataFilter{},
EventMetadataFilter: DefaultMetadataFilter{},
tb: tb,
args: args,
}
Expand Down Expand Up @@ -469,10 +469,7 @@ func (s *Server) Tracer() *apm.Tracer {

var transport transport.Transport = httpTransport
if s.EventMetadataFilter != nil {
transport = &filteringTransport{
HTTPTransport: httpTransport,
filter: s.EventMetadataFilter,
}
transport = NewFilteringTransport(httpTransport, s.EventMetadataFilter)
}
tracer, err := apm.NewTracerOptions(apm.TracerOptions{Transport: transport})
if err != nil {
Expand Down
76 changes: 0 additions & 76 deletions systemtest/approvals/TestDataStreamsEnabled/false.approved.json

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
"version": "dynamic"
},
"event": {
"agent_id_status": "untrusted_user",
"ingested": "dynamic",
"outcome": "unknown"
},
"host": {
"architecture": "i386",
"hostname": "beowulf",
"ip": "127.0.0.1",
"ip": "10.11.12.13",
"name": "beowulf",
"os": {
"platform": "minix"
Expand Down
140 changes: 0 additions & 140 deletions systemtest/datastreams_test.go

This file was deleted.

52 changes: 44 additions & 8 deletions systemtest/fleet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package systemtest_test
import (
"context"
"io/ioutil"
"net/http"
"net/url"
"testing"
"time"
Expand All @@ -31,6 +32,7 @@ import (
"go.elastic.co/apm/transport"

"github.com/elastic/apm-server/systemtest"
"github.com/elastic/apm-server/systemtest/apmservertest"
"github.com/elastic/apm-server/systemtest/fleettest"
)

Expand Down Expand Up @@ -83,17 +85,36 @@ func TestFleetIntegration(t *testing.T) {
require.NoError(t, err)

// Elastic Agent has started apm-server. Connect to apm-server and send some data,
// and make sure it gets indexed into a data stream.
transport, err := transport.NewHTTPTransport()
// and make sure it gets indexed into a data stream. We override the transport to
// set known metadata.
httpTransport, err := transport.NewHTTPTransport()
require.NoError(t, err)
transport.SetServerURL(&url.URL{Scheme: "http", Host: agent.Addrs["8200"]})
tracer, err := apm.NewTracerOptions(apm.TracerOptions{Transport: transport})
origTransport := httpTransport.Client.Transport
httpTransport.Client.Transport = roundTripperFunc(func(r *http.Request) (*http.Response, error) {
r.Header.Set("X-Real-Ip", "10.11.12.13")
return origTransport.RoundTrip(r)
})
httpTransport.SetServerURL(&url.URL{Scheme: "http", Host: agent.Addrs["8200"]})
tracer, err := apm.NewTracerOptions(apm.TracerOptions{
Transport: apmservertest.NewFilteringTransport(
httpTransport,
apmservertest.DefaultMetadataFilter{},
),
})
require.NoError(t, err)
defer tracer.Close()
tracer.StartTransaction("name", "type").End()

tx := tracer.StartTransaction("name", "type")
tx.Duration = time.Second
tx.End()
tracer.Flush(nil)

systemtest.Elasticsearch.ExpectDocs(t, "traces-*", nil)
result := systemtest.Elasticsearch.ExpectDocs(t, "traces-*", nil)
systemtest.ApproveEvents(
t, t.Name(), result.Hits.Hits,
"@timestamp", "timestamp.us",
"trace.id", "transaction.id",
)
}

func TestFleetPackageNonMultiple(t *testing.T) {
Expand Down Expand Up @@ -143,7 +164,16 @@ func initAPMIntegrationPackagePolicyInputs(t *testing.T, packagePolicy *fleettes
}
}

func getAPMIntegrationPackage(t *testing.T, fleet *fleettest.Client) *fleettest.Package {
func cleanupFleet(t testing.TB, fleet *fleettest.Client) {
cleanupFleetPolicies(t, fleet)
apmPackage := getAPMIntegrationPackage(t, fleet)
if apmPackage.Status == "installed" {
err := fleet.DeletePackage(apmPackage.Name, apmPackage.Version)
require.NoError(t, err)
}
}

func getAPMIntegrationPackage(t testing.TB, fleet *fleettest.Client) *fleettest.Package {
var apmPackage *fleettest.Package
packages, err := fleet.ListPackages()
require.NoError(t, err)
Expand All @@ -161,7 +191,7 @@ func getAPMIntegrationPackage(t *testing.T, fleet *fleettest.Client) *fleettest.
panic("unreachable")
}

func cleanupFleet(t testing.TB, fleet *fleettest.Client) {
func cleanupFleetPolicies(t testing.TB, fleet *fleettest.Client) {
apmAgentPolicies, err := fleet.AgentPolicies("ingest-agent-policies.name:apm_systemtest")
require.NoError(t, err)
if len(apmAgentPolicies) == 0 {
Expand All @@ -187,3 +217,9 @@ func cleanupFleet(t testing.TB, fleet *fleettest.Client) {
require.NoError(t, err)
}
}

type roundTripperFunc func(*http.Request) (*http.Response, error)

func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return f(r)
}
Loading