Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[journald] Adds user-level unit filtering #11398

Merged
merged 7 commits into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pkg/logs/config/integration_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ type LogsConfig struct {
ExcludePaths []string `mapstructure:"exclude_paths" json:"exclude_paths"` // File
TailingMode string `mapstructure:"start_position" json:"start_position"` // File

IncludeUnits []string `mapstructure:"include_units" json:"include_units"` // Journald
ExcludeUnits []string `mapstructure:"exclude_units" json:"exclude_units"` // Journald
ContainerMode bool `mapstructure:"container_mode" json:"container_mode"` // Journald
IncludeSystemUnits []string `mapstructure:"include_units" json:"include_units"` // Journald
ExcludeSystemUnits []string `mapstructure:"exclude_units" json:"exclude_units"` // Journald
IncludeUserUnits []string `mapstructure:"include_user_units" json:"include_user_units"` // Journald
ExcludeUserUnits []string `mapstructure:"exclude_user_units" json:"exclude_user_units"` // Journald
ContainerMode bool `mapstructure:"container_mode" json:"container_mode"` // Journald

Image string // Docker
Label string // Docker
Expand Down
66 changes: 49 additions & 17 deletions pkg/logs/internal/tailers/journald/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@ const (

// Tailer collects logs from a journal.
type Tailer struct {
source *config.LogSource
outputChan chan *message.Message
journal *sdjournal.Journal
blacklist map[string]bool
stop chan struct{}
done chan struct{}
source *config.LogSource
outputChan chan *message.Message
journal *sdjournal.Journal
excludeUnits struct {
system map[string]bool
user map[string]bool
}
stop chan struct{}
done chan struct{}
}

// NewTailer returns a new tailer.
Expand Down Expand Up @@ -89,20 +92,36 @@ func (t *Tailer) setup() error {
return err
}

for _, unit := range config.IncludeUnits {
// add filters to collect only the logs of the units defined in the configuration,
// if no units are defined, collect all the logs of the journal by default.
// add filters to collect only the logs of the units defined in the configuration,
// if no units are defined for both System and User, collect all the logs of the journal by default.
for _, unit := range config.IncludeSystemUnits {
// add filters to collect only the logs of the system-level units defined in the configuration.
match := sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT + "=" + unit
err := t.journal.AddMatch(match)
if err != nil {
return fmt.Errorf("could not add filter %s: %s", match, err)
}
}

t.blacklist = make(map[string]bool)
for _, unit := range config.ExcludeUnits {
// add filters to drop all the logs related to units to exclude.
t.blacklist[unit] = true
for _, unit := range config.IncludeUserUnits {
// add filters to collect only the logs of the user-level units defined in the configuration.
match := sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT + "=" + unit
err := t.journal.AddMatch(match)
if err != nil {
return fmt.Errorf("could not add filter %s: %s", match, err)
}
}

t.excludeUnits.system = make(map[string]bool)
for _, unit := range config.ExcludeSystemUnits {
// add filters to drop all the logs related to system units to exclude.
t.excludeUnits.system[unit] = true
}

t.excludeUnits.user = make(map[string]bool)
for _, unit := range config.ExcludeUserUnits {
// add filters to drop all the logs related to user units to exclude.
t.excludeUnits.user[unit] = true
}

return nil
Expand Down Expand Up @@ -164,13 +183,25 @@ func (t *Tailer) tail() {
// shouldDrop returns true if the entry should be dropped,
// returns false otherwise.
func (t *Tailer) shouldDrop(entry *sdjournal.JournalEntry) bool {
unit, exists := entry.Fields[sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT]
sysUnit, exists := entry.Fields[sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT]
if !exists {
return false
}
if _, blacklisted := t.blacklist[unit]; blacklisted {
// drop the entry
return true
usrUnit, exists := entry.Fields[sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT]
if !exists {
// JournalEntry is a System-level unit
excludeAllSys := t.excludeUnits.system["*"]
if _, excluded := t.excludeUnits.system[sysUnit]; excludeAllSys || excluded {
// drop the entry
return true
}
} else {
// JournalEntry is a User-level unit
excludeAllUsr := t.excludeUnits.user["*"]
if _, excluded := t.excludeUnits.user[usrUnit]; excludeAllUsr || excluded {
// drop the entry
return true
}
}
return false
}
Expand Down Expand Up @@ -236,6 +267,7 @@ func (t *Tailer) getOrigin(entry *sdjournal.JournalEntry) *message.Origin {
// applicationKeys represents all the valid attributes used to extract the value of the application name of a journal entry.
var applicationKeys = []string{
sdjournal.SD_JOURNAL_FIELD_SYSLOG_IDENTIFIER, // "SYSLOG_IDENTIFIER"
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT, // "_SYSTEMD_USER_UNIT"
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT, // "_SYSTEMD_UNIT"
sdjournal.SD_JOURNAL_FIELD_COMM, // "_COMM"
}
Expand Down
123 changes: 120 additions & 3 deletions pkg/logs/internal/tailers/journald/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,16 @@ func TestIdentifier(t *testing.T) {
}

func TestShouldDropEntry(t *testing.T) {
source := config.NewLogSource("", &config.LogsConfig{ExcludeUnits: []string{"foo", "bar"}})
tailer := NewTailer(source, nil)
err := tailer.setup()
// System-level service units do not have SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT
// User-level service units may have a common value for SD_JOURNAL_FIELD_SYSTEMD_UNIT
var source *config.LogSource
var tailer *Tailer
var err error

// expect only the specified service units to be dropped
source = config.NewLogSource("", &config.LogsConfig{ExcludeSystemUnits: []string{"foo", "bar"}, ExcludeUserUnits: []string{"baz", "qux"}})
tailer = NewTailer(source, nil)
err = tailer.setup()
assert.Nil(t, err)

assert.True(t, tailer.shouldDrop(
Expand All @@ -61,6 +68,102 @@ func TestShouldDropEntry(t *testing.T) {
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "boo",
},
}))

assert.False(t, tailer.shouldDrop(
&sdjournal.JournalEntry{
Fields: map[string]string{
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT: "bar",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "[email protected]",
},
}))

assert.True(t, tailer.shouldDrop(
&sdjournal.JournalEntry{
Fields: map[string]string{
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT: "baz",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "[email protected]",
},
}))

assert.True(t, tailer.shouldDrop(
&sdjournal.JournalEntry{
Fields: map[string]string{
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT: "qux",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "[email protected]",
},
}))

// expect all System-level service units to be dropped
source = config.NewLogSource("", &config.LogsConfig{ExcludeSystemUnits: []string{"*"}})
tailer = NewTailer(source, nil)
err = tailer.setup()
assert.Nil(t, err)

assert.True(t, tailer.shouldDrop(
&sdjournal.JournalEntry{
Fields: map[string]string{
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "foo",
},
}))

assert.True(t, tailer.shouldDrop(
&sdjournal.JournalEntry{
Fields: map[string]string{
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "bar",
},
}))

assert.False(t, tailer.shouldDrop(
&sdjournal.JournalEntry{
Fields: map[string]string{
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT: "bar",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "[email protected]",
},
}))

assert.False(t, tailer.shouldDrop(
&sdjournal.JournalEntry{
Fields: map[string]string{
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT: "baz",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "[email protected]",
},
}))

// expect all User-level service units to be dropped
source = config.NewLogSource("", &config.LogsConfig{ExcludeUserUnits: []string{"*"}})
tailer = NewTailer(source, nil)
err = tailer.setup()
assert.Nil(t, err)

assert.False(t, tailer.shouldDrop(
&sdjournal.JournalEntry{
Fields: map[string]string{
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "foo",
},
}))

assert.False(t, tailer.shouldDrop(
&sdjournal.JournalEntry{
Fields: map[string]string{
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "bar",
},
}))

assert.True(t, tailer.shouldDrop(
&sdjournal.JournalEntry{
Fields: map[string]string{
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT: "bar",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "[email protected]",
},
}))

assert.True(t, tailer.shouldDrop(
&sdjournal.JournalEntry{
Fields: map[string]string{
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT: "baz",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "[email protected]",
},
}))
}

func TestApplicationName(t *testing.T) {
Expand All @@ -71,6 +174,16 @@ func TestApplicationName(t *testing.T) {
&sdjournal.JournalEntry{
Fields: map[string]string{
sdjournal.SD_JOURNAL_FIELD_SYSLOG_IDENTIFIER: "foo",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT: "foo-user.service",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "foo.service",
sdjournal.SD_JOURNAL_FIELD_COMM: "foo.sh",
},
}, []string{}))

assert.Equal(t, "foo-user.service", tailer.getApplicationName(
&sdjournal.JournalEntry{
Fields: map[string]string{
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT: "foo-user.service",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "foo.service",
sdjournal.SD_JOURNAL_FIELD_COMM: "foo.sh",
},
Expand Down Expand Up @@ -147,6 +260,7 @@ func TestApplicationNameShouldBeDockerForContainerEntries(t *testing.T) {
&sdjournal.JournalEntry{
Fields: map[string]string{
sdjournal.SD_JOURNAL_FIELD_SYSLOG_IDENTIFIER: "foo",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT: "foo-user.service",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "foo.service",
sdjournal.SD_JOURNAL_FIELD_COMM: "foo.sh",
containerIDKey: "bar",
Expand All @@ -164,6 +278,7 @@ func TestApplicationNameShouldBeShortImageForContainerEntries(t *testing.T) {
&sdjournal.JournalEntry{
Fields: map[string]string{
sdjournal.SD_JOURNAL_FIELD_SYSLOG_IDENTIFIER: "foo",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT: "foo-user.service",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "foo.service",
sdjournal.SD_JOURNAL_FIELD_COMM: "foo.sh",
containerIDKey: containerID,
Expand All @@ -185,6 +300,7 @@ func TestApplicationNameShouldBeDockerWhenTagNotFound(t *testing.T) {
&sdjournal.JournalEntry{
Fields: map[string]string{
sdjournal.SD_JOURNAL_FIELD_SYSLOG_IDENTIFIER: "foo",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT: "foo-user.service",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "foo.service",
sdjournal.SD_JOURNAL_FIELD_COMM: "foo.sh",
containerIDKey: containerID,
Expand All @@ -209,6 +325,7 @@ func TestWrongTypeFromCache(t *testing.T) {
&sdjournal.JournalEntry{
Fields: map[string]string{
sdjournal.SD_JOURNAL_FIELD_SYSLOG_IDENTIFIER: "foo",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT: "foo-user.service",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "foo.service",
sdjournal.SD_JOURNAL_FIELD_COMM: "foo.sh",
containerIDKey: containerID,
Expand Down
6 changes: 4 additions & 2 deletions pkg/logs/status/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,10 @@ func (b *Builder) toDictionary(c *config.LogsConfig) map[string]interface{} {
dictionary["Label"] = c.Label
dictionary["Name"] = c.Name
case config.JournaldType:
dictionary["IncludeUnits"] = strings.Join(c.IncludeUnits, ", ")
dictionary["ExcludeUnits"] = strings.Join(c.ExcludeUnits, ", ")
dictionary["IncludeSystemUnits"] = strings.Join(c.IncludeSystemUnits, ", ")
dictionary["ExcludeSystemUnits"] = strings.Join(c.ExcludeSystemUnits, ", ")
dictionary["IncludeUserUnits"] = strings.Join(c.IncludeUserUnits, ", ")
dictionary["ExcludeUserUnits"] = strings.Join(c.ExcludeUserUnits, ", ")
case config.WindowsEventType:
dictionary["ChannelPath"] = c.ChannelPath
dictionary["Query"] = c.Query
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
features:
- Adds User-level service unit filtering support for Journald log collection via `include_user_units` and `exclude_user_units`.
- A wildcard (`*`) can be used in either `exclude_units` or `exclude_user_units`
if only a particular type of Journald log is desired.