From 9c1c359406ac3d241bea235677b0f88b320e5f88 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 9 Jul 2020 15:08:48 -0400 Subject: [PATCH] [Elastic Agent] Add support for multiple hosts in connection to kibana (#19628) (#19794) * Add ability for multiple hosts to be defined for agents connection to kibana. * Fix reference. * Add changelog. (cherry picked from commit 57c42808d0c46c48f22c2ac63a06197b031d8559) --- x-pack/elastic-agent/CHANGELOG.asciidoc | 1 + .../_meta/config/common.p2.yml.tmpl | 2 +- .../_meta/config/common.reference.p2.yml.tmpl | 2 +- .../config/elastic-agent.docker.yml.tmpl | 2 +- x-pack/elastic-agent/_meta/elastic-agent.yml | 2 +- x-pack/elastic-agent/elastic-agent.docker.yml | 2 +- .../elastic-agent/elastic-agent.reference.yml | 2 +- x-pack/elastic-agent/elastic-agent.yml | 2 +- x-pack/elastic-agent/pkg/kibana/client.go | 184 ++++++++++++------ .../elastic-agent/pkg/kibana/client_test.go | 87 ++++++++- x-pack/elastic-agent/pkg/kibana/config.go | 11 ++ 11 files changed, 228 insertions(+), 69 deletions(-) diff --git a/x-pack/elastic-agent/CHANGELOG.asciidoc b/x-pack/elastic-agent/CHANGELOG.asciidoc index 8c145817b437..6cad5b3cfc9f 100644 --- a/x-pack/elastic-agent/CHANGELOG.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.asciidoc @@ -79,3 +79,4 @@ - Rename input.type logs to logfile {pull}19360[19360] - Agent now installs/uninstalls Elastic Endpoint {pull}19248[19248] - Agent now downloads Elastic Endpoint {pull}19503[19503] +- Agent now load balances across multiple Kibana instances {pull}19628[19628] diff --git a/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl b/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl index 81dbde654195..3cb0bcdac463 100644 --- a/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl +++ b/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl @@ -42,7 +42,7 @@ inputs: # access_token: "" # kibana: # # kibana minimal configuration -# host: "localhost:5601" +# hosts: ["localhost:5601"] # ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] # # optional values diff --git a/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl b/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl index 59ea36c70bc4..97290ee4daef 100644 --- a/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl +++ b/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl @@ -34,7 +34,7 @@ inputs: # access_token: "" # kibana: # # kibana minimal configuration -# host: "localhost:5601" +# hosts: ["localhost:5601"] # ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] # # optional values diff --git a/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl b/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl index 212d6944b6ef..70b4f496c3c0 100644 --- a/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl +++ b/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl @@ -34,7 +34,7 @@ inputs: # access_token: "" # kibana: # # kibana minimal configuration -# host: "localhost:5601" +# hosts: ["localhost:5601"] # ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] # # optional values diff --git a/x-pack/elastic-agent/_meta/elastic-agent.yml b/x-pack/elastic-agent/_meta/elastic-agent.yml index e8d47c9ea75e..8c3c518194dd 100644 --- a/x-pack/elastic-agent/_meta/elastic-agent.yml +++ b/x-pack/elastic-agent/_meta/elastic-agent.yml @@ -34,7 +34,7 @@ inputs: # access_token: "" # kibana: # # kibana minimal configuration -# host: "localhost:5601" +# hosts: ["localhost:5601"] # ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] # # optional values diff --git a/x-pack/elastic-agent/elastic-agent.docker.yml b/x-pack/elastic-agent/elastic-agent.docker.yml index 8c51ecd61206..6f800c1dc082 100644 --- a/x-pack/elastic-agent/elastic-agent.docker.yml +++ b/x-pack/elastic-agent/elastic-agent.docker.yml @@ -34,7 +34,7 @@ inputs: # access_token: "" # kibana: # # kibana minimal configuration -# host: "localhost:5601" +# hosts: ["localhost:5601"] # ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] # # optional values diff --git a/x-pack/elastic-agent/elastic-agent.reference.yml b/x-pack/elastic-agent/elastic-agent.reference.yml index d5a5966dfb48..0607ef8d2266 100644 --- a/x-pack/elastic-agent/elastic-agent.reference.yml +++ b/x-pack/elastic-agent/elastic-agent.reference.yml @@ -40,7 +40,7 @@ inputs: # access_token: "" # kibana: # # kibana minimal configuration -# host: "localhost:5601" +# hosts: ["localhost:5601"] # ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] # # optional values diff --git a/x-pack/elastic-agent/elastic-agent.yml b/x-pack/elastic-agent/elastic-agent.yml index d1c7c6220be5..b709e9c8ab52 100644 --- a/x-pack/elastic-agent/elastic-agent.yml +++ b/x-pack/elastic-agent/elastic-agent.yml @@ -48,7 +48,7 @@ inputs: # access_token: "" # kibana: # # kibana minimal configuration -# host: "localhost:5601" +# hosts: ["localhost:5601"] # ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] # # optional values diff --git a/x-pack/elastic-agent/pkg/kibana/client.go b/x-pack/elastic-agent/pkg/kibana/client.go index 5a20d1b077af..7411742d2510 100644 --- a/x-pack/elastic-agent/pkg/kibana/client.go +++ b/x-pack/elastic-agent/pkg/kibana/client.go @@ -10,6 +10,7 @@ import ( "net/http" "net/url" "strings" + "sync" "time" "github.com/pkg/errors" @@ -24,20 +25,19 @@ import ( const ( kibanaPort = 5601 kibanaHTTPSPort = 443 + + kibanaRetryOnBadConnTimeout = 5 * time.Minute ) type requestFunc func(string, string, url.Values, io.Reader) (*http.Request, error) type wrapperFunc func(rt http.RoundTripper) (http.RoundTripper, error) -type clienter interface { - Send( - method string, - path string, - params url.Values, - headers http.Header, - body io.Reader, - ) (*http.Response, error) - Close() error +type requestClient struct { + request requestFunc + client http.Client + lastUsed time.Time + lastErr error + lastErrOcc time.Time } // Client wraps an http.Client and takes care of making the raw calls to kibana, the client should @@ -46,27 +46,11 @@ type clienter interface { // implementations that will take care of the boiler plates. type Client struct { log *logger.Logger - request requestFunc - client http.Client + lock sync.Mutex + clients []*requestClient config *Config } -// New creates new Kibana API client. -func New( - log *logger.Logger, - factory requestFunc, - cfg *Config, - httpClient http.Client, -) (*Client, error) { - c := &Client{ - log: log, - request: factory, - client: httpClient, - config: cfg, - } - return c, nil -} - // NewConfigFromURL returns a Kibana Config based on a received host. func NewConfigFromURL(kURL string) (*Config, error) { u, err := url.Parse(kURL) @@ -112,29 +96,6 @@ func NewWithRawConfig(log *logger.Logger, config *config.Config, wrapper wrapper // NewWithConfig takes a Kibana Config and return a client. func NewWithConfig(log *logger.Logger, cfg *Config, wrapper wrapperFunc) (*Client, error) { - var transport http.RoundTripper - transport, err := makeTransport(cfg.Timeout, cfg.TLS) - if err != nil { - return nil, err - } - - if cfg.IsBasicAuth() { - // Pass basic auth credentials to all the underlying calls. - transport = NewBasicAuthRoundTripper(transport, cfg.Username, cfg.Password) - } - - if wrapper != nil { - transport, err = wrapper(transport) - if err != nil { - return nil, errors.Wrap(err, "fail to create transport client") - } - } - - httpClient := http.Client{ - Transport: transport, - Timeout: cfg.Timeout, - } - // Normalize the URL with the path any spaces configured. var p string if len(cfg.SpaceID) > 0 { @@ -152,22 +113,53 @@ func NewWithConfig(log *logger.Logger, cfg *Config, wrapper wrapperFunc) (*Clien usedDefaultPort = kibanaHTTPSPort } - kibanaURL, err := common.MakeURL(string(cfg.Protocol), p, cfg.Host, usedDefaultPort) - if err != nil { - return nil, errors.Wrap(err, "invalid Kibana endpoint") + hosts := cfg.GetHosts() + clients := make([]*requestClient, len(hosts)) + for i, host := range cfg.GetHosts() { + var transport http.RoundTripper + transport, err := makeTransport(cfg.Timeout, cfg.TLS) + if err != nil { + return nil, err + } + + if cfg.IsBasicAuth() { + // Pass basic auth credentials to all the underlying calls. + transport = NewBasicAuthRoundTripper(transport, cfg.Username, cfg.Password) + } + + if wrapper != nil { + transport, err = wrapper(transport) + if err != nil { + return nil, errors.Wrap(err, "fail to create transport client") + } + } + + httpClient := http.Client{ + Transport: transport, + Timeout: cfg.Timeout, + } + + kibanaURL, err := common.MakeURL(string(cfg.Protocol), p, host, usedDefaultPort) + if err != nil { + return nil, errors.Wrap(err, "invalid Kibana endpoint") + } + clients[i] = &requestClient{ + request: prefixRequestFactory(kibanaURL), + client: httpClient, + } } - return New(log, prefixRequestFactory(kibanaURL), cfg, httpClient) + return new(log, cfg, clients...) } -// Send executes a direct calls agains't the Kibana API, the method will takes cares of cloning +// Send executes a direct calls against the Kibana API, the method will takes cares of cloning // also add necessary headers for Kibana likes: "Content-Type", "Accept", and "kbn-xsrf". -// No assumptions is done on the response concerning the received format, this will be the responsability +// No assumptions is done on the response concerning the received format, this will be the responsibility // of the implementation to correctly unpack any received data. // // NOTE: -// - The caller of this method is free to overrides any values found in the headers. -// - The magic of unpack kibana errors is not done in the Send method, an helper methods is provided. +// - The caller of this method is free to override any value found in the headers. +// - The magic of unpack kibana errors is not done in the Send method, a helper method is provided. func (c *Client) Send( ctx context.Context, method, path string, @@ -176,8 +168,11 @@ func (c *Client) Send( body io.Reader, ) (*http.Response, error) { c.log.Debugf("Request method: %s, path: %s", method, path) + c.lock.Lock() + defer c.lock.Unlock() + requester := c.nextRequester() - req, err := c.request(method, path, params, body) + req, err := requester.request(method, path, params, body) if err != nil { return nil, errors.Wrapf(err, "fail to create HTTP request using method %s to %s", method, path) } @@ -195,12 +190,79 @@ func (c *Client) Send( } } - return c.client.Do(req.WithContext(ctx)) + requester.lastUsed = time.Now().UTC() + resp, err := requester.client.Do(req.WithContext(ctx)) + if err != nil { + requester.lastErr = err + requester.lastErrOcc = time.Now().UTC() + } else { + requester.lastErr = nil + requester.lastErrOcc = time.Time{} + } + return resp, err } // URI returns the remote URI. func (c *Client) URI() string { - return string(c.config.Protocol) + "://" + c.config.Host + "/" + c.config.Path + host := c.config.GetHosts()[0] + return string(c.config.Protocol) + "://" + host + "/" + c.config.Path +} + +// new creates new Kibana API client. +func new( + log *logger.Logger, + cfg *Config, + httpClients ...*requestClient, +) (*Client, error) { + c := &Client{ + log: log, + clients: httpClients, + config: cfg, + } + return c, nil +} + +// nextRequester returns the requester to use. +// +// It excludes clients that have errored in the last 5 minutes. +func (c *Client) nextRequester() *requestClient { + var selected *requestClient + + now := time.Now().UTC() + for _, requester := range c.clients { + if requester.lastErr != nil && now.Sub(requester.lastErrOcc) > kibanaRetryOnBadConnTimeout { + requester.lastErr = nil + requester.lastErrOcc = time.Time{} + } + if requester.lastErr != nil { + continue + } + if requester.lastUsed.IsZero() { + // never been used, instant winner! + selected = requester + break + } + if selected == nil { + selected = requester + continue + } + if requester.lastUsed.Before(selected.lastUsed) { + selected = requester + } + } + if selected == nil { + // all are erroring; select the oldest one that errored + for _, requester := range c.clients { + if selected == nil { + selected = requester + continue + } + if requester.lastErrOcc.Before(selected.lastErrOcc) { + selected = requester + } + } + } + return selected } func prefixRequestFactory(URL string) requestFunc { diff --git a/x-pack/elastic-agent/pkg/kibana/client_test.go b/x-pack/elastic-agent/pkg/kibana/client_test.go index 818b9a5ea2f0..36d24b88a009 100644 --- a/x-pack/elastic-agent/pkg/kibana/client_test.go +++ b/x-pack/elastic-agent/pkg/kibana/client_test.go @@ -14,6 +14,7 @@ import ( "strings" "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -57,7 +58,7 @@ func TestPortDefaults(t *testing.T) { c, err := NewWithConfig(l, cfg, nil) require.NoError(t, err) - r, err := c.request("GET", "/", nil, strings.NewReader("")) + r, err := c.nextRequester().request("GET", "/", nil, strings.NewReader("")) require.NoError(t, err) assert.True(t, strings.HasSuffix(r.Host, fmt.Sprintf(":%d", tc.ExpectedPort))) @@ -303,6 +304,90 @@ func TestHTTPClient(t *testing.T) { )) } +func TestNextRequester(t *testing.T) { + t.Run("Picks first requester on initial call", func(t *testing.T) { + one := &requestClient{} + two := &requestClient{} + client, err := new(nil, nil, one, two) + require.NoError(t, err) + assert.Equal(t, one, client.nextRequester()) + }) + + t.Run("Picks second requester when first has error", func(t *testing.T) { + one := &requestClient{ + lastErr: fmt.Errorf("fake error"), + lastErrOcc: time.Now().UTC(), + } + two := &requestClient{} + client, err := new(nil, nil, one, two) + require.NoError(t, err) + assert.Equal(t, two, client.nextRequester()) + }) + + t.Run("Picks second requester when first has used", func(t *testing.T) { + one := &requestClient{ + lastUsed: time.Now().UTC(), + } + two := &requestClient{} + client, err := new(nil, nil, one, two) + require.NoError(t, err) + assert.Equal(t, two, client.nextRequester()) + }) + + t.Run("Picks second requester when its oldest", func(t *testing.T) { + one := &requestClient{ + lastUsed: time.Now().UTC().Add(-time.Minute), + } + two := &requestClient{ + lastUsed: time.Now().UTC().Add(-3 * time.Minute), + } + three := &requestClient{ + lastUsed: time.Now().UTC().Add(-2 * time.Minute), + } + client, err := new(nil, nil, one, two, three) + require.NoError(t, err) + assert.Equal(t, two, client.nextRequester()) + }) + + t.Run("Picks third requester when its second has error and first is last used", func(t *testing.T) { + one := &requestClient{ + lastUsed: time.Now().UTC().Add(-time.Minute), + } + two := &requestClient{ + lastUsed: time.Now().UTC().Add(-3 * time.Minute), + lastErr: fmt.Errorf("fake error"), + lastErrOcc: time.Now().Add(-time.Minute), + } + three := &requestClient{ + lastUsed: time.Now().UTC().Add(-2 * time.Minute), + } + client, err := new(nil, nil, one, two, three) + require.NoError(t, err) + assert.Equal(t, three, client.nextRequester()) + }) + + t.Run("Picks second requester when its oldest and all have old errors", func(t *testing.T) { + one := &requestClient{ + lastUsed: time.Now().UTC().Add(-time.Minute), + lastErr: fmt.Errorf("fake error"), + lastErrOcc: time.Now().Add(-time.Minute), + } + two := &requestClient{ + lastUsed: time.Now().UTC().Add(-3 * time.Minute), + lastErr: fmt.Errorf("fake error"), + lastErrOcc: time.Now().Add(-3 * time.Minute), + } + three := &requestClient{ + lastUsed: time.Now().UTC().Add(-2 * time.Minute), + lastErr: fmt.Errorf("fake error"), + lastErrOcc: time.Now().Add(-2 * time.Minute), + } + client, err := new(nil, nil, one, two, three) + require.NoError(t, err) + assert.Equal(t, two, client.nextRequester()) + }) +} + func withServer(m func(t *testing.T) *http.ServeMux, test func(t *testing.T, host string)) func(t *testing.T) { return func(t *testing.T) { s := httptest.NewServer(m(t)) diff --git a/x-pack/elastic-agent/pkg/kibana/config.go b/x-pack/elastic-agent/pkg/kibana/config.go index 14182898bc50..928dff7106d9 100644 --- a/x-pack/elastic-agent/pkg/kibana/config.go +++ b/x-pack/elastic-agent/pkg/kibana/config.go @@ -19,6 +19,7 @@ type Config struct { Password string `config:"password" yaml:"password,omitempty"` Path string `config:"path" yaml:"path,omitempty"` Host string `config:"host" yaml:"host,omitempty"` + Hosts []string `config:"hosts" yaml:"hosts,omitempty"` Timeout time.Duration `config:"timeout" yaml:"timeout,omitempty"` TLS *tlscommon.Config `config:"ssl" yaml:"ssl,omitempty"` } @@ -53,3 +54,13 @@ func defaultClientConfig() Config { func (c *Config) IsBasicAuth() bool { return len(c.Username) > 0 && len(c.Password) > 0 } + +// GetHosts returns the hosts to connect to kibana. +// +// This looks first at `Hosts` and then at `Host` when `Hosts` is not defined. +func (c *Config) GetHosts() []string { + if len(c.Hosts) > 0 { + return c.Hosts + } + return []string{c.Host} +}