From 2b499bda754a697ab85364ae87835dd315bfa43d Mon Sep 17 00:00:00 2001 From: Corey Ogburn Date: Wed, 12 Jun 2024 12:11:50 -0600 Subject: [PATCH] Deduplication of Detections by Public Id To support this properly, engines need to process their rules in a deterministic order. If different detections are marked as duplicates from one sync to the next, you never really know which rules your system is executing. UpdateRepos has been refactored to return an array instead of an unordered map. The array of RepoOnDisk (formerly DirtyRepo) will be in the same order as the repos listed in the config. Strelka needed even more refactoring as it would parse each rule from beginning to end before going to the next rule. Now it gathers all the detections, dedupes them, then syncs them. Refactored the call to WalkDir to use the IOManager instead of directly calling filepath.WalkDir. Some tests needed to be updated to work around the changes to UpdateRepos and other determinism changes. Added a new test for the deduplication process. --- .../modules/detections/detengine_helpers.go | 38 +++- .../detections/detengine_helpers_test.go | 53 +++++ server/modules/elastalert/elastalert.go | 35 ++-- server/modules/elastalert/elastalert_test.go | 17 +- server/modules/strelka/mock/mock_iomanager.go | 30 ++- server/modules/strelka/strelka.go | 183 ++++++++++-------- server/modules/suricata/suricata.go | 2 + 7 files changed, 225 insertions(+), 133 deletions(-) diff --git a/server/modules/detections/detengine_helpers.go b/server/modules/detections/detengine_helpers.go index dccde47d..3a277a37 100644 --- a/server/modules/detections/detengine_helpers.go +++ b/server/modules/detections/detengine_helpers.go @@ -108,13 +108,14 @@ func WriteStateFile(iom IOManager, path string) { } } -type DirtyRepo struct { - WasModified bool +type RepoOnDisk struct { Repo *model.RuleRepo + Path string + WasModified bool } -func UpdateRepos(isRunning *bool, baseRepoFolder string, rulesRepos []*model.RuleRepo, cfg *config.ServerConfig) (allRepos map[string]*DirtyRepo, anythingNew bool, err error) { - allRepos = map[string]*DirtyRepo{} // map[repoPath]repo +func UpdateRepos(isRunning *bool, baseRepoFolder string, rulesRepos []*model.RuleRepo, cfg *config.ServerConfig) (allRepos []*RepoOnDisk, anythingNew bool, err error) { + allRepos = make([]*RepoOnDisk, 0, len(rulesRepos)) // read existing repos entries, err := os.ReadDir(baseRepoFolder) @@ -148,11 +149,12 @@ func UpdateRepos(isRunning *bool, baseRepoFolder string, rulesRepos []*model.Rul _, lastFolder := path.Split(parser.Path) repoPath := filepath.Join(baseRepoFolder, lastFolder) - dirty := &DirtyRepo{ + dirty := &RepoOnDisk{ Repo: repo, + Path: repoPath, } - allRepos[repoPath] = dirty + allRepos = append(allRepos, dirty) reclone := false proxyOpts, err := proxyToTransportOptions(cfg.Proxy) @@ -352,3 +354,27 @@ func AddUser(previous string, user *model.User, sep string) string { func EscapeDoubleQuotes(str string) string { return doubleQuoteEscaper.ReplaceAllString(str, "\\$1$2") } + +func DeduplicateByPublicId(detects []*model.Detection) []*model.Detection { + set := map[string]*model.Detection{} + deduped := make([]*model.Detection, 0, len(detects)) + + for _, detect := range detects { + existing, inSet := set[detect.PublicID] + if inSet { + log.WithFields(log.Fields{ + "publicId": detect.PublicID, + "engine": detect.Engine, + "existingRuleset": existing.Ruleset, + "duplicateRuleset": detect.Ruleset, + "existingTitle": existing.Title, + "duplicateTitle": detect.Title, + }).Warn("duplicate 
publicId found, skipping") + } else { + set[detect.PublicID] = detect + deduped = append(deduped, detect) + } + } + + return deduped +} diff --git a/server/modules/detections/detengine_helpers_test.go b/server/modules/detections/detengine_helpers_test.go index cec9a4ce..4a41b75f 100644 --- a/server/modules/detections/detengine_helpers_test.go +++ b/server/modules/detections/detengine_helpers_test.go @@ -280,3 +280,56 @@ func TestProxyToTransportOptions(t *testing.T) { }) } } + +func TestDeduplicateByPublicId(t *testing.T) { + tests := []struct { + Name string + InputIds []string + ExpOutput []string + }{ + { + Name: "Empty", + InputIds: []string{}, + ExpOutput: []string{}, + }, + { + Name: "No Duplicates", + InputIds: []string{"1", "2", "3"}, + ExpOutput: []string{"1", "2", "3"}, + }, + { + Name: "Only Duplicates", + InputIds: []string{"1", "1", "1", "1", "1", "1", "1", "1", "1", "1"}, + ExpOutput: []string{"1"}, + }, + { + Name: "Mixed", + InputIds: []string{"1", "2", "1", "3", "2", "4", "1", "5", "2", "6"}, + ExpOutput: []string{"1", "2", "3", "4", "5", "6"}, + }, + { + Name: "One Duplicate", + InputIds: []string{"1", "2", "3", "4", "5", "6", "1"}, + ExpOutput: []string{"1", "2", "3", "4", "5", "6"}, + }, + } + + for _, test := range tests { + test := test + t.Run(test.Name, func(t *testing.T) { + dets := make([]*model.Detection, 0, len(test.InputIds)) + for _, id := range test.InputIds { + dets = append(dets, &model.Detection{PublicID: id}) + } + + deduped := DeduplicateByPublicId(dets) + + output := make([]string, 0, len(deduped)) + for _, det := range deduped { + output = append(output, det.PublicID) + } + + assert.Equal(t, test.ExpOutput, output) + }) + } +} diff --git a/server/modules/elastalert/elastalert.go b/server/modules/elastalert/elastalert.go index 423c06c3..108daf38 100644 --- a/server/modules/elastalert/elastalert.go +++ b/server/modules/elastalert/elastalert.go @@ -579,9 +579,6 @@ func (e *ElastAlertEngine) startCommunityRuleImport() { templateFound = true } - allRepos := map[string]*model.RuleRepo{} - var repoChanges bool - var zips map[string][]byte var errMap map[string]error var regenNeeded bool @@ -617,9 +614,7 @@ func (e *ElastAlertEngine) startCommunityRuleImport() { continue } - var dirtyRepos map[string]*detections.DirtyRepo - - dirtyRepos, repoChanges, err = detections.UpdateRepos(&e.isRunning, e.reposFolder, e.rulesRepos, e.srv.Config) + dirtyRepos, repoChanges, err := detections.UpdateRepos(&e.isRunning, e.reposFolder, e.rulesRepos, e.srv.Config) if err != nil { if strings.Contains(err.Error(), "module stopped") { break @@ -637,10 +632,6 @@ func (e *ElastAlertEngine) startCommunityRuleImport() { continue } - for k, v := range dirtyRepos { - allRepos[k] = v.Repo - } - zipHashes := map[string]string{} for pkg, data := range zips { h := sha256.Sum256(data) @@ -714,7 +705,7 @@ func (e *ElastAlertEngine) startCommunityRuleImport() { break } - repoDets, errMap := e.parseRepoRules(allRepos) + repoDets, errMap := e.parseRepoRules(dirtyRepos) if errMap != nil { log.WithField("sigmaParseError", errMap).Error("something went wrong while parsing sigma rule files from repos") } @@ -725,6 +716,8 @@ func (e *ElastAlertEngine) startCommunityRuleImport() { detects = append(detects, repoDets...) 
+ detects = detections.DeduplicateByPublicId(detects) + errMap, err = e.syncCommunityDetections(ctx, detects) if err != nil { if err == errModuleStopped { @@ -864,11 +857,13 @@ func (e *ElastAlertEngine) parseZipRules(pkgZips map[string][]byte) (detections } }() - for pkg, zipData := range pkgZips { + for _, pkg := range e.sigmaRulePackages { if !e.isRunning { return nil, map[string]error{"module": errModuleStopped} } + zipData := pkgZips[pkg] + reader, err := zip.NewReader(bytes.NewReader(zipData), int64(len(zipData))) if err != nil { errMap[pkg] = err @@ -925,7 +920,7 @@ func (e *ElastAlertEngine) parseZipRules(pkgZips map[string][]byte) (detections return detections, errMap } -func (e *ElastAlertEngine) parseRepoRules(allRepos map[string]*model.RuleRepo) (detections []*model.Detection, errMap map[string]error) { +func (e *ElastAlertEngine) parseRepoRules(allRepos []*detections.RepoOnDisk) (detections []*model.Detection, errMap map[string]error) { errMap = map[string]error{} // map[repoName]error defer func() { if len(errMap) == 0 { @@ -933,14 +928,14 @@ func (e *ElastAlertEngine) parseRepoRules(allRepos map[string]*model.RuleRepo) ( } }() - for repopath, repo := range allRepos { + for _, repo := range allRepos { if !e.isRunning { return nil, map[string]error{"module": errModuleStopped} } - baseDir := repopath - if repo.Folder != nil { - baseDir = filepath.Join(baseDir, *repo.Folder) + baseDir := repo.Path + if repo.Repo.Folder != nil { + baseDir = filepath.Join(baseDir, *repo.Repo.Folder) } err := e.WalkDir(baseDir, func(path string, d fs.DirEntry, err error) error { @@ -974,16 +969,16 @@ func (e *ElastAlertEngine) parseRepoRules(allRepos map[string]*model.RuleRepo) ( return nil } - ruleset := filepath.Base(repopath) + ruleset := filepath.Base(repo.Path) - det := rule.ToDetection(ruleset, repo.License, repo.Community) + det := rule.ToDetection(ruleset, repo.Repo.License, repo.Repo.Community) detections = append(detections, det) return nil }) if err != nil { - log.WithError(err).WithField("elastAlertRuleRepo", repopath).Error("Failed to walk repo") + log.WithError(err).WithField("elastAlertRuleRepo", repo.Path).Error("Failed to walk repo") continue } } diff --git a/server/modules/elastalert/elastalert_test.go b/server/modules/elastalert/elastalert_test.go index 72b64871..3d328f0a 100644 --- a/server/modules/elastalert/elastalert_test.go +++ b/server/modules/elastalert/elastalert_test.go @@ -27,6 +27,7 @@ import ( "github.com/security-onion-solutions/securityonion-soc/model" "github.com/security-onion-solutions/securityonion-soc/module" "github.com/security-onion-solutions/securityonion-soc/server" + "github.com/security-onion-solutions/securityonion-soc/server/modules/detections" "github.com/security-onion-solutions/securityonion-soc/server/modules/elastalert/mock" "github.com/security-onion-solutions/securityonion-soc/util" @@ -529,7 +530,8 @@ level: high } engine := ElastAlertEngine{ - isRunning: true, + isRunning: true, + sigmaRulePackages: []string{"all_rules"}, } engine.allowRegex = regexp.MustCompile("00000000-0000-0000-0000-00000000") engine.denyRegex = regexp.MustCompile("deny") @@ -578,11 +580,14 @@ level: high license: Elastic-2.0 ` - repos := map[string]*model.RuleRepo{ - "repo-path": { - Repo: "github.com/repo-user/repo-path", - License: "DRL", - Community: true, + repos := []*detections.RepoOnDisk{ + { + Repo: &model.RuleRepo{ + Repo: "github.com/repo-user/repo-path", + License: "DRL", + Community: true, + }, + 
Path: "repo-path", }, } diff --git a/server/modules/strelka/mock/mock_iomanager.go b/server/modules/strelka/mock/mock_iomanager.go index e30a25b2..bd06aeae 100644 --- a/server/modules/strelka/mock/mock_iomanager.go +++ b/server/modules/strelka/mock/mock_iomanager.go @@ -10,7 +10,6 @@ package mock import ( fs "io/fs" - http "net/http" exec "os/exec" reflect "reflect" time "time" @@ -72,21 +71,6 @@ func (mr *MockIOManagerMockRecorder) ExecCommand(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExecCommand", reflect.TypeOf((*MockIOManager)(nil).ExecCommand), arg0) } -// MakeRequest mocks base method. -func (m *MockIOManager) MakeRequest(arg0 *http.Request) (*http.Response, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "MakeRequest", arg0) - ret0, _ := ret[0].(*http.Response) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// MakeRequest indicates an expected call of MakeRequest. -func (mr *MockIOManagerMockRecorder) MakeRequest(arg0 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MakeRequest", reflect.TypeOf((*MockIOManager)(nil).MakeRequest), arg0) -} - // ReadDir mocks base method. func (m *MockIOManager) ReadDir(arg0 string) ([]fs.DirEntry, error) { m.ctrl.T.Helper() @@ -117,6 +101,20 @@ func (mr *MockIOManagerMockRecorder) ReadFile(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadFile", reflect.TypeOf((*MockIOManager)(nil).ReadFile), arg0) } +// WalkDir mocks base method. +func (m *MockIOManager) WalkDir(arg0 string, arg1 fs.WalkDirFunc) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WalkDir", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// WalkDir indicates an expected call of WalkDir. +func (mr *MockIOManagerMockRecorder) WalkDir(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WalkDir", reflect.TypeOf((*MockIOManager)(nil).WalkDir), arg0, arg1) +} + // WriteFile mocks base method. 
func (m *MockIOManager) WriteFile(arg0 string, arg1 []byte, arg2 fs.FileMode) error { m.ctrl.T.Helper() diff --git a/server/modules/strelka/strelka.go b/server/modules/strelka/strelka.go index a0ef577c..51011870 100644 --- a/server/modules/strelka/strelka.go +++ b/server/modules/strelka/strelka.go @@ -60,6 +60,7 @@ type IOManager interface { DeleteFile(path string) error ReadDir(path string) ([]os.DirEntry, error) ExecCommand(cmd *exec.Cmd) ([]byte, int, time.Duration, error) + WalkDir(root string, fn fs.WalkDirFunc) error } type StrelkaEngine struct { @@ -396,8 +397,6 @@ func (e *StrelkaEngine) startCommunityRuleImport() { templateFound = true } - upToDate := map[string]*model.RuleRepo{} - allRepos, anythingNew, err := detections.UpdateRepos(&e.isRunning, e.reposFolder, e.rulesRepos, e.srv.Config) if err != nil { if strings.Contains(err.Error(), "module stopped") { @@ -437,12 +436,6 @@ func (e *StrelkaEngine) startCommunityRuleImport() { continue } - for k, v := range allRepos { - if v.WasModified || forceSync { - upToDate[k] = v.Repo - } - } - communityDetections, err := e.srv.Detectionstore.GetAllDetections(e.srv.Context, model.WithEngine(model.EngineNameStrelka), model.WithCommunity(true)) if err != nil { log.WithError(err).Error("Failed to get all community SIDs") @@ -463,20 +456,24 @@ func (e *StrelkaEngine) startCommunityRuleImport() { } et := detections.NewErrorTracker(e.failAfterConsecutiveErrorCount) - failSync := false + detects := []*model.Detection{} // parse *.yar files in repos - for repopath, repo := range upToDate { + for repopath, repo := range allRepos { if !e.isRunning { return } - baseDir := repopath - if repo.Folder != nil { - baseDir = filepath.Join(baseDir, *repo.Folder) + if !repo.WasModified && !forceSync { + continue + } + + baseDir := repo.Path + if repo.Repo.Folder != nil { + baseDir = filepath.Join(baseDir, *repo.Repo.Folder) } - err = filepath.WalkDir(baseDir, func(path string, d fs.DirEntry, err error) error { + err = e.WalkDir(baseDir, func(path string, d fs.DirEntry, err error) error { if err != nil { log.WithError(err).WithField("repoPath", path).Error("Failed to walk path") return nil @@ -508,97 +505,108 @@ func (e *StrelkaEngine) startCommunityRuleImport() { } for _, rule := range parsed { - det := rule.ToDetection(repo.License, filepath.Base(repopath), repo.Community) - log.WithFields(log.Fields{ - "rule.uuid": det.PublicID, - "rule.name": det.Title, - }).Info("Strelka community sync - processing YARA rule") - - comRule, exists := communityDetections[det.PublicID] - if exists { - if comRule.Content != det.Content || comRule.Ruleset != det.Ruleset || len(det.Overrides) != 0 { - // pre-existing detection, update it - det.IsEnabled = comRule.IsEnabled - det.Id = comRule.Id - det.Overrides = comRule.Overrides - det.CreateTime = comRule.CreateTime - - log.WithFields(log.Fields{ - "rule.uuid": det.PublicID, - "rule.name": det.Title, - }).Info("Updating Yara detection") - - _, err = e.srv.Detectionstore.UpdateDetection(e.srv.Context, det) - if err != nil && err.Error() == "Object not found" { - log.WithField("publicId", det.PublicID).Error("unable to read back successful write") - - writeNoRead = util.Ptr(det.PublicID) - failSync = true - - return err - } - - eterr := et.AddError(err) - if eterr != nil { - return eterr - } + det := rule.ToDetection(repo.Repo.License, filepath.Base(repo.Path), repo.Repo.Community) + detects = append(detects, det) + } - if err != nil { - log.WithError(err).WithField("publicId", det.PublicID).Error("Failed to update detection") 
- continue - } - } + return nil + }) + if err != nil { + log.WithError(err).WithField("strelkaRepo", repopath).Error("failed while walking repo") - delete(toDelete, det.PublicID) - } else { - // new detection, create it - log.WithFields(log.Fields{ - "rule.uuid": det.PublicID, - "rule.name": det.Title, - }).Info("Creating new Yara detection") + continue + } + } - checkRulesetEnabled(e, det) + var eterr error + detects = detections.DeduplicateByPublicId(detects) - _, err = e.srv.Detectionstore.CreateDetection(e.srv.Context, det) - if err != nil && err.Error() == "Object not found" { - log.WithField("publicId", det.PublicID).Error("unable to read back successful write") + for _, det := range detects { + log.WithFields(log.Fields{ + "rule.uuid": det.PublicID, + "rule.name": det.Title, + }).Info("Strelka community sync - processing YARA rule") - writeNoRead = util.Ptr(det.PublicID) - failSync = true + comRule, exists := communityDetections[det.PublicID] + if exists { + if comRule.Content != det.Content || comRule.Ruleset != det.Ruleset || len(det.Overrides) != 0 { + // pre-existing detection, update it + det.IsEnabled = comRule.IsEnabled + det.Id = comRule.Id + det.Overrides = comRule.Overrides + det.CreateTime = comRule.CreateTime - return err - } + log.WithFields(log.Fields{ + "rule.uuid": det.PublicID, + "rule.name": det.Title, + }).Info("Updating Yara detection") - eterr := et.AddError(err) - if eterr != nil { - failSync = true + _, err = e.srv.Detectionstore.UpdateDetection(e.srv.Context, det) + if err != nil && err.Error() == "Object not found" { + writeNoRead = util.Ptr(det.PublicID) + log.WithField("publicId", det.PublicID).Error("unable to read back successful write") - return eterr - } + break + } - if err != nil { - log.WithError(err).WithField("publicId", det.PublicID).Error("Failed to create detection") - continue - } + eterr = et.AddError(err) + if eterr != nil { + break + } - delete(toDelete, det.PublicID) + if err != nil { + log.WithError(err).WithField("publicId", det.PublicID).Error("Failed to update detection") + continue } } - return nil - }) - if err != nil { - log.WithError(err).WithField("strelkaRepo", repopath).Error("failed while walking repo") + delete(toDelete, det.PublicID) + } else { + // new detection, create it + log.WithFields(log.Fields{ + "rule.uuid": det.PublicID, + "rule.name": det.Title, + }).Info("Creating new Yara detection") + + checkRulesetEnabled(e, det) + + _, err = e.srv.Detectionstore.CreateDetection(e.srv.Context, det) + if err != nil && err.Error() == "Object not found" { + writeNoRead = util.Ptr(det.PublicID) + log.WithField("publicId", det.PublicID).Error("unable to read back successful write") - if failSync { break } - continue + eterr = et.AddError(err) + if eterr != nil { + break + } + + if err != nil { + log.WithError(err).WithField("publicId", det.PublicID).Error("Failed to create detection") + continue + } + + delete(toDelete, det.PublicID) } } - if failSync { + if eterr != nil || writeNoRead != nil { + if eterr != nil { + log.WithError(eterr).Error("unable to sync YARA community detections") + } + if writeNoRead != nil { + log.Warn("detection was written but not read back, attempting read before continuing") + } + + if e.notify { + e.srv.Host.Broadcast("detection-sync", "detections", server.SyncStatus{ + Engine: model.EngineNameElastAlert, + Status: "error", + }) + } + continue } @@ -1170,6 +1178,7 @@ func (_ *ResourceManager) DeleteFile(path string) error { func (_ *ResourceManager) ReadDir(path string) ([]os.DirEntry, error) { return 
os.ReadDir(path) } + func (_ *ResourceManager) ExecCommand(cmd *exec.Cmd) (output []byte, exitCode int, runtime time.Duration, err error) { start := time.Now() output, err = cmd.CombinedOutput() @@ -1179,3 +1188,7 @@ func (_ *ResourceManager) ExecCommand(cmd *exec.Cmd) (output []byte, exitCode in return output, exitCode, runtime, err } + +func (_ *ResourceManager) WalkDir(root string, fn fs.WalkDirFunc) error { + return filepath.WalkDir(root, fn) +} diff --git a/server/modules/suricata/suricata.go b/server/modules/suricata/suricata.go index dd487478..c11593a5 100644 --- a/server/modules/suricata/suricata.go +++ b/server/modules/suricata/suricata.go @@ -509,6 +509,8 @@ func (e *SuricataEngine) watchCommunityRules() { continue } + commDetections = detections.DeduplicateByPublicId(commDetections) + for _, d := range commDetections { d.IsCommunity = true }
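
Usage sketch (illustrative only, not part of the diff): because UpdateRepos now returns repos in config order, DeduplicateByPublicId keeps the first Detection seen for a given publicId and drops later ones, so the same rule wins on every sync. The snippet below assumes only the model and detections packages already shown above; the sample IDs and ruleset names are made up.

// Illustrative sketch; not part of this patch.
package main

import (
	"fmt"

	"github.com/security-onion-solutions/securityonion-soc/model"
	"github.com/security-onion-solutions/securityonion-soc/server/modules/detections"
)

func main() {
	// Repos are processed in config order, so the first occurrence of a
	// publicId is kept; later duplicates are logged and skipped.
	input := []*model.Detection{
		{PublicID: "1", Ruleset: "repo-a"},
		{PublicID: "2", Ruleset: "repo-a"},
		{PublicID: "1", Ruleset: "repo-b"}, // duplicate, dropped
	}

	for _, det := range detections.DeduplicateByPublicId(input) {
		fmt.Println(det.PublicID, det.Ruleset) // prints: 1 repo-a, then 2 repo-a
	}
}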