diff --git a/agent/modules/suriquery/suriquery.go b/agent/modules/suriquery/suriquery.go
index 16340c2e7..1d44b9363 100644
--- a/agent/modules/suriquery/suriquery.go
+++ b/agent/modules/suriquery/suriquery.go
@@ -7,41 +7,41 @@
 package suriquery
 
 import (
-	"context"
 	"errors"
-	"fmt"
 	"io"
 	"os"
-	"os/exec"
+	"path"
 	"path/filepath"
+	"slices"
+	"strconv"
+	"strings"
 	"time"
 
 	"github.com/apex/log"
-	"github.com/kennygrant/sanitize"
+	"github.com/google/gopacket"
 	"github.com/security-onion-solutions/securityonion-soc/agent"
 	"github.com/security-onion-solutions/securityonion-soc/model"
 	"github.com/security-onion-solutions/securityonion-soc/module"
+	"github.com/security-onion-solutions/securityonion-soc/packet"
 )
 
-const DEFAULT_EXECUTABLE_PATH = "suriquery.sh"
-const DEFAULT_PCAP_OUTPUT_PATH = "/nsm/pcapout"
-const DEFAULT_PCAP_INPUT_PATH = "/nsm/pcap"
+const DEFAULT_PCAP_INPUT_PATH = "/nsm/suripcap"
 const DEFAULT_EPOCH_REFRESH_MS = 120000
-const DEFAULT_TIMEOUT_MS = 1200000
 const DEFAULT_DATA_LAG_MS = 120000
+const DEFAULT_PCAP_MAX_COUNT = 999999
+
+const SURI_PCAP_PREFIX = "so-pcap."
 
 type SuriQuery struct {
 	config           module.ModuleConfig
-	executablePath   string
-	pcapOutputPath   string
 	pcapInputPath    string
 	agent            *agent.Agent
 	epochTimeTmp     time.Time
 	epochTime        time.Time
 	epochRefreshTime time.Time
 	epochRefreshMs   int
-	timeoutMs        int
 	dataLagMs        int
+	pcapMaxCount     int
 }
 
 func NewSuriQuery(agt *agent.Agent) *SuriQuery {
@@ -57,12 +57,10 @@ func (lag *SuriQuery) PrerequisiteModules() []string {
 func (suri *SuriQuery) Init(cfg module.ModuleConfig) error {
 	var err error
 	suri.config = cfg
-	suri.executablePath = module.GetStringDefault(cfg, "executablePath", DEFAULT_EXECUTABLE_PATH)
-	suri.pcapOutputPath = module.GetStringDefault(cfg, "pcapOutputPath", DEFAULT_PCAP_OUTPUT_PATH)
 	suri.pcapInputPath = module.GetStringDefault(cfg, "pcapInputPath", DEFAULT_PCAP_INPUT_PATH)
 	suri.epochRefreshMs = module.GetIntDefault(cfg, "epochRefreshMs", DEFAULT_EPOCH_REFRESH_MS)
-	suri.timeoutMs = module.GetIntDefault(cfg, "timeoutMs", DEFAULT_TIMEOUT_MS)
 	suri.dataLagMs = module.GetIntDefault(cfg, "dataLagMs", DEFAULT_DATA_LAG_MS)
+	suri.pcapMaxCount = module.GetIntDefault(cfg, "pcapMaxCount", DEFAULT_PCAP_MAX_COUNT)
 	if suri.agent == nil {
 		err = errors.New("Unable to invoke JobMgr.AddJobProcessor due to nil agent")
 	} else {
@@ -91,8 +89,8 @@ func (suri *SuriQuery) ProcessJob(job *model.Job, reader io.ReadCloser) (io.Read
 	var err error
 	if job.GetKind() != "pcap" {
 		log.WithFields(log.Fields{
-			"jobId": job.Id,
-			"kind":  job.GetKind(),
+			"jobId":   job.Id,
+			"jobKind": job.GetKind(),
 		}).Debug("Skipping suri processor due to unsupported job")
 		return reader, nil
 	}
@@ -112,73 +110,90 @@ func (suri *SuriQuery) ProcessJob(job *model.Job, reader io.ReadCloser) (io.Read
 		}).Info("Skipping suri processor due to date range conflict")
 		err = errors.New("No data available for the requested dates")
 	} else {
-		job.FileExtension = "pcap"
-
-		query := suri.CreateQuery(job)
-
-		pcapFilepath := fmt.Sprintf("%s/%d.%s", suri.pcapOutputPath, job.Id, job.FileExtension)
-
-		log.WithField("jobId", job.Id).Info("Processing pcap export for job")
-
-		ctx, cancel := context.WithTimeout(context.Background(), time.Duration(suri.timeoutMs)*time.Millisecond)
-		defer cancel()
-		beginTime := job.Filter.BeginTime.Format(time.RFC3339)
-		endTime := job.Filter.EndTime.Format(time.RFC3339)
-
-		cmd := exec.CommandContext(ctx, suri.executablePath, pcapFilepath, beginTime, endTime, query)
-		var output []byte
-		output, err = cmd.CombinedOutput()
 		log.WithFields(log.Fields{
-			"executablePath": suri.executablePath,
-			"query":          query,
-			"output":         string(output),
-			"pcapFilepath":   pcapFilepath,
-			"err":            err,
-		}).Debug("Executed suriread")
-		if err == nil {
-			var file *os.File
-			file, err = os.Open(pcapFilepath)
-			if err == nil {
-				reader = file
-			}
-		}
+			"jobId": job.Id,
+		}).Debug("Starting to process new Suricata PCAP job")
+		pcapFiles := suri.findFilesInTimeRange(job.Filter.BeginTime, job.Filter.EndTime)
+		reader, err = suri.streamPacketsInPcaps(pcapFiles, job.Filter)
+		log.WithFields(log.Fields{
+			"err": err,
+		}).Debug("Finished processing PCAP")
 	}
 	return reader, err
 }
 
 func (suri *SuriQuery) CleanupJob(job *model.Job) {
-	pcapOutputFilepath := fmt.Sprintf("%s/%d.%s", suri.pcapOutputPath, job.Id, sanitize.Name(job.FileExtension))
-	os.Remove(pcapOutputFilepath)
+	// Noop
 }
 
-func add(query string, added string) string {
-	if len(query) > 0 {
-		query = query + " and "
-	}
-	return query + added
-}
+func (suri *SuriQuery) streamPacketsInPcaps(paths []string, filter *model.Filter) (io.ReadCloser, error) {
+	allPackets := make([]gopacket.Packet, 0)
 
-func (suri *SuriQuery) CreateQuery(job *model.Job) string {
+	for _, path := range paths {
+		packets, perr := packet.ParseRawPcap(path, suri.pcapMaxCount, filter)
+		if perr != nil {
+			log.WithError(perr).WithField("pcapPath", path).Error("Failed to parse PCAP file")
+		}
+		if len(packets) > 0 {
+			allPackets = append(allPackets, packets...)
+		}
+	}
 
-	query := ""
+	slices.SortFunc(allPackets, func(a, b gopacket.Packet) int {
+		return a.Metadata().Timestamp.Compare(b.Metadata().Timestamp)
+	})
 
-	if len(job.Filter.SrcIp) > 0 {
-		query = add(query, fmt.Sprintf("host %s", job.Filter.SrcIp))
-	}
+	log.WithField("matchedCount", len(allPackets)).Debug("Finished filtering eligible packets")
 
-	if len(job.Filter.DstIp) > 0 {
-		query = add(query, fmt.Sprintf("host %s", job.Filter.DstIp))
-	}
+	return packet.ToStream(allPackets)
+}
 
-	if job.Filter.SrcPort > 0 {
-		query = add(query, fmt.Sprintf("port %d", job.Filter.SrcPort))
+func (suri *SuriQuery) getPcapCreateTime(filepath string) (time.Time, error) {
+	var createTime time.Time
+	var err error
+	filename := path.Base(filepath)
+	if !strings.HasPrefix(filename, SURI_PCAP_PREFIX) {
+		err = errors.New("unsupported pcap file")
+	} else {
+		secondsStr := strings.TrimPrefix(filename, SURI_PCAP_PREFIX)
+		var seconds int64
+		seconds, err = strconv.ParseInt(secondsStr, 10, 64)
+		if err == nil {
+			createTime = time.Unix(seconds, 0).UTC()
+		}
 	}
+	return createTime, err
+}
 
-	if job.Filter.DstPort > 0 {
-		query = add(query, fmt.Sprintf("port %d", job.Filter.DstPort))
+func (suri *SuriQuery) findFilesInTimeRange(start time.Time, stop time.Time) []string {
+	eligibleFiles := make([]string, 0)
+	err := filepath.Walk(suri.pcapInputPath, func(filepath string, fileinfo os.FileInfo, err error) error {
+		createTime, err := suri.getPcapCreateTime(filepath)
+		if err != nil {
+			log.WithField("pcapPath", filepath).WithError(err).Warn("PCAP file does not conform to expected format")
+			return nil
+		}
+		modTime := fileinfo.ModTime()
+		log.WithFields(log.Fields{
+			"pcapPath":   filepath,
+			"createTime": createTime,
+			"modTime":    modTime,
+		}).Debug("Reviewing eligibility for PCAP file")
+
+		// file was created before the time range but was still open when the time range started
+		if (createTime.Before(start) && modTime.After(start)) ||
+			// file was created and finished between the time range start and stop times
+			(createTime.After(start) && createTime.Before(modTime) && modTime.Before(stop)) ||
+			// file was created before the end of the time range but was still being written to after the time range stop time
+			(createTime.Before(stop) && modTime.After(stop)) {
+			eligibleFiles = append(eligibleFiles, filepath)
+		}
+		return nil
+	})
+	if err != nil {
+		log.WithError(err).WithField("pcapInputPath", suri.pcapInputPath).Error("Unable to access path while locating PCAP files in time range")
 	}
-
-	return query
+	return eligibleFiles
 }
 
 func (suri *SuriQuery) GetDataEpoch() time.Time {
@@ -188,7 +203,7 @@
 	suri.epochTimeTmp = now
 	err := filepath.Walk(suri.pcapInputPath, suri.updateEpochTimeTmp)
 	if err != nil {
-		log.WithError(err).WithField("pcapInputPath", suri.pcapInputPath)
+		log.WithError(err).WithField("pcapPath", suri.pcapInputPath).Error("Unable to update epoch time from PCAP input path")
 	} else {
 		suri.epochTime = suri.epochTimeTmp
 	}
@@ -199,11 +214,18 @@
 func (suri *SuriQuery) updateEpochTimeTmp(path string, info os.FileInfo, err error) error {
 	if err != nil {
-		log.WithError(err).WithField("path", path).Error("Unable to access path while updating epoch")
+		log.WithError(err).WithField("pcapPath", path).Error("Unable to access path while updating epoch")
 		return err
 	}
-	if !info.IsDir() && info.Size() > 0 && info.ModTime().Before(suri.epochTimeTmp) {
-		suri.epochTimeTmp = info.ModTime()
+	if !info.IsDir() && info.Size() > 0 {
+		createTime, err := suri.getPcapCreateTime(path)
+		if err != nil {
+			return err
+		}
+
+		if createTime.Before(suri.epochTimeTmp) {
+			suri.epochTimeTmp = createTime
+		}
 	}
 	return nil
 }
diff --git a/agent/modules/suriquery/suriquery_test.go b/agent/modules/suriquery/suriquery_test.go
index d26d0b09d..36b4f3aa2 100644
--- a/agent/modules/suriquery/suriquery_test.go
+++ b/agent/modules/suriquery/suriquery_test.go
@@ -7,7 +7,6 @@
 package suriquery
 
 import (
-	"strconv"
 	"testing"
 	"time"
 
@@ -15,52 +14,82 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
+func initTest() *SuriQuery {
+	cfg := make(map[string]interface{})
+	cfg["pcapInputPath"] = "test_resources"
+	sq := NewSuriQuery(nil)
+	sq.Init(cfg)
+	return sq
+}
 func TestInitSuriQuery(tester *testing.T) {
 	cfg := make(map[string]interface{})
 	sq := NewSuriQuery(nil)
 	err := sq.Init(cfg)
 	assert.Error(tester, err)
-	assert.Equal(tester, DEFAULT_EXECUTABLE_PATH, sq.executablePath)
-	assert.Equal(tester, DEFAULT_PCAP_OUTPUT_PATH, sq.pcapOutputPath)
 	assert.Equal(tester, DEFAULT_PCAP_INPUT_PATH, sq.pcapInputPath)
-	assert.Equal(tester, DEFAULT_TIMEOUT_MS, sq.timeoutMs)
 	assert.Equal(tester, DEFAULT_EPOCH_REFRESH_MS, sq.epochRefreshMs)
 	assert.Equal(tester, DEFAULT_DATA_LAG_MS, sq.dataLagMs)
 }
 
 func TestDataLag(tester *testing.T) {
-	cfg := make(map[string]interface{})
-	sq := NewSuriQuery(nil)
-	sq.Init(cfg)
+	sq := initTest()
 	lagDate := sq.getDataLagDate()
 	assert.False(tester, lagDate.After(time.Now()), "expected data lag datetime to be before current datetime")
 }
 
-func TestCreateQuery(tester *testing.T) {
-	sq := NewSuriQuery(nil)
+func TestFindFilesExcludesMalformedNamesAndImpossibleStartTimes(tester *testing.T) {
+	sq := initTest()
+
+	start, _ := time.Parse(time.RFC3339, "2024-02-05T00:00:00Z")
+	stop, _ := time.Parse(time.RFC3339, "2099-02-06T00:00:00Z")
+	files := sq.findFilesInTimeRange(start, stop)
+	assert.Len(tester, files, 1)
+	assert.Equal(tester, "test_resources/3/so-pcap.1575817346", files[0])
+}
+
+func TestGetPcapCreateTime(tester *testing.T) {
+	sq := initTest()
+
+	_, err := sq.getPcapCreateTime("/some/path/nonconforming.file")
+	assert.ErrorContains(tester, err, "unsupported pcap file")
+
+	_, err = sq.getPcapCreateTime("/some/path/so-pcap.file")
+	assert.ErrorContains(tester, err, "invalid syntax")
+
+	expectedTime, _ := time.Parse(time.RFC3339, "2019-12-08T15:02:26Z")
+	var created time.Time
+	created, err = sq.getPcapCreateTime("/some/path/so-pcap.1575817346")
+	assert.Nil(tester, err)
+	assert.Equal(tester, expectedTime, created)
+}
+
+func TestGetDataEpoch(tester *testing.T) {
+	sq := initTest()
+
+	epoch := sq.GetDataEpoch()
+	expectedTime, _ := time.Parse(time.RFC3339, "2019-12-08T15:02:26Z")
+	assert.Equal(tester, expectedTime, epoch)
+}
+
+func TestStreamPacketsInPcaps(tester *testing.T) {
+	sq := initTest()
+
+	paths := []string{"test_resources/3/so-pcap.1575817346"}
+	filter := model.NewFilter()
+	startTime, _ := time.Parse(time.RFC3339, "2019-12-08T00:00:00Z")
+	filter.BeginTime = startTime
+	endTime, _ := time.Parse(time.RFC3339, "2019-12-08T23:59:59Z")
+	filter.EndTime = endTime
+	filter.SrcIp = "185.47.63.113"
+	filter.SrcPort = 19
+	filter.DstIp = "176.126.243.198"
+	filter.DstPort = 34515
 
-	job := model.NewJob()
-	expectedQuery := ""
-	query := sq.CreateQuery(job)
-	assert.Equal(tester, expectedQuery, query)
-
-	job.Filter.SrcIp = "1.2.3.4"
-	query = sq.CreateQuery(job)
-	expectedQuery = expectedQuery + "host " + job.Filter.SrcIp
-	assert.Equal(tester, expectedQuery, query)
-
-	job.Filter.DstIp = "1.2.1.2"
-	query = sq.CreateQuery(job)
-	expectedQuery = expectedQuery + " and host " + job.Filter.DstIp
-	assert.Equal(tester, expectedQuery, query)
-
-	job.Filter.SrcPort = 123
-	query = sq.CreateQuery(job)
-	expectedQuery = expectedQuery + " and port " + strconv.Itoa(job.Filter.SrcPort)
-	assert.Equal(tester, expectedQuery, query)
-
-	job.Filter.DstPort = 123
-	query = sq.CreateQuery(job)
-	expectedQuery = expectedQuery + " and port " + strconv.Itoa(job.Filter.DstPort)
-	assert.Equal(tester, expectedQuery, query)
+	reader, err := sq.streamPacketsInPcaps(paths, filter)
+	assert.Nil(tester, err)
+	pcapLength := 14122 // correlates to so-pcap test file
+	bytes := make([]byte, 32768)
+	count, err := reader.Read(bytes)
+	assert.Nil(tester, err)
+	assert.Equal(tester, pcapLength, count)
 }
diff --git a/packet/parser.go b/packet/parser.go
index ba279b7f0..84ad3f0f5 100644
--- a/packet/parser.go
+++ b/packet/parser.go
@@ -7,7 +7,9 @@
 package packet
 
 import (
+	"bytes"
 	"encoding/base64"
+	"io"
 	"os"
 
 	"github.com/apex/log"
@@ -42,6 +44,94 @@ func ParsePcap(filename string, offset int, count int, unwrap bool) ([]*model.Pa
 	return packets, nil
 }
 
+func ToStream(packets []gopacket.Packet) (io.ReadCloser, error) {
+	var snaplen uint32 = 65536
+	var full bytes.Buffer
+
+	writer := pcapgo.NewWriter(&full)
+	writer.WriteFileHeader(snaplen, layers.LinkTypeEthernet)
+
+	opts := gopacket.SerializeOptions{}
+
+	buf := gopacket.NewSerializeBuffer()
+	for _, packet := range packets {
+		buf.Clear()
+		err := gopacket.SerializePacket(buf, opts, packet)
+		if err != nil {
+			return nil, err
+		}
+		writer.WritePacket(packet.Metadata().CaptureInfo, buf.Bytes())
+	}
+	return io.NopCloser(bytes.NewReader(full.Bytes())), nil
+}
+
+func filterPacket(filter *model.Filter, packet gopacket.Packet) bool {
+	var srcIp, dstIp string
+	var srcPort, dstPort int
+
+	timestamp := packet.Metadata().Timestamp
+	layer := packet.Layer(layers.LayerTypeIPv6)
+	if layer != nil {
+		layer := layer.(*layers.IPv6)
+		srcIp = layer.SrcIP.String()
+		dstIp = layer.DstIP.String()
+	} else {
+		layer = packet.Layer(layers.LayerTypeIPv4)
+		if layer != nil {
+			layer := layer.(*layers.IPv4)
+			srcIp = layer.SrcIP.String()
+			dstIp = layer.DstIP.String()
+		}
+	}
+
+	layer = packet.Layer(layers.LayerTypeTCP)
+	if layer != nil {
+		layer := layer.(*layers.TCP)
+		srcPort = int(layer.SrcPort)
+		dstPort = int(layer.DstPort)
+	}
+
+	layer = packet.Layer(layers.LayerTypeUDP)
+	if layer != nil {
+		layer := layer.(*layers.UDP)
+		srcPort = int(layer.SrcPort)
+		dstPort = int(layer.DstPort)
+	}
+
+	include := (filter.BeginTime.IsZero() || timestamp.After(filter.BeginTime)) &&
+		(filter.EndTime.IsZero() || timestamp.Before(filter.EndTime)) &&
+		(filter.SrcIp == "" || srcIp == filter.SrcIp) &&
+		(filter.SrcPort == 0 || srcPort == filter.SrcPort) &&
+		(filter.DstIp == "" || dstIp == filter.DstIp) &&
+		(filter.DstPort == 0 || dstPort == filter.DstPort)
+
+	return include
+}
+
+func ParseRawPcap(filename string, count int, filter *model.Filter) ([]gopacket.Packet, error) {
+	packets := make([]gopacket.Packet, 0)
+	err := parsePcapFile(filename, func(index int, pcapPacket gopacket.Packet) bool {
+		if filterPacket(filter, pcapPacket) {
+			packets = append(packets, pcapPacket)
+		} else {
+			pcapPacket = unwrapVxlanPacket(pcapPacket, nil)
+			if filterPacket(filter, pcapPacket) {
+				packets = append(packets, pcapPacket)
+			}
+		}
+
+		return len(packets) < count
+	})
+
+	if len(packets) == count {
+		log.WithFields(log.Fields{
+			"packetCount": len(packets),
+		}).Warn("Reached packet capture limit for job; returned PCAP may be truncated")
+	}
+
+	return packets, err
+}
+
 func UnwrapPcap(filename string, unwrappedFilename string) bool {
 	unwrapped := false
 	info, err := os.Stat(unwrappedFilename)
diff --git a/packet/parser_resource.pcap b/packet/parser_resource.pcap
deleted file mode 100644
index 04f0c2f90..000000000
Binary files a/packet/parser_resource.pcap and /dev/null differ
diff --git a/packet/parser_test.go b/packet/parser_test.go
index a6a343758..19fdddf51 100644
--- a/packet/parser_test.go
+++ b/packet/parser_test.go
@@ -9,6 +9,7 @@
 package packet
 
 import (
 	"os"
 	"testing"
+	"time"
 
 	"github.com/google/gopacket"
 	"github.com/security-onion-solutions/securityonion-soc/model"
@@ -25,7 +26,7 @@ func TestOverrideType(tester *testing.T) {
 }
 
 func TestUnwrapPcap(tester *testing.T) {
-	filename := "parser_resource.pcap"
+	filename := "test_resources/parser_resource.pcap"
 	tmpFile, err := os.CreateTemp("", "unwrap-test")
 	assert.Nil(tester, err, "Unable to execute test due to bad temp file")
 	unwrappedFilename := tmpFile.Name()
@@ -34,3 +35,37 @@
 	unwrapped := UnwrapPcap(filename, unwrappedFilename)
 	assert.True(tester, unwrapped)
 }
+
+func TestParseAndStream(tester *testing.T) {
+	path := "test_resources/so-pcap.1575817346"
+	filter := model.NewFilter()
+	startTime, _ := time.Parse(time.RFC3339, "2019-12-08T00:00:00Z")
+	filter.BeginTime = startTime
+	endTime, _ := time.Parse(time.RFC3339, "2019-12-08T23:59:59Z")
+	filter.EndTime = endTime
+	filter.SrcIp = "185.47.63.113"
+	filter.SrcPort = 19
+	filter.DstIp = "176.126.243.198"
+	filter.DstPort = 34515
+
+	packets, perr := ParseRawPcap(path, 999, filter)
+	assert.Nil(tester, perr)
+	assert.Len(tester, packets, 12)
+
+	reader, err := ToStream(packets)
+
+	assert.Nil(tester, err)
+	pcapLength := 14122 // correlates to so-pcap test file
+	bytes := make([]byte, 32768)
+	count, err := reader.Read(bytes)
+	assert.Nil(tester, err)
+	assert.Equal(tester, pcapLength, count)
+}
+
+func TestParseAndStreamFail(tester *testing.T) {
+	path := "test_resources/so-pcap.nonexistent"
+	filter := model.NewFilter()
+
+	_, perr := ParseRawPcap(path, 999, filter)
+	assert.ErrorContains(tester, perr, "No such file")
+}
diff --git a/scripts/suriquery.sh b/scripts/suriquery.sh
deleted file mode 100644
index 7a406183c..000000000
--- a/scripts/suriquery.sh
+++ /dev/null
@@ -1,36 +0,0 @@
-#!/bin/bash
-
-# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
-# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
-# https://securityonion.net/license; you may not use this file except in compliance with the
-# Elastic License 2.0.
-
-PCAP_PATH=/nsm/suripcap/*/
-PCAP_TMP=/nsm/suripcaptmp
-
-
-if [ $# -le 3 ]; then
-  echo "usage: $0 <outputfile> <startdate> <enddate> <bpf>"
-  echo ""
-  echo "Extracts a particular packet stream based on the given time range and BPF."
-  exit 1
-else
-  OUTPUTFILE=$1
-  shift
-  STARTDATE=$1
-  shift
-  ENDDATE=$1
-  shift
-  FILTER=$@
-  BEFORE=$(date -d"$STARTDATE" "+%Y-%m-%d %H:%I:%S")
-  AFTER=$(date -d"$ENDDATE" "+%Y-%m-%d %H:%I:%S")
-  FINDIT=$(find $PCAP_PATH -newermt "$AFTER" \! -newermt "$BEFORE")
-  TMPDIR=$(mktemp -p $PCAP_TMP -d)
-  for filename in $FINDIT; do
-    fname=$(basename $filename)
-    tcpdump -nn -s 0 -r $filename $FILTER or "(vlan and $FILTER)" -w $TMPDIR/$fname
-  done
-  mergecap -F pcap -w $OUTPUTFILE $TMPDIR/*
-  # Clean up on aisle 4
-  rm -rf $TMPDIR
-fi
diff --git a/sensoroni.json b/sensoroni.json
index 6319966f4..396bbd5b4 100644
--- a/sensoroni.json
+++ b/sensoroni.json
@@ -1,7 +1,8 @@
 {
-  "logLevel": "info",
+  "logLevel": "debug",
   "logFilename": "logs/soc.log",
   "server": {
+    "developerEnabled": true,
     "bindAddress": "0.0.0.0:9822",
     "baseUrl": "/",
     "maxPacketCount": 5000,
@@ -21,6 +22,8 @@
     "serverUrl": "http://0.0.0.0:9822",
     "verifyCert": true,
     "modules": {
+      "suriquery": {
+      },
       "statickeyauth": {
         "apiKey": "samplekey"
       }
diff --git a/server/nodehandler.go b/server/nodehandler.go
index d3f940920..916cafea9 100644
--- a/server/nodehandler.go
+++ b/server/nodehandler.go
@@ -46,7 +46,9 @@ func (h *NodeHandler) postNode(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	h.server.Metrics.UpdateNodeMetrics(ctx, node)
+	if h.server.Metrics != nil {
+		h.server.Metrics.UpdateNodeMetrics(ctx, node)
+	}
 	h.server.Host.Broadcast("node", "nodes", node)
 	job := h.server.Datastore.GetNextJob(ctx, node.Id)
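
Below is a minimal, hypothetical usage sketch of the new packet API introduced in packet/parser.go. It is not part of this change: the input path, output path, and packet limit are invented for illustration, while ParseRawPcap, ToStream, and the model.Filter fields are taken from the diff and its tests.

	package main

	import (
		"io"
		"os"

		"github.com/security-onion-solutions/securityonion-soc/model"
		"github.com/security-onion-solutions/securityonion-soc/packet"
	)

	func main() {
		// Describe the flow of interest; these field names match the new tests.
		filter := model.NewFilter()
		filter.SrcIp = "185.47.63.113"
		filter.DstIp = "176.126.243.198"

		// Hypothetical input file; Suricata writes these under /nsm/suripcap/<thread>/
		// as so-pcap.<unix-seconds>, which is how getPcapCreateTime derives each
		// file's creation time. The second argument caps how many matching packets
		// are collected.
		packets, err := packet.ParseRawPcap("/nsm/suripcap/1/so-pcap.1575817346", 1000, filter)
		if err != nil {
			panic(err)
		}

		// Re-serialize the matches into a single in-memory pcap stream, as
		// SuriQuery.streamPacketsInPcaps does for job results.
		reader, err := packet.ToStream(packets)
		if err != nil {
			panic(err)
		}
		defer reader.Close()

		// Hypothetical output location.
		out, err := os.Create("/tmp/flow.pcap")
		if err != nil {
			panic(err)
		}
		defer out.Close()
		io.Copy(out, reader)
	}

The count limit is why DEFAULT_PCAP_MAX_COUNT exists: ParseRawPcap stops collecting once the limit is reached, so a pathological job truncates its output instead of exhausting agent memory, and the limit is tunable per agent via the new pcapMaxCount module setting.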