Skip to content

Commit

Permalink
Finally correctly configure TLS configuration for nsq input
Browse files Browse the repository at this point in the history
  • Loading branch information
Soulou committed Oct 16, 2018
1 parent aace5d8 commit 8b0f188
Showing 1 changed file with 25 additions and 61 deletions.
86 changes: 25 additions & 61 deletions plugins/inputs/nsq/nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
package nsq

import (
"crypto/rand"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
Expand All @@ -36,30 +33,27 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)

// Might add Lookupd endpoints for cluster discovery
type NSQ struct {
Endpoints []string
TlsCert string
TlsKey string
TlsCacert string
httpClient *http.Client
httpClientOnce *sync.Once
tlsConfig *tls.Config
tlsConfigOnce *sync.Once
Endpoints []string
tls.ClientConfig
httpClient *http.Client
}

var sampleConfig = `
## An array of NSQD HTTP API endpoints
endpoints = ["http://localhost:4151"]
## Or using HTTPS endpoint
endpoints = ["https://localhost:4152"]
tls_cert = "/path/to/client-cert.pem"
tls_key = "/path/to/client-key.pem"
tls_cacert = "/path/to/ca.pem"
endpoints = ["https://localhost:4152"]
tls_cert = "/path/to/client-cert.pem"
tls_key = "/path/to/client-key.pem"
tls_cacert = "/path/to/ca.pem"
insecure_skip_verify = false
`

const (
Expand All @@ -73,10 +67,7 @@ func init() {
}

func New() *NSQ {
return &NSQ{
httpClientOnce: &sync.Once{},
tlsConfigOnce: &sync.Once{},
}
return &NSQ{}
}

func (n *NSQ) SampleConfig() string {
Expand All @@ -90,11 +81,11 @@ func (n *NSQ) Description() string {
func (n *NSQ) Gather(acc telegraf.Accumulator) error {
var err error

n.tlsConfigOnce.Do(func() {
n.tlsConfig, err = n.buildTLSConfig()
})
if err != nil {
return fmt.Errorf("fail to build tls config: %v", err)
if n.httpClient == nil {
n.httpClient, err = n.getHttpClient()
if err != nil {
return err
}
}

var wg sync.WaitGroup
Expand All @@ -110,54 +101,27 @@ func (n *NSQ) Gather(acc telegraf.Accumulator) error {
return nil
}

func (n *NSQ) buildTLSConfig() (*tls.Config, error) {
if n.TlsCert == "" || n.TlsKey == "" || n.TlsCacert == "" {
return nil, nil
}

caCertBytes, err := ioutil.ReadFile(n.TlsCacert)
func (n *NSQ) getHttpClient() (*http.Client, error) {
tlsConfig, err := n.ClientConfig.TLSConfig()
if err != nil {
return nil, fmt.Errorf("fail to read CA cert file %v: %v", n.TlsCacert, err)
return nil, err
}

cert, err := tls.LoadX509KeyPair(n.TlsCert, n.TlsKey)
if err != nil {
return nil, fmt.Errorf("fail to load certificate %v: %v", n.TlsCert, err)
tr := &http.Transport{
TLSClientConfig: tlsConfig,
}

pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caCertBytes)

config := &tls.Config{
Certificates: []tls.Certificate{cert},
ClientCAs: pool,
RootCAs: pool,
httpClient := &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}

config.Rand = rand.Reader
return config, nil
}

func (n *NSQ) getHttpClient() *http.Client {
n.httpClientOnce.Do(func() {
tr := &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
TLSClientConfig: n.tlsConfig,
}
n.httpClient = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
})
return n.httpClient
return httpClient, nil
}

func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error {
u, err := buildURL(e)
if err != nil {
return err
}
r, err := n.getHttpClient().Get(u.String())
r, err := n.httpClient.Get(u.String())
if err != nil {
return fmt.Errorf("Error while polling %s: %s", u.String(), err)
}
Expand Down

0 comments on commit 8b0f188

Please sign in to comment.