From bf422bed040efaead408712f99d7302ea3320c72 Mon Sep 17 00:00:00 2001 From: Jason Ertel Date: Fri, 8 Mar 2024 13:20:03 -0500 Subject: [PATCH] pcap improvements --- agent/jobmanager.go | 10 ++---- agent/modules/importer/importer.go | 25 ++++++++++++++- agent/modules/stenoquery/stenoquery.go | 31 ++++++++++-------- agent/modules/suriquery/suriquery.go | 38 +++++++++++++---------- agent/modules/suriquery/suriquery_test.go | 3 +- model/job.go | 1 + packet/parser.go | 6 ++-- packet/parser_test.go | 6 ++-- 8 files changed, 75 insertions(+), 45 deletions(-) diff --git a/agent/jobmanager.go b/agent/jobmanager.go index a13941b8..27b6b2ad 100644 --- a/agent/jobmanager.go +++ b/agent/jobmanager.go @@ -8,11 +8,9 @@ package agent import ( "errors" - "fmt" "io" "os" "strconv" - "strings" "sync" "syscall" "time" @@ -103,14 +101,10 @@ func (mgr *JobManager) ProcessJob(job *model.Job) (io.ReadCloser, error) { defer mgr.lock.RUnlock() var reader io.ReadCloser var err error + + job.Size = 0 for _, processor := range mgr.jobProcessors { reader, err = processor.ProcessJob(job, reader) - if err != nil && !strings.Contains(fmt.Sprint(err), "No data available") { - log.WithError(err).WithFields(log.Fields{ - "jobId": job.Id, - }).Error("Failed to process job; job processing aborted") - break - } } return reader, err } diff --git a/agent/modules/importer/importer.go b/agent/modules/importer/importer.go index ef08f7c2..477d848e 100644 --- a/agent/modules/importer/importer.go +++ b/agent/modules/importer/importer.go @@ -113,9 +113,32 @@ func (importer *Importer) ProcessJob(job *model.Job, reader io.ReadCloser) (io.R }).Debug("Executed tcpdump") if err == nil { var file *os.File + var info os.FileInfo file, err = os.Open(pcapOutputFilepath) if err == nil { - reader = file + info, err = os.Stat(pcapOutputFilepath) + if err != nil { + log.WithError(err).WithFields(log.Fields{ + "pcapPath": pcapOutputFilepath, + }).Error("Failed to collect output file stats") + } else { + size := int(info.Size()) + log.WithFields(log.Fields{ + "pcapPath": pcapOutputFilepath, + "pcapSize": size, + "jobSize": job.Size, + }).Debug("Found matching packets") + if job.Size > size { + log.Warn("Discarding Importer job output since existing job already has more content from another processor") + } else { + job.Size = size + reader = file + log.WithFields(log.Fields{ + "pcapStreamErr": err, + "pcapStreamSize": size, + }).Debug("Finished processing PCAP via Importer") + } + } } } } diff --git a/agent/modules/stenoquery/stenoquery.go b/agent/modules/stenoquery/stenoquery.go index 7e4da4dc..6108e34e 100644 --- a/agent/modules/stenoquery/stenoquery.go +++ b/agent/modules/stenoquery/stenoquery.go @@ -102,12 +102,6 @@ func (steno *StenoQuery) ProcessJob(job *model.Job, reader io.ReadCloser) (io.Re "importId": job.Filter.ImportId, }).Debug("Skipping steno processor due to presence of importId") return reader, nil - } else if reader != nil { - log.WithFields(log.Fields{ - "jobId": job.Id, - "kind": job.GetKind(), - }).Debug("Skipping steno processor due to another processor already provided PCAP data") - return reader, nil } else if job.Filter == nil || job.Filter.EndTime.Before(steno.GetDataEpoch()) || job.Filter.BeginTime.After(steno.getDataLagDate()) { log.WithFields(log.Fields{ "jobId": job.Id, @@ -140,20 +134,31 @@ func (steno *StenoQuery) ProcessJob(job *model.Job, reader io.ReadCloser) (io.Re }).Debug("Executed stenoread") if err == nil { var file *os.File + var info os.FileInfo file, err = os.Open(pcapFilepath) if err == nil { - info, err := 
os.Stat(pcapFilepath) + info, err = os.Stat(pcapFilepath) if err != nil { - log.WithError(err).WithFields(log.Fields { + log.WithError(err).WithFields(log.Fields{ "pcapPath": pcapFilepath, }).Error("Failed to collect output file stats") } else { - log.WithFields(log.Fields { - "pcapPath": pcapFilepath, - "pcapBytes": info.Size(), - }).Debug("Found matching packets") + size := int(info.Size()) + log.WithFields(log.Fields{ + "pcapPath": pcapFilepath, + "pcapBytes": size, + }).Debug("Found matching packets") + if job.Size > size { + log.Warn("Discarding Stenographer job output since existing job already has more content from another processor") + } else { + job.Size = size + reader = file + log.WithFields(log.Fields{ + "pcapStreamErr": err, + "pcapStreamSize": size, + }).Debug("Finished processing PCAP via Stenographer") + } } - reader = file } } } diff --git a/agent/modules/suriquery/suriquery.go b/agent/modules/suriquery/suriquery.go index 36574650..3e2ddabb 100644 --- a/agent/modules/suriquery/suriquery.go +++ b/agent/modules/suriquery/suriquery.go @@ -103,12 +103,6 @@ func (suri *SuriQuery) ProcessJob(job *model.Job, reader io.ReadCloser) (io.Read "importId": job.Filter.ImportId, }).Debug("Skipping suri processor due to presence of importId") return reader, nil - } else if reader != nil { - log.WithFields(log.Fields{ - "jobId": job.Id, - "kind": job.GetKind(), - }).Debug("Skipping suricata processor due to another processor already provided PCAP data") - return reader, nil } else if job.Filter == nil || job.Filter.EndTime.Before(suri.GetDataEpoch()) || job.Filter.BeginTime.After(suri.getDataLagDate()) { log.WithFields(log.Fields{ "jobId": job.Id, @@ -123,10 +117,20 @@ func (suri *SuriQuery) ProcessJob(job *model.Job, reader io.ReadCloser) (io.Read "jobId": job.Id, }).Debug("Starting to process new Suricata PCAP job") pcapFiles := suri.findFilesInTimeRange(job.Filter.BeginTime, job.Filter.EndTime) - reader, err = suri.streamPacketsInPcaps(pcapFiles, job.Filter) - log.WithFields(log.Fields{ - "err": err, - }).Debug("Finished processing PCAP") + var newReader io.ReadCloser + var size int + newReader, size, err = suri.streamPacketsInPcaps(pcapFiles, job.Filter) + + if job.Size > size { + log.Warn("Discarding Suricata job output since existing job already has more content from another processor") + } else { + job.Size = size + reader = newReader + log.WithFields(log.Fields{ + "pcapStreamErr": err, + "pcapStreamSize": size, + }).Debug("Finished processing PCAP via Suricata") + } } return reader, err } @@ -154,23 +158,23 @@ func (suri *SuriQuery) decompress(path string) (string, error) { count, copyErr := io.Copy(outputWriter, lz4Reader) if copyErr != nil { if strings.Contains(fmt.Sprint(copyErr), "unexpected EOF") { - log.WithFields(log.Fields { + log.WithFields(log.Fields{ "decompressedPath": decompressedPath, }).Debug("ignoring EOF error since the filestream is likely still active") } else { return "", copyErr } } - log.WithFields(log.Fields { - "pcapPath": path, - "decompressedPath": decompressedPath, + log.WithFields(log.Fields{ + "pcapPath": path, + "decompressedPath": decompressedPath, "decompressedBytes": count, }).Debug("Decompressed lz4 PCAP file") } return decompressedPath, nil } -func (suri *SuriQuery) streamPacketsInPcaps(paths []string, filter *model.Filter) (io.ReadCloser, error) { +func (suri *SuriQuery) streamPacketsInPcaps(paths []string, filter *model.Filter) (io.ReadCloser, int, error) { allPackets := make([]gopacket.Packet, 0, 0) for _, path := range paths { @@ -185,8 +189,8 
@@ func (suri *SuriQuery) streamPacketsInPcaps(paths []string, filter *model.Filter log.WithError(perr).WithField("pcapPath", decompressedPath).Error("Failed to parse PCAP file") } if packets != nil && len(packets) > 0 { - log.WithFields(log.Fields { - "pcapPath": decompressedPath, + log.WithFields(log.Fields{ + "pcapPath": decompressedPath, "packetCount": len(packets), }).Debug("found matching packets") allPackets = append(allPackets, packets...) diff --git a/agent/modules/suriquery/suriquery_test.go b/agent/modules/suriquery/suriquery_test.go index b9955fab..5a64e504 100644 --- a/agent/modules/suriquery/suriquery_test.go +++ b/agent/modules/suriquery/suriquery_test.go @@ -107,11 +107,12 @@ func TestStreamPacketsInPcaps(tester *testing.T) { filter.DstIp = "176.126.243.198" filter.DstPort = 34515 - reader, err := sq.streamPacketsInPcaps(paths, filter) + reader, size, err := sq.streamPacketsInPcaps(paths, filter) assert.Nil(tester, err) pcap_length := 14122 // correlates to so-pcap test file bytes := make([]byte, 32768) count, err := reader.Read(bytes) assert.Nil(tester, err) assert.Equal(tester, pcap_length, count) + assert.Equal(tester, pcap_length, size) } diff --git a/model/job.go b/model/job.go index 0fc464b5..2af03d19 100644 --- a/model/job.go +++ b/model/job.go @@ -48,6 +48,7 @@ type Job struct { UserId string `json:"userId"` Kind string `json:"kind"` Results []*JobResult `json:"results"` + Size int `json:"size"` } func NewJob() *Job { diff --git a/packet/parser.go b/packet/parser.go index 8f033fd8..ab1d1815 100644 --- a/packet/parser.go +++ b/packet/parser.go @@ -44,7 +44,7 @@ func ParsePcap(filename string, offset int, count int, unwrap bool) ([]*model.Pa return packets, nil } -func ToStream(packets []gopacket.Packet) (io.ReadCloser, error) { +func ToStream(packets []gopacket.Packet) (io.ReadCloser, int, error) { var snaplen uint32 = 65536 var full bytes.Buffer @@ -58,11 +58,11 @@ func ToStream(packets []gopacket.Packet) (io.ReadCloser, error) { buf.Clear() err := gopacket.SerializePacket(buf, opts, packet) if err != nil { - return nil, err + return nil, 0, err } writer.WritePacket(packet.Metadata().CaptureInfo, buf.Bytes()) } - return io.NopCloser(bytes.NewReader(full.Bytes())), nil + return io.NopCloser(bytes.NewReader(full.Bytes())), full.Len(), nil } func getPacketProtocol(packet gopacket.Packet) string { diff --git a/packet/parser_test.go b/packet/parser_test.go index f2c01aeb..5d59b6d6 100644 --- a/packet/parser_test.go +++ b/packet/parser_test.go @@ -53,7 +53,7 @@ func TestParseAndStream(tester *testing.T) { assert.Nil(tester, perr) assert.Len(tester, packets, 12) - reader, err := ToStream(packets) + reader, size, err := ToStream(packets) assert.Nil(tester, err) pcap_length := 14122 // correlates to so-pcap test file @@ -61,6 +61,7 @@ func TestParseAndStream(tester *testing.T) { count, err := reader.Read(bytes) assert.Nil(tester, err) assert.Equal(tester, pcap_length, count) + assert.Equal(tester, pcap_length, size) } func TestParseWrongProtocol(tester *testing.T) { @@ -104,7 +105,7 @@ func TestParseAndStreamIcmp(tester *testing.T) { assert.Nil(tester, perr) assert.Len(tester, packets, 2) - reader, err := ToStream(packets) + reader, size, err := ToStream(packets) assert.Nil(tester, err) pcap_length := 196 // correlates to two icmp packets in icmp.pcap @@ -112,4 +113,5 @@ func TestParseAndStreamIcmp(tester *testing.T) { count, err := reader.Read(bytes) assert.Nil(tester, err) assert.Equal(tester, pcap_length, count) + assert.Equal(tester, pcap_length, size) }
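
The behavioral core of this patch: JobManager.ProcessJob no longer aborts the chain on error or skips processors once a reader exists; instead it resets job.Size to zero, and each processor (Importer, Stenographer, Suricata) compares the byte size of its own PCAP output against job.Size, replacing the reader only when it has at least as much content. Below is a minimal, self-contained sketch of that arbitration — not code from this patch; the names job, processor, and fakeProcessor are hypothetical stand-ins for model.Job and the agent modules:

package main

import (
	"bytes"
	"fmt"
	"io"
)

// job mirrors the one field this patch adds to model.Job.
type job struct {
	Size int // byte length of the largest PCAP output so far
}

// processor mirrors the ProcessJob contract used by the agent:
// each receives the previous processor's reader and returns the reader to keep.
type processor interface {
	ProcessJob(j *job, prev io.ReadCloser) (io.ReadCloser, error)
}

// fakeProcessor stands in for Importer, StenoQuery, or SuriQuery.
type fakeProcessor struct {
	name string
	data []byte // pretend PCAP bytes this processor found
}

func (p *fakeProcessor) ProcessJob(j *job, prev io.ReadCloser) (io.ReadCloser, error) {
	size := len(p.data)
	if j.Size > size {
		// Another processor already produced more content; keep its reader.
		// This mirrors the "Discarding ... job output" warnings in the diff.
		fmt.Printf("%s: discarding %d bytes (job already has %d)\n", p.name, size, j.Size)
		return prev, nil
	}
	j.Size = size
	return io.NopCloser(bytes.NewReader(p.data)), nil
}

func main() {
	j := &job{}
	j.Size = 0 // JobManager.ProcessJob now resets this before the loop
	var reader io.ReadCloser
	var err error
	for _, p := range []processor{
		&fakeProcessor{name: "steno", data: make([]byte, 14122)},
		&fakeProcessor{name: "suri", data: make([]byte, 512)},
	} {
		reader, err = p.ProcessJob(j, reader)
	}
	if err == nil && reader != nil {
		fmt.Printf("winning PCAP size: %d bytes\n", j.Size) // 14122
		reader.Close()
	}
}

Note the comparison is job.Size > size, so a later processor producing an equal-size result wins the tie and becomes the job's reader.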
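
The new int return on packet.ToStream is what lets the Suricata module set job.Size without re-reading the stream. The sketch below illustrates the idea using the same gopacket/pcapgo libraries parser.go already uses; toStream here is a simplified, hypothetical stand-in that writes packet data directly rather than re-serializing through gopacket.SerializePacket as the real function does, and the main function fabricates one small UDP packet purely so the example runs end to end:

package main

import (
	"bytes"
	"fmt"
	"io"
	"net"

	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcapgo"
)

// toStream writes packets into an in-memory pcap and returns the total
// byte length alongside the reader, like the updated packet.ToStream.
func toStream(packets []gopacket.Packet) (io.ReadCloser, int, error) {
	var snaplen uint32 = 65536
	var full bytes.Buffer
	writer := pcapgo.NewWriter(&full)
	if err := writer.WriteFileHeader(snaplen, layers.LinkTypeEthernet); err != nil {
		return nil, 0, err
	}
	for _, p := range packets {
		ci := p.Metadata().CaptureInfo
		ci.CaptureLength = len(p.Data()) // pcapgo rejects mismatched lengths
		if ci.Length < ci.CaptureLength {
			ci.Length = ci.CaptureLength
		}
		if err := writer.WritePacket(ci, p.Data()); err != nil {
			return nil, 0, err
		}
	}
	// full.Len() covers the 24-byte pcap file header plus every packet
	// record: the same total the updated tests compare against pcap_length.
	return io.NopCloser(bytes.NewReader(full.Bytes())), full.Len(), nil
}

func main() {
	// Fabricate one minimal Ethernet/IPv4/UDP packet for the demo.
	buf := gopacket.NewSerializeBuffer()
	opts := gopacket.SerializeOptions{FixLengths: true, ComputeChecksums: true}
	eth := &layers.Ethernet{
		SrcMAC:       net.HardwareAddr{0, 1, 2, 3, 4, 5},
		DstMAC:       net.HardwareAddr{6, 7, 8, 9, 10, 11},
		EthernetType: layers.EthernetTypeIPv4,
	}
	ip := &layers.IPv4{Version: 4, IHL: 5, TTL: 64, Protocol: layers.IPProtocolUDP,
		SrcIP: net.IP{10, 0, 0, 1}, DstIP: net.IP{10, 0, 0, 2}}
	udp := &layers.UDP{SrcPort: 1234, DstPort: 5678}
	udp.SetNetworkLayerForChecksum(ip)
	if err := gopacket.SerializeLayers(buf, opts, eth, ip, udp,
		gopacket.Payload([]byte("hello"))); err != nil {
		panic(err)
	}
	pkt := gopacket.NewPacket(buf.Bytes(), layers.LayerTypeEthernet, gopacket.Default)

	reader, size, err := toStream([]gopacket.Packet{pkt})
	fmt.Println("stream size:", size, "err:", err)
	if reader != nil {
		reader.Close()
	}
}

Returning the length at serialization time keeps the size and the stream contents in lockstep, which is why the tests can assert the same pcap_length against both the bytes read and the new size return.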