Skip to content

Commit

Permalink
Adds user-level unit filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
ian28223 committed Mar 22, 2022
1 parent 304c66a commit aa79ed6
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 17 deletions.
8 changes: 5 additions & 3 deletions pkg/logs/config/integration_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ type LogsConfig struct {
ExcludePaths []string `mapstructure:"exclude_paths" json:"exclude_paths"` // File
TailingMode string `mapstructure:"start_position" json:"start_position"` // File

IncludeUnits []string `mapstructure:"include_units" json:"include_units"` // Journald
ExcludeUnits []string `mapstructure:"exclude_units" json:"exclude_units"` // Journald
ContainerMode bool `mapstructure:"container_mode" json:"container_mode"` // Journald
IncludeSystemUnits []string `mapstructure:"include_units" json:"include_system_units" json:"include_units"` // Journald
ExcludeSystemUnits []string `mapstructure:"exclude_units" json:"exclude_system_units" json:"exclude_units"` // Journald
IncludeUserUnits []string `mapstructure:"include_user_units" json:"include_user_units"` // Journald
ExcludeUserUnits []string `mapstructure:"exclude_user_units" json:"exclude_user_units"` // Journald
ContainerMode bool `mapstructure:"container_mode" json:"container_mode"` // Journald

Image string // Docker
Label string // Docker
Expand Down
56 changes: 42 additions & 14 deletions pkg/logs/internal/tailers/journald/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@ const (

// Tailer collects logs from a journal.
type Tailer struct {
source *config.LogSource
outputChan chan *message.Message
journal *sdjournal.Journal
blacklist map[string]bool
stop chan struct{}
done chan struct{}
source *config.LogSource
outputChan chan *message.Message
journal *sdjournal.Journal
excludeUnits struct {
system map[string]bool
user map[string]bool
}
stop chan struct{}
done chan struct{}
}

// NewTailer returns a new tailer.
Expand Down Expand Up @@ -89,20 +92,36 @@ func (t *Tailer) setup() error {
return err
}

for _, unit := range config.IncludeUnits {
// add filters to collect only the logs of the units defined in the configuration,
// if no units are defined, collect all the logs of the journal by default.
// add filters to collect only the logs of the units defined in the configuration,
// if no units are defined for both System and User, collect all the logs of the journal by default.
for _, unit := range config.IncludeSystemUnits {
// add filters to collect only the logs of the system-level units defined in the configuration.
match := sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT + "=" + unit
err := t.journal.AddMatch(match)
if err != nil {
return fmt.Errorf("could not add filter %s: %s", match, err)
}
}

t.blacklist = make(map[string]bool)
for _, unit := range config.ExcludeUnits {
// add filters to drop all the logs related to units to exclude.
t.blacklist[unit] = true
for _, unit := range config.IncludeUserUnits {
// add filters to collect only the logs of the user-level units defined in the configuration.
match := sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT + "=" + unit
err := t.journal.AddMatch(match)
if err != nil {
return fmt.Errorf("could not add filter %s: %s", match, err)
}
}

t.excludeUnits.system = make(map[string]bool)
for _, unit := range config.ExcludeSystemUnits {
// add filters to drop all the logs related to system units to exclude.
t.excludeUnits.system[unit] = true
}

t.excludeUnits.user = make(map[string]bool)
for _, unit := range config.ExcludeUserUnits {
// add filters to drop all the logs related to user units to exclude.
t.excludeUnits.user[unit] = true
}

return nil
Expand Down Expand Up @@ -168,7 +187,15 @@ func (t *Tailer) shouldDrop(entry *sdjournal.JournalEntry) bool {
if !exists {
return false
}
if _, blacklisted := t.blacklist[unit]; blacklisted {
if _, excluded := t.excludeUnits.system[unit]; excluded {
// drop the entry
return true
}
unit, exists = entry.Fields[sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT]
if !exists {
return false
}
if _, excluded := t.excludeUnits.user[unit]; excluded {
// drop the entry
return true
}
Expand Down Expand Up @@ -236,6 +263,7 @@ func (t *Tailer) getOrigin(entry *sdjournal.JournalEntry) *message.Origin {
// applicationKeys represents all the valid attributes used to extract the value of the application name of a journal entry.
var applicationKeys = []string{
sdjournal.SD_JOURNAL_FIELD_SYSLOG_IDENTIFIER, // "SYSLOG_IDENTIFIER"
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT, // "_SYSTEMD_USER_UNIT"
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT, // "_SYSTEMD_UNIT"
sdjournal.SD_JOURNAL_FIELD_COMM, // "_COMM"
}
Expand Down

0 comments on commit aa79ed6

Please sign in to comment.