Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

file input: optional file_name_resolved and file_path_resolved labels #364

Merged
merged 2 commits into from
Jul 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## 1.1.6 - Unreleased

### Added
- File input: Added optional labels for resolved symlink file name and path [PR 364](https://github.com/observIQ/stanza/pull/364)

## 1.1.5 - 2021-07-15

### Changed
Expand Down
2 changes: 2 additions & 0 deletions docs/operators/file_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ The `file_input` operator reads logs from files. It will place the lines read in
| `encoding` | `nop` | The encoding of the file being read. See the list of supported encodings below for available options |
| `include_file_name` | `true` | Whether to add the file name as the label `file_name` |
| `include_file_path` | `false` | Whether to add the file path as the label `file_path` |
| `include_file_name_resolved` | `false` | Whether to add the file name after symlinks resolution as the label `file_name_resolved` |
| `include_file_path_resolved` | `false` | Whether to add the file path after symlinks resolution as the label `file_path_resolved` |
| `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end` |
| `fingerprint_size` | `1kb` | The number of bytes with which to identify a file. The first bytes in the file are used as the fingerprint. Decreasing this value at any point will cause existing fingerprints to be forgotten, meaning that all files will be read from the beginning (one time). |
| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory |
Expand Down
88 changes: 52 additions & 36 deletions operator/builtin/input/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@ const (
// NewInputConfig creates a new file input config with default values.
// The scraped diff duplicated the pre- and post-change struct-literal keys,
// which cannot compile; this is the post-merge form with the two new
// symlink-resolution toggles defaulting to off.
func NewInputConfig(operatorID string) *InputConfig {
	return &InputConfig{
		InputConfig:             helper.NewInputConfig(operatorID, "file_input"),
		PollInterval:            helper.Duration{Duration: 200 * time.Millisecond},
		IncludeFileName:         true,
		IncludeFilePath:         false,
		IncludeFileNameResolved: false,
		IncludeFilePathResolved: false,
		StartAt:                 "end",
		FingerprintSize:         defaultFingerprintSize,
		MaxLogSize:              defaultMaxLogSize,
		MaxConcurrentFiles:      defaultMaxConcurrentFiles,
		Encoding:                helper.NewEncodingConfig(),
	}
}

Expand All @@ -41,15 +43,17 @@ type InputConfig struct {
Include []string `json:"include,omitempty" yaml:"include,omitempty"`
Exclude []string `json:"exclude,omitempty" yaml:"exclude,omitempty"`

PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Multiline helper.MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"`
IncludeFileName bool `json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
IncludeFilePath bool `json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
FingerprintSize helper.ByteSize `json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
MaxConcurrentFiles int `json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"`
Encoding helper.EncodingConfig `json:",inline,omitempty" yaml:",inline,omitempty"`
PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Multiline helper.MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"`
IncludeFileName bool `json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
IncludeFilePath bool `json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
IncludeFileNameResolved bool `json:"include_file_name_resolved,omitempty" yaml:"include_file_name_resolved,omitempty"`
IncludeFilePathResolved bool `json:"include_file_path_resolved,omitempty" yaml:"include_file_path_resolved,omitempty"`
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
FingerprintSize helper.ByteSize `json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
MaxConcurrentFiles int `json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"`
Encoding helper.EncodingConfig `json:",inline,omitempty" yaml:",inline,omitempty"`
}

// Build will build a file input operator from the supplied configuration
Expand Down Expand Up @@ -123,25 +127,37 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
filePathField = entry.NewLabelField("file_path")
}

fileNameResolvedField := entry.NewNilField()
if c.IncludeFileNameResolved {
fileNameResolvedField = entry.NewLabelField("file_name_resolved")
}

filePathResolvedField := entry.NewNilField()
if c.IncludeFilePathResolved {
filePathResolvedField = entry.NewLabelField("file_path_resolved")
}

op := &InputOperator{
InputOperator: inputOperator,
Include: c.Include,
Exclude: c.Exclude,
SplitFunc: splitFunc,
PollInterval: c.PollInterval.Raw(),
persist: helper.NewScopedDBPersister(context.Database, c.ID()),
FilePathField: filePathField,
FileNameField: fileNameField,
startAtBeginning: startAtBeginning,
queuedMatches: make([]string, 0),
encoding: encoding,
firstCheck: true,
cancel: func() {},
knownFiles: make([]*Reader, 0, 10),
fingerprintSize: int(c.FingerprintSize),
MaxLogSize: int(c.MaxLogSize),
MaxConcurrentFiles: c.MaxConcurrentFiles,
SeenPaths: make(map[string]struct{}, 100),
InputOperator: inputOperator,
Include: c.Include,
Exclude: c.Exclude,
SplitFunc: splitFunc,
PollInterval: c.PollInterval.Raw(),
persist: helper.NewScopedDBPersister(context.Database, c.ID()),
FilePathField: filePathField,
FileNameField: fileNameField,
FilePathResolvedField: filePathResolvedField,
FileNameResolvedField: fileNameResolvedField,
startAtBeginning: startAtBeginning,
queuedMatches: make([]string, 0),
encoding: encoding,
firstCheck: true,
cancel: func() {},
knownFiles: make([]*Reader, 0, 10),
fingerprintSize: int(c.FingerprintSize),
MaxLogSize: int(c.MaxLogSize),
MaxConcurrentFiles: c.MaxConcurrentFiles,
SeenPaths: make(map[string]struct{}, 100),
}

return []operator.Operator{op}, nil
Expand Down
22 changes: 12 additions & 10 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@ import (
type InputOperator struct {
helper.InputOperator

Include []string
Exclude []string
FilePathField entry.Field
FileNameField entry.Field
PollInterval time.Duration
SplitFunc bufio.SplitFunc
MaxLogSize int
MaxConcurrentFiles int
SeenPaths map[string]struct{}
Include []string
Exclude []string
FilePathField entry.Field
FileNameField entry.Field
FilePathResolvedField entry.Field
FileNameResolvedField entry.Field
PollInterval time.Duration
SplitFunc bufio.SplitFunc
MaxLogSize int
MaxConcurrentFiles int
SeenPaths map[string]struct{}

persist helper.Persister

Expand Down Expand Up @@ -299,7 +301,7 @@ func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck boo
if err != nil {
return nil, err
}
newReader.Path = file.Name()
newReader.fileLabels = f.resolveFileLabels(file.Name())
return newReader, nil
}

Expand Down
123 changes: 123 additions & 0 deletions operator/builtin/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package file
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -50,6 +51,128 @@ func TestAddFileFields(t *testing.T) {
require.Equal(t, temp.Name(), e.Labels["file_path"])
}

// TestAddFileResolvedFields verifies that when IncludeFileNameResolved and
// IncludeFilePathResolved are enabled, emitted entries carry the
// `file_name_resolved` and `file_path_resolved` labels describing the
// symlink target, while `file_name`/`file_path` still describe the
// symlink itself.
func TestAddFileResolvedFields(t *testing.T) {
	t.Parallel()
	operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *InputConfig) {
		cfg.IncludeFileName = true
		cfg.IncludeFilePath = true
		cfg.IncludeFileNameResolved = true
		cfg.IncludeFilePathResolved = true
	}, nil)

	// The real log file lives outside the monitored directory.
	targetDir, err := ioutil.TempDir("", "")
	require.NoError(t, err)

	target, err := ioutil.TempFile(targetDir, "")
	require.NoError(t, err)

	// The operator only sees the file through this symlink.
	symLinkPath := filepath.Join(tempDir, "symlink")
	require.NoError(t, os.Symlink(target.Name(), symLinkPath))

	writeString(t, target, "testlog\n")

	// Compute the fully resolved absolute path the labels should report.
	realPath, err := filepath.EvalSymlinks(target.Name())
	require.NoError(t, err)
	resolvedPath, err := filepath.Abs(realPath)
	require.NoError(t, err)

	require.NoError(t, operator.Start())
	defer operator.Stop()

	e := waitForOne(t, logReceived)
	require.Equal(t, filepath.Base(symLinkPath), e.Labels["file_name"])
	require.Equal(t, symLinkPath, e.Labels["file_path"])
	require.Equal(t, filepath.Base(resolvedPath), e.Labels["file_name_resolved"])
	require.Equal(t, resolvedPath, e.Labels["file_path_resolved"])

	// Best-effort cleanup; on Windows the file may still be held open
	// by the operator, so the error is deliberately ignored.
	os.RemoveAll(targetDir)
}

