Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for TLS configuration in NSQ input to reach HTTPS nsqd endpoints #3903

Merged
merged 4 commits into from
Oct 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 39 additions & 10 deletions plugins/inputs/nsq/nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,27 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)

// Might add Lookupd endpoints for cluster discovery
type NSQ struct {
Endpoints []string
tls.ClientConfig
httpClient *http.Client
}

var sampleConfig = `
## An array of NSQD HTTP API endpoints
endpoints = ["http://localhost:4151"]
endpoints = ["http://localhost:4151"]

## Or using HTTPS endpoint
endpoints = ["https://localhost:4152"]
tls_cert = "/path/to/client-cert.pem"
tls_key = "/path/to/client-key.pem"
tls_ca = "/path/to/ca.pem"
insecure_skip_verify = false
`

const (
Expand All @@ -52,10 +62,14 @@ const (

func init() {
inputs.Add("nsq", func() telegraf.Input {
return &NSQ{}
return New()
})
}

func New() *NSQ {
return &NSQ{}
}

func (n *NSQ) SampleConfig() string {
return sampleConfig
}
Expand All @@ -65,6 +79,15 @@ func (n *NSQ) Description() string {
}

func (n *NSQ) Gather(acc telegraf.Accumulator) error {
var err error

if n.httpClient == nil {
n.httpClient, err = n.getHttpClient()
if err != nil {
return err
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is going to act a little strange if the tls.Config isn't created succesfully, because the second time Gather is called it will not return an error here. You may want to look at the nginx input for a good example.


var wg sync.WaitGroup
for _, e := range n.Endpoints {
wg.Add(1)
Expand All @@ -78,21 +101,27 @@ func (n *NSQ) Gather(acc telegraf.Accumulator) error {
return nil
}

var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}

var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
func (n *NSQ) getHttpClient() (*http.Client, error) {
tlsConfig, err := n.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
tr := &http.Transport{
TLSClientConfig: tlsConfig,
}
httpClient := &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
return httpClient, nil
}

func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error {
u, err := buildURL(e)
if err != nil {
return err
}
r, err := client.Get(u.String())
r, err := n.httpClient.Get(u.String())
if err != nil {
return fmt.Errorf("Error while polling %s: %s", u.String(), err)
}
Expand Down
10 changes: 4 additions & 6 deletions plugins/inputs/nsq/nsq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ func TestNSQStatsV1(t *testing.T) {
}))
defer ts.Close()

n := &NSQ{
Endpoints: []string{ts.URL},
}
n := New()
n.Endpoints = []string{ts.URL}

var acc testutil.Accumulator
err := acc.GatherError(n.Gather)
Expand Down Expand Up @@ -276,9 +275,8 @@ func TestNSQStatsPreV1(t *testing.T) {
}))
defer ts.Close()

n := &NSQ{
Endpoints: []string{ts.URL},
}
n := New()
n.Endpoints = []string{ts.URL}

var acc testutil.Accumulator
err := acc.GatherError(n.Gather)
Expand Down