From cad036d1cbee8c074dca400dc82950e53f252531 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 15 Feb 2016 17:21:38 -0700 Subject: [PATCH] Add `Accumulator` to the ServiceInput Start() function closes #666 --- agent/agent.go | 20 +-- input.go | 2 +- internal/config/config.go | 12 +- internal/models/running_output.go | 119 +++++++++++++----- .../inputs/github_webhooks/github_webhooks.go | 2 +- .../inputs/kafka_consumer/kafka_consumer.go | 7 +- .../kafka_consumer_integration_test.go | 12 +- plugins/inputs/mqtt_consumer/mqtt_consumer.go | 52 +++----- plugins/inputs/nats_consumer/nats_consumer.go | 4 +- plugins/inputs/statsd/statsd.go | 2 +- 10 files changed, 145 insertions(+), 87 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index bd52e78756079..5a70097fc1d69 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -58,7 +58,8 @@ func (a *Agent) Connect() error { } err := o.Output.Connect() if err != nil { - log.Printf("Failed to connect to output %s, retrying in 15s, error was '%s' \n", o.Name, err) + log.Printf("Failed to connect to output %s, retrying in 15s, "+ + "error was '%s' \n", o.Name, err) time.Sleep(15 * time.Second) err = o.Output.Connect() if err != nil { @@ -241,7 +242,7 @@ func (a *Agent) Test() error { return nil } -// flush writes a list of points to all configured outputs +// flush writes a list of metrics to all configured outputs func (a *Agent) flush() { var wg sync.WaitGroup @@ -260,7 +261,7 @@ func (a *Agent) flush() { wg.Wait() } -// flusher monitors the points input channel and flushes on the minimum interval +// flusher monitors the metrics input channel and flushes on the minimum interval func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error { // Inelegant, but this sleep is to allow the Gather threads to run, so that // the flusher will flush after metrics are collected. @@ -271,14 +272,14 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er for { select { case <-shutdown: - log.Println("Hang on, flushing any cached points before shutdown") + log.Println("Hang on, flushing any cached metrics before shutdown") a.flush() return nil case <-ticker.C: a.flush() case m := <-metricC: for _, o := range a.Config.Outputs { - o.AddPoint(m) + o.AddMetric(m) } } } @@ -318,8 +319,8 @@ func (a *Agent) Run(shutdown chan struct{}) error { a.Config.Agent.Interval.Duration, a.Config.Agent.Debug, a.Config.Agent.Quiet, a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration) - // channel shared between all input threads for accumulating points - metricC := make(chan telegraf.Metric, 1000) + // channel shared between all input threads for accumulating metrics + metricC := make(chan telegraf.Metric, 10000) // Round collection to nearest interval by sleeping if a.Config.Agent.RoundInterval { @@ -342,7 +343,10 @@ func (a *Agent) Run(shutdown chan struct{}) error { // Start service of any ServicePlugins switch p := input.Input.(type) { case telegraf.ServiceInput: - if err := p.Start(); err != nil { + acc := NewAccumulator(input.Config, metricC) + acc.SetDebug(a.Config.Agent.Debug) + acc.setDefaultTags(a.Config.Tags) + if err := p.Start(acc); err != nil { log.Printf("Service for input %s failed to start, exiting\n%s\n", input.Name, err.Error()) return err diff --git a/input.go b/input.go index 6992c1b433a82..f7e1493e2de18 100644 --- a/input.go +++ b/input.go @@ -24,7 +24,7 @@ type ServiceInput interface { Gather(Accumulator) error // Start starts the ServiceInput's service, whatever that may be - Start() error + Start(Accumulator) error // Stop stops the services and closes any necessary channels and connections Stop() diff --git a/internal/config/config.go b/internal/config/config.go index ffd4f632a9765..82246f2a48ff5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -68,7 +68,7 @@ type AgentConfig struct { // same time, which can have a measurable effect on the system. CollectionJitter internal.Duration - // Interval at which to flush data + // FlushInterval is the Interval at which to flush data FlushInterval internal.Duration // FlushJitter Jitters the flush interval by a random amount. @@ -82,6 +82,11 @@ type AgentConfig struct { // full, the oldest metrics will be overwritten. MetricBufferLimit int + // FlushBufferWhenFull tells Telegraf to flush the metric buffer whenever + // it fills up, regardless of FlushInterval. Setting this option to true + // does _not_ deactivate FlushInterval. + FlushBufferWhenFull bool + // TODO(cam): Remove UTC and Precision parameters, they are no longer // valid for the agent config. Leaving them here for now for backwards- // compatability @@ -157,6 +162,8 @@ var header = `################################################################## ### Telegraf will cache metric_buffer_limit metrics for each output, and will ### flush this buffer on a successful write. metric_buffer_limit = 10000 + ### Flush the buffer whenever full, regardless of flush_interval. + flush_buffer_when_full = true ### Collection jitter is used to jitter the collection by a random amount. ### Each plugin will sleep for a random time within jitter before collecting. @@ -421,8 +428,9 @@ func (c *Config) addOutput(name string, table *ast.Table) error { ro := internal_models.NewRunningOutput(name, output, outputConfig) if c.Agent.MetricBufferLimit > 0 { - ro.PointBufferLimit = c.Agent.MetricBufferLimit + ro.MetricBufferLimit = c.Agent.MetricBufferLimit } + ro.FlushBufferWhenFull = c.Agent.FlushBufferWhenFull ro.Quiet = c.Agent.Quiet c.Outputs = append(c.Outputs, ro) return nil diff --git a/internal/models/running_output.go b/internal/models/running_output.go index 49a01f8ee108b..b94d94731892d 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -2,22 +2,34 @@ package internal_models import ( "log" + "sync" "time" "github.com/influxdata/telegraf" ) -const DEFAULT_POINT_BUFFER_LIMIT = 10000 +const ( + // Default number of metrics kept between flushes. + DEFAULT_METRIC_BUFFER_LIMIT = 10000 + + // Limit how many full metric buffers are kept due to failed writes. + FULL_METRIC_BUFFERS_LIMIT = 100 +) type RunningOutput struct { - Name string - Output telegraf.Output - Config *OutputConfig - Quiet bool - PointBufferLimit int + Name string + Output telegraf.Output + Config *OutputConfig + Quiet bool + MetricBufferLimit int + FlushBufferWhenFull bool - metrics []telegraf.Metric - overwriteCounter int + metrics []telegraf.Metric + tmpmetrics map[int][]telegraf.Metric + overwriteI int + mapI int + + sync.Mutex } func NewRunningOutput( @@ -26,47 +38,98 @@ func NewRunningOutput( conf *OutputConfig, ) *RunningOutput { ro := &RunningOutput{ - Name: name, - metrics: make([]telegraf.Metric, 0), - Output: output, - Config: conf, - PointBufferLimit: DEFAULT_POINT_BUFFER_LIMIT, + Name: name, + metrics: make([]telegraf.Metric, 0), + tmpmetrics: make(map[int][]telegraf.Metric), + Output: output, + Config: conf, + MetricBufferLimit: DEFAULT_METRIC_BUFFER_LIMIT, } return ro } -func (ro *RunningOutput) AddPoint(point telegraf.Metric) { +// AddMetric adds a metric to the output. This function can also write cached +// points if FlushBufferWhenFull is true. +func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { if ro.Config.Filter.IsActive { - if !ro.Config.Filter.ShouldMetricPass(point) { + if !ro.Config.Filter.ShouldMetricPass(metric) { return } } - if len(ro.metrics) < ro.PointBufferLimit { - ro.metrics = append(ro.metrics, point) + if len(ro.metrics) < ro.MetricBufferLimit { + ro.Lock() + ro.metrics = append(ro.metrics, metric) + ro.Unlock() } else { - log.Printf("WARNING: overwriting cached metrics, you may want to " + - "increase the metric_buffer_limit setting in your [agent] config " + - "if you do not wish to overwrite metrics.\n") - if ro.overwriteCounter == len(ro.metrics) { - ro.overwriteCounter = 0 + if ro.FlushBufferWhenFull { + ro.Lock() + tmpmetrics := make([]telegraf.Metric, len(ro.metrics)) + copy(tmpmetrics, ro.metrics) + ro.metrics = make([]telegraf.Metric, 0) + ro.Unlock() + err := ro.write(tmpmetrics) + if err != nil { + log.Printf("ERROR writing full metric buffer to output %s, %s", + ro.Name, err) + if len(ro.tmpmetrics) == FULL_METRIC_BUFFERS_LIMIT { + ro.mapI = 0 + // overwrite one + ro.tmpmetrics[ro.mapI] = tmpmetrics + ro.mapI++ + } else { + ro.tmpmetrics[ro.mapI] = tmpmetrics + ro.mapI++ + } + } + } else { + log.Printf("WARNING: overwriting cached metrics, you may want to " + + "increase the metric_buffer_limit setting in your [agent] " + + "config if you do not wish to overwrite metrics.\n") + ro.Lock() + if ro.overwriteI == len(ro.metrics) { + ro.overwriteI = 0 + } + ro.metrics[ro.overwriteI] = metric + ro.overwriteI++ + ro.Unlock() } - ro.metrics[ro.overwriteCounter] = point - ro.overwriteCounter++ } } +// Write writes all cached points to this output. func (ro *RunningOutput) Write() error { + ro.Lock() + err := ro.write(ro.metrics) + if err != nil { + return err + } else { + ro.metrics = make([]telegraf.Metric, 0) + ro.overwriteI = 0 + } + ro.Unlock() + + // Write any cached metric buffers that failed previously + for i, tmpmetrics := range ro.tmpmetrics { + if err := ro.write(tmpmetrics); err != nil { + return err + } else { + delete(ro.tmpmetrics, i) + } + } + + return nil +} + +func (ro *RunningOutput) write(metrics []telegraf.Metric) error { start := time.Now() - err := ro.Output.Write(ro.metrics) + err := ro.Output.Write(metrics) elapsed := time.Since(start) if err == nil { if !ro.Quiet { log.Printf("Wrote %d metrics to output %s in %s\n", - len(ro.metrics), ro.Name, elapsed) + len(metrics), ro.Name, elapsed) } - ro.metrics = make([]telegraf.Metric, 0) - ro.overwriteCounter = 0 } return err } diff --git a/plugins/inputs/github_webhooks/github_webhooks.go b/plugins/inputs/github_webhooks/github_webhooks.go index a66563addb292..6dc97f5a332b0 100644 --- a/plugins/inputs/github_webhooks/github_webhooks.go +++ b/plugins/inputs/github_webhooks/github_webhooks.go @@ -61,7 +61,7 @@ func (gh *GithubWebhooks) Listen() { } } -func (gh *GithubWebhooks) Start() error { +func (gh *GithubWebhooks) Start(_ telegraf.Accumulator) error { go gh.Listen() log.Printf("Started the github_webhooks service on %s\n", gh.ServiceAddress) return nil diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 9fa47dee9bf1e..155be3e49ecd3 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -36,6 +36,9 @@ type Kafka struct { metricC chan telegraf.Metric done chan struct{} + // keep the accumulator internally: + acc telegraf.Accumulator + // doNotCommitMsgs tells the parser not to call CommitUpTo on the consumer // this is mostly for test purposes, but there may be a use-case for it later. doNotCommitMsgs bool @@ -72,11 +75,13 @@ func (k *Kafka) SetParser(parser parsers.Parser) { k.parser = parser } -func (k *Kafka) Start() error { +func (k *Kafka) Start(acc telegraf.Accumulator) error { k.Lock() defer k.Unlock() var consumerErr error + k.acc = acc + config := consumergroup.NewConfig() switch strings.ToLower(k.Offset) { case "oldest", "": diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go index 458d43d355e26..527856b41b3b1 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go @@ -44,7 +44,12 @@ func TestReadsMetricsFromKafka(t *testing.T) { } p, _ := parsers.NewInfluxParser() k.SetParser(p) - if err := k.Start(); err != nil { + + // Verify that we can now gather the sent message + var acc testutil.Accumulator + // Sanity check + assert.Equal(t, 0, len(acc.Metrics), "There should not be any points") + if err := k.Start(&acc); err != nil { t.Fatal(err.Error()) } else { defer k.Stop() @@ -52,11 +57,6 @@ func TestReadsMetricsFromKafka(t *testing.T) { waitForPoint(k, t) - // Verify that we can now gather the sent message - var acc testutil.Accumulator - // Sanity check - assert.Equal(t, 0, len(acc.Metrics), "There should not be any points") - // Gather points err = k.Gather(&acc) require.NoError(t, err) diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 8ca0d44b1a281..52843aacb9791 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -15,12 +15,11 @@ import ( ) type MQTTConsumer struct { - Servers []string - Topics []string - Username string - Password string - MetricBuffer int - QoS int `toml:"qos"` + Servers []string + Topics []string + Username string + Password string + QoS int `toml:"qos"` parser parsers.Parser @@ -35,13 +34,12 @@ type MQTTConsumer struct { sync.Mutex client *mqtt.Client - // channel for all incoming parsed mqtt metrics - metricC chan telegraf.Metric - // channel for the topics of all incoming metrics (for tagging metrics) - topicC chan string // channel of all incoming raw mqtt messages in chan mqtt.Message done chan struct{} + + // keep the accumulator internally: + acc telegraf.Accumulator } var sampleConfig = ` @@ -56,9 +54,6 @@ var sampleConfig = ` "sensors/#", ] - ### Maximum number of metrics to buffer between collection intervals - metric_buffer = 100000 - ### username and password to connect MQTT server. # username = "telegraf" # password = "metricsmetricsmetricsmetrics" @@ -89,9 +84,11 @@ func (m *MQTTConsumer) SetParser(parser parsers.Parser) { m.parser = parser } -func (m *MQTTConsumer) Start() error { +func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { m.Lock() defer m.Unlock() + + m.acc = acc if m.QoS > 2 || m.QoS < 0 { return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS) } @@ -106,13 +103,8 @@ func (m *MQTTConsumer) Start() error { return token.Error() } - m.in = make(chan mqtt.Message, m.MetricBuffer) + m.in = make(chan mqtt.Message, 1000) m.done = make(chan struct{}) - if m.MetricBuffer == 0 { - m.MetricBuffer = 100000 - } - m.metricC = make(chan telegraf.Metric, m.MetricBuffer) - m.topicC = make(chan string, m.MetricBuffer) topics := make(map[string]byte) for _, topic := range m.Topics { @@ -145,13 +137,9 @@ func (m *MQTTConsumer) receiver() { } for _, metric := range metrics { - select { - case m.metricC <- metric: - m.topicC <- topic - default: - log.Printf("MQTT Consumer buffer is full, dropping a metric." + - " You may want to increase the metric_buffer setting") - } + tags := metric.Tags() + tags["topic"] = topic + m.acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time()) } } } @@ -169,16 +157,6 @@ func (m *MQTTConsumer) Stop() { } func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error { - m.Lock() - defer m.Unlock() - nmetrics := len(m.metricC) - for i := 0; i < nmetrics; i++ { - metric := <-m.metricC - topic := <-m.topicC - tags := metric.Tags() - tags["topic"] = topic - acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time()) - } return nil } diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 56d56990f27a8..2de7062d85b34 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -55,7 +55,7 @@ var sampleConfig = ` queue_group = "telegraf_consumers" ### Maximum number of metrics to buffer between collection intervals metric_buffer = 100000 - + ### Data format to consume. This can be "json", "influx" or "graphite" ### Each data format has it's own unique set of configuration options, read ### more about them here: @@ -84,7 +84,7 @@ func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e erro } // Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up. -func (n *natsConsumer) Start() error { +func (n *natsConsumer) Start(_ telegraf.Accumulator) error { n.Lock() defer n.Unlock() diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index fb8de402e4185..470e318848cc4 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -213,7 +213,7 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { return nil } -func (s *Statsd) Start() error { +func (s *Statsd) Start(_ telegraf.Accumulator) error { // Make data structures s.done = make(chan struct{}) s.in = make(chan []byte, s.AllowedPendingMessages)