Skip to content

Commit

Permalink
Add network.community_id to Packetbeat flows (elastic#10061)
Browse files Browse the repository at this point in the history
This adds the network.community_id field to Packetbeat flow events.
  • Loading branch information
andrewkroh authored Jan 14, 2019
1 parent c956b80 commit a941fd4
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 28 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

- Fixed rare issue where TLS connections to endpoints with x509 certificates missing either notBefore or notAfter would cause the check to fail with a stacktrace. {pull}9566[9566]


*Journalbeat*

*Metricbeat*
Expand All @@ -133,6 +132,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Packetbeat*

- Add `network.community_id` to Packetbeat flow events. {pull}10061[10061]

*Functionbeat*

==== Deprecated
Expand Down
29 changes: 26 additions & 3 deletions packetbeat/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ type Decoder struct {
tcpProc tcp.Processor
udpProc udp.Processor

flows *flows.Flows
statPackets *flows.Uint
statBytes *flows.Uint
flows *flows.Flows
statPackets *flows.Uint
statBytes *flows.Uint
icmpV4TypeCode *flows.Uint
icmpV6TypeCode *flows.Uint

// hold current flow ID
flowID *flows.FlowID // buffer flowID among many calls
Expand All @@ -69,6 +71,8 @@ type Decoder struct {
const (
netPacketsTotalCounter = "packets"
netBytesTotalCounter = "bytes"
icmpV4TypeCodeValue = "icmpV4TypeCode"
icmpV6TypeCodeValue = "icmpV6TypeCode"
)

// New creates and initializes a new packet decoder.
Expand Down Expand Up @@ -98,6 +102,15 @@ func New(
if err != nil {
return nil, err
}
d.icmpV4TypeCode, err = f.NewUint(icmpV4TypeCodeValue)
if err != nil {
return nil, err
}
d.icmpV6TypeCode, err = f.NewUint(icmpV6TypeCodeValue)
if err != nil {
return nil, err
}

d.flowID = &flows.FlowID{}
}

Expand Down Expand Up @@ -280,6 +293,11 @@ func (d *Decoder) process(
}

func (d *Decoder) onICMPv4(packet *protos.Packet) {
if d.flowID != nil {
flow := d.flows.Get(d.flowID)
d.icmpV4TypeCode.Set(flow, uint64(d.icmp4.TypeCode))
}

if d.icmp4Proc != nil {
packet.Payload = d.icmp4.Payload
packet.Tuple.ComputeHashables()
Expand All @@ -288,6 +306,11 @@ func (d *Decoder) onICMPv4(packet *protos.Packet) {
}

func (d *Decoder) onICMPv6(packet *protos.Packet) {
if d.flowID != nil {
flow := d.flows.Get(d.flowID)
d.icmpV6TypeCode.Set(flow, uint64(d.icmp6.TypeCode))
}

if d.icmp6Proc != nil {
packet.Payload = d.icmp6.Payload
packet.Tuple.ComputeHashables()
Expand Down
56 changes: 45 additions & 11 deletions packetbeat/flows/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/flowhash"
"github.com/elastic/beats/packetbeat/procs"
"github.com/elastic/beats/packetbeat/protos/applayer"
)
Expand Down Expand Up @@ -224,6 +225,7 @@ func createEvent(
source := common.MapStr{}
dest := common.MapStr{}
tuple := common.IPPortTuple{}
var communityID flowhash.Flow
var proto applayer.Transport

// add ethernet layer meta data
Expand All @@ -242,13 +244,6 @@ func createEvent(
putOrAppendUint64(flow, "vlan", vlanID)
}

// add icmp
if icmp := f.id.ICMPv4(); icmp != nil {
network["transport"] = "icmp"
} else if icmp := f.id.ICMPv6(); icmp != nil {
network["transport"] = "ipv6-icmp"
}

// ipv4 layer meta data
if src, dst, ok := f.id.OutterIPv4Addr(); ok {
srcIP, dstIP := net.IP(src), net.IP(dst)
Expand All @@ -258,6 +253,8 @@ func createEvent(
tuple.DstIP = dstIP
tuple.IPLength = 4
network["type"] = "ipv4"
communityID.SourceIP = srcIP
communityID.DestinationIP = dstIP
}
if src, dst, ok := f.id.IPv4Addr(); ok {
srcIP, dstIP := net.IP(src), net.IP(dst)
Expand All @@ -268,8 +265,10 @@ func createEvent(
tuple.SrcIP = srcIP
tuple.DstIP = dstIP
tuple.IPLength = 4
communityID.SourceIP = srcIP
communityID.DestinationIP = dstIP
network["type"] = "ipv4"
}
network["type"] = "ipv4"
}

// ipv6 layer meta data
Expand All @@ -281,6 +280,8 @@ func createEvent(
tuple.DstIP = dstIP
tuple.IPLength = 6
network["type"] = "ipv6"
communityID.SourceIP = srcIP
communityID.DestinationIP = dstIP
}
if src, dst, ok := f.id.IPv6Addr(); ok {
srcIP, dstIP := net.IP(src), net.IP(dst)
Expand All @@ -291,8 +292,10 @@ func createEvent(
tuple.SrcIP = srcIP
tuple.DstIP = dstIP
tuple.IPLength = 6
communityID.SourceIP = srcIP
communityID.DestinationIP = dstIP
network["type"] = "ipv6"
}
network["type"] = "ipv6"
}

// udp layer meta data
Expand All @@ -302,6 +305,9 @@ func createEvent(
source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort
network["transport"] = "udp"
proto = applayer.TransportUDP
communityID.SourcePort = tuple.SrcPort
communityID.DestinationPort = tuple.DstPort
communityID.Protocol = 17
}

// tcp layer meta data
Expand All @@ -311,14 +317,34 @@ func createEvent(
source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort
network["transport"] = "tcp"
proto = applayer.TransportTCP
communityID.SourcePort = tuple.SrcPort
communityID.DestinationPort = tuple.DstPort
communityID.Protocol = 6
}

var totalBytes, totalPackets uint64
if f.stats[0] != nil {
// Source stats.
stats := encodeStats(f.stats[0], intNames, uintNames, floatNames)
for k, v := range stats {
source[k] = v
switch k {
case "icmpV4TypeCode":
if typeCode, ok := v.(uint64); ok && typeCode > 0 {
network["transport"] = "icmp"
communityID.Protocol = 1
communityID.ICMP.Type = uint8(typeCode >> 8)
communityID.ICMP.Code = uint8(typeCode)
}
case "icmpV6TypeCode":
if typeCode, ok := v.(uint64); ok && typeCode > 0 {
network["transport"] = "ipv6-icmp"
communityID.Protocol = 58
communityID.ICMP.Type = uint8(typeCode >> 8)
communityID.ICMP.Code = uint8(typeCode)
}
default:
source[k] = v
}
}

if v, found := stats["bytes"]; found {
Expand All @@ -332,7 +358,11 @@ func createEvent(
// Destination stats.
stats := encodeStats(f.stats[1], intNames, uintNames, floatNames)
for k, v := range stats {
dest[k] = v
switch k {
case "icmpV4TypeCode", "icmpV6TypeCode":
default:
dest[k] = v
}
}

if v, found := stats["bytes"]; found {
Expand All @@ -342,6 +372,10 @@ func createEvent(
totalPackets += v.(uint64)
}
}
if communityID.Protocol > 0 && len(communityID.SourceIP) > 0 && len(communityID.DestinationIP) > 0 {
hash := flowhash.CommunityID.Hash(communityID)
network["community_id"] = hash
}
network["bytes"] = totalBytes
network["packets"] = totalPackets
fields["network"] = network
Expand Down
30 changes: 17 additions & 13 deletions packetbeat/pb/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,23 +153,27 @@ func (f *Fields) ComputeValues(localIPs []net.IP) error {
flow.Protocol = 1
// TODO: Populate the ICMP type/code.
case f.Network.Transport == "ipv6-icmp":
flow.Protocol = 65
flow.Protocol = 58
// TODO: Populate the ICMP type/code.
}
f.Network.CommunityID = flowhash.CommunityID.Hash(flow)
if flow.Protocol > 0 && len(flow.SourceIP) > 0 && len(flow.DestinationIP) > 0 {
f.Network.CommunityID = flowhash.CommunityID.Hash(flow)
}

// network.type
if len(flow.SourceIP) > 0 {
if flow.SourceIP.To4() != nil {
f.Network.Type = "ipv4"
} else {
f.Network.Type = "ipv6"
}
} else if len(flow.DestinationIP) > 0 {
if flow.DestinationIP.To4() != nil {
f.Network.Type = "ipv4"
} else {
f.Network.Type = "ipv6"
if f.Network.Type == "" {
if len(flow.SourceIP) > 0 {
if flow.SourceIP.To4() != nil {
f.Network.Type = "ipv4"
} else {
f.Network.Type = "ipv6"
}
} else if len(flow.DestinationIP) > 0 {
if flow.DestinationIP.To4() != nil {
f.Network.Type = "ipv4"
} else {
f.Network.Type = "ipv6"
}
}
}

Expand Down
52 changes: 52 additions & 0 deletions packetbeat/tests/system/test_0060_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from pprint import PrettyPrinter
from datetime import datetime
import six
import os


def pprint(x): return PrettyPrinter().pprint(x)
Expand Down Expand Up @@ -165,3 +166,54 @@ def test_q_in_q_flow(self):
'network.bytes': 82,
'network.packets': 1,
})

def test_community_id_icmp(self):
objs = self.check_community_id("icmp.pcap")

assert len(objs) == 1
self.assertEqual(objs[0]["network.community_id"], "1:X0snYXpgwiv9TZtqg64sgzUn6Dk=")

def test_community_id_icmp6(self):
objs = self.check_community_id("icmp6.pcap")

assert len(objs) == 10
self.assertEqual(objs[0]["network.community_id"], "1:zavyT/cezQr1fmImYCwYnMXbgck=")
self.assertEqual(objs[1]["network.community_id"], "1:GpbEQrKqfWtsfsFiqg8fufoZe5Y=")
self.assertEqual(objs[2]["network.community_id"], "1:bnQKq8A2r//dWnkRW2EYcMhShjc=")
self.assertEqual(objs[3]["network.community_id"], "1:2ObVBgIn28oZvibYZhZMBgh7WdQ=")
self.assertEqual(objs[4]["network.community_id"], "1:hLZd0XGWojozrvxqE0dWB1iM6R0=")
self.assertEqual(objs[5]["network.community_id"], "1:+TW+HtLHvV1xnGhV1lv7XoJrqQg=")
self.assertEqual(objs[6]["network.community_id"], "1:hO+sN4H+MG5MY/8hIrXPqc4ZQz0=")
self.assertEqual(objs[7]["network.community_id"], "1:pkvHqCL88/tg1k4cPigmZXUtL00=")
self.assertEqual(objs[8]["network.community_id"], "1:jwuBy9UWZK1KUFqJV5cHdVpfrlY=")
self.assertEqual(objs[9]["network.community_id"], "1:MEixa66kuz0OMvlQqnAIzP3n2xg=")

def test_community_id_ipv4_tcp(self):
objs = self.check_community_id("tcp.pcap")

all([self.assertEqual(o["network.community_id"], "1:LQU9qZlK+B5F3KDmev6m5PMibrg=") for o in objs])

def test_community_id_ipv4_udp(self):
objs = self.check_community_id("udp.pcap")

all([self.assertEqual(o["network.community_id"], "1:d/FP5EW3wiY1vCndhwleRRKHowQ=") for o in objs])

def check_community_id(self, pcap):
self.render_config_template(
flows=True,
shutdown_timeout="1s",
processors=[{
"drop_event": {
"when": "not.equals.event.type: flow",
},
}]
)
self.run_packetbeat(
pcap=os.path.join("../../../../libbeat/common/flowhash/testdata/pcap", pcap),
debug_selectors=["*"])

objs = self.read_output(
types=["flow"],
required_fields=FLOWS_REQUIRED_FIELDS)

return objs

0 comments on commit a941fd4

Please sign in to comment.