Merge pull request #375 from Security-Onion-Solutions/jertel/det
pcap improvements
jertel authored Mar 8, 2024
2 parents 5d490dc + bf422be commit 4161b89
Showing 8 changed files with 75 additions and 45 deletions.
10 changes: 2 additions & 8 deletions agent/jobmanager.go
@@ -8,11 +8,9 @@ package agent

import (
"errors"
"fmt"
"io"
"os"
"strconv"
"strings"
"sync"
"syscall"
"time"
@@ -103,14 +101,10 @@ func (mgr *JobManager) ProcessJob(job *model.Job) (io.ReadCloser, error) {
defer mgr.lock.RUnlock()
var reader io.ReadCloser
var err error

+    job.Size = 0
for _, processor := range mgr.jobProcessors {
reader, err = processor.ProcessJob(job, reader)
-        if err != nil && !strings.Contains(fmt.Sprint(err), "No data available") {
-            log.WithError(err).WithFields(log.Fields{
-                "jobId": job.Id,
-            }).Error("Failed to process job; job processing aborted")
-            break
-        }
}
return reader, err
}
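Taken together with the processor changes below, the jobmanager edit means every registered processor now runs, and the decision about whose PCAP output to keep moves into the processors themselves via the new job.Size field instead of an early "reader already set" skip. The following is a minimal, self-contained sketch of that chaining pattern; the Job and fakeProcessor types are illustrative stand-ins, not the actual Security Onion SOC code.

package main

import (
	"bytes"
	"fmt"
	"io"
)

// Job stands in for model.Job; only the Size bookkeeping matters here.
type Job struct {
	Size int
}

// JobProcessor mirrors the ProcessJob signature shared by the importer,
// stenoquery, and suriquery modules.
type JobProcessor interface {
	ProcessJob(job *Job, reader io.ReadCloser) (io.ReadCloser, error)
}

// fakeProcessor returns a fixed payload and adopts it only when it is at
// least as large as whatever an earlier processor already produced.
type fakeProcessor struct {
	payload []byte
}

func (p fakeProcessor) ProcessJob(job *Job, reader io.ReadCloser) (io.ReadCloser, error) {
	size := len(p.payload)
	if job.Size > size {
		return reader, nil // keep the larger output from an earlier processor
	}
	job.Size = size
	return io.NopCloser(bytes.NewReader(p.payload)), nil
}

// processJob mirrors the new JobManager.ProcessJob loop: reset job.Size,
// then let every processor see (and possibly replace) the current reader.
func processJob(job *Job, processors []JobProcessor) (io.ReadCloser, error) {
	var reader io.ReadCloser
	var err error
	job.Size = 0
	for _, processor := range processors {
		reader, err = processor.ProcessJob(job, reader)
	}
	return reader, err
}

func main() {
	job := &Job{}
	reader, _ := processJob(job, []JobProcessor{
		fakeProcessor{payload: []byte("short")},
		fakeProcessor{payload: []byte("a longer capture wins")},
	})
	data, _ := io.ReadAll(reader)
	fmt.Println(job.Size, string(data)) // prints: 21 a longer capture wins
}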
25 changes: 24 additions & 1 deletion agent/modules/importer/importer.go
@@ -113,9 +113,32 @@ func (importer *Importer) ProcessJob(job *model.Job, reader io.ReadCloser) (io.R
}).Debug("Executed tcpdump")
if err == nil {
var file *os.File
+        var info os.FileInfo
file, err = os.Open(pcapOutputFilepath)
if err == nil {
-            reader = file
+            info, err = os.Stat(pcapOutputFilepath)
+            if err != nil {
+                log.WithError(err).WithFields(log.Fields{
+                    "pcapPath": pcapOutputFilepath,
+                }).Error("Failed to collect output file stats")
+            } else {
+                size := int(info.Size())
+                log.WithFields(log.Fields{
+                    "pcapPath": pcapOutputFilepath,
+                    "pcapSize": size,
+                    "jobSize": job.Size,
+                }).Debug("Found matching packets")
+                if job.Size > size {
+                    log.Warn("Discarding Importer job output since existing job already has more content from another processor")
+                } else {
+                    job.Size = size
+                    reader = file
+                    log.WithFields(log.Fields{
+                        "pcapStreamErr": err,
+                        "pcapStreamSize": size,
+                    }).Debug("Finished processing PCAP via Importer")
+                }
+            }
}
}
}
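The importer now stats its tcpdump output and only takes ownership of the job when that file is at least as large as what an earlier processor contributed; the Stenographer and Suricata modules below apply the same guard. A hedged sketch of that adopt-if-larger check pulled out into a standalone helper — the name adoptIfLarger and its signature are illustrative, not part of the commit:

package main

import (
	"fmt"
	"io"
	"os"
)

// adoptIfLarger opens path and returns its reader and size only when the file
// is at least as large as currentSize; otherwise the caller keeps its existing
// output from an earlier processor.
func adoptIfLarger(path string, currentSize int) (io.ReadCloser, int, bool, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, currentSize, false, err
	}
	info, err := os.Stat(path)
	if err != nil {
		file.Close()
		return nil, currentSize, false, err
	}
	size := int(info.Size())
	if currentSize > size {
		// Discard this output; an earlier processor already produced more data.
		file.Close()
		return nil, currentSize, false, nil
	}
	return file, size, true, nil
}

func main() {
	// Hypothetical path, for illustration only.
	reader, size, adopted, err := adoptIfLarger("/tmp/example.pcap", 0)
	fmt.Println(size, adopted, err)
	if adopted {
		reader.Close()
	}
}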
31 changes: 18 additions & 13 deletions agent/modules/stenoquery/stenoquery.go
@@ -102,12 +102,6 @@ func (steno *StenoQuery) ProcessJob(job *model.Job, reader io.ReadCloser) (io.Re
"importId": job.Filter.ImportId,
}).Debug("Skipping steno processor due to presence of importId")
return reader, nil
-    } else if reader != nil {
-        log.WithFields(log.Fields{
-            "jobId": job.Id,
-            "kind": job.GetKind(),
-        }).Debug("Skipping steno processor due to another processor already provided PCAP data")
-        return reader, nil
} else if job.Filter == nil || job.Filter.EndTime.Before(steno.GetDataEpoch()) || job.Filter.BeginTime.After(steno.getDataLagDate()) {
log.WithFields(log.Fields{
"jobId": job.Id,
@@ -140,20 +134,31 @@ func (steno *StenoQuery) ProcessJob(job *model.Job, reader io.ReadCloser) (io.Re
}).Debug("Executed stenoread")
if err == nil {
var file *os.File
+        var info os.FileInfo
file, err = os.Open(pcapFilepath)
if err == nil {
-            info, err := os.Stat(pcapFilepath)
+            info, err = os.Stat(pcapFilepath)
if err != nil {
-                log.WithError(err).WithFields(log.Fields {
+                log.WithError(err).WithFields(log.Fields{
"pcapPath": pcapFilepath,
}).Error("Failed to collect output file stats")
} else {
-                log.WithFields(log.Fields {
-                    "pcapPath": pcapFilepath,
-                    "pcapBytes": info.Size(),
-                }).Debug("Found matching packets")
+                size := int(info.Size())
+                log.WithFields(log.Fields{
+                    "pcapPath": pcapFilepath,
+                    "pcapBytes": size,
+                }).Debug("Found matching packets")
+                if job.Size > size {
+                    log.Warn("Discarding Stenographer job output since existing job already has more content from another processor")
+                } else {
+                    job.Size = size
+                    reader = file
+                    log.WithFields(log.Fields{
+                        "pcapStreamErr": err,
+                        "pcapStreamSize": size,
+                    }).Debug("Finished processing PCAP via Stenographer")
+                }
}
-            reader = file
}
}
}
38 changes: 21 additions & 17 deletions agent/modules/suriquery/suriquery.go
@@ -103,12 +103,6 @@ func (suri *SuriQuery) ProcessJob(job *model.Job, reader io.ReadCloser) (io.Read
"importId": job.Filter.ImportId,
}).Debug("Skipping suri processor due to presence of importId")
return reader, nil
-    } else if reader != nil {
-        log.WithFields(log.Fields{
-            "jobId": job.Id,
-            "kind": job.GetKind(),
-        }).Debug("Skipping suricata processor due to another processor already provided PCAP data")
-        return reader, nil
} else if job.Filter == nil || job.Filter.EndTime.Before(suri.GetDataEpoch()) || job.Filter.BeginTime.After(suri.getDataLagDate()) {
log.WithFields(log.Fields{
"jobId": job.Id,
@@ -123,10 +117,20 @@ func (suri *SuriQuery) ProcessJob(job *model.Job, reader io.ReadCloser) (io.Read
"jobId": job.Id,
}).Debug("Starting to process new Suricata PCAP job")
pcapFiles := suri.findFilesInTimeRange(job.Filter.BeginTime, job.Filter.EndTime)
-        reader, err = suri.streamPacketsInPcaps(pcapFiles, job.Filter)
-        log.WithFields(log.Fields{
-            "err": err,
-        }).Debug("Finished processing PCAP")
+        var newReader io.ReadCloser
+        var size int
+        newReader, size, err = suri.streamPacketsInPcaps(pcapFiles, job.Filter)
+
+        if job.Size > size {
+            log.Warn("Discarding Suricata job output since existing job already has more content from another processor")
+        } else {
+            job.Size = size
+            reader = newReader
+            log.WithFields(log.Fields{
+                "pcapStreamErr": err,
+                "pcapStreamSize": size,
+            }).Debug("Finished processing PCAP via Suricata")
+        }
}
return reader, err
}
@@ -154,23 +158,23 @@ func (suri *SuriQuery) decompress(path string) (string, error) {
count, copyErr := io.Copy(outputWriter, lz4Reader)
if copyErr != nil {
if strings.Contains(fmt.Sprint(copyErr), "unexpected EOF") {
-                log.WithFields(log.Fields {
+                log.WithFields(log.Fields{
"decompressedPath": decompressedPath,
}).Debug("ignoring EOF error since the filestream is likely still active")
} else {
return "", copyErr
}
}
-        log.WithFields(log.Fields {
-            "pcapPath": path,
-            "decompressedPath": decompressedPath,
+        log.WithFields(log.Fields{
+            "pcapPath":          path,
+            "decompressedPath":  decompressedPath,
"decompressedBytes": count,
}).Debug("Decompressed lz4 PCAP file")
}
return decompressedPath, nil
}

-func (suri *SuriQuery) streamPacketsInPcaps(paths []string, filter *model.Filter) (io.ReadCloser, error) {
+func (suri *SuriQuery) streamPacketsInPcaps(paths []string, filter *model.Filter) (io.ReadCloser, int, error) {
allPackets := make([]gopacket.Packet, 0, 0)

for _, path := range paths {
@@ -185,8 +189,8 @@ func (suri *SuriQuery) streamPacketsInPcaps(paths []string, filter *model.Filter
log.WithError(perr).WithField("pcapPath", decompressedPath).Error("Failed to parse PCAP file")
}
if packets != nil && len(packets) > 0 {
-        log.WithFields(log.Fields {
-            "pcapPath": decompressedPath,
+        log.WithFields(log.Fields{
+            "pcapPath":    decompressedPath,
"packetCount": len(packets),
}).Debug("found matching packets")
allPackets = append(allPackets, packets...)
3 changes: 2 additions & 1 deletion agent/modules/suriquery/suriquery_test.go
@@ -107,11 +107,12 @@ func TestStreamPacketsInPcaps(tester *testing.T) {
filter.DstIp = "176.126.243.198"
filter.DstPort = 34515

-    reader, err := sq.streamPacketsInPcaps(paths, filter)
+    reader, size, err := sq.streamPacketsInPcaps(paths, filter)
assert.Nil(tester, err)
pcap_length := 14122 // correlates to so-pcap test file
bytes := make([]byte, 32768)
count, err := reader.Read(bytes)
assert.Nil(tester, err)
assert.Equal(tester, pcap_length, count)
+    assert.Equal(tester, pcap_length, size)
}
1 change: 1 addition & 0 deletions model/job.go
@@ -48,6 +48,7 @@ type Job struct {
UserId string `json:"userId"`
Kind string `json:"kind"`
Results []*JobResult `json:"results"`
+    Size int `json:"size"`
}

func NewJob() *Job {
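The added Size field is serialized with the rest of the job, so the recorded PCAP byte count travels with the job record. A tiny sketch of the resulting JSON, using a trimmed stand-in for model.Job (field set abbreviated):

package main

import (
	"encoding/json"
	"fmt"
)

// Job is a trimmed stand-in for model.Job showing only a few fields.
type Job struct {
	UserId string `json:"userId"`
	Kind   string `json:"kind"`
	Size   int    `json:"size"`
}

func main() {
	out, _ := json.Marshal(Job{UserId: "analyst", Kind: "pcap", Size: 14122})
	fmt.Println(string(out)) // {"userId":"analyst","kind":"pcap","size":14122}
}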
6 changes: 3 additions & 3 deletions packet/parser.go
@@ -44,7 +44,7 @@ func ParsePcap(filename string, offset int, count int, unwrap bool) ([]*model.Pa
return packets, nil
}

-func ToStream(packets []gopacket.Packet) (io.ReadCloser, error) {
+func ToStream(packets []gopacket.Packet) (io.ReadCloser, int, error) {
var snaplen uint32 = 65536
var full bytes.Buffer

@@ -58,11 +58,11 @@ func ToStream(packets []gopacket.Packet) (io.ReadCloser, error) {
buf.Clear()
err := gopacket.SerializePacket(buf, opts, packet)
if err != nil {
-            return nil, err
+            return nil, 0, err
}
writer.WritePacket(packet.Metadata().CaptureInfo, buf.Bytes())
}
-    return io.NopCloser(bytes.NewReader(full.Bytes())), nil
+    return io.NopCloser(bytes.NewReader(full.Bytes())), full.Len(), nil
}

func getPacketProtocol(packet gopacket.Packet) string {
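ToStream now reports the length of the in-memory PCAP it assembles, letting callers compare processor outputs by size without draining the stream. Below is a simplified sketch of the same write-and-measure pattern with pcapgo; raw byte slices stand in for parsed gopacket.Packet values, toStream is an illustrative stand-in rather than the project's ToStream, and the github.com/google/gopacket import path is assumed.

package main

import (
	"bytes"
	"fmt"
	"io"
	"time"

	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcapgo"
)

// toStream writes raw packet payloads into an in-memory PCAP and returns the
// stream along with its total length, mirroring the new ToStream contract.
func toStream(payloads [][]byte) (io.ReadCloser, int, error) {
	var full bytes.Buffer
	writer := pcapgo.NewWriter(&full)
	if err := writer.WriteFileHeader(65536, layers.LinkTypeEthernet); err != nil {
		return nil, 0, err
	}
	for _, data := range payloads {
		ci := gopacket.CaptureInfo{
			Timestamp:     time.Now(),
			CaptureLength: len(data),
			Length:        len(data),
		}
		if err := writer.WritePacket(ci, data); err != nil {
			return nil, 0, err
		}
	}
	return io.NopCloser(bytes.NewReader(full.Bytes())), full.Len(), nil
}

func main() {
	reader, size, err := toStream([][]byte{{0x01, 0x02, 0x03}})
	if err != nil {
		panic(err)
	}
	data, _ := io.ReadAll(reader)
	fmt.Println(size, len(data)) // the reported size matches the stream length
}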
6 changes: 4 additions & 2 deletions packet/parser_test.go
@@ -53,14 +53,15 @@ func TestParseAndStream(tester *testing.T) {
assert.Nil(tester, perr)
assert.Len(tester, packets, 12)

-    reader, err := ToStream(packets)
+    reader, size, err := ToStream(packets)

assert.Nil(tester, err)
pcap_length := 14122 // correlates to so-pcap test file
bytes := make([]byte, 32768)
count, err := reader.Read(bytes)
assert.Nil(tester, err)
assert.Equal(tester, pcap_length, count)
+    assert.Equal(tester, pcap_length, size)
}

func TestParseWrongProtocol(tester *testing.T) {
@@ -104,12 +105,13 @@ func TestParseAndStreamIcmp(tester *testing.T) {
assert.Nil(tester, perr)
assert.Len(tester, packets, 2)

-    reader, err := ToStream(packets)
+    reader, size, err := ToStream(packets)

assert.Nil(tester, err)
pcap_length := 196 // correlates to two icmp packets in icmp.pcap
bytes := make([]byte, 32768)
count, err := reader.Read(bytes)
assert.Nil(tester, err)
assert.Equal(tester, pcap_length, count)
+    assert.Equal(tester, pcap_length, size)
}