// TestAddFileResolvedFieldsWithChangeOfSymlinkTarget verifies that the
// `file_name_resolved` and `file_path_resolved` labels follow the current
// symlink target when an intermediate symlink is repointed:
//
//	monitored symlink -> middle symlink -> file1
//	monitored symlink -> middle symlink -> file2
func TestAddFileResolvedFieldsWithChangeOfSymlinkTarget(t *testing.T) {
	t.Parallel()
	operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *InputConfig) {
		cfg.IncludeFileName = true
		cfg.IncludeFilePath = true
		cfg.IncludeFileNameResolved = true
		cfg.IncludeFilePathResolved = true
	}, nil)

	// Both real log files live outside the monitored directory.
	dir, err := ioutil.TempDir("", "")
	require.NoError(t, err)

	file1, err := ioutil.TempFile(dir, "")
	require.NoError(t, err)
	file2, err := ioutil.TempFile(dir, "")
	require.NoError(t, err)

	// resolve returns the symlink-free absolute path for a file name.
	resolve := func(name string) string {
		real, err := filepath.EvalSymlinks(name)
		require.NoError(t, err)
		abs, err := filepath.Abs(real)
		require.NoError(t, err)
		return abs
	}
	resolved1 := resolve(file1.Name())
	resolved2 := resolve(file2.Name())

	// symLinkPath (the operator's target) -> middleSymLinkPath -> file1
	middleSymLinkPath := filepath.Join(dir, "symlink")
	symLinkPath := filepath.Join(tempDir, "symlink")
	require.NoError(t, os.Symlink(file1.Name(), middleSymLinkPath))
	require.NoError(t, os.Symlink(middleSymLinkPath, symLinkPath))

	writeString(t, file1, "testlog\n")

	require.NoError(t, operator.Start())
	defer operator.Stop()

	e := waitForOne(t, logReceived)
	require.Equal(t, filepath.Base(symLinkPath), e.Labels["file_name"])
	require.Equal(t, symLinkPath, e.Labels["file_path"])
	require.Equal(t, filepath.Base(resolved1), e.Labels["file_name_resolved"])
	require.Equal(t, resolved1, e.Labels["file_path_resolved"])

	// Repoint the middle symlink at file2; the monitored symlink is untouched.
	require.NoError(t, os.Remove(middleSymLinkPath))
	require.NoError(t, os.Symlink(file2.Name(), middleSymLinkPath))

	// Different content so the operator sees a new fingerprint.
	writeString(t, file2, "testlog2\n")

	e = waitForOne(t, logReceived)
	require.Equal(t, filepath.Base(symLinkPath), e.Labels["file_name"])
	require.Equal(t, symLinkPath, e.Labels["file_path"])
	require.Equal(t, filepath.Base(resolved2), e.Labels["file_name_resolved"])
	require.Equal(t, resolved2, e.Labels["file_path_resolved"])

	// Best-effort cleanup; on Windows the files may still be held open
	// by the operator, so the error is deliberately ignored.
	os.RemoveAll(dir)
}

// ReadExistingLogs tests that, when starting from beginning, we
// read all the lines that are already there
func TestReadExistingLogs(t *testing.T) {
Expand Down
47 changes: 42 additions & 5 deletions operator/builtin/input/file/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,44 @@ import (
"golang.org/x/text/transform"
)

// fileLabels holds the path-derived label values attached to entries read
// from a single file: Name/Path are taken from the path as matched by the
// finder, while ResolvedName/ResolvedPath are the symlink-resolved,
// absolute equivalents produced by resolveFileLabels.
type fileLabels struct {
Name string
Path string
ResolvedName string
ResolvedPath string
}

// resolveFileLabels builds the fileLabels for path. Name/Path always reflect
// the path as given; ResolvedName/ResolvedPath are the symlink-resolved
// absolute form. If resolution fails, the error is logged and the resolved
// fields are left as empty strings — the original fell through to
// filepath.Abs(""), which reports the working directory instead of the
// documented empty value.
func (f *InputOperator) resolveFileLabels(path string) *fileLabels {
	labels := &fileLabels{
		Path: path,
		Name: filepath.Base(path),
	}

	resolved, err := filepath.EvalSymlinks(path)
	if err != nil {
		f.Error(err)
		return labels
	}

	abs, err := filepath.Abs(resolved)
	if err != nil {
		f.Error(err)
		return labels
	}

	labels.ResolvedPath = abs
	labels.ResolvedName = filepath.Base(abs)
	return labels
}

// Reader manages a single file
type Reader struct {
Fingerprint *Fingerprint
Offset int64
Path string

generation int
fileInput *InputOperator
file *os.File
fileLabels *fileLabels

decoder *encoding.Decoder
decodeBuffer []byte
Expand All @@ -35,18 +64,18 @@ func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint) (
r := &Reader{
Fingerprint: fp,
file: file,
Path: path,
fileInput: f,
SugaredLogger: f.SugaredLogger.With("path", path),
decoder: f.encoding.Encoding.NewDecoder(),
decodeBuffer: make([]byte, 1<<12),
fileLabels: f.resolveFileLabels(path),
}
return r, nil
}

// Copy creates a deep copy of a Reader
func (f *Reader) Copy(file *os.File) (*Reader, error) {
reader, err := f.fileInput.NewReader(f.Path, file, f.Fingerprint.Copy())
reader, err := f.fileInput.NewReader(f.fileLabels.Path, file, f.Fingerprint.Copy())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -127,12 +156,20 @@ func (f *Reader) emit(ctx context.Context, msgBuf []byte) error {
return fmt.Errorf("create entry: %s", err)
}

if err := e.Set(f.fileInput.FilePathField, f.Path); err != nil {
if err := e.Set(f.fileInput.FilePathField, f.fileLabels.Path); err != nil {
return err
}
if err := e.Set(f.fileInput.FileNameField, filepath.Base(f.fileLabels.Path)); err != nil {
return err
}

if err := e.Set(f.fileInput.FilePathResolvedField, f.fileLabels.ResolvedPath); err != nil {
return err
}
if err := e.Set(f.fileInput.FileNameField, filepath.Base(f.Path)); err != nil {
if err := e.Set(f.fileInput.FileNameResolvedField, f.fileLabels.ResolvedName); err != nil {
return err
}

f.fileInput.Write(ctx, e)
return nil
}
Expand Down