Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redo the suri module with native pcap extraction; improve local dev #336

Merged
merged 2 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 93 additions & 71 deletions agent/modules/suriquery/suriquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,41 +7,41 @@
package suriquery

import (
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path"
"path/filepath"
"slices"
"strconv"
"strings"
"time"

"github.com/apex/log"
"github.com/kennygrant/sanitize"
"github.com/google/gopacket"
"github.com/security-onion-solutions/securityonion-soc/agent"
"github.com/security-onion-solutions/securityonion-soc/model"
"github.com/security-onion-solutions/securityonion-soc/module"
"github.com/security-onion-solutions/securityonion-soc/packet"
)

const DEFAULT_EXECUTABLE_PATH = "suriquery.sh"
const DEFAULT_PCAP_OUTPUT_PATH = "/nsm/pcapout"
const DEFAULT_PCAP_INPUT_PATH = "/nsm/pcap"
const DEFAULT_PCAP_INPUT_PATH = "/nsm/suripcap"
const DEFAULT_EPOCH_REFRESH_MS = 120000
const DEFAULT_TIMEOUT_MS = 1200000
const DEFAULT_DATA_LAG_MS = 120000
const DEFAULT_PCAP_MAX_COUNT = 999999

const SURI_PCAP_PREFIX = "so-pcap."

type SuriQuery struct {
config module.ModuleConfig
executablePath string
pcapOutputPath string
pcapInputPath string
agent *agent.Agent
epochTimeTmp time.Time
epochTime time.Time
epochRefreshTime time.Time
epochRefreshMs int
timeoutMs int
dataLagMs int
pcapMaxCount int
}

func NewSuriQuery(agt *agent.Agent) *SuriQuery {
Expand All @@ -57,12 +57,10 @@ func (lag *SuriQuery) PrerequisiteModules() []string {
func (suri *SuriQuery) Init(cfg module.ModuleConfig) error {
var err error
suri.config = cfg
suri.executablePath = module.GetStringDefault(cfg, "executablePath", DEFAULT_EXECUTABLE_PATH)
suri.pcapOutputPath = module.GetStringDefault(cfg, "pcapOutputPath", DEFAULT_PCAP_OUTPUT_PATH)
suri.pcapInputPath = module.GetStringDefault(cfg, "pcapInputPath", DEFAULT_PCAP_INPUT_PATH)
suri.epochRefreshMs = module.GetIntDefault(cfg, "epochRefreshMs", DEFAULT_EPOCH_REFRESH_MS)
suri.timeoutMs = module.GetIntDefault(cfg, "timeoutMs", DEFAULT_TIMEOUT_MS)
suri.dataLagMs = module.GetIntDefault(cfg, "dataLagMs", DEFAULT_DATA_LAG_MS)
suri.pcapMaxCount = module.GetIntDefault(cfg, "pcapMaxCount", DEFAULT_PCAP_MAX_COUNT)
if suri.agent == nil {
err = errors.New("Unable to invoke JobMgr.AddJobProcessor due to nil agent")
} else {
Expand Down Expand Up @@ -91,8 +89,8 @@ func (suri *SuriQuery) ProcessJob(job *model.Job, reader io.ReadCloser) (io.Read
var err error
if job.GetKind() != "pcap" {
log.WithFields(log.Fields{
"jobId": job.Id,
"kind": job.GetKind(),
"jobId": job.Id,
"jobKind": job.GetKind(),
}).Debug("Skipping suri processor due to unsupported job")
return reader, nil
}
Expand All @@ -112,73 +110,90 @@ func (suri *SuriQuery) ProcessJob(job *model.Job, reader io.ReadCloser) (io.Read
}).Info("Skipping suri processor due to date range conflict")
err = errors.New("No data available for the requested dates")
} else {
job.FileExtension = "pcap"

query := suri.CreateQuery(job)

pcapFilepath := fmt.Sprintf("%s/%d.%s", suri.pcapOutputPath, job.Id, job.FileExtension)

log.WithField("jobId", job.Id).Info("Processing pcap export for job")

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(suri.timeoutMs)*time.Millisecond)
defer cancel()
beginTime := job.Filter.BeginTime.Format(time.RFC3339)
endTime := job.Filter.EndTime.Format(time.RFC3339)

cmd := exec.CommandContext(ctx, suri.executablePath, pcapFilepath, beginTime, endTime, query)
var output []byte
output, err = cmd.CombinedOutput()
log.WithFields(log.Fields{
"executablePath": suri.executablePath,
"query": query,
"output": string(output),
"pcapFilepath": pcapFilepath,
"err": err,
}).Debug("Executed suriread")
if err == nil {
var file *os.File
file, err = os.Open(pcapFilepath)
if err == nil {
reader = file
}
}
"jobId": job.Id,
}).Debug("Starting to process new Suricata PCAP job")
pcapFiles := suri.findFilesInTimeRange(job.Filter.BeginTime, job.Filter.EndTime)
reader, err = suri.streamPacketsInPcaps(pcapFiles, job.Filter)
log.WithFields(log.Fields{
"err": err,
}).Debug("Finished processing PCAP")
}
return reader, err
}

func (suri *SuriQuery) CleanupJob(job *model.Job) {
pcapOutputFilepath := fmt.Sprintf("%s/%d.%s", suri.pcapOutputPath, job.Id, sanitize.Name(job.FileExtension))
os.Remove(pcapOutputFilepath)
// Noop
}

func add(query string, added string) string {
if len(query) > 0 {
query = query + " and "
}
return query + added
}
func (suri *SuriQuery) streamPacketsInPcaps(paths []string, filter *model.Filter) (io.ReadCloser, error) {
allPackets := make([]gopacket.Packet, 0, 0)

func (suri *SuriQuery) CreateQuery(job *model.Job) string {
for _, path := range paths {
packets, perr := packet.ParseRawPcap(path, suri.pcapMaxCount, filter)
if perr != nil {
log.WithError(perr).WithField("pcapPath", path).Error("Failed to parse PCAP file")
}
if packets != nil && len(packets) > 0 {
allPackets = append(allPackets, packets...)
}
}

query := ""
slices.SortFunc(allPackets, func(a, b gopacket.Packet) int {
return a.Metadata().Timestamp.Compare(b.Metadata().Timestamp)
})

if len(job.Filter.SrcIp) > 0 {
query = add(query, fmt.Sprintf("host %s", job.Filter.SrcIp))
}
log.WithField("matchedCount", len(allPackets)).Debug("Finished filtering eligible packets")

if len(job.Filter.DstIp) > 0 {
query = add(query, fmt.Sprintf("host %s", job.Filter.DstIp))
}
return packet.ToStream(allPackets)
}

if job.Filter.SrcPort > 0 {
query = add(query, fmt.Sprintf("port %d", job.Filter.SrcPort))
func (suri *SuriQuery) getPcapCreateTime(filepath string) (time.Time, error) {
var createTime time.Time
var err error
filename := path.Base(filepath)
if !strings.HasPrefix(filename, SURI_PCAP_PREFIX) {
err = errors.New("unsupported pcap file")
} else {
secondsStr := strings.TrimLeft(filename, SURI_PCAP_PREFIX)
var seconds int64
seconds, err = strconv.ParseInt(secondsStr, 10, 64)
if err == nil {
createTime = time.Unix(seconds, 0).UTC()
}
}
return createTime, err
}

if job.Filter.DstPort > 0 {
query = add(query, fmt.Sprintf("port %d", job.Filter.DstPort))
func (suri *SuriQuery) findFilesInTimeRange(start time.Time, stop time.Time) []string {
eligibleFiles := make([]string, 0, 0)
err := filepath.Walk(suri.pcapInputPath, func(filepath string, fileinfo os.FileInfo, err error) error {
createTime, err := suri.getPcapCreateTime(filepath)
if err != nil {
log.WithField("pcapPath", filepath).WithError(err).Warn("PCAP file does not conform to expected format")
return nil
}
modTime := fileinfo.ModTime()
log.WithFields(log.Fields{
"pcapPath": filepath,
"createTime": createTime,
"modTime": modTime,
}).Debug("Reviewing eligibility for PCAP file")

// file was created before the time range but has still open when time range started.
if (createTime.Before(start) && modTime.After(start)) ||
// file was created and finished in between time range start and stop times
(createTime.After(start) && createTime.Before(modTime) && modTime.Before(stop)) ||
// file was created before the end of the time range but was still being written to after the time range stop time
(createTime.Before(stop) && modTime.After(stop)) {
eligibleFiles = append(eligibleFiles, filepath)
}
return nil
})
if err != nil {
log.WithError(err).WithField("pcapInputPath", suri.pcapInputPath).Error("Unable to access path while locating PCAP files in time range")
}

return query
return eligibleFiles
}

func (suri *SuriQuery) GetDataEpoch() time.Time {
Expand All @@ -188,7 +203,7 @@ func (suri *SuriQuery) GetDataEpoch() time.Time {
suri.epochTimeTmp = now
err := filepath.Walk(suri.pcapInputPath, suri.updateEpochTimeTmp)
if err != nil {
log.WithError(err).WithField("pcapInputPath", suri.pcapInputPath)
log.WithError(err).WithField("pcapPath", suri.pcapInputPath)
} else {
suri.epochTime = suri.epochTimeTmp
}
Expand All @@ -199,11 +214,18 @@ func (suri *SuriQuery) GetDataEpoch() time.Time {

func (suri *SuriQuery) updateEpochTimeTmp(path string, info os.FileInfo, err error) error {
if err != nil {
log.WithError(err).WithField("path", path).Error("Unable to access path while updating epoch")
log.WithError(err).WithField("pcapPath", path).Error("Unable to access path while updating epoch")
return err
}
if !info.IsDir() && info.Size() > 0 && info.ModTime().Before(suri.epochTimeTmp) {
suri.epochTimeTmp = info.ModTime()
if !info.IsDir() && info.Size() > 0 {
createTime, err := suri.getPcapCreateTime(path)
if err != nil {
return err
}

if createTime.Before(suri.epochTimeTmp) {
suri.epochTimeTmp = createTime
}
}
return nil
}
95 changes: 62 additions & 33 deletions agent/modules/suriquery/suriquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,60 +7,89 @@
package suriquery

import (
"strconv"
"testing"
"time"

"github.com/security-onion-solutions/securityonion-soc/model"
"github.com/stretchr/testify/assert"
)

func initTest() *SuriQuery {
cfg := make(map[string]interface{})
cfg["pcapInputPath"] = "test_resources"
sq := NewSuriQuery(nil)
sq.Init(cfg)
return sq
}
func TestInitSuriQuery(tester *testing.T) {
cfg := make(map[string]interface{})
sq := NewSuriQuery(nil)
err := sq.Init(cfg)
assert.Error(tester, err)
assert.Equal(tester, DEFAULT_EXECUTABLE_PATH, sq.executablePath)
assert.Equal(tester, DEFAULT_PCAP_OUTPUT_PATH, sq.pcapOutputPath)
assert.Equal(tester, DEFAULT_PCAP_INPUT_PATH, sq.pcapInputPath)
assert.Equal(tester, DEFAULT_TIMEOUT_MS, sq.timeoutMs)
assert.Equal(tester, DEFAULT_EPOCH_REFRESH_MS, sq.epochRefreshMs)
assert.Equal(tester, DEFAULT_DATA_LAG_MS, sq.dataLagMs)
}

func TestDataLag(tester *testing.T) {
cfg := make(map[string]interface{})
sq := NewSuriQuery(nil)
sq.Init(cfg)
sq := initTest()
lagDate := sq.getDataLagDate()
assert.False(tester, lagDate.After(time.Now()), "expected data lag datetime to be before current datetime")
}

func TestCreateQuery(tester *testing.T) {
sq := NewSuriQuery(nil)
func TestFindFilesExcludesMalformedNamesAndImpossibleStartTimes(tester *testing.T) {
sq := initTest()

start, _ := time.Parse(time.RFC3339, "2024-02-05T00:00:00Z")
stop, _ := time.Parse(time.RFC3339, "2099-02-06T00:00:00Z")
files := sq.findFilesInTimeRange(start, stop)
assert.Len(tester, files, 1)
assert.Equal(tester, files[0], "test_resources/3/so-pcap.1575817346")
}

func TestGetPcapCreateTime(tester *testing.T) {
sq := initTest()

_, err := sq.getPcapCreateTime("/some/path/nonconforming.file")
assert.ErrorContains(tester, err, "unsupported pcap file")

_, err = sq.getPcapCreateTime("/some/path/so-pcap.file")
assert.ErrorContains(tester, err, "invalid syntax")

expectedTime, _ := time.Parse(time.RFC3339, "2019-12-08T15:02:26Z")
var created time.Time
created, err = sq.getPcapCreateTime("/some/path/so-pcap.1575817346")
assert.Nil(tester, err)
assert.Equal(tester, expectedTime, created)
}

func TestGetDataEpoch(tester *testing.T) {
sq := initTest()

epoch := sq.GetDataEpoch()
expectedTime, _ := time.Parse(time.RFC3339, "2019-12-08T15:02:26Z")
assert.Equal(tester, expectedTime, epoch)
}

func TestStreamPacketsInPcaps(tester *testing.T) {
sq := initTest()

paths := []string{"test_resources/3/so-pcap.1575817346"}
filter := model.NewFilter()
startTime, _ := time.Parse(time.RFC3339, "2019-12-08T00:00:00Z")
filter.BeginTime = startTime
endTime, _ := time.Parse(time.RFC3339, "2019-12-08T23:59:59Z")
filter.EndTime = endTime
filter.SrcIp = "185.47.63.113"
filter.SrcPort = 19
filter.DstIp = "176.126.243.198"
filter.DstPort = 34515

job := model.NewJob()
expectedQuery := ""
query := sq.CreateQuery(job)
assert.Equal(tester, expectedQuery, query)

job.Filter.SrcIp = "1.2.3.4"
query = sq.CreateQuery(job)
expectedQuery = expectedQuery + "host " + job.Filter.SrcIp
assert.Equal(tester, expectedQuery, query)

job.Filter.DstIp = "1.2.1.2"
query = sq.CreateQuery(job)
expectedQuery = expectedQuery + " and host " + job.Filter.DstIp
assert.Equal(tester, expectedQuery, query)

job.Filter.SrcPort = 123
query = sq.CreateQuery(job)
expectedQuery = expectedQuery + " and port " + strconv.Itoa(job.Filter.SrcPort)
assert.Equal(tester, expectedQuery, query)

job.Filter.DstPort = 123
query = sq.CreateQuery(job)
expectedQuery = expectedQuery + " and port " + strconv.Itoa(job.Filter.DstPort)
assert.Equal(tester, expectedQuery, query)
reader, err := sq.streamPacketsInPcaps(paths, filter)
assert.Nil(tester, err)
pcap_length := 14122 // correlates to so-pcap test file
bytes := make([]byte, 32768)
count, err := reader.Read(bytes)
assert.Nil(tester, err)
assert.Equal(tester, pcap_length, count)
}
Loading
Loading