diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index cf9ada6e6373..dc3b13b3851e 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -15,6 +15,8 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" ) @@ -72,6 +74,11 @@ type Config struct { Header *HeaderConfig `mapstructure:"header,omitempty"` } +type HeaderConfig struct { + Pattern string `mapstructure:"pattern"` + MetadataOperators []operator.Config `mapstructure:"metadata_operators"` +} + // Build will build a file input operator from the supplied configuration func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback) (*Manager, error) { if err := c.validate(); err != nil { @@ -120,14 +127,14 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact return nil, fmt.Errorf("invalid start_at location '%s'", c.StartAt) } - var hs *headerSettings + var hCfg *header.Config if c.Header != nil { enc, err := c.Splitter.EncodingConfig.Build() if err != nil { return nil, fmt.Errorf("failed to create encoding: %w", err) } - hs, err = c.Header.buildHeaderSettings(enc.Encoding) + hCfg, err = header.NewConfig(c.Header.Pattern, c.Header.MetadataOperators, enc.Encoding) if err != nil { return nil, fmt.Errorf("failed to build header config: %w", err) } @@ -150,7 +157,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact fromBeginning: startAtBeginning, splitterFactory: factory, encodingConfig: c.Splitter.EncodingConfig, - headerSettings: hs, + headerConfig: hCfg, }, finder: c.MatchingCriteria, roller: newRoller(), @@ -226,13 +233,13 @@ func (c Config) validate() error { return errors.New("`max_batches` must not be negative") } - _, err := c.Splitter.EncodingConfig.Build() + enc, err := c.Splitter.EncodingConfig.Build() if err != nil { return err } if c.Header != nil { - if err := c.Header.validate(); err != nil { + if _, err := header.NewConfig(c.Header.Pattern, c.Header.MetadataOperators, enc.Encoding); err != nil { return fmt.Errorf("invalid config for `header`: %w", err) } } diff --git a/pkg/stanza/fileconsumer/config_test.go b/pkg/stanza/fileconsumer/config_test.go index 9ebef1861aa9..4b6b82edf752 100644 --- a/pkg/stanza/fileconsumer/config_test.go +++ b/pkg/stanza/fileconsumer/config_test.go @@ -790,10 +790,7 @@ func TestBuildWithHeader(t *testing.T) { }, require.NoError, func(t *testing.T, f *Manager) { - require.NotNil(t, f.readerFactory.headerSettings) - require.NotNil(t, f.readerFactory.headerSettings.matchRegex) - require.NotNil(t, f.readerFactory.headerSettings.splitFunc) - require.NotNil(t, f.readerFactory.headerSettings.config) + require.NotNil(t, f.readerFactory.headerConfig.SplitFunc) }, }, } @@ -810,7 +807,6 @@ func TestBuildWithHeader(t *testing.T) { if err != nil { return } - tc.validate(t, input) }) } diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 235bcb7e9556..a8b608c9c3e0 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -1630,7 +1630,6 @@ func TestHeaderPersistanceInHeader(t *testing.T) { }) require.NoError(t, op2.Stop()) - } func TestStalePartialFingerprintDiscarded(t *testing.T) { diff --git a/pkg/stanza/fileconsumer/header.go b/pkg/stanza/fileconsumer/header.go deleted file mode 100644 index 12b640e0df8f..000000000000 --- a/pkg/stanza/fileconsumer/header.go +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" - -import ( - "bufio" - "bytes" - "context" - "errors" - "fmt" - "regexp" - - "go.uber.org/zap" - "golang.org/x/text/encoding" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" -) - -const headerPipelineOutputType = "header_log_emitter" - -type HeaderConfig struct { - Pattern string `mapstructure:"pattern"` - MetadataOperators []operator.Config `mapstructure:"metadata_operators"` -} - -// validate returns an error describing why the configuration is invalid, or nil if the configuration is valid. -func (hc *HeaderConfig) validate() error { - if len(hc.MetadataOperators) == 0 { - return errors.New("at least one operator must be specified for `metadata_operators`") - } - - nopLogger := zap.NewNop().Sugar() - outOp := newHeaderPipelineOutput(nopLogger) - p, err := pipeline.Config{ - Operators: hc.MetadataOperators, - DefaultOutput: outOp, - }.Build(nopLogger) - - if err != nil { - return fmt.Errorf("failed to build pipelines: %w", err) - } - - for _, op := range p.Operators() { - // This is the default output we created, it's always valid - if op.Type() == headerPipelineOutputType { - continue - } - - if !op.CanProcess() { - return fmt.Errorf("operator '%s' in `metadata_operators` cannot process entries", op.ID()) - } - - if !op.CanOutput() { - return fmt.Errorf("operator '%s' in `metadata_operators` does not propagate entries", op.ID()) - } - - // Filter processor also may fail to propagate some entries - if op.Type() == "filter" { - return fmt.Errorf("operator of type filter is not allowed in `metadata_operators`") - } - } - - return nil -} - -func (hc *HeaderConfig) buildHeaderSettings(enc encoding.Encoding) (*headerSettings, error) { - var err error - matchRegex, err := regexp.Compile(hc.Pattern) - if err != nil { - return nil, fmt.Errorf("failed to compile `pattern`: %w", err) - } - - splitFunc, err := helper.NewNewlineSplitFunc(enc, false, func(b []byte) []byte { - return bytes.Trim(b, "\r\n") - }) - if err != nil { - return nil, fmt.Errorf("failed to create split func: %w", err) - } - - return &headerSettings{ - matchRegex: matchRegex, - splitFunc: splitFunc, - config: hc, - }, nil -} - -// headerSettings contains compiled objects defined by a HeaderConfig -type headerSettings struct { - matchRegex *regexp.Regexp - splitFunc bufio.SplitFunc - config *HeaderConfig -} - -// headerPipelineOutput is a stanza operator that emits log entries to a channel -type headerPipelineOutput struct { - helper.OutputOperator - logChan chan *entry.Entry -} - -// newHeaderPipelineOutput creates a new receiver output -func newHeaderPipelineOutput(logger *zap.SugaredLogger) *headerPipelineOutput { - return &headerPipelineOutput{ - OutputOperator: helper.OutputOperator{ - BasicOperator: helper.BasicOperator{ - OperatorID: headerPipelineOutputType, - OperatorType: headerPipelineOutputType, - SugaredLogger: logger, - }, - }, - logChan: make(chan *entry.Entry, 1), - } -} - -// Start starts the goroutine(s) required for this operator -func (e *headerPipelineOutput) Start(_ operator.Persister) error { - return nil -} - -// Stop will close the log channel and stop running goroutines -func (e *headerPipelineOutput) Stop() error { - return nil -} - -func (e *headerPipelineOutput) Process(_ context.Context, ent *entry.Entry) error { - // Drop the entry if logChan is full, in order to avoid this operator blocking. - // This protects against a case where an operator could return an error, but continue propagating a log entry, - // leaving an unexpected entry in the output channel. - select { - case e.logChan <- ent: - default: - } - - return nil -} - -func (e *headerPipelineOutput) WaitForEntry(ctx context.Context) (*entry.Entry, error) { - select { - case <-ctx.Done(): - return nil, fmt.Errorf("got context cancellation while waiting for entry: %w", ctx.Err()) - case ent := <-e.logChan: - return ent, nil - } -} diff --git a/pkg/stanza/fileconsumer/header_test.go b/pkg/stanza/fileconsumer/header_test.go deleted file mode 100644 index 945e6335059b..000000000000 --- a/pkg/stanza/fileconsumer/header_test.go +++ /dev/null @@ -1,179 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package fileconsumer - -import ( - "testing" - - "github.com/stretchr/testify/require" - "golang.org/x/text/encoding" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/generate" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/output/stdout" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/filter" -) - -func TestHeaderConfig_validate(t *testing.T) { - regexConf := regex.NewConfig() - regexConf.Regex = "^#(?P.*)" - - invalidRegexConf := regex.NewConfig() - invalidRegexConf.Regex = "(" - - generateConf := generate.NewConfig("") - stdoutConf := stdout.NewConfig("") - filterConfg := filter.NewConfig() - filterConfg.Expression = "true" - - testCases := []struct { - name string - conf HeaderConfig - expectedErr string - }{ - { - name: "Valid config", - conf: HeaderConfig{ - Pattern: "^#", - MetadataOperators: []operator.Config{ - { - Builder: regexConf, - }, - }, - }, - }, - { - name: "Valid without specified header size", - conf: HeaderConfig{ - Pattern: "^#", - MetadataOperators: []operator.Config{ - { - Builder: regexConf, - }, - }, - }, - }, - { - name: "No operators specified", - conf: HeaderConfig{ - Pattern: "^#", - MetadataOperators: []operator.Config{}, - }, - expectedErr: "at least one operator must be specified for `metadata_operators`", - }, - { - name: "Invalid operator specified", - conf: HeaderConfig{ - Pattern: "^#", - MetadataOperators: []operator.Config{ - { - Builder: invalidRegexConf, - }, - }, - }, - expectedErr: "failed to build pipelines:", - }, - { - name: "first operator cannot process", - conf: HeaderConfig{ - Pattern: "^#", - MetadataOperators: []operator.Config{ - { - Builder: generateConf, - }, - }, - }, - expectedErr: "operator 'generate_input' in `metadata_operators` cannot process entries", - }, - { - name: "operator cannot output", - conf: HeaderConfig{ - Pattern: "^#", - MetadataOperators: []operator.Config{ - { - Builder: stdoutConf, - }, - }, - }, - expectedErr: "operator 'stdout' in `metadata_operators` does not propagate entries", - }, - { - name: "filter operator present", - conf: HeaderConfig{ - Pattern: "^#", - MetadataOperators: []operator.Config{ - { - Builder: filterConfg, - }, - }, - }, - expectedErr: "operator of type filter is not allowed in `metadata_operators`", - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - err := tc.conf.validate() - if tc.expectedErr != "" { - require.ErrorContains(t, err, tc.expectedErr) - } else { - require.NoError(t, err) - } - }) - } -} - -func TestHeaderConfig_buildHeaderSettings(t *testing.T) { - regexConf := regex.NewConfig() - regexConf.Regex = "^#(?P.*)" - - invalidRegexConf := regex.NewConfig() - invalidRegexConf.Regex = "(" - - testCases := []struct { - name string - enc encoding.Encoding - conf HeaderConfig - expectedErr string - }{ - { - name: "valid config", - enc: encoding.Nop, - conf: HeaderConfig{ - Pattern: "^#", - MetadataOperators: []operator.Config{ - { - Builder: regexConf, - }, - }, - }, - }, - { - name: "Invalid pattern", - conf: HeaderConfig{ - Pattern: "(", - MetadataOperators: []operator.Config{ - { - Builder: regexConf, - }, - }, - }, - expectedErr: "failed to compile `pattern`:", - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - h, err := tc.conf.buildHeaderSettings(tc.enc) - if tc.expectedErr != "" { - require.ErrorContains(t, err, tc.expectedErr) - } else { - require.NoError(t, err) - require.NotNil(t, h) - } - - }) - } -} diff --git a/pkg/stanza/fileconsumer/internal/header/config.go b/pkg/stanza/fileconsumer/internal/header/config.go new file mode 100644 index 000000000000..9bd329a95ec8 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/header/config.go @@ -0,0 +1,84 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package header // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "regexp" + + "go.uber.org/zap" + "golang.org/x/text/encoding" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" +) + +type Config struct { + regex *regexp.Regexp + SplitFunc bufio.SplitFunc + metadataOperators []operator.Config +} + +func NewConfig(matchRegex string, metadataOperators []operator.Config, enc encoding.Encoding) (*Config, error) { + var err error + if len(metadataOperators) == 0 { + return nil, errors.New("at least one operator must be specified for `metadata_operators`") + } + + if enc == nil { + return nil, errors.New("encoding must be specified") + } + + nopLogger := zap.NewNop().Sugar() + p, err := pipeline.Config{ + Operators: metadataOperators, + DefaultOutput: newPipelineOutput(nopLogger), + }.Build(nopLogger) + + if err != nil { + return nil, fmt.Errorf("failed to build pipelines: %w", err) + } + + for _, op := range p.Operators() { + // This is the default output we created, it's always valid + if op.Type() == pipelineOutputType { + continue + } + + if !op.CanProcess() { + return nil, fmt.Errorf("operator '%s' in `metadata_operators` cannot process entries", op.ID()) + } + + if !op.CanOutput() { + return nil, fmt.Errorf("operator '%s' in `metadata_operators` does not propagate entries", op.ID()) + } + + // Filter processor also may fail to propagate some entries + if op.Type() == "filter" { + return nil, fmt.Errorf("operator of type filter is not allowed in `metadata_operators`") + } + } + + regex, err := regexp.Compile(matchRegex) + if err != nil { + return nil, fmt.Errorf("failed to compile `pattern`: %w", err) + } + + splitFunc, err := helper.NewNewlineSplitFunc(enc, false, func(b []byte) []byte { + return bytes.Trim(b, "\r\n") + }) + if err != nil { + return nil, fmt.Errorf("failed to create split func: %w", err) + } + + return &Config{ + regex: regex, + SplitFunc: splitFunc, + metadataOperators: metadataOperators, + }, nil +} diff --git a/pkg/stanza/fileconsumer/internal/header/config_test.go b/pkg/stanza/fileconsumer/internal/header/config_test.go new file mode 100644 index 000000000000..bab02f437161 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/header/config_test.go @@ -0,0 +1,144 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package header + +import ( + "testing" + + "github.com/stretchr/testify/require" + "golang.org/x/text/encoding" + "golang.org/x/text/encoding/unicode" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/generate" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/output/stdout" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/filter" +) + +func TestBuild(t *testing.T) { + regexConf := regex.NewConfig() + regexConf.Regex = "^#(?P.*)" + + invalidRegexConf := regex.NewConfig() + invalidRegexConf.Regex = "(" + + generateConf := generate.NewConfig("") + stdoutConf := stdout.NewConfig("") + filterConfg := filter.NewConfig() + filterConfg.Expression = "true" + + testCases := []struct { + name string + enc encoding.Encoding + pattern string + ops []operator.Config + expectedErr string + }{ + { + name: "valid config", + enc: encoding.Nop, + pattern: "^#", + ops: []operator.Config{ + { + Builder: regexConf, + }, + }, + }, + { + name: "Invalid pattern", + enc: unicode.UTF8, + pattern: "(", + ops: []operator.Config{ + { + Builder: regexConf, + }, + }, + expectedErr: "failed to compile `pattern`:", + }, + { + name: "Valid without specified header size", + enc: unicode.UTF8, + pattern: "^#", + ops: []operator.Config{ + { + Builder: regexConf, + }, + }, + }, + { + name: "No operators specified", + enc: unicode.UTF8, + pattern: "^#", + ops: []operator.Config{}, + expectedErr: "at least one operator must be specified for `metadata_operators`", + }, + { + name: "No encoding specified", + pattern: "^#", + ops: []operator.Config{ + { + Builder: regexConf, + }, + }, + expectedErr: "encoding must be specified", + }, + { + name: "Invalid operator specified", + enc: unicode.UTF8, + pattern: "^#", + ops: []operator.Config{ + { + Builder: invalidRegexConf, + }, + }, + expectedErr: "failed to build pipelines:", + }, + { + name: "first operator cannot process", + enc: unicode.UTF8, + pattern: "^#", + ops: []operator.Config{ + { + Builder: generateConf, + }, + }, + expectedErr: "operator 'generate_input' in `metadata_operators` cannot process entries", + }, + { + name: "operator cannot output", + enc: unicode.UTF8, + pattern: "^#", + ops: []operator.Config{ + { + Builder: stdoutConf, + }, + }, + expectedErr: "operator 'stdout' in `metadata_operators` does not propagate entries", + }, + { + name: "filter operator present", + enc: unicode.UTF8, + pattern: "^#", + ops: []operator.Config{ + { + Builder: filterConfg, + }, + }, + expectedErr: "operator of type filter is not allowed in `metadata_operators`", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + h, err := NewConfig(tc.pattern, tc.ops, tc.enc) + if tc.expectedErr != "" { + require.ErrorContains(t, err, tc.expectedErr) + } else { + require.NoError(t, err) + require.NotNil(t, h) + } + }) + } +} diff --git a/pkg/stanza/fileconsumer/internal/header/output.go b/pkg/stanza/fileconsumer/internal/header/output.go new file mode 100644 index 000000000000..7caf7f9a89e0 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/header/output.go @@ -0,0 +1,56 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package header // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" + +import ( + "context" + "fmt" + + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" +) + +const pipelineOutputType = "header_log_emitter" + +// pipelineOutput is a stanza operator that emits log entries to a channel +type pipelineOutput struct { + helper.OutputOperator + logChan chan *entry.Entry +} + +// newPipelineOutput creates a new receiver output +func newPipelineOutput(logger *zap.SugaredLogger) *pipelineOutput { + return &pipelineOutput{ + OutputOperator: helper.OutputOperator{ + BasicOperator: helper.BasicOperator{ + OperatorID: pipelineOutputType, + OperatorType: pipelineOutputType, + SugaredLogger: logger, + }, + }, + logChan: make(chan *entry.Entry, 1), + } +} + +// Drop the entry if logChan is full, in order to avoid this operator blocking. +// This protects against a case where an operator could return an error, but continue propagating a log entry, +// leaving an unexpected entry in the output channel. +func (e *pipelineOutput) Process(_ context.Context, ent *entry.Entry) error { + select { + case e.logChan <- ent: + default: + } + return nil +} + +func (e *pipelineOutput) WaitForEntry(ctx context.Context) (*entry.Entry, error) { + select { + case <-ctx.Done(): + return nil, fmt.Errorf("wait for entry: %w", ctx.Err()) + case ent := <-e.logChan: + return ent, nil + } +} diff --git a/pkg/stanza/fileconsumer/internal/header/reader.go b/pkg/stanza/fileconsumer/internal/header/reader.go new file mode 100644 index 000000000000..f55b2323cfee --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/header/reader.go @@ -0,0 +1,75 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package header // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" + +import ( + "context" + "errors" + "fmt" + + "go.opentelemetry.io/collector/extension/experimental/storage" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" +) + +var ErrEndOfHeader = errors.New("end of header") + +type Reader struct { + logger *zap.SugaredLogger + cfg Config + pipeline pipeline.Pipeline + output *pipelineOutput +} + +func NewReader(logger *zap.SugaredLogger, cfg Config) (*Reader, error) { + r := &Reader{logger: logger, cfg: cfg} + var err error + r.output = newPipelineOutput(logger) + r.pipeline, err = pipeline.Config{ + Operators: cfg.metadataOperators, + DefaultOutput: r.output, + }.Build(logger) + if err != nil { + return nil, fmt.Errorf("failed to build pipeline: %w", err) + } + if err = r.pipeline.Start(storage.NewNopClient()); err != nil { + return nil, fmt.Errorf("failed to start header pipeline: %w", err) + } + return r, nil +} + +// Process checks if the given token is a line of the header, and consumes it if it is. +// An EndOfHeaderError is returned if the given line was not a header line. +func (r *Reader) Process(ctx context.Context, token []byte, fileAttributes map[string]any) error { + if !r.cfg.regex.Match(token) { + return ErrEndOfHeader + } + + firstOperator := r.pipeline.Operators()[0] + + newEntry := entry.New() + newEntry.Body = string(token) + + if err := firstOperator.Process(ctx, newEntry); err != nil { + return fmt.Errorf("process header entry: %w", err) + } + + ent, err := r.output.WaitForEntry(ctx) + if err != nil { + return fmt.Errorf("wait for header entry: %w", err) + } + + // Copy resultant attributes over current set of attributes (upsert) + for k, v := range ent.Attributes { + // fileAttributes is an output parameter + fileAttributes[k] = v + } + return nil +} + +func (r *Reader) Stop() error { + return r.pipeline.Stop() +} diff --git a/pkg/stanza/fileconsumer/internal/header/reader_test.go b/pkg/stanza/fileconsumer/internal/header/reader_test.go new file mode 100644 index 000000000000..721306e7e5eb --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/header/reader_test.go @@ -0,0 +1,87 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package header + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + "golang.org/x/text/encoding/unicode" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/keyvalue" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex" +) + +func TestReader(t *testing.T) { + logger := zaptest.NewLogger(t).Sugar() + + regexConf := regex.NewConfig() + regexConf.Regex = "^#(?P.*)" + regexConf.ParseTo = entry.RootableField{Field: entry.NewBodyField()} + + kvConf := keyvalue.NewConfig() + kvConf.ParseFrom = entry.NewBodyField("header_line") + kvConf.Delimiter = ":" + + cfg, err := NewConfig("^#", []operator.Config{ + {Builder: regexConf}, + {Builder: kvConf}, + }, unicode.UTF8) + require.NoError(t, err) + + reader, err := NewReader(logger, *cfg) + assert.NoError(t, err) + + attrs := make(map[string]any) + assert.NoError(t, reader.Process(context.Background(), []byte("# foo:bar\n"), attrs)) + assert.NoError(t, reader.Process(context.Background(), []byte("# hello:world\n"), attrs)) + assert.ErrorIs(t, reader.Process(context.Background(), []byte("First log line"), attrs), ErrEndOfHeader) + assert.Len(t, attrs, 2) + assert.Equal(t, "bar", attrs["foo"]) + assert.Equal(t, "world", attrs["hello"]) + + assert.NoError(t, reader.Stop()) +} + +func TestSkipUnmatchedHeaderLine(t *testing.T) { + logger := zaptest.NewLogger(t).Sugar() + + regexConf := regex.NewConfig() + regexConf.Regex = "^#(?P.*)" + regexConf.ParseTo = entry.RootableField{Field: entry.NewBodyField()} + + kvConf := keyvalue.NewConfig() + kvConf.ParseFrom = entry.NewBodyField("header_line") + kvConf.Delimiter = ":" + + cfg, err := NewConfig("^#", []operator.Config{ + {Builder: regexConf}, + {Builder: kvConf}, + }, unicode.UTF8) + require.NoError(t, err) + + reader, err := NewReader(logger, *cfg) + assert.NoError(t, err) + + attrs := make(map[string]any) + assert.NoError(t, reader.Process(context.Background(), []byte("# foo:bar\n"), attrs)) + assert.NoError(t, reader.Process(context.Background(), []byte("# matches header regex but not metadata operator assumptions\n"), attrs)) + assert.NoError(t, reader.Process(context.Background(), []byte("# hello:world\n"), attrs)) + assert.ErrorIs(t, reader.Process(context.Background(), []byte("First log line"), attrs), ErrEndOfHeader) + assert.Len(t, attrs, 2) + assert.Equal(t, "bar", attrs["foo"]) + assert.Equal(t, "world", attrs["hello"]) + + assert.NoError(t, reader.Stop()) +} + +func TestNewReaderErr(t *testing.T) { + _, err := NewReader(nil, Config{}) + assert.Error(t, err) +} diff --git a/pkg/stanza/fileconsumer/reader.go b/pkg/stanza/fileconsumer/reader.go index ef7b0cb490d4..9f1dc61c3591 100644 --- a/pkg/stanza/fileconsumer/reader.go +++ b/pkg/stanza/fileconsumer/reader.go @@ -6,17 +6,17 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collecto import ( "bufio" "context" + "errors" "fmt" "os" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" ) type readerConfig struct { @@ -48,11 +48,7 @@ type Reader struct { eof bool HeaderFinalized bool - recreateScanner bool - - headerSettings *headerSettings - headerPipeline pipeline.Pipeline - headerPipelineOutput *headerPipelineOutput + headerReader *header.Reader } // offsetToEnd sets the starting offset @@ -96,70 +92,36 @@ func (r *Reader) ReadToEnd(ctx context.Context) { token, err := r.encoding.Decode(s.Bytes()) if err != nil { r.Errorw("decode: %w", zap.Error(err)) - } else if err = r.processFunc(ctx, token, r.FileAttributes); err != nil { - r.Errorw("process: %w", zap.Error(err)) - } - - if r.recreateScanner { - r.recreateScanner = false - // recreate the scanner with the log-line's split func. - // We do not use the updated offset from the scanner, - // as the log line we just read could be multiline, and would be - // split differently with the new splitter. - if _, err := r.file.Seek(r.Offset, 0); err != nil { - r.Errorw("Failed to seek post-header", zap.Error(err)) - return + } else if err := r.processFunc(ctx, token, r.FileAttributes); err != nil { + if errors.Is(err, header.ErrEndOfHeader) { + r.finalizeHeader() + + // Now that the header is consumed, use the normal split and process functions. + // Recreate the scanner with the normal split func. + // Do not use the updated offset from the old scanner, as the most recent token + // could be split differently with the new splitter. + r.splitFunc = r.lineSplitFunc + r.processFunc = r.emit + if _, err = r.file.Seek(r.Offset, 0); err != nil { + r.Errorw("Failed to seek post-header", zap.Error(err)) + return + } + s = scanner.New(r, r.maxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc) + } else { + r.Errorw("process: %w", zap.Error(err)) } - - s = scanner.New(r, r.maxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc) } r.Offset = s.Pos() } } -// consumeHeaderLine checks if the given token is a line of the header, and consumes it if it is. -// The return value dictates whether the given line was a header line or not. -// If false is returned, the full header can be assumed to be read. -func (r *Reader) consumeHeaderLine(ctx context.Context, token []byte, _ map[string]any) error { - if !r.headerSettings.matchRegex.Match(token) { - // Finalize and cleanup the pipeline - r.HeaderFinalized = true - - // Stop and drop the header pipeline. - if err := r.headerPipeline.Stop(); err != nil { - return fmt.Errorf("stop header pipeline: %w", err) - } - r.headerPipeline = nil - r.headerPipelineOutput = nil - - // Use the line split func instead of the header split func - r.splitFunc = r.lineSplitFunc - r.processFunc = r.emit - // Mark that we should recreate the scanner, since we changed the split function - r.recreateScanner = true - return nil +func (r *Reader) finalizeHeader() { + if err := r.headerReader.Stop(); err != nil { + r.Errorw("Failed to stop header pipeline during finalization", zap.Error(err)) } - - firstOperator := r.headerPipeline.Operators()[0] - - newEntry := entry.New() - newEntry.Body = string(token) - - if err := firstOperator.Process(ctx, newEntry); err != nil { - return fmt.Errorf("process header entry: %w", err) - } - - ent, err := r.headerPipelineOutput.WaitForEntry(ctx) - if err != nil { - return fmt.Errorf("wait for header entry: %w", err) - } - - // Copy resultant attributes over current set of attributes (upsert) - for k, v := range ent.Attributes { - r.FileAttributes[k] = v - } - return nil + r.headerReader = nil + r.HeaderFinalized = true } // Close will close the file @@ -170,8 +132,8 @@ func (r *Reader) Close() { } } - if r.headerPipeline != nil { - if err := r.headerPipeline.Stop(); err != nil { + if r.headerReader != nil { + if err := r.headerReader.Stop(); err != nil { r.Errorw("Failed to stop header pipeline", zap.Error(err)) } } diff --git a/pkg/stanza/fileconsumer/reader_factory.go b/pkg/stanza/fileconsumer/reader_factory.go index 98c44a1e8d40..416f4876cfa7 100644 --- a/pkg/stanza/fileconsumer/reader_factory.go +++ b/pkg/stanza/fileconsumer/reader_factory.go @@ -5,18 +5,16 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collecto import ( "bufio" - "fmt" "os" "path/filepath" "runtime" - "go.opentelemetry.io/collector/extension/experimental/storage" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/util" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" ) type readerFactory struct { @@ -25,7 +23,7 @@ type readerFactory struct { fromBeginning bool splitterFactory splitterFactory encodingConfig helper.EncodingConfig - headerSettings *headerSettings + headerConfig *header.Config } func (f *readerFactory) newReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader, error) { @@ -103,7 +101,6 @@ func (b *readerBuilder) build() (r *Reader, err error) { r = &Reader{ readerConfig: b.readerConfig, Offset: b.offset, - headerSettings: b.headerSettings, HeaderFinalized: b.headerFinalized, FileAttributes: b.fileAttributes, } @@ -122,26 +119,16 @@ func (b *readerBuilder) build() (r *Reader, err error) { return nil, err } - if b.headerSettings == nil || b.headerFinalized { + if b.headerConfig == nil || b.headerFinalized { r.splitFunc = r.lineSplitFunc r.processFunc = b.readerConfig.emit } else { - // We are reading the header. Use the header split func - r.splitFunc = b.headerSettings.splitFunc - r.processFunc = r.consumeHeaderLine - - // Create the header pipeline - r.headerPipelineOutput = newHeaderPipelineOutput(b.SugaredLogger) - r.headerPipeline, err = pipeline.Config{ - Operators: b.headerSettings.config.MetadataOperators, - DefaultOutput: r.headerPipelineOutput, - }.Build(b.SugaredLogger) + r.splitFunc = b.headerConfig.SplitFunc + r.headerReader, err = header.NewReader(b.SugaredLogger, *b.headerConfig) if err != nil { - return nil, fmt.Errorf("failed to build pipeline: %w", err) - } - if err = r.headerPipeline.Start(storage.NewNopClient()); err != nil { - return nil, fmt.Errorf("failed to start header pipeline: %w", err) + return nil, err } + r.processFunc = r.headerReader.Process } if b.file == nil { diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/reader_test.go index aedd9d99e026..cba92842a1d7 100644 --- a/pkg/stanza/fileconsumer/reader_test.go +++ b/pkg/stanza/fileconsumer/reader_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex" @@ -165,23 +166,14 @@ func TestHeaderFingerprintIncluded(t *testing.T) { regexConf := regex.NewConfig() regexConf.Regex = "^#(?P
.*)" - headerConf := &HeaderConfig{ - Pattern: "^#", - MetadataOperators: []operator.Config{ - { - Builder: regexConf, - }, - }, - } - enc, err := helper.EncodingConfig{ Encoding: "utf-8", }.Build() require.NoError(t, err) - h, err := headerConf.buildHeaderSettings(enc.Encoding) + h, err := header.NewConfig("^#", []operator.Config{{Builder: regexConf}}, enc.Encoding) require.NoError(t, err) - f.headerSettings = h + f.headerConfig = h temp := openTemp(t, t.TempDir())