Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/database: simplify flush agents/bouncers #3026

Merged
merged 2 commits into from
May 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 75 additions & 78 deletions pkg/database/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,25 @@
"github.com/crowdsecurity/crowdsec/pkg/types"
)


func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Scheduler, error) {
maxItems := 0
maxAge := ""

if config.MaxItems != nil && *config.MaxItems <= 0 {
return nil, errors.New("max_items can't be zero or negative number")
}

if config.MaxItems != nil {
maxItems = *config.MaxItems
}

if config.MaxAge != nil && *config.MaxAge != "" {
maxAge = *config.MaxAge
}

// Init & Start cronjob every minute for alerts
scheduler := gocron.NewScheduler(time.UTC)

job, err := scheduler.Every(1).Minute().Do(c.FlushAlerts, maxAge, maxItems)
if err != nil {
return nil, fmt.Errorf("while starting FlushAlerts scheduler: %w", err)
Expand All @@ -46,38 +49,48 @@
if err != nil {
return nil, fmt.Errorf("while parsing agents cert auto-delete duration: %w", err)
}

config.AgentsGC.CertDuration = &duration
}

if config.AgentsGC.LoginPassword != nil {
duration, err := ParseDuration(*config.AgentsGC.LoginPassword)
if err != nil {
return nil, fmt.Errorf("while parsing agents login/password auto-delete duration: %w", err)
}

config.AgentsGC.LoginPasswordDuration = &duration
}

if config.AgentsGC.Api != nil {
log.Warning("agents auto-delete for API auth is not supported (use cert or login_password)")
}
}

if config.BouncersGC != nil {
if config.BouncersGC.Cert != nil {
duration, err := ParseDuration(*config.BouncersGC.Cert)
if err != nil {
return nil, fmt.Errorf("while parsing bouncers cert auto-delete duration: %w", err)
}

config.BouncersGC.CertDuration = &duration
}

if config.BouncersGC.Api != nil {
duration, err := ParseDuration(*config.BouncersGC.Api)
if err != nil {
return nil, fmt.Errorf("while parsing bouncers api auto-delete duration: %w", err)
}

config.BouncersGC.ApiDuration = &duration
}

if config.BouncersGC.LoginPassword != nil {
log.Warning("bouncers auto-delete for login/password auth is not supported (use cert or api)")
}
}

baJob, err := scheduler.Every(1).Minute().Do(c.FlushAgentsAndBouncers, config.AgentsGC, config.BouncersGC)
if err != nil {
return nil, fmt.Errorf("while starting FlushAgentsAndBouncers scheduler: %w", err)
Expand All @@ -89,7 +102,6 @@
return scheduler, nil
}


func (c *Client) FlushOrphans() {
/* While it has only been linked to some very corner-case bug : https://github.com/crowdsecurity/crowdsec/issues/778 */
/* We want to take care of orphaned events for which the parent alert/decision has been deleted */
Expand All @@ -98,6 +110,7 @@
c.Log.Warningf("error while deleting orphan events: %s", err)
return
}

if eventsCount > 0 {
c.Log.Infof("%d deleted orphan events", eventsCount)
}
Expand All @@ -109,103 +122,77 @@
c.Log.Warningf("error while deleting orphan decisions: %s", err)
return
}

if eventsCount > 0 {
c.Log.Infof("%d deleted orphan decisions", eventsCount)
}
}

