-
Notifications
You must be signed in to change notification settings - Fork 4
/
reader.go
80 lines (69 loc) · 1.58 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package main
import (
tail "github.com/papertrail/go-tail/follower"
"io"
)
// AsyncEventReader is the start/stop contract for an asynchronous read worker.
// In this file it is implemented by fileReader and returned by NewFileReader.
type AsyncEventReader interface {
// Start begins reading. The fileReader implementation stays inside its
// read loop and does not return until Stop has been called.
Start()
// Stop asks a running reader to finish. The fileReader implementation
// signals the loop started by Start over an unbuffered channel.
Stop()
}
// ReaderOption is a hook for enriching an event after it has been parsed.
// fileReader.Start applies its options in the order they were supplied.
type ReaderOption interface {
// Apply receives an event and returns the resulting event or an error.
// fileReader.Start forwards a non-nil error to its error channel.
Apply(FailedConnEvent) (FailedConnEvent, error)
}
// fileReader is the AsyncEventReader built by NewFileReader. Its Start method
// follows a file, parses each line into a FailedConnEvent, runs the configured
// options over it and publishes the outcome on the channels below.
type fileReader struct {
// parser converts one line of the followed file into a FailedConnEvent.
parser EventParser
// respChan is where Start sends processed events.
respChan chan FailedConnEvent
// errorChan is where Start sends parse errors and option errors.
errorChan chan error
// done carries the stop signal: Stop sends on it, Start's loop receives.
// NewFileReader creates it unbuffered.
done chan bool
// options are applied, in order, to every successfully parsed event.
options []ReaderOption
// filename is the path handed to the tail follower in Start.
filename string
}
// NewFileReader builds an AsyncEventReader that will follow filename once
// Start is called.
//
// parser turns each line into a FailedConnEvent; processed events are sent on
// respChan and errors on errorChan. Any options are stored in the order given
// and applied to every parsed event. The stop-signal channel is created here,
// unbuffered, so each reader owns its own.
func NewFileReader(filename string, parser EventParser, respChan chan FailedConnEvent, errorChan chan error, options ...ReaderOption) AsyncEventReader {
	var reader fileReader

	reader.filename = filename
	reader.parser = parser
	reader.options = options

	reader.respChan = respChan
	reader.errorChan = errorChan
	reader.done = make(chan bool)

	return reader
}
// Stop tells the loop run by Start to finish.
//
// The signal travels over the reader's unbuffered done channel, so this call
// blocks until Start's loop actually receives it.
func (fr fileReader) Stop() {
	const stopSignal = true
	fr.done <- stopSignal
}
// Start follows fr.filename from the beginning of the file (reopening it when
// the follower's Reopen setting triggers) and processes every line until Stop
// is called. It runs on the calling goroutine and only returns after the stop
// signal has been received.
//
// For each line:
//   - the line is parsed with fr.parser; a parse error is sent on errorChan
//     and the line is skipped;
//   - every ReaderOption is applied in order; the first option error is sent
//     on errorChan and the event is dropped, so a failed enrichment never
//     reaches respChan;
//   - otherwise the enriched event is sent on respChan.
//
// If the follower closes its Lines channel on its own, its error (if any) is
// forwarded on errorChan and Start keeps waiting for Stop, so that a later
// Stop call still finds a receiver.
//
// Start panics if the follower cannot be created.
func (fr fileReader) Start() {
	t, err := tail.New(fr.filename, tail.Config{
		Whence: io.SeekStart,
		Offset: 0,
		Reopen: true,
	})
	if err != nil {
		panic("Error tracking: " + err.Error())
	}

	linesChan := t.Lines()

readLoop:
	for {
		select {
		case s, ok := <-linesChan:
			if !ok {
				// The follower shut its channel. Without this check the loop
				// would spin on zero-value lines forever.
				if err := t.Err(); err != nil {
					fr.errorChan <- err
				}
				// A nil channel never becomes ready, which disables this case
				// while still letting the done case fire.
				linesChan = nil
				continue readLoop
			}

			ev, err := fr.parser.Parse(string(s.Bytes()))
			if err != nil {
				// A line in the wrong format is reported and skipped; it does
				// not stop the reader.
				fr.errorChan <- err
				continue readLoop
			}

			for _, opt := range fr.options {
				ev, err = opt.Apply(ev)
				if err != nil {
					fr.errorChan <- err
					// Labeled continue: abandon this event entirely instead
					// of only moving on to the next option.
					continue readLoop
				}
			}

			fr.respChan <- ev

		case <-fr.done:
			// The follower is the sender on its Lines channel, so shut it
			// down through its own Close rather than closing the channel from
			// the receiving side. Skip this if the follower already stopped.
			if linesChan != nil {
				t.Close()
			}
			return
		}
	}
}