Skip to content

Commit

Permalink
[chore][fileconsumer] Extract internal header package (#24264)
Browse files Browse the repository at this point in the history
This continues from #24036 by moving additional complexity into an
internal package.

Some files contain headers that are different than the rest of the file.
In such cases, we may need to extract file attributes that will be
applied to logs from that file.
  • Loading branch information
djaglowski authored Jul 25, 2023
1 parent d71f107 commit 911336c
Show file tree
Hide file tree
Showing 13 changed files with 496 additions and 433 deletions.
17 changes: 12 additions & 5 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

Expand Down Expand Up @@ -72,6 +74,11 @@ type Config struct {
Header *HeaderConfig `mapstructure:"header,omitempty"`
}

type HeaderConfig struct {
Pattern string `mapstructure:"pattern"`
MetadataOperators []operator.Config `mapstructure:"metadata_operators"`
}

// Build will build a file input operator from the supplied configuration
func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback) (*Manager, error) {
if err := c.validate(); err != nil {
Expand Down Expand Up @@ -120,14 +127,14 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact
return nil, fmt.Errorf("invalid start_at location '%s'", c.StartAt)
}

var hs *headerSettings
var hCfg *header.Config
if c.Header != nil {
enc, err := c.Splitter.EncodingConfig.Build()
if err != nil {
return nil, fmt.Errorf("failed to create encoding: %w", err)
}

hs, err = c.Header.buildHeaderSettings(enc.Encoding)
hCfg, err = header.NewConfig(c.Header.Pattern, c.Header.MetadataOperators, enc.Encoding)
if err != nil {
return nil, fmt.Errorf("failed to build header config: %w", err)
}
Expand All @@ -150,7 +157,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact
fromBeginning: startAtBeginning,
splitterFactory: factory,
encodingConfig: c.Splitter.EncodingConfig,
headerSettings: hs,
headerConfig: hCfg,
},
finder: c.MatchingCriteria,
roller: newRoller(),
Expand Down Expand Up @@ -226,13 +233,13 @@ func (c Config) validate() error {
return errors.New("`max_batches` must not be negative")
}

_, err := c.Splitter.EncodingConfig.Build()
enc, err := c.Splitter.EncodingConfig.Build()
if err != nil {
return err
}

if c.Header != nil {
if err := c.Header.validate(); err != nil {
if _, err := header.NewConfig(c.Header.Pattern, c.Header.MetadataOperators, enc.Encoding); err != nil {
return fmt.Errorf("invalid config for `header`: %w", err)
}
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/stanza/fileconsumer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,10 +790,7 @@ func TestBuildWithHeader(t *testing.T) {
},
require.NoError,
func(t *testing.T, f *Manager) {
require.NotNil(t, f.readerFactory.headerSettings)
require.NotNil(t, f.readerFactory.headerSettings.matchRegex)
require.NotNil(t, f.readerFactory.headerSettings.splitFunc)
require.NotNil(t, f.readerFactory.headerSettings.config)
require.NotNil(t, f.readerFactory.headerConfig.SplitFunc)
},
},
}
Expand All @@ -810,7 +807,6 @@ func TestBuildWithHeader(t *testing.T) {
if err != nil {
return
}

tc.validate(t, input)
})
}
Expand Down
1 change: 0 additions & 1 deletion pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1630,7 +1630,6 @@ func TestHeaderPersistanceInHeader(t *testing.T) {
})

require.NoError(t, op2.Stop())

}

func TestStalePartialFingerprintDiscarded(t *testing.T) {
Expand Down
147 changes: 0 additions & 147 deletions pkg/stanza/fileconsumer/header.go

This file was deleted.

Loading

0 comments on commit 911336c

Please sign in to comment.