Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Journald] Restart journalctl if it exits unexpectedly #40558

Merged
merged 28 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0710d83
Move journalctl process management to a different type
belimawr Aug 13, 2024
181db4f
Restart journalctl on error
belimawr Aug 15, 2024
612005e
Test Jounald reader
belimawr Aug 19, 2024
9390296
Re-enable TestEventWithNonStringData
belimawr Aug 19, 2024
d07d7f4
Add integration test for journald input
belimawr Aug 20, 2024
5fb9fb8
[journald] fix sending empty message on restart
belimawr Aug 20, 2024
5ef7674
Fix log level and add changelog
belimawr Aug 20, 2024
80262d0
Fix lint warnings
belimawr Aug 20, 2024
e942ca9
add close call
belimawr Aug 20, 2024
1b73782
Remove CGO references from journalfield package
belimawr Aug 20, 2024
3856e82
Remove EnsureESIsrunning call from tests
belimawr Aug 20, 2024
1ded00a
Improve debugging failed tests
belimawr Aug 20, 2024
f202880
Add debug logs
belimawr Aug 20, 2024
d649a94
Add more debug
belimawr Aug 20, 2024
02ba7e1
Add more debug
belimawr Aug 20, 2024
286d994
Close stdin instead of sending sigterm
belimawr Aug 20, 2024
0de7a7a
Ensure call to systemd-cat worked correctly
belimawr Aug 20, 2024
f89cabc
Remove unused wait groups and improve error handling
belimawr Aug 27, 2024
67d6bd9
Remove unused field
belimawr Aug 30, 2024
ac72552
Add missing licence headers
belimawr Sep 4, 2024
c38f0cf
Only restart with --after-cursor is cursor exists
belimawr Sep 9, 2024
48dd490
make logging consistent
belimawr Sep 10, 2024
72eb03f
Add comments explaining why there is no goroutine leak
belimawr Sep 10, 2024
e7d2260
Add a backoff wait when restarting journalctl
belimawr Sep 10, 2024
6162dbe
Addd comments about calling Kill
belimawr Sep 10, 2024
dc06651
Implement review suggestions
belimawr Sep 10, 2024
d5cb6fc
Update go.mod and notice file
belimawr Sep 10, 2024
54e5be6
Use github.com/gofrs/uuid and fix lint warnings
belimawr Sep 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Update Go version to 1.22.6. {pull}40528[40528]
- Aborts all active connections for Elasticsearch output. {pull}40572[40572]
- Closes beat Publisher on beat stop and by the Agent manager. {pull}40572[40572]
- The journald input now restarts if there is an error/crash {issue}32782[32782] {pull}40558[40558]

*Auditbeat*

