Skip to content

Commit

Permalink
Add NATS consumer input plugin.
Browse files Browse the repository at this point in the history
  • Loading branch information
netixen authored and sparrc committed Feb 12, 2016
1 parent d003ca4 commit 512d982
Show file tree
Hide file tree
Showing 5 changed files with 394 additions and 0 deletions.
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ github.com/matttproud/golang_protobuf_extensions d0c3fe89de86839aecf2e0579c40ba3
github.com/mreiferson/go-snappystream 028eae7ab5c4c9e2d1cb4c4ca1e53259bbe7e504
github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
github.com/naoina/toml 751171607256bb66e64c9f0220c00662420c38e9
github.com/nats-io/nats 6a83f1a633cfbfd90aa648ac99fb38c06a8b40df
github.com/nsqio/go-nsq 2118015c120962edc5d03325c680daf3163a8b5f
github.com/pborman/uuid dee7705ef7b324f27ceb85a121c61f2c2e8ce988
github.com/pmezard/go-difflib 792786c7400a136282c1664665ae0a8db921c6c2
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/memcached"
_ "github.com/influxdata/telegraf/plugins/inputs/mongodb"
_ "github.com/influxdata/telegraf/plugins/inputs/mysql"
_ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx"
_ "github.com/influxdata/telegraf/plugins/inputs/nsq"
_ "github.com/influxdata/telegraf/plugins/inputs/passenger"
Expand Down
38 changes: 38 additions & 0 deletions plugins/inputs/nats_consumer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# NATS Consumer

The [NATS](http://www.nats.io/about/) consumer plugin reads from
specified NATS subjects and adds messages to InfluxDB. The plugin expects messages
in the [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md).
A [Queue Group](http://www.nats.io/documentation/concepts/nats-queueing/)
is used when subscribing to subjects so multiple instances of telegraf can read
from a NATS cluster in parallel.

## Configuration
```
# Read metrics from NATS subject(s)
[[inputs.nats_consumer]]
### urls of NATS servers
servers = ["nats://localhost:4222"]
### Use Transport Layer Security
secure = false
### subject(s) to consume
subjects = ["telegraf"]
### name a queue group
queue_group = "telegraf_consumers"
### Maximum number of points to buffer between collection intervals
point_buffer = 100000
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
data_format = "influx"
```

## Testing

To run tests:

```
go test
```
202 changes: 202 additions & 0 deletions plugins/inputs/nats_consumer/nats_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package natsconsumer

import (
"fmt"
"log"
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/nats-io/nats"
)

type natsError struct {
conn *nats.Conn
sub *nats.Subscription
err error
}

func (e natsError) Error() string {
return fmt.Sprintf("%s url:%s id:%s sub:%s queue:%s",
e.err.Error(), e.conn.ConnectedUrl(), e.conn.ConnectedServerId(), e.sub.Subject, e.sub.Queue)
}

type natsConsumer struct {
QueueGroup string
Subjects []string
Servers []string
Secure bool

PointBuffer int
parser parsers.Parser

sync.Mutex
Conn *nats.Conn
Subs []*nats.Subscription

// channel for all incoming NATS messages
in chan *nats.Msg
// channel for all NATS read errors
errs chan error
// channel for all incoming parsed points
metricC chan telegraf.Metric
done chan struct{}
}

var sampleConfig = `
### urls of NATS servers
servers = ["nats://localhost:4222"]
### Use Transport Layer Security
secure = false
### subject(s) to consume
subjects = ["telegraf"]
### name a queue group
queue_group = "telegraf_consumers"
### Maximum number of points to buffer between collection intervals
point_buffer = 100000
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
data_format = "influx"
`

func (n *natsConsumer) SampleConfig() string {
return sampleConfig
}

func (n *natsConsumer) Description() string {
return "Read metrics from NATS subject(s)"
}

func (n *natsConsumer) SetParser(parser parsers.Parser) {
n.parser = parser
}

func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e error) {
select {
case n.errs <- natsError{conn: c, sub: s, err: e}:
default:
return
}
}

// Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up.
func (n *natsConsumer) Start() error {
n.Lock()
defer n.Unlock()

var connectErr error

opts := nats.DefaultOptions
opts.Servers = n.Servers
opts.Secure = n.Secure

if n.Conn == nil || n.Conn.IsClosed() {
n.Conn, connectErr = opts.Connect()
if connectErr != nil {
return connectErr
}

// Setup message and error channels
n.errs = make(chan error)
n.Conn.SetErrorHandler(n.natsErrHandler)

n.in = make(chan *nats.Msg)
for _, subj := range n.Subjects {
sub, err := n.Conn.ChanQueueSubscribe(subj, n.QueueGroup, n.in)
if err != nil {
return err
}
n.Subs = append(n.Subs, sub)
}
}

n.done = make(chan struct{})
if n.PointBuffer == 0 {
n.PointBuffer = 100000
}

n.metricC = make(chan telegraf.Metric, n.PointBuffer)

// Start the message reader
go n.receiver()
log.Printf("Started the NATS consumer service, nats: %v, subjects: %v, queue: %v\n",
n.Conn.ConnectedUrl(), n.Subjects, n.QueueGroup)

return nil
}

// receiver() reads all incoming messages from NATS, and parses them into
// influxdb metric points.
func (n *natsConsumer) receiver() {
defer n.clean()
for {
select {
case <-n.done:
return
case err := <-n.errs:
log.Printf("error reading from %s\n", err.Error())
case msg := <-n.in:
metrics, err := n.parser.Parse(msg.Data)
if err != nil {
log.Printf("subject: %s, error: %s", msg.Subject, err.Error())
}

for _, metric := range metrics {
select {
case n.metricC <- metric:
continue
default:
log.Printf("NATS Consumer buffer is full, dropping a metric." +
" You may want to increase the point_buffer setting")
}
}

}
}
}

func (n *natsConsumer) clean() {
n.Lock()
defer n.Unlock()
close(n.in)
close(n.metricC)
close(n.errs)

for _, sub := range n.Subs {
if err := sub.Unsubscribe(); err != nil {
log.Printf("Error unsubscribing from subject %s in queue %s: %s\n",
sub.Subject, sub.Queue, err.Error())
}
}

if n.Conn != nil && !n.Conn.IsClosed() {
n.Conn.Close()
}
}

func (n *natsConsumer) Stop() {
n.Lock()
close(n.done)
n.Unlock()
}

func (n *natsConsumer) Gather(acc telegraf.Accumulator) error {
n.Lock()
defer n.Unlock()
npoints := len(n.metricC)
for i := 0; i < npoints; i++ {
point := <-n.metricC
acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time())
}
return nil
}

func init() {
inputs.Add("nats_consumer", func() telegraf.Input {
return &natsConsumer{}
})
}
Loading

0 comments on commit 512d982

Please sign in to comment.