Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File input label regex #376

Merged
merged 12 commits into from
Aug 21, 2021
25 changes: 25 additions & 0 deletions operator/builtin/input/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package file

import (
"fmt"
"regexp"
"time"

"github.com/bmatcuk/doublestar/v2"
Expand Down Expand Up @@ -53,6 +54,7 @@ type InputConfig struct {
FingerprintSize helper.ByteSize `json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
MaxConcurrentFiles int `json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"`
LabelRegex string `json:"label_regex,omitempty" yaml:"label_regex,omitempty"`
Encoding helper.EncodingConfig `json:",inline,omitempty" yaml:",inline,omitempty"`
}

Expand Down Expand Up @@ -117,6 +119,28 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
return nil, fmt.Errorf("invalid start_at location '%s'", c.StartAt)
}

var labelRegex *regexp.Regexp
if c.LabelRegex != "" {
r, err := regexp.Compile(c.LabelRegex)
if err != nil {
return nil, fmt.Errorf("compiling regex: %s", err)
}

keys := r.SubexpNames()
// keys[0] is always the empty string
if x := len(keys); x != 3 {
return nil, fmt.Errorf("label_regex must contain two capture groups named 'key' and 'value', got %d capture groups", x)
}

hasKeys := make(map[string]bool)
hasKeys[keys[1]] = true
hasKeys[keys[2]] = true
if !hasKeys["key"] || !hasKeys["value"] {
return nil, fmt.Errorf("label_regex must contain two capture groups named 'key' and 'value'")
}
labelRegex = r
}

fileNameField := entry.NewNilField()
if c.IncludeFileName {
fileNameField = entry.NewLabelField("file_name")
Expand Down Expand Up @@ -150,6 +174,7 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
FileNameResolvedField: fileNameResolvedField,
startAtBeginning: startAtBeginning,
queuedMatches: make([]string, 0),
labelRegex: labelRegex,
encoding: encoding,
firstCheck: true,
cancel: func() {},
Expand Down
41 changes: 41 additions & 0 deletions operator/builtin/input/file/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,15 @@ func TestUnmarshal(t *testing.T) {
return cfg
}(),
},
{
Name: "label_regex",
ExpectErr: false,
Expect: func() *InputConfig {
cfg := defaultCfg()
cfg.LabelRegex = "^(?P<key>[a-zA-z]+ [A-Z]+): (?P<value>.*)"
return cfg
}(),
},
}

for _, tc := range cases {
Expand Down Expand Up @@ -637,6 +646,38 @@ func TestBuild(t *testing.T) {
require.Error,
nil,
},
{
"ValidLabelRegex",
func(f *InputConfig) {
f.LabelRegex = "^(?P<key>[a-zA-z]+ [A-Z]+): (?P<value>.*)"
},
require.NoError,
func(t *testing.T, f *InputOperator) {},
},
{
"ValidLabelRegexReverse",
func(f *InputConfig) {
f.LabelRegex = "^(?P<value>[a-zA-z]+ [A-Z]+): (?P<key>.*)"
},
require.NoError,
func(t *testing.T, f *InputOperator) {},
},
{
"InvalidLabelRegexPattern",
func(f *InputConfig) {
f.LabelRegex = "^(?P<something>[a-zA-z]"
},
require.Error,
nil,
},
{
"InvalidLabelRegexCaptureGroup",
func(f *InputConfig) {
f.LabelRegex = "^(?P<something>[a-zA-z]+ [A-Z]+): (?P<invalid>.*)"
},
require.Error,
nil,
},
}

