From d669526bec99e2e660c7a27f3a5f6fec09e706c5 Mon Sep 17 00:00:00 2001 From: Lukasz Jagiello Date: Sun, 10 Sep 2017 11:11:19 -0700 Subject: [PATCH 1/2] NSQLookupd support for nsq_consumer This PR adds support for NSQLookupd in `nsq_consumer`. Why: Existing solution where `nsq_consumer` reads only from a single NSQD server limits ability to take advantages from distributed nature of NSQ. NSQ godocs suggest to use `nsqlookupd` as a prefered method for connection (https://godoc.org/github.com/nsqio/go-nsq#Consumer.ConnectToNSQD): ``` It is recommended to use ConnectToNSQLookupd so that topics are discovered automatically. This method is useful when you want to connect to a single, local, instance. ``` `nsqlookupd` is the daemon that manages topology information. Clients query nsqlookupd to discover nsqd producers for a specific topic and nsqd nodes broadcasts topic and channel information. With `nsqLookupd` support `nsq_consumer` will detect all avaiable nsqd producers and connects to all. --- Godeps | 2 +- etc/telegraf.conf | 7 ++++--- plugins/inputs/nsq_consumer/README.md | 8 +++++--- plugins/inputs/nsq_consumer/nsq_consumer.go | 17 ++++++++++++----- .../inputs/nsq_consumer/nsq_consumer_test.go | 3 ++- 5 files changed, 24 insertions(+), 13 deletions(-) diff --git a/Godeps b/Godeps index 48f9138e8a874..74c07d7717475 100644 --- a/Godeps +++ b/Godeps @@ -44,7 +44,7 @@ github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b github.com/nats-io/go-nats ea9585611a4ab58a205b9b125ebd74c389a6b898 github.com/nats-io/nats ea9585611a4ab58a205b9b125ebd74c389a6b898 github.com/nats-io/nuid 289cccf02c178dc782430d534e3c1f5b72af807f -github.com/nsqio/go-nsq a53d495e81424aaf7a7665a9d32a97715c40e953 +github.com/nsqio/go-nsq eee57a3ac4174c55924125bb15eeeda8cffb6e6f github.com/opencontainers/runc 89ab7f2ccc1e45ddf6485eaa802c35dcf321dfc8 github.com/opentracing-contrib/go-observer a52f2342449246d5bcc273e65cbdcfa5f7d6c63c github.com/opentracing/opentracing-go 06f47b42c792fef2796e9681353e1d908c417827 diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 4831f934b3688..632a5102370ab 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -2597,11 +2597,13 @@ # # Read NSQ topic for metrics. # [[inputs.nsq_consumer]] -# ## An string representing the NSQD TCP Endpoint -# server = "localhost:4150" +# ## An string representing the NSQD TCP/NSQLookupd HTTP Endpoints +# server = ["localhost:4150"] # topic = "telegraf" # channel = "consumer" # max_in_flight = 100 +# ## If nsqlookupd = true, servers are NSQLookupd HTTP API endpoints +# nsqlookupd = false # # ## Data format to consume. # ## Each data format has its own unique set of configuration options, read @@ -2766,4 +2768,3 @@ # [[inputs.zipkin]] # # path = "/api/v1/spans" # URL path for span data # # port = 9411 # Port on which Telegraf listens - diff --git a/plugins/inputs/nsq_consumer/README.md b/plugins/inputs/nsq_consumer/README.md index d207d8de15b28..a3f4a8cf979ad 100644 --- a/plugins/inputs/nsq_consumer/README.md +++ b/plugins/inputs/nsq_consumer/README.md @@ -1,18 +1,20 @@ # NSQ Consumer Input Plugin The [NSQ](http://nsq.io/) consumer plugin polls a specified NSQD -topic and adds messages to InfluxDB. This plugin allows a message to be in any of the supported `data_format` types. +topic and adds messages to InfluxDB. This plugin allows a message to be in any of the supported `data_format` types. ## Configuration ```toml # Read metrics from NSQD topic(s) [[inputs.nsq_consumer]] - ## An array of NSQD HTTP API endpoints - server = "localhost:4150" + ## An string representing the NSQD TCP/NSQLookupd HTTP Endpoints + server = ["localhost:4150"] topic = "telegraf" channel = "consumer" max_in_flight = 100 + ## If nsqlookupd = true, servers are NSQLookupd HTTP API endpoints + nsqlookupd = false ## Data format to consume. ## Each data format has its own unique set of configuration options, read diff --git a/plugins/inputs/nsq_consumer/nsq_consumer.go b/plugins/inputs/nsq_consumer/nsq_consumer.go index b93c4c68ec340..495fdcc5c8e08 100644 --- a/plugins/inputs/nsq_consumer/nsq_consumer.go +++ b/plugins/inputs/nsq_consumer/nsq_consumer.go @@ -6,26 +6,29 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" - "github.com/nsqio/go-nsq" + nsq "github.com/nsqio/go-nsq" ) //NSQConsumer represents the configuration of the plugin type NSQConsumer struct { - Server string + Server []string Topic string Channel string MaxInFlight int + Nsqlookupd bool parser parsers.Parser consumer *nsq.Consumer acc telegraf.Accumulator } var sampleConfig = ` - ## An string representing the NSQD TCP Endpoint - server = "localhost:4150" + ## An string representing the NSQD TCP/NSQLookupd HTTP Endpoints + server = ["localhost:4150"] topic = "telegraf" channel = "consumer" max_in_flight = 100 + ## If nsqlookupd = true, servers are NSQLookupd HTTP API endpoints + nsqlookupd = false ## Data format to consume. ## Each data format has its own unique set of configuration options, read @@ -71,7 +74,11 @@ func (n *NSQConsumer) Start(acc telegraf.Accumulator) error { message.Finish() return nil }), n.MaxInFlight) - n.consumer.ConnectToNSQD(n.Server) + if n.Nsqlookupd { + n.consumer.ConnectToNSQLookupds(n.Server) + } else { + n.consumer.ConnectToNSQDs(n.Server) + } return nil } diff --git a/plugins/inputs/nsq_consumer/nsq_consumer_test.go b/plugins/inputs/nsq_consumer/nsq_consumer_test.go index 9342e13acfd7e..7676a0b74dbb4 100644 --- a/plugins/inputs/nsq_consumer/nsq_consumer_test.go +++ b/plugins/inputs/nsq_consumer/nsq_consumer_test.go @@ -36,10 +36,11 @@ func TestReadsMetricsFromNSQ(t *testing.T) { newMockNSQD(script, addr.String()) consumer := &NSQConsumer{ - Server: "127.0.0.1:4155", + Server: []string{"127.0.0.1:4155"}, Topic: "telegraf", Channel: "consume", MaxInFlight: 1, + Nsqlookupd: false, } p, _ := parsers.NewInfluxParser() From dd640e3b1079db9c16fa0739c3ee190ead9eca19 Mon Sep 17 00:00:00 2001 From: Lukasz Jagiello Date: Thu, 21 Sep 2017 18:34:09 -0700 Subject: [PATCH 2/2] Keep legacy field still working + multiple ways to connect --- etc/telegraf.conf | 10 ++++---- plugins/inputs/nsq_consumer/README.md | 10 ++++---- plugins/inputs/nsq_consumer/nsq_consumer.go | 23 +++++++++++-------- .../inputs/nsq_consumer/nsq_consumer_test.go | 4 ++-- 4 files changed, 27 insertions(+), 20 deletions(-) diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 632a5102370ab..dbd5d971a55d1 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -2597,13 +2597,15 @@ # # Read NSQ topic for metrics. # [[inputs.nsq_consumer]] -# ## An string representing the NSQD TCP/NSQLookupd HTTP Endpoints -# server = ["localhost:4150"] +# ## Server option still works but is deprecated, we just prepend it to the nsqd array. +# # server = "localhost:4150" +# ## An array representing the NSQD TCP HTTP Endpoints +# nsqd = ["localhost:4150"] +# ## An array representing the NSQLookupd HTTP Endpoints +# nsqlookupd = ["localhost:4161"] # topic = "telegraf" # channel = "consumer" # max_in_flight = 100 -# ## If nsqlookupd = true, servers are NSQLookupd HTTP API endpoints -# nsqlookupd = false # # ## Data format to consume. # ## Each data format has its own unique set of configuration options, read diff --git a/plugins/inputs/nsq_consumer/README.md b/plugins/inputs/nsq_consumer/README.md index a3f4a8cf979ad..5ac156eeccd98 100644 --- a/plugins/inputs/nsq_consumer/README.md +++ b/plugins/inputs/nsq_consumer/README.md @@ -8,13 +8,15 @@ topic and adds messages to InfluxDB. This plugin allows a message to be in any o ```toml # Read metrics from NSQD topic(s) [[inputs.nsq_consumer]] - ## An string representing the NSQD TCP/NSQLookupd HTTP Endpoints - server = ["localhost:4150"] + ## Server option still works but is deprecated, we just prepend it to the nsqd array. + # server = "localhost:4150" + ## An array representing the NSQD TCP HTTP Endpoints + nsqd = ["localhost:4150"] + ## An array representing the NSQLookupd HTTP Endpoints + nsqlookupd = ["localhost:4161"] topic = "telegraf" channel = "consumer" max_in_flight = 100 - ## If nsqlookupd = true, servers are NSQLookupd HTTP API endpoints - nsqlookupd = false ## Data format to consume. ## Each data format has its own unique set of configuration options, read diff --git a/plugins/inputs/nsq_consumer/nsq_consumer.go b/plugins/inputs/nsq_consumer/nsq_consumer.go index 495fdcc5c8e08..0823b3ac9c390 100644 --- a/plugins/inputs/nsq_consumer/nsq_consumer.go +++ b/plugins/inputs/nsq_consumer/nsq_consumer.go @@ -11,24 +11,27 @@ import ( //NSQConsumer represents the configuration of the plugin type NSQConsumer struct { - Server []string + Server string + Nsqd []string + Nsqlookupd []string Topic string Channel string MaxInFlight int - Nsqlookupd bool parser parsers.Parser consumer *nsq.Consumer acc telegraf.Accumulator } var sampleConfig = ` - ## An string representing the NSQD TCP/NSQLookupd HTTP Endpoints - server = ["localhost:4150"] + ## Server option still works but is deprecated, we just prepend it to the nsqd array. + # server = "localhost:4150" + ## An array representing the NSQD TCP HTTP Endpoints + nsqd = ["localhost:4150"] + ## An array representing the NSQLookupd HTTP Endpoints + nsqlookupd = ["localhost:4161"] topic = "telegraf" channel = "consumer" max_in_flight = 100 - ## If nsqlookupd = true, servers are NSQLookupd HTTP API endpoints - nsqlookupd = false ## Data format to consume. ## Each data format has its own unique set of configuration options, read @@ -74,11 +77,11 @@ func (n *NSQConsumer) Start(acc telegraf.Accumulator) error { message.Finish() return nil }), n.MaxInFlight) - if n.Nsqlookupd { - n.consumer.ConnectToNSQLookupds(n.Server) - } else { - n.consumer.ConnectToNSQDs(n.Server) + + if len(n.Nsqlookupd) > 0 { + n.consumer.ConnectToNSQLookupds(n.Nsqlookupd) } + n.consumer.ConnectToNSQDs(append(n.Nsqd, n.Server)) return nil } diff --git a/plugins/inputs/nsq_consumer/nsq_consumer_test.go b/plugins/inputs/nsq_consumer/nsq_consumer_test.go index 7676a0b74dbb4..a6d8c27e58c5b 100644 --- a/plugins/inputs/nsq_consumer/nsq_consumer_test.go +++ b/plugins/inputs/nsq_consumer/nsq_consumer_test.go @@ -36,11 +36,11 @@ func TestReadsMetricsFromNSQ(t *testing.T) { newMockNSQD(script, addr.String()) consumer := &NSQConsumer{ - Server: []string{"127.0.0.1:4155"}, + Server: "127.0.0.1:4155", Topic: "telegraf", Channel: "consume", MaxInFlight: 1, - Nsqlookupd: false, + Nsqd: []string{"127.0.0.1:4155"}, } p, _ := parsers.NewInfluxParser()