Skip to content

Commit

Permalink
[7.x](backport #27509) Add a header round tripper option to httpcommon (
Browse files Browse the repository at this point in the history
#27788)

* Add a header round tripper option to httpcommon (#27509)

* Add a header round tripper option to httpcommon

Add a new TransportOption to add a set of headers to each HTTP request
through a custom http.RoundTripper. It will set the passed headers to
each request if the header key is not present. Use the new transport
option to add User-Agent headers to heartbeat, metricbeat, and filebeat.

* Fix useragent injection in heartbeat IPMonitor jobs

* Add User-Agent header to kibana and eslegclient

Add the User-Agent header with the RoundTripper to requests made by the
Kibana and eslegclient. The value for the User-Agent will be constructed
from what is returned by the os executable name.

* Fix syntax errors

* Get user agent from existing ingo

Get user-agent values for the Kibana and eslegclient from existing
config settings that get passed into the clients.

* change from settings.Name to b.Info.Name

* Fix missing param

* Rename export dashboard useragent value

* Review feedback

* Change beat.Info.Name to beat.Info.Beat

* Fix typo

(cherry picked from commit 8a5dac6)

# Conflicts:
#	heartbeat/monitors/active/http/task.go
#	heartbeat/monitors/active/http/task_test.go
#	libbeat/cmd/export/dashboard.go

* Cleanup merge

* Fix for Filebeat

Co-authored-by: Michel Laterman <[email protected]>
Co-authored-by: michel-laterman <[email protected]>
  • Loading branch information
3 people authored Sep 8, 2021
1 parent 3f12525 commit e0a5ac1
Show file tree
Hide file tree
Showing 23 changed files with 158 additions and 65 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Bump AWS SDK version to v0.24.0 for WebIdentity authentication flow {issue}19393[19393] {pull}27126[27126]
- Add Linux pressure metricset {pull}27355[27355]
- Add support for kube-state-metrics v2.0.0 {pull}27552[27552]
- Add User-Agent header to HTTP requests. {issue}18160[18160] {pull}27509[27509]

*Packetbeat*

Expand Down
2 changes: 1 addition & 1 deletion dev-tools/cmd/dashboards/export_dashboards.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func main() {
Path: u.Path,
SpaceID: *spaceID,
Transport: transport,
})
}, "Beat Development Tools")
if err != nil {
log.Fatalf("Error while connecting to Kibana: %v", err)
}
Expand Down
8 changes: 4 additions & 4 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {

overwritePipelines := true
b.OverwritePipelinesCallback = func(esConfig *common.Config) error {
esClient, err := eslegclient.NewConnectedClient(esConfig)
esClient, err := eslegclient.NewConnectedClient(esConfig, "Filebeat")
if err != nil {
return err
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err
}

esConfig := b.Config.Output.Config()
esClient, err := eslegclient.NewConnectedClient(esConfig)
esClient, err := eslegclient.NewConnectedClient(esConfig, "Filebeat")
if err != nil {
return errors.Errorf("Error creating Elasticsearch client: %v", err)
}
Expand All @@ -256,7 +256,7 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err
}
}

kibanaClient, err := kibana.NewKibanaClient(kibanaConfig)
kibanaClient, err := kibana.NewKibanaClient(kibanaConfig, "Filebeat")
if err != nil {
return errors.Errorf("Error creating Kibana client: %v", err)
}
Expand Down Expand Up @@ -521,7 +521,7 @@ func (fb *Filebeat) Stop() {
// Create a new pipeline loader (es client) factory
func newPipelineLoaderFactory(esConfig *common.Config) fileset.PipelineLoaderFactory {
pipelineLoaderFactory := func() (fileset.PipelineLoader, error) {
esClient, err := eslegclient.NewConnectedClient(esConfig)
esClient, err := eslegclient.NewConnectedClient(esConfig, "Filebeat")
if err != nil {
return nil, errors.Wrap(err, "Error creating Elasticsearch client")
}
Expand Down
4 changes: 4 additions & 0 deletions heartbeat/monitors/active/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport/httpcommon"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/common/useragent"
"github.com/elastic/beats/v7/libbeat/logp"
)

Expand All @@ -38,6 +39,8 @@ func init() {

var debugf = logp.MakeDebug("http")

var userAgent = useragent.UserAgent("Heartbeat", true)

// Create makes a new HTTP monitor
func create(
name string,
Expand Down Expand Up @@ -128,5 +131,6 @@ func newRoundTripper(config *Config) (http.RoundTripper, error) {
httpcommon.WithKeepaliveSettings{
Disable: true,
},
httpcommon.WithHeaderRoundTripper(map[string]string{"User-Agent": userAgent}),
)
}
26 changes: 26 additions & 0 deletions heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/heartbeat/hbtest"
Expand Down Expand Up @@ -674,3 +675,28 @@ func mustParseURL(t *testing.T, url string) *url.URL {
}
return parsed
}

func TestUserAgentInject(t *testing.T) {
ua := ""
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ua = r.Header.Get("User-Agent")
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()

cfg, err := common.NewConfigFrom(map[string]interface{}{
"urls": ts.URL,
})
require.NoError(t, err)

p, err := create("ua", cfg)
require.NoError(t, err)

sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0]

event := &beat.Event{}
_, err = job(event)
require.NoError(t, err)
assert.Contains(t, ua, "Heartbeat")
}
44 changes: 20 additions & 24 deletions heartbeat/monitors/active/http/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ import (
"github.com/elastic/beats/v7/heartbeat/reason"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport/httpcommon"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/common/useragent"
)

var userAgent = useragent.UserAgent("Heartbeat", true)

func newHTTPMonitorHostJob(
addr string,
config *Config,
Expand Down Expand Up @@ -141,27 +139,28 @@ func createPingFactory(
// prevents following redirects in this case, we know that
// config.MaxRedirects must be zero to even be here
checkRedirect := makeCheckRedirect(0, nil)
transport := &SimpleTransport{
Dialer: dialer,
OnStartWrite: func() {
cbMutex.Lock()
writeStart = time.Now()
cbMutex.Unlock()
},
OnEndWrite: func() {
cbMutex.Lock()
writeEnd = time.Now()
cbMutex.Unlock()
},
OnStartRead: func() {
cbMutex.Lock()
readStart = time.Now()
cbMutex.Unlock()
},
}
client := &http.Client{
CheckRedirect: checkRedirect,
Timeout: timeout,
Transport: &SimpleTransport{
Dialer: dialer,
OnStartWrite: func() {
cbMutex.Lock()
writeStart = time.Now()
cbMutex.Unlock()
},
OnEndWrite: func() {
cbMutex.Lock()
writeEnd = time.Now()
cbMutex.Unlock()
},
OnStartRead: func() {
cbMutex.Lock()
readStart = time.Now()
cbMutex.Unlock()
},
},
Transport: httpcommon.HeaderRoundTripper(transport, map[string]string{"User-Agent": userAgent}),
}

_, end, err := execPing(event, client, request, body, timeout, validator, config.Response)
Expand Down Expand Up @@ -206,9 +205,6 @@ func buildRequest(addr string, config *Config, enc contentEncoder) (*http.Reques

request.Header.Add(k, v)
}
if ua := request.Header.Get("User-Agent"); ua == "" {
request.Header.Set("User-Agent", userAgent)
}

if enc != nil {
enc.AddHeaders(&request.Header)
Expand Down
9 changes: 0 additions & 9 deletions heartbeat/monitors/active/http/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"reflect"
"testing"

"github.com/elastic/beats/v7/libbeat/common/useragent"

"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -174,13 +172,6 @@ func TestRequestBuildingWithCustomHost(t *testing.T) {
}
}

func TestRequestBuildingWithNoUserAgent(t *testing.T) {
request, err := buildRequest("localhost", &Config{}, nilEncoder{})

require.Nil(t, err)
assert.Equal(t, useragent.UserAgent("Heartbeat", true), request.Header.Get("User-Agent"))
}

func TestRequestBuildingWithExplicitUserAgent(t *testing.T) {
expectedUserAgent := "some-user-agent"

Expand Down
2 changes: 1 addition & 1 deletion libbeat/cmd/export/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func GenDashboardCmd(settings instance.Settings) *cobra.Command {
b.Config.Kibana = common.NewConfig()
}

client, err := kibana.NewKibanaClient(b.Config.Kibana)
client, err := kibana.NewKibanaClient(b.Config.Kibana, b.Info.Beat)
if err != nil {
fatalf("Error creating Kibana client: %+v.\n", err)
}
Expand Down
4 changes: 2 additions & 2 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er
if outCfg.Name() != "elasticsearch" {
return fmt.Errorf("Index management requested but the Elasticsearch output is not configured/enabled")
}
esClient, err := eslegclient.NewConnectedClient(outCfg.Config())
esClient, err := eslegclient.NewConnectedClient(outCfg.Config(), b.Info.Beat)
if err != nil {
return err
}
Expand Down Expand Up @@ -822,7 +822,7 @@ func (b *Beat) loadDashboards(ctx context.Context, force bool) error {
return fmt.Errorf("error initKibanaConfig: %v", err)
}

client, err := kibana.NewKibanaClient(kibanaConfig)
client, err := kibana.NewKibanaClient(kibanaConfig, b.Info.Beat)
if err != nil {
return fmt.Errorf("error connecting to Kibana: %v", err)
}
Expand Down
27 changes: 27 additions & 0 deletions libbeat/common/transport/httpcommon/httpcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,20 @@ type extraOptionFunc func(*extraSettings)
func (extraOptionFunc) sealTransportOption() {}
func (fn extraOptionFunc) applyExtra(s *extraSettings) { fn(s) }

type headerRoundTripper struct {
headers map[string]string
rt http.RoundTripper
}

func (rt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
for k, v := range rt.headers {
if len(req.Header.Get(k)) == 0 {
req.Header.Set(k, v)
}
}
return rt.rt.RoundTrip(req)
}

// DefaultHTTPTransportSettings returns the default HTTP transport setting.
func DefaultHTTPTransportSettings() HTTPTransportSettings {
return HTTPTransportSettings{
Expand Down Expand Up @@ -373,6 +387,19 @@ func WithAPMHTTPInstrumentation() TransportOption {
return withAPMHTTPRountTripper
}

// HeaderRoundTripper will return a RoundTripper that sets header KVs if the key is not present.
func HeaderRoundTripper(rt http.RoundTripper, headers map[string]string) http.RoundTripper {
return &headerRoundTripper{headers, rt}
}

// WithHeaderRoundTripper instuments the HTTP client via a custom http.RoundTripper.
// This RoundTripper will add headers to each request if the key is not present.
func WithHeaderRoundTripper(headers map[string]string) TransportOption {
return WithModRoundtripper(func(rt http.RoundTripper) http.RoundTripper {
return HeaderRoundTripper(rt, headers)
})
}

// WithLogger sets the internal logger that will be used to log dial or TCP level errors.
// Logging at the connection level will only happen if the logger has been set.
func WithLogger(logger *logp.Logger) TransportOption {
Expand Down
6 changes: 3 additions & 3 deletions libbeat/dashboards/dashboards.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ func ImportDashboards(
return errors.New("kibana configuration missing for loading dashboards.")
}

return setupAndImportDashboardsViaKibana(ctx, beatInfo.Hostname, kibanaConfig, &dashConfig, msgOutputter, pattern)
return setupAndImportDashboardsViaKibana(ctx, beatInfo.Hostname, beatInfo.Beat, kibanaConfig, &dashConfig, msgOutputter, pattern)
}

func setupAndImportDashboardsViaKibana(ctx context.Context, hostname string, kibanaConfig *common.Config,
func setupAndImportDashboardsViaKibana(ctx context.Context, hostname, beatname string, kibanaConfig *common.Config,
dashboardsConfig *Config, msgOutputter MessageOutputter, fields common.MapStr) error {

kibanaLoader, err := NewKibanaLoader(ctx, kibanaConfig, dashboardsConfig, hostname, msgOutputter)
kibanaLoader, err := NewKibanaLoader(ctx, kibanaConfig, dashboardsConfig, hostname, msgOutputter, beatname)
if err != nil {
return fmt.Errorf("fail to create the Kibana loader: %v", err)
}
Expand Down
10 changes: 5 additions & 5 deletions libbeat/dashboards/kibana_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ type KibanaLoader struct {
}

// NewKibanaLoader creates a new loader to load Kibana files
func NewKibanaLoader(ctx context.Context, cfg *common.Config, dashboardsConfig *Config, hostname string, msgOutputter MessageOutputter) (*KibanaLoader, error) {
func NewKibanaLoader(ctx context.Context, cfg *common.Config, dashboardsConfig *Config, hostname string, msgOutputter MessageOutputter, beatname string) (*KibanaLoader, error) {

if cfg == nil || !cfg.Enabled() {
return nil, fmt.Errorf("Kibana is not configured or enabled")
}

client, err := getKibanaClient(ctx, cfg, dashboardsConfig.Retry, 0)
client, err := getKibanaClient(ctx, cfg, dashboardsConfig.Retry, 0, beatname)
if err != nil {
return nil, fmt.Errorf("Error creating Kibana client: %v", err)
}
Expand All @@ -73,15 +73,15 @@ func NewKibanaLoader(ctx context.Context, cfg *common.Config, dashboardsConfig *
return &loader, nil
}

func getKibanaClient(ctx context.Context, cfg *common.Config, retryCfg *Retry, retryAttempt uint) (*kibana.Client, error) {
client, err := kibana.NewKibanaClient(cfg)
func getKibanaClient(ctx context.Context, cfg *common.Config, retryCfg *Retry, retryAttempt uint, beatname string) (*kibana.Client, error) {
client, err := kibana.NewKibanaClient(cfg, beatname)
if err != nil {
if retryCfg.Enabled && (retryCfg.Maximum == 0 || retryCfg.Maximum > retryAttempt) {
select {
case <-ctx.Done():
return nil, err
case <-time.After(retryCfg.Interval):
return getKibanaClient(ctx, cfg, retryCfg, retryAttempt+1)
return getKibanaClient(ctx, cfg, retryCfg, retryAttempt+1, beatname)
}
}
return nil, fmt.Errorf("Error creating Kibana client: %v", err)
Expand Down
17 changes: 13 additions & 4 deletions libbeat/esleg/eslegclient/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/transport/httpcommon"
"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/common/useragent"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/testing"
)
Expand All @@ -57,7 +58,8 @@ type Connection struct {

// ConnectionSettings are the settings needed for a Connection
type ConnectionSettings struct {
URL string
URL string
Beatname string

Username string
Password string
Expand Down Expand Up @@ -110,6 +112,11 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
}
}

if s.Beatname == "" {
s.Beatname = "Libbeat"
}
userAgent := useragent.UserAgent(s.Beatname, true)

httpClient, err := s.Transport.Client(
httpcommon.WithLogger(logger),
httpcommon.WithIOStats(s.Observer),
Expand All @@ -119,6 +126,7 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
// eg, like in https://github.com/elastic/apm-server/blob/7.7/elasticsearch/client.go
return apmelasticsearch.WrapRoundTripper(rt)
}),
httpcommon.WithHeaderRoundTripper(map[string]string{"User-Agent": userAgent}),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -160,7 +168,7 @@ func settingsWithDefaults(s ConnectionSettings) ConnectionSettings {
// configuration. It accepts the same configuration parameters as the Elasticsearch
// output, except for the output specific configuration options. If multiple hosts
// are defined in the configuration, a client is returned for each of them.
func NewClients(cfg *common.Config) ([]Connection, error) {
func NewClients(cfg *common.Config, beatname string) ([]Connection, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, err
Expand All @@ -185,6 +193,7 @@ func NewClients(cfg *common.Config) ([]Connection, error) {

client, err := NewConnection(ConnectionSettings{
URL: esURL,
Beatname: beatname,
Kerberos: config.Kerberos,
Username: config.Username,
Password: config.Password,
Expand All @@ -205,8 +214,8 @@ func NewClients(cfg *common.Config) ([]Connection, error) {
return clients, nil
}

func NewConnectedClient(cfg *common.Config) (*Connection, error) {
clients, err := NewClients(cfg)
func NewConnectedClient(cfg *common.Config, beatname string) (*Connection, error) {
clients, err := NewClients(cfg, beatname)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit e0a5ac1

Please sign in to comment.