Skip to content

Commit

Permalink
Move exec WaitGroup from Exec instance level to Gather.
Browse files Browse the repository at this point in the history
If Gather is run concurently the shared WaitGroup variable never finishes.

closes #1463
closes #1464
  • Loading branch information
allen13 authored and sparrc committed Jul 18, 2016
1 parent 2d6c876 commit 1d9745e
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ should now look like:
- [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior
- [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues.
- [#1427](https://github.com/influxdata/telegraf/issues/1427): Cassandra input: version 2.x "column family" fix.
- [#1463](https://github.com/influxdata/telegraf/issues/1463): Shared WaitGroup in Exec plugin

## v1.0 beta 2 [2016-06-21]

Expand Down
13 changes: 6 additions & 7 deletions plugins/inputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ type Exec struct {

parser parsers.Parser

wg sync.WaitGroup

runner Runner
errChan chan error
}
Expand Down Expand Up @@ -119,8 +117,8 @@ func (c CommandRunner) Run(
return out.Bytes(), nil
}

func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) {
defer e.wg.Done()
func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync.WaitGroup) {
defer wg.Done()

out, err := e.runner.Run(e, command, acc)
if err != nil {
Expand Down Expand Up @@ -151,6 +149,7 @@ func (e *Exec) SetParser(parser parsers.Parser) {
}

func (e *Exec) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
// Legacy single command support
if e.Command != "" {
e.Commands = append(e.Commands, e.Command)
Expand Down Expand Up @@ -190,11 +189,11 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
errChan := errchan.New(len(commands))
e.errChan = errChan.C

e.wg.Add(len(commands))
wg.Add(len(commands))
for _, command := range commands {
go e.ProcessCommand(command, acc)
go e.ProcessCommand(command, acc, &wg)
}
e.wg.Wait()
wg.Wait()
return errChan.Error()
}

Expand Down

0 comments on commit 1d9745e

Please sign in to comment.