From 911336cad136f583155845591a0b1e8c7dbade13 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Tue, 25 Jul 2023 11:23:42 -0400 Subject: [PATCH] [chore][fileconsumer] Extract internal header package (#24264) This continues from #24036 by moving additional complexity into an internal package. Some files contain headers that are different than the rest of the file. In such cases, we may need to extract file attributes that will be applied to logs from that file. --- pkg/stanza/fileconsumer/config.go | 17 +- pkg/stanza/fileconsumer/config_test.go | 6 +- pkg/stanza/fileconsumer/file_test.go | 1 - pkg/stanza/fileconsumer/header.go | 147 -------------- pkg/stanza/fileconsumer/header_test.go | 179 ------------------ .../fileconsumer/internal/header/config.go | 84 ++++++++ .../internal/header/config_test.go | 144 ++++++++++++++ .../fileconsumer/internal/header/output.go | 56 ++++++ .../fileconsumer/internal/header/reader.go | 75 ++++++++ .../internal/header/reader_test.go | 87 +++++++++ pkg/stanza/fileconsumer/reader.go | 92 +++------ pkg/stanza/fileconsumer/reader_factory.go | 27 +-- pkg/stanza/fileconsumer/reader_test.go | 14 +- 13 files changed, 496 insertions(+), 433 deletions(-) delete mode 100644 pkg/stanza/fileconsumer/header.go delete mode 100644 pkg/stanza/fileconsumer/header_test.go create mode 100644 pkg/stanza/fileconsumer/internal/header/config.go create mode 100644 pkg/stanza/fileconsumer/internal/header/config_test.go create mode 100644 pkg/stanza/fileconsumer/internal/header/output.go create mode 100644 pkg/stanza/fileconsumer/internal/header/reader.go create mode 100644 pkg/stanza/fileconsumer/internal/header/reader_test.go diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index cf9ada6e6373..dc3b13b3851e 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -15,6 +15,8 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" ) @@ -72,6 +74,11 @@ type Config struct { Header *HeaderConfig `mapstructure:"header,omitempty"` } +type HeaderConfig struct { + Pattern string `mapstructure:"pattern"` + MetadataOperators []operator.Config `mapstructure:"metadata_operators"` +} + // Build will build a file input operator from the supplied configuration func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback) (*Manager, error) { if err := c.validate(); err != nil { @@ -120,14 +127,14 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact return nil, fmt.Errorf("invalid start_at location '%s'", c.StartAt) } - var hs *headerSettings + var hCfg *header.Config if c.Header != nil { enc, err := c.Splitter.EncodingConfig.Build() if err != nil { return nil, fmt.Errorf("failed to create encoding: %w", err) } - hs, err = c.Header.buildHeaderSettings(enc.Encoding) + hCfg, err = header.NewConfig(c.Header.Pattern, c.Header.MetadataOperators, enc.Encoding) if err != nil { return nil, fmt.Errorf("failed to build header config: %w", err) } @@ -150,7 +157,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact fromBeginning: startAtBeginning, splitterFactory: factory, encodingConfig: c.Splitter.EncodingConfig, - headerSettings: hs, + headerConfig: hCfg, }, finder: c.MatchingCriteria, roller: newRoller(), @@ -226,13 +233,13 @@ func (c Config) validate() error { return errors.New("`max_batches` must not be negative") } - _, err := c.Splitter.EncodingConfig.Build() + enc, err := c.Splitter.EncodingConfig.Build() if err != nil { return err } if c.Header != nil { - if err := c.Header.validate(); err != nil { + if _, err := header.NewConfig(c.Header.Pattern, c.Header.MetadataOperators, enc.Encoding); err != nil { return fmt.Errorf("invalid config for `header`: %w", err) } } diff --git a/pkg/stanza/fileconsumer/config_test.go b/pkg/stanza/fileconsumer/config_test.go index 9ebef1861aa9..4b6b82edf752 100644 --- a/pkg/stanza/fileconsumer/config_test.go +++ b/pkg/stanza/fileconsumer/config_test.go @@ -790,10 +790,7 @@ func TestBuildWithHeader(t *testing.T) { }, require.NoError, func(t *testing.T, f *Manager) { - require.NotNil(t, f.readerFactory.headerSettings) - require.NotNil(t, f.readerFactory.headerSettings.matchRegex) - require.NotNil(t, f.readerFactory.headerSettings.splitFunc) - require.NotNil(t, f.readerFactory.headerSettings.config) + require.NotNil(t, f.readerFactory.headerConfig.SplitFunc) }, }, } @@ -810,7 +807,6 @@ func TestBuildWithHeader(t *testing.T) { if err != nil { return } - tc.validate(t, input) }) } diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 235bcb7e9556..a8b608c9c3e0 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -1630,7 +1630,6 @@ func TestHeaderPersistanceInHeader(t *testing.T) { }) require.NoError(t, op2.Stop()) - } func TestStalePartialFingerprintDiscarded(t *testing.T) { diff --git a/pkg/stanza/fileconsumer/header.go b/pkg/stanza/fileconsumer/header.go deleted file mode 100644 index 12b640e0df8f..000000000000 --- a/pkg/stanza/fileconsumer/header.go +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" - -import ( - "bufio" - "bytes" - "context" - "errors" - "fmt" - "regexp" - - "go.uber.org/zap" - "golang.org/x/text/encoding" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" -) - -const headerPipelineOutputType = "header_log_emitter" - -type HeaderConfig struct { - Pattern string `mapstructure:"pattern"` - MetadataOperators []operator.Config `mapstructure:"metadata_operators"` -} - -// validate returns an error describing why the configuration is invalid, or nil if the configuration is valid. -func (hc *HeaderConfig) validate() error { - if len(hc.MetadataOperators) == 0 { - return errors.New("at least one operator must be specified for `metadata_operators`") - } - - nopLogger := zap.NewNop().Sugar() - outOp := newHeaderPipelineOutput(nopLogger) - p, err := pipeline.Config{ - Operators: hc.MetadataOperators, - DefaultOutput: outOp, - }.Build(nopLogger) - - if err != nil { - return fmt.Errorf("failed to build pipelines: %w", err) - } - - for _, op := range p.Operators() { - // This is the default output we created, it's always valid - if op.Type() == headerPipelineOutputType { - continue - } - - if !op.CanProcess() { - return fmt.Errorf("operator '%s' in `metadata_operators` cannot process entries", op.ID()) - } - - if !op.CanOutput() { - return fmt.Errorf("operator '%s' in `metadata_operators` does not propagate entries", op.ID()) - } - - // Filter processor also may fail to propagate some entries - if op.Type() == "filter" { - return fmt.Errorf("operator of type filter is not allowed in `metadata_operators`") - } - } - - return nil -} - -func (hc *HeaderConfig) buildHeaderSettings(enc encoding.Encoding) (*headerSettings, error) { - var err error - matchRegex, err := regexp.Compile(hc.Pattern) - if err != nil { - return nil, fmt.Errorf("failed to compile `pattern`: %w", err) - } - - splitFunc, err := helper.NewNewlineSplitFunc(enc, false, func(b []byte) []byte { - return bytes.Trim(b, "\r\n") - }) - if err != nil { - return nil, fmt.Errorf("failed to create split func: %w", err) - } - - return &headerSettings{ - matchRegex: matchRegex, - splitFunc: splitFunc, - config: hc, - }, nil -} - -// headerSettings contains compiled objects defined by a HeaderConfig -type headerSettings struct { - matchRegex *regexp.Regexp - splitFunc bufio.SplitFunc - config *HeaderConfig -} - -// headerPipelineOutput is a stanza operator that emits log entries to a channel -type headerPipelineOutput struct { - helper.OutputOperator - logChan chan *entry.Entry -} - -// newHeaderPipelineOutput creates a new receiver output -func newHeaderPipelineOutput(logger *zap.SugaredLogger) *headerPipelineOutput { - return &headerPipelineOutput{ - OutputOperator: helper.OutputOperator{ - BasicOperator: helper.BasicOperator{ - OperatorID: headerPipelineOutputType, - OperatorType: headerPipelineOutputType, - SugaredLogger: logger, - }, - }, - logChan: make(chan *entry.Entry, 1), - } -} - -// Start starts the goroutine(s) required for this operator -func (e *headerPipelineOutput) Start(_ operator.Persister) error { - return nil -} - -// Stop will close the log channel and stop running goroutines -func (e *headerPipelineOutput) Stop() error { - return nil -} - -func (e *headerPipelineOutput) Process(_ context.Context, ent *entry.Entry) error { - // Drop the entry if logChan is full, in order to avoid this operator blocking. - // This protects against a case where an operator could return an error, but continue propagating a log entry, - // leaving an unexpected entry in the output channel. - select { - case e.logChan <- ent: - default: - } - - return nil -} - -func (e *headerPipelineOutput) WaitForEntry(ctx context.Context) (*entry.Entry, error) { - select { - case <-ctx.Done(): - return nil, fmt.Errorf("got context cancellation while waiting for entry: %w", ctx.Err()) - case ent := <-e.logChan: - return ent, nil - } -} diff --git a/pkg/stanza/fileconsumer/header_test.go b/pkg/stanza/fileconsumer/header_test.go deleted file mode 100644 index 945e6335059b..000000000000 --- a/pkg/stanza/fileconsumer/header_test.go +++ /dev/null @@ -1,179 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package fileconsumer - -import ( - "testing" - - "github.com/stretchr/testify/require" - "golang.org/x/text/encoding" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/generate" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/output/stdout" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/filter" -) - -func TestHeaderConfig_validate(t *testing.T) { - regexConf := regex.NewConfig() - regexConf.Regex = "^#(?P.*)" - - invalidRegexConf := regex.NewConfig() - invalidRegexConf.Regex = "(" - - generateConf := generate.NewConfig("") - stdoutConf := stdout.NewConfig("") - filterConfg := filter.NewConfig() - filterConfg.Expression = "true" - - testCases := []struct { - name string - conf HeaderConfig - expectedErr string - }{ - { - name: "Valid config", - conf: HeaderConfig{ - Pattern: "^#", - MetadataOperators: []operator.Config{ - { - Builder: regexConf, - }, - }, - }, - }, - { - name: "Valid without specified header size", - conf: HeaderConfig{ - Pattern: "^#", - MetadataOperators: []operator.Config{ - { - Builder: regexConf, - }, - }, - }, - }, - { - name: "No operators specified", - conf: HeaderConfig{ - Pattern: "^#", - MetadataOperators: []operator.Config{}, - }, - expectedErr: "at least one operator must be specified for `metadata_operators`", - }, - { - name: "Invalid operator specified", - conf: HeaderConfig{ - Pattern: "^#", - MetadataOperators: []operator.Config{ - { - Builder: invalidRegexConf, - }, - }, - }, - expectedErr: "failed to build pipelines:", - }, - { - name: "first operator cannot process", - conf: HeaderConfig{ - Pattern: "^#", - MetadataOperators: []operator.Config{ - { - Builder: generateConf, - }, - }, - }, - expectedErr: "operator 'generate_input' in `metadata_operators` cannot process entries", - }, - { - name: "operator cannot output", - conf: HeaderConfig{ - Pattern: "^#", - MetadataOperators: []operator.Config{ - { - Builder: stdoutConf, - }, - }, - }, - expectedErr: "operator 'stdout' in `metadata_operators` does not propagate entries", - }, - { - name: "filter operator present", - conf: HeaderConfig{ - Pattern: "^#", - MetadataOperators: []operator.Config{ - { - Builder: filterConfg, - }, - }, - }, - expectedErr: "operator of type filter is not allowed in `metadata_operators`", - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - err := tc.conf.validate() - if tc.expectedErr != "" { - require.ErrorContains(t, err, tc.expectedErr) - } else { - require.NoError(t, err) - } - }) - } -} - -func TestHeaderConfig_buildHeaderSettings(t *testing.T) { - regexConf := regex.NewConfig() - regexConf.Regex = "^#(?P.*)" - - invalidRegexConf := regex.NewConfig() - invalidRegexConf.Regex = "(" - - testCases := []struct { - name string - enc encoding.Encoding - conf HeaderConfig - expectedErr string - }{ - { - name: "valid config", - enc: encoding.Nop, - conf: HeaderConfig{ - Pattern: "^#", - MetadataOperators: []operator.Config{ - { - Builder: regexConf, - }, - }, - }, - }, - { - name: "Invalid pattern", - conf: HeaderConfig{ - Pattern: "(", - MetadataOperators: []operator.Config{ - { - Builder: regexConf, - }, - }, - }, - expectedErr: "failed to compile `pattern`:", - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - h, err := tc.conf.buildHeaderSettings(tc.enc) - if tc.expectedErr != "" { - require.ErrorContains(t, err, tc.expectedErr) - } else { - require.NoError(t, err) - require.NotNil(t, h) - } - - }) - } -} diff --git a/pkg/stanza/fileconsumer/internal/header/config.go b/pkg/stanza/fileconsumer/internal/header/config.go new file mode 100644 index 000000000000..9bd329a95ec8 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/header/config.go @@ -0,0 +1,84 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package header // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "regexp" + + "go.uber.org/zap" + "golang.org/x/text/encoding" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" +) + +type Config struct { + regex *regexp.Regexp + SplitFunc bufio.SplitFunc + metadataOperators []operator.Config +} + +func NewConfig(matchRegex string, metadataOperators []operator.Config, enc encoding.Encoding) (*Config, error) { + var err error + if len(metadataOperators) == 0 { + return nil, errors.New("at least one operator must be specified for `metadata_operators`") + } + + if enc == nil { + return nil, errors.New("encoding must be specified") + } + + nopLogger := zap.NewNop().Sugar() + p, err := pipeline.Config{ + Operators: metadataOperators, + DefaultOutput: newPipelineOutput(nopLogger), + }.Build(nopLogger) + + if err != nil { + return nil, fmt.Errorf("failed to build pipelines: %w", err) + } + + for _, op := range p.Operators() { + // This is the default output we created, it's always valid + if op.Type() == pipelineOutputType { + continue + } + + if !op.CanProcess() { + return nil, fmt.Errorf("operator '%s' in `metadata_operators` cannot process entries", op.ID()) + } + + if !op.CanOutput() { + return nil, fmt.Errorf("operator '%s' in `metadata_operators` does not propagate entries", op.ID()) + } + + // Filter processor also may fail to propagate some entries + if op.Type() == "filter" { + return nil, fmt.Errorf("operator of type filter is not allowed in `metadata_operators`") + } + } + + regex, err := regexp.Compile(matchRegex) + if err != nil { + return nil, fmt.Errorf("failed to compile `pattern`: %w", err) + } + + splitFunc, err := helper.NewNewlineSplitFunc(enc, false, func(b []byte) []byte { + return bytes.Trim(b, "\r\n") + }) + if err != nil { + return nil, fmt.Errorf("failed to create split func: %w", err) + } + + return &Config{ + regex: regex, + SplitFunc: splitFunc, + metadataOperators: metadataOperators, + }, nil +} diff --git a/pkg/stanza/fileconsumer/internal/header/config_test.go b/pkg/stanza/fileconsumer/internal/header/config_test.go new file mode 100644 index 000000000000..bab02f437161 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/header/config_test.go @@ -0,0 +1,144 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package header + +import ( + "testing" + + "github.com/stretchr/testify/require" + "golang.org/x/text/encoding" + "golang.org/x/text/encoding/unicode" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/generate" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/output/stdout" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/filter" +) + +func TestBuild(t *testing.T) { + regexConf := regex.NewConfig() + regexConf.Regex = "^#(?P.*)" + + invalidRegexConf := regex.NewConfig() + invalidRegexConf.Regex = "(" + + generateConf := generate.NewConfig("") + stdoutConf := stdout.NewConfig("") + filterConfg := filter.NewConfig() + filterConfg.Expression = "true" + + testCases := []struct { + name string + enc encoding.Encoding + pattern string + ops []operator.Config + expectedErr string + }{ + { + name: "valid config", + enc: encoding.Nop, + pattern: "^#", + ops: []operator.Config{ + { + Builder: regexConf, + }, + }, + }, + { + name: "Invalid pattern", + enc: unicode.UTF8, + pattern: "(", + ops: []operator.Config{ + { + Builder: regexConf, + }, + }, + expectedErr: "failed to compile `pattern`:", + }, + { + name: "Valid without specified header size", + enc: unicode.UTF8, + pattern: "^#", + ops: []operator.Config{ + { + Builder: regexConf, + }, + }, + }, + { + name: "No operators specified", + enc: unicode.UTF8, + pattern: "^#", + ops: []operator.Config{}, + expectedErr: "at least one operator must be specified for `metadata_operators`", + }, + { + name: "No encoding specified", + pattern: "^#", + ops: []operator.Config{ + { + Builder: regexConf, + }, + }, + expectedErr: "encoding must be specified", + }, + { + name: "Invalid operator specified", + enc: unicode.UTF8, + pattern: "^#", + ops: []operator.Config{ + { + Builder: invalidRegexConf, + }, + }, + expectedErr: "failed to build pipelines:", + }, + { + name: "first operator cannot process", + enc: unicode.UTF8, + pattern: "^#", + ops: []operator.Config{ + { + Builder: generateConf, + }, + }, + expectedErr: "operator 'generate_input' in `metadata_operators` cannot process entries", + }, + { + name: "operator cannot output", + enc: unicode.UTF8, + pattern: "^#", + ops: []operator.Config{ + { + Builder: stdoutConf, + }, + }, + expectedErr: "operator 'stdout' in `metadata_operators` does not propagate entries", + }, + { + name: "filter operator present", + enc: unicode.UTF8, + pattern: "^#", + ops: []operator.Config{ + { + Builder: filterConfg, + }, + }, + expectedErr: "operator of type filter is not allowed in `metadata_operators`", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + h, err := NewConfig(tc.pattern, tc.ops, tc.enc) + if tc.expectedErr != "" { + require.ErrorContains(t, err, tc.expectedErr) + } else { + require.NoError(t, err) + require.NotNil(t, h) + } + }) + } +} diff --git a/pkg/stanza/fileconsumer/internal/header/output.go b/pkg/stanza/fileconsumer/internal/header/output.go new file mode 100644 index 000000000000..7caf7f9a89e0 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/header/output.go @@ -0,0 +1,56 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package header // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" + +import ( + "context" + "fmt" + + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" +) + +const pipelineOutputType = "header_log_emitter" + +// pipelineOutput is a stanza operator that emits log entries to a channel +type pipelineOutput struct { + helper.OutputOperator + logChan chan *entry.Entry +} + +// newPipelineOutput creates a new receiver output +func newPipelineOutput(logger *zap.SugaredLogger) *pipelineOutput { + return &pipelineOutput{ + OutputOperator: helper.OutputOperator{ + BasicOperator: helper.BasicOperator{ + OperatorID: pipelineOutputType, + OperatorType: pipelineOutputType, + SugaredLogger: logger, + }, + }, + logChan: make(chan *entry.Entry, 1), + } +} + +// Drop the entry if logChan is full, in order to avoid this operator blocking. +// This protects against a case where an operator could return an error, but continue propagating a log entry, +// leaving an unexpected entry in the output channel. +func (e *pipelineOutput) Process(_ context.Context, ent *entry.Entry) error { + select { + case e.logChan <- ent: + default: + } + return nil +} + +func (e *pipelineOutput) WaitForEntry(ctx context.Context) (*entry.Entry, error) { + select { + case <-ctx.Done(): + return nil, fmt.Errorf("wait for entry: %w", ctx.Err()) + case ent := <-e.logChan: + return ent, nil + } +} diff --git a/pkg/stanza/fileconsumer/internal/header/reader.go b/pkg/stanza/fileconsumer/internal/header/reader.go new file mode 100644 index 000000000000..f55b2323cfee --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/header/reader.go @@ -0,0 +1,75 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package header // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" + +import ( + "context" + "errors" + "fmt" + + "go.opentelemetry.io/collector/extension/experimental/storage" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" +) + +var ErrEndOfHeader = errors.New("end of header") + +type Reader struct { + logger *zap.SugaredLogger + cfg Config + pipeline pipeline.Pipeline + output *pipelineOutput +} + +func NewReader(logger *zap.SugaredLogger, cfg Config) (*Reader, error) { + r := &Reader{logger: logger, cfg: cfg} + var err error + r.output = newPipelineOutput(logger) + r.pipeline, err = pipeline.Config{ + Operators: cfg.metadataOperators, + DefaultOutput: r.output, + }.Build(logger) + if err != nil { + return nil, fmt.Errorf("failed to build pipeline: %w", err) + } + if err = r.pipeline.Start(storage.NewNopClient()); err != nil { + return nil, fmt.Errorf("failed to start header pipeline: %w", err) + } + return r, nil +} + +// Process checks if the given token is a line of the header, and consumes it if it is. +// An EndOfHeaderError is returned if the given line was not a header line. +func (r *Reader) Process(ctx context.Context, token []byte, fileAttributes map[string]any) error { + if !r.cfg.regex.Match(token) { + return ErrEndOfHeader + } + + firstOperator := r.pipeline.Operators()[0] + + newEntry := entry.New() + newEntry.Body = string(token) + + if err := firstOperator.Process(ctx, newEntry); err != nil { + return fmt.Errorf("process header entry: %w", err) + } + + ent, err := r.output.WaitForEntry(ctx) + if err != nil { + return fmt.Errorf("wait for header entry: %w", err) + } + + // Copy resultant attributes over current set of attributes (upsert) + for k, v := range ent.Attributes { + // fileAttributes is an output parameter + fileAttributes[k] = v + } + return nil +} + +func (r *Reader) Stop() error { + return r.pipeline.Stop() +} diff --git a/pkg/stanza/fileconsumer/internal/header/reader_test.go b/pkg/stanza/fileconsumer/internal/header/reader_test.go new file mode 100644 index 000000000000..721306e7e5eb --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/header/reader_test.go @@ -0,0 +1,87 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package header + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + "golang.org/x/text/encoding/unicode" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/keyvalue" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex" +) + +func TestReader(t *testing.T) { + logger := zaptest.NewLogger(t).Sugar() + + regexConf := regex.NewConfig() + regexConf.Regex = "^#(?P.*)" + regexConf.ParseTo = entry.RootableField{Field: entry.NewBodyField()} + + kvConf := keyvalue.NewConfig() + kvConf.ParseFrom = entry.NewBodyField("header_line") + kvConf.Delimiter = ":" + + cfg, err := NewConfig("^#", []operator.Config{ + {Builder: regexConf}, + {Builder: kvConf}, + }, unicode.UTF8) + require.NoError(t, err) + + reader, err := NewReader(logger, *cfg) + assert.NoError(t, err) + + attrs := make(map[string]any) + assert.NoError(t, reader.Process(context.Background(), []byte("# foo:bar\n"), attrs)) + assert.NoError(t, reader.Process(context.Background(), []byte("# hello:world\n"), attrs)) + assert.ErrorIs(t, reader.Process(context.Background(), []byte("First log line"), attrs), ErrEndOfHeader) + assert.Len(t, attrs, 2) + assert.Equal(t, "bar", attrs["foo"]) + assert.Equal(t, "world", attrs["hello"]) + + assert.NoError(t, reader.Stop()) +} + +func TestSkipUnmatchedHeaderLine(t *testing.T) { + logger := zaptest.NewLogger(t).Sugar() + + regexConf := regex.NewConfig() + regexConf.Regex = "^#(?P.*)" + regexConf.ParseTo = entry.RootableField{Field: entry.NewBodyField()} + + kvConf := keyvalue.NewConfig() + kvConf.ParseFrom = entry.NewBodyField("header_line") + kvConf.Delimiter = ":" + + cfg, err := NewConfig("^#", []operator.Config{ + {Builder: regexConf}, + {Builder: kvConf}, + }, unicode.UTF8) + require.NoError(t, err) + + reader, err := NewReader(logger, *cfg) + assert.NoError(t, err) + + attrs := make(map[string]any) + assert.NoError(t, reader.Process(context.Background(), []byte("# foo:bar\n"), attrs)) + assert.NoError(t, reader.Process(context.Background(), []byte("# matches header regex but not metadata operator assumptions\n"), attrs)) + assert.NoError(t, reader.Process(context.Background(), []byte("# hello:world\n"), attrs)) + assert.ErrorIs(t, reader.Process(context.Background(), []byte("First log line"), attrs), ErrEndOfHeader) + assert.Len(t, attrs, 2) + assert.Equal(t, "bar", attrs["foo"]) + assert.Equal(t, "world", attrs["hello"]) + + assert.NoError(t, reader.Stop()) +} + +func TestNewReaderErr(t *testing.T) { + _, err := NewReader(nil, Config{}) + assert.Error(t, err) +} diff --git a/pkg/stanza/fileconsumer/reader.go b/pkg/stanza/fileconsumer/reader.go index ef7b0cb490d4..9f1dc61c3591 100644 --- a/pkg/stanza/fileconsumer/reader.go +++ b/pkg/stanza/fileconsumer/reader.go @@ -6,17 +6,17 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collecto import ( "bufio" "context" + "errors" "fmt" "os" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" ) type readerConfig struct { @@ -48,11 +48,7 @@ type Reader struct { eof bool HeaderFinalized bool - recreateScanner bool - - headerSettings *headerSettings - headerPipeline pipeline.Pipeline - headerPipelineOutput *headerPipelineOutput + headerReader *header.Reader } // offsetToEnd sets the starting offset @@ -96,70 +92,36 @@ func (r *Reader) ReadToEnd(ctx context.Context) { token, err := r.encoding.Decode(s.Bytes()) if err != nil { r.Errorw("decode: %w", zap.Error(err)) - } else if err = r.processFunc(ctx, token, r.FileAttributes); err != nil { - r.Errorw("process: %w", zap.Error(err)) - } - - if r.recreateScanner { - r.recreateScanner = false - // recreate the scanner with the log-line's split func. - // We do not use the updated offset from the scanner, - // as the log line we just read could be multiline, and would be - // split differently with the new splitter. - if _, err := r.file.Seek(r.Offset, 0); err != nil { - r.Errorw("Failed to seek post-header", zap.Error(err)) - return + } else if err := r.processFunc(ctx, token, r.FileAttributes); err != nil { + if errors.Is(err, header.ErrEndOfHeader) { + r.finalizeHeader() + + // Now that the header is consumed, use the normal split and process functions. + // Recreate the scanner with the normal split func. + // Do not use the updated offset from the old scanner, as the most recent token + // could be split differently with the new splitter. + r.splitFunc = r.lineSplitFunc + r.processFunc = r.emit + if _, err = r.file.Seek(r.Offset, 0); err != nil { + r.Errorw("Failed to seek post-header", zap.Error(err)) + return + } + s = scanner.New(r, r.maxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc) + } else { + r.Errorw("process: %w", zap.Error(err)) } - - s = scanner.New(r, r.maxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc) } r.Offset = s.Pos() } } -// consumeHeaderLine checks if the given token is a line of the header, and consumes it if it is. -// The return value dictates whether the given line was a header line or not. -// If false is returned, the full header can be assumed to be read. -func (r *Reader) consumeHeaderLine(ctx context.Context, token []byte, _ map[string]any) error { - if !r.headerSettings.matchRegex.Match(token) { - // Finalize and cleanup the pipeline - r.HeaderFinalized = true - - // Stop and drop the header pipeline. - if err := r.headerPipeline.Stop(); err != nil { - return fmt.Errorf("stop header pipeline: %w", err) - } - r.headerPipeline = nil - r.headerPipelineOutput = nil - - // Use the line split func instead of the header split func - r.splitFunc = r.lineSplitFunc - r.processFunc = r.emit - // Mark that we should recreate the scanner, since we changed the split function - r.recreateScanner = true - return nil +func (r *Reader) finalizeHeader() { + if err := r.headerReader.Stop(); err != nil { + r.Errorw("Failed to stop header pipeline during finalization", zap.Error(err)) } - - firstOperator := r.headerPipeline.Operators()[0] - - newEntry := entry.New() - newEntry.Body = string(token) - - if err := firstOperator.Process(ctx, newEntry); err != nil { - return fmt.Errorf("process header entry: %w", err) - } - - ent, err := r.headerPipelineOutput.WaitForEntry(ctx) - if err != nil { - return fmt.Errorf("wait for header entry: %w", err) - } - - // Copy resultant attributes over current set of attributes (upsert) - for k, v := range ent.Attributes { - r.FileAttributes[k] = v - } - return nil + r.headerReader = nil + r.HeaderFinalized = true } // Close will close the file @@ -170,8 +132,8 @@ func (r *Reader) Close() { } } - if r.headerPipeline != nil { - if err := r.headerPipeline.Stop(); err != nil { + if r.headerReader != nil { + if err := r.headerReader.Stop(); err != nil { r.Errorw("Failed to stop header pipeline", zap.Error(err)) } } diff --git a/pkg/stanza/fileconsumer/reader_factory.go b/pkg/stanza/fileconsumer/reader_factory.go index 98c44a1e8d40..416f4876cfa7 100644 --- a/pkg/stanza/fileconsumer/reader_factory.go +++ b/pkg/stanza/fileconsumer/reader_factory.go @@ -5,18 +5,16 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collecto import ( "bufio" - "fmt" "os" "path/filepath" "runtime" - "go.opentelemetry.io/collector/extension/experimental/storage" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/util" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" ) type readerFactory struct { @@ -25,7 +23,7 @@ type readerFactory struct { fromBeginning bool splitterFactory splitterFactory encodingConfig helper.EncodingConfig - headerSettings *headerSettings + headerConfig *header.Config } func (f *readerFactory) newReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader, error) { @@ -103,7 +101,6 @@ func (b *readerBuilder) build() (r *Reader, err error) { r = &Reader{ readerConfig: b.readerConfig, Offset: b.offset, - headerSettings: b.headerSettings, HeaderFinalized: b.headerFinalized, FileAttributes: b.fileAttributes, } @@ -122,26 +119,16 @@ func (b *readerBuilder) build() (r *Reader, err error) { return nil, err } - if b.headerSettings == nil || b.headerFinalized { + if b.headerConfig == nil || b.headerFinalized { r.splitFunc = r.lineSplitFunc r.processFunc = b.readerConfig.emit } else { - // We are reading the header. Use the header split func - r.splitFunc = b.headerSettings.splitFunc - r.processFunc = r.consumeHeaderLine - - // Create the header pipeline - r.headerPipelineOutput = newHeaderPipelineOutput(b.SugaredLogger) - r.headerPipeline, err = pipeline.Config{ - Operators: b.headerSettings.config.MetadataOperators, - DefaultOutput: r.headerPipelineOutput, - }.Build(b.SugaredLogger) + r.splitFunc = b.headerConfig.SplitFunc + r.headerReader, err = header.NewReader(b.SugaredLogger, *b.headerConfig) if err != nil { - return nil, fmt.Errorf("failed to build pipeline: %w", err) - } - if err = r.headerPipeline.Start(storage.NewNopClient()); err != nil { - return nil, fmt.Errorf("failed to start header pipeline: %w", err) + return nil, err } + r.processFunc = r.headerReader.Process } if b.file == nil { diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/reader_test.go index aedd9d99e026..cba92842a1d7 100644 --- a/pkg/stanza/fileconsumer/reader_test.go +++ b/pkg/stanza/fileconsumer/reader_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex" @@ -165,23 +166,14 @@ func TestHeaderFingerprintIncluded(t *testing.T) { regexConf := regex.NewConfig() regexConf.Regex = "^#(?P
.*)" - headerConf := &HeaderConfig{ - Pattern: "^#", - MetadataOperators: []operator.Config{ - { - Builder: regexConf, - }, - }, - } - enc, err := helper.EncodingConfig{ Encoding: "utf-8", }.Build() require.NoError(t, err) - h, err := headerConf.buildHeaderSettings(enc.Encoding) + h, err := header.NewConfig("^#", []operator.Config{{Builder: regexConf}}, enc.Encoding) require.NoError(t, err) - f.headerSettings = h + f.headerConfig = h temp := openTemp(t, t.TempDir())