for _, tc := range cases {
Expand Down
17 changes: 13 additions & 4 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"os"
"path/filepath"
"regexp"
"sync"
"time"

Expand Down Expand Up @@ -44,6 +45,8 @@ type InputOperator struct {

fingerprintSize int

labelRegex *regexp.Regexp

encoding helper.Encoding

wg sync.WaitGroup
Expand Down Expand Up @@ -129,7 +132,7 @@ func (f *InputOperator) poll(ctx context.Context) {
}
}

readers := f.makeReaders(matches)
readers := f.makeReaders(ctx, matches)
f.firstCheck = false

// Detect files that have been rotated out of matching pattern
Expand Down Expand Up @@ -204,7 +207,7 @@ func getMatches(includes, excludes []string) []string {
// makeReaders takes a list of paths, then creates readers from each of those paths,
// discarding any that have a duplicate fingerprint to other files that have already
// been read this polling interval
func (f *InputOperator) makeReaders(filePaths []string) []*Reader {
func (f *InputOperator) makeReaders(ctx context.Context, filePaths []string) []*Reader {
// Open the files first to minimize the time between listing and opening
files := make([]*os.File, 0, len(filePaths))
for _, path := range filePaths {
Expand Down Expand Up @@ -262,7 +265,7 @@ OUTER:

readers := make([]*Reader, 0, len(fps))
for i := 0; i < len(fps); i++ {
reader, err := f.newReader(files[i], fps[i], f.firstCheck)
reader, err := f.newReader(ctx, files[i], fps[i], f.firstCheck)
if err != nil {
f.Errorw("Failed to create reader", zap.Error(err))
continue
Expand Down Expand Up @@ -294,7 +297,7 @@ func (f *InputOperator) saveCurrent(readers []*Reader) {
}
}

func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck bool) (*Reader, error) {
func (f *InputOperator) newReader(ctx context.Context, file *os.File, fp *Fingerprint, firstCheck bool) (*Reader, error) {
// Check if the new path has the same fingerprint as an old path
if oldReader, ok := f.findFingerprintMatch(fp); ok {
newReader, err := oldReader.Copy(file)
Expand All @@ -310,6 +313,12 @@ func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck boo
if err != nil {
return nil, err
}
if f.labelRegex != nil {
/*if err := newReader.readHeaders(ctx); err != nil {
f.Errorf("error while reading file headers: %s", err)
}*/
Comment on lines +317 to +319
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove this

newReader.ReadHeaders(ctx)
}
startAtBeginning := !firstCheck || f.startAtBeginning
if err := newReader.InitializeOffset(startAtBeginning); err != nil {
return nil, fmt.Errorf("initialize offset: %s", err)
Expand Down
7 changes: 7 additions & 0 deletions operator/builtin/input/file/fingerprint.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ const minFingerprintSize = 16 // bytes
// A file's fingerprint is the first N bytes of the file,
// where N is the fingerprintSize on the file_input operator
type Fingerprint struct {
// FirstBytes represents the first N bytes of a file
FirstBytes []byte

// Labels is an optional map that contains entry labels
// added to every record from a given file
Labels map[string]string
}

// NewFingerprint creates a new fingerprint from an open file
Expand All @@ -30,6 +35,8 @@ func (f *InputOperator) NewFingerprint(file *os.File) (*Fingerprint, error) {
FirstBytes: buf[:n],
}

fp.Labels = make(map[string]string)

return fp, nil
}

Expand Down
53 changes: 47 additions & 6 deletions operator/builtin/input/file/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/errors"
"go.uber.org/zap"
"golang.org/x/text/encoding"
Expand Down Expand Up @@ -96,39 +97,71 @@ func (f *Reader) InitializeOffset(startAtBeginning bool) error {
return nil
}

type consumerFunc func(context.Context, []byte) error

// ReadToEnd will read until the end of the file
func (f *Reader) ReadToEnd(ctx context.Context) {
f.readFile(ctx, f.emit)
}

// ReadHeaders will read a files headers
func (f *Reader) ReadHeaders(ctx context.Context) {
f.readFile(ctx, f.readHeaders)
}

func (f *Reader) readFile(ctx context.Context, consumer consumerFunc) {
if _, err := f.file.Seek(f.Offset, 0); err != nil {
f.Errorw("Failed to seek", zap.Error(err))
return
}

fr := NewFingerprintUpdatingReader(f.file, f.Offset, f.Fingerprint, f.fileInput.fingerprintSize)
scanner := NewPositionalScanner(fr, f.fileInput.MaxLogSize, f.Offset, f.fileInput.SplitFunc)

// Iterate over the tokenized file, emitting entries as we go
// Iterate over the tokenized file
for {
select {
case <-ctx.Done():
return
default:
}

ok := scanner.Scan()
if !ok {
if err := getScannerError(scanner); err != nil {
f.Errorw("Failed during scan", zap.Error(err))
}
break
}

if err := f.emit(ctx, scanner.Bytes()); err != nil {
f.Error("Failed to emit entry", zap.Error(err))
if err := consumer(ctx, scanner.Bytes()); err != nil {
// return if header parsing is done
if err == errEndOfHeaders {
return
}
f.Error("Failed to consume entry", zap.Error(err))
}
f.Offset = scanner.Pos()
}
}

var errEndOfHeaders = fmt.Errorf("finished header parsing, no header found")

func (f *Reader) readHeaders(ctx context.Context, msgBuf []byte) error {
byteMatches := f.fileInput.labelRegex.FindSubmatch(msgBuf)
if len(byteMatches) != 3 {
// return early, assume this failure means the file does not
// contain anymore headers
return errEndOfHeaders
}
matches := make([]string, len(byteMatches))
for i, byteSlice := range byteMatches {
matches[i] = string(byteSlice)
}
if f.Fingerprint.Labels == nil {
f.Fingerprint.Labels = make(map[string]string)
}
f.Fingerprint.Labels[matches[1]] = matches[2]
return nil
}

// Close will close the file
func (f *Reader) Close() {
if f.file != nil {
Expand Down Expand Up @@ -170,6 +203,14 @@ func (f *Reader) emit(ctx context.Context, msgBuf []byte) error {
return err
}

// Set W3C headers as labels
for k, v := range f.Fingerprint.Labels {
field := entry.NewLabelField(k)
if err := e.Set(field, v); err != nil {
return err
}
}

f.fileInput.Write(ctx, e)
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions operator/builtin/input/file/testdata/label_regex.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
type: file_input
label_regex: "^(?P<key>[a-zA-z]+ [A-Z]+): (?P<value>.*)"