Skip to content

Commit

Permalink
[receiver/filelog] Add Support for only reading the current file (#23633)
Browse files Browse the repository at this point in the history

Adds a feature to filelog that allows reading only the current file
from a collection of files, by sorting the file names and picking
the newest one.
  • Loading branch information
Miguel Rodriguez authored Jun 30, 2023
1 parent 95f0a13 commit 3c9ddf9
Show file tree
Hide file tree
Showing 11 changed files with 860 additions and 23 deletions.
20 changes: 20 additions & 0 deletions .chloggen/current-file-support-filelog.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for tracking the current file in filelogreceiver

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [22998]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
10 changes: 10 additions & 0 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,16 @@ func (c Config) validate() error {
}
}

if len(c.OrderingCriteria.SortBy) != 0 && c.OrderingCriteria.Regex == "" {
return fmt.Errorf("`regex` must be specified when `sort_by` is specified")
}

for _, sr := range c.OrderingCriteria.SortBy {
if err := sr.validate(); err != nil {
return err
}
}

if c.MaxLogSize <= 0 {
return fmt.Errorf("`max_log_size` must be positive")
}
Expand Down
56 changes: 56 additions & 0 deletions pkg/stanza/fileconsumer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,45 @@ func TestUnmarshal(t *testing.T) {
return newMockOperatorConfig(cfg)
}(),
},
{
Name: "sort_by_timestamp",
Expect: func() *mockOperatorConfig {
cfg := NewConfig()
cfg.OrderingCriteria.Regex = `err\.[a-zA-Z]\.\d+\.(?P<rotation_time>\d{10})\.log`
cfg.OrderingCriteria.SortBy = []SortRuleImpl{
{
TimestampSortRule{
BaseSortRule: BaseSortRule{
SortType: sortTypeTimestamp,
RegexKey: "rotation_time",
Ascending: true,
},
Location: "utc",
Layout: `%Y%m%d%H`,
},
},
}
return newMockOperatorConfig(cfg)
}(),
},
{
Name: "sort_by_numeric",
Expect: func() *mockOperatorConfig {
cfg := NewConfig()
cfg.OrderingCriteria.Regex = `err\.(?P<file_num>[a-zA-Z])\.\d+\.\d{10}\.log`
cfg.OrderingCriteria.SortBy = []SortRuleImpl{
{
NumericSortRule{
BaseSortRule: BaseSortRule{
SortType: sortTypeNumeric,
RegexKey: "file_num",
},
},
},
}
return newMockOperatorConfig(cfg)
}(),
},
{
Name: "poll_interval_no_units",
Expect: func() *mockOperatorConfig {
Expand Down Expand Up @@ -534,6 +573,23 @@ func TestBuild(t *testing.T) {
require.Error,
nil,
},
{
"BadOrderingCriteriaRegex",
func(f *Config) {
f.OrderingCriteria.SortBy = []SortRuleImpl{
{
NumericSortRule{
BaseSortRule: BaseSortRule{
RegexKey: "value",
SortType: sortTypeNumeric,
},
},
},
}
},
require.Error,
nil,
},
}

for _, tc := range cases {
Expand Down
10 changes: 8 additions & 2 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ func (m *Manager) Start(persister operator.Persister) error {
return fmt.Errorf("read known files from database: %w", err)
}

if len(m.finder.FindFiles()) == 0 {
if files, err := m.finder.FindFiles(); err != nil {
m.Warnw("error occurred while finding files", "error", err.Error())
} else if len(files) == 0 {
m.Warnw("no files match the configured include patterns",
"include", m.finder.Include,
"exclude", m.finder.Exclude)
Expand Down Expand Up @@ -108,7 +110,11 @@ func (m *Manager) poll(ctx context.Context) {
batchesProcessed := 0

// Get the list of paths on disk
matches := m.finder.FindFiles()
matches, err := m.finder.FindFiles()
if err != nil {
m.Errorf("error finding files: %s", err)
}

for len(matches) > m.maxBatchFiles {
m.consume(ctx, matches[:m.maxBatchFiles])

Expand Down
242 changes: 242 additions & 0 deletions pkg/stanza/fileconsumer/file_sort.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"

import (
"fmt"
"regexp"
"sort"
"strconv"

"go.opentelemetry.io/collector/confmap"
"go.uber.org/multierr"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/timeutils"
)

// Accepted values of the `sort_type` field of a sort rule.
// SortRuleImpl.Unmarshal switches on these to pick the concrete rule type.
const (
	sortTypeNumeric = "numeric"
	sortTypeTimestamp = "timestamp"
	sortTypeAlphabetical = "alphabetical"
)

// sortRule is the behavior every ordering rule provides: it can check its own
// configuration and reorder a list of file names.
type sortRule interface {
	// validate returns an error when the rule's configuration is unusable.
	validate() error
	// sort orders files using values that re captures from each file name and
	// returns the slice together with any errors collected while comparing.
	sort(re *regexp.Regexp, files []string) ([]string, error)
}

// BaseSortRule holds the settings shared by all sort rules. It is embedded
// (squashed) into each concrete rule type.
type BaseSortRule struct {
	// RegexKey names the capture group whose matched text is used as the sort key.
	RegexKey string `mapstructure:"regex_key,omitempty"`
	// Ascending orders smaller/earlier values first when true, larger/later first otherwise.
	Ascending bool `mapstructure:"ascending,omitempty"`
	// SortType is the `sort_type` discriminator: "numeric", "timestamp" or "alphabetical".
	SortType string `mapstructure:"sort_type,omitempty"`
}

// SortRuleImpl wraps a sortRule so that the concrete rule type can be chosen
// while unmarshaling, based on the `sort_type` field (see Unmarshal).
type SortRuleImpl struct {
	sortRule
}

// Unmarshal decodes one sort rule from component.
//
// The `sort_type` field must be present and must be a string; it selects the
// concrete rule (numeric, alphabetical or timestamp). The whole component is
// then unmarshaled into that rule with confmap.WithErrorUnused(). sr is only
// updated when decoding succeeds.
func (sr *SortRuleImpl) Unmarshal(component *confmap.Conf) error {
	const typeField = "sort_type"

	if !component.IsSet(typeField) {
		return fmt.Errorf("missing required field 'sort_type'")
	}

	rawType := component.Get(typeField)
	sortType, isString := rawType.(string)
	if !isString {
		return fmt.Errorf("non-string type %T for field 'sort_type'", rawType)
	}

	var (
		rule sortRule
		err  error
	)
	switch sortType {
	case sortTypeNumeric:
		var numeric NumericSortRule
		err = component.Unmarshal(&numeric, confmap.WithErrorUnused())
		rule = numeric
	case sortTypeAlphabetical:
		var alphabetical AlphabeticalSortRule
		err = component.Unmarshal(&alphabetical, confmap.WithErrorUnused())
		rule = alphabetical
	case sortTypeTimestamp:
		var timestamp TimestampSortRule
		err = component.Unmarshal(&timestamp, confmap.WithErrorUnused())
		rule = timestamp
	default:
		return fmt.Errorf("invalid sort type %s", sortType)
	}
	if err != nil {
		return err
	}

	sr.sortRule = rule
	return nil
}

// NumericSortRule orders files by the integer value of the text captured
// under RegexKey (parsed with strconv.Atoi in sort).
type NumericSortRule struct {
	BaseSortRule `mapstructure:",squash"`
}

// validate checks that the rule names a capture group to sort on.
func (f NumericSortRule) validate() error {
	if f.RegexKey != "" {
		return nil
	}
	return fmt.Errorf("regex key must be specified for numeric sort")
}

// AlphabeticalSortRule orders files by plain string comparison of the text
// captured under RegexKey.
type AlphabeticalSortRule struct {
	BaseSortRule `mapstructure:",squash"`
}

// validate checks that the rule names a capture group to sort on.
func (f AlphabeticalSortRule) validate() error {
	if f.RegexKey != "" {
		return nil
	}
	return fmt.Errorf("regex key must be specified for alphabetical sort")
}

// TimestampSortRule orders files by a timestamp parsed from the text captured
// under RegexKey.
type TimestampSortRule struct {
	BaseSortRule `mapstructure:",squash"`
	// Layout is the timestamp layout handed to timeutils.ParseStrptime.
	Layout string `mapstructure:"layout,omitempty"`
	// Location is the location name handed to timeutils.GetLocation.
	// validate substitutes "UTC" for an empty value while checking the rule.
	Location string `mapstructure:"location,omitempty"`
}

// validate checks that the rule has a capture group and a layout, that the
// location (defaulting to "UTC" when empty) can be resolved, and that the
// layout is accepted by timeutils.ParseStrptime.
//
// The receiver is a value, so the "UTC" default used here is local to this
// call and is not stored back on the rule.
func (f TimestampSortRule) validate() error {
	switch {
	case f.RegexKey == "":
		return fmt.Errorf("regex key must be specified for timestamp sort")
	case f.Layout == "":
		return fmt.Errorf("format must be specified for timestamp sort")
	}

	locationName := f.Location
	if locationName == "" {
		locationName = "UTC"
	}

	loc, err := timeutils.GetLocation(&locationName, nil)
	if err != nil {
		return fmt.Errorf("parse location %s: %w", locationName, err)
	}

	// NOTE(review): the layout is exercised against an empty value; confirm
	// that ParseStrptime does not reject valid layouts in that case.
	if _, err = timeutils.ParseStrptime(f.Layout, "", loc); err != nil {
		return fmt.Errorf("parse format %s: %w", f.Layout, err)
	}
	return nil
}

// sort reorders files in place by the integer captured under f.RegexKey and
// returns the same slice.
//
// Ascending puts smaller numbers first; otherwise larger numbers come first.
// When a value cannot be extracted or parsed for a comparison, the error is
// appended to the returned error and that comparison reports false, so the
// relative order of the affected files is not meaningful.
func (f NumericSortRule) sort(re *regexp.Regexp, files []string) ([]string, error) {
	var errs error

	// toInt parses an extracted value, recording a failure in errs.
	toInt := func(raw string) (int, bool) {
		num, err := strconv.Atoi(raw)
		if err != nil {
			errs = multierr.Append(errs, fmt.Errorf("parse %s to int: %w", raw, err))
			return 0, false
		}
		return num, true
	}

	less := func(a, b int) bool {
		rawA, rawB, err := extractValues(re, f.RegexKey, files[a], files[b])
		if err != nil {
			errs = multierr.Append(errs, err)
			return false
		}

		numA, ok := toInt(rawA)
		if !ok {
			return false
		}
		numB, ok := toInt(rawB)
		if !ok {
			return false
		}

		if !f.Ascending {
			return numA > numB
		}
		return numA < numB
	}

	sort.Slice(files, less)
	return files, errs
}

// sort reorders files in place by the timestamp captured under f.RegexKey,
// parsed with f.Layout in the rule's location, and returns the same slice.
//
// Ascending puts earlier timestamps first; otherwise later timestamps come
// first. If the location cannot be resolved, files is returned unsorted with
// that error. When a value cannot be extracted or parsed for a comparison,
// the error is appended to the returned error and that comparison reports
// false, so the relative order of the affected files is not meaningful.
func (f TimestampSortRule) sort(re *regexp.Regexp, files []string) ([]string, error) {
	// validate applies the "UTC" default only to its own copy of the rule
	// (value receiver), so the same default is applied here to sort in the
	// location that was actually validated.
	locationName := f.Location
	if locationName == "" {
		locationName = "UTC"
	}

	location, err := timeutils.GetLocation(&locationName, nil)
	if err != nil {
		return files, fmt.Errorf("load location %s: %w", locationName, err)
	}

	var errs error

	sort.Slice(files, func(i, j int) bool {
		valI, valJ, err := extractValues(re, f.RegexKey, files[i], files[j])
		if err != nil {
			errs = multierr.Append(errs, err)
			return false
		}

		// Report the raw captured text on failure: the parsed result is not
		// meaningful when parsing returned an error.
		timeI, err := timeutils.ParseStrptime(f.Layout, valI, location)
		if err != nil {
			errs = multierr.Append(errs, fmt.Errorf("parse %s to Time: %w", valI, err))
			return false
		}

		timeJ, err := timeutils.ParseStrptime(f.Layout, valJ, location)
		if err != nil {
			errs = multierr.Append(errs, fmt.Errorf("parse %s to Time: %w", valJ, err))
			return false
		}

		if f.Ascending {
			return timeI.Before(timeJ)
		}
		return timeI.After(timeJ)
	})

	return files, errs
}

// sort reorders files in place by string comparison of the text captured
// under f.RegexKey and returns the same slice.
//
// Ascending puts lexically smaller values first; otherwise larger values come
// first. When a value cannot be extracted for a comparison, the error is
// appended to the returned error and that comparison reports false.
func (f AlphabeticalSortRule) sort(re *regexp.Regexp, files []string) ([]string, error) {
	var errs error

	byCapturedText := func(a, b int) bool {
		left, right, err := extractValues(re, f.RegexKey, files[a], files[b])
		if err != nil {
			errs = multierr.Append(errs, err)
			return false
		}

		if !f.Ascending {
			return left > right
		}
		return left < right
	}

	sort.Slice(files, byCapturedText)
	return files, errs
}

// extractValues returns the text captured under reKey for file1 and file2,
// in that order.
//
// An empty capture (no match, unknown group, or a group that matched nothing)
// is reported as an error naming the offending file. file1 is checked first,
// and file2 is not examined when file1 fails.
func extractValues(re *regexp.Regexp, reKey, file1, file2 string) (string, string, error) {
	var captured [2]string
	for idx, file := range [2]string{file1, file2} {
		val := extractValue(re, reKey, file)
		if val == "" {
			return "", "", fmt.Errorf("find capture group %q in regex for file: %s", reKey, file)
		}
		captured[idx] = val
	}
	return captured[0], captured[1], nil
}

// extractValue returns the text that the capture group named reKey matched in
// input. It returns "" when re does not match input or when no group has that
// name. If several groups share the name, the first one wins.
func extractValue(re *regexp.Regexp, reKey, input string) string {
	groups := re.FindStringSubmatch(input)
	if groups == nil {
		return ""
	}

	names := re.SubexpNames()
	for idx := 0; idx < len(names) && idx < len(groups); idx++ {
		if names[idx] == reKey {
			return groups[idx]
		}
	}
	return ""
}
Loading

0 comments on commit 3c9ddf9

Please sign in to comment.