From a941fd4c6ccf3be80965c74264972f60f3d10f1f Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Mon, 14 Jan 2019 17:30:30 -0500 Subject: [PATCH] Add network.community_id to Packetbeat flows (#10061) This adds the network.community_id field to Packetbeat flow events. --- CHANGELOG.next.asciidoc | 3 +- packetbeat/decoder/decoder.go | 29 +++++++++-- packetbeat/flows/worker.go | 56 +++++++++++++++++----- packetbeat/pb/event.go | 30 +++++++----- packetbeat/tests/system/test_0060_flows.py | 52 ++++++++++++++++++++ 5 files changed, 142 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9af67cd36c1a..db797e8bca48 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -119,7 +119,6 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fixed rare issue where TLS connections to endpoints with x509 certificates missing either notBefore or notAfter would cause the check to fail with a stacktrace. {pull}9566[9566] - *Journalbeat* *Metricbeat* @@ -133,6 +132,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Packetbeat* +- Add `network.community_id` to Packetbeat flow events. {pull}10061[10061] + *Functionbeat* ==== Deprecated diff --git a/packetbeat/decoder/decoder.go b/packetbeat/decoder/decoder.go index d5b1d28848ae..3aba33b7f88a 100644 --- a/packetbeat/decoder/decoder.go +++ b/packetbeat/decoder/decoder.go @@ -57,9 +57,11 @@ type Decoder struct { tcpProc tcp.Processor udpProc udp.Processor - flows *flows.Flows - statPackets *flows.Uint - statBytes *flows.Uint + flows *flows.Flows + statPackets *flows.Uint + statBytes *flows.Uint + icmpV4TypeCode *flows.Uint + icmpV6TypeCode *flows.Uint // hold current flow ID flowID *flows.FlowID // buffer flowID among many calls @@ -69,6 +71,8 @@ type Decoder struct { const ( netPacketsTotalCounter = "packets" netBytesTotalCounter = "bytes" + icmpV4TypeCodeValue = "icmpV4TypeCode" + icmpV6TypeCodeValue = "icmpV6TypeCode" ) // New creates and initializes a new packet decoder. @@ -98,6 +102,15 @@ func New( if err != nil { return nil, err } + d.icmpV4TypeCode, err = f.NewUint(icmpV4TypeCodeValue) + if err != nil { + return nil, err + } + d.icmpV6TypeCode, err = f.NewUint(icmpV6TypeCodeValue) + if err != nil { + return nil, err + } + d.flowID = &flows.FlowID{} } @@ -280,6 +293,11 @@ func (d *Decoder) process( } func (d *Decoder) onICMPv4(packet *protos.Packet) { + if d.flowID != nil { + flow := d.flows.Get(d.flowID) + d.icmpV4TypeCode.Set(flow, uint64(d.icmp4.TypeCode)) + } + if d.icmp4Proc != nil { packet.Payload = d.icmp4.Payload packet.Tuple.ComputeHashables() @@ -288,6 +306,11 @@ func (d *Decoder) onICMPv4(packet *protos.Packet) { } func (d *Decoder) onICMPv6(packet *protos.Packet) { + if d.flowID != nil { + flow := d.flows.Get(d.flowID) + d.icmpV6TypeCode.Set(flow, uint64(d.icmp6.TypeCode)) + } + if d.icmp6Proc != nil { packet.Payload = d.icmp6.Payload packet.Tuple.ComputeHashables() diff --git a/packetbeat/flows/worker.go b/packetbeat/flows/worker.go index 6c9d3b2455f9..ad9fdf9f217f 100644 --- a/packetbeat/flows/worker.go +++ b/packetbeat/flows/worker.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/flowhash" "github.com/elastic/beats/packetbeat/procs" "github.com/elastic/beats/packetbeat/protos/applayer" ) @@ -224,6 +225,7 @@ func createEvent( source := common.MapStr{} dest := common.MapStr{} tuple := common.IPPortTuple{} + var communityID flowhash.Flow var proto applayer.Transport // add ethernet layer meta data @@ -242,13 +244,6 @@ func createEvent( putOrAppendUint64(flow, "vlan", vlanID) } - // add icmp - if icmp := f.id.ICMPv4(); icmp != nil { - network["transport"] = "icmp" - } else if icmp := f.id.ICMPv6(); icmp != nil { - network["transport"] = "ipv6-icmp" - } - // ipv4 layer meta data if src, dst, ok := f.id.OutterIPv4Addr(); ok { srcIP, dstIP := net.IP(src), net.IP(dst) @@ -258,6 +253,8 @@ func createEvent( tuple.DstIP = dstIP tuple.IPLength = 4 network["type"] = "ipv4" + communityID.SourceIP = srcIP + communityID.DestinationIP = dstIP } if src, dst, ok := f.id.IPv4Addr(); ok { srcIP, dstIP := net.IP(src), net.IP(dst) @@ -268,8 +265,10 @@ func createEvent( tuple.SrcIP = srcIP tuple.DstIP = dstIP tuple.IPLength = 4 + communityID.SourceIP = srcIP + communityID.DestinationIP = dstIP + network["type"] = "ipv4" } - network["type"] = "ipv4" } // ipv6 layer meta data @@ -281,6 +280,8 @@ func createEvent( tuple.DstIP = dstIP tuple.IPLength = 6 network["type"] = "ipv6" + communityID.SourceIP = srcIP + communityID.DestinationIP = dstIP } if src, dst, ok := f.id.IPv6Addr(); ok { srcIP, dstIP := net.IP(src), net.IP(dst) @@ -291,8 +292,10 @@ func createEvent( tuple.SrcIP = srcIP tuple.DstIP = dstIP tuple.IPLength = 6 + communityID.SourceIP = srcIP + communityID.DestinationIP = dstIP + network["type"] = "ipv6" } - network["type"] = "ipv6" } // udp layer meta data @@ -302,6 +305,9 @@ func createEvent( source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort network["transport"] = "udp" proto = applayer.TransportUDP + communityID.SourcePort = tuple.SrcPort + communityID.DestinationPort = tuple.DstPort + communityID.Protocol = 17 } // tcp layer meta data @@ -311,6 +317,9 @@ func createEvent( source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort network["transport"] = "tcp" proto = applayer.TransportTCP + communityID.SourcePort = tuple.SrcPort + communityID.DestinationPort = tuple.DstPort + communityID.Protocol = 6 } var totalBytes, totalPackets uint64 @@ -318,7 +327,24 @@ func createEvent( // Source stats. stats := encodeStats(f.stats[0], intNames, uintNames, floatNames) for k, v := range stats { - source[k] = v + switch k { + case "icmpV4TypeCode": + if typeCode, ok := v.(uint64); ok && typeCode > 0 { + network["transport"] = "icmp" + communityID.Protocol = 1 + communityID.ICMP.Type = uint8(typeCode >> 8) + communityID.ICMP.Code = uint8(typeCode) + } + case "icmpV6TypeCode": + if typeCode, ok := v.(uint64); ok && typeCode > 0 { + network["transport"] = "ipv6-icmp" + communityID.Protocol = 58 + communityID.ICMP.Type = uint8(typeCode >> 8) + communityID.ICMP.Code = uint8(typeCode) + } + default: + source[k] = v + } } if v, found := stats["bytes"]; found { @@ -332,7 +358,11 @@ func createEvent( // Destination stats. stats := encodeStats(f.stats[1], intNames, uintNames, floatNames) for k, v := range stats { - dest[k] = v + switch k { + case "icmpV4TypeCode", "icmpV6TypeCode": + default: + dest[k] = v + } } if v, found := stats["bytes"]; found { @@ -342,6 +372,10 @@ func createEvent( totalPackets += v.(uint64) } } + if communityID.Protocol > 0 && len(communityID.SourceIP) > 0 && len(communityID.DestinationIP) > 0 { + hash := flowhash.CommunityID.Hash(communityID) + network["community_id"] = hash + } network["bytes"] = totalBytes network["packets"] = totalPackets fields["network"] = network diff --git a/packetbeat/pb/event.go b/packetbeat/pb/event.go index bcab14fa4f5e..8a36d78b0b2e 100644 --- a/packetbeat/pb/event.go +++ b/packetbeat/pb/event.go @@ -153,23 +153,27 @@ func (f *Fields) ComputeValues(localIPs []net.IP) error { flow.Protocol = 1 // TODO: Populate the ICMP type/code. case f.Network.Transport == "ipv6-icmp": - flow.Protocol = 65 + flow.Protocol = 58 // TODO: Populate the ICMP type/code. } - f.Network.CommunityID = flowhash.CommunityID.Hash(flow) + if flow.Protocol > 0 && len(flow.SourceIP) > 0 && len(flow.DestinationIP) > 0 { + f.Network.CommunityID = flowhash.CommunityID.Hash(flow) + } // network.type - if len(flow.SourceIP) > 0 { - if flow.SourceIP.To4() != nil { - f.Network.Type = "ipv4" - } else { - f.Network.Type = "ipv6" - } - } else if len(flow.DestinationIP) > 0 { - if flow.DestinationIP.To4() != nil { - f.Network.Type = "ipv4" - } else { - f.Network.Type = "ipv6" + if f.Network.Type == "" { + if len(flow.SourceIP) > 0 { + if flow.SourceIP.To4() != nil { + f.Network.Type = "ipv4" + } else { + f.Network.Type = "ipv6" + } + } else if len(flow.DestinationIP) > 0 { + if flow.DestinationIP.To4() != nil { + f.Network.Type = "ipv4" + } else { + f.Network.Type = "ipv6" + } } } diff --git a/packetbeat/tests/system/test_0060_flows.py b/packetbeat/tests/system/test_0060_flows.py index 2fae71c47549..cd6215f6cf6c 100644 --- a/packetbeat/tests/system/test_0060_flows.py +++ b/packetbeat/tests/system/test_0060_flows.py @@ -2,6 +2,7 @@ from pprint import PrettyPrinter from datetime import datetime import six +import os def pprint(x): return PrettyPrinter().pprint(x) @@ -165,3 +166,54 @@ def test_q_in_q_flow(self): 'network.bytes': 82, 'network.packets': 1, }) + + def test_community_id_icmp(self): + objs = self.check_community_id("icmp.pcap") + + assert len(objs) == 1 + self.assertEqual(objs[0]["network.community_id"], "1:X0snYXpgwiv9TZtqg64sgzUn6Dk=") + + def test_community_id_icmp6(self): + objs = self.check_community_id("icmp6.pcap") + + assert len(objs) == 10 + self.assertEqual(objs[0]["network.community_id"], "1:zavyT/cezQr1fmImYCwYnMXbgck=") + self.assertEqual(objs[1]["network.community_id"], "1:GpbEQrKqfWtsfsFiqg8fufoZe5Y=") + self.assertEqual(objs[2]["network.community_id"], "1:bnQKq8A2r//dWnkRW2EYcMhShjc=") + self.assertEqual(objs[3]["network.community_id"], "1:2ObVBgIn28oZvibYZhZMBgh7WdQ=") + self.assertEqual(objs[4]["network.community_id"], "1:hLZd0XGWojozrvxqE0dWB1iM6R0=") + self.assertEqual(objs[5]["network.community_id"], "1:+TW+HtLHvV1xnGhV1lv7XoJrqQg=") + self.assertEqual(objs[6]["network.community_id"], "1:hO+sN4H+MG5MY/8hIrXPqc4ZQz0=") + self.assertEqual(objs[7]["network.community_id"], "1:pkvHqCL88/tg1k4cPigmZXUtL00=") + self.assertEqual(objs[8]["network.community_id"], "1:jwuBy9UWZK1KUFqJV5cHdVpfrlY=") + self.assertEqual(objs[9]["network.community_id"], "1:MEixa66kuz0OMvlQqnAIzP3n2xg=") + + def test_community_id_ipv4_tcp(self): + objs = self.check_community_id("tcp.pcap") + + all([self.assertEqual(o["network.community_id"], "1:LQU9qZlK+B5F3KDmev6m5PMibrg=") for o in objs]) + + def test_community_id_ipv4_udp(self): + objs = self.check_community_id("udp.pcap") + + all([self.assertEqual(o["network.community_id"], "1:d/FP5EW3wiY1vCndhwleRRKHowQ=") for o in objs]) + + def check_community_id(self, pcap): + self.render_config_template( + flows=True, + shutdown_timeout="1s", + processors=[{ + "drop_event": { + "when": "not.equals.event.type: flow", + }, + }] + ) + self.run_packetbeat( + pcap=os.path.join("../../../../libbeat/common/flowhash/testdata/pcap", pcap), + debug_selectors=["*"]) + + objs = self.read_output( + types=["flow"], + required_fields=FLOWS_REQUIRED_FIELDS) + + return objs