Skip to content

Commit

Permalink
Introduce httpcommon package in libbeat (add support for Proxy) (elas…
Browse files Browse the repository at this point in the history
  • Loading branch information
Steffen Siering authored Jun 29, 2021
1 parent beaa972 commit 4accfa8
Show file tree
Hide file tree
Showing 75 changed files with 1,044 additions and 738 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Improve ES output error insights. {pull}25825[25825]
- Add orchestrator.cluster.name/url fields as k8s metadata {pull}26056[26056]
- Libbeat: report beat version to monitoring. {pull}26214[26214]
- Ensure common proxy settings support in HTTP clients: proxy_disabled, proxy_url, proxy_headers and typical environment variables HTTP_PROXY, HTTPS_PROXY, NOPROXY. {pull}25219[25219]

*Auditbeat*

Expand Down Expand Up @@ -845,6 +846,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Heartbeat*

- Add mime type detection for http responses. {pull}22976[22976]
- Add `proxy_headers` to HTTP monitor. {pull}25219[25219]

*Journalbeat*

Expand Down
19 changes: 12 additions & 7 deletions dev-tools/cmd/dashboards/export_dashboards.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common/transport/httpcommon"
"github.com/elastic/beats/v7/libbeat/dashboards"
"github.com/elastic/beats/v7/libbeat/kibana"
)
Expand Down Expand Up @@ -64,14 +65,18 @@ func main() {
user = u.User.Username()
pass, _ = u.User.Password()
}

transport := httpcommon.DefaultHTTPTransportSettings()
transport.Timeout = kibanaTimeout

client, err := kibana.NewClientWithConfig(&kibana.ClientConfig{
Protocol: u.Scheme,
Host: u.Host,
Username: user,
Password: pass,
Path: u.Path,
SpaceID: *spaceID,
Timeout: kibanaTimeout,
Protocol: u.Scheme,
Host: u.Host,
Username: user,
Password: pass,
Path: u.Path,
SpaceID: *spaceID,
Transport: transport,
})
if err != nil {
log.Fatalf("Error while connecting to Kibana: %v", err)
Expand Down
3 changes: 1 addition & 2 deletions filebeat/fileset/modules_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,7 @@ func TestLoadMultiplePipelinesWithRollback(t *testing.T) {

func getTestingElasticsearch(t eslegtest.TestLogger) *eslegclient.Connection {
conn, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{
URL: eslegtest.GetURL(),
Timeout: 0,
URL: eslegtest.GetURL(),
})
if err != nil {
t.Fatal(err)
Expand Down
7 changes: 5 additions & 2 deletions filebeat/fileset/pipelines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common/transport/httpcommon"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/logp"
)
Expand Down Expand Up @@ -91,8 +92,10 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) {
defer testESServer.Close()

testESClient, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{
URL: testESServer.URL,
Timeout: 90 * time.Second,
URL: testESServer.URL,
Transport: httpcommon.HTTPTransportSettings{
Timeout: 90 * time.Second,
},
})
require.NoError(t, err)

Expand Down
6 changes: 6 additions & 0 deletions heartbeat/docs/monitors/monitor-http.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ that data will span multiple requests. Specifically the fields `http.rtt.content

The HTTP proxy URL. This setting is optional. Example `http://proxy.mydomain.com:3128`

[float]
[[monitor-http-proxy-headers]]
==== `proxy_headers`

Additional headers to send to proxies during CONNECT requests.

[float]
[[monitor-http-username]]
==== `username`
Expand Down
6 changes: 1 addition & 5 deletions heartbeat/monitors/active/dialchain/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@ func TLSLayer(cfg *tlscommon.TLSConfig, to time.Duration) Layer {
// This gets us the timestamp for when the TLS layer will start the handshake.
next = startTimerAfterDial(&timer, next)

dialer, err := transport.TLSDialer(next, cfg, to)
if err != nil {
return nil, err
}

dialer := transport.TLSDialer(next, cfg, to)
return afterDial(dialer, func(conn net.Conn) (net.Conn, error) {
tlsConn, ok := conn.(*cryptoTLS.Conn)
if !ok {
Expand Down
54 changes: 28 additions & 26 deletions heartbeat/monitors/active/http/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@ import (
"time"

"github.com/elastic/beats/v7/heartbeat/monitors"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/common/transport/httpcommon"
"github.com/elastic/beats/v7/libbeat/conditions"
)

type Config struct {
URLs []string `config:"urls"`
Hosts []string `config:"hosts"`
ProxyURL string `config:"proxy_url"`
Timeout time.Duration `config:"timeout"`
MaxRedirects int `config:"max_redirects"`
Response responseConfig `config:"response"`

Expand All @@ -42,11 +40,10 @@ type Config struct {
Username string `config:"username"`
Password string `config:"password"`

// configure tls (if not configured HTTPS will use system defaults)
TLS *tlscommon.Config `config:"ssl"`

// http(s) ping validation
Check checkConfig `config:"check"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`
}

type responseConfig struct {
Expand Down Expand Up @@ -90,27 +87,32 @@ type compressionConfig struct {
Level int `config:"level"`
}

var defaultConfig = Config{
Timeout: 16 * time.Second,
MaxRedirects: 0,
Response: responseConfig{
IncludeBody: "on_error",
IncludeBodyMaxBytes: 2048,
IncludeHeaders: true,
},
Mode: monitors.DefaultIPSettings,
Check: checkConfig{
Request: requestParameters{
Method: "GET",
SendHeaders: nil,
SendBody: "",
func defaultConfig() Config {
cfg := Config{
MaxRedirects: 0,
Response: responseConfig{
IncludeBody: "on_error",
IncludeBodyMaxBytes: 2048,
IncludeHeaders: true,
},
Response: responseParameters{
RecvHeaders: nil,
RecvBody: nil,
RecvJSON: nil,
Mode: monitors.DefaultIPSettings,
Check: checkConfig{
Request: requestParameters{
Method: "GET",
SendHeaders: nil,
SendBody: "",
},
Response: responseParameters{
RecvHeaders: nil,
RecvBody: nil,
RecvJSON: nil,
},
},
},
Transport: httpcommon.DefaultHTTPTransportSettings(),
}
cfg.Transport.Timeout = 16 * time.Second

return cfg
}

// Validate validates of the responseConfig object is valid or not
Expand Down Expand Up @@ -169,7 +171,7 @@ func (c *Config) Validate() error {

// updateScheme looks at TLS config to decide if http or https should be used to update the host
updateScheme := func(host string) string {
if c.TLS != nil && *c.TLS.Enabled == true {
if c.Transport.TLS != nil && c.Transport.TLS.IsEnabled() {
return fmt.Sprint("https://", host)
}
return fmt.Sprint("http://", host)
Expand Down
51 changes: 19 additions & 32 deletions heartbeat/monitors/active/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport"
"github.com/elastic/beats/v7/libbeat/common/transport/httpcommon"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/logp"
)
Expand All @@ -43,16 +43,11 @@ func create(
name string,
cfg *common.Config,
) (p plugin.Plugin, err error) {
config := defaultConfig
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return plugin.Plugin{}, err
}

tls, err := tlscommon.LoadTLSConfig(config.TLS)
if err != nil {
return plugin.Plugin{}, err
}

var body []byte
var enc contentEncoder

Expand Down Expand Up @@ -84,8 +79,8 @@ func create(
// In the event that a ProxyURL is present, or redirect support is enabled
// we execute DNS resolution requests inline with the request, not running them as a separate job, and not returning
// separate DNS rtt data.
if config.ProxyURL != "" || config.MaxRedirects > 0 {
transport, err := newRoundTripper(&config, tls)
if (config.Transport.Proxy.URL != nil && !config.Transport.Proxy.Disable) || config.MaxRedirects > 0 {
transport, err := newRoundTripper(&config)
if err != nil {
return plugin.Plugin{}, err
}
Expand All @@ -94,6 +89,13 @@ func create(
return newHTTPMonitorHostJob(urlStr, &config, transport, enc, body, validator)
}
} else {
// preload TLS configuration
tls, err := tlscommon.LoadTLSConfig(config.Transport.TLS)
if err != nil {
return plugin.Plugin{}, err
}
config.Transport.TLS = nil

makeJob = func(urlStr string) (jobs.Job, error) {
return newHTTPMonitorIPsJob(&config, urlStr, tls, enc, body, validator)
}
Expand All @@ -119,27 +121,12 @@ func create(
return plugin.Plugin{Jobs: js, Close: nil, Endpoints: len(config.Hosts)}, nil
}

func newRoundTripper(config *Config, tls *tlscommon.TLSConfig) (*http.Transport, error) {
var proxy func(*http.Request) (*url.URL, error)
if config.ProxyURL != "" {
url, err := url.Parse(config.ProxyURL)
if err != nil {
return nil, err
}
proxy = http.ProxyURL(url)
}

dialer := transport.NetDialer(config.Timeout)
tlsDialer, err := transport.TLSDialer(dialer, tls, config.Timeout)
if err != nil {
return nil, err
}

return &http.Transport{
Proxy: proxy,
Dial: dialer.Dial,
DialTLS: tlsDialer.Dial,
TLSClientConfig: tls.ToConfig(),
DisableKeepAlives: true,
}, nil
func newRoundTripper(config *Config) (http.RoundTripper, error) {
return config.Transport.RoundTripper(
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithoutProxyEnvironmentVariables(),
httpcommon.WithKeepaliveSettings{
Disable: true,
},
)
}
44 changes: 8 additions & 36 deletions heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/file"
"github.com/elastic/beats/v7/libbeat/common/transport"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
btesting "github.com/elastic/beats/v7/libbeat/testing"
"github.com/elastic/go-lookslike"
"github.com/elastic/go-lookslike/isdef"
Expand Down Expand Up @@ -408,7 +406,6 @@ func TestHTTPSServer(t *testing.T) {
t.Skip("flaky test: https://github.com/elastic/beats/issues/25857")
}
server := httptest.NewTLSServer(hbtest.HelloWorldHandler(http.StatusOK))

runHTTPSServerCheck(t, server, nil)
}

Expand Down Expand Up @@ -613,39 +610,6 @@ func TestNoHeaders(t *testing.T) {
)
}

func TestNewRoundTripper(t *testing.T) {
configs := map[string]Config{
"Plain": {Timeout: time.Second},
"With Proxy": {Timeout: time.Second, ProxyURL: "http://localhost:1234"},
}

for name, config := range configs {
t.Run(name, func(t *testing.T) {
transp, err := newRoundTripper(&config, &tlscommon.TLSConfig{})
require.NoError(t, err)

if config.ProxyURL == "" {
require.Nil(t, transp.Proxy)
} else {
require.NotNil(t, transp.Proxy)
}

// It's hard to compare func types in tests
require.NotNil(t, transp.Dial)
require.NotNil(t, transport.TLSDialer)

expected := (&tlscommon.TLSConfig{}).ToConfig()
require.Equal(t, expected.InsecureSkipVerify, transp.TLSClientConfig.InsecureSkipVerify)
// When we remove support for the legacy common name treatment
// this test has to be adjusted, as we will not depend on our
// VerifyConnection callback.
require.NotNil(t, transp.TLSClientConfig.VerifyConnection)
require.True(t, transp.DisableKeepAlives)
})
}

}

func TestProxy(t *testing.T) {
if runtime.GOOS == "windows" && bits.UintSize == 32 {
t.Skip("flaky test: https://github.com/elastic/beats/issues/25857")
Expand Down Expand Up @@ -705,3 +669,11 @@ func httpConnectTunnel(writer http.ResponseWriter, request *http.Request) {
}()
wg.Wait()
}

func mustParseURL(t *testing.T, url string) *url.URL {
parsed, err := common.ParseURL(url)
if err != nil {
t.Fatal(err)
}
return parsed
}
10 changes: 4 additions & 6 deletions heartbeat/monitors/active/http/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var userAgent = useragent.UserAgent("Heartbeat")
func newHTTPMonitorHostJob(
addr string,
config *Config,
transport *http.Transport,
transport http.RoundTripper,
enc contentEncoder,
body []byte,
validator multiValidator,
Expand All @@ -61,17 +61,15 @@ func newHTTPMonitorHostJob(
return nil, err
}

timeout := config.Timeout

return jobs.MakeSimpleJob(func(event *beat.Event) error {
var redirects []string
client := &http.Client{
// Trace visited URLs when redirects occur
CheckRedirect: makeCheckRedirect(config.MaxRedirects, &redirects),
Transport: transport,
Timeout: config.Timeout,
Timeout: config.Transport.Timeout,
}
_, _, err := execPing(event, client, request, body, timeout, validator, config.Response)
_, _, err := execPing(event, client, request, body, config.Transport.Timeout, validator, config.Response)
if len(redirects) > 0 {
event.PutValue("http.response.redirects", redirects)
}
Expand Down Expand Up @@ -112,7 +110,7 @@ func createPingFactory(
body []byte,
validator multiValidator,
) func(*net.IPAddr) jobs.Job {
timeout := config.Timeout
timeout := config.Transport.Timeout
isTLS := request.URL.Scheme == "https"

return monitors.MakePingIPFactory(func(event *beat.Event, ip *net.IPAddr) error {
Expand Down
Loading

0 comments on commit 4accfa8

Please sign in to comment.