Skip to content

Commit

Permalink
rm factory
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch committed Oct 17, 2018
1 parent 15b5ac1 commit 86d3c2b
Showing 1 changed file with 28 additions and 40 deletions.
68 changes: 28 additions & 40 deletions journalbeat/reader/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,74 +65,62 @@ type Reader struct {
backoff *common.Backoff
}

// journalOpener is a function which tries to open the configured systemd journal.
// If it is successful, a new logger instance is created also.
type journalOpener func() (*sdjournal.Journal, *logp.Logger, error)

// New creates a new journal reader and moves the FP to the configured position.
func New(c Config, done chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) {
opener := func() (*sdjournal.Journal, *logp.Logger, error) {
f, err := os.Stat(c.Path)
f, err := os.Stat(c.Path)
if err != nil {
return nil, errors.Wrap(err, "failed to open file")
}

var j *sdjournal.Journal
if f.IsDir() {
j, err = sdjournal.NewJournalFromDir(c.Path)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to open file")
return nil, errors.Wrap(err, "failed to open journal directory")
}

var j *sdjournal.Journal
if f.IsDir() {
j, err = sdjournal.NewJournalFromDir(c.Path)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to open journal directory")
}
} else {
j, err = sdjournal.NewJournalFromFiles(c.Path)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to open journal file")
}
} else {
j, err = sdjournal.NewJournalFromFiles(c.Path)
if err != nil {
return nil, errors.Wrap(err, "failed to open journal file")
}
return j, logger.With("path", c.Path), nil
}

return newReader(opener, c, done, state, logger)
l := logger.With("path", c.Path)
l.Debug("New journal is opened for reading")

return newReader(l, done, c, j, state)
}

// NewLocal creates a reader to read form the local journal and moves the FP
// to the configured position.
func NewLocal(c Config, done chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) {
localOpener := func() (*sdjournal.Journal, *logp.Logger, error) {
j, err := sdjournal.NewJournal()
if err != nil {
return nil, nil, errors.Wrap(err, "failed to open local journal")
}

logger = logger.With("path", "local")
logger.Debug("New local journal is opened for reading")
return j, logger, nil
j, err := sdjournal.NewJournal()
if err != nil {
return nil, errors.Wrap(err, "failed to open local journal")
}

return newReader(localOpener, c, done, state, logger)
}
l := logger.With("path", "local")
l.Debug("New local journal is opened for reading")

func newReader(opener journalOpener, c Config, done chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) {
j, logger, err := opener()
if err != nil {
return nil, err
}
return newReader(l, done, c, j, state)
}

err = setupMatches(j, c.Matches)
func newReader(logger *logp.Logger, done chan struct{}, c Config, journal *sdjournal.Journal, state checkpoint.JournalState) (*Reader, error) {
err := setupMatches(journal, c.Matches)
if err != nil {
return nil, err
}

r := &Reader{
journal: j,
journal: journal,
config: c,
done: done,
logger: logger,
backoff: common.NewBackoff(done, c.Backoff, c.MaxBackoff),
}
r.seek(state.Cursor)

instance.AddJournalToMonitor(c.Path, j)
instance.AddJournalToMonitor(c.Path, journal)

return r, nil
}
Expand Down

0 comments on commit 86d3c2b

Please sign in to comment.