func (c *Client) flushBouncers(bouncersCfg *csconfig.AuthGCCfg) {
if bouncersCfg == nil {
func (c *Client) flushBouncers(authType string, duration *time.Duration) {
if duration == nil {

Check warning on line 132 in pkg/database/flush.go

View check run for this annotation

Codecov / codecov/patch

pkg/database/flush.go#L131-L132

Added lines #L131 - L132 were not covered by tests
return
}

if bouncersCfg.ApiDuration != nil {
log.Debug("trying to delete old bouncers from api")
count, err := c.Ent.Bouncer.Delete().Where(
bouncer.LastPullLTE(time.Now().UTC().Add(-*duration)),
).Where(
bouncer.AuthTypeEQ(authType),
).Exec(c.CTX)

Check warning on line 140 in pkg/database/flush.go

View check run for this annotation

Codecov / codecov/patch

pkg/database/flush.go#L136-L140

Added lines #L136 - L140 were not covered by tests

deletionCount, err := c.Ent.Bouncer.Delete().Where(
bouncer.LastPullLTE(time.Now().UTC().Add(-*bouncersCfg.ApiDuration)),
).Where(
bouncer.AuthTypeEQ(types.ApiKeyAuthType),
).Exec(c.CTX)
if err != nil {
c.Log.Errorf("while auto-deleting expired bouncers (api key): %s", err)
} else if deletionCount > 0 {
c.Log.Infof("deleted %d expired bouncers (api auth)", deletionCount)
}
if err != nil {
c.Log.Errorf("while auto-deleting expired bouncers (%s): %s", authType, err)
return

Check warning on line 144 in pkg/database/flush.go

View check run for this annotation

Codecov / codecov/patch

pkg/database/flush.go#L142-L144

Added lines #L142 - L144 were not covered by tests
}

if bouncersCfg.CertDuration != nil {
log.Debug("trying to delete old bouncers from cert")

deletionCount, err := c.Ent.Bouncer.Delete().Where(
bouncer.LastPullLTE(time.Now().UTC().Add(-*bouncersCfg.CertDuration)),
).Where(
bouncer.AuthTypeEQ(types.TlsAuthType),
).Exec(c.CTX)
if err != nil {
c.Log.Errorf("while auto-deleting expired bouncers (api key): %s", err)
} else if deletionCount > 0 {
c.Log.Infof("deleted %d expired bouncers (api auth)", deletionCount)
}
if count > 0 {
c.Log.Infof("deleted %d expired bouncers (%s)", count, authType)

Check warning on line 148 in pkg/database/flush.go

View check run for this annotation

Codecov / codecov/patch

pkg/database/flush.go#L147-L148

Added lines #L147 - L148 were not covered by tests
}
}

func (c *Client) flushAgents(agentsCfg *csconfig.AuthGCCfg) {
if agentsCfg == nil {
func (c *Client) flushAgents(authType string, duration *time.Duration) {
if duration == nil {

Check warning on line 153 in pkg/database/flush.go

View check run for this annotation

Codecov / codecov/patch

pkg/database/flush.go#L152-L153

Added lines #L152 - L153 were not covered by tests
return
}

if agentsCfg.CertDuration != nil {
log.Debug("trying to delete old agents from cert")

deletionCount, err := c.Ent.Machine.Delete().Where(
machine.LastHeartbeatLTE(time.Now().UTC().Add(-*agentsCfg.CertDuration)),
).Where(
machine.Not(machine.HasAlerts()),
).Where(
machine.AuthTypeEQ(types.TlsAuthType),
).Exec(c.CTX)
log.Debugf("deleted %d entries", deletionCount)
if err != nil {
c.Log.Errorf("while auto-deleting expired machine (cert): %s", err)
} else if deletionCount > 0 {
c.Log.Infof("deleted %d expired machine (cert auth)", deletionCount)
}
count, err := c.Ent.Machine.Delete().Where(
machine.LastHeartbeatLTE(time.Now().UTC().Add(-*duration)),
machine.Not(machine.HasAlerts()),
machine.AuthTypeEQ(authType),
).Exec(c.CTX)

if err != nil {
c.Log.Errorf("while auto-deleting expired machines (%s): %s", authType, err)
return

Check warning on line 165 in pkg/database/flush.go

View check run for this annotation

Codecov / codecov/patch

pkg/database/flush.go#L157-L165

Added lines #L157 - L165 were not covered by tests
}

if agentsCfg.LoginPasswordDuration != nil {
log.Debug("trying to delete old agents from password")

deletionCount, err := c.Ent.Machine.Delete().Where(
machine.LastHeartbeatLTE(time.Now().UTC().Add(-*agentsCfg.LoginPasswordDuration)),
).Where(
machine.Not(machine.HasAlerts()),
).Where(
machine.AuthTypeEQ(types.PasswordAuthType),
).Exec(c.CTX)
log.Debugf("deleted %d entries", deletionCount)
if err != nil {
c.Log.Errorf("while auto-deleting expired machine (password): %s", err)
} else if deletionCount > 0 {
c.Log.Infof("deleted %d expired machine (password auth)", deletionCount)
}
if count > 0 {
c.Log.Infof("deleted %d expired machines (%s auth)", count, authType)

Check warning on line 169 in pkg/database/flush.go

View check run for this annotation

Codecov / codecov/patch

pkg/database/flush.go#L168-L169

Added lines #L168 - L169 were not covered by tests
}
}

func (c *Client) FlushAgentsAndBouncers(agentsCfg *csconfig.AuthGCCfg, bouncersCfg *csconfig.AuthGCCfg) error {
log.Debug("starting FlushAgentsAndBouncers")

c.flushBouncers(bouncersCfg)
c.flushAgents(agentsCfg)
if agentsCfg != nil {
c.flushAgents(types.TlsAuthType, agentsCfg.CertDuration)
c.flushAgents(types.PasswordAuthType, agentsCfg.LoginPasswordDuration)
}

Check warning on line 179 in pkg/database/flush.go

View check run for this annotation

Codecov / codecov/patch

pkg/database/flush.go#L177-L179

Added lines #L177 - L179 were not covered by tests

if bouncersCfg != nil {
c.flushBouncers(types.TlsAuthType, bouncersCfg.CertDuration)
c.flushBouncers(types.ApiKeyAuthType, bouncersCfg.ApiDuration)
}

Check warning on line 184 in pkg/database/flush.go

View check run for this annotation

Codecov / codecov/patch

pkg/database/flush.go#L182-L184

Added lines #L182 - L184 were not covered by tests

return nil
}

func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error {
var deletedByAge int
var deletedByNbItem int
var totalAlerts int
var err error
var (
deletedByAge int
deletedByNbItem int
totalAlerts int
err error
)

if !c.CanFlush {
c.Log.Debug("a list is being imported, flushing later")
Expand All @@ -215,17 +202,20 @@
c.Log.Debug("Flushing orphan alerts")
c.FlushOrphans()
c.Log.Debug("Done flushing orphan alerts")

totalAlerts, err = c.TotalAlerts()
if err != nil {
c.Log.Warningf("FlushAlerts (max items count): %s", err)
return fmt.Errorf("unable to get alerts count: %w", err)
}

c.Log.Debugf("FlushAlerts (Total alerts): %d", totalAlerts)

if MaxAge != "" {
filter := map[string][]string{
"created_before": {MaxAge},
}

nbDeleted, err := c.DeleteAlertWithFilter(filter)
if err != nil {
c.Log.Warningf("FlushAlerts (max age): %s", err)
Expand All @@ -235,19 +225,21 @@
c.Log.Debugf("FlushAlerts (deleted max age alerts): %d", nbDeleted)
deletedByAge = nbDeleted
}

if MaxItems > 0 {
//We get the highest id for the alerts
//We subtract MaxItems to avoid deleting alerts that are not old enough
//This gives us the oldest alert that we want to keep
//We then delete all the alerts with an id lower than this one
//We can do this because the id is auto-increment, and the database won't reuse the same id twice
// We get the highest id for the alerts
// We subtract MaxItems to avoid deleting alerts that are not old enough
// This gives us the oldest alert that we want to keep
// We then delete all the alerts with an id lower than this one
// We can do this because the id is auto-increment, and the database won't reuse the same id twice

Check warning on line 234 in pkg/database/flush.go

View check run for this annotation

Codecov / codecov/patch

pkg/database/flush.go#L230-L234

Added lines #L230 - L234 were not covered by tests
lastAlert, err := c.QueryAlertWithFilter(map[string][]string{
"sort": {"DESC"},
"limit": {"1"},
//we do not care about fetching the edges, we just want the id
// we do not care about fetching the edges, we just want the id

Check warning on line 238 in pkg/database/flush.go

View check run for this annotation

Codecov / codecov/patch

pkg/database/flush.go#L238

Added line #L238 was not covered by tests
"with_decisions": {"false"},
})
c.Log.Debugf("FlushAlerts (last alert): %+v", lastAlert)

Check warning on line 242 in pkg/database/flush.go

View check run for this annotation

Codecov / codecov/patch

pkg/database/flush.go#L242

Added line #L242 was not covered by tests
if err != nil {
c.Log.Errorf("FlushAlerts: could not get last alert: %s", err)
return fmt.Errorf("could not get last alert: %w", err)
Expand All @@ -259,7 +251,7 @@
c.Log.Debugf("FlushAlerts (max id): %d", maxid)

if maxid > 0 {
//This may lead to orphan alerts (at least on MySQL), but the next time the flush job will run, they will be deleted
// This may lead to orphan alerts (at least on MySQL), but the next time the flush job will run, they will be deleted

Check warning on line 254 in pkg/database/flush.go

View check run for this annotation

Codecov / codecov/patch

pkg/database/flush.go#L254

Added line #L254 was not covered by tests
deletedByNbItem, err = c.Ent.Alert.Delete().Where(alert.IDLT(maxid)).Exec(c.CTX)

if err != nil {
Expand All @@ -269,11 +261,16 @@
}
}
}

if deletedByNbItem > 0 {
c.Log.Infof("flushed %d/%d alerts because the max number of alerts has been reached (%d max)", deletedByNbItem, totalAlerts, MaxItems)
c.Log.Infof("flushed %d/%d alerts because the max number of alerts has been reached (%d max)",
deletedByNbItem, totalAlerts, MaxItems)

Check warning on line 267 in pkg/database/flush.go

View check run for this annotation

Codecov / codecov/patch

pkg/database/flush.go#L266-L267

Added lines #L266 - L267 were not covered by tests
}

if deletedByAge > 0 {
c.Log.Infof("flushed %d/%d alerts because they were created %s ago or more", deletedByAge, totalAlerts, MaxAge)
c.Log.Infof("flushed %d/%d alerts because they were created %s ago or more",
deletedByAge, totalAlerts, MaxAge)

Check warning on line 272 in pkg/database/flush.go

View check run for this annotation

Codecov / codecov/patch

pkg/database/flush.go#L271-L272

Added lines #L271 - L272 were not covered by tests
}

return nil
}
Loading