Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Suri PCAP parsing performance #387

Merged
merged 1 commit into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 63 additions & 73 deletions packet/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package packet
import (
"bytes"
"encoding/base64"
"fmt"
"io"
"os"

Expand All @@ -33,7 +34,7 @@ var SupportedLayerTypes = [...]gopacket.LayerType{

func ParsePcap(filename string, offset int, count int, unwrap bool) ([]*model.Packet, error) {
packets := make([]*model.Packet, 0)
parsePcapFile(filename, func(index int, pcapPacket gopacket.Packet) bool {
parsePcapFile(filename, "", func(index int, pcapPacket gopacket.Packet) bool {
if index >= offset {
packet := model.NewPacket(index)
parseData(pcapPacket, packet, unwrap)
Expand Down Expand Up @@ -65,91 +66,29 @@ func ToStream(packets []gopacket.Packet) (io.ReadCloser, int, error) {
return io.NopCloser(bytes.NewReader(full.Bytes())), full.Len(), nil
}

func getPacketProtocol(packet gopacket.Packet) string {
if packet.Layer(layers.LayerTypeTCP) != nil {
return model.PROTOCOL_TCP
}
if packet.Layer(layers.LayerTypeUDP) != nil {
return model.PROTOCOL_UDP
}
if packet.Layer(layers.LayerTypeICMPv4) != nil ||
packet.Layer(layers.LayerTypeICMPv6) != nil {
return model.PROTOCOL_ICMP
}
return ""
}

func filterPacket(filter *model.Filter, packet gopacket.Packet) bool {

// Quickly exit if packet time is outside the bounds of the time filter or protocol doesn't match
timestamp := packet.Metadata().Timestamp
if (!filter.BeginTime.IsZero() && filter.BeginTime.After(timestamp)) ||
(!filter.EndTime.IsZero() && filter.EndTime.Before(timestamp)) ||
(filter.Protocol != "" && filter.Protocol != getPacketProtocol(packet)) {
(!filter.EndTime.IsZero() && filter.EndTime.Before(timestamp)) {
return false
}

// Grab the source and destination IPs, if they exist
var srcIp, dstIp string
layer := packet.Layer(layers.LayerTypeIPv4)
if layer != nil {
layer := layer.(*layers.IPv4)
srcIp = layer.SrcIP.String()
dstIp = layer.DstIP.String()
} else {
layer = packet.Layer(layers.LayerTypeIPv6)
if layer != nil {
layer := layer.(*layers.IPv6)
srcIp = layer.SrcIP.String()
dstIp = layer.DstIP.String()
}
}

// Check for IP match in both directions
include := (filter.SrcIp == "" || filter.SrcIp == srcIp || filter.SrcIp == dstIp) &&
(filter.DstIp == "" || filter.DstIp == dstIp || filter.DstIp == srcIp)

if include && (filter.Protocol == "" || filter.Protocol == "udp" || filter.Protocol == "tcp") && (filter.SrcPort != 0 || filter.DstPort != 0) {

var srcPort, dstPort int
layer = packet.Layer(layers.LayerTypeTCP)
if layer != nil {
layer := layer.(*layers.TCP)
srcPort = int(layer.SrcPort)
dstPort = int(layer.DstPort)
} else {
layer = packet.Layer(layers.LayerTypeUDP)
if layer != nil {
layer := layer.(*layers.UDP)
srcPort = int(layer.SrcPort)
dstPort = int(layer.DstPort)
}
}

include = include &&
(filter.SrcPort == 0 || filter.SrcPort == srcPort || filter.SrcPort == dstPort) &&
(filter.DstPort == 0 || filter.DstPort == dstPort || filter.DstPort == srcPort)
}

return include
return true
}

func ParseRawPcap(filename string, count int, filter *model.Filter) ([]gopacket.Packet, error) {
func ParseRawPcap(filename string, maxCount int, filter *model.Filter) ([]gopacket.Packet, error) {
packets := make([]gopacket.Packet, 0)
err := parsePcapFile(filename, func(index int, pcapPacket gopacket.Packet) bool {
currentCount := 0
err := parsePcapFile(filename, createBpf(filter), func(index int, pcapPacket gopacket.Packet) bool {
if filterPacket(filter, pcapPacket) {
packets = append(packets, pcapPacket)
} else {
pcapPacket = unwrapVxlanPacket(pcapPacket, nil)
if filterPacket(filter, pcapPacket) {
packets = append(packets, pcapPacket)
}
currentCount += 1
}

return len(packets) < count
return currentCount < maxCount
})

if len(packets) == count {
if currentCount == maxCount {
log.WithFields(log.Fields{
"packetCount": len(packets),
}).Warn("Exceeded packet capture limit for job; returned PCAP will be truncated")
Expand All @@ -158,6 +97,49 @@ func ParseRawPcap(filename string, count int, filter *model.Filter) ([]gopacket.
return packets, err
}

func addBpf(bpf string, part string) string {
newBpf := bpf

if len(newBpf) > 0 {
newBpf = newBpf + " and "
}

newBpf = newBpf + part

return newBpf

}

func createBpf(filter *model.Filter) string {
query := filter.Protocol

if len(filter.SrcIp) > 0 {
query = addBpf(query, fmt.Sprintf("host %s", filter.SrcIp))
}

if len(filter.DstIp) > 0 {
query = addBpf(query, fmt.Sprintf("host %s", filter.DstIp))
}

// Some legacy jobs won't have the protocol provided
if filter.Protocol != model.PROTOCOL_ICMP {
if filter.SrcPort > 0 {
query = addBpf(query, fmt.Sprintf("port %d", filter.SrcPort))
}

if filter.DstPort > 0 {
query = addBpf(query, fmt.Sprintf("port %d", filter.DstPort))
}
}

// Repeat the query but with vlan applied
if len(query) > 0 {
query = fmt.Sprintf("(%s) or (vlan and %s)", query, query)
}

return query
}

func UnwrapPcap(filename string, unwrappedFilename string) bool {
unwrapped := false
info, err := os.Stat(unwrappedFilename)
Expand All @@ -172,7 +154,7 @@ func UnwrapPcap(filename string, unwrappedFilename string) bool {
log.WithError(err).WithField("unwrappedFilename", unwrappedFilename).Error("Unable to write unwrapped file header")
} else {
defer unwrappedFile.Close()
err = parsePcapFile(filename, func(index int, pcapPacket gopacket.Packet) bool {
err = parsePcapFile(filename, "", func(index int, pcapPacket gopacket.Packet) bool {
newPacket := unwrapVxlanPacket(pcapPacket, nil)
err = writer.WritePacket(newPacket.Metadata().CaptureInfo, newPacket.Data())
if err != nil {
Expand Down Expand Up @@ -201,13 +183,21 @@ func UnwrapPcap(filename string, unwrappedFilename string) bool {

}

func parsePcapFile(filename string, handler func(int, gopacket.Packet) bool) error {
func parsePcapFile(filename string, bpf string, handler func(int, gopacket.Packet) bool) error {
handle, err := pcap.OpenOffline(filename)
if err == nil {
defer handle.Close()
if bpf != "" {
err = handle.SetBPFFilter(bpf)
if err != nil {
log.WithError(err).WithField("pcapBpf", bpf).Error("Invalid BPF")
return err
}
}
packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
packetSource.DecodeOptions.Lazy = true
packetSource.DecodeOptions.NoCopy = true
packetSource.DecodeOptions.SkipDecodeRecovery = true
index := 0
for pcapPacket := range packetSource.Packets() {
if pcapPacket != nil {
Expand Down
17 changes: 17 additions & 0 deletions packet/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,20 @@ func TestParseAndStreamIcmp(tester *testing.T) {
assert.Equal(tester, pcap_length, count)
assert.Equal(tester, pcap_length, size)
}

func TestCreateBpf(tester *testing.T) {
filter := model.NewFilter()
startTime, _ := time.Parse(time.RFC3339, "2024-02-12T00:00:00Z")
filter.BeginTime = startTime
endTime, _ := time.Parse(time.RFC3339, "2024-02-12T23:59:59Z")
filter.EndTime = endTime
filter.Protocol = model.PROTOCOL_ICMP
filter.SrcIp = "90.151.225.16"
filter.SrcPort = 19 // will be ignored since Protocol = ICMP
filter.DstIp = "192.168.10.128"
filter.DstPort = 34515 // will be ignored since Protocol = ICMP

actual := createBpf(filter)
expected := "(icmp and host 90.151.225.16 and host 192.168.10.128) or (vlan and icmp and host 90.151.225.16 and host 192.168.10.128)"
assert.Equal(tester, expected, actual)
}
Loading