Skip to content

Commit

Permalink
more refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch committed Oct 16, 2018
1 parent 452d886 commit dacef17
Showing 1 changed file with 26 additions and 30 deletions.
56 changes: 26 additions & 30 deletions journalbeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,47 +142,43 @@ func (i *Input) publishAll(client beat.Client) {
defer close(out)

var wg sync.WaitGroup
defer wg.Wait()
for _, r := range i.readers {
wg.Add(1)
go i.readEvents(&wg, r, out)
go func() {
defer wg.Done()

for {
select {
case <-i.done:
return
default:
}

event, err := r.Next()
if event == nil {
if err != nil {
i.logger.Errorf("Error while reading event: %v", err)
}
continue
}

select {
case <-i.done:
case out <- event:
}
}
}()
}

loop:
for {
select {
case <-i.done:
break loop
return
case e := <-out:
client.Publish(*e)
}
}
wg.Wait()
}

// readEvents reads events from a reader.
func (i *Input) readEvents(wg *sync.WaitGroup, r *reader.Reader, out chan *beat.Event) {
defer wg.Done()

for {
select {
case <-i.done:
return
default:
}

event, err := r.Next()
if event == nil {
if err != nil {
i.logger.Errorf("Error while reading event: %v", err)
}
continue
}

select {
case <-i.done:
case out <- event:
}
}
}

// Stop stops all readers of the input.
Expand Down

0 comments on commit dacef17

Please sign in to comment.