From 12279042d3a7347a6f90a2c8e0f83be2592d17ef Mon Sep 17 00:00:00 2001 From: Soulou Date: Tue, 23 Oct 2018 02:50:32 +0200 Subject: [PATCH] Add support for TLS configuration in NSQ input (#3903) --- plugins/inputs/nsq/nsq.go | 49 +++++++++++++++++++++++++++------- plugins/inputs/nsq/nsq_test.go | 10 +++---- 2 files changed, 43 insertions(+), 16 deletions(-) diff --git a/plugins/inputs/nsq/nsq.go b/plugins/inputs/nsq/nsq.go index 1ef47ef054f09..b5aa43d1fd85b 100644 --- a/plugins/inputs/nsq/nsq.go +++ b/plugins/inputs/nsq/nsq.go @@ -33,17 +33,27 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/inputs" ) // Might add Lookupd endpoints for cluster discovery type NSQ struct { Endpoints []string + tls.ClientConfig + httpClient *http.Client } var sampleConfig = ` ## An array of NSQD HTTP API endpoints - endpoints = ["http://localhost:4151"] + endpoints = ["http://localhost:4151"] + + ## Or using HTTPS endpoint + endpoints = ["https://localhost:4152"] + tls_cert = "/path/to/client-cert.pem" + tls_key = "/path/to/client-key.pem" + tls_ca = "/path/to/ca.pem" + insecure_skip_verify = false ` const ( @@ -52,10 +62,14 @@ const ( func init() { inputs.Add("nsq", func() telegraf.Input { - return &NSQ{} + return New() }) } +func New() *NSQ { + return &NSQ{} +} + func (n *NSQ) SampleConfig() string { return sampleConfig } @@ -65,6 +79,15 @@ func (n *NSQ) Description() string { } func (n *NSQ) Gather(acc telegraf.Accumulator) error { + var err error + + if n.httpClient == nil { + n.httpClient, err = n.getHttpClient() + if err != nil { + return err + } + } + var wg sync.WaitGroup for _, e := range n.Endpoints { wg.Add(1) @@ -78,13 +101,19 @@ func (n *NSQ) Gather(acc telegraf.Accumulator) error { return nil } -var tr = &http.Transport{ - ResponseHeaderTimeout: time.Duration(3 * time.Second), -} - -var client = &http.Client{ - Transport: tr, - Timeout: time.Duration(4 * time.Second), +func (n *NSQ) getHttpClient() (*http.Client, error) { + tlsConfig, err := n.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + tr := &http.Transport{ + TLSClientConfig: tlsConfig, + } + httpClient := &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), + } + return httpClient, nil } func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error { @@ -92,7 +121,7 @@ func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error { if err != nil { return err } - r, err := client.Get(u.String()) + r, err := n.httpClient.Get(u.String()) if err != nil { return fmt.Errorf("Error while polling %s: %s", u.String(), err) } diff --git a/plugins/inputs/nsq/nsq_test.go b/plugins/inputs/nsq/nsq_test.go index f3e9ce8689a64..1d3b541e5f1ce 100644 --- a/plugins/inputs/nsq/nsq_test.go +++ b/plugins/inputs/nsq/nsq_test.go @@ -19,9 +19,8 @@ func TestNSQStatsV1(t *testing.T) { })) defer ts.Close() - n := &NSQ{ - Endpoints: []string{ts.URL}, - } + n := New() + n.Endpoints = []string{ts.URL} var acc testutil.Accumulator err := acc.GatherError(n.Gather) @@ -276,9 +275,8 @@ func TestNSQStatsPreV1(t *testing.T) { })) defer ts.Close() - n := &NSQ{ - Endpoints: []string{ts.URL}, - } + n := New() + n.Endpoints = []string{ts.URL} var acc testutil.Accumulator err := acc.GatherError(n.Gather)