Skip to content

Commit

Permalink
Add support for NSQLookupd to nsq_consumer (#3215)
Browse files Browse the repository at this point in the history
  • Loading branch information
ljagiello authored and danielnelson committed Sep 25, 2017
1 parent 837e6b1 commit a4b8805
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 11 deletions.
2 changes: 1 addition & 1 deletion Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
github.com/nats-io/go-nats ea9585611a4ab58a205b9b125ebd74c389a6b898
github.com/nats-io/nats ea9585611a4ab58a205b9b125ebd74c389a6b898
github.com/nats-io/nuid 289cccf02c178dc782430d534e3c1f5b72af807f
github.com/nsqio/go-nsq a53d495e81424aaf7a7665a9d32a97715c40e953
github.com/nsqio/go-nsq eee57a3ac4174c55924125bb15eeeda8cffb6e6f
github.com/opencontainers/runc 89ab7f2ccc1e45ddf6485eaa802c35dcf321dfc8
github.com/opentracing-contrib/go-observer a52f2342449246d5bcc273e65cbdcfa5f7d6c63c
github.com/opentracing/opentracing-go 06f47b42c792fef2796e9681353e1d908c417827
Expand Down
9 changes: 6 additions & 3 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2595,8 +2595,12 @@

# # Read NSQ topic for metrics.
# [[inputs.nsq_consumer]]
# ## An string representing the NSQD TCP Endpoint
# server = "localhost:4150"
# ## Server option still works but is deprecated, we just prepend it to the nsqd array.
# # server = "localhost:4150"
# ## An array representing the NSQD TCP HTTP Endpoints
# nsqd = ["localhost:4150"]
# ## An array representing the NSQLookupd HTTP Endpoints
# nsqlookupd = ["localhost:4161"]
# topic = "telegraf"
# channel = "consumer"
# max_in_flight = 100
Expand Down Expand Up @@ -2764,4 +2768,3 @@
# [[inputs.zipkin]]
# # path = "/api/v1/spans" # URL path for span data
# # port = 9411 # Port on which Telegraf listens

10 changes: 7 additions & 3 deletions plugins/inputs/nsq_consumer/README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
# NSQ Consumer Input Plugin

The [NSQ](http://nsq.io/) consumer plugin polls a specified NSQD
topic and adds messages to InfluxDB. This plugin allows a message to be in any of the supported `data_format` types.
topic and adds messages to InfluxDB. This plugin allows a message to be in any of the supported `data_format` types.

## Configuration

```toml
# Read metrics from NSQD topic(s)
[[inputs.nsq_consumer]]
## An array of NSQD HTTP API endpoints
server = "localhost:4150"
## Server option still works but is deprecated, we just prepend it to the nsqd array.
# server = "localhost:4150"
## An array representing the NSQD TCP HTTP Endpoints
nsqd = ["localhost:4150"]
## An array representing the NSQLookupd HTTP Endpoints
nsqlookupd = ["localhost:4161"]
topic = "telegraf"
channel = "consumer"
max_in_flight = 100
Expand Down
18 changes: 14 additions & 4 deletions plugins/inputs/nsq_consumer/nsq_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/nsqio/go-nsq"
nsq "github.com/nsqio/go-nsq"
)

//NSQConsumer represents the configuration of the plugin
type NSQConsumer struct {
Server string
Nsqd []string
Nsqlookupd []string
Topic string
Channel string
MaxInFlight int
Expand All @@ -21,8 +23,12 @@ type NSQConsumer struct {
}

var sampleConfig = `
## An string representing the NSQD TCP Endpoint
server = "localhost:4150"
## Server option still works but is deprecated, we just prepend it to the nsqd array.
# server = "localhost:4150"
## An array representing the NSQD TCP HTTP Endpoints
nsqd = ["localhost:4150"]
## An array representing the NSQLookupd HTTP Endpoints
nsqlookupd = ["localhost:4161"]
topic = "telegraf"
channel = "consumer"
max_in_flight = 100
Expand Down Expand Up @@ -71,7 +77,11 @@ func (n *NSQConsumer) Start(acc telegraf.Accumulator) error {
message.Finish()
return nil
}), n.MaxInFlight)
n.consumer.ConnectToNSQD(n.Server)

if len(n.Nsqlookupd) > 0 {
n.consumer.ConnectToNSQLookupds(n.Nsqlookupd)
}
n.consumer.ConnectToNSQDs(append(n.Nsqd, n.Server))
return nil
}

Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/nsq_consumer/nsq_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestReadsMetricsFromNSQ(t *testing.T) {
Topic: "telegraf",
Channel: "consume",
MaxInFlight: 1,
Nsqd: []string{"127.0.0.1:4155"},
}

p, _ := parsers.NewInfluxParser()
Expand Down

0 comments on commit a4b8805

Please sign in to comment.