Skip to content

Commit

Permalink
refactor suri pcap parser for 4x perf gain
Browse files Browse the repository at this point in the history
  • Loading branch information
jertel committed Mar 14, 2024
1 parent c7f4622 commit 8bd29f4
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 73 deletions.
136 changes: 63 additions & 73 deletions packet/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package packet
import (
"bytes"
"encoding/base64"
"fmt"
"io"
"os"

Expand All @@ -33,7 +34,7 @@ var SupportedLayerTypes = [...]gopacket.LayerType{

func ParsePcap(filename string, offset int, count int, unwrap bool) ([]*model.Packet, error) {
packets := make([]*model.Packet, 0)
parsePcapFile(filename, func(index int, pcapPacket gopacket.Packet) bool {
parsePcapFile(filename, "", func(index int, pcapPacket gopacket.Packet) bool {
if index >= offset {
packet := model.NewPacket(index)
parseData(pcapPacket, packet, unwrap)
Expand Down Expand Up @@ -65,91 +66,29 @@ func ToStream(packets []gopacket.Packet) (io.ReadCloser, int, error) {
return io.NopCloser(bytes.NewReader(full.Bytes())), full.Len(), nil
}

func getPacketProtocol(packet gopacket.Packet) string {
if packet.Layer(layers.LayerTypeTCP) != nil {
return model.PROTOCOL_TCP
}
if packet.Layer(layers.LayerTypeUDP) != nil {
return model.PROTOCOL_UDP
}
if packet.Layer(layers.LayerTypeICMPv4) != nil ||
packet.Layer(layers.LayerTypeICMPv6) != nil {
return model.PROTOCOL_ICMP
}
return ""
}

func filterPacket(filter *model.Filter, packet gopacket.Packet) bool {

// Quickly exit if packet time is outside the bounds of the time filter or protocol doesn't match
timestamp := packet.Metadata().Timestamp
if (!filter.BeginTime.IsZero() && filter.BeginTime.After(timestamp)) ||
(!filter.EndTime.IsZero() && filter.EndTime.Before(timestamp)) ||
(filter.Protocol != "" && filter.Protocol != getPacketProtocol(packet)) {
(!filter.EndTime.IsZero() && filter.EndTime.Before(timestamp)) {
return false
}

// Grab the source and destination IPs, if they exist
var srcIp, dstIp string
layer := packet.Layer(layers.LayerTypeIPv4)
if layer != nil {
layer := layer.(*layers.IPv4)
srcIp = layer.SrcIP.String()
dstIp = layer.DstIP.String()
} else {
layer = packet.Layer(layers.LayerTypeIPv6)
if layer != nil {
layer := layer.(*layers.IPv6)
srcIp = layer.SrcIP.String()
dstIp = layer.DstIP.String()
}
}

// Check for IP match in both directions
include := (filter.SrcIp == "" || filter.SrcIp == srcIp || filter.SrcIp == dstIp) &&
(filter.DstIp == "" || filter.DstIp == dstIp || filter.DstIp == srcIp)

if include && (filter.Protocol == "" || filter.Protocol == "udp" || filter.Protocol == "tcp") && (filter.SrcPort != 0 || filter.DstPort != 0) {

var srcPort, dstPort int
layer = packet.Layer(layers.LayerTypeTCP)
if layer != nil {
layer := layer.(*layers.TCP)
srcPort = int(layer.SrcPort)
dstPort = int(layer.DstPort)
} else {
layer = packet.Layer(layers.LayerTypeUDP)
if layer != nil {
layer := layer.(*layers.UDP)
srcPort = int(layer.SrcPort)
dstPort = int(layer.DstPort)
}
}

include = include &&
(filter.SrcPort == 0 || filter.SrcPort == srcPort || filter.SrcPort == dstPort) &&
(filter.DstPort == 0 || filter.DstPort == dstPort || filter.DstPort == srcPort)
}

return include
return true
}

func ParseRawPcap(filename string, count int, filter *model.Filter) ([]gopacket.Packet, error) {
func ParseRawPcap(filename string, maxCount int, filter *model.Filter) ([]gopacket.Packet, error) {
packets := make([]gopacket.Packet, 0)
err := parsePcapFile(filename, func(index int, pcapPacket gopacket.Packet) bool {
currentCount := 0
err := parsePcapFile(filename, createBpf(filter), func(index int, pcapPacket gopacket.Packet) bool {
if filterPacket(filter, pcapPacket) {
packets = append(packets, pcapPacket)
} else {
pcapPacket = unwrapVxlanPacket(pcapPacket, nil)
if filterPacket(filter, pcapPacket) {
packets = append(packets, pcapPacket)
}
currentCount += 1
}

return len(packets) < count
return currentCount < maxCount
})

if len(packets) == count {
if currentCount == maxCount {
log.WithFields(log.Fields{
"packetCount": len(packets),
}).Warn("Exceeded packet capture limit for job; returned PCAP will be truncated")
Expand All @@ -158,6 +97,49 @@ func ParseRawPcap(filename string, count int, filter *model.Filter) ([]gopacket.
return packets, err
}

func addBpf(bpf string, part string) string {
newBpf := bpf

if len(newBpf) > 0 {
newBpf = newBpf + " and "
}

newBpf = newBpf + part

return newBpf

}

func createBpf(filter *model.Filter) string {
query := filter.Protocol

if len(filter.SrcIp) > 0 {
query = addBpf(query, fmt.Sprintf("host %s", filter.SrcIp))
}

if len(filter.DstIp) > 0 {
query = addBpf(query, fmt.Sprintf("host %s", filter.DstIp))
}

// Some legacy jobs won't have the protocol provided
if filter.Protocol != model.PROTOCOL_ICMP {
if filter.SrcPort > 0 {
query = addBpf(query, fmt.Sprintf("port %d", filter.SrcPort))
}

if filter.DstPort > 0 {
query = addBpf(query, fmt.Sprintf("port %d", filter.DstPort))
}
}

// Repeat the query but with vlan applied
if len(query) > 0 {
query = fmt.Sprintf("(%s) or (vlan and %s)", query, query)
}

return query
}

func UnwrapPcap(filename string, unwrappedFilename string) bool {
unwrapped := false
info, err := os.Stat(unwrappedFilename)
Expand All @@ -172,7 +154,7 @@ func UnwrapPcap(filename string, unwrappedFilename string) bool {
log.WithError(err).WithField("unwrappedFilename", unwrappedFilename).Error("Unable to write unwrapped file header")
} else {
defer unwrappedFile.Close()
err = parsePcapFile(filename, func(index int, pcapPacket gopacket.Packet) bool {
err = parsePcapFile(filename, "", func(index int, pcapPacket gopacket.Packet) bool {
newPacket := unwrapVxlanPacket(pcapPacket, nil)
err = writer.WritePacket(newPacket.Metadata().CaptureInfo, newPacket.Data())
if err != nil {
Expand Down Expand Up @@ -201,13 +183,21 @@ func UnwrapPcap(filename string, unwrappedFilename string) bool {

}

func parsePcapFile(filename string, handler func(int, gopacket.Packet) bool) error {
func parsePcapFile(filename string, bpf string, handler func(int, gopacket.Packet) bool) error {
handle, err := pcap.OpenOffline(filename)
if err == nil {
defer handle.Close()
if bpf != "" {
err = handle.SetBPFFilter(bpf)
if err != nil {
log.WithError(err).WithField("pcapBpf", bpf).Error("Invalid BPF")
return err
}
}
packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
packetSource.DecodeOptions.Lazy = true
packetSource.DecodeOptions.NoCopy = true
packetSource.DecodeOptions.SkipDecodeRecovery = true
index := 0
for pcapPacket := range packetSource.Packets() {
if pcapPacket != nil {
Expand Down
17 changes: 17 additions & 0 deletions packet/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,20 @@ func TestParseAndStreamIcmp(tester *testing.T) {
assert.Equal(tester, pcap_length, count)
assert.Equal(tester, pcap_length, size)
}

func TestCreateBpf(tester *testing.T) {
filter := model.NewFilter()
startTime, _ := time.Parse(time.RFC3339, "2024-02-12T00:00:00Z")
filter.BeginTime = startTime
endTime, _ := time.Parse(time.RFC3339, "2024-02-12T23:59:59Z")
filter.EndTime = endTime
filter.Protocol = model.PROTOCOL_ICMP
filter.SrcIp = "90.151.225.16"
filter.SrcPort = 19 // will be ignored since Protocol = ICMP
filter.DstIp = "192.168.10.128"
filter.DstPort = 34515 // will be ignored since Protocol = ICMP

actual := createBpf(filter)
expected := "(icmp and host 90.151.225.16 and host 192.168.10.128) or (vlan and icmp and host 90.151.225.16 and host 192.168.10.128)"
assert.Equal(tester, expected, actual)
}

0 comments on commit 8bd29f4

Please sign in to comment.