Skip to content

Commit

Permalink
Add Accumulator to the ServiceInput Start() function
Browse files Browse the repository at this point in the history
closes #666
  • Loading branch information
sparrc committed Feb 16, 2016
1 parent 7f539c9 commit 76a4590
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 38 deletions.
7 changes: 5 additions & 2 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration)

// channel shared between all input threads for accumulating points
metricC := make(chan telegraf.Metric, 1000)
metricC := make(chan telegraf.Metric, 10000)

// Round collection to nearest interval by sleeping
if a.Config.Agent.RoundInterval {
Expand All @@ -342,7 +342,10 @@ func (a *Agent) Run(shutdown chan struct{}) error {
// Start service of any ServicePlugins
switch p := input.Input.(type) {
case telegraf.ServiceInput:
if err := p.Start(); err != nil {
acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
acc.setDefaultTags(a.Config.Tags)
if err := p.Start(acc); err != nil {
log.Printf("Service for input %s failed to start, exiting\n%s\n",
input.Name, err.Error())
return err
Expand Down
2 changes: 1 addition & 1 deletion input.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type ServiceInput interface {
Gather(Accumulator) error

// Start starts the ServiceInput's service, whatever that may be
Start() error
Start(Accumulator) error

// Stop stops the services and closes any necessary channels and connections
Stop()
Expand Down
12 changes: 10 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type AgentConfig struct {
// same time, which can have a measurable effect on the system.
CollectionJitter internal.Duration

// Interval at which to flush data
// FlushInterval is the Interval at which to flush data
FlushInterval internal.Duration

// FlushJitter Jitters the flush interval by a random amount.
Expand All @@ -82,6 +82,11 @@ type AgentConfig struct {
// full, the oldest metrics will be overwritten.
MetricBufferLimit int

// FlushBufferWhenFull tells Telegraf to flush the metric buffer whenever
// it fills up, regardless of FlushInterval. Setting this option to true
// does _not_ deactivate FlushInterval.
FlushBufferWhenFull bool

// TODO(cam): Remove UTC and Precision parameters, they are no longer
// valid for the agent config. Leaving them here for now for backwards-
// compatability
Expand Down Expand Up @@ -157,6 +162,8 @@ var header = `##################################################################
### Telegraf will cache metric_buffer_limit metrics for each output, and will
### flush this buffer on a successful write.
metric_buffer_limit = 10000
### Flush the buffer whenever full, regardless of flush_interval
flush_buffer_when_full = true
### Collection jitter is used to jitter the collection by a random amount.
### Each plugin will sleep for a random time within jitter before collecting.
Expand Down Expand Up @@ -421,8 +428,9 @@ func (c *Config) addOutput(name string, table *ast.Table) error {

ro := internal_models.NewRunningOutput(name, output, outputConfig)
if c.Agent.MetricBufferLimit > 0 {
ro.PointBufferLimit = c.Agent.MetricBufferLimit
ro.MetricBufferLimit = c.Agent.MetricBufferLimit
}
ro.FlushBufferWhenFull = c.Agent.FlushBufferWhenFull
ro.Quiet = c.Agent.Quiet
c.Outputs = append(c.Outputs, ro)
return nil
Expand Down
94 changes: 73 additions & 21 deletions internal/models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,33 @@ package internal_models

import (
"log"
"sync"
"time"

"github.com/influxdata/telegraf"
)

const DEFAULT_POINT_BUFFER_LIMIT = 10000
const (
// Default number of metrics kept between flushes.
DEFAULT_METRIC_BUFFER_LIMIT = 10000

// Limit how many full metric buffers are kept due to failed writes.
FULL_METRIC_BUFFERS_LIMIT = 100
)

type RunningOutput struct {
Name string
Output telegraf.Output
Config *OutputConfig
Quiet bool
PointBufferLimit int
Name string
Output telegraf.Output
Config *OutputConfig
Quiet bool
MetricBufferLimit int
FlushBufferWhenFull bool

metrics []telegraf.Metric
tmpmetrics chan []telegraf.Metric
overwriteCounter int

sync.Mutex
}

func NewRunningOutput(
Expand All @@ -26,11 +37,12 @@ func NewRunningOutput(
conf *OutputConfig,
) *RunningOutput {
ro := &RunningOutput{
Name: name,
metrics: make([]telegraf.Metric, 0),
Output: output,
Config: conf,
PointBufferLimit: DEFAULT_POINT_BUFFER_LIMIT,
Name: name,
metrics: make([]telegraf.Metric, 0),
tmpmetrics: make(chan []telegraf.Metric, FULL_METRIC_BUFFERS_LIMIT),
Output: output,
Config: conf,
MetricBufferLimit: DEFAULT_METRIC_BUFFER_LIMIT,
}
return ro
}
Expand All @@ -42,28 +54,68 @@ func (ro *RunningOutput) AddPoint(point telegraf.Metric) {
}
}

if len(ro.metrics) < ro.PointBufferLimit {
if len(ro.metrics) < ro.MetricBufferLimit {
ro.Lock()
ro.metrics = append(ro.metrics, point)
ro.Unlock()
} else {
log.Printf("WARNING: overwriting cached metrics, you may want to " +
"increase the metric_buffer_limit setting in your [agent] config " +
"if you do not wish to overwrite metrics.\n")
if ro.overwriteCounter == len(ro.metrics) {
ro.overwriteCounter = 0
if ro.FlushBufferWhenFull {
ro.Lock()
tmpmetrics := make([]telegraf.Metric, len(ro.metrics))
copy(tmpmetrics, ro.metrics)
ro.metrics = make([]telegraf.Metric, 0)
ro.Unlock()
err := ro.write(tmpmetrics)
if err != nil {
log.Printf("ERROR writing full metric buffer to output %s, %s",
ro.Name, err)
if len(ro.tmpmetrics) == FULL_METRIC_BUFFERS_LIMIT {
// drop one on the floor...
<-ro.tmpmetrics
}
ro.tmpmetrics <- tmpmetrics
}
} else {
log.Printf("WARNING: overwriting cached metrics, you may want to " +
"increase the metric_buffer_limit setting in your [agent] " +
"config if you do not wish to overwrite metrics.\n")
if ro.overwriteCounter == len(ro.metrics) {
ro.overwriteCounter = 0
}
ro.metrics[ro.overwriteCounter] = point
ro.overwriteCounter++
}
ro.metrics[ro.overwriteCounter] = point
ro.overwriteCounter++
}
}

func (ro *RunningOutput) Write() error {
ro.Lock()
err := ro.write(ro.metrics)
ro.Unlock()
if err != nil {
return err
}

// Write any cached metric buffers that failed previously
ntmps := len(ro.tmpmetrics)
for i := 0; i < ntmps; i++ {
tmpmetrics := <-ro.tmpmetrics
if err := ro.write(tmpmetrics); err != nil {
return err
}
}

return nil
}

func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
start := time.Now()
err := ro.Output.Write(ro.metrics)
err := ro.Output.Write(metrics)
elapsed := time.Since(start)
if err == nil {
if !ro.Quiet {
log.Printf("Wrote %d metrics to output %s in %s\n",
len(ro.metrics), ro.Name, elapsed)
len(metrics), ro.Name, elapsed)
}
ro.metrics = make([]telegraf.Metric, 0)
ro.overwriteCounter = 0
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/github_webhooks/github_webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (gh *GithubWebhooks) Listen() {
}
}

func (gh *GithubWebhooks) Start() error {
func (gh *GithubWebhooks) Start(_ telegraf.Accumulator) error {
go gh.Listen()
log.Printf("Started the github_webhooks service on %s\n", gh.ServiceAddress)
return nil
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (k *Kafka) SetParser(parser parsers.Parser) {
k.parser = parser
}

func (k *Kafka) Start() error {
func (k *Kafka) Start(_ telegraf.Accumulator) error {
k.Lock()
defer k.Unlock()
var consumerErr error
Expand Down
12 changes: 6 additions & 6 deletions plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,19 @@ func TestReadsMetricsFromKafka(t *testing.T) {
}
p, _ := parsers.NewInfluxParser()
k.SetParser(p)
if err := k.Start(); err != nil {

// Verify that we can now gather the sent message
var acc testutil.Accumulator
// Sanity check
assert.Equal(t, 0, len(acc.Metrics), "There should not be any points")
if err := k.Start(&acc); err != nil {
t.Fatal(err.Error())
} else {
defer k.Stop()
}

waitForPoint(k, t)

// Verify that we can now gather the sent message
var acc testutil.Accumulator
// Sanity check
assert.Equal(t, 0, len(acc.Metrics), "There should not be any points")

// Gather points
err = k.Gather(&acc)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/mqtt_consumer/mqtt_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (m *MQTTConsumer) SetParser(parser parsers.Parser) {
m.parser = parser
}

func (m *MQTTConsumer) Start() error {
func (m *MQTTConsumer) Start(_ telegraf.Accumulator) error {
m.Lock()
defer m.Unlock()
if m.QoS > 2 || m.QoS < 0 {
Expand Down
4 changes: 2 additions & 2 deletions plugins/inputs/nats_consumer/nats_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ var sampleConfig = `
queue_group = "telegraf_consumers"
### Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
Expand Down Expand Up @@ -84,7 +84,7 @@ func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e erro
}

// Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up.
func (n *natsConsumer) Start() error {
func (n *natsConsumer) Start(_ telegraf.Accumulator) error {
n.Lock()
defer n.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
return nil
}

func (s *Statsd) Start() error {
func (s *Statsd) Start(_ telegraf.Accumulator) error {
// Make data structures
s.done = make(chan struct{})
s.in = make(chan []byte, s.AllowedPendingMessages)
Expand Down

0 comments on commit 76a4590

Please sign in to comment.