diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index 1fbf33361af4..c3ce765b2f9e 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -142,47 +142,43 @@ func (i *Input) publishAll(client beat.Client) { defer close(out) var wg sync.WaitGroup + defer wg.Wait() for _, r := range i.readers { wg.Add(1) - go i.readEvents(&wg, r, out) + go func() { + defer wg.Done() + + for { + select { + case <-i.done: + return + default: + } + + event, err := r.Next() + if event == nil { + if err != nil { + i.logger.Errorf("Error while reading event: %v", err) + } + continue + } + + select { + case <-i.done: + case out <- event: + } + } + }() } -loop: for { select { case <-i.done: - break loop + return case e := <-out: client.Publish(*e) } } - wg.Wait() -} - -// readEvents reads events from a reader. -func (i *Input) readEvents(wg *sync.WaitGroup, r *reader.Reader, out chan *beat.Event) { - defer wg.Done() - - for { - select { - case <-i.done: - return - default: - } - - event, err := r.Next() - if event == nil { - if err != nil { - i.logger.Errorf("Error while reading event: %v", err) - } - continue - } - - select { - case <-i.done: - case out <- event: - } - } } // Stop stops all readers of the input.