diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 06c21b8b48fc..5e4f30747991 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -596,6 +596,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Improve ES output error insights. {pull}25825[25825] - Add orchestrator.cluster.name/url fields as k8s metadata {pull}26056[26056] - Libbeat: report beat version to monitoring. {pull}26214[26214] +- Ensure common proxy settings support in HTTP clients: proxy_disabled, proxy_url, proxy_headers and typical environment variables HTTP_PROXY, HTTPS_PROXY, NOPROXY. {pull}25219[25219] *Auditbeat* @@ -841,6 +842,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Heartbeat* - Add mime type detection for http responses. {pull}22976[22976] +- Add `proxy_headers` to HTTP monitor. {pull}25219[25219] *Journalbeat* diff --git a/dev-tools/cmd/dashboards/export_dashboards.go b/dev-tools/cmd/dashboards/export_dashboards.go index eeae6773a969..a3fe4429aab8 100644 --- a/dev-tools/cmd/dashboards/export_dashboards.go +++ b/dev-tools/cmd/dashboards/export_dashboards.go @@ -29,6 +29,7 @@ import ( "github.com/pkg/errors" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/libbeat/dashboards" "github.com/elastic/beats/v7/libbeat/kibana" ) @@ -64,14 +65,18 @@ func main() { user = u.User.Username() pass, _ = u.User.Password() } + + transport := httpcommon.DefaultHTTPTransportSettings() + transport.Timeout = kibanaTimeout + client, err := kibana.NewClientWithConfig(&kibana.ClientConfig{ - Protocol: u.Scheme, - Host: u.Host, - Username: user, - Password: pass, - Path: u.Path, - SpaceID: *spaceID, - Timeout: kibanaTimeout, + Protocol: u.Scheme, + Host: u.Host, + Username: user, + Password: pass, + Path: u.Path, + SpaceID: *spaceID, + Transport: transport, }) if err != nil { log.Fatalf("Error while connecting to Kibana: %v", err) diff --git a/filebeat/fileset/modules_integration_test.go b/filebeat/fileset/modules_integration_test.go index b4b5c40bb754..7afd9bbb547c 100644 --- a/filebeat/fileset/modules_integration_test.go +++ b/filebeat/fileset/modules_integration_test.go @@ -251,8 +251,7 @@ func TestLoadMultiplePipelinesWithRollback(t *testing.T) { func getTestingElasticsearch(t eslegtest.TestLogger) *eslegclient.Connection { conn, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{ - URL: eslegtest.GetURL(), - Timeout: 0, + URL: eslegtest.GetURL(), }) if err != nil { t.Fatal(err) diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go index 04d48c67f019..f3d9e2d5a07c 100644 --- a/filebeat/fileset/pipelines_test.go +++ b/filebeat/fileset/pipelines_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -91,8 +92,10 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { defer testESServer.Close() testESClient, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{ - URL: testESServer.URL, - Timeout: 90 * time.Second, + URL: testESServer.URL, + Transport: httpcommon.HTTPTransportSettings{ + Timeout: 90 * time.Second, + }, }) require.NoError(t, err) diff --git a/heartbeat/docs/monitors/monitor-http.asciidoc b/heartbeat/docs/monitors/monitor-http.asciidoc index bcae6f10b0f4..2b02ba53563c 100644 --- a/heartbeat/docs/monitors/monitor-http.asciidoc +++ b/heartbeat/docs/monitors/monitor-http.asciidoc @@ -41,6 +41,12 @@ that data will span multiple requests. Specifically the fields `http.rtt.content The HTTP proxy URL. This setting is optional. Example `http://proxy.mydomain.com:3128` +[float] +[[monitor-http-proxy-headers]] +==== `proxy_headers` + +Additional headers to send to proxies during CONNECT requests. + [float] [[monitor-http-username]] ==== `username` diff --git a/heartbeat/monitors/active/dialchain/tls.go b/heartbeat/monitors/active/dialchain/tls.go index b4b2c006dfb1..5786d57e3158 100644 --- a/heartbeat/monitors/active/dialchain/tls.go +++ b/heartbeat/monitors/active/dialchain/tls.go @@ -40,11 +40,7 @@ func TLSLayer(cfg *tlscommon.TLSConfig, to time.Duration) Layer { // This gets us the timestamp for when the TLS layer will start the handshake. next = startTimerAfterDial(&timer, next) - dialer, err := transport.TLSDialer(next, cfg, to) - if err != nil { - return nil, err - } - + dialer := transport.TLSDialer(next, cfg, to) return afterDial(dialer, func(conn net.Conn) (net.Conn, error) { tlsConn, ok := conn.(*cryptoTLS.Conn) if !ok { diff --git a/heartbeat/monitors/active/http/config.go b/heartbeat/monitors/active/http/config.go index 2a2c2c049650..85eda196d29e 100644 --- a/heartbeat/monitors/active/http/config.go +++ b/heartbeat/monitors/active/http/config.go @@ -24,15 +24,13 @@ import ( "time" "github.com/elastic/beats/v7/heartbeat/monitors" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/libbeat/conditions" ) type Config struct { URLs []string `config:"urls"` Hosts []string `config:"hosts"` - ProxyURL string `config:"proxy_url"` - Timeout time.Duration `config:"timeout"` MaxRedirects int `config:"max_redirects"` Response responseConfig `config:"response"` @@ -42,11 +40,10 @@ type Config struct { Username string `config:"username"` Password string `config:"password"` - // configure tls (if not configured HTTPS will use system defaults) - TLS *tlscommon.Config `config:"ssl"` - // http(s) ping validation Check checkConfig `config:"check"` + + Transport httpcommon.HTTPTransportSettings `config:",inline"` } type responseConfig struct { @@ -90,27 +87,32 @@ type compressionConfig struct { Level int `config:"level"` } -var defaultConfig = Config{ - Timeout: 16 * time.Second, - MaxRedirects: 0, - Response: responseConfig{ - IncludeBody: "on_error", - IncludeBodyMaxBytes: 2048, - IncludeHeaders: true, - }, - Mode: monitors.DefaultIPSettings, - Check: checkConfig{ - Request: requestParameters{ - Method: "GET", - SendHeaders: nil, - SendBody: "", +func defaultConfig() Config { + cfg := Config{ + MaxRedirects: 0, + Response: responseConfig{ + IncludeBody: "on_error", + IncludeBodyMaxBytes: 2048, + IncludeHeaders: true, }, - Response: responseParameters{ - RecvHeaders: nil, - RecvBody: nil, - RecvJSON: nil, + Mode: monitors.DefaultIPSettings, + Check: checkConfig{ + Request: requestParameters{ + Method: "GET", + SendHeaders: nil, + SendBody: "", + }, + Response: responseParameters{ + RecvHeaders: nil, + RecvBody: nil, + RecvJSON: nil, + }, }, - }, + Transport: httpcommon.DefaultHTTPTransportSettings(), + } + cfg.Transport.Timeout = 16 * time.Second + + return cfg } // Validate validates of the responseConfig object is valid or not @@ -169,7 +171,7 @@ func (c *Config) Validate() error { // updateScheme looks at TLS config to decide if http or https should be used to update the host updateScheme := func(host string) string { - if c.TLS != nil && *c.TLS.Enabled == true { + if c.Transport.TLS != nil && c.Transport.TLS.IsEnabled() { return fmt.Sprint("https://", host) } return fmt.Sprint("http://", host) diff --git a/heartbeat/monitors/active/http/http.go b/heartbeat/monitors/active/http/http.go index 214ed8519d84..c02ae25943e0 100644 --- a/heartbeat/monitors/active/http/http.go +++ b/heartbeat/monitors/active/http/http.go @@ -27,7 +27,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/monitors/jobs" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/transport" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -43,16 +43,11 @@ func create( name string, cfg *common.Config, ) (p plugin.Plugin, err error) { - config := defaultConfig + config := defaultConfig() if err := cfg.Unpack(&config); err != nil { return plugin.Plugin{}, err } - tls, err := tlscommon.LoadTLSConfig(config.TLS) - if err != nil { - return plugin.Plugin{}, err - } - var body []byte var enc contentEncoder @@ -84,8 +79,8 @@ func create( // In the event that a ProxyURL is present, or redirect support is enabled // we execute DNS resolution requests inline with the request, not running them as a separate job, and not returning // separate DNS rtt data. - if config.ProxyURL != "" || config.MaxRedirects > 0 { - transport, err := newRoundTripper(&config, tls) + if (config.Transport.Proxy.URL != nil && !config.Transport.Proxy.Disable) || config.MaxRedirects > 0 { + transport, err := newRoundTripper(&config) if err != nil { return plugin.Plugin{}, err } @@ -94,6 +89,13 @@ func create( return newHTTPMonitorHostJob(urlStr, &config, transport, enc, body, validator) } } else { + // preload TLS configuration + tls, err := tlscommon.LoadTLSConfig(config.Transport.TLS) + if err != nil { + return plugin.Plugin{}, err + } + config.Transport.TLS = nil + makeJob = func(urlStr string) (jobs.Job, error) { return newHTTPMonitorIPsJob(&config, urlStr, tls, enc, body, validator) } @@ -119,27 +121,12 @@ func create( return plugin.Plugin{Jobs: js, Close: nil, Endpoints: len(config.Hosts)}, nil } -func newRoundTripper(config *Config, tls *tlscommon.TLSConfig) (*http.Transport, error) { - var proxy func(*http.Request) (*url.URL, error) - if config.ProxyURL != "" { - url, err := url.Parse(config.ProxyURL) - if err != nil { - return nil, err - } - proxy = http.ProxyURL(url) - } - - dialer := transport.NetDialer(config.Timeout) - tlsDialer, err := transport.TLSDialer(dialer, tls, config.Timeout) - if err != nil { - return nil, err - } - - return &http.Transport{ - Proxy: proxy, - Dial: dialer.Dial, - DialTLS: tlsDialer.Dial, - TLSClientConfig: tls.ToConfig(), - DisableKeepAlives: true, - }, nil +func newRoundTripper(config *Config) (http.RoundTripper, error) { + return config.Transport.RoundTripper( + httpcommon.WithAPMHTTPInstrumentation(), + httpcommon.WithoutProxyEnvironmentVariables(), + httpcommon.WithKeepaliveSettings{ + Disable: true, + }, + ) } diff --git a/heartbeat/monitors/active/http/http_test.go b/heartbeat/monitors/active/http/http_test.go index d862f745eb81..fc0c6fce63d5 100644 --- a/heartbeat/monitors/active/http/http_test.go +++ b/heartbeat/monitors/active/http/http_test.go @@ -45,8 +45,6 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/file" - "github.com/elastic/beats/v7/libbeat/common/transport" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" btesting "github.com/elastic/beats/v7/libbeat/testing" "github.com/elastic/go-lookslike" "github.com/elastic/go-lookslike/isdef" @@ -408,7 +406,6 @@ func TestHTTPSServer(t *testing.T) { t.Skip("flaky test: https://github.com/elastic/beats/issues/25857") } server := httptest.NewTLSServer(hbtest.HelloWorldHandler(http.StatusOK)) - runHTTPSServerCheck(t, server, nil) } @@ -613,39 +610,6 @@ func TestNoHeaders(t *testing.T) { ) } -func TestNewRoundTripper(t *testing.T) { - configs := map[string]Config{ - "Plain": {Timeout: time.Second}, - "With Proxy": {Timeout: time.Second, ProxyURL: "http://localhost:1234"}, - } - - for name, config := range configs { - t.Run(name, func(t *testing.T) { - transp, err := newRoundTripper(&config, &tlscommon.TLSConfig{}) - require.NoError(t, err) - - if config.ProxyURL == "" { - require.Nil(t, transp.Proxy) - } else { - require.NotNil(t, transp.Proxy) - } - - // It's hard to compare func types in tests - require.NotNil(t, transp.Dial) - require.NotNil(t, transport.TLSDialer) - - expected := (&tlscommon.TLSConfig{}).ToConfig() - require.Equal(t, expected.InsecureSkipVerify, transp.TLSClientConfig.InsecureSkipVerify) - // When we remove support for the legacy common name treatment - // this test has to be adjusted, as we will not depend on our - // VerifyConnection callback. - require.NotNil(t, transp.TLSClientConfig.VerifyConnection) - require.True(t, transp.DisableKeepAlives) - }) - } - -} - func TestProxy(t *testing.T) { if runtime.GOOS == "windows" && bits.UintSize == 32 { t.Skip("flaky test: https://github.com/elastic/beats/issues/25857") @@ -705,3 +669,11 @@ func httpConnectTunnel(writer http.ResponseWriter, request *http.Request) { }() wg.Wait() } + +func mustParseURL(t *testing.T, url string) *url.URL { + parsed, err := common.ParseURL(url) + if err != nil { + t.Fatal(err) + } + return parsed +} diff --git a/heartbeat/monitors/active/http/task.go b/heartbeat/monitors/active/http/task.go index 02257a15e7df..630283c8baba 100644 --- a/heartbeat/monitors/active/http/task.go +++ b/heartbeat/monitors/active/http/task.go @@ -50,7 +50,7 @@ var userAgent = useragent.UserAgent("Heartbeat") func newHTTPMonitorHostJob( addr string, config *Config, - transport *http.Transport, + transport http.RoundTripper, enc contentEncoder, body []byte, validator multiValidator, @@ -61,17 +61,15 @@ func newHTTPMonitorHostJob( return nil, err } - timeout := config.Timeout - return jobs.MakeSimpleJob(func(event *beat.Event) error { var redirects []string client := &http.Client{ // Trace visited URLs when redirects occur CheckRedirect: makeCheckRedirect(config.MaxRedirects, &redirects), Transport: transport, - Timeout: config.Timeout, + Timeout: config.Transport.Timeout, } - _, _, err := execPing(event, client, request, body, timeout, validator, config.Response) + _, _, err := execPing(event, client, request, body, config.Transport.Timeout, validator, config.Response) if len(redirects) > 0 { event.PutValue("http.response.redirects", redirects) } @@ -112,7 +110,7 @@ func createPingFactory( body []byte, validator multiValidator, ) func(*net.IPAddr) jobs.Job { - timeout := config.Timeout + timeout := config.Transport.Timeout isTLS := request.URL.Scheme == "https" return monitors.MakePingIPFactory(func(event *beat.Event, ip *net.IPAddr) error { diff --git a/libbeat/common/transport/client.go b/libbeat/common/transport/client.go index 7027d1a31423..b59545bc607f 100644 --- a/libbeat/common/transport/client.go +++ b/libbeat/common/transport/client.go @@ -224,8 +224,8 @@ func (c *Client) Test(d testing.Driver) { } else { d.Run("TLS", func(d testing.Driver) { netDialer := NetDialer(c.config.Timeout) - tlsDialer, err := TestTLSDialer(d, netDialer, c.config.TLS, c.config.Timeout) - _, err = tlsDialer.Dial("tcp", c.host) + tlsDialer := TestTLSDialer(d, netDialer, c.config.TLS, c.config.Timeout) + _, err := tlsDialer.Dial("tcp", c.host) d.Fatal("dial up", err) }) } diff --git a/libbeat/common/transport/httpcommon/httpcommon.go b/libbeat/common/transport/httpcommon/httpcommon.go new file mode 100644 index 000000000000..8f79f3571831 --- /dev/null +++ b/libbeat/common/transport/httpcommon/httpcommon.go @@ -0,0 +1,382 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package httpcommon + +import ( + "net/http" + "time" + + "go.elastic.co/apm/module/apmhttp" + "golang.org/x/net/http2" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/transport" + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/logp" +) + +// HTTPTransportSettings provides common HTTP settings for HTTP clients. +type HTTPTransportSettings struct { + // TLS provides ssl/tls setup settings + TLS *tlscommon.Config `config:"ssl" yaml:"ssl,omitempty" json:"ssl,omitempty"` + + // Timeout configures the `(http.Transport).Timeout`. + Timeout time.Duration `config:"timeout" yaml:"timeout,omitempty" json:"timeout,omitempty"` + + Proxy HTTPClientProxySettings `config:",inline" yaml:",inline"` + + // TODO: Add more settings: + // - DisableKeepAlive + // - MaxIdleConns + // - IdleConnTimeout + // - ResponseHeaderTimeout + // - ConnectionTimeout (currently 'Timeout' is used for both) +} + +// WithKeepaliveSettings options can be used to modify the Keepalive +type WithKeepaliveSettings struct { + Disable bool + MaxIdleConns int + MaxIdleConnsPerHost int + IdleConnTimeout time.Duration +} + +var _ httpTransportOption = WithKeepaliveSettings{} + +const defaultHTTPTimeout = 90 * time.Second + +type ( + // TransportOption are applied to the http.RoundTripper to be build + // from HTTPTransportSettings. + TransportOption interface{ sealTransportOption() } + + extraSettings struct { + logger *logp.Logger + http2 bool + } + + dialerOption interface { + TransportOption + baseDialer() transport.Dialer + } + dialerModOption interface { + TransportOption + applyDialer(*HTTPTransportSettings, transport.Dialer) transport.Dialer + } + httpTransportOption interface { + TransportOption + applyTransport(*HTTPTransportSettings, *http.Transport) + } + roundTripperOption interface { + TransportOption + applyRoundTripper(*HTTPTransportSettings, http.RoundTripper) http.RoundTripper + } + extraOption interface { + TransportOption + applyExtra(*extraSettings) + } +) + +type baseDialerFunc func() transport.Dialer + +var _ dialerOption = baseDialerFunc(nil) + +func (baseDialerFunc) sealTransportOption() {} +func (fn baseDialerFunc) baseDialer() transport.Dialer { + return fn() +} + +type dialerOptFunc func(transport.Dialer) transport.Dialer + +var _ dialerModOption = dialerOptFunc(nil) + +func (dialerOptFunc) sealTransportOption() {} +func (fn dialerOptFunc) applyDialer(_ *HTTPTransportSettings, d transport.Dialer) transport.Dialer { + return fn(d) + +} + +type transportOptFunc func(*HTTPTransportSettings, *http.Transport) + +var _ httpTransportOption = transportOptFunc(nil) + +func (transportOptFunc) sealTransportOption() {} +func (fn transportOptFunc) applyTransport(s *HTTPTransportSettings, t *http.Transport) { + fn(s, t) +} + +type rtOptFunc func(http.RoundTripper) http.RoundTripper + +var _ roundTripperOption = rtOptFunc(nil) + +func (rtOptFunc) sealTransportOption() {} +func (fn rtOptFunc) applyRoundTripper(_ *HTTPTransportSettings, rt http.RoundTripper) http.RoundTripper { + return fn(rt) +} + +type extraOptionFunc func(*extraSettings) + +func (extraOptionFunc) sealTransportOption() {} +func (fn extraOptionFunc) applyExtra(s *extraSettings) { fn(s) } + +// DefaultHTTPTransportSettings returns the default HTTP transport setting. +func DefaultHTTPTransportSettings() HTTPTransportSettings { + return HTTPTransportSettings{ + Proxy: DefaultHTTPClientProxySettings(), + Timeout: defaultHTTPTimeout, + } +} + +// Unpack reads a config object into the settings. +func (settings *HTTPTransportSettings) Unpack(cfg *common.Config) error { + tmp := struct { + TLS *tlscommon.Config `config:"ssl"` + Timeout time.Duration `config:"timeout"` + }{Timeout: settings.Timeout} + + if err := cfg.Unpack(&tmp); err != nil { + return err + } + + var proxy HTTPClientProxySettings + if err := cfg.Unpack(&proxy); err != nil { + return err + } + + _, err := tlscommon.LoadTLSConfig(tmp.TLS) + if err != nil { + return err + } + + *settings = HTTPTransportSettings{ + TLS: tmp.TLS, + Timeout: tmp.Timeout, + Proxy: proxy, + } + return nil +} + +// RoundTripper creates a http.RoundTripper for use with http.Client. +// +// The dialers will registers with stats if given. Stats is used to collect metrics for io errors, +// bytes in, and bytes out. +func (settings *HTTPTransportSettings) RoundTripper(opts ...TransportOption) (http.RoundTripper, error) { + var dialer transport.Dialer + + var extra extraSettings + for _, opt := range opts { + if opt, ok := opt.(extraOption); ok { + opt.applyExtra(&extra) + } + } + + for _, opt := range opts { + if dialOpt, ok := opt.(dialerOption); ok { + dialer = dialOpt.baseDialer() + } + } + + if dialer == nil { + dialer = transport.NetDialer(settings.Timeout) + } + + tls, err := tlscommon.LoadTLSConfig(settings.TLS) + if err != nil { + return nil, err + } + + tlsDialer := transport.TLSDialer(dialer, tls, settings.Timeout) + for _, opt := range opts { + if dialOpt, ok := opt.(dialerModOption); ok { + dialer = dialOpt.applyDialer(settings, dialer) + tlsDialer = dialOpt.applyDialer(settings, tlsDialer) + } + } + + if logger := extra.logger; logger != nil { + dialer = transport.LoggingDialer(dialer, logger) + tlsDialer = transport.LoggingDialer(tlsDialer, logger) + } + + var rt http.RoundTripper + if extra.http2 { + rt, err = settings.http2RoundTripper(tls, dialer, tlsDialer, opts...) + } else { + rt, err = settings.httpRoundTripper(tls, dialer, tlsDialer, opts...) + } + + for _, opt := range opts { + if rtOpt, ok := opt.(roundTripperOption); ok { + rt = rtOpt.applyRoundTripper(settings, rt) + } + } + return rt, nil +} + +func (settings *HTTPTransportSettings) httpRoundTripper( + tls *tlscommon.TLSConfig, + dialer, tlsDialer transport.Dialer, + opts ...TransportOption, +) (*http.Transport, error) { + t := http.DefaultTransport.(*http.Transport).Clone() + t.DialContext = nil + t.DialTLSContext = nil + t.Dial = dialer.Dial + t.DialTLS = tlsDialer.Dial + t.TLSClientConfig = tls.ToConfig() + t.ForceAttemptHTTP2 = false + t.Proxy = settings.Proxy.ProxyFunc() + t.ProxyConnectHeader = settings.Proxy.Headers + + // reset some internal timeouts to not change old Beats defaults + t.TLSHandshakeTimeout = 0 + t.ExpectContinueTimeout = 0 + + for _, opt := range opts { + if transportOpt, ok := opt.(httpTransportOption); ok { + transportOpt.applyTransport(settings, t) + } + } + + return t, nil +} + +func (settings *HTTPTransportSettings) http2RoundTripper( + tls *tlscommon.TLSConfig, + dialer, tlsDialer transport.Dialer, + opts ...TransportOption, +) (*http2.Transport, error) { + t1, err := settings.httpRoundTripper(tls, dialer, tlsDialer, opts...) + if err != nil { + return nil, err + } + + t2, err := http2.ConfigureTransports(t1) + if err != nil { + return nil, err + } + + t2.AllowHTTP = true + return t2, nil +} + +// Client creates a new http.Client with configured Transport. The transport is +// instrumented using apmhttp.WrapRoundTripper. +func (settings HTTPTransportSettings) Client(opts ...TransportOption) (*http.Client, error) { + rt, err := settings.RoundTripper(opts...) + if err != nil { + return nil, err + } + + return &http.Client{Transport: rt, Timeout: settings.Timeout}, nil +} + +func (opts WithKeepaliveSettings) sealTransportOption() {} +func (opts WithKeepaliveSettings) applyTransport(_ *HTTPTransportSettings, t *http.Transport) { + t.DisableKeepAlives = opts.Disable + if opts.IdleConnTimeout != 0 { + t.IdleConnTimeout = opts.IdleConnTimeout + } + if opts.MaxIdleConns != 0 { + t.MaxIdleConns = opts.MaxIdleConns + } + if opts.MaxIdleConnsPerHost != 0 { + t.MaxIdleConnsPerHost = opts.MaxIdleConnsPerHost + } +} + +// WithBaseDialer configures the dialer used for TCP and TLS connections. +func WithBaseDialer(d transport.Dialer) TransportOption { + return baseDialerFunc(func() transport.Dialer { + return d + }) +} + +// WithIOStats instruments the RoundTripper dialers with the given statser, such +// that bytes in, bytes out, and errors can be monitored. +func WithIOStats(stats transport.IOStatser) TransportOption { + return dialerOptFunc(func(d transport.Dialer) transport.Dialer { + if stats == nil { + return d + } + return transport.StatsDialer(d, stats) + }) +} + +// WithTransportFunc register a custom function that is used to apply +// custom changes to the net.Transport, when the Client is build. +func WithTransportFunc(fn func(*http.Transport)) TransportOption { + return transportOptFunc(func(_ *HTTPTransportSettings, t *http.Transport) { + fn(t) + }) +} + +// WithHTTP2Only will ensure that a HTTP 2 only roundtripper is created. +func WithHTTP2Only(b bool) TransportOption { + return extraOptionFunc(func(settings *extraSettings) { + settings.http2 = b + }) +} + +// WithForceAttemptHTTP2 sets the `http.Tansport.ForceAttemptHTTP2` field. +func WithForceAttemptHTTP2(b bool) TransportOption { + return transportOptFunc(func(settings *HTTPTransportSettings, t *http.Transport) { + t.ForceAttemptHTTP2 = b + }) +} + +// WithNOProxy disables the configured proxy. Proxy environment variables +// like HTTP_PROXY and HTTPS_PROXY will have no affect. +func WithNOProxy() TransportOption { + return transportOptFunc(func(s *HTTPTransportSettings, t *http.Transport) { + t.Proxy = nil + }) +} + +// WithoutProxyEnvironmentVariables disables support for the HTTP_PROXY, HTTPS_PROXY and +// NO_PROXY envionrment variables. Explicitely configured proxy URLs will still applied. +func WithoutProxyEnvironmentVariables() TransportOption { + return transportOptFunc(func(settings *HTTPTransportSettings, t *http.Transport) { + if settings.Proxy.Disable || settings.Proxy.URL == nil { + t.Proxy = nil + } + }) +} + +// WithModRoundtripper allows customization of the roundtipper. +func WithModRoundtripper(w func(http.RoundTripper) http.RoundTripper) TransportOption { + return rtOptFunc(w) +} + +var withAPMHTTPRountTripper = WithModRoundtripper(func(rt http.RoundTripper) http.RoundTripper { + return apmhttp.WrapRoundTripper(rt) +}) + +// WithAPMHTTPInstrumentation insruments the HTTP client via apmhttp.WrapRoundTripper. +// Custom APM round tripper wrappers can be configured via WithModRoundtripper. +func WithAPMHTTPInstrumentation() TransportOption { + return withAPMHTTPRountTripper +} + +// WithLogger sets the internal logger that will be used to log dial or TCP level errors. +// Logging at the connection level will only happen if the logger has been set. +func WithLogger(logger *logp.Logger) TransportOption { + return extraOptionFunc(func(s *extraSettings) { + s.logger = logger + }) +} diff --git a/libbeat/common/transport/httpcommon/proxy.go b/libbeat/common/transport/httpcommon/proxy.go new file mode 100644 index 000000000000..40275d04af55 --- /dev/null +++ b/libbeat/common/transport/httpcommon/proxy.go @@ -0,0 +1,102 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package httpcommon + +import ( + "net/http" + "net/url" + + "github.com/elastic/beats/v7/libbeat/common" +) + +// HTTPClientProxySettings provides common HTTP proxy setup support. +// +// Proxy usage will be disabled in general if Disable is set. +// If URL is not set, the proxy configuration will default +// to HTTP_PROXY, HTTPS_PPROXY, and NO_PROXY. +// +// The default (and zero) value of HTTPClientProxySettings has Proxy support +// enabled, and will select the proxy per URL based on the environment variables. +type HTTPClientProxySettings struct { + // Proxy URL to use for http connections. If the proxy url is configured, + // it is used for all connection attempts. All proxy related environment + // variables are ignored. + URL *url.URL `config:"proxy_url" yaml:"proxy_url,omitempty"` + + // Headers configures additonal headers that are send to the proxy + // during CONNECT requests. + Headers http.Header `config:"proxy_headers" yaml:"proxy_headers,omitempty"` + + // Disable HTTP proxy support. Configured URLs and environment variables + // are ignored. + Disable bool `config:"proxy_disable" yaml:"proxy_disable,omitempty"` +} + +// DefaultHTTPClientProxySettings returns the default HTTP proxy setting. +func DefaultHTTPClientProxySettings() HTTPClientProxySettings { + return HTTPClientProxySettings{} +} + +// Unpack sets the proxy settings from a config object. +// Note: Unpack is automatically used by the configuration system if `cfg.Unpack(&x)` is and X contains +// a field of type HTTPClientProxySettings. +func (settings *HTTPClientProxySettings) Unpack(cfg *common.Config) error { + tmp := struct { + URL string `config:"proxy_url"` + Disable bool `config:"proxy_disable"` + Headers map[string]string `config:"proxy_headers"` + }{} + + if err := cfg.Unpack(&tmp); err != nil { + return err + } + + url, err := common.ParseURL(tmp.URL) + if err != nil { + return err + } + + var headers http.Header + if len(tmp.Headers) > 0 { + headers = http.Header{} + for k, v := range tmp.Headers { + headers.Add(k, v) + } + } + + *settings = HTTPClientProxySettings{ + URL: url, + Disable: tmp.Disable, + Headers: headers, + } + return nil +} + +// ProxyFunc creates a function that can be used with http.Transport in order to +// configure the HTTP proxy functionality. +func (settings *HTTPClientProxySettings) ProxyFunc() func(*http.Request) (*url.URL, error) { + if settings.Disable { + return nil + } + + if settings.URL == nil { + return http.ProxyFromEnvironment + } + + return http.ProxyURL(settings.URL) +} diff --git a/libbeat/common/transport/tls.go b/libbeat/common/transport/tls.go index edef5a6ab9fe..5f8ade67012c 100644 --- a/libbeat/common/transport/tls.go +++ b/libbeat/common/transport/tls.go @@ -29,7 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/testing" ) -func TLSDialer(forward Dialer, config *tlscommon.TLSConfig, timeout time.Duration) (Dialer, error) { +func TLSDialer(forward Dialer, config *tlscommon.TLSConfig, timeout time.Duration) Dialer { return TestTLSDialer(testing.NullDriver, forward, config, timeout) } @@ -38,7 +38,7 @@ func TestTLSDialer( forward Dialer, config *tlscommon.TLSConfig, timeout time.Duration, -) (Dialer, error) { +) Dialer { var lastTLSConfig *tls.Config var lastNetwork string var lastAddress string @@ -70,7 +70,7 @@ func TestTLSDialer( m.Unlock() return tlsDialWith(d, forward, network, address, timeout, tlsConfig, config) - }), nil + }) } type DialerH2 interface { diff --git a/libbeat/common/transport/tlscommon/config.go b/libbeat/common/transport/tlscommon/config.go index cebc251fd49f..41d1ad6532ce 100644 --- a/libbeat/common/transport/tlscommon/config.go +++ b/libbeat/common/transport/tlscommon/config.go @@ -33,11 +33,11 @@ type Config struct { Enabled *bool `config:"enabled" yaml:"enabled,omitempty"` VerificationMode TLSVerificationMode `config:"verification_mode" yaml:"verification_mode"` // one of 'none', 'full' Versions []TLSVersion `config:"supported_protocols" yaml:"supported_protocols,omitempty"` - CipherSuites []tlsCipherSuite `config:"cipher_suites" yaml:"cipher_suites,omitempty"` + CipherSuites []CipherSuite `config:"cipher_suites" yaml:"cipher_suites,omitempty"` CAs []string `config:"certificate_authorities" yaml:"certificate_authorities,omitempty"` Certificate CertificateConfig `config:",inline" yaml:",inline"` CurveTypes []tlsCurveType `config:"curve_types" yaml:"curve_types,omitempty"` - Renegotiation tlsRenegotiationSupport `config:"renegotiation" yaml:"renegotiation"` + Renegotiation TlsRenegotiationSupport `config:"renegotiation" yaml:"renegotiation"` CASha256 []string `config:"ca_sha256" yaml:"ca_sha256,omitempty"` } @@ -59,11 +59,6 @@ func LoadTLSConfig(config *Config) (*TLSConfig, error) { } } - var cipherSuites []uint16 - for _, suite := range config.CipherSuites { - cipherSuites = append(cipherSuites, uint16(suite)) - } - var curves []tls.CurveID for _, id := range config.CurveTypes { curves = append(curves, tls.CurveID(id)) @@ -91,7 +86,7 @@ func LoadTLSConfig(config *Config) (*TLSConfig, error) { Verification: config.VerificationMode, Certificates: certs, RootCAs: cas, - CipherSuites: cipherSuites, + CipherSuites: config.CipherSuites, CurvePreferences: curves, Renegotiation: tls.RenegotiationSupport(config.Renegotiation), CASha256: config.CASha256, diff --git a/libbeat/common/transport/tlscommon/server_config.go b/libbeat/common/transport/tlscommon/server_config.go index e85a0c409c3a..cf7ab9a390a9 100644 --- a/libbeat/common/transport/tlscommon/server_config.go +++ b/libbeat/common/transport/tlscommon/server_config.go @@ -30,7 +30,7 @@ type ServerConfig struct { Enabled *bool `config:"enabled"` VerificationMode TLSVerificationMode `config:"verification_mode"` // one of 'none', 'full', 'strict', 'certificate' Versions []TLSVersion `config:"supported_protocols"` - CipherSuites []tlsCipherSuite `config:"cipher_suites"` + CipherSuites []CipherSuite `config:"cipher_suites"` CAs []string `config:"certificate_authorities"` Certificate CertificateConfig `config:",inline"` CurveTypes []tlsCurveType `config:"curve_types"` @@ -86,7 +86,7 @@ func LoadTLSServerConfig(config *ServerConfig) (*TLSConfig, error) { Verification: config.VerificationMode, Certificates: certs, ClientCAs: cas, - CipherSuites: cipherSuites, + CipherSuites: config.CipherSuites, CurvePreferences: curves, ClientAuth: tls.ClientAuthType(config.ClientAuth), CASha256: config.CASha256, diff --git a/libbeat/common/transport/tlscommon/tls.go b/libbeat/common/transport/tlscommon/tls.go index e5388eaf8ce3..9850546f221f 100644 --- a/libbeat/common/transport/tlscommon/tls.go +++ b/libbeat/common/transport/tlscommon/tls.go @@ -202,7 +202,7 @@ func ResolveTLSVersion(v uint16) string { // ResolveCipherSuite takes the integer representation and return the cipher name. func ResolveCipherSuite(cipher uint16) string { - return tlsCipherSuite(cipher).String() + return CipherSuite(cipher).String() } // PEMReader allows to read a certificate in PEM format either through the disk or from a string. diff --git a/libbeat/common/transport/tlscommon/tls_config.go b/libbeat/common/transport/tlscommon/tls_config.go index 718dbe42db98..77c60f951f84 100644 --- a/libbeat/common/transport/tlscommon/tls_config.go +++ b/libbeat/common/transport/tlscommon/tls_config.go @@ -56,7 +56,7 @@ type TLSConfig struct { // List of supported cipher suites. If nil, a default list provided by the // implementation will be used. - CipherSuites []uint16 + CipherSuites []CipherSuite // Types of elliptic curves that will be used in an ECDHE handshake. If empty, // the implementation will choose a default. @@ -97,6 +97,7 @@ func (c *TLSConfig) ToConfig() *tls.Config { if c.Verification == VerifyNone { logp.NewLogger("tls").Warn("SSL/TLS verifications disabled.") } + return &tls.Config{ MinVersion: minVersion, MaxVersion: maxVersion, @@ -104,7 +105,7 @@ func (c *TLSConfig) ToConfig() *tls.Config { RootCAs: c.RootCAs, ClientCAs: c.ClientCAs, InsecureSkipVerify: insecure, - CipherSuites: c.CipherSuites, + CipherSuites: convCipherSuites(c.CipherSuites), CurvePreferences: c.CurvePreferences, Renegotiation: c.Renegotiation, ClientAuth: c.ClientAuth, diff --git a/libbeat/common/transport/tlscommon/types.go b/libbeat/common/transport/tlscommon/types.go index 29b11c920100..ed24138394d7 100644 --- a/libbeat/common/transport/tlscommon/types.go +++ b/libbeat/common/transport/tlscommon/types.go @@ -35,44 +35,44 @@ var ( ErrCertificateUnspecified = errors.New("certificate file not configured") ) -var tlsCipherSuites = map[string]tlsCipherSuite{ +var tlsCipherSuites = map[string]CipherSuite{ // ECDHE-ECDSA - "ECDHE-ECDSA-AES-128-CBC-SHA": tlsCipherSuite(tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA), - "ECDHE-ECDSA-AES-128-CBC-SHA256": tlsCipherSuite(tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256), - "ECDHE-ECDSA-AES-128-GCM-SHA256": tlsCipherSuite(tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256), - "ECDHE-ECDSA-AES-256-CBC-SHA": tlsCipherSuite(tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA), - "ECDHE-ECDSA-AES-256-GCM-SHA384": tlsCipherSuite(tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384), - "ECDHE-ECDSA-CHACHA20-POLY1305": tlsCipherSuite(tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305), - "ECDHE-ECDSA-RC4-128-SHA": tlsCipherSuite(tls.TLS_ECDHE_ECDSA_WITH_RC4_128_SHA), + "ECDHE-ECDSA-AES-128-CBC-SHA": CipherSuite(tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA), + "ECDHE-ECDSA-AES-128-CBC-SHA256": CipherSuite(tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256), + "ECDHE-ECDSA-AES-128-GCM-SHA256": CipherSuite(tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256), + "ECDHE-ECDSA-AES-256-CBC-SHA": CipherSuite(tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA), + "ECDHE-ECDSA-AES-256-GCM-SHA384": CipherSuite(tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384), + "ECDHE-ECDSA-CHACHA20-POLY1305": CipherSuite(tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305), + "ECDHE-ECDSA-RC4-128-SHA": CipherSuite(tls.TLS_ECDHE_ECDSA_WITH_RC4_128_SHA), // ECDHE-RSA - "ECDHE-RSA-3DES-CBC3-SHA": tlsCipherSuite(tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA), - "ECDHE-RSA-AES-128-CBC-SHA": tlsCipherSuite(tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA), - "ECDHE-RSA-AES-128-CBC-SHA256": tlsCipherSuite(tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256), - "ECDHE-RSA-AES-128-GCM-SHA256": tlsCipherSuite(tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256), - "ECDHE-RSA-AES-256-CBC-SHA": tlsCipherSuite(tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA), - "ECDHE-RSA-AES-256-GCM-SHA384": tlsCipherSuite(tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384), - "ECDHE-RSA-CHACHA20-POLY1205": tlsCipherSuite(tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305), - "ECDHE-RSA-RC4-128-SHA": tlsCipherSuite(tls.TLS_ECDHE_RSA_WITH_RC4_128_SHA), + "ECDHE-RSA-3DES-CBC3-SHA": CipherSuite(tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA), + "ECDHE-RSA-AES-128-CBC-SHA": CipherSuite(tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA), + "ECDHE-RSA-AES-128-CBC-SHA256": CipherSuite(tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256), + "ECDHE-RSA-AES-128-GCM-SHA256": CipherSuite(tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256), + "ECDHE-RSA-AES-256-CBC-SHA": CipherSuite(tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA), + "ECDHE-RSA-AES-256-GCM-SHA384": CipherSuite(tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384), + "ECDHE-RSA-CHACHA20-POLY1205": CipherSuite(tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305), + "ECDHE-RSA-RC4-128-SHA": CipherSuite(tls.TLS_ECDHE_RSA_WITH_RC4_128_SHA), // RSA-X - "RSA-RC4-128-SHA": tlsCipherSuite(tls.TLS_RSA_WITH_RC4_128_SHA), - "RSA-3DES-CBC3-SHA": tlsCipherSuite(tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA), + "RSA-RC4-128-SHA": CipherSuite(tls.TLS_RSA_WITH_RC4_128_SHA), + "RSA-3DES-CBC3-SHA": CipherSuite(tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA), // RSA-AES - "RSA-AES-128-CBC-SHA": tlsCipherSuite(tls.TLS_RSA_WITH_AES_128_CBC_SHA), - "RSA-AES-128-CBC-SHA256": tlsCipherSuite(tls.TLS_RSA_WITH_AES_128_CBC_SHA256), - "RSA-AES-128-GCM-SHA256": tlsCipherSuite(tls.TLS_RSA_WITH_AES_128_GCM_SHA256), - "RSA-AES-256-CBC-SHA": tlsCipherSuite(tls.TLS_RSA_WITH_AES_256_CBC_SHA), - "RSA-AES-256-GCM-SHA384": tlsCipherSuite(tls.TLS_RSA_WITH_AES_256_GCM_SHA384), - - "TLS-AES-128-GCM-SHA256": tlsCipherSuite(tls.TLS_AES_128_GCM_SHA256), - "TLS-AES-256-GCM-SHA384": tlsCipherSuite(tls.TLS_AES_256_GCM_SHA384), - "TLS-CHACHA20-POLY1305-SHA256": tlsCipherSuite(tls.TLS_CHACHA20_POLY1305_SHA256), + "RSA-AES-128-CBC-SHA": CipherSuite(tls.TLS_RSA_WITH_AES_128_CBC_SHA), + "RSA-AES-128-CBC-SHA256": CipherSuite(tls.TLS_RSA_WITH_AES_128_CBC_SHA256), + "RSA-AES-128-GCM-SHA256": CipherSuite(tls.TLS_RSA_WITH_AES_128_GCM_SHA256), + "RSA-AES-256-CBC-SHA": CipherSuite(tls.TLS_RSA_WITH_AES_256_CBC_SHA), + "RSA-AES-256-GCM-SHA384": CipherSuite(tls.TLS_RSA_WITH_AES_256_GCM_SHA384), + + "TLS-AES-128-GCM-SHA256": CipherSuite(tls.TLS_AES_128_GCM_SHA256), + "TLS-AES-256-GCM-SHA384": CipherSuite(tls.TLS_AES_256_GCM_SHA384), + "TLS-CHACHA20-POLY1305-SHA256": CipherSuite(tls.TLS_CHACHA20_POLY1305_SHA256), } -var tlsCipherSuitesInverse = make(map[tlsCipherSuite]string, len(tlsCipherSuites)) -var tlsRenegotiationSupportTypesInverse = make(map[tlsRenegotiationSupport]string, len(tlsRenegotiationSupportTypes)) +var tlsCipherSuitesInverse = make(map[CipherSuite]string, len(tlsCipherSuites)) +var tlsRenegotiationSupportTypesInverse = make(map[TlsRenegotiationSupport]string, len(tlsRenegotiationSupportTypes)) var tlsVerificationModesInverse = make(map[TLSVerificationMode]string, len(tlsVerificationModes)) // Init creates a inverse representation of the values mapping. @@ -97,10 +97,10 @@ var tlsCurveTypes = map[string]tlsCurveType{ "X25519": tlsCurveType(tls.X25519), } -var tlsRenegotiationSupportTypes = map[string]tlsRenegotiationSupport{ - "never": tlsRenegotiationSupport(tls.RenegotiateNever), - "once": tlsRenegotiationSupport(tls.RenegotiateOnceAsClient), - "freely": tlsRenegotiationSupport(tls.RenegotiateFreelyAsClient), +var tlsRenegotiationSupportTypes = map[string]TlsRenegotiationSupport{ + "never": TlsRenegotiationSupport(tls.RenegotiateNever), + "once": TlsRenegotiationSupport(tls.RenegotiateOnceAsClient), + "freely": TlsRenegotiationSupport(tls.RenegotiateFreelyAsClient), } type tlsClientAuth int @@ -194,9 +194,9 @@ func (m *tlsClientAuth) Unpack(in interface{}) error { return nil } -type tlsCipherSuite uint16 +type CipherSuite uint16 -func (cs *tlsCipherSuite) Unpack(s string) error { +func (cs *CipherSuite) Unpack(s string) error { suite, found := tlsCipherSuites[s] if !found { return fmt.Errorf("invalid tls cipher suite '%v'", s) @@ -206,7 +206,7 @@ func (cs *tlsCipherSuite) Unpack(s string) error { return nil } -func (cs tlsCipherSuite) String() string { +func (cs CipherSuite) String() string { if s, found := tlsCipherSuitesInverse[cs]; found { return s } @@ -225,9 +225,16 @@ func (ct *tlsCurveType) Unpack(s string) error { return nil } -type tlsRenegotiationSupport tls.RenegotiationSupport +type TlsRenegotiationSupport tls.RenegotiationSupport -func (r *tlsRenegotiationSupport) Unpack(s string) error { +func (r TlsRenegotiationSupport) String() string { + if t, found := tlsRenegotiationSupportTypesInverse[r]; found { + return t + } + return "" +} + +func (r *TlsRenegotiationSupport) Unpack(s string) error { t, found := tlsRenegotiationSupportTypes[s] if !found { return fmt.Errorf("invalid tls renegotiation type '%v'", s) @@ -237,7 +244,7 @@ func (r *tlsRenegotiationSupport) Unpack(s string) error { return nil } -func (r tlsRenegotiationSupport) MarshalText() ([]byte, error) { +func (r TlsRenegotiationSupport) MarshalText() ([]byte, error) { if t, found := tlsRenegotiationSupportTypesInverse[r]; found { return []byte(t), nil } @@ -245,6 +252,14 @@ func (r tlsRenegotiationSupport) MarshalText() ([]byte, error) { return nil, fmt.Errorf("could not marshal '%+v' to text", r) } +func (r TlsRenegotiationSupport) MarshalYAML() (interface{}, error) { + if t, found := tlsRenegotiationSupportTypesInverse[r]; found { + return t, nil + } + + return nil, fmt.Errorf("could not marshal '%+v' to text", r) +} + // CertificateConfig define a common set of fields for a certificate. type CertificateConfig struct { Certificate string `config:"certificate" yaml:"certificate,omitempty"` @@ -265,3 +280,14 @@ func (c *CertificateConfig) Validate() error { } return nil } + +func convCipherSuites(suites []CipherSuite) []uint16 { + if len(suites) == 0 { + return nil + } + cipherSuites := make([]uint16, len(suites)) + for i, s := range suites { + cipherSuites[i] = uint16(s) + } + return cipherSuites +} diff --git a/libbeat/common/transport/transport.go b/libbeat/common/transport/transport.go index 35b397c5876e..db1f1fb908ab 100644 --- a/libbeat/common/transport/transport.go +++ b/libbeat/common/transport/transport.go @@ -58,7 +58,7 @@ func MakeDialer(c Config) (Dialer, error) { } if c.TLS != nil { - return TLSDialer(dialer, c.TLS, c.Timeout) + return TLSDialer(dialer, c.TLS, c.Timeout), nil } return dialer, nil } diff --git a/libbeat/esleg/eslegclient/api_test.go b/libbeat/esleg/eslegclient/api_test.go index 21897b9c1a1c..fb4e42e11b9d 100644 --- a/libbeat/esleg/eslegclient/api_test.go +++ b/libbeat/esleg/eslegclient/api_test.go @@ -172,8 +172,7 @@ func TestReadSearchResult_invalid(t *testing.T) { func newTestConnection(url string) *Connection { conn, _ := NewConnection(ConnectionSettings{ - URL: url, - Timeout: 0, + URL: url, }) conn.Encoder = NewJSONEncoder(nil, false) return conn diff --git a/libbeat/esleg/eslegclient/bulkapi_mock_test.go b/libbeat/esleg/eslegclient/bulkapi_mock_test.go index 3d4e33c42712..3f5c48b731ea 100644 --- a/libbeat/esleg/eslegclient/bulkapi_mock_test.go +++ b/libbeat/esleg/eslegclient/bulkapi_mock_test.go @@ -228,7 +228,6 @@ func TestEnforceParameters(t *testing.T) { client, _ := NewConnection(ConnectionSettings{ Parameters: test.preconfigured, URL: "http://localhost", - Timeout: 0, }) client.Encoder = NewJSONEncoder(nil, false) diff --git a/libbeat/esleg/eslegclient/config.go b/libbeat/esleg/eslegclient/config.go index d9a299d68c78..d442cc2de5dd 100644 --- a/libbeat/esleg/eslegclient/config.go +++ b/libbeat/esleg/eslegclient/config.go @@ -19,11 +19,9 @@ package eslegclient import ( "fmt" - "time" - "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/libbeat/common/transport/kerberos" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) type config struct { @@ -33,45 +31,33 @@ type config struct { Params map[string]string `config:"parameters"` Headers map[string]string `config:"headers"` - TLS *tlscommon.Config `config:"ssl"` - Kerberos *kerberos.Config `config:"kerberos"` - - ProxyURL string `config:"proxy_url"` - ProxyDisable bool `config:"proxy_disable"` + Kerberos *kerberos.Config `config:"kerberos"` Username string `config:"username"` Password string `config:"password"` APIKey string `config:"api_key"` - CompressionLevel int `config:"compression_level" validate:"min=0, max=9"` - EscapeHTML bool `config:"escape_html"` - Timeout time.Duration `config:"timeout"` + CompressionLevel int `config:"compression_level" validate:"min=0, max=9"` + EscapeHTML bool `config:"escape_html"` + + Transport httpcommon.HTTPTransportSettings `config:",inline"` } func defaultConfig() config { return config{ Protocol: "", Path: "", - ProxyURL: "", - ProxyDisable: false, Params: nil, Username: "", Password: "", APIKey: "", - Timeout: 90 * time.Second, CompressionLevel: 0, EscapeHTML: false, - TLS: nil, + Transport: httpcommon.DefaultHTTPTransportSettings(), } } func (c *config) Validate() error { - if c.ProxyURL != "" && !c.ProxyDisable { - if _, err := common.ParseURL(c.ProxyURL); err != nil { - return err - } - } - if c.APIKey != "" && (c.Username != "" || c.Password != "") { return fmt.Errorf("cannot set both api_key and username/password") } diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index 6cc64bca2638..795c8c941869 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/transport" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/libbeat/common/transport/kerberos" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" "github.com/elastic/beats/v7/libbeat/logp" @@ -56,16 +57,13 @@ type Connection struct { // ConnectionSettings are the settings needed for a Connection type ConnectionSettings struct { - URL string - Proxy *url.URL - ProxyDisable bool + URL string Username string Password string APIKey string // Raw API key, NOT base64-encoded Headers map[string]string - TLS *tlscommon.TLSConfig Kerberos *kerberos.Config OnConnectCallback func() error @@ -75,12 +73,15 @@ type ConnectionSettings struct { CompressionLevel int EscapeHTML bool - Timeout time.Duration IdleConnTimeout time.Duration + + Transport httpcommon.HTTPTransportSettings } // NewConnection returns a new Elasticsearch client func NewConnection(s ConnectionSettings) (*Connection, error) { + logger := logp.NewLogger("esclientleg") + s = settingsWithDefaults(s) u, err := url.Parse(s.URL) @@ -96,24 +97,7 @@ func NewConnection(s ConnectionSettings) (*Connection, error) { // Re-write URL without credentials. s.URL = u.String() } - logp.Info("elasticsearch url: %s", s.URL) - - // TODO: add socks5 proxy support - var dialer, tlsDialer transport.Dialer - - dialer = transport.NetDialer(s.Timeout) - tlsDialer, err = transport.TLSDialer(dialer, s.TLS, s.Timeout) - if err != nil { - return nil, err - } - - if st := s.Observer; st != nil { - dialer = transport.StatsDialer(dialer, st) - tlsDialer = transport.StatsDialer(tlsDialer, st) - } - logger := logp.NewLogger("esclientleg") - dialer = transport.LoggingDialer(dialer, logger) - tlsDialer = transport.LoggingDialer(tlsDialer, logger) + logger.Infof("elasticsearch url: %s", s.URL) var encoder BodyEncoder compression := s.CompressionLevel @@ -126,36 +110,23 @@ func NewConnection(s ConnectionSettings) (*Connection, error) { } } - var proxy func(*http.Request) (*url.URL, error) - if !s.ProxyDisable { - proxy = http.ProxyFromEnvironment - if s.Proxy != nil { - proxy = http.ProxyURL(s.Proxy) - } - } - - // when dropping the legacy client in favour of the official Go client, it should be instrumented - // eg, like in https://github.com/elastic/apm-server/blob/7.7/elasticsearch/client.go - transp := apmelasticsearch.WrapRoundTripper(&http.Transport{ - Dial: dialer.Dial, - DialTLS: tlsDialer.Dial, - TLSClientConfig: s.TLS.ToConfig(), - Proxy: proxy, - IdleConnTimeout: s.IdleConnTimeout, - }) - - var httpClient esHTTPClient - httpClient = &http.Client{ - Transport: transp, - Timeout: s.Timeout, + httpClient, err := s.Transport.Client( + httpcommon.WithLogger(logger), + httpcommon.WithIOStats(s.Observer), + httpcommon.WithKeepaliveSettings{IdleConnTimeout: s.IdleConnTimeout}, + httpcommon.WithModRoundtripper(func(rt http.RoundTripper) http.RoundTripper { + // when dropping the legacy client in favour of the official Go client, it should be instrumented + // eg, like in https://github.com/elastic/apm-server/blob/7.7/elasticsearch/client.go + return apmelasticsearch.WrapRoundTripper(rt) + }), + ) + if err != nil { + return nil, err } + esClient := esHTTPClient(httpClient) if s.Kerberos.IsEnabled() { - c := &http.Client{ - Transport: transp, - Timeout: s.Timeout, - } - httpClient, err = kerberos.NewClient(s.Kerberos, c, s.URL) + esClient, err = kerberos.NewClient(s.Kerberos, httpClient, s.URL) if err != nil { return nil, err } @@ -164,7 +135,7 @@ func NewConnection(s ConnectionSettings) (*Connection, error) { conn := Connection{ ConnectionSettings: s, - HTTP: httpClient, + HTTP: esClient, Encoder: encoder, log: logger, } @@ -195,20 +166,8 @@ func NewClients(cfg *common.Config) ([]Connection, error) { return nil, err } - tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS) - if err != nil { - return nil, err - } - - var proxyURL *url.URL - if !config.ProxyDisable { - proxyURL, err = common.ParseURL(config.ProxyURL) - if err != nil { - return nil, err - } - if proxyURL != nil { - logp.Info("using proxy URL: %s", proxyURL) - } + if proxyURL := config.Transport.Proxy.URL; proxyURL != nil { + logp.Info("using proxy URL: %s", proxyURL) } params := config.Params @@ -226,17 +185,14 @@ func NewClients(cfg *common.Config) ([]Connection, error) { client, err := NewConnection(ConnectionSettings{ URL: esURL, - Proxy: proxyURL, - ProxyDisable: config.ProxyDisable, - TLS: tlsConfig, Kerberos: config.Kerberos, Username: config.Username, Password: config.Password, APIKey: config.APIKey, Parameters: params, Headers: config.Headers, - Timeout: config.Timeout, CompressionLevel: config.CompressionLevel, + Transport: config.Transport, }) if err != nil { return clients, err @@ -332,7 +288,7 @@ func (conn *Connection) Test(d testing.Driver) { address := u.Host d.Run("connection", func(d testing.Driver) { - netDialer := transport.TestNetDialer(d, conn.Timeout) + netDialer := transport.TestNetDialer(d, conn.Transport.Timeout) _, err = netDialer.Dial("tcp", address) d.Fatal("dial up", err) }) @@ -341,8 +297,13 @@ func (conn *Connection) Test(d testing.Driver) { d.Warn("TLS", "secure connection disabled") } else { d.Run("TLS", func(d testing.Driver) { - netDialer := transport.NetDialer(conn.Timeout) - tlsDialer, err := transport.TestTLSDialer(d, netDialer, conn.TLS, conn.Timeout) + tls, err := tlscommon.LoadTLSConfig(conn.Transport.TLS) + if err != nil { + d.Fatal("load tls config", err) + } + + netDialer := transport.NetDialer(conn.Transport.Timeout) + tlsDialer := transport.TestTLSDialer(d, netDialer, tls, conn.Transport.Timeout) _, err = tlsDialer.Dial("tcp", address) d.Fatal("dial up", err) }) diff --git a/libbeat/esleg/eslegclient/connection_integration_test.go b/libbeat/esleg/eslegclient/connection_integration_test.go index 225edd6f36cd..25fef0ca24ea 100644 --- a/libbeat/esleg/eslegclient/connection_integration_test.go +++ b/libbeat/esleg/eslegclient/connection_integration_test.go @@ -111,14 +111,14 @@ func connectTestEs(t *testing.T, cfg interface{}) (*Connection, error) { URL: hosts, Username: username, Password: password, - Timeout: time.Duration(timeout) * time.Second, CompressionLevel: 3, } + s.Transport.Timeout = time.Duration(timeout) * time.Second if proxy != "" { p, err := url.Parse(proxy) require.NoError(t, err) - s.Proxy = p + s.Transport.Proxy.URL = p } return NewConnection(s) @@ -130,9 +130,10 @@ func getTestingElasticsearch(t eslegtest.TestLogger) *Connection { URL: eslegtest.GetURL(), Username: eslegtest.GetUser(), Password: eslegtest.GetPass(), - Timeout: 60 * time.Second, CompressionLevel: 3, }) + conn.Transport.Timeout = 60 * time.Second + eslegtest.InitConnection(t, conn, err) return conn } diff --git a/libbeat/idxmgmt/ilm/client_handler_integration_test.go b/libbeat/idxmgmt/ilm/client_handler_integration_test.go index 9471da7c9cfb..4d1e6f5ed645 100644 --- a/libbeat/idxmgmt/ilm/client_handler_integration_test.go +++ b/libbeat/idxmgmt/ilm/client_handler_integration_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/idxmgmt/ilm" "github.com/elastic/beats/v7/libbeat/version" @@ -209,12 +210,14 @@ func newESClientHandler(t *testing.T) ilm.ClientHandler { } func newRawESClient(t *testing.T) ilm.ESClient { + transport := httpcommon.DefaultHTTPTransportSettings() + transport.Timeout = 60 * time.Second client, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{ URL: getURL(), Username: getUser(), Password: getPass(), - Timeout: 60 * time.Second, CompressionLevel: 3, + Transport: transport, }) if err != nil { t.Fatal(err) diff --git a/libbeat/kibana/client.go b/libbeat/kibana/client.go index c2ced9d864d7..f12c49129cab 100644 --- a/libbeat/kibana/client.go +++ b/libbeat/kibana/client.go @@ -32,8 +32,6 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/transport" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -125,38 +123,23 @@ func NewClientWithConfigDefault(config *ClientConfig, defaultPort int) (*Client, log := logp.NewLogger("kibana") log.Infof("Kibana url: %s", kibanaURL) - var dialer, tlsDialer transport.Dialer - - tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS) - if err != nil { - return nil, fmt.Errorf("fail to load the TLS config: %v", err) + headers := make(http.Header) + for k, v := range config.Headers { + headers.Set(k, v) } - dialer = transport.NetDialer(config.Timeout) - tlsDialer, err = transport.TLSDialer(dialer, tlsConfig, config.Timeout) + rt, err := config.Transport.Client() if err != nil { return nil, err } - headers := make(http.Header) - for k, v := range config.Headers { - headers.Set(k, v) - } - client := &Client{ Connection: Connection{ URL: kibanaURL, Username: username, Password: password, Headers: headers, - HTTP: &http.Client{ - Transport: &http.Transport{ - Dial: dialer.Dial, - DialTLS: tlsDialer.Dial, - TLSClientConfig: tlsConfig.ToConfig(), - }, - Timeout: config.Timeout, - }, + HTTP: rt, }, log: log, } diff --git a/libbeat/kibana/client_config.go b/libbeat/kibana/client_config.go index 09709e3d81d1..a10a8031242d 100644 --- a/libbeat/kibana/client_config.go +++ b/libbeat/kibana/client_config.go @@ -18,38 +18,35 @@ package kibana import ( - "time" - - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" ) // ClientConfig to connect to Kibana type ClientConfig struct { - Protocol string `config:"protocol" yaml:"protocol,omitempty"` - Host string `config:"host" yaml:"host,omitempty"` - Path string `config:"path" yaml:"path,omitempty"` - SpaceID string `config:"space.id" yaml:"space.id,omitempty"` - Username string `config:"username" yaml:"username,omitempty"` - Password string `config:"password" yaml:"password,omitempty"` - TLS *tlscommon.Config `config:"ssl" yaml:"ssl"` - Timeout time.Duration `config:"timeout" yaml:"timeout"` + Protocol string `config:"protocol" yaml:"protocol,omitempty"` + Host string `config:"host" yaml:"host,omitempty"` + Path string `config:"path" yaml:"path,omitempty"` + SpaceID string `config:"space.id" yaml:"space.id,omitempty"` + Username string `config:"username" yaml:"username,omitempty"` + Password string `config:"password" yaml:"password,omitempty"` // Headers holds headers to include in every request sent to Kibana. Headers map[string]string `config:"headers" yaml:"headers,omitempty"` IgnoreVersion bool + + Transport httpcommon.HTTPTransportSettings `config:",inline" yaml:",inline"` } // DefaultClientConfig connects to a locally running kibana over HTTP func DefaultClientConfig() ClientConfig { return ClientConfig{ - Protocol: "http", - Host: "localhost:5601", - Path: "", - SpaceID: "", - Username: "", - Password: "", - Timeout: 90 * time.Second, - TLS: nil, + Protocol: "http", + Host: "localhost:5601", + Path: "", + SpaceID: "", + Username: "", + Password: "", + Transport: httpcommon.DefaultHTTPTransportSettings(), } } diff --git a/libbeat/licenser/elastic_fetcher_integration_test.go b/libbeat/licenser/elastic_fetcher_integration_test.go index 031a9b18b2e4..ba1dd21a6302 100644 --- a/libbeat/licenser/elastic_fetcher_integration_test.go +++ b/libbeat/licenser/elastic_fetcher_integration_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/libbeat/common/cli" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" ) @@ -35,13 +36,16 @@ const ( ) func getTestClient() *eslegclient.Connection { + transport := httpcommon.DefaultHTTPTransportSettings() + transport.Timeout = 60 * time.Second + host := "http://" + cli.GetEnvOr("ES_HOST", elasticsearchHost) + ":" + cli.GetEnvOr("ES_POST", elasticsearchPort) client, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{ URL: host, Username: "myelastic", // NOTE: I will refactor this in a followup PR Password: "changeme", CompressionLevel: 3, - Timeout: 60 * time.Second, + Transport: transport, }) if err != nil { diff --git a/libbeat/licenser/elastic_fetcher_test.go b/libbeat/licenser/elastic_fetcher_test.go index 3daa88a2d1ae..731bf5c0618f 100644 --- a/libbeat/licenser/elastic_fetcher_test.go +++ b/libbeat/licenser/elastic_fetcher_test.go @@ -24,7 +24,6 @@ import ( "os" "path/filepath" "testing" - "time" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" @@ -38,8 +37,7 @@ func newServerClientPair(t *testing.T, handler http.HandlerFunc) (*httptest.Serv server := httptest.NewServer(mux) client, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{ - URL: server.URL, - Timeout: 90 * time.Second, + URL: server.URL, }) if err != nil { t.Fatalf("could not create the elasticsearch client, error: %s", err) diff --git a/libbeat/monitoring/report/elasticsearch/config.go b/libbeat/monitoring/report/elasticsearch/config.go index 8712bf1a88b0..6cb30e47d3dd 100644 --- a/libbeat/monitoring/report/elasticsearch/config.go +++ b/libbeat/monitoring/report/elasticsearch/config.go @@ -21,7 +21,7 @@ import ( "fmt" "time" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" ) // config is subset of libbeat/outputs/elasticsearch config tailored @@ -36,9 +36,7 @@ type config struct { APIKey string `config:"api_key"` ProxyURL string `config:"proxy_url"` CompressionLevel int `config:"compression_level" validate:"min=0, max=9"` - TLS *tlscommon.Config `config:"ssl"` MaxRetries int `config:"max_retries"` - Timeout time.Duration `config:"timeout"` MetricsPeriod time.Duration `config:"metrics.period"` StatePeriod time.Duration `config:"state.period"` BulkMaxSize int `config:"bulk_max_size" validate:"min=0"` @@ -46,6 +44,8 @@ type config struct { Tags []string `config:"tags"` Backoff backoff `config:"backoff"` ClusterUUID string `config:"cluster_uuid"` + + Transport httpcommon.HTTPTransportSettings `config:",inline"` } type backoff struct { diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index 8913264a7795..dddb2c53a00c 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -21,13 +21,12 @@ import ( "errors" "io" "math/rand" - "net/url" "strconv" "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" @@ -75,9 +74,7 @@ func defaultConfig(settings report.Settings) config { APIKey: "", ProxyURL: "", CompressionLevel: 0, - TLS: nil, MaxRetries: 3, - Timeout: 60 * time.Second, MetricsPeriod: 10 * time.Second, StatePeriod: 1 * time.Minute, BulkMaxSize: 50, @@ -88,6 +85,7 @@ func defaultConfig(settings report.Settings) config { Max: 60 * time.Second, }, ClusterUUID: settings.ClusterUUID, + Transport: httpcommon.DefaultHTTPTransportSettings(), } if settings.DefaultUsername != "" { @@ -117,18 +115,6 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) windowSize = 1 } - proxyURL, err := common.ParseURL(config.ProxyURL) - if err != nil { - return nil, err - } - if proxyURL != nil { - log.Infof("Using proxy URL: %s", proxyURL) - } - tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS) - if err != nil { - return nil, err - } - params := makeClientParams(config) hosts, err := outputs.ReadHostList(cfg) @@ -141,7 +127,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) var clients []outputs.NetworkClient for _, host := range hosts { - client, err := makeClient(host, params, proxyURL, tlsConfig, &config) + client, err := makeClient(host, params, &config) if err != nil { return nil, err } @@ -305,13 +291,7 @@ func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration, } } -func makeClient( - host string, - params map[string]string, - proxyURL *url.URL, - tlsConfig *tlscommon.TLSConfig, - config *config, -) (outputs.NetworkClient, error) { +func makeClient(host string, params map[string]string, config *config) (outputs.NetworkClient, error) { url, err := common.MakeURL(config.Protocol, "", host, 9200) if err != nil { return nil, err @@ -319,15 +299,13 @@ func makeClient( esClient, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{ URL: url, - Proxy: proxyURL, - TLS: tlsConfig, Username: config.Username, Password: config.Password, APIKey: config.APIKey, Parameters: params, Headers: config.Headers, - Timeout: config.Timeout, CompressionLevel: config.CompressionLevel, + Transport: config.Transport, }) if err != nil { return nil, err diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 0794ee1c13bd..b388001aeec5 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -86,15 +86,12 @@ func NewClient( Password: s.Password, APIKey: s.APIKey, Headers: s.Headers, - TLS: s.TLS, Kerberos: s.Kerberos, - Proxy: s.Proxy, - ProxyDisable: s.ProxyDisable, Observer: s.Observer, Parameters: s.Parameters, CompressionLevel: s.CompressionLevel, EscapeHTML: s.EscapeHTML, - Timeout: s.Timeout, + Transport: s.Transport, }) if err != nil { return nil, err @@ -144,31 +141,31 @@ func (client *Client) Clone() *Client { // client's close is for example generated for topology-map support. With params // most likely containing the ingest node pipeline and default callback trying to // create install a template, we don't want these to be included in the clone. + connection := eslegclient.ConnectionSettings{ + URL: client.conn.URL, + Kerberos: client.conn.Kerberos, + Username: client.conn.Username, + Password: client.conn.Password, + APIKey: client.conn.APIKey, + Parameters: nil, // XXX: do not pass params? + Headers: client.conn.Headers, + CompressionLevel: client.conn.CompressionLevel, + OnConnectCallback: nil, + Observer: nil, + EscapeHTML: false, + Transport: client.conn.Transport, + } + + // Without the following nil check on proxyURL, a nil Proxy field will try + // reloading proxy settings from the environment instead of leaving them + // empty. + client.conn.Transport.Proxy.Disable = client.conn.Transport.Proxy.URL == nil c, _ := NewClient( ClientSettings{ - ConnectionSettings: eslegclient.ConnectionSettings{ - URL: client.conn.URL, - Proxy: client.conn.Proxy, - // Without the following nil check on proxyURL, a nil Proxy field will try - // reloading proxy settings from the environment instead of leaving them - // empty. - ProxyDisable: client.conn.Proxy == nil, - TLS: client.conn.TLS, - Kerberos: client.conn.Kerberos, - Username: client.conn.Username, - Password: client.conn.Password, - APIKey: client.conn.APIKey, - Parameters: nil, // XXX: do not pass params? - Headers: client.conn.Headers, - Timeout: client.conn.Timeout, - CompressionLevel: client.conn.CompressionLevel, - OnConnectCallback: nil, - Observer: nil, - EscapeHTML: false, - }, - Index: client.index, - Pipeline: client.pipeline, + ConnectionSettings: connection, + Index: client.index, + Pipeline: client.pipeline, }, nil, // XXX: do not pass connection callback? ) diff --git a/libbeat/outputs/elasticsearch/client_proxy_test.go b/libbeat/outputs/elasticsearch/client_proxy_test.go index 1e368d234ea2..6dfe48fc183d 100644 --- a/libbeat/outputs/elasticsearch/client_proxy_test.go +++ b/libbeat/outputs/elasticsearch/client_proxy_test.go @@ -34,6 +34,7 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/outputs/outil" ) @@ -186,16 +187,20 @@ func doClientPing(t *testing.T) { proxyDisable := os.Getenv("TEST_PROXY_DISABLE") clientSettings := ClientSettings{ ConnectionSettings: eslegclient.ConnectionSettings{ - URL: serverURL, - Headers: map[string]string{headerTestField: headerTestValue}, - ProxyDisable: proxyDisable != "", + URL: serverURL, + Headers: map[string]string{headerTestField: headerTestValue}, + Transport: httpcommon.HTTPTransportSettings{ + Proxy: httpcommon.HTTPClientProxySettings{ + Disable: proxyDisable != "", + }, + }, }, Index: outil.MakeSelector(outil.ConstSelectorExpr("test", outil.SelectorLowerCase)), } if proxy != "" { proxyURL, err := url.Parse(proxy) require.NoError(t, err) - clientSettings.Proxy = proxyURL + clientSettings.Transport.Proxy.URL = proxyURL } client, err := NewClient(clientSettings, nil) require.NoError(t, err) diff --git a/libbeat/outputs/elasticsearch/config.go b/libbeat/outputs/elasticsearch/config.go index d094f005df5d..bf2f7932fbad 100644 --- a/libbeat/outputs/elasticsearch/config.go +++ b/libbeat/outputs/elasticsearch/config.go @@ -21,9 +21,8 @@ import ( "fmt" "time" - "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/libbeat/common/transport/kerberos" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) type elasticsearchConfig struct { @@ -34,17 +33,15 @@ type elasticsearchConfig struct { Username string `config:"username"` Password string `config:"password"` APIKey string `config:"api_key"` - ProxyURL string `config:"proxy_url"` - ProxyDisable bool `config:"proxy_disable"` LoadBalance bool `config:"loadbalance"` CompressionLevel int `config:"compression_level" validate:"min=0, max=9"` EscapeHTML bool `config:"escape_html"` - TLS *tlscommon.Config `config:"ssl"` Kerberos *kerberos.Config `config:"kerberos"` BulkMaxSize int `config:"bulk_max_size"` MaxRetries int `config:"max_retries"` - Timeout time.Duration `config:"timeout"` Backoff Backoff `config:"backoff"` + + Transport httpcommon.HTTPTransportSettings `config:",inline"` } type Backoff struct { @@ -60,33 +57,24 @@ var ( defaultConfig = elasticsearchConfig{ Protocol: "", Path: "", - ProxyURL: "", - ProxyDisable: false, Params: nil, Username: "", Password: "", APIKey: "", - Timeout: 90 * time.Second, MaxRetries: 3, CompressionLevel: 0, EscapeHTML: false, - TLS: nil, Kerberos: nil, LoadBalance: true, Backoff: Backoff{ Init: 1 * time.Second, Max: 60 * time.Second, }, + Transport: httpcommon.DefaultHTTPTransportSettings(), } ) func (c *elasticsearchConfig) Validate() error { - if c.ProxyURL != "" && !c.ProxyDisable { - if _, err := common.ParseURL(c.ProxyURL); err != nil { - return err - } - } - if c.APIKey != "" && (c.Username != "" || c.Password != "") { return fmt.Errorf("cannot set both api_key and username/password") } diff --git a/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc b/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc index fbe9a918db37..80b2bb368795 100644 --- a/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc +++ b/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc @@ -170,6 +170,13 @@ output.elasticsearch.headers: It is possible to specify multiple header values for the same header name by separating them with a comma. + +===== `proxy_disable` + +If set to `true` all proxy settings, including `HTTP_PROXY` and `HTTPS_PROXY` +variables are ignored. + + ===== `proxy_url` The URL of the proxy to use when connecting to the Elasticsearch servers. The @@ -179,6 +186,11 @@ then proxy environment variables are used. See the https://golang.org/pkg/net/http/#ProxyFromEnvironment[Go documentation] for more information about the environment variables. + +===== `proxy_headers` + +Additional headers to send to proxies during CONNECT requests. + [[index-option-es]] ===== `index` diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index bf1f9bd378e9..682d0e5a41e1 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -18,11 +18,8 @@ package elasticsearch import ( - "net/url" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" @@ -61,20 +58,8 @@ func makeES( return outputs.Fail(err) } - tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS) - if err != nil { - return outputs.Fail(err) - } - - var proxyURL *url.URL - if !config.ProxyDisable { - proxyURL, err = common.ParseURL(config.ProxyURL) - if err != nil { - return outputs.Fail(err) - } - if proxyURL != nil { - log.Infof("Using proxy URL: %s", proxyURL) - } + if proxyURL := config.Transport.Proxy.URL; proxyURL != nil && !config.Transport.Proxy.Disable { + log.Infof("Using proxy URL: %s", proxyURL) } params := config.Params @@ -94,19 +79,16 @@ func makeES( client, err = NewClient(ClientSettings{ ConnectionSettings: eslegclient.ConnectionSettings{ URL: esURL, - Proxy: proxyURL, - ProxyDisable: config.ProxyDisable, - TLS: tlsConfig, Kerberos: config.Kerberos, Username: config.Username, Password: config.Password, APIKey: config.APIKey, Parameters: params, Headers: config.Headers, - Timeout: config.Timeout, CompressionLevel: config.CompressionLevel, Observer: observer, EscapeHTML: config.EscapeHTML, + Transport: config.Transport, }, Index: index, Pipeline: pipeline, diff --git a/libbeat/outputs/logstash/logstash_integration_test.go b/libbeat/outputs/logstash/logstash_integration_test.go index 9fdbd11d8756..23db3b376149 100644 --- a/libbeat/outputs/logstash/logstash_integration_test.go +++ b/libbeat/outputs/logstash/logstash_integration_test.go @@ -33,6 +33,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fmtstr" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/idxmgmt" "github.com/elastic/beats/v7/libbeat/outputs" @@ -102,11 +103,13 @@ func esConnect(t *testing.T, index string) *esConnection { username := os.Getenv("ES_USER") password := os.Getenv("ES_PASS") + transport := httpcommon.DefaultHTTPTransportSettings() + transport.Timeout = 60 * time.Second client, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{ - URL: host, - Username: username, - Password: password, - Timeout: 60 * time.Second, + URL: host, + Username: username, + Password: password, + Transport: transport, }) if err != nil { t.Fatal(err) diff --git a/libbeat/template/load_integration_test.go b/libbeat/template/load_integration_test.go index 528b87a445f8..186c0070c5db 100644 --- a/libbeat/template/load_integration_test.go +++ b/libbeat/template/load_integration_test.go @@ -34,6 +34,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/esleg/eslegtest" "github.com/elastic/beats/v7/libbeat/version" @@ -404,8 +405,8 @@ func path(t *testing.T, fileElems []string) string { func getTestingElasticsearch(t eslegtest.TestLogger) *eslegclient.Connection { conn, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{ - URL: eslegtest.GetURL(), - Timeout: 0, + URL: eslegtest.GetURL(), + Transport: httpcommon.DefaultHTTPTransportSettings(), }) if err != nil { t.Fatal(err) diff --git a/metricbeat/helper/config.go b/metricbeat/helper/config.go index 8581c45a0155..d97e92482459 100644 --- a/metricbeat/helper/config.go +++ b/metricbeat/helper/config.go @@ -20,21 +20,24 @@ package helper import ( "time" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" ) // Config for an HTTP helper type Config struct { - TLS *tlscommon.Config `config:"ssl"` ConnectTimeout time.Duration `config:"connect_timeout"` - Timeout time.Duration `config:"timeout"` Headers map[string]string `config:"headers"` BearerTokenFile string `config:"bearer_token_file"` + + Transport httpcommon.HTTPTransportSettings `config:",inline"` } func defaultConfig() Config { + transport := httpcommon.DefaultHTTPTransportSettings() + transport.Timeout = 10 * time.Second + return Config{ ConnectTimeout: 2 * time.Second, - Timeout: 10 * time.Second, + Transport: transport, } } diff --git a/metricbeat/helper/http.go b/metricbeat/helper/http.go index 9fe541ffeea7..ba53a0e42ea3 100644 --- a/metricbeat/helper/http.go +++ b/metricbeat/helper/http.go @@ -28,8 +28,7 @@ import ( "github.com/pkg/errors" - "github.com/elastic/beats/v7/libbeat/common/transport" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/metricbeat/helper/dialer" "github.com/elastic/beats/v7/metricbeat/mb" ) @@ -74,11 +73,6 @@ func NewHTTPFromConfig(config Config, hostData mb.HostData) (*HTTP, error) { headers.Set("Authorization", header) } - tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS) - if err != nil { - return nil, err - } - // Ensure backward compatibility builder := hostData.Transport if builder == nil { @@ -90,27 +84,21 @@ func NewHTTPFromConfig(config Config, hostData mb.HostData) (*HTTP, error) { return nil, err } - var tlsDialer transport.Dialer - tlsDialer, err = transport.TLSDialer(dialer, tlsConfig, config.ConnectTimeout) + client, err := config.Transport.Client( + httpcommon.WithBaseDialer(dialer), + httpcommon.WithAPMHTTPInstrumentation(), + ) if err != nil { return nil, err } return &HTTP{ hostData: hostData, - client: &http.Client{ - Transport: &http.Transport{ - Dial: dialer.Dial, - DialTLS: tlsDialer.Dial, - TLSClientConfig: tlsConfig.ToConfig(), - Proxy: http.ProxyFromEnvironment, - }, - Timeout: config.Timeout, - }, - headers: headers, - method: "GET", - uri: hostData.SanitizedURI, - body: nil, + client: client, + headers: headers, + method: "GET", + uri: hostData.SanitizedURI, + body: nil, }, nil } diff --git a/metricbeat/helper/http_test.go b/metricbeat/helper/http_test.go index 808147bebf01..a88ae4796fae 100644 --- a/metricbeat/helper/http_test.go +++ b/metricbeat/helper/http_test.go @@ -92,7 +92,7 @@ func TestTimeout(t *testing.T) { defer ts.Close() cfg := defaultConfig() - cfg.Timeout = 1 * time.Millisecond + cfg.Transport.Timeout = 1 * time.Millisecond hostData := mb.HostData{ URI: ts.URL, SanitizedURI: ts.URL, diff --git a/metricbeat/module/apache/status/status_test.go b/metricbeat/module/apache/status/status_test.go index c7346278c9e4..464defd46234 100644 --- a/metricbeat/module/apache/status/status_test.go +++ b/metricbeat/module/apache/status/status_test.go @@ -178,12 +178,12 @@ func TestFetchTimeout(t *testing.T) { elapsed := time.Since(start) var found bool for _, err := range errs { - if strings.Contains(err.Error(), "context deadline exceeded (Client.Timeout exceeded") { + if strings.Contains(err.Error(), "Client.Timeout exceeded") { found = true } } if !found { - assert.Failf(t, "", "expected an error containing 'context deadline exceeded (Client.Timeout exceeded'. Got %v", errs) + assert.Failf(t, "", "expected an error containing 'Client.Timeout exceeded'. Got %v", errs) } // Elapsed should be ~50ms, sometimes it can be up to 1s diff --git a/metricbeat/module/elasticsearch/index/data_test.go b/metricbeat/module/elasticsearch/index/data_test.go index d85ec77aca35..7008e1817129 100644 --- a/metricbeat/module/elasticsearch/index/data_test.go +++ b/metricbeat/module/elasticsearch/index/data_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/mb" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" @@ -50,7 +51,9 @@ func TestMapper(t *testing.T) { httpClient, err := helper.NewHTTPFromConfig(helper.Config{ ConnectTimeout: 30 * time.Second, - Timeout: 30 * time.Second, + Transport: httpcommon.HTTPTransportSettings{ + Timeout: 30 * time.Second, + }, }, mb.HostData{ URI: server.URL, SanitizedURI: server.URL, diff --git a/metricbeat/module/envoyproxy/server/server_test.go b/metricbeat/module/envoyproxy/server/server_test.go index df3b00886c90..50bed4bb107d 100644 --- a/metricbeat/module/envoyproxy/server/server_test.go +++ b/metricbeat/module/envoyproxy/server/server_test.go @@ -184,12 +184,12 @@ func TestFetchTimeout(t *testing.T) { elapsed := time.Since(start) var found bool for _, err := range errs { - if strings.Contains(err.Error(), "context deadline exceeded (Client.Timeout exceeded") { + if strings.Contains(err.Error(), "Client.Timeout exceeded") { found = true } } if !found { - assert.Failf(t, "", "expected an error containing 'context deadline exceeded (Client.Timeout exceeded'. Got %v", errs) + assert.Failf(t, "", "expected an error containing '(Client.Timeout exceeded'. Got %v", errs) } assert.True(t, elapsed < 5*time.Second, "elapsed time: %s", elapsed.String()) diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index 7fe323a0b64e..ffd8febf29c4 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -115,5 +115,6 @@ - Use `filestream` input for internal log collection. {pull}25660[25660] - Enable agent to send custom headers to kibana/ES {pull}26275[26275] - Set `agent.id` to the Fleet Agent ID in events published from inputs backed by Beats. {issue}21121[21121] {pull}26394[26394] +- Add proxy support to artifact downloader and communication with fleet server. {pull}25219[25219] - Enable configuring monitoring namespace {issue}26439[26439] - Communicate with Fleet Server over HTTP2. {pull}26474[26474] diff --git a/x-pack/elastic-agent/pkg/agent/application/pipeline/stream/factory.go b/x-pack/elastic-agent/pkg/agent/application/pipeline/stream/factory.go index 70258ed17949..238be8228d7f 100644 --- a/x-pack/elastic-agent/pkg/agent/application/pipeline/stream/factory.go +++ b/x-pack/elastic-agent/pkg/agent/application/pipeline/stream/factory.go @@ -40,8 +40,22 @@ func Factory(ctx context.Context, agentInfo *info.AgentInfo, cfg *configuration. } } -func newOperator(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, id pipeline.RoutingKey, config *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor, statusController status.Controller) (*operation.Operator, error) { - fetcher := downloader.NewDownloader(log, config.DownloadConfig) +func newOperator( + ctx context.Context, + log *logger.Logger, + agentInfo *info.AgentInfo, + id pipeline.RoutingKey, + config *configuration.SettingsConfig, + srv *server.Server, + r state.Reporter, + m monitoring.Monitor, + statusController status.Controller, +) (*operation.Operator, error) { + fetcher, err := downloader.NewDownloader(log, config.DownloadConfig) + if err != nil { + return nil, err + } + allowEmptyPgp, pgp := release.PGP() verifier, err := downloader.NewVerifier(log, config.DownloadConfig, allowEmptyPgp, pgp) if err != nil { diff --git a/x-pack/elastic-agent/pkg/agent/application/upgrade/step_download.go b/x-pack/elastic-agent/pkg/agent/application/upgrade/step_download.go index 888433ea0bce..2a88b8bfb2ba 100644 --- a/x-pack/elastic-agent/pkg/agent/application/upgrade/step_download.go +++ b/x-pack/elastic-agent/pkg/agent/application/upgrade/step_download.go @@ -61,7 +61,7 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI stri func newDownloader(version string, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) { if !strings.HasSuffix(version, "-SNAPSHOT") { - return downloader.NewDownloader(log, settings), nil + return downloader.NewDownloader(log, settings) } // try snapshot repo before official @@ -70,11 +70,12 @@ func newDownloader(version string, log *logger.Logger, settings *artifact.Config return nil, err } - return composed.NewDownloader( - fs.NewDownloader(settings), - snapDownloader, - http.NewDownloader(settings), - ), nil + httpDownloader, err := http.NewDownloader(settings) + if err != nil { + return nil, err + } + + return composed.NewDownloader(fs.NewDownloader(settings), snapDownloader, httpDownloader), nil } func newVerifier(version string, log *logger.Logger, settings *artifact.Config) (download.Verifier, error) { diff --git a/x-pack/elastic-agent/pkg/agent/cmd/container.go b/x-pack/elastic-agent/pkg/agent/cmd/container.go index 9f69013d41c3..cc925d0cb4eb 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/container.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/container.go @@ -27,6 +27,7 @@ import ( "github.com/spf13/cobra" "gopkg.in/yaml.v2" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" "github.com/elastic/beats/v7/libbeat/kibana" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" @@ -457,12 +458,15 @@ func kibanaClient(cfg kibanaConfig, headers map[string]string) (*kibana.Client, } } + transport := httpcommon.DefaultHTTPTransportSettings() + transport.TLS = tls + return kibana.NewClientWithConfigDefault(&kibana.ClientConfig{ Host: cfg.Fleet.Host, Username: cfg.Fleet.Username, Password: cfg.Fleet.Password, IgnoreVersion: true, - TLS: tls, + Transport: transport, Headers: headers, }, 0) } diff --git a/x-pack/elastic-agent/pkg/agent/cmd/enroll_cmd.go b/x-pack/elastic-agent/pkg/agent/cmd/enroll_cmd.go index e170d9dea0fc..cbc5f2090233 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/enroll_cmd.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/enroll_cmd.go @@ -104,19 +104,18 @@ func (e *enrollCmdOption) remoteConfig() (remote.Config, error) { return remote.Config{}, fmt.Errorf("connection to fleet-server is insecure, strongly recommended to use a secure connection (override with --insecure)") } + var tlsCfg tlscommon.Config + // Add any SSL options from the CLI. if len(e.CAs) > 0 || len(e.CASha256) > 0 { - cfg.TLS = &tlscommon.Config{ - CAs: e.CAs, - CASha256: e.CASha256, - } + tlsCfg.CAs = e.CAs + tlsCfg.CASha256 = e.CASha256 } if e.Insecure { - cfg.TLS = &tlscommon.Config{ - VerificationMode: tlscommon.VerifyNone, - } + tlsCfg.VerificationMode = tlscommon.VerifyNone } + cfg.Transport.TLS = &tlsCfg return cfg, nil } diff --git a/x-pack/elastic-agent/pkg/artifact/config.go b/x-pack/elastic-agent/pkg/artifact/config.go index 78a0e62ad8a6..d1181b778d29 100644 --- a/x-pack/elastic-agent/pkg/artifact/config.go +++ b/x-pack/elastic-agent/pkg/artifact/config.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" ) @@ -27,9 +28,6 @@ type Config struct { // TargetDirectory: path to the directory containing downloaded packages TargetDirectory string `json:"targetDirectory" config:"target_directory"` - // Timeout: timeout for downloading package - Timeout time.Duration `json:"timeout" config:"timeout"` - // InstallPath: path to the directory containing installed packages InstallPath string `yaml:"installPath" config:"install_path"` @@ -39,16 +37,23 @@ type Config struct { // local or network disk. // If not provided FileSystem Downloader will fallback to /beats subfolder of elastic-agent directory. DropPath string `yaml:"dropPath" config:"drop_path"` + + httpcommon.HTTPTransportSettings `config:",inline" yaml:",inline"` // Note: use anonymous struct for json inline } // DefaultConfig creates a config with pre-set default values. func DefaultConfig() *Config { homePath := paths.Home() + transport := httpcommon.DefaultHTTPTransportSettings() + + // binaries are a getting bit larger it might take >30s to download them + transport.Timeout = 120 * time.Second + return &Config{ - SourceURI: "https://artifacts.elastic.co/downloads/", - TargetDirectory: filepath.Join(homePath, "downloads"), - Timeout: 120 * time.Second, // binaries are a getting bit larger it might take >30s to download them - InstallPath: filepath.Join(homePath, "install"), + SourceURI: "https://artifacts.elastic.co/downloads/", + TargetDirectory: filepath.Join(homePath, "downloads"), + InstallPath: filepath.Join(homePath, "install"), + HTTPTransportSettings: transport, } } diff --git a/x-pack/elastic-agent/pkg/artifact/download/fs/verifier_test.go b/x-pack/elastic-agent/pkg/artifact/download/fs/verifier_test.go index 8abea0e59f0b..44ac7b748bda 100644 --- a/x-pack/elastic-agent/pkg/artifact/download/fs/verifier_test.go +++ b/x-pack/elastic-agent/pkg/artifact/download/fs/verifier_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" ) @@ -47,9 +48,11 @@ func TestFetchVerify(t *testing.T) { TargetDirectory: targetPath, DropPath: dropPath, InstallPath: installPath, - Timeout: timeout, OperatingSystem: "darwin", Architecture: "32", + HTTPTransportSettings: httpcommon.HTTPTransportSettings{ + Timeout: timeout, + }, } err := prepareFetchVerifyTests(dropPath, targetPath, targetFilePath, hashTargetFilePath) @@ -134,9 +137,11 @@ func TestVerify(t *testing.T) { config := &artifact.Config{ TargetDirectory: targetDir, DropPath: filepath.Join(targetDir, "drop"), - Timeout: timeout, OperatingSystem: "linux", Architecture: "32", + HTTPTransportSettings: httpcommon.HTTPTransportSettings{ + Timeout: timeout, + }, } if err := prepareTestCase(beatSpec, version, config); err != nil { diff --git a/x-pack/elastic-agent/pkg/artifact/download/http/downloader.go b/x-pack/elastic-agent/pkg/artifact/download/http/downloader.go index dcb42daf148e..15f969880bf6 100644 --- a/x-pack/elastic-agent/pkg/artifact/download/http/downloader.go +++ b/x-pack/elastic-agent/pkg/artifact/download/http/downloader.go @@ -14,6 +14,7 @@ import ( "path" "strings" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" @@ -36,11 +37,16 @@ type Downloader struct { } // NewDownloader creates and configures Elastic Downloader -func NewDownloader(config *artifact.Config) *Downloader { - client := http.Client{Timeout: config.Timeout} - rt := withHeaders(client.Transport, headers) - client.Transport = rt - return NewDownloaderWithClient(config, client) +func NewDownloader(config *artifact.Config) (*Downloader, error) { + client, err := config.HTTPTransportSettings.Client( + httpcommon.WithAPMHTTPInstrumentation(), + ) + if err != nil { + return nil, err + } + + client.Transport = withHeaders(client.Transport, headers) + return NewDownloaderWithClient(config, *client), nil } // NewDownloaderWithClient creates Elastic Downloader with specific client used diff --git a/x-pack/elastic-agent/pkg/artifact/download/http/elastic_test.go b/x-pack/elastic-agent/pkg/artifact/download/http/elastic_test.go index a329b9b2f8ef..747324a7cc7d 100644 --- a/x-pack/elastic-agent/pkg/artifact/download/http/elastic_test.go +++ b/x-pack/elastic-agent/pkg/artifact/download/http/elastic_test.go @@ -18,6 +18,7 @@ import ( "testing" "time" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" ) @@ -54,7 +55,9 @@ func TestDownload(t *testing.T) { config := &artifact.Config{ SourceURI: source, TargetDirectory: targetDir, - Timeout: timeout, + HTTPTransportSettings: httpcommon.HTTPTransportSettings{ + Timeout: timeout, + }, } for _, testCase := range testCases { @@ -92,7 +95,9 @@ func TestVerify(t *testing.T) { config := &artifact.Config{ SourceURI: source, TargetDirectory: targetDir, - Timeout: timeout, + HTTPTransportSettings: httpcommon.HTTPTransportSettings{ + Timeout: timeout, + }, } for _, testCase := range testCases { diff --git a/x-pack/elastic-agent/pkg/artifact/download/http/verifier.go b/x-pack/elastic-agent/pkg/artifact/download/http/verifier.go index b206bd5faeac..2e47c8dd6ca0 100644 --- a/x-pack/elastic-agent/pkg/artifact/download/http/verifier.go +++ b/x-pack/elastic-agent/pkg/artifact/download/http/verifier.go @@ -19,6 +19,7 @@ import ( "golang.org/x/crypto/openpgp" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" @@ -46,12 +47,19 @@ func NewVerifier(config *artifact.Config, allowEmptyPgp bool, pgp []byte) (*Veri return nil, errors.New("expecting PGP but retrieved none", errors.TypeSecurity) } - client := http.Client{Timeout: config.Timeout} - rtt := withHeaders(client.Transport, headers) - client.Transport = rtt + client, err := config.HTTPTransportSettings.Client( + httpcommon.WithAPMHTTPInstrumentation(), + httpcommon.WithModRoundtripper(func(rt http.RoundTripper) http.RoundTripper { + return withHeaders(rt, headers) + }), + ) + if err != nil { + return nil, err + } + v := &Verifier{ config: config, - client: client, + client: *client, allowEmptyPgp: allowEmptyPgp, pgpBytes: pgp, } diff --git a/x-pack/elastic-agent/pkg/artifact/download/localremote/downloader.go b/x-pack/elastic-agent/pkg/artifact/download/localremote/downloader.go index 6934adc1ea33..fe6c79fedeed 100644 --- a/x-pack/elastic-agent/pkg/artifact/download/localremote/downloader.go +++ b/x-pack/elastic-agent/pkg/artifact/download/localremote/downloader.go @@ -17,7 +17,7 @@ import ( // NewDownloader creates a downloader which first checks local directory // and then fallbacks to remote if configured. -func NewDownloader(log *logger.Logger, config *artifact.Config) download.Downloader { +func NewDownloader(log *logger.Logger, config *artifact.Config) (download.Downloader, error) { downloaders := make([]download.Downloader, 0, 3) downloaders = append(downloaders, fs.NewDownloader(config)) @@ -31,6 +31,11 @@ func NewDownloader(log *logger.Logger, config *artifact.Config) download.Downloa } } - downloaders = append(downloaders, http.NewDownloader(config)) - return composed.NewDownloader(downloaders...) + httpDownloader, err := http.NewDownloader(config) + if err != nil { + return nil, err + } + + downloaders = append(downloaders, httpDownloader) + return composed.NewDownloader(downloaders...), nil } diff --git a/x-pack/elastic-agent/pkg/artifact/download/snapshot/downloader.go b/x-pack/elastic-agent/pkg/artifact/download/snapshot/downloader.go index a5b706a243cd..acf6b32328f9 100644 --- a/x-pack/elastic-agent/pkg/artifact/download/snapshot/downloader.go +++ b/x-pack/elastic-agent/pkg/artifact/download/snapshot/downloader.go @@ -23,7 +23,7 @@ func NewDownloader(config *artifact.Config, versionOverride string) (download.Do if err != nil { return nil, err } - return http.NewDownloader(cfg), nil + return http.NewDownloader(cfg) } func snapshotConfig(config *artifact.Config, versionOverride string) (*artifact.Config, error) { @@ -33,13 +33,13 @@ func snapshotConfig(config *artifact.Config, versionOverride string) (*artifact. } return &artifact.Config{ - OperatingSystem: config.OperatingSystem, - Architecture: config.Architecture, - SourceURI: snapshotURI, - TargetDirectory: config.TargetDirectory, - Timeout: config.Timeout, - InstallPath: config.InstallPath, - DropPath: config.DropPath, + OperatingSystem: config.OperatingSystem, + Architecture: config.Architecture, + SourceURI: snapshotURI, + TargetDirectory: config.TargetDirectory, + InstallPath: config.InstallPath, + DropPath: config.DropPath, + HTTPTransportSettings: config.HTTPTransportSettings, }, nil } diff --git a/x-pack/elastic-agent/pkg/remote/client.go b/x-pack/elastic-agent/pkg/remote/client.go index ad5f136f7e0d..44f5b7ad5620 100644 --- a/x-pack/elastic-agent/pkg/remote/client.go +++ b/x-pack/elastic-agent/pkg/remote/client.go @@ -15,11 +15,9 @@ import ( "time" "github.com/pkg/errors" - "golang.org/x/net/http2" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/transport" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" ) @@ -118,12 +116,11 @@ func NewWithConfig(log *logger.Logger, cfg Config, wrapper wrapperFunc) (*Client if err != nil { return nil, errors.Wrap(err, "invalid fleet-server endpoint") } - addr, err := url.Parse(connStr) - if err != nil { - return nil, errors.Wrap(err, "invalid fleet-server endpoint") - } - transport, err := makeTransport(addr.Scheme, cfg.Timeout, cfg.TLS) + transport, err := cfg.Transport.RoundTripper( + httpcommon.WithAPMHTTPInstrumentation(), + httpcommon.WithForceAttemptHTTP2(true), + ) if err != nil { return nil, err } @@ -142,7 +139,7 @@ func NewWithConfig(log *logger.Logger, cfg Config, wrapper wrapperFunc) (*Client httpClient := http.Client{ Transport: transport, - Timeout: cfg.Timeout, + Timeout: cfg.Transport.Timeout, } clients[i] = &requestClient{ @@ -275,20 +272,3 @@ func prefixRequestFactory(URL string) requestFunc { return http.NewRequest(method, newPath, body) } } - -// makeTransport create a transport object based on the TLS configuration. -func makeTransport(scheme string, timeout time.Duration, tls *tlscommon.Config) (http.RoundTripper, error) { - dialer := transport.NetDialer(timeout) - if scheme == "http" { - return &http.Transport{Dial: dialer.Dial}, nil - } - tlsConfig, err := tlscommon.LoadTLSConfig(tls) - if err != nil { - return nil, errors.Wrap(err, "invalid TLS configuration") - } - tlsDialer, err := transport.TLSDialerH2(dialer, tlsConfig, timeout) - if err != nil { - return nil, errors.Wrap(err, "fail to create TLS dialer") - } - return &http2.Transport{DialTLS: tlsDialer.Dial}, nil -} diff --git a/x-pack/elastic-agent/pkg/remote/config.go b/x-pack/elastic-agent/pkg/remote/config.go index 02b32274f2fd..31ae29f70bad 100644 --- a/x-pack/elastic-agent/pkg/remote/config.go +++ b/x-pack/elastic-agent/pkg/remote/config.go @@ -8,20 +8,20 @@ import ( "fmt" "time" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" ) // Config is the configuration for the client. type Config struct { - Protocol Protocol `config:"protocol" yaml:"protocol"` - SpaceID string `config:"space.id" yaml:"space.id,omitempty"` - Username string `config:"username" yaml:"username,omitempty"` - Password string `config:"password" yaml:"password,omitempty"` - Path string `config:"path" yaml:"path,omitempty"` - Host string `config:"host" yaml:"host,omitempty"` - Hosts []string `config:"hosts" yaml:"hosts,omitempty"` - Timeout time.Duration `config:"timeout" yaml:"timeout,omitempty"` - TLS *tlscommon.Config `config:"ssl" yaml:"ssl,omitempty"` + Protocol Protocol `config:"protocol" yaml:"protocol"` + SpaceID string `config:"space.id" yaml:"space.id,omitempty"` + Username string `config:"username" yaml:"username,omitempty"` + Password string `config:"password" yaml:"password,omitempty"` + Path string `config:"path" yaml:"path,omitempty"` + Host string `config:"host" yaml:"host,omitempty"` + Hosts []string `config:"hosts" yaml:"hosts,omitempty"` + + Transport httpcommon.HTTPTransportSettings `config:",inline" yaml:",inline"` } // Protocol define the protocol to use to make the connection. (Either HTTPS or HTTP) @@ -46,16 +46,18 @@ func (p *Protocol) Unpack(from string) error { // DefaultClientConfig creates default configuration for client. func DefaultClientConfig() Config { + transport := httpcommon.DefaultHTTPTransportSettings() + // Default timeout 10 minutes, expecting Fleet Server to control the long poll with default timeout of 5 minutes + transport.Timeout = 10 * time.Minute + return Config{ - Protocol: ProtocolHTTP, - Host: "localhost:5601", - Path: "", - SpaceID: "", - Username: "", - Password: "", - // Default timeout 10 minutes, expecting Fleet Server to control the long poll with default timeout of 5 minutes - Timeout: 10 * time.Minute, - TLS: nil, + Protocol: ProtocolHTTP, + Host: "localhost:5601", + Path: "", + SpaceID: "", + Username: "", + Password: "", + Transport: transport, } } diff --git a/x-pack/elastic-agent/pkg/remote/config_test.go b/x-pack/elastic-agent/pkg/remote/config_test.go index 21843a4b0479..403609735ddc 100644 --- a/x-pack/elastic-agent/pkg/remote/config_test.go +++ b/x-pack/elastic-agent/pkg/remote/config_test.go @@ -13,6 +13,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" ) func TestPackUnpack(t *testing.T) { @@ -22,7 +24,9 @@ func TestPackUnpack(t *testing.T) { Username: "foo", Password: "bar", Path: "/ok", - Timeout: 10 * time.Second, + Transport: httpcommon.HTTPTransportSettings{ + Timeout: 10 * time.Second, + }, } b, err := yaml.Marshal(&c) diff --git a/x-pack/filebeat/input/httpjson/config.go b/x-pack/filebeat/input/httpjson/config.go index 6384811aaa43..a2965ace4ef3 100644 --- a/x-pack/filebeat/input/httpjson/config.go +++ b/x-pack/filebeat/input/httpjson/config.go @@ -14,7 +14,7 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" ) // config contains information about httpjson configuration @@ -35,9 +35,10 @@ type config struct { RetryMax int `config:"retry.max_attempts"` RetryWaitMin time.Duration `config:"retry.wait_min"` RetryWaitMax time.Duration `config:"retry.wait_max"` - TLS *tlscommon.Config `config:"ssl"` URL *urlConfig `config:"url" validate:"required"` DateCursor *dateCursorConfig `config:"date_cursor"` + + Transport httpcommon.HTTPTransportSettings `config:",inline"` } // Pagination contains information about httpjson pagination settings diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index ca2e43483d1b..0411fa65d417 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -20,7 +20,7 @@ import ( stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/libbeat/common/useragent" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/logp" @@ -85,19 +85,6 @@ func Plugin(log *logp.Logger, store cursor.StateStore) inputv2.Plugin { } } -func newTLSConfig(config config) (*tlscommon.TLSConfig, error) { - if err := config.Validate(); err != nil { - return nil, err - } - - tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS) - if err != nil { - return nil, err - } - - return tlsConfig, nil -} - func test(url *url.URL) error { port := func() string { if url.Port() != "" { @@ -121,7 +108,6 @@ func test(url *url.URL) error { func run( ctx inputv2.Context, config config, - tlsConfig *tlscommon.TLSConfig, publisher cursor.Publisher, cursor *cursor.Cursor, ) error { @@ -129,7 +115,7 @@ func run( stdCtx := ctxtool.FromCanceller(ctx.Cancelation) - httpClient, err := newHTTPClient(stdCtx, config, tlsConfig) + httpClient, err := newHTTPClient(stdCtx, config) if err != nil { return err } @@ -169,19 +155,21 @@ func run( return nil } -func newHTTPClient(ctx context.Context, config config, tlsConfig *tlscommon.TLSConfig) (*http.Client, error) { +func newHTTPClient(ctx context.Context, config config) (*http.Client, error) { + config.Transport.Timeout = config.HTTPClientTimeout + + httpClient, err := + config.Transport.Client( + httpcommon.WithAPMHTTPInstrumentation(), + httpcommon.WithKeepaliveSettings{Disable: true}, + ) + if err != nil { + return nil, err + } + // Make retryable HTTP client client := &retryablehttp.Client{ - HTTPClient: &http.Client{ - Transport: &http.Transport{ - DialContext: (&net.Dialer{ - Timeout: config.HTTPClientTimeout, - }).DialContext, - TLSClientConfig: tlsConfig.ToConfig(), - DisableKeepAlives: true, - }, - Timeout: config.HTTPClientTimeout, - }, + HTTPClient: httpClient, Logger: newRetryLogger(), RetryWaitMin: config.RetryWaitMin, RetryWaitMax: config.RetryWaitMax, diff --git a/x-pack/filebeat/input/httpjson/input_cursor.go b/x-pack/filebeat/input/httpjson/input_cursor.go index d18a91f39183..0ee0ac44e9a9 100644 --- a/x-pack/filebeat/input/httpjson/input_cursor.go +++ b/x-pack/filebeat/input/httpjson/input_cursor.go @@ -8,7 +8,6 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) type cursorInput struct{} @@ -18,8 +17,7 @@ func (cursorInput) Name() string { } type source struct { - config config - tlsConfig *tlscommon.TLSConfig + config config } func (src source) Name() string { @@ -31,23 +29,15 @@ func cursorConfigure(cfg *common.Config) ([]cursor.Source, cursor.Input, error) if err := cfg.Unpack(&conf); err != nil { return nil, nil, err } - return newCursorInput(conf) + + sources, inp := newCursorInput(conf) + return sources, inp, nil } -func newCursorInput(config config) ([]cursor.Source, cursor.Input, error) { - tlsConfig, err := newTLSConfig(config) - if err != nil { - return nil, nil, err - } +func newCursorInput(config config) ([]cursor.Source, cursor.Input) { // we only allow one url per config, if we wanted to allow more than one // each source should hold only one url - return []cursor.Source{ - &source{config: config, - tlsConfig: tlsConfig, - }, - }, - &cursorInput{}, - nil + return []cursor.Source{&source{config: config}}, &cursorInput{} } func (in *cursorInput) Test(src cursor.Source, _ v2.TestContext) error { @@ -63,5 +53,5 @@ func (in *cursorInput) Run( publisher cursor.Publisher, ) error { s := src.(*source) - return run(ctx, s.config, s.tlsConfig, publisher, &cursor) + return run(ctx, s.config, publisher, &cursor) } diff --git a/x-pack/filebeat/input/httpjson/input_stateless.go b/x-pack/filebeat/input/httpjson/input_stateless.go index c7ebf6c3d4c0..fbf28b2d20c3 100644 --- a/x-pack/filebeat/input/httpjson/input_stateless.go +++ b/x-pack/filebeat/input/httpjson/input_stateless.go @@ -9,12 +9,10 @@ import ( stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) type statelessInput struct { - config config - tlsConfig *tlscommon.TLSConfig + config config } func (statelessInput) Name() string { @@ -26,15 +24,11 @@ func statelessConfigure(cfg *common.Config) (stateless.Input, error) { if err := cfg.Unpack(&conf); err != nil { return nil, err } - return newStatelessInput(conf) + return newStatelessInput(conf), nil } -func newStatelessInput(config config) (*statelessInput, error) { - tlsConfig, err := newTLSConfig(config) - if err != nil { - return nil, err - } - return &statelessInput{config: config, tlsConfig: tlsConfig}, nil +func newStatelessInput(config config) *statelessInput { + return &statelessInput{config: config} } func (in *statelessInput) Test(v2.TestContext) error { @@ -54,5 +48,5 @@ func (pub statelessPublisher) Publish(event beat.Event, _ interface{}) error { // It will return on context cancellation, any other error will be retried. func (in *statelessInput) Run(ctx v2.Context, publisher stateless.Publisher) error { pub := statelessPublisher{wrapped: publisher} - return run(ctx, in.config, in.tlsConfig, pub, nil) + return run(ctx, in.config, pub, nil) } diff --git a/x-pack/filebeat/input/httpjson/input_test.go b/x-pack/filebeat/input/httpjson/input_test.go index 242811d27953..9630d0c81fb8 100644 --- a/x-pack/filebeat/input/httpjson/input_test.go +++ b/x-pack/filebeat/input/httpjson/input_test.go @@ -227,9 +227,7 @@ func TestStatelessHTTPJSONInput(t *testing.T) { conf := newDefaultConfig() assert.NoError(t, cfg.Unpack(&conf)) - input, err := newStatelessInput(conf) - - assert.NoError(t, err) + input := newStatelessInput(conf) assert.Equal(t, "httpjson-stateless", input.Name()) assert.NoError(t, input.Test(v2.TestContext{})) diff --git a/x-pack/filebeat/input/httpjson/internal/v2/config.go b/x-pack/filebeat/input/httpjson/internal/v2/config.go index 2795aad5de92..aa2be1c01135 100644 --- a/x-pack/filebeat/input/httpjson/internal/v2/config.go +++ b/x-pack/filebeat/input/httpjson/internal/v2/config.go @@ -7,6 +7,8 @@ package v2 import ( "errors" "time" + + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" ) type config struct { @@ -37,16 +39,17 @@ func (c config) Validate() error { } func defaultConfig() config { - timeout := 30 * time.Second maxAttempts := 5 waitMin := time.Second waitMax := time.Minute + transport := httpcommon.DefaultHTTPTransportSettings() + transport.Timeout = 30 * time.Second + return config{ Interval: time.Minute, Auth: &authConfig{}, Request: &requestConfig{ - Timeout: &timeout, - Method: "GET", + Method: "GET", Retry: retryConfig{ MaxAttempts: &maxAttempts, WaitMin: &waitMin, @@ -54,6 +57,7 @@ func defaultConfig() config { }, RedirectForwardHeaders: false, RedirectMaxRedirects: 10, + Transport: transport, }, Response: &responseConfig{}, } diff --git a/x-pack/filebeat/input/httpjson/internal/v2/config_request.go b/x-pack/filebeat/input/httpjson/internal/v2/config_request.go index f64a03d98993..a8bc7204a230 100644 --- a/x-pack/filebeat/input/httpjson/internal/v2/config_request.go +++ b/x-pack/filebeat/input/httpjson/internal/v2/config_request.go @@ -12,7 +12,7 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" ) type retryConfig struct { @@ -76,26 +76,18 @@ func (u *urlConfig) Unpack(in string) error { } type requestConfig struct { - URL *urlConfig `config:"url" validate:"required"` - Method string `config:"method" validate:"required"` - Body *common.MapStr `config:"body"` - EncodeAs string `config:"encode_as"` - Timeout *time.Duration `config:"timeout"` - SSL *tlscommon.Config `config:"ssl"` - Retry retryConfig `config:"retry"` - RedirectForwardHeaders bool `config:"redirect.forward_headers"` - RedirectHeadersBanList []string `config:"redirect.headers_ban_list"` - RedirectMaxRedirects int `config:"redirect.max_redirects"` - RateLimit *rateLimitConfig `config:"rate_limit"` - Transforms transformsConfig `config:"transforms"` - ProxyURL *urlConfig `config:"proxy_url"` -} - -func (c requestConfig) getTimeout() time.Duration { - if c.Timeout == nil { - return 0 - } - return *c.Timeout + URL *urlConfig `config:"url" validate:"required"` + Method string `config:"method" validate:"required"` + Body *common.MapStr `config:"body"` + EncodeAs string `config:"encode_as"` + Retry retryConfig `config:"retry"` + RedirectForwardHeaders bool `config:"redirect.forward_headers"` + RedirectHeadersBanList []string `config:"redirect.headers_ban_list"` + RedirectMaxRedirects int `config:"redirect.max_redirects"` + RateLimit *rateLimitConfig `config:"rate_limit"` + Transforms transformsConfig `config:"transforms"` + + Transport httpcommon.HTTPTransportSettings `config:",inline"` } func (c *requestConfig) Validate() error { @@ -110,10 +102,6 @@ func (c *requestConfig) Validate() error { return fmt.Errorf("unsupported method %q", c.Method) } - if c.Timeout != nil && *c.Timeout <= 0 { - return errors.New("timeout must be greater than 0") - } - if _, err := newBasicTransformsFromConfig(c.Transforms, requestNamespace, nil); err != nil { return err } diff --git a/x-pack/filebeat/input/httpjson/internal/v2/input.go b/x-pack/filebeat/input/httpjson/internal/v2/input.go index e8c8fe510829..9cbd017c6ba9 100644 --- a/x-pack/filebeat/input/httpjson/internal/v2/input.go +++ b/x-pack/filebeat/input/httpjson/internal/v2/input.go @@ -20,7 +20,7 @@ import ( inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/libbeat/common/useragent" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/go-concert/ctxtool" @@ -64,19 +64,6 @@ func (log *retryLogger) Warn(format string, args ...interface{}) { log.log.Warnf(format, args...) } -func newTLSConfig(config config) (*tlscommon.TLSConfig, error) { - if err := config.Validate(); err != nil { - return nil, err - } - - tlsConfig, err := tlscommon.LoadTLSConfig(config.Request.SSL) - if err != nil { - return nil, err - } - - return tlsConfig, nil -} - func test(url *url.URL) error { port := func() string { if url.Port() != "" { @@ -100,7 +87,6 @@ func test(url *url.URL) error { func run( ctx v2.Context, config config, - tlsConfig *tlscommon.TLSConfig, publisher inputcursor.Publisher, cursor *inputcursor.Cursor, ) error { @@ -108,7 +94,7 @@ func run( stdCtx := ctxtool.FromCanceller(ctx.Cancelation) - httpClient, err := newHTTPClient(stdCtx, config, tlsConfig, log) + httpClient, err := newHTTPClient(stdCtx, config, log) if err != nil { return err } @@ -147,27 +133,20 @@ func run( return nil } -func newHTTPClient(ctx context.Context, config config, tlsConfig *tlscommon.TLSConfig, log *logp.Logger) (*httpClient, error) { - timeout := config.Request.getTimeout() - proxy_url := config.Request.ProxyURL - +func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpClient, error) { // Make retryable HTTP client - transport := &http.Transport{ - DialContext: (&net.Dialer{ - Timeout: timeout, - }).DialContext, - TLSClientConfig: tlsConfig.ToConfig(), - DisableKeepAlives: true, - } - if proxy_url != nil && proxy_url.URL != nil { - transport.Proxy = http.ProxyURL(proxy_url.URL) + netHTTPClient, err := config.Request.Transport.Client( + httpcommon.WithAPMHTTPInstrumentation(), + httpcommon.WithKeepaliveSettings{Disable: true}, + ) + if err != nil { + return nil, err } + + netHTTPClient.CheckRedirect = checkRedirect(config.Request, log) + client := &retryablehttp.Client{ - HTTPClient: &http.Client{ - Transport: transport, - Timeout: timeout, - CheckRedirect: checkRedirect(config.Request, log), - }, + HTTPClient: netHTTPClient, Logger: newRetryLogger(log), RetryWaitMin: config.Request.Retry.getWaitMin(), RetryWaitMax: config.Request.Retry.getWaitMax(), diff --git a/x-pack/filebeat/input/httpjson/internal/v2/input_cursor.go b/x-pack/filebeat/input/httpjson/internal/v2/input_cursor.go index 537e67762df8..db454f6776d0 100644 --- a/x-pack/filebeat/input/httpjson/internal/v2/input_cursor.go +++ b/x-pack/filebeat/input/httpjson/internal/v2/input_cursor.go @@ -8,7 +8,6 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) type cursorInput struct{} @@ -18,8 +17,7 @@ func (cursorInput) Name() string { } type source struct { - config config - tlsConfig *tlscommon.TLSConfig + config config } func (src source) Name() string { @@ -31,23 +29,14 @@ func cursorConfigure(cfg *common.Config) ([]inputcursor.Source, inputcursor.Inpu if err := cfg.Unpack(&conf); err != nil { return nil, nil, err } - return newCursorInput(conf) + sources, inp := newCursorInput(conf) + return sources, inp, nil } -func newCursorInput(config config) ([]inputcursor.Source, inputcursor.Input, error) { - tlsConfig, err := newTLSConfig(config) - if err != nil { - return nil, nil, err - } +func newCursorInput(config config) ([]inputcursor.Source, inputcursor.Input) { // we only allow one url per config, if we wanted to allow more than one // each source should hold only one url - return []inputcursor.Source{ - &source{config: config, - tlsConfig: tlsConfig, - }, - }, - &cursorInput{}, - nil + return []inputcursor.Source{&source{config: config}}, &cursorInput{} } func (in *cursorInput) Test(src inputcursor.Source, _ v2.TestContext) error { @@ -63,5 +52,5 @@ func (in *cursorInput) Run( publisher inputcursor.Publisher, ) error { s := src.(*source) - return run(ctx, s.config, s.tlsConfig, publisher, &cursor) + return run(ctx, s.config, publisher, &cursor) } diff --git a/x-pack/filebeat/input/httpjson/internal/v2/input_stateless.go b/x-pack/filebeat/input/httpjson/internal/v2/input_stateless.go index 92a1b8ae2dd9..4a871d62d779 100644 --- a/x-pack/filebeat/input/httpjson/internal/v2/input_stateless.go +++ b/x-pack/filebeat/input/httpjson/internal/v2/input_stateless.go @@ -9,12 +9,10 @@ import ( stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) type statelessInput struct { - config config - tlsConfig *tlscommon.TLSConfig + config config } func (statelessInput) Name() string { @@ -30,11 +28,7 @@ func statelessConfigure(cfg *common.Config) (stateless.Input, error) { } func newStatelessInput(config config) (*statelessInput, error) { - tlsConfig, err := newTLSConfig(config) - if err != nil { - return nil, err - } - return &statelessInput{config: config, tlsConfig: tlsConfig}, nil + return &statelessInput{config: config}, nil } func (in *statelessInput) Test(v2.TestContext) error { @@ -54,5 +48,5 @@ func (pub statelessPublisher) Publish(event beat.Event, _ interface{}) error { // It will return on context cancellation, any other error will be retried. func (in *statelessInput) Run(ctx v2.Context, publisher stateless.Publisher) error { pub := statelessPublisher{wrapped: publisher} - return run(ctx, in.config, in.tlsConfig, pub, nil) + return run(ctx, in.config, pub, nil) } diff --git a/x-pack/filebeat/input/httpjson/internal/v2/request_test.go b/x-pack/filebeat/input/httpjson/internal/v2/request_test.go index f6564af3ed0e..6e35d5cc45f5 100644 --- a/x-pack/filebeat/input/httpjson/internal/v2/request_test.go +++ b/x-pack/filebeat/input/httpjson/internal/v2/request_test.go @@ -58,7 +58,7 @@ func TestCtxAfterDoRequest(t *testing.T) { log := logp.NewLogger("") ctx := context.Background() - client, err := newHTTPClient(ctx, config, nil, log) + client, err := newHTTPClient(ctx, config, log) assert.NoError(t, err) requestFactory := newRequestFactory(config.Request, nil, log) diff --git a/x-pack/libbeat/common/cloudfoundry/config.go b/x-pack/libbeat/common/cloudfoundry/config.go index 2f15d0c7cf9e..a78755062e40 100644 --- a/x-pack/libbeat/common/cloudfoundry/config.go +++ b/x-pack/libbeat/common/cloudfoundry/config.go @@ -5,12 +5,11 @@ package cloudfoundry import ( - "crypto/tls" "fmt" "strings" "time" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" ) const ( @@ -26,9 +25,6 @@ type Config struct { ClientID string `config:"client_id" validate:"required"` ClientSecret string `config:"client_secret" validate:"required"` - // TLS configuration for the client - TLS *tlscommon.Config `config:"ssl"` - // Override URLs returned from the CF client APIAddress string `config:"api_address" validate:"required"` DopplerAddress string `config:"doppler_address"` @@ -44,6 +40,8 @@ type Config struct { // Time to wait before retrying to get application info in case of error. CacheRetryDelay time.Duration `config:"cache_retry_delay"` + + Transport httpcommon.HTTPTransportSettings `config:",inline"` } // InitDefaults initialize the defaults for the configuration. @@ -51,6 +49,8 @@ func (c *Config) InitDefaults() { c.CacheDuration = 120 * time.Second c.CacheRetryDelay = 20 * time.Second c.Version = ConsumerVersionV1 + + c.Transport = httpcommon.DefaultHTTPTransportSettings() } func (c *Config) Validate() error { @@ -61,15 +61,6 @@ func (c *Config) Validate() error { return nil } -// TLSConfig returns the TLS configuration. -func (c *Config) TLSConfig() (*tls.Config, error) { - tls, err := tlscommon.LoadTLSConfig(c.TLS) - if err != nil { - return nil, err - } - return tls.ToConfig(), nil -} - func anyOf(elems []string, s string) bool { for _, elem := range elems { if s == elem { diff --git a/x-pack/libbeat/common/cloudfoundry/hub.go b/x-pack/libbeat/common/cloudfoundry/hub.go index 4cf9757c2780..9bcf929ded2c 100644 --- a/x-pack/libbeat/common/cloudfoundry/hub.go +++ b/x-pack/libbeat/common/cloudfoundry/hub.go @@ -11,6 +11,8 @@ import ( "github.com/cloudfoundry-community/go-cfclient" "github.com/pkg/errors" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -151,23 +153,11 @@ func (h *Hub) doerFromClient(client *cfclient.Client) (*authTokenDoer, error) { // httpClient returns an HTTP client configured with the configuration TLS. func (h *Hub) httpClient() (*http.Client, bool, error) { - tls, err := h.cfg.TLSConfig() + httpClient, err := h.cfg.Transport.Client(httpcommon.WithAPMHTTPInstrumentation()) if err != nil { - return nil, true, err + return nil, false, err } - httpClient := cfclient.DefaultConfig().HttpClient - tp := defaultTransport() - tp.TLSClientConfig = tls - httpClient.Transport = tp - return httpClient, tls.InsecureSkipVerify, nil -} -// defaultTransport returns a new http.Transport for http.Client -func defaultTransport() *http.Transport { - defaultTransport := http.DefaultTransport.(*http.Transport) - return &http.Transport{ - Proxy: defaultTransport.Proxy, - TLSHandshakeTimeout: defaultTransport.TLSHandshakeTimeout, - ExpectContinueTimeout: defaultTransport.ExpectContinueTimeout, - } + tls, _ := tlscommon.LoadTLSConfig(h.cfg.Transport.TLS) + return httpClient, tls.ToConfig().InsecureSkipVerify, nil } diff --git a/x-pack/metricbeat/module/openmetrics/collector/_meta/data.json b/x-pack/metricbeat/module/openmetrics/collector/_meta/data.json index 88dba9c9659b..5c3e9aec6ba8 100644 --- a/x-pack/metricbeat/module/openmetrics/collector/_meta/data.json +++ b/x-pack/metricbeat/module/openmetrics/collector/_meta/data.json @@ -11,7 +11,7 @@ }, "openmetrics": { "labels": { - "device": "br-10229e3512d9", + "device": "br-3a285aa5e58c", "job": "openmetrics" }, "metrics": { diff --git a/x-pack/metricbeat/module/prometheus/collector/_meta/data.json b/x-pack/metricbeat/module/prometheus/collector/_meta/data.json index f0fed23d976d..f6c2e256b753 100644 --- a/x-pack/metricbeat/module/prometheus/collector/_meta/data.json +++ b/x-pack/metricbeat/module/prometheus/collector/_meta/data.json @@ -1,5 +1,5 @@ { - "@timestamp": "2017-10-12T08:05:34.853Z", + "@timestamp": "2019-03-01T08:05:34.853Z", "event": { "dataset": "prometheus.collector", "duration": 115000, @@ -11,21 +11,15 @@ }, "prometheus": { "labels": { - "instance": "172.27.0.2:9090", - "interval": "15s", + "device": "br-210476dc4ef8", "job": "prometheus" }, - "prometheus_target_interval_length_seconds_count": { - "counter": 1, - "rate": 0 - }, - "prometheus_target_interval_length_seconds_sum": { - "counter": 15.000401344, - "rate": 0 + "node_network_carrier": { + "value": 0 } }, "service": { - "address": "172.27.0.2:9090", + "address": "127.0.0.1:55555", "type": "prometheus" } } \ No newline at end of file