Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NSQLookupd support for nsq_consumer #3215

Merged
merged 2 commits into from
Sep 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
github.com/nats-io/go-nats ea9585611a4ab58a205b9b125ebd74c389a6b898
github.com/nats-io/nats ea9585611a4ab58a205b9b125ebd74c389a6b898
github.com/nats-io/nuid 289cccf02c178dc782430d534e3c1f5b72af807f
github.com/nsqio/go-nsq a53d495e81424aaf7a7665a9d32a97715c40e953
github.com/nsqio/go-nsq eee57a3ac4174c55924125bb15eeeda8cffb6e6f
github.com/opencontainers/runc 89ab7f2ccc1e45ddf6485eaa802c35dcf321dfc8
github.com/opentracing-contrib/go-observer a52f2342449246d5bcc273e65cbdcfa5f7d6c63c
github.com/opentracing/opentracing-go 06f47b42c792fef2796e9681353e1d908c417827
Expand Down
9 changes: 6 additions & 3 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2597,8 +2597,12 @@

# # Read NSQ topic for metrics.
# [[inputs.nsq_consumer]]
# ## An string representing the NSQD TCP Endpoint
# server = "localhost:4150"
# ## Server option still works but is deprecated, we just prepend it to the nsqd array.
# # server = "localhost:4150"
# ## An array representing the NSQD TCP HTTP Endpoints
# nsqd = ["localhost:4150"]
# ## An array representing the NSQLookupd HTTP Endpoints
# nsqlookupd = ["localhost:4161"]
# topic = "telegraf"
# channel = "consumer"
# max_in_flight = 100
Expand Down Expand Up @@ -2766,4 +2770,3 @@
# [[inputs.zipkin]]
# # path = "/api/v1/spans" # URL path for span data
# # port = 9411 # Port on which Telegraf listens

10 changes: 7 additions & 3 deletions plugins/inputs/nsq_consumer/README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
# NSQ Consumer Input Plugin

The [NSQ](http://nsq.io/) consumer plugin polls a specified NSQD
topic and adds messages to InfluxDB. This plugin allows a message to be in any of the supported `data_format` types.
topic and adds messages to InfluxDB. This plugin allows a message to be in any of the supported `data_format` types.

## Configuration

```toml
# Read metrics from NSQD topic(s)
[[inputs.nsq_consumer]]
## An array of NSQD HTTP API endpoints
server = "localhost:4150"
## Server option still works but is deprecated, we just prepend it to the nsqd array.
# server = "localhost:4150"
## An array representing the NSQD TCP HTTP Endpoints
nsqd = ["localhost:4150"]
## An array representing the NSQLookupd HTTP Endpoints
nsqlookupd = ["localhost:4161"]
topic = "telegraf"
channel = "consumer"
max_in_flight = 100
Expand Down
18 changes: 14 additions & 4 deletions plugins/inputs/nsq_consumer/nsq_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/nsqio/go-nsq"
nsq "github.com/nsqio/go-nsq"
)

//NSQConsumer represents the configuration of the plugin
type NSQConsumer struct {
Server string
Nsqd []string
Nsqlookupd []string
Topic string
Channel string
MaxInFlight int
Expand All @@ -21,8 +23,12 @@ type NSQConsumer struct {
}

var sampleConfig = `
## An string representing the NSQD TCP Endpoint
server = "localhost:4150"
## Server option still works but is deprecated, we just prepend it to the nsqd array.
# server = "localhost:4150"
## An array representing the NSQD TCP HTTP Endpoints
nsqd = ["localhost:4150"]
## An array representing the NSQLookupd HTTP Endpoints
nsqlookupd = ["localhost:4161"]
topic = "telegraf"
channel = "consumer"
max_in_flight = 100
Expand Down Expand Up @@ -71,7 +77,11 @@ func (n *NSQConsumer) Start(acc telegraf.Accumulator) error {
message.Finish()
return nil
}), n.MaxInFlight)
n.consumer.ConnectToNSQD(n.Server)

if len(n.Nsqlookupd) > 0 {
n.consumer.ConnectToNSQLookupds(n.Nsqlookupd)
}
n.consumer.ConnectToNSQDs(append(n.Nsqd, n.Server))
return nil
}

Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/nsq_consumer/nsq_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestReadsMetricsFromNSQ(t *testing.T) {
Topic: "telegraf",
Channel: "consume",
MaxInFlight: 1,
Nsqd: []string{"127.0.0.1:4155"},
}

p, _ := parsers.NewInfluxParser()
Expand Down