Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport: Introduce NewHTTPTransportOptions #1168

Merged
merged 8 commits into from
Jan 11, 2022
Merged
122 changes: 98 additions & 24 deletions transport/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,49 @@ var (
defaultServerTimeout = 30 * time.Second
)

// HTTPTransportOptions for the HTTPTransport.
type HTTPTransportOptions struct {
// APIKey holds the APIKey base64-encoded string. Specifying an APIKey will
// the SecretToken, if set.
APIKey string

// SecretToken holds the secret token configured in the APM Server.
SecretToken string

// ServerCert can be set if you have configured your APM Server with a
// self signed TLS certificate, or you want to verify the server certificate
// matches this exact TLS certificate.
ServerCert string

// ServerCACert can be set if you want to specify a path to the PEM-encoded
// Certificate Authority to verify the Server TLS certificate.
ServerCACert string

// ServerURLs holds the URLs for your Elastic APM Server. The Server
// supports both HTTP and HTTPS. If you use HTTPS, then you may need to
// configure your client machines so that the server certificate can be
// verified. You can disable certificate verification with SkipServerVerify.
// If no URL is specified, then the transport will use the default URL
// "http://localhost:8200".
ServerURLs []*url.URL

// ServerTimeout holds the timeout for requests made to your Elastic APM
// server. When set to zero, it will default to 30 seconds. Negative values
// are not allowed.
ServerTimeout time.Duration

// SkipServerVerify skips TLS certificate validation of the APM Server.
SkipServerVerify bool
}

// Validate ensures the HTTPTransportOptions are valid.
func (opts HTTPTransportOptions) Validate() error {
if opts.ServerTimeout < 0 {
return errors.New("apm transport options: ServerTimeout must be greater or equal to 0")
}
return nil
}

// HTTPTransport is an implementation of Transport, sending payloads via
// a net/http client.
type HTTPTransport struct {
Expand Down Expand Up @@ -99,6 +142,9 @@ type HTTPTransport struct {
// - ELASTIC_APM_SERVER_TIMEOUT: timeout for requests to the APM Server.
// If not specified, defaults to 30 seconds.
//
// - ELASTIC_APM_API_KEY: base64-encoded string used for authentication.
// Setting this environment variable ignores ELASTIC_APM_SECRET_TOKEN.
//
// - ELASTIC_APM_SECRET_TOKEN: used to authenticate the agent.
//
// - ELASTIC_APM_SERVER_CERT: path to a PEM-encoded certificate that
Expand All @@ -116,26 +162,45 @@ func NewHTTPTransport() (*HTTPTransport, error) {
if err != nil {
return nil, err
}

serverTimeout, err := configutil.ParseDurationEnv(envServerTimeout, defaultServerTimeout)
if err != nil {
return nil, err
}
if serverTimeout < 0 {
serverTimeout = 0
}

serverURLs, err := initServerURLs()
if err != nil {
return nil, err
}

tlsConfig := &tls.Config{InsecureSkipVerify: !verifyServerCert}
serverCertPath := os.Getenv(envServerCert)
if serverCertPath != "" {
serverCert, err := loadCertificate(serverCertPath)
opts := HTTPTransportOptions{
SkipServerVerify: !verifyServerCert,
ServerURLs: serverURLs,
ServerTimeout: serverTimeout,
ServerCert: os.Getenv(envServerCert),
ServerCACert: os.Getenv(envCACert),
}
if apiKey := os.Getenv(envAPIKey); apiKey != "" {
opts.APIKey = apiKey
} else if secretToken := os.Getenv(envSecretToken); secretToken != "" {
opts.SecretToken = secretToken
}
return NewHTTPTransportOptions(opts)
}

// NewHTTPTransportOptions returns a customized HTTPTransport which can be used
// for streaming data to the APM Server.
func NewHTTPTransportOptions(opts HTTPTransportOptions) (*HTTPTransport, error) {
simitt marked this conversation as resolved.
Show resolved Hide resolved
if err := opts.Validate(); err != nil {
return nil, err
}

tlsConfig := tls.Config{InsecureSkipVerify: opts.SkipServerVerify}
if opts.ServerCert != "" {
serverCert, err := loadCertificate(opts.ServerCert)
if err != nil {
return nil, errors.Wrapf(err, "failed to load certificate from %s", serverCertPath)
return nil, errors.Wrapf(err, "failed to load certificate from %s", opts.ServerCert)
}
// Disable standard verification, we'll check that the
// server supplies the exact certificate provided.
Expand All @@ -144,30 +209,32 @@ func NewHTTPTransport() (*HTTPTransport, error) {
return verifyPeerCertificate(rawCerts, serverCert)
}
}

caCertPath := os.Getenv(envCACert)
if caCertPath != "" {
if opts.ServerCACert != "" {
rootCAs := x509.NewCertPool()
additionalCerts, err := ioutil.ReadFile(caCertPath)
additionalCerts, err := ioutil.ReadFile(opts.ServerCACert)
if err != nil {
return nil, errors.Wrapf(err, "failed to load root CA file from %s", caCertPath)
return nil, errors.Wrapf(err, "failed to load root CA file from %s", opts.ServerCACert)
}
if !rootCAs.AppendCertsFromPEM(additionalCerts) {
return nil, fmt.Errorf("failed to load CA certs from %s", caCertPath)
return nil, fmt.Errorf("failed to load CA certs from %s", opts.ServerCACert)
}
tlsConfig.RootCAs = rootCAs
}

// If the ServerTimeout is unspecified, set it to defaultServerTimeout.
if opts.ServerTimeout == 0 {
opts.ServerTimeout = defaultServerTimeout
}
client := &http.Client{
Timeout: serverTimeout,
Timeout: opts.ServerTimeout,
marclop marked this conversation as resolved.
Show resolved Hide resolved
Transport: &http.Transport{
Proxy: defaultHTTPTransport.Proxy,
DialContext: defaultHTTPTransport.DialContext,
MaxIdleConns: defaultHTTPTransport.MaxIdleConns,
IdleConnTimeout: defaultHTTPTransport.IdleConnTimeout,
TLSHandshakeTimeout: defaultHTTPTransport.TLSHandshakeTimeout,
ExpectContinueTimeout: defaultHTTPTransport.ExpectContinueTimeout,
TLSClientConfig: tlsConfig,
TLSClientConfig: &tlsConfig,
},
}

Expand All @@ -187,21 +254,27 @@ func NewHTTPTransport() (*HTTPTransport, error) {
intakeHeaders: intakeHeaders,
profileHeaders: profileHeaders,
}
if apiKey := os.Getenv(envAPIKey); apiKey != "" {
t.SetAPIKey(apiKey)
} else if secretToken := os.Getenv(envSecretToken); secretToken != "" {
t.SetSecretToken(secretToken)
if opts.APIKey != "" {
t.SetAPIKey(opts.APIKey)
} else if opts.SecretToken != "" {
t.SetSecretToken(opts.SecretToken)
}

if len(opts.ServerURLs) == 0 {
opts.ServerURLs = []*url.URL{defaultServerURL}
}
if err := t.SetServerURL(opts.ServerURLs...); err != nil {
return nil, err
}
t.SetServerURL(serverURLs...)
return t, nil
}

// SetServerURL sets the APM Server URL (or URLs) for sending requests.
// At least one URL must be specified, or the method will panic. The
// list will be randomly shuffled.
func (t *HTTPTransport) SetServerURL(u ...*url.URL) {
// At least one URL must be specified, or the method will return an error.
// The list will be randomly shuffled.
func (t *HTTPTransport) SetServerURL(u ...*url.URL) error {
marclop marked this conversation as resolved.
Show resolved Hide resolved
if len(u) == 0 {
panic("SetServerURL expects at least one URL")
return errors.New("SetServerURL expects at least one URL")
}
intakeURLs := make([]*url.URL, len(u))
configURLs := make([]*url.URL, len(u))
Expand All @@ -226,6 +299,7 @@ func (t *HTTPTransport) SetServerURL(u ...*url.URL) {
t.configURLs = configURLs
t.profileURLs = profileURLs
t.urlIndex = 0
return nil
}

// SetUserAgent sets the User-Agent header that will be sent with each request.
Expand Down
78 changes: 78 additions & 0 deletions transport/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,84 @@ func TestHTTPTransportSendProfile(t *testing.T) {
)
}

func TestHTTPTransportOptionsValidation(t *testing.T) {
validURL, err := url.Parse("http://localhost:8200")
require.NoError(t, err)

t.Run("valid", func(t *testing.T) {
transport, err := transport.NewHTTPTransportOptions(transport.HTTPTransportOptions{
ServerURLs: []*url.URL{validURL},
ServerTimeout: 30 * time.Second,
})
assert.NoError(t, err)
assert.NotNil(t, transport)
})
t.Run("invalid_timeout", func(t *testing.T) {
transport, err := transport.NewHTTPTransportOptions(transport.HTTPTransportOptions{
ServerTimeout: -1,
})
assert.EqualError(t, err, "apm transport options: ServerTimeout must be greater or equal to 0")
assert.Nil(t, transport)
})
}

func TestHTTPTransportOptionsEmptyURL(t *testing.T) {
var h recordingHandler
server := httptest.NewUnstartedServer(&h)
defer server.Close()

lis, err := net.Listen("tcp", "localhost:8200")
if err != nil {
t.Skipf("cannot listen on default server address: %s", err)
}
server.Listener.Close()
server.Listener = lis
server.Start()

transport, err := transport.NewHTTPTransportOptions(transport.HTTPTransportOptions{})
require.NoError(t, err)
require.NotNil(t, transport)

err = transport.SendStream(context.Background(), strings.NewReader(""))
assert.NoError(t, err)
assert.Len(t, h.requests, 1)
}

func TestHTTPTransportOptionsDefaults(t *testing.T) {
validURL, err := url.Parse("http://localhost:8200")
require.NoError(t, err)
transport, err := transport.NewHTTPTransportOptions(transport.HTTPTransportOptions{
ServerURLs: []*url.URL{validURL},
})
assert.NoError(t, err)
assert.Equal(t, transport.Client.Timeout, 30*time.Second)
}

func TestSetServerURL(t *testing.T) {
t.Run("valid", func(t *testing.T) {
validURL, err := url.Parse("http://localhost:8200")
require.NoError(t, err)
transport, err := transport.NewHTTPTransportOptions(transport.HTTPTransportOptions{
ServerURLs: []*url.URL{validURL},
})
anotherURL, err := url.Parse("http://somethingelse:8200")
require.NoError(t, err)

err = transport.SetServerURL(anotherURL)
require.NoError(t, err)
})
t.Run("invalid", func(t *testing.T) {
validURL, err := url.Parse("http://localhost:8200")
require.NoError(t, err)
transport, err := transport.NewHTTPTransportOptions(transport.HTTPTransportOptions{
ServerURLs: []*url.URL{validURL},
})

err = transport.SetServerURL()
require.EqualError(t, err, "SetServerURL expects at least one URL")
})
}

func newHTTPTransport(t *testing.T, handler http.Handler) (*transport.HTTPTransport, *httptest.Server) {
server := httptest.NewServer(handler)
transport, err := transport.NewHTTPTransport()
Expand Down