Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][pkg/stanza] Merge roller into fileconsumer.Manager #28451

Merged
merged 2 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,13 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli
TrimFunc: trimFunc,
HeaderConfig: hCfg,
},
fileMatcher: fileMatcher,
roller: newRoller(),
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
knownFiles: make([]*reader.Reader, 0, 10*c.MaxConcurrentFiles),
seenPaths: make(map[string]struct{}, 100),
fileMatcher: fileMatcher,
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
previousPollFiles: make([]*reader.Reader, 0, c.MaxConcurrentFiles/2),
knownFiles: make([]*reader.Reader, 0, 10*c.MaxConcurrentFiles),
seenPaths: make(map[string]struct{}, 100),
}, nil
}

Expand Down
22 changes: 14 additions & 8 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ type Manager struct {

readerFactory reader.Factory
fileMatcher *matcher.Matcher
roller roller

pollInterval time.Duration
persister operator.Persister
maxBatches int
maxBatchFiles int

knownFiles []*reader.Reader
seenPaths map[string]struct{}
previousPollFiles []*reader.Reader
knownFiles []*reader.Reader
seenPaths map[string]struct{}

currentFps []*fingerprint.Fingerprint
}
Expand Down Expand Up @@ -71,9 +71,11 @@ func (m *Manager) Start(persister operator.Persister) error {
func (m *Manager) Stop() error {
m.cancel()
m.wg.Wait()
m.roller.cleanup()
for _, reader := range m.knownFiles {
reader.Close()
for _, r := range m.previousPollFiles {
r.Close()
}
for _, r := range m.knownFiles {
r.Close()
}
m.cancel = nil
return nil
Expand Down Expand Up @@ -144,7 +146,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
// take care of files which disappeared from the pattern since the last poll cycle
// this can mean either files which were removed, or rotated into a name not matching the pattern
// we do this before reading existing files to ensure we emit older log lines before newer ones
m.roller.readLostFiles(ctx, readers)
m.readLostFiles(ctx, readers)

var wg sync.WaitGroup
for _, r := range readers {
Expand All @@ -156,7 +158,11 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
}
wg.Wait()

m.roller.roll(ctx, readers)
for _, r := range m.previousPollFiles {
r.Close()
}
m.previousPollFiles = readers

m.saveCurrent(readers)

rmds := make([]*reader.Metadata, 0, len(readers))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,11 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
)

type detectLostFiles struct {
oldReaders []*reader.Reader
}

func newRoller() roller {
return &detectLostFiles{oldReaders: []*reader.Reader{}}
}

func (r *detectLostFiles) readLostFiles(ctx context.Context, newReaders []*reader.Reader) {
func (m *Manager) readLostFiles(ctx context.Context, newReaders []*reader.Reader) {
// Detect files that have been rotated out of matching pattern
lostReaders := make([]*reader.Reader, 0, len(r.oldReaders))
lostReaders := make([]*reader.Reader, 0, len(m.previousPollFiles))
OUTER:
for _, oldReader := range r.oldReaders {
for _, oldReader := range m.previousPollFiles {
for _, newReader := range newReaders {
if newReader.Fingerprint.StartsWith(oldReader.Fingerprint) {
continue OUTER
Expand All @@ -38,8 +30,8 @@ OUTER:
// At this point, we know that the file has been rotated. However, we do not know
// if it was moved or truncated. If truncated, then both handles point to the same
// file, in which case we should only read from it using the new reader. We can use
// the ValidateOrClose method to establish that the file has not been truncated.
if !oldReader.ValidateOrClose() {
// the Validate method to ensure that the file has not been truncated.
if !oldReader.Validate() {
continue OUTER
}
}
Expand All @@ -56,17 +48,3 @@ OUTER:
}
lostWG.Wait()
}

func (r *detectLostFiles) roll(_ context.Context, newReaders []*reader.Reader) {
for _, oldReader := range r.oldReaders {
oldReader.Close()
}

r.oldReaders = newReaders
}

func (r *detectLostFiles) cleanup() {
for _, oldReader := range r.oldReaders {
oldReader.Close()
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build windows
// +build windows

package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"

import (
Expand All @@ -9,8 +12,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
)

type roller interface {
readLostFiles(context.Context, []*reader.Reader)
roll(context.Context, []*reader.Reader)
cleanup()
func (m *Manager) readLostFiles(ctx context.Context, newReaders []*reader.Reader) {
return
}
15 changes: 2 additions & 13 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
Expand Down Expand Up @@ -196,27 +195,17 @@ func (r *Reader) NameEquals(other *Reader) bool {
return r.fileName == other.fileName
}

// ValidateOrClose returns true if the reader still has a valid file handle, false otherwise.
// If false is returned, the file handle should be considered closed.
//
// It may create a new fingerprint from the old file handle and compare it to the
// previously known fingerprint. If there has been a change to the fingerprint
// (other than appended data), the file is considered truncated. Consequently, the
// reader will automatically close the file and drop the handle.
func (r *Reader) ValidateOrClose() bool {
// Validate returns true if the reader still has a valid file handle, false otherwise.
func (r *Reader) Validate() bool {
if r.file == nil {
return false
}
refreshedFingerprint, err := fingerprint.New(r.file, r.FingerprintSize)
if err != nil {
r.logger.Debugw("Closing unreadable file", zap.Error(err), zap.String(attrs.LogFileName, r.fileName))
r.Close()
return false
}
if refreshedFingerprint.StartsWith(r.Fingerprint) {
return true
}
r.logger.Debugw("Closing truncated file", zap.String(attrs.LogFileName, r.fileName))
r.Close()
return false
}
33 changes: 0 additions & 33 deletions pkg/stanza/fileconsumer/roller_windows.go

This file was deleted.

Loading