Skip to content

Commit

Permalink
apmsoak: Support arbitrary headers via -header (#10550)
Browse files Browse the repository at this point in the history
Adds support adding arbitrary headers to the `apmsoak` binary via a new
`header` flag which can be set multiple times.

---------

Signed-off-by: Marc Lopez Rubio <[email protected]>
  • Loading branch information
marclop authored Mar 29, 2023
1 parent 3d72e9d commit 4c1e156
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 9 deletions.
1 change: 1 addition & 0 deletions systemtest/benchtest/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func NewEventHandler(tb testing.TB, p string, l *rate.Limiter) *eventhandler.Han
Limiter: l,
RewriteIDs: serverCfg.RewriteIDs,
RewriteTimestamps: serverCfg.RewriteTimestamps,
Headers: serverCfg.Headers,
})
if err != nil {
tb.Fatal(err)
Expand Down
23 changes: 19 additions & 4 deletions systemtest/loadgen/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var Config struct {
MaxEPM int
RewriteIDs bool
RewriteTimestamps bool
Headers map[string]string
}

func init() {
Expand Down Expand Up @@ -87,6 +88,20 @@ func init() {
false,
"rewrite event IDs every iteration, maintaining event relationships",
)
flag.Func("header",
"extra headers to use when sending data to the apm-server",
func(s string) error {
k, v, ok := strings.Cut(s, "=")
if !ok {
return fmt.Errorf("invalid header '%s': format must be key=value", s)
}
if len(Config.Headers) == 0 {
Config.Headers = make(map[string]string)
}
Config.Headers[k] = v
return nil
},
)

// For configs that can be set via environment variables, set the required
// flags from env if they are not explicitly provided via command line
Expand All @@ -97,10 +112,10 @@ func setFlagsFromEnv() {
// value[0] is environment key
// value[1] is default value
flagEnvMap := map[string][]string{
"server": []string{"ELASTIC_APM_SERVER_URL", "http://127.0.0.1:8200"},
"secret-token": []string{"ELASTIC_APM_SECRET_TOKEN", ""},
"api-key": []string{"ELASTIC_APM_API_KEY", ""},
"secure": []string{"ELASTIC_APM_VERIFY_SERVER_CERT", "false"},
"server": {"ELASTIC_APM_SERVER_URL", "http://127.0.0.1:8200"},
"secret-token": {"ELASTIC_APM_SECRET_TOKEN", ""},
"api-key": {"ELASTIC_APM_API_KEY", ""},
"secure": {"ELASTIC_APM_VERIFY_SERVER_CERT", "false"},
}

for k, v := range flagEnvMap {
Expand Down
3 changes: 2 additions & 1 deletion systemtest/loadgen/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type EventHandlerParams struct {
Rand *rand.Rand
RewriteIDs bool
RewriteTimestamps bool
Headers map[string]string
}

// NewEventHandler creates a eventhandler which loads the files matching the
Expand All @@ -54,7 +55,7 @@ func NewEventHandler(p EventHandlerParams) (*eventhandler.Handler, error) {
if err != nil {
return nil, err
}
transp := eventhandler.NewTransport(t.Client, p.URL, p.Token, p.APIKey)
transp := eventhandler.NewTransport(t.Client, p.URL, p.Token, p.APIKey, p.Headers)
return eventhandler.New(eventhandler.Config{
Path: filepath.Join("events", p.Path),
Transport: transp,
Expand Down
2 changes: 1 addition & 1 deletion systemtest/loadgen/eventhandler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func newHandler(tb testing.TB, opts ...newHandlerOption) (*Handler, *mockServer)
ms := &mockServer{got: &bytes.Buffer{}}
srv := httptest.NewServer(ms)
ms.close = srv.Close
transp := NewTransport(srv.Client(), srv.URL, "", "")
transp := NewTransport(srv.Client(), srv.URL, "", "", nil)

config := Config{
Path: "*.ndjson",
Expand Down
5 changes: 4 additions & 1 deletion systemtest/loadgen/eventhandler/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,15 @@ type Transport struct {
}

// NewTransport initializes a new ReplayTransport.
func NewTransport(c *http.Client, srvURL, token string, apiKey string) *Transport {
func NewTransport(c *http.Client, srvURL, token, apiKey string, headers map[string]string) *Transport {
intakeHeaders := make(http.Header)
intakeHeaders.Set("Content-Encoding", "deflate")
intakeHeaders.Set("Content-Type", "application/x-ndjson")
intakeHeaders.Set("Transfer-Encoding", "chunked")
intakeHeaders.Set("Authorization", getAuthHeader(token, apiKey))
for name, header := range headers {
intakeHeaders.Set(name, header)
}
return &Transport{
client: c,
intakeV2URL: srvURL + `/intake/v2/events`,
Expand Down
5 changes: 3 additions & 2 deletions systemtest/soaktest/soaktest.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ func RunBlocking(ctx context.Context) error {
expr := expr
g.Go(func() error {
rng := rand.New(rand.NewSource(rngseed))
return runAgent(gCtx, expr, limiter, rng)
return runAgent(gCtx, expr, limiter, rng, loadgencfg.Config.Headers)
})
}
}

return g.Wait()
}

func runAgent(ctx context.Context, expr string, limiter *rate.Limiter, rng *rand.Rand) error {
func runAgent(ctx context.Context, expr string, limiter *rate.Limiter, rng *rand.Rand, headers map[string]string) error {
handler, err := loadgen.NewEventHandler(loadgen.EventHandlerParams{
Path: expr,
URL: loadgencfg.Config.ServerURL.String(),
Expand All @@ -66,6 +66,7 @@ func runAgent(ctx context.Context, expr string, limiter *rate.Limiter, rng *rand
Rand: rng,
RewriteIDs: loadgencfg.Config.RewriteIDs,
RewriteTimestamps: loadgencfg.Config.RewriteTimestamps,
Headers: headers,
})
if err != nil {
return err
Expand Down

0 comments on commit 4c1e156

Please sign in to comment.