-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Plugin/reader each interval #4332
Changes from 20 commits
e12eced
08a11d7
9c4b522
4e24a1b
ec7f131
504d978
542c030
554b960
36a23ea
f40371e
9c84595
cc40629
79d9ea4
bbd68b3
bf7220d
a931eb1
e450b26
001658a
7fa27f4
1be2a8e
aa750ec
892c95a
04f09d6
8063b38
bfc13a7
67db143
8a9da28
cafa95e
c6087ab
e4b6f23
d224673
f52ceeb
285cf0b
0c3ac29
74900ed
d0f5389
dd778a9
5449eb7
1f58dd7
2a18ca2
50f49fe
08d1397
63da4e6
88a85a7
2638186
1fe3adb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
version: '3' | ||
|
||
services: | ||
telegraf: | ||
image: glinton/scratch | ||
volumes: | ||
- ./telegraf.conf:/telegraf.conf | ||
- ../../../../telegraf:/telegraf | ||
- ./json_a.log:/var/log/test.log | ||
entrypoint: | ||
- /telegraf | ||
- --config | ||
- /telegraf.conf |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
{ | ||
"parent": { | ||
"child": 3.0, | ||
"ignored_child": "hi" | ||
}, | ||
"ignored_null": null, | ||
"integer": 4, | ||
"list": [3, 4], | ||
"ignored_parent": { | ||
"another_ignored_null": null, | ||
"ignored_string": "hello, world!" | ||
}, | ||
"another_list": [4] | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
[[inputs.reader]] | ||
files = ["/var/log/test.log"] | ||
data_format = "json" | ||
name_override = "json_reader" | ||
|
||
[[outputs.file]] | ||
files = ["stdout"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
package reader | ||
|
||
import ( | ||
"io/ioutil" | ||
"log" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/internal/globpath" | ||
"github.com/influxdata/telegraf/plugins/inputs" | ||
"github.com/influxdata/telegraf/plugins/parsers" | ||
) | ||
|
||
type Reader struct { | ||
Filepaths []string `toml:"files"` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Call this |
||
FromBeginning bool | ||
parser parsers.Parser | ||
|
||
Filenames []string | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make this a unexported field with a lowercase letter, otherwise you can set this from the config file which is undesired. |
||
} | ||
|
||
const sampleConfig = `## Files to parse each interval. | ||
## These accept standard unix glob matching rules, but with the addition of | ||
## ** as a "super asterisk". ie: | ||
## /var/log/**.log -> recursively find all .log files in /var/log | ||
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log | ||
## /var/log/apache.log -> only tail the apache log file | ||
files = ["/var/log/apache/access.log"] | ||
|
||
## The dataformat to be read from files | ||
## Each data format has its own unique set of configuration options, read | ||
## more about them here: | ||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md | ||
data_format = "" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use 2 space indention in the sample config, make sure to update the README as well. You can generate the config for the readme with |
||
` | ||
|
||
// SampleConfig returns the default configuration of the Input | ||
func (r *Reader) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
func (r *Reader) Description() string { | ||
return "reload and gather from file[s] on telegraf's interval" | ||
} | ||
|
||
func (r *Reader) Gather(acc telegraf.Accumulator) error { | ||
r.refreshFilePaths() | ||
for _, k := range r.Filenames { | ||
metrics, err := r.readMetric(k) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for i, m := range metrics { | ||
|
||
//error if m is nil | ||
if m == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It shouldn't be possible for on of the metrics to be nil, it would be a programming mistake, so don't check for it. If it did happen panic'ing would be okay. |
||
log.Printf("E! Metric could not be parsed from: %v, on line %v", k, i) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will go away if you follow my previous comment, but I should also mention that it is not strictly true that each line will have a metric and this would line up. |
||
continue | ||
} | ||
acc.AddFields(m.Name(), m.Fields(), m.Tags()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also add the m.Time(). |
||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (r *Reader) SetParser(p parsers.Parser) { | ||
r.parser = p | ||
} | ||
|
||
func (r *Reader) refreshFilePaths() { | ||
var allFiles []string | ||
for _, filepath := range r.Filepaths { | ||
g, err := globpath.Compile(filepath) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would compile only once and store the result on the Reader struct. Since we don't have a constructor function yet, I would probably check if the value on the struct is nil and if so compile. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My thought was that if people want to specify a directory via some globpath, then this will check for any new files added there during runtime |
||
if err != nil { | ||
log.Printf("E! Error Glob %s failed to compile, %s", filepath, err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This error should probably stop the input since it indicates a user error, if any glob fails return the error up and out of Gather. |
||
continue | ||
} | ||
files := g.Match() | ||
|
||
for k := range files { | ||
allFiles = append(allFiles, k) | ||
} | ||
} | ||
|
||
r.Filenames = allFiles | ||
} | ||
|
||
//requires that Parser has been compiled | ||
func (r *Reader) readMetric(filename string) ([]telegraf.Metric, error) { | ||
fileContents, err := ioutil.ReadFile(filename) | ||
if err != nil { | ||
log.Printf("E! File could not be opened: %v", filename) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error should be along the lines of "File could not be read" for accuracy, but also this error should be returned instead of logged. In general it is not safe to do anything with the return value if an error occurs. |
||
} | ||
return r.parser.Parse(fileContents) | ||
|
||
} | ||
|
||
func init() { | ||
inputs.Add("reader", func() telegraf.Input { | ||
return &Reader{} | ||
}) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
package reader | ||
|
||
import ( | ||
"runtime" | ||
"strings" | ||
"testing" | ||
|
||
"github.com/influxdata/telegraf/plugins/parsers" | ||
"github.com/influxdata/telegraf/testutil" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestRefreshFilePaths(t *testing.T) { | ||
testDir := getPluginDir() | ||
r := Reader{ | ||
Filepaths: []string{testDir + "/logparser/grok/testdata/**.log"}, | ||
} | ||
|
||
r.refreshFilePaths() | ||
assert.Equal(t, len(r.Filenames), 2) | ||
} | ||
func TestJSONParserCompile(t *testing.T) { | ||
testDir := getPluginDir() | ||
var acc testutil.Accumulator | ||
r := Reader{ | ||
Filepaths: []string{testDir + "/reader/testfiles/json_a.log"}, | ||
} | ||
parserConfig := parsers.Config{ | ||
DataFormat: "json", | ||
TagKeys: []string{"parent_ignored_child"}, | ||
} | ||
nParser, err := parsers.NewParser(&parserConfig) | ||
r.parser = nParser | ||
assert.NoError(t, err) | ||
|
||
r.Gather(&acc) | ||
assert.Equal(t, map[string]string{"parent_ignored_child": "hi"}, acc.Metrics[0].Tags) | ||
assert.Equal(t, 5, len(acc.Metrics[0].Fields)) | ||
} | ||
|
||
func TestGrokParser(t *testing.T) { | ||
testDir := getPluginDir() | ||
var acc testutil.Accumulator | ||
r := Reader{ | ||
Filepaths: []string{testDir + "/reader/testfiles/grok_a.log"}, | ||
} | ||
|
||
parserConfig := parsers.Config{ | ||
DataFormat: "grok", | ||
Patterns: []string{"%{COMMON_LOG_FORMAT}"}, | ||
} | ||
|
||
nParser, err := parsers.NewParser(&parserConfig) | ||
r.parser = nParser | ||
assert.NoError(t, err) | ||
|
||
err = r.Gather(&acc) | ||
assert.Equal(t, 2, len(acc.Metrics)) | ||
} | ||
|
||
func getPluginDir() string { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the cwd is actually guaranteed by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check if this is required and if not remove. |
||
_, filename, _, _ := runtime.Caller(1) | ||
return strings.Replace(filename, "/reader/reader_test.go", "", 1) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 | ||
128.0.0.1 user-identifier tony [10/Oct/2000:13:55:36 -0800] "GET /apache_pb.gif HTTP/1.0" 300 45 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
{ | ||
"parent": { | ||
"child": 3.0, | ||
"ignored_child": "hi" | ||
}, | ||
"ignored_null": null, | ||
"integer": 4, | ||
"list": [3, 4], | ||
"ignored_parent": { | ||
"another_ignored_null": null, | ||
"ignored_string": "hello, world!" | ||
}, | ||
"another_list": [4] | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add end of line for this file, also fix the indention. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since these all are at plugin level now, lets prefix them with
grok_