Expand Down
13 changes: 10 additions & 3 deletions filebeat/input/journald/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (inp *journald) Test(src cursor.Source, ctx input.TestContext) error {
"",
inp.Since,
src.Name(),
journalctl.Factory,
)
if err != nil {
return err
Expand Down Expand Up @@ -161,6 +162,7 @@ func (inp *journald) Run(
pos,
inp.Since,
src.Name(),
journalctl.Factory,
)
if err != nil {
return fmt.Errorf("could not start journal reader: %w", err)
Expand All @@ -179,12 +181,17 @@ func (inp *journald) Run(
for {
entry, err := parser.Next()
if err != nil {
switch {
// The input has been cancelled, gracefully return
if errors.Is(err, journalctl.ErrCancelled) {
case errors.Is(err, journalctl.ErrCancelled):
return nil
// Journalctl is restarting, do ignore the empty event
case errors.Is(err, journalctl.ErrRestarting):
continue
default:
logger.Errorf("could not read event: %s", err)
return err
}
logger.Errorf("could not read event: %s", err)
return err
}

event := entry.ToEvent()
Expand Down
130 changes: 130 additions & 0 deletions filebeat/input/journald/pkg/journalctl/jctlmock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

147 changes: 147 additions & 0 deletions filebeat/input/journald/pkg/journalctl/journalctl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package journalctl

import (
"bufio"
"errors"
"fmt"
"io"
"os/exec"
"strings"

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/elastic-agent-libs/logp"
)

type journalctl struct {
cmd *exec.Cmd
dataChan chan []byte
stdout io.ReadCloser
stderr io.ReadCloser

logger *logp.Logger
canceler input.Canceler
}

// Factory returns an instance of journalctl ready to use.
// The caller is responsible for calling Kill to ensure the
// journalctl process created is correctly terminated.
//
// The returned type is an interface to allow mocking for testing
func Factory(canceller input.Canceler, logger *logp.Logger, binary string, args ...string) (Jctl, error) {
cmd := exec.Command(binary, args...)

jctl := journalctl{
canceler: canceller,
cmd: cmd,
dataChan: make(chan []byte),
logger: logger,
}

var err error
jctl.stdout, err = cmd.StdoutPipe()
if err != nil {
return &journalctl{}, fmt.Errorf("cannot get stdout pipe: %w", err)
}
jctl.stderr, err = cmd.StderrPipe()
if err != nil {
return &journalctl{}, fmt.Errorf("cannot get stderr pipe: %w", err)
}

// This gorroutune reads the stderr from the journalctl process, if the
// process exits for any reason, then its stderr is closed, this goroutine
// gets an EOF error and exits
go func() {
defer jctl.logger.Debug("stderr reader goroutine done")
reader := bufio.NewReader(jctl.stderr)
for {
line, err := reader.ReadString('\n')
if err != nil {
if !errors.Is(err, io.EOF) {
logger.Errorf("cannot read from journalctl stderr: %s", err)
}
return
}

logger.Errorf("Journalctl wrote to stderr: %s", line)
}
}()

// This goroutine reads the stdout from the journalctl process and makes
// the data available via the `Next()` method.
// If the journalctl process exits for any reason, then its stdout is closed
// this goroutine gets an EOF error and exits.
go func() {
defer jctl.logger.Debug("stdout reader goroutine done")
defer close(jctl.dataChan)
reader := bufio.NewReader(jctl.stdout)
for {
data, err := reader.ReadBytes('\n')
if err != nil {
if !errors.Is(err, io.EOF) {
logger.Errorf("cannot read from journalctl stdout: %s", err)
}
return
}

select {
case <-jctl.canceler.Done():
return
case jctl.dataChan <- data:
}
}
}()

logger.Infof("Journalctl command: journalctl %s", strings.Join(args, " "))

if err := cmd.Start(); err != nil {
return &journalctl{}, fmt.Errorf("cannot start journalctl: %w", err)
}

logger.Infof("journalctl started with PID %d", cmd.Process.Pid)

// Whenever the journalctl process exits, the `Wait` call returns,
// if there was an error it is logged and this goroutine exits.
go func() {
if err := cmd.Wait(); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we should also watch for the dataChan and errChan to close. For example, if we failed to read stdout for some reason, that go routine loop would exit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This goroutine is here only to log any error returned by cmd.Wait, if the process exits unexpectedly its stderr and stdout will be closed and the reader gorourines will get an EOF or error. At the moment I'm doing the best to read and ship/log all data without over complicating the code and risking getting into a dead lock.

jctl.logger.Errorf("journalctl exited with an error, exit code %d ", cmd.ProcessState.ExitCode())
}
}()

return &jctl, nil
}

// Kill Terminates the journalctl process using a SIGKILL.
func (j *journalctl) Kill() error {
j.logger.Debug("sending SIGKILL to journalctl")
err := j.cmd.Process.Kill()
return err
}

func (j *journalctl) Next(cancel input.Canceler) ([]byte, error) {
select {
case <-cancel.Done():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if cancel.Done(), do we have to kill the process?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This input.Canceler is in the context of the Next call, so I don't think we should kill the journalctl process. If the journald input is stopped, it will call Close on the Reader:

https://github.com/belimawr/beats/blob/e7d226030d8a90477949a4cd3a013e09c28db519/filebeat/input/journald/input.go#L236-L238

and the reader will kill the journalctl process

https://github.com/belimawr/beats/blob/e7d226030d8a90477949a4cd3a013e09c28db519/filebeat/input/journald/pkg/journalctl/reader.go#L192-L200.

I added some comments explaining kill needs to be called.

return []byte{}, ErrCancelled
case d, open := <-j.dataChan:
if !open {
return []byte{}, errors.New("no more data to read, journalctl might have exited unexpectedly")
}
return d, nil
}
}
Loading
Loading