diff --git a/agent/agent.go b/agent/agent.go index bd52e78756079..f1e2d41c89bcb 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -319,7 +319,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration) // channel shared between all input threads for accumulating points - metricC := make(chan telegraf.Metric, 1000) + metricC := make(chan telegraf.Metric, 10000) // Round collection to nearest interval by sleeping if a.Config.Agent.RoundInterval { @@ -342,7 +342,10 @@ func (a *Agent) Run(shutdown chan struct{}) error { // Start service of any ServicePlugins switch p := input.Input.(type) { case telegraf.ServiceInput: - if err := p.Start(); err != nil { + acc := NewAccumulator(input.Config, metricC) + acc.SetDebug(a.Config.Agent.Debug) + acc.setDefaultTags(a.Config.Tags) + if err := p.Start(acc); err != nil { log.Printf("Service for input %s failed to start, exiting\n%s\n", input.Name, err.Error()) return err diff --git a/input.go b/input.go index 6992c1b433a82..f7e1493e2de18 100644 --- a/input.go +++ b/input.go @@ -24,7 +24,7 @@ type ServiceInput interface { Gather(Accumulator) error // Start starts the ServiceInput's service, whatever that may be - Start() error + Start(Accumulator) error // Stop stops the services and closes any necessary channels and connections Stop() diff --git a/plugins/inputs/github_webhooks/github_webhooks.go b/plugins/inputs/github_webhooks/github_webhooks.go index a66563addb292..6dc97f5a332b0 100644 --- a/plugins/inputs/github_webhooks/github_webhooks.go +++ b/plugins/inputs/github_webhooks/github_webhooks.go @@ -61,7 +61,7 @@ func (gh *GithubWebhooks) Listen() { } } -func (gh *GithubWebhooks) Start() error { +func (gh *GithubWebhooks) Start(_ telegraf.Accumulator) error { go gh.Listen() log.Printf("Started the github_webhooks service on %s\n", gh.ServiceAddress) return nil diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 9fa47dee9bf1e..7ff06d4acebbc 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -72,7 +72,7 @@ func (k *Kafka) SetParser(parser parsers.Parser) { k.parser = parser } -func (k *Kafka) Start() error { +func (k *Kafka) Start(_ telegraf.Accumulator) error { k.Lock() defer k.Unlock() var consumerErr error diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 8ca0d44b1a281..1eab50944ba48 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -89,7 +89,7 @@ func (m *MQTTConsumer) SetParser(parser parsers.Parser) { m.parser = parser } -func (m *MQTTConsumer) Start() error { +func (m *MQTTConsumer) Start(_ telegraf.Accumulator) error { m.Lock() defer m.Unlock() if m.QoS > 2 || m.QoS < 0 { diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 56d56990f27a8..2de7062d85b34 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -55,7 +55,7 @@ var sampleConfig = ` queue_group = "telegraf_consumers" ### Maximum number of metrics to buffer between collection intervals metric_buffer = 100000 - + ### Data format to consume. This can be "json", "influx" or "graphite" ### Each data format has it's own unique set of configuration options, read ### more about them here: @@ -84,7 +84,7 @@ func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e erro } // Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up. -func (n *natsConsumer) Start() error { +func (n *natsConsumer) Start(_ telegraf.Accumulator) error { n.Lock() defer n.Unlock() diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index fb8de402e4185..470e318848cc4 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -213,7 +213,7 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { return nil } -func (s *Statsd) Start() error { +func (s *Statsd) Start(_ telegraf.Accumulator) error { // Make data structures s.done = make(chan struct{}) s.in = make(chan []byte, s.AllowedPendingMessages)