Skip to content

Commit

Permalink
[chore][pkg/stanza] Merge roller into fileconsumer.Manager (#28451)
Browse files Browse the repository at this point in the history
Follows #28419

This discards the separate "roller" and implements the same
functionality directly in `fileconsumer.Manager`.

The motivation for this is to move towards a system of managing files
where each file is managed by only one list at a time. This PR retains
two overlapping slices of readers (`previousPollFiles` and
`knownFiles`), but the functionality does not change. #27823 should get
us the rest of the way there.
  • Loading branch information
djaglowski authored Oct 24, 2023
1 parent cde5da6 commit d4016fa
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 92 deletions.
14 changes: 7 additions & 7 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,13 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli
TrimFunc: trimFunc,
HeaderConfig: hCfg,
},
fileMatcher: fileMatcher,
roller: newRoller(),
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
knownFiles: make([]*reader.Reader, 0, 10*c.MaxConcurrentFiles),
seenPaths: make(map[string]struct{}, 100),
fileMatcher: fileMatcher,
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
previousPollFiles: make([]*reader.Reader, 0, c.MaxConcurrentFiles/2),
knownFiles: make([]*reader.Reader, 0, 10*c.MaxConcurrentFiles),
seenPaths: make(map[string]struct{}, 100),
}, nil
}

Expand Down
22 changes: 14 additions & 8 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ type Manager struct {

readerFactory reader.Factory
fileMatcher *matcher.Matcher
roller roller

pollInterval time.Duration
persister operator.Persister
maxBatches int
maxBatchFiles int

knownFiles []*reader.Reader
seenPaths map[string]struct{}
previousPollFiles []*reader.Reader
knownFiles []*reader.Reader
seenPaths map[string]struct{}

currentFps []*fingerprint.Fingerprint
}
Expand Down Expand Up @@ -71,9 +71,11 @@ func (m *Manager) Start(persister operator.Persister) error {
func (m *Manager) Stop() error {
m.cancel()
m.wg.Wait()
m.roller.cleanup()
for _, reader := range m.knownFiles {
reader.Close()
for _, r := range m.previousPollFiles {
r.Close()
}
for _, r := range m.knownFiles {
r.Close()
}
m.cancel = nil
return nil
Expand Down Expand Up @@ -144,7 +146,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
// take care of files which disappeared from the pattern since the last poll cycle
// this can mean either files which were removed, or rotated into a name not matching the pattern
// we do this before reading existing files to ensure we emit older log lines before newer ones
m.roller.readLostFiles(ctx, readers)
m.readLostFiles(ctx, readers)

var wg sync.WaitGroup
for _, r := range readers {
Expand All @@ -156,7 +158,11 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
}
wg.Wait()

m.roller.roll(ctx, readers)
for _, r := range m.previousPollFiles {
r.Close()
}
m.previousPollFiles = readers

m.saveCurrent(readers)

rmds := make([]*reader.Metadata, 0, len(readers))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,11 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
)

type detectLostFiles struct {
oldReaders []*reader.Reader
}

func newRoller() roller {
return &detectLostFiles{oldReaders: []*reader.Reader{}}
}

func (r *detectLostFiles) readLostFiles(ctx context.Context, newReaders []*reader.Reader) {
func (m *Manager) readLostFiles(ctx context.Context, newReaders []*reader.Reader) {
// Detect files that have been rotated out of matching pattern
lostReaders := make([]*reader.Reader, 0, len(r.oldReaders))
lostReaders := make([]*reader.Reader, 0, len(m.previousPollFiles))
OUTER:
for _, oldReader := range r.oldReaders {
for _, oldReader := range m.previousPollFiles {
for _, newReader := range newReaders {
if newReader.Fingerprint.StartsWith(oldReader.Fingerprint) {
continue OUTER
Expand All @@ -38,8 +30,8 @@ OUTER:
// At this point, we know that the file has been rotated. However, we do not know
// if it was moved or truncated. If truncated, then both handles point to the same
// file, in which case we should only read from it using the new reader. We can use
// the ValidateOrClose method to establish that the file has not been truncated.
if !oldReader.ValidateOrClose() {
// the Validate method to ensure that the file has not been truncated.
if !oldReader.Validate() {
continue OUTER
}
}
Expand All @@ -56,17 +48,3 @@ OUTER:
}
lostWG.Wait()
}

func (r *detectLostFiles) roll(_ context.Context, newReaders []*reader.Reader) {
for _, oldReader := range r.oldReaders {
oldReader.Close()
}

r.oldReaders = newReaders
}

func (r *detectLostFiles) cleanup() {
for _, oldReader := range r.oldReaders {
oldReader.Close()
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build windows
// +build windows

package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"

import (
Expand All @@ -9,8 +12,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
)

type roller interface {
readLostFiles(context.Context, []*reader.Reader)
roll(context.Context, []*reader.Reader)
cleanup()
// readLostFiles is a no-op in this file, which carries a windows build
// constraint. Both arguments are accepted and ignored, and nothing is read.
func (m *Manager) readLostFiles(ctx context.Context, newReaders []*reader.Reader) {}
15 changes: 2 additions & 13 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
Expand Down Expand Up @@ -196,27 +195,17 @@ func (r *Reader) NameEquals(other *Reader) bool {
return r.fileName == other.fileName
}

// ValidateOrClose returns true if the reader still has a valid file handle, false otherwise.
// If false is returned, the file handle should be considered closed.
//
// It may create a new fingerprint from the old file handle and compare it to the
// previously known fingerprint. If there has been a change to the fingerprint
// (other than appended data), the file is considered truncated. Consequently, the
// reader will automatically close the file and drop the handle.
func (r *Reader) ValidateOrClose() bool {
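// Validate returns true if the reader holds an open file handle and a freshly
// computed fingerprint of that file still starts with the reader's stored
// fingerprint (i.e. the file has not been truncated). It returns false otherwise.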
func (r *Reader) Validate() bool {
if r.file == nil {
return false
}
refreshedFingerprint, err := fingerprint.New(r.file, r.FingerprintSize)
if err != nil {
r.logger.Debugw("Closing unreadable file", zap.Error(err), zap.String(attrs.LogFileName, r.fileName))
r.Close()
return false
}
if refreshedFingerprint.StartsWith(r.Fingerprint) {
return true
}
r.logger.Debugw("Closing truncated file", zap.String(attrs.LogFileName, r.fileName))
r.Close()
return false
}
33 changes: 0 additions & 33 deletions pkg/stanza/fileconsumer/roller_windows.go

This file was deleted.

0 comments on commit d4016fa

Please sign in to comment.