Skip to content

Commit

Permalink
reduce copy paste
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch committed Jul 22, 2021
1 parent 21e0aa8 commit f81e97c
Show file tree
Hide file tree
Showing 16 changed files with 794 additions and 1,332 deletions.
329 changes: 0 additions & 329 deletions libbeat/common/transport/httpcommon/httpcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,215 +21,11 @@ package httpcommon

import (
"net/http"
"time"

"go.elastic.co/apm/module/apmhttp"
"golang.org/x/net/http2"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/logp"
)

// HTTPTransportSettings provides common HTTP settings for HTTP clients.
type HTTPTransportSettings struct {
// TLS provides ssl/tls setup settings
TLS *tlscommon.Config `config:"ssl" yaml:"ssl,omitempty" json:"ssl,omitempty"`

// Timeout configures the `(http.Transport).Timeout`.
Timeout time.Duration `config:"timeout" yaml:"timeout,omitempty" json:"timeout,omitempty"`

Proxy HTTPClientProxySettings `config:",inline" yaml:",inline"`

// TODO: Add more settings:
// - DisableKeepAlive
// - MaxIdleConns
// - IdleConnTimeout
// - ResponseHeaderTimeout
// - ConnectionTimeout (currently 'Timeout' is used for both)
}

// WithKeepaliveSettings options can be used to modify the Keepalive
type WithKeepaliveSettings struct {
Disable bool
MaxIdleConns int
MaxIdleConnsPerHost int
IdleConnTimeout time.Duration
}

var _ httpTransportOption = WithKeepaliveSettings{}

const defaultHTTPTimeout = 90 * time.Second

type (
// TransportOption are applied to the http.RoundTripper to be build
// from HTTPTransportSettings.
TransportOption interface{ sealTransportOption() }

extraSettings struct {
logger *logp.Logger
http2 bool
}

dialerOption interface {
TransportOption
baseDialer() transport.Dialer
}
dialerModOption interface {
TransportOption
applyDialer(*HTTPTransportSettings, transport.Dialer) transport.Dialer
}
httpTransportOption interface {
TransportOption
applyTransport(*HTTPTransportSettings, *http.Transport)
}
roundTripperOption interface {
TransportOption
applyRoundTripper(*HTTPTransportSettings, http.RoundTripper) http.RoundTripper
}
extraOption interface {
TransportOption
applyExtra(*extraSettings)
}
)

type baseDialerFunc func() transport.Dialer

var _ dialerOption = baseDialerFunc(nil)

func (baseDialerFunc) sealTransportOption() {}
func (fn baseDialerFunc) baseDialer() transport.Dialer {
return fn()
}

type dialerOptFunc func(transport.Dialer) transport.Dialer

var _ dialerModOption = dialerOptFunc(nil)

func (dialerOptFunc) sealTransportOption() {}
func (fn dialerOptFunc) applyDialer(_ *HTTPTransportSettings, d transport.Dialer) transport.Dialer {
return fn(d)

}

type transportOptFunc func(*HTTPTransportSettings, *http.Transport)

var _ httpTransportOption = transportOptFunc(nil)

func (transportOptFunc) sealTransportOption() {}
func (fn transportOptFunc) applyTransport(s *HTTPTransportSettings, t *http.Transport) {
fn(s, t)
}

type rtOptFunc func(http.RoundTripper) http.RoundTripper

var _ roundTripperOption = rtOptFunc(nil)

func (rtOptFunc) sealTransportOption() {}
func (fn rtOptFunc) applyRoundTripper(_ *HTTPTransportSettings, rt http.RoundTripper) http.RoundTripper {
return fn(rt)
}

type extraOptionFunc func(*extraSettings)

func (extraOptionFunc) sealTransportOption() {}
func (fn extraOptionFunc) applyExtra(s *extraSettings) { fn(s) }

// DefaultHTTPTransportSettings returns the default HTTP transport setting.
func DefaultHTTPTransportSettings() HTTPTransportSettings {
return HTTPTransportSettings{
Proxy: DefaultHTTPClientProxySettings(),
Timeout: defaultHTTPTimeout,
}
}

// Unpack reads a config object into the settings.
func (settings *HTTPTransportSettings) Unpack(cfg *common.Config) error {
tmp := struct {
TLS *tlscommon.Config `config:"ssl"`
Timeout time.Duration `config:"timeout"`
}{Timeout: settings.Timeout}

if err := cfg.Unpack(&tmp); err != nil {
return err
}

var proxy HTTPClientProxySettings
if err := cfg.Unpack(&proxy); err != nil {
return err
}

_, err := tlscommon.LoadTLSConfig(tmp.TLS)
if err != nil {
return err
}

*settings = HTTPTransportSettings{
TLS: tmp.TLS,
Timeout: tmp.Timeout,
Proxy: proxy,
}
return nil
}

// RoundTripper creates a http.RoundTripper for use with http.Client.
//
// The dialers will registers with stats if given. Stats is used to collect metrics for io errors,
// bytes in, and bytes out.
func (settings *HTTPTransportSettings) RoundTripper(opts ...TransportOption) (http.RoundTripper, error) {
var dialer transport.Dialer

var extra extraSettings
for _, opt := range opts {
if opt, ok := opt.(extraOption); ok {
opt.applyExtra(&extra)
}
}

for _, opt := range opts {
if dialOpt, ok := opt.(dialerOption); ok {
dialer = dialOpt.baseDialer()
}
}

if dialer == nil {
dialer = transport.NetDialer(settings.Timeout)
}

tls, err := tlscommon.LoadTLSConfig(settings.TLS)
if err != nil {
return nil, err
}

tlsDialer := transport.TLSDialer(dialer, tls, settings.Timeout)
for _, opt := range opts {
if dialOpt, ok := opt.(dialerModOption); ok {
dialer = dialOpt.applyDialer(settings, dialer)
tlsDialer = dialOpt.applyDialer(settings, tlsDialer)
}
}

if logger := extra.logger; logger != nil {
dialer = transport.LoggingDialer(dialer, logger)
tlsDialer = transport.LoggingDialer(tlsDialer, logger)
}

var rt http.RoundTripper
if extra.http2 {
rt, err = settings.http2RoundTripper(tls, dialer, tlsDialer, opts...)
} else {
rt, err = settings.httpRoundTripper(tls, dialer, tlsDialer, opts...)
}

for _, opt := range opts {
if rtOpt, ok := opt.(roundTripperOption); ok {
rt = rtOpt.applyRoundTripper(settings, rt)
}
}
return rt, nil
}

func (settings *HTTPTransportSettings) httpRoundTripper(
tls *tlscommon.TLSConfig,
dialer, tlsDialer transport.Dialer,
Expand Down Expand Up @@ -257,128 +53,3 @@ func (settings *HTTPTransportSettings) httpRoundTripper(

return t, nil
}

func (settings *HTTPTransportSettings) http2RoundTripper(
tls *tlscommon.TLSConfig,
dialer, tlsDialer transport.Dialer,
opts ...TransportOption,
) (*http2.Transport, error) {
t1, err := settings.httpRoundTripper(tls, dialer, tlsDialer, opts...)
if err != nil {
return nil, err
}

t2, err := http2.ConfigureTransports(t1)
if err != nil {
return nil, err
}

t2.AllowHTTP = true
return t2, nil
}

// Client creates a new http.Client with configured Transport. The transport is
// instrumented using apmhttp.WrapRoundTripper.
func (settings HTTPTransportSettings) Client(opts ...TransportOption) (*http.Client, error) {
rt, err := settings.RoundTripper(opts...)
if err != nil {
return nil, err
}

return &http.Client{Transport: rt, Timeout: settings.Timeout}, nil
}

func (opts WithKeepaliveSettings) sealTransportOption() {}
func (opts WithKeepaliveSettings) applyTransport(_ *HTTPTransportSettings, t *http.Transport) {
t.DisableKeepAlives = opts.Disable
if opts.IdleConnTimeout != 0 {
t.IdleConnTimeout = opts.IdleConnTimeout
}
if opts.MaxIdleConns != 0 {
t.MaxIdleConns = opts.MaxIdleConns
}
if opts.MaxIdleConnsPerHost != 0 {
t.MaxIdleConnsPerHost = opts.MaxIdleConnsPerHost
}
}

// WithBaseDialer configures the dialer used for TCP and TLS connections.
func WithBaseDialer(d transport.Dialer) TransportOption {
return baseDialerFunc(func() transport.Dialer {
return d
})
}

// WithIOStats instruments the RoundTripper dialers with the given statser, such
// that bytes in, bytes out, and errors can be monitored.
func WithIOStats(stats transport.IOStatser) TransportOption {
return dialerOptFunc(func(d transport.Dialer) transport.Dialer {
if stats == nil {
return d
}
return transport.StatsDialer(d, stats)
})
}

// WithTransportFunc register a custom function that is used to apply
// custom changes to the net.Transport, when the Client is build.
func WithTransportFunc(fn func(*http.Transport)) TransportOption {
return transportOptFunc(func(_ *HTTPTransportSettings, t *http.Transport) {
fn(t)
})
}

// WithHTTP2Only will ensure that a HTTP 2 only roundtripper is created.
func WithHTTP2Only(b bool) TransportOption {
return extraOptionFunc(func(settings *extraSettings) {
settings.http2 = b
})
}

// WithForceAttemptHTTP2 sets the `http.Tansport.ForceAttemptHTTP2` field.
func WithForceAttemptHTTP2(b bool) TransportOption {
return transportOptFunc(func(settings *HTTPTransportSettings, t *http.Transport) {
t.ForceAttemptHTTP2 = b
})
}

// WithNOProxy disables the configured proxy. Proxy environment variables
// like HTTP_PROXY and HTTPS_PROXY will have no affect.
func WithNOProxy() TransportOption {
return transportOptFunc(func(s *HTTPTransportSettings, t *http.Transport) {
t.Proxy = nil
})
}

// WithoutProxyEnvironmentVariables disables support for the HTTP_PROXY, HTTPS_PROXY and
// NO_PROXY envionrment variables. Explicitely configured proxy URLs will still applied.
func WithoutProxyEnvironmentVariables() TransportOption {
return transportOptFunc(func(settings *HTTPTransportSettings, t *http.Transport) {
if settings.Proxy.Disable || settings.Proxy.URL == nil {
t.Proxy = nil
}
})
}

// WithModRoundtripper allows customization of the roundtipper.
func WithModRoundtripper(w func(http.RoundTripper) http.RoundTripper) TransportOption {
return rtOptFunc(w)
}

var withAPMHTTPRountTripper = WithModRoundtripper(func(rt http.RoundTripper) http.RoundTripper {
return apmhttp.WrapRoundTripper(rt)
})

// WithAPMHTTPInstrumentation insruments the HTTP client via apmhttp.WrapRoundTripper.
// Custom APM round tripper wrappers can be configured via WithModRoundtripper.
func WithAPMHTTPInstrumentation() TransportOption {
return withAPMHTTPRountTripper
}

// WithLogger sets the internal logger that will be used to log dial or TCP level errors.
// Logging at the connection level will only happen if the logger has been set.
func WithLogger(logger *logp.Logger) TransportOption {
return extraOptionFunc(func(s *extraSettings) {
s.logger = logger
})
}
Loading

0 comments on commit f81e97c

Please sign in to comment.