From ec961004f02c405ffd644103de4b3aa50d11c3fb Mon Sep 17 00:00:00 2001 From: Louis Date: Sat, 17 Aug 2024 17:28:09 -0700 Subject: [PATCH] feat(producer): refactor encapsulation and configuration (#342) Configuration and ProtoProducer now rely on interface to be extended more easily. The packet parser has been rewritten to support encapsulation (also on top of UDP). Adds SRv6. --- cmd/goflow2/main.go | 9 +- cmd/goflow2/mapping.yaml | 13 + docs/mapping.md | 201 +++++ format/binary/binary.go | 2 +- format/format.go | 8 +- format/text/text.go | 2 +- pb/flow.pb.go | 195 ++++- pb/flow.proto | 26 + producer/proto/config.go | 80 ++ producer/proto/{custom.go => config_impl.go} | 251 +++++- producer/proto/messages.go | 99 ++- producer/proto/producer_nf.go | 26 +- producer/proto/producer_packet.go | 860 +++++++++++++++++++ producer/proto/producer_packet_test.go | 494 +++++++++++ producer/proto/producer_sf.go | 395 +-------- producer/proto/producer_test.go | 71 -- producer/proto/proto.go | 17 +- producer/proto/reflect.go | 158 ++-- producer/proto/reflect_test.go | 80 ++ producer/proto/render.go | 2 + transport/file/transport.go | 1 - transport/kafka/kafka.go | 2 +- transport/transport.go | 6 +- utils/debug/decoder.go | 2 - utils/debug/producer.go | 1 - 25 files changed, 2340 insertions(+), 661 deletions(-) create mode 100644 docs/mapping.md create mode 100644 producer/proto/config.go rename producer/proto/{custom.go => config_impl.go} (59%) create mode 100644 producer/proto/producer_packet.go create mode 100644 producer/proto/producer_packet_test.go create mode 100644 producer/proto/reflect_test.go diff --git a/cmd/goflow2/main.go b/cmd/goflow2/main.go index 53f46eb1..7b3f5aca 100644 --- a/cmd/goflow2/main.go +++ b/cmd/goflow2/main.go @@ -122,7 +122,12 @@ func main() { } } - flowProducer, err = protoproducer.CreateProtoProducer(cfgProducer, protoproducer.CreateSamplingSystem) + cfgm, err := cfgProducer.Compile() // converts configuration into a format that can be used by a protobuf producer + if err != nil { + log.Fatal(err) + } + + flowProducer, err = protoproducer.CreateProtoProducer(cfgm, protoproducer.CreateSamplingSystem) if err != nil { log.Fatal(err) } @@ -308,7 +313,7 @@ func main() { l.Info("closed receiver") continue } else if !errors.Is(err, netflow.ErrorTemplateNotFound) && !errors.Is(err, debug.PanicError) { - l.Error("error") + l.WithError(err).Error("error") continue } diff --git a/cmd/goflow2/mapping.yaml b/cmd/goflow2/mapping.yaml index bf87dd34..e4039ab0 100644 --- a/cmd/goflow2/mapping.yaml +++ b/cmd/goflow2/mapping.yaml @@ -64,6 +64,19 @@ netflowv9: - field: 61 destination: flow_direction sflow: + ports: + - proto: "udp" + dir: "dst" + port: 3544 + parser: "teredo-dst" + - proto: "udp" + dir: "both" + port: 4754 + parser: "gre" + - proto: "udp" + dir: "both" + port: 6081 + parser: "geneve" mapping: - layer: "udp" offset: 48 diff --git a/docs/mapping.md b/docs/mapping.md new file mode 100644 index 00000000..20fbfd3d --- /dev/null +++ b/docs/mapping.md @@ -0,0 +1,201 @@ +# Mapping and Configuration + +GoFlow2 allows users to collect and represent non-standard fields +without having to rely on `-produce=raw` setting. + +By default, commonly used types are collected into the protobuf. +For instance source and destination IP addresses, TCP/UDP ports, etc. +When suggesting a new field to collect, preference should be given to fields +that are both widely adopted and supported by multiple protocols (sFlow, IPFIX). + +Some scenarios require more flexibility. +In fact, IPFIX allows Private Enterprise Numbers ([PEN](https://www.iana.org/assignments/enterprise-numbers/)) +and entire datagrams (IPFIX, sFlow) can contain bytes of interest. + +A mapping configuration file empowers GoFlow2 users to collect +extra data without changing the code and recompiling. +The feature is available for both protobuf binary and JSON formatting. + +A configuration file can be invoked the following way: + +```bash +goflow2 -mapping=config.yaml -format=json -produce=sample +``` + +An example configuration file that collects NetFlow/IPFIX flow direction information: + +```yaml +formatter: + fields: # list of fields to format in JSON + - flow_direction + protobuf: # manual protobuf fields addition + - name: flow_direction + index: 42 + type: varint +# Decoder mappings +ipfix: + mapping: + - field: 61 + destination: flow_direction +netflowv9: + mapping: + - field: 61 + destination: flow_direction +``` + +In JSON, the field `flow_direction` will now be added. +In binary protobuf, when consumed by another tool, +the latter can access the new field at index 42. +A custom proto file can be compiled with the following: + +```proto +message FlowMessage { + + ... + uint32 flow_direction = 42; + +``` + +## Formatting and rendering + +This section of the configuration is used for textual representations. +Both fields from [`flow.proto`](../pb/flow.proto) and custom ones inside `formatter.protobuf` +can be available in the textual output (JSON for instance). + +The items inside `formatter.fields` are the fields present in the output. + +The render section picks the representation. +For instance a 4/16 bytes field can be represented as an IP address, time can be represented as RFC3339 or epoch. + +```yaml +formatter: + fields: + - time_received_ns + - my_new_field + - my_other_field + protobuf: + - name: my_new_field + index: 1000 + type: varint + - name: my_other_field + index: 2000 + type: string + render: + time_received_ns: datetimenano + my_other_field: ip +``` + +## Encapsulation + +Custom mapping can be used with encapsulation. + +By default, GoFlow2 will expect a packet with the following layers: + +* Ethernet +* 802.1q and/or MPLS +* IP +* TCP or UDP + +A more complex packet could be in the form: + +* **Ethernet** +* **MPLS** +* **IP** +* *GRE* +* *Ethernet* +* *IP* +* *UDP* + +Only the layers in **bold** will have the information collected. +The perimeter that is considered encapsulation here is the GRE protocol (note: it could be started if a second Ethernet layer was above 802.1q). +Rather than having duplicates of the existing fields with encapsulation, a configuration file can be used to collect +the encapsulated fields. + +An additional consideration is that protobuf fields can be array (or `repeated`). +Due to the way the mapping works, the arrays are not [packed](https://protobuf.dev/programming-guides/encoding/#packed) +(equivalent to a `repeated myfield = 123 [packed=false]` in the definition). +Each item is encoded in the order they are parsed alongside other fields +whereas packed would require a second pass to combine all the items together. + +### Inner UDP/TCP ports + +```yaml +formatter: + fields: + - src_port_encap + - dst_port_encap + protobuf: + - name: src_port_encap + index: 1021 + type: string + array: true + - name: dst_port_encap + index: 1022 + type: string + array: true +sflow: + mapping: + - layer: "udp" + encap: true + offset: 0 + length: 16 + destination: src_port_encap + - layer: "udp" + encap: true + offset: 16 + length: 16 + destination: dst_port_encap + - layer: "tcp" + encap: true + offset: 0 + length: 16 + destination: src_port_encap + - layer: "tcp" + encap: true + offset: 16 + length: 16 + destination: dst_port_encap +``` + +### Inner IP addresses + +```yaml +formatter: + fields: + - src_ip_encap + - dst_ip_encap + protobuf: + - name: src_ip_encap + index: 1006 + type: string + array: true + - name: dst_ip_encap + index: 1007 + type: string + array: true + render: + src_ip_encap: ip + dst_ip_encap: ip +sflow: + mapping: + - layer: "ipv6" + encap: true + offset: 64 + length: 128 + destination: src_ip_encap + - layer: "ipv6" + encap: true + offset: 192 + length: 128 + destination: dst_ip_encap + - layer: "ipv4" + encap: true + offset: 96 + length: 32 + destination: src_ip_encap + - layer: "ipv4" + encap: true + offset: 128 + length: 32 + destination: dst_ip_encap +``` diff --git a/format/binary/binary.go b/format/binary/binary.go index d47d88ea..fc556ed9 100644 --- a/format/binary/binary.go +++ b/format/binary/binary.go @@ -26,7 +26,7 @@ func (d *BinaryDriver) Format(data interface{}) ([]byte, []byte, error) { text, err := dataIf.MarshalBinary() return key, text, err } - return key, nil, format.ErrorNoSerializer + return key, nil, format.ErrNoSerializer } func init() { diff --git a/format/format.go b/format/format.go index 43074187..1a53735b 100644 --- a/format/format.go +++ b/format/format.go @@ -9,8 +9,8 @@ var ( formatDrivers = make(map[string]FormatDriver) lock = &sync.RWMutex{} - ErrorFormat = fmt.Errorf("format error") - ErrorNoSerializer = fmt.Errorf("message is not serializable") + ErrFormat = fmt.Errorf("format error") + ErrNoSerializer = fmt.Errorf("message is not serializable") ) type DriverFormatError struct { @@ -23,7 +23,7 @@ func (e *DriverFormatError) Error() string { } func (e *DriverFormatError) Unwrap() []error { - return []error{ErrorFormat, e.Err} + return []error{ErrFormat, e.Err} } type FormatDriver interface { @@ -67,7 +67,7 @@ func FindFormat(name string) (*Format, error) { t, ok := formatDrivers[name] lock.RUnlock() if !ok { - return nil, fmt.Errorf("%w %s not found", ErrorFormat, name) + return nil, fmt.Errorf("%w %s not found", ErrFormat, name) } err := t.Init() diff --git a/format/text/text.go b/format/text/text.go index d1c68fa8..d7d123f7 100644 --- a/format/text/text.go +++ b/format/text/text.go @@ -29,7 +29,7 @@ func (d *TextDriver) Format(data interface{}) ([]byte, []byte, error) { if dataIf, ok := data.(interface{ String() string }); ok { return key, []byte(dataIf.String()), nil } - return key, nil, format.ErrorNoSerializer + return key, nil, format.ErrNoSerializer } func init() { diff --git a/pb/flow.pb.go b/pb/flow.pb.go index 6194e3d3..66c1a4cb 100644 --- a/pb/flow.pb.go +++ b/pb/flow.pb.go @@ -75,6 +75,92 @@ func (FlowMessage_FlowType) EnumDescriptor() ([]byte, []int) { return file_pb_flow_proto_rawDescGZIP(), []int{0, 0} } +// Encapsulation +type FlowMessage_LayerStack int32 + +const ( + FlowMessage_Ethernet FlowMessage_LayerStack = 0 + FlowMessage_IPv4 FlowMessage_LayerStack = 1 + FlowMessage_IPv6 FlowMessage_LayerStack = 2 + FlowMessage_TCP FlowMessage_LayerStack = 3 + FlowMessage_UDP FlowMessage_LayerStack = 4 + FlowMessage_MPLS FlowMessage_LayerStack = 5 + FlowMessage_Dot1Q FlowMessage_LayerStack = 6 + FlowMessage_ICMP FlowMessage_LayerStack = 7 + FlowMessage_ICMPv6 FlowMessage_LayerStack = 8 + FlowMessage_GRE FlowMessage_LayerStack = 9 + FlowMessage_IPv6HeaderRouting FlowMessage_LayerStack = 10 + FlowMessage_IPv6HeaderFragment FlowMessage_LayerStack = 11 + FlowMessage_Geneve FlowMessage_LayerStack = 12 + FlowMessage_Teredo FlowMessage_LayerStack = 13 + FlowMessage_Custom FlowMessage_LayerStack = 99 // todo: add nsh +) + +// Enum value maps for FlowMessage_LayerStack. +var ( + FlowMessage_LayerStack_name = map[int32]string{ + 0: "Ethernet", + 1: "IPv4", + 2: "IPv6", + 3: "TCP", + 4: "UDP", + 5: "MPLS", + 6: "Dot1Q", + 7: "ICMP", + 8: "ICMPv6", + 9: "GRE", + 10: "IPv6HeaderRouting", + 11: "IPv6HeaderFragment", + 12: "Geneve", + 13: "Teredo", + 99: "Custom", + } + FlowMessage_LayerStack_value = map[string]int32{ + "Ethernet": 0, + "IPv4": 1, + "IPv6": 2, + "TCP": 3, + "UDP": 4, + "MPLS": 5, + "Dot1Q": 6, + "ICMP": 7, + "ICMPv6": 8, + "GRE": 9, + "IPv6HeaderRouting": 10, + "IPv6HeaderFragment": 11, + "Geneve": 12, + "Teredo": 13, + "Custom": 99, + } +) + +func (x FlowMessage_LayerStack) Enum() *FlowMessage_LayerStack { + p := new(FlowMessage_LayerStack) + *p = x + return p +} + +func (x FlowMessage_LayerStack) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (FlowMessage_LayerStack) Descriptor() protoreflect.EnumDescriptor { + return file_pb_flow_proto_enumTypes[1].Descriptor() +} + +func (FlowMessage_LayerStack) Type() protoreflect.EnumType { + return &file_pb_flow_proto_enumTypes[1] +} + +func (x FlowMessage_LayerStack) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use FlowMessage_LayerStack.Descriptor instead. +func (FlowMessage_LayerStack) EnumDescriptor() ([]byte, []int) { + return file_pb_flow_proto_rawDescGZIP(), []int{0, 1} +} + type FlowMessage struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -138,11 +224,15 @@ type FlowMessage struct { BgpCommunities []uint32 `protobuf:"varint,101,rep,packed,name=bgp_communities,json=bgpCommunities,proto3" json:"bgp_communities,omitempty"` AsPath []uint32 `protobuf:"varint,102,rep,packed,name=as_path,json=asPath,proto3" json:"as_path,omitempty"` // MPLS information - MplsTtl []uint32 `protobuf:"varint,80,rep,packed,name=mpls_ttl,json=mplsTtl,proto3" json:"mpls_ttl,omitempty"` - MplsLabel []uint32 `protobuf:"varint,81,rep,packed,name=mpls_label,json=mplsLabel,proto3" json:"mpls_label,omitempty"` - MplsIp [][]byte `protobuf:"bytes,82,rep,name=mpls_ip,json=mplsIp,proto3" json:"mpls_ip,omitempty"` - ObservationDomainId uint32 `protobuf:"varint,70,opt,name=observation_domain_id,json=observationDomainId,proto3" json:"observation_domain_id,omitempty"` - ObservationPointId uint32 `protobuf:"varint,71,opt,name=observation_point_id,json=observationPointId,proto3" json:"observation_point_id,omitempty"` + MplsTtl []uint32 `protobuf:"varint,80,rep,packed,name=mpls_ttl,json=mplsTtl,proto3" json:"mpls_ttl,omitempty"` + MplsLabel []uint32 `protobuf:"varint,81,rep,packed,name=mpls_label,json=mplsLabel,proto3" json:"mpls_label,omitempty"` + MplsIp [][]byte `protobuf:"bytes,82,rep,name=mpls_ip,json=mplsIp,proto3" json:"mpls_ip,omitempty"` + ObservationDomainId uint32 `protobuf:"varint,70,opt,name=observation_domain_id,json=observationDomainId,proto3" json:"observation_domain_id,omitempty"` + ObservationPointId uint32 `protobuf:"varint,71,opt,name=observation_point_id,json=observationPointId,proto3" json:"observation_point_id,omitempty"` + LayerStack []FlowMessage_LayerStack `protobuf:"varint,103,rep,packed,name=layer_stack,json=layerStack,proto3,enum=flowpb.FlowMessage_LayerStack" json:"layer_stack,omitempty"` + LayerSize []uint32 `protobuf:"varint,104,rep,packed,name=layer_size,json=layerSize,proto3" json:"layer_size,omitempty"` + Ipv6RoutingHeaderAddresses [][]byte `protobuf:"bytes,105,rep,name=ipv6_routing_header_addresses,json=ipv6RoutingHeaderAddresses,proto3" json:"ipv6_routing_header_addresses,omitempty"` // SRv6 + Ipv6RoutingHeaderSegLeft uint32 `protobuf:"varint,106,opt,name=ipv6_routing_header_seg_left,json=ipv6RoutingHeaderSegLeft,proto3" json:"ipv6_routing_header_seg_left,omitempty"` // SRv6 } func (x *FlowMessage) Reset() { @@ -499,11 +589,39 @@ func (x *FlowMessage) GetObservationPointId() uint32 { return 0 } +func (x *FlowMessage) GetLayerStack() []FlowMessage_LayerStack { + if x != nil { + return x.LayerStack + } + return nil +} + +func (x *FlowMessage) GetLayerSize() []uint32 { + if x != nil { + return x.LayerSize + } + return nil +} + +func (x *FlowMessage) GetIpv6RoutingHeaderAddresses() [][]byte { + if x != nil { + return x.Ipv6RoutingHeaderAddresses + } + return nil +} + +func (x *FlowMessage) GetIpv6RoutingHeaderSegLeft() uint32 { + if x != nil { + return x.Ipv6RoutingHeaderSegLeft + } + return 0 +} + var File_pb_flow_proto protoreflect.FileDescriptor var file_pb_flow_proto_rawDesc = []byte{ 0x0a, 0x0d, 0x70, 0x62, 0x2f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, - 0x06, 0x66, 0x6c, 0x6f, 0x77, 0x70, 0x62, 0x22, 0xf1, 0x0b, 0x0a, 0x0b, 0x46, 0x6c, 0x6f, 0x77, + 0x06, 0x66, 0x6c, 0x6f, 0x77, 0x70, 0x62, 0x22, 0x9e, 0x0f, 0x0a, 0x0b, 0x46, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x30, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1c, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x70, 0x62, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x54, @@ -593,15 +711,42 @@ var file_pb_flow_proto_rawDesc = []byte{ 0x69, 0x6e, 0x49, 0x64, 0x12, 0x30, 0x0a, 0x14, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x47, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x12, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x50, - 0x6f, 0x69, 0x6e, 0x74, 0x49, 0x64, 0x22, 0x53, 0x0a, 0x08, 0x46, 0x6c, 0x6f, 0x77, 0x54, 0x79, - 0x70, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x46, 0x4c, 0x4f, 0x57, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, - 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x35, 0x10, 0x01, - 0x12, 0x0e, 0x0a, 0x0a, 0x4e, 0x45, 0x54, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x56, 0x35, 0x10, 0x02, - 0x12, 0x0e, 0x0a, 0x0a, 0x4e, 0x45, 0x54, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x56, 0x39, 0x10, 0x03, - 0x12, 0x09, 0x0a, 0x05, 0x49, 0x50, 0x46, 0x49, 0x58, 0x10, 0x04, 0x42, 0x29, 0x5a, 0x27, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x65, 0x74, 0x73, 0x61, 0x6d, - 0x70, 0x6c, 0x65, 0x72, 0x2f, 0x67, 0x6f, 0x66, 0x6c, 0x6f, 0x77, 0x32, 0x2f, 0x70, 0x62, 0x3b, - 0x66, 0x6c, 0x6f, 0x77, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x69, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x3f, 0x0a, 0x0b, 0x6c, 0x61, 0x79, 0x65, 0x72, 0x5f, + 0x73, 0x74, 0x61, 0x63, 0x6b, 0x18, 0x67, 0x20, 0x03, 0x28, 0x0e, 0x32, 0x1e, 0x2e, 0x66, 0x6c, + 0x6f, 0x77, 0x70, 0x62, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x2e, 0x4c, 0x61, 0x79, 0x65, 0x72, 0x53, 0x74, 0x61, 0x63, 0x6b, 0x52, 0x0a, 0x6c, 0x61, 0x79, + 0x65, 0x72, 0x53, 0x74, 0x61, 0x63, 0x6b, 0x12, 0x1d, 0x0a, 0x0a, 0x6c, 0x61, 0x79, 0x65, 0x72, + 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x68, 0x20, 0x03, 0x28, 0x0d, 0x52, 0x09, 0x6c, 0x61, 0x79, + 0x65, 0x72, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x41, 0x0a, 0x1d, 0x69, 0x70, 0x76, 0x36, 0x5f, 0x72, + 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x5f, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x5f, 0x61, 0x64, + 0x64, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x18, 0x69, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x1a, 0x69, + 0x70, 0x76, 0x36, 0x52, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, + 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x12, 0x3e, 0x0a, 0x1c, 0x69, 0x70, 0x76, + 0x36, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x5f, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, + 0x5f, 0x73, 0x65, 0x67, 0x5f, 0x6c, 0x65, 0x66, 0x74, 0x18, 0x6a, 0x20, 0x01, 0x28, 0x0d, 0x52, + 0x18, 0x69, 0x70, 0x76, 0x36, 0x52, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x48, 0x65, 0x61, 0x64, + 0x65, 0x72, 0x53, 0x65, 0x67, 0x4c, 0x65, 0x66, 0x74, 0x22, 0x53, 0x0a, 0x08, 0x46, 0x6c, 0x6f, + 0x77, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x46, 0x4c, 0x4f, 0x57, 0x55, 0x4e, 0x4b, + 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x46, 0x4c, 0x4f, 0x57, 0x5f, + 0x35, 0x10, 0x01, 0x12, 0x0e, 0x0a, 0x0a, 0x4e, 0x45, 0x54, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x56, + 0x35, 0x10, 0x02, 0x12, 0x0e, 0x0a, 0x0a, 0x4e, 0x45, 0x54, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x56, + 0x39, 0x10, 0x03, 0x12, 0x09, 0x0a, 0x05, 0x49, 0x50, 0x46, 0x49, 0x58, 0x10, 0x04, 0x22, 0xc7, + 0x01, 0x0a, 0x0a, 0x4c, 0x61, 0x79, 0x65, 0x72, 0x53, 0x74, 0x61, 0x63, 0x6b, 0x12, 0x0c, 0x0a, + 0x08, 0x45, 0x74, 0x68, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x49, + 0x50, 0x76, 0x34, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x49, 0x50, 0x76, 0x36, 0x10, 0x02, 0x12, + 0x07, 0x0a, 0x03, 0x54, 0x43, 0x50, 0x10, 0x03, 0x12, 0x07, 0x0a, 0x03, 0x55, 0x44, 0x50, 0x10, + 0x04, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x50, 0x4c, 0x53, 0x10, 0x05, 0x12, 0x09, 0x0a, 0x05, 0x44, + 0x6f, 0x74, 0x31, 0x51, 0x10, 0x06, 0x12, 0x08, 0x0a, 0x04, 0x49, 0x43, 0x4d, 0x50, 0x10, 0x07, + 0x12, 0x0a, 0x0a, 0x06, 0x49, 0x43, 0x4d, 0x50, 0x76, 0x36, 0x10, 0x08, 0x12, 0x07, 0x0a, 0x03, + 0x47, 0x52, 0x45, 0x10, 0x09, 0x12, 0x15, 0x0a, 0x11, 0x49, 0x50, 0x76, 0x36, 0x48, 0x65, 0x61, + 0x64, 0x65, 0x72, 0x52, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x10, 0x0a, 0x12, 0x16, 0x0a, 0x12, + 0x49, 0x50, 0x76, 0x36, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x46, 0x72, 0x61, 0x67, 0x6d, 0x65, + 0x6e, 0x74, 0x10, 0x0b, 0x12, 0x0a, 0x0a, 0x06, 0x47, 0x65, 0x6e, 0x65, 0x76, 0x65, 0x10, 0x0c, + 0x12, 0x0a, 0x0a, 0x06, 0x54, 0x65, 0x72, 0x65, 0x64, 0x6f, 0x10, 0x0d, 0x12, 0x0a, 0x0a, 0x06, + 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x10, 0x63, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, + 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x65, 0x74, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, + 0x72, 0x2f, 0x67, 0x6f, 0x66, 0x6c, 0x6f, 0x77, 0x32, 0x2f, 0x70, 0x62, 0x3b, 0x66, 0x6c, 0x6f, + 0x77, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -616,19 +761,21 @@ func file_pb_flow_proto_rawDescGZIP() []byte { return file_pb_flow_proto_rawDescData } -var file_pb_flow_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_pb_flow_proto_enumTypes = make([]protoimpl.EnumInfo, 2) var file_pb_flow_proto_msgTypes = make([]protoimpl.MessageInfo, 1) var file_pb_flow_proto_goTypes = []interface{}{ - (FlowMessage_FlowType)(0), // 0: flowpb.FlowMessage.FlowType - (*FlowMessage)(nil), // 1: flowpb.FlowMessage + (FlowMessage_FlowType)(0), // 0: flowpb.FlowMessage.FlowType + (FlowMessage_LayerStack)(0), // 1: flowpb.FlowMessage.LayerStack + (*FlowMessage)(nil), // 2: flowpb.FlowMessage } var file_pb_flow_proto_depIdxs = []int32{ 0, // 0: flowpb.FlowMessage.type:type_name -> flowpb.FlowMessage.FlowType - 1, // [1:1] is the sub-list for method output_type - 1, // [1:1] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name + 1, // 1: flowpb.FlowMessage.layer_stack:type_name -> flowpb.FlowMessage.LayerStack + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_pb_flow_proto_init() } @@ -655,7 +802,7 @@ func file_pb_flow_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pb_flow_proto_rawDesc, - NumEnums: 1, + NumEnums: 2, NumMessages: 1, NumExtensions: 0, NumServices: 0, diff --git a/pb/flow.proto b/pb/flow.proto index ef726495..662ef304 100644 --- a/pb/flow.proto +++ b/pb/flow.proto @@ -94,4 +94,30 @@ message FlowMessage { uint32 observation_domain_id = 70; uint32 observation_point_id = 71; + + // Encapsulation + enum LayerStack { + Ethernet = 0; + IPv4 = 1; + IPv6 = 2; + TCP = 3; + UDP = 4; + MPLS = 5; + Dot1Q = 6; + ICMP = 7; + ICMPv6 = 8; + GRE = 9; + IPv6HeaderRouting = 10; + IPv6HeaderFragment = 11; + Geneve = 12; + Teredo = 13; + Custom = 99; + // todo: add nsh + } + repeated LayerStack layer_stack = 103; + repeated uint32 layer_size = 104; + + repeated bytes ipv6_routing_header_addresses = 105; // SRv6 + uint32 ipv6_routing_header_seg_left = 106; // SRv6 + } diff --git a/producer/proto/config.go b/producer/proto/config.go new file mode 100644 index 00000000..3ad82873 --- /dev/null +++ b/producer/proto/config.go @@ -0,0 +1,80 @@ +package protoproducer + +import ( + "github.com/netsampler/goflow2/v2/decoders/netflow" +) + +type EndianType string +type ProtoType string + +var ( + BigEndian EndianType = "big" + LittleEndian EndianType = "little" + + ProtoString ProtoType = "string" + ProtoVarint ProtoType = "varint" + + ProtoTypeMap = map[string]ProtoType{ + string(ProtoString): ProtoString, + string(ProtoVarint): ProtoVarint, + "bytes": ProtoString, + } +) + +// MappableField is the interface that allows a flow's field to be mapped to a specific protobuf field. +// Provided by Template Mapper's function. +type MappableField interface { + GetEndianness() EndianType + GetDestination() string + GetProtoIndex() int32 + GetProtoType() ProtoType + IsArray() bool +} + +// MappableByteField is the interface, similar to MappableField, but for direct packet parsing. +// Provided by PacketMapper. +type MappableByteField interface { + MappableField + GetOffset() int + GetLength() int + IsEncapsulated() bool +} + +// TemplateMapper is the interface to returns the mapping information for a specific type of template field +type TemplateMapper interface { + Map(field netflow.DataField) (MappableField, bool) +} + +// MapLayerIterator is the interface to obtain subsequent mapping information +type MapLayerIterator interface { + Next() MappableByteField // returns the next MappableByteField. Function is called by the packet parser until returns nil. +} + +// PacketLayerMapper is the interface to obtain the mapping information for a layer of a packet +type PacketLayerMapper interface { + Map(layer string) MapLayerIterator // returns an iterator to avoid handling arrays +} + +// PacketMapper is the interface to parse a packet into a flow message +type PacketMapper interface { + ParsePacket(flowMessage ProtoProducerMessageIf, data []byte) (err error) +} + +// FormatterMapper returns the configuration statements for the textual formatting of the protobuf messages +type FormatterMapper interface { + Keys() []string + Fields() []string + Rename(name string) (string, bool) + Remap(name string) (string, bool) + Render(name string) (RenderFunc, bool) + NumToProtobuf(num int32) (ProtobufFormatterConfig, bool) + IsArray(name string) bool +} + +// ProtoProducerConfig is the top level configuration for a general flow to protobuf producer +type ProtoProducerConfig interface { + GetFormatter() FormatterMapper + GetIPFIXMapper() TemplateMapper + GetNetFlowMapper() TemplateMapper + GetPacketMapper() PacketMapper +} diff --git a/producer/proto/custom.go b/producer/proto/config_impl.go similarity index 59% rename from producer/proto/custom.go rename to producer/proto/config_impl.go index 348e46e7..b00dfb3f 100644 --- a/producer/proto/custom.go +++ b/producer/proto/config_impl.go @@ -3,10 +3,24 @@ package protoproducer import ( "fmt" "reflect" + "strings" "github.com/netsampler/goflow2/v2/decoders/netflow" ) +var ( + isSliceMap = map[string]bool{ + "BgpCommunities": true, + "AsPath": true, + "MplsIp": true, + "MplsLabel": true, + "MplsTtl": true, + "LayerStack": true, + "LayerSize": true, + "Ipv6RoutingHeaderAddresses": true, + } +) + type NetFlowMapField struct { PenProvided bool `yaml:"penprovided"` Type uint16 `yaml:"field"` @@ -19,32 +33,40 @@ type NetFlowMapField struct { type IPFIXProducerConfig struct { Mapping []NetFlowMapField `yaml:"mapping"` - //PacketMapping []SFlowMapField `json:"packet-mapping"` // for embedded frames: use sFlow configuration } type NetFlowV9ProducerConfig struct { - Mapping []NetFlowMapField `json:"mapping"` + Mapping []NetFlowMapField `yaml:"mapping"` } type SFlowMapField struct { - Layer string `yaml:"layer"` - Offset int `yaml:"offset"` // offset in bits - Length int `yaml:"length"` // length in bits + Layer string `yaml:"layer"` + Encapsulated bool `yaml:"encap"` // only parse if encapsulated + Offset int `yaml:"offset"` // offset in bits + Length int `yaml:"length"` // length in bits Destination string `yaml:"destination"` Endian EndianType `yaml:"endianness"` //DestinationLength uint8 `json:"dlen"` } +type SFlowProtocolParse struct { + Proto string `yaml:"proto"` + Dir RegPortDir `yaml:"dir"` + Port uint16 `yaml:"port"` + Parser string `yaml:"parser"` +} + type SFlowProducerConfig struct { - Mapping []SFlowMapField `yaml:"mapping"` + Mapping []SFlowMapField `yaml:"mapping"` + Ports []SFlowProtocolParse `yaml:"ports"` } type ProtobufFormatterConfig struct { - Name string - Index int32 - Type string - Array bool + Name string `yaml:"name"` + Index int32 `yaml:"index"` + Type string `yaml:"type"` + Array bool `yaml:"array"` } type FormatterConfig struct { @@ -65,6 +87,35 @@ type ProducerConfig struct { // should do a rename map list for when printing } +func (c *ProducerConfig) Compile() (ProtoProducerConfig, error) { + return mapConfig(c) +} + +// Optimized version of a configuration to be used by a protobuf producer +type producerConfigMapped struct { + Formatter *FormatterConfigMapper + + IPFIX *NetFlowMapper + NetFlowV9 *NetFlowMapper + SFlow *SFlowMapper +} + +func (c *producerConfigMapped) GetFormatter() FormatterMapper { + return c.Formatter +} + +func (c *producerConfigMapped) GetIPFIXMapper() TemplateMapper { + return c.IPFIX +} + +func (c *producerConfigMapped) GetNetFlowMapper() TemplateMapper { + return c.NetFlowV9 +} + +func (c *producerConfigMapped) GetPacketMapper() PacketMapper { + return c.SFlow +} + type DataMap struct { MapConfigBase } @@ -80,38 +131,146 @@ type FormatterConfigMapper struct { isSlice map[string]bool } +func (f *FormatterConfigMapper) Keys() []string { + return f.key +} + +func (f *FormatterConfigMapper) Fields() []string { + return f.fields +} + +func (f *FormatterConfigMapper) Rename(name string) (string, bool) { + r, ok := f.rename[name] + return r, ok +} + +func (f *FormatterConfigMapper) Remap(name string) (string, bool) { + r, ok := f.reMap[name] + return r, ok +} + +func (f *FormatterConfigMapper) Render(name string) (RenderFunc, bool) { + r, ok := f.render[name] + return r, ok +} + +func (f *FormatterConfigMapper) NumToProtobuf(num int32) (ProtobufFormatterConfig, bool) { + r, ok := f.numToPb[num] + return r, ok +} + +func (f *FormatterConfigMapper) IsArray(name string) bool { + return f.isSlice[name] +} + type NetFlowMapper struct { - data map[string]DataMap // maps field to destination + data map[string]*DataMap // maps field to destination } -func (m *NetFlowMapper) Map(field netflow.DataField) (DataMap, bool) { +func (m *NetFlowMapper) Map(field netflow.DataField) (MappableField, bool) { + if m == nil { + return &DataMap{}, false + } mapped, found := m.data[fmt.Sprintf("%v-%d-%d", field.PenProvided, field.Pen, field.Type)] return mapped, found } -type DataMapLayer struct { - MapConfigBase - Offset int - Length int +type SFlowMapper struct { + data map[string][]*DataMapLayer // map layer to list of offsets + parserEnvironment ParserEnvironment } -type SFlowMapper struct { - data map[string][]DataMapLayer // map layer to list of offsets +type sflowMapperIterator struct { + data []*DataMapLayer + n int +} + +func (i *sflowMapperIterator) Next() MappableByteField { + if len(i.data) <= i.n { + return nil + } + d := i.data[i.n] + i.n += 1 + return d +} + +func (m *SFlowMapper) Map(layer string) MapLayerIterator { + if m == nil { + return nil + } + return &sflowMapperIterator{data: m.data[strings.ToLower(layer)], n: 0} } -func GetSFlowConfigLayer(m *SFlowMapper, layer string) []DataMapLayer { +func (m *SFlowMapper) ParsePacket(flowMessage ProtoProducerMessageIf, data []byte) (err error) { if m == nil { return nil } - return m.data[layer] + return ParsePacket(flowMessage, data, m, m.parserEnvironment) +} + +// Structure to help the MapCustom functions +// populate the protobuf data +type MapConfigBase struct { + // Used if the field inside the protobuf exists + // also serves as the field when rendering with text + Destination string + Endianness EndianType + + // The following fields are used for mapping + // when the destination field does not exist + // inside the protobuf + ProtoIndex int32 + ProtoType ProtoType + ProtoArray bool +} + +func (c *MapConfigBase) GetEndianness() EndianType { + return c.Endianness +} + +func (c *MapConfigBase) GetDestination() string { + return c.Destination +} + +func (c *MapConfigBase) GetProtoIndex() int32 { + return c.ProtoIndex +} + +func (c *MapConfigBase) GetProtoType() ProtoType { + return c.ProtoType +} + +func (c *MapConfigBase) IsArray() bool { + return c.ProtoArray +} + +// Extended structure for packet mapping +type DataMapLayer struct { + MapConfigBase + Offset int + Length int + Encapsulated bool +} + +func (c *DataMapLayer) GetOffset() int { + return c.Offset +} + +func (c *DataMapLayer) GetLength() int { + return c.Length +} + +func (c *DataMapLayer) IsEncapsulated() bool { + return c.Encapsulated } func mapFieldsSFlow(fields []SFlowMapField) *SFlowMapper { - ret := make(map[string][]DataMapLayer) + ret := make(map[string][]*DataMapLayer) for _, field := range fields { - retLayerEntry := DataMapLayer{ - Offset: field.Offset, - Length: field.Length, + retLayerEntry := &DataMapLayer{ + Offset: field.Offset, + Length: field.Length, + Encapsulated: field.Encapsulated, } retLayerEntry.Destination = field.Destination retLayerEntry.Endianness = field.Endian @@ -119,26 +278,32 @@ func mapFieldsSFlow(fields []SFlowMapField) *SFlowMapper { retLayer = append(retLayer, retLayerEntry) ret[field.Layer] = retLayer } - return &SFlowMapper{ret} + return &SFlowMapper{data: ret} +} + +func mapPortsSFlow(ports []SFlowProtocolParse) (ParserEnvironment, error) { + e := NewBaseParserEnvironment() + for _, port := range ports { + parser, ok := e.GetParser(port.Parser) + if !ok { + return e, fmt.Errorf("parser %s not found", port.Parser) + } + if err := e.RegisterPort(port.Proto, port.Dir, port.Port, parser); err != nil { + return e, err + } + } + return e, nil } func mapFieldsNetFlow(fields []NetFlowMapField) *NetFlowMapper { - ret := make(map[string]DataMap) + ret := make(map[string]*DataMap) for _, field := range fields { - dm := DataMap{} + dm := &DataMap{} dm.Destination = field.Destination dm.Endianness = field.Endian ret[fmt.Sprintf("%v-%d-%d", field.PenProvided, field.Pen, field.Type)] = dm } - return &NetFlowMapper{ret} -} - -type producerConfigMapped struct { - Formatter *FormatterConfigMapper - - IPFIX *NetFlowMapper - NetFlowV9 *NetFlowMapper - SFlow *SFlowMapper + return &NetFlowMapper{data: ret} } func (c *producerConfigMapped) finalizemapDest(v *MapConfigBase) error { @@ -167,7 +332,6 @@ func (c *producerConfigMapped) finalizeSFlowMapper(m *SFlowMapper) error { } m.data[k][i] = v } - } return nil } @@ -231,13 +395,7 @@ func mapFormat(cfg *ProducerConfig) (*FormatterConfigMapper, error) { pbMap := make(map[string]ProtobufFormatterConfig) formatterMapped.render = make(map[string]RenderFunc) formatterMapped.rename = make(map[string]string) - formatterMapped.isSlice = map[string]bool{ - "BgpCommunities": true, - "AsPath": true, - "MplsIp": true, - "MplsLabel": true, - "MplsTtl": true, - } // todo: improve this with defaults + formatterMapped.isSlice = isSliceMap for k, v := range defaultRenderers { formatterMapped.render[k] = v } @@ -311,6 +469,11 @@ func mapConfig(cfg *ProducerConfig) (*producerConfigMapped, error) { newCfg.IPFIX = mapFieldsNetFlow(cfg.IPFIX.Mapping) newCfg.NetFlowV9 = mapFieldsNetFlow(cfg.NetFlowV9.Mapping) newCfg.SFlow = mapFieldsSFlow(cfg.SFlow.Mapping) + var err error + newCfg.SFlow.parserEnvironment, err = mapPortsSFlow(cfg.SFlow.Ports) + if err != nil { + return nil, err + } } var err error if newCfg.Formatter, err = mapFormat(cfg); err != nil { diff --git a/producer/proto/messages.go b/producer/proto/messages.go index 5595b9a8..b1a5fdb5 100644 --- a/producer/proto/messages.go +++ b/producer/proto/messages.go @@ -11,14 +11,23 @@ import ( "google.golang.org/protobuf/encoding/protodelim" "google.golang.org/protobuf/encoding/protowire" + "google.golang.org/protobuf/proto" flowmessage "github.com/netsampler/goflow2/v2/pb" ) +// ProtoProducerMessageIf interface to a flow message, used by parsers and tests +type ProtoProducerMessageIf interface { + GetFlowMessage() *ProtoProducerMessage // access the underlying structure + MapCustom(key string, v []byte, cfg MappableField) error // inject custom field +} + type ProtoProducerMessage struct { flowmessage.FlowMessage - formatter *FormatterConfigMapper + formatter FormatterMapper + + skipDelimiter bool // for binary marshalling, skips the varint prefix } var protoMessagePool = sync.Pool{ @@ -27,10 +36,29 @@ var protoMessagePool = sync.Pool{ }, } +func (m *ProtoProducerMessage) GetFlowMessage() *ProtoProducerMessage { + return m +} + +func (m *ProtoProducerMessage) MapCustom(key string, v []byte, cfg MappableField) error { + return MapCustom(m, v, cfg) +} + +func (m *ProtoProducerMessage) AddLayer(name string) (ok bool) { + value, ok := flowmessage.FlowMessage_LayerStack_value[name] + m.LayerStack = append(m.LayerStack, flowmessage.FlowMessage_LayerStack(value)) + return ok +} + func (m *ProtoProducerMessage) MarshalBinary() ([]byte, error) { buf := bytes.NewBuffer([]byte{}) - _, err := protodelim.MarshalTo(buf, m) - return buf.Bytes(), err + if m.skipDelimiter { + b, err := proto.Marshal(m) + return b, err + } else { + _, err := protodelim.MarshalTo(buf, m) + return buf.Bytes(), err + } } func (m *ProtoProducerMessage) MarshalText() ([]byte, error) { @@ -43,11 +71,11 @@ func (m *ProtoProducerMessage) baseKey(h hash.Hash) { unkMap := m.mapUnknown() // todo: should be able to reuse if set in structure - for _, s := range m.formatter.key { + for _, s := range m.formatter.Keys() { fieldName := s // get original name from structure - if fieldNameMap, ok := m.formatter.reMap[fieldName]; ok && fieldNameMap != "" { + if fieldNameMap, ok := m.formatter.Remap(fieldName); ok && fieldNameMap != "" { fieldName = fieldNameMap } @@ -68,7 +96,7 @@ func (m *ProtoProducerMessage) baseKey(h hash.Hash) { } func (m *ProtoProducerMessage) Key() []byte { - if m.formatter == nil || len(m.formatter.key) == 0 { + if len(m.formatter.Keys()) == 0 { return nil } h := fnv.New32() @@ -111,7 +139,7 @@ func (m *ProtoProducerMessage) mapUnknown() map[string]interface{} { offset += length // we check if the index is listed in the config - if pbField, ok := m.formatter.numToPb[int32(num)]; ok { + if pbField, ok := m.formatter.NumToProtobuf(int32(num)); ok { var dest interface{} var value interface{} @@ -150,27 +178,28 @@ func (m *ProtoProducerMessage) FormatMessageReflectCustom(ext, quotes, sep, sign vfm = reflect.Indirect(vfm) var i int - fstr := make([]string, len(m.formatter.fields)) // todo: reuse with pool + fields := m.formatter.Fields() + fstr := make([]string, len(fields)) // todo: reuse with pool unkMap := m.mapUnknown() // iterate through the fields requested by the user - for _, s := range m.formatter.fields { + for _, s := range fields { fieldName := s fieldFinalName := s - if fieldRename, ok := m.formatter.rename[s]; ok && fieldRename != "" { + if fieldRename, ok := m.formatter.Rename(s); ok && fieldRename != "" { fieldFinalName = fieldRename } // get original name from structure - if fieldNameMap, ok := m.formatter.reMap[fieldName]; ok && fieldNameMap != "" { + if fieldNameMap, ok := m.formatter.Remap(fieldName); ok && fieldNameMap != "" { fieldName = fieldNameMap } // get renderer - renderer, okRenderer := m.formatter.render[fieldName] - if !okRenderer { + renderer, okRenderer := m.formatter.Render(fieldName) + if !okRenderer { // todo: change to renderer check renderer = NilRenderer } @@ -187,34 +216,38 @@ func (m *ProtoProducerMessage) FormatMessageReflectCustom(ext, quotes, sep, sign } } - isSlice := m.formatter.isSlice[fieldName] + isSlice := m.formatter.IsArray(fieldName) // render each item of the array independently // note: isSlice is necessary to consider certain byte arrays in their entirety // eg: IP addresses if isSlice { - c := fieldValue.Len() v := "[" - for i := 0; i < c; i++ { - fieldValueI := fieldValue.Index(i) - var val interface{} - if fieldValueI.IsValid() { - val = fieldValueI.Interface() - } - rendered := renderer(m, fieldName, val) - if rendered == nil { - continue - } - renderedType := reflect.TypeOf(rendered) - if renderedType.Kind() == reflect.String { - v += fmt.Sprintf("%s%v%s", quotes, rendered, quotes) - } else { - v += fmt.Sprintf("%v", rendered) - } + if fieldValue.IsValid() { - if i < c-1 { - v += "," + c := fieldValue.Len() + for i := 0; i < c; i++ { + fieldValueI := fieldValue.Index(i) + var val interface{} + if fieldValueI.IsValid() { + val = fieldValueI.Interface() + } + + rendered := renderer(m, fieldName, val) + if rendered == nil { + continue + } + renderedType := reflect.TypeOf(rendered) + if renderedType.Kind() == reflect.String { + v += fmt.Sprintf("%s%v%s", quotes, rendered, quotes) + } else { + v += fmt.Sprintf("%v", rendered) + } + + if i < c-1 { + v += "," + } } } v += "]" diff --git a/producer/proto/producer_nf.go b/producer/proto/producer_nf.go index 7e67ab0e..53cd9e96 100644 --- a/producer/proto/producer_nf.go +++ b/producer/proto/producer_nf.go @@ -263,7 +263,7 @@ func addrReplaceCheck(dstAddr *[]byte, v []byte, eType *uint32, ipv6 bool) { } } -func ConvertNetFlowDataSet(flowMessage *ProtoProducerMessage, version uint16, baseTime uint32, uptime uint32, record []netflow.DataField, mapperNetFlow *NetFlowMapper, mapperSFlow *SFlowMapper) error { +func ConvertNetFlowDataSet(flowMessage *ProtoProducerMessage, version uint16, baseTime uint32, uptime uint32, record []netflow.DataField, mapperNetFlow TemplateMapper, mapperSFlow PacketMapper) error { var time uint64 baseTimeNs := uint64(baseTime) * 1000000000 // the following should be overriden if the template contains timing information @@ -614,7 +614,7 @@ func ConvertNetFlowDataSet(flowMessage *ProtoProducerMessage, version uint16, ba } flowMessage.Packets = 1 case netflow.IPFIX_FIELD_dataLinkFrameSection: - if err := ParseEthernetHeader(flowMessage, v, mapperSFlow); err != nil { + if err := mapperSFlow.ParsePacket(flowMessage, v); err != nil { return err } flowMessage.Packets = 1 @@ -629,7 +629,7 @@ func ConvertNetFlowDataSet(flowMessage *ProtoProducerMessage, version uint16, ba return nil } -func SearchNetFlowDataSetsRecords(version uint16, baseTime uint32, uptime uint32, dataRecords []netflow.DataRecord, mapperNetFlow *NetFlowMapper, mapperSFlow *SFlowMapper) (flowMessageSet []producer.ProducerMessage, err error) { +func SearchNetFlowDataSetsRecords(version uint16, baseTime uint32, uptime uint32, dataRecords []netflow.DataRecord, mapperNetFlow TemplateMapper, mapperSFlow PacketMapper) (flowMessageSet []producer.ProducerMessage, err error) { for _, record := range dataRecords { fmsg := protoMessagePool.Get().(*ProtoProducerMessage) fmsg.Reset() @@ -643,7 +643,7 @@ func SearchNetFlowDataSetsRecords(version uint16, baseTime uint32, uptime uint32 return flowMessageSet, nil } -func SearchNetFlowDataSets(version uint16, baseTime uint32, uptime uint32, dataFlowSet []netflow.DataFlowSet, mapperNetFlow *NetFlowMapper, mapperSFlow *SFlowMapper) (flowMessageSet []producer.ProducerMessage, err error) { +func SearchNetFlowDataSets(version uint16, baseTime uint32, uptime uint32, dataFlowSet []netflow.DataFlowSet, mapperNetFlow TemplateMapper, mapperSFlow PacketMapper) (flowMessageSet []producer.ProducerMessage, err error) { for _, dataFlowSetItem := range dataFlowSet { fmsg, err := SearchNetFlowDataSetsRecords(version, baseTime, uptime, dataFlowSetItem.Records, mapperNetFlow, mapperSFlow) if err != nil { @@ -715,18 +715,18 @@ func SplitIPFIXSets(packetIPFIX netflow.IPFIXPacket) ([]netflow.DataFlowSet, []n // Convert a NetFlow datastructure to a FlowMessage protobuf // Does not put sampling rate -func ProcessMessageIPFIXConfig(packet *netflow.IPFIXPacket, samplingRateSys SamplingRateSystem, config *producerConfigMapped) (flowMessageSet []producer.ProducerMessage, err error) { +func ProcessMessageIPFIXConfig(packet *netflow.IPFIXPacket, samplingRateSys SamplingRateSystem, config ProtoProducerConfig) (flowMessageSet []producer.ProducerMessage, err error) { dataFlowSet, _, _, optionDataFlowSet := SplitIPFIXSets(*packet) seqnum := packet.SequenceNumber baseTime := packet.ExportTime obsDomainId := packet.ObservationDomainId - var cfgIpfix *NetFlowMapper - var cfgSflow *SFlowMapper + var cfgIpfix TemplateMapper + var cfgSflow PacketMapper if config != nil { - cfgIpfix = config.IPFIX - cfgSflow = config.SFlow + cfgIpfix = config.GetIPFIXMapper() + cfgSflow = config.GetPacketMapper() } flowMessageSet, err = SearchNetFlowDataSets(10, baseTime, 0, dataFlowSet, cfgIpfix, cfgSflow) if err != nil { @@ -758,7 +758,7 @@ func ProcessMessageIPFIXConfig(packet *netflow.IPFIXPacket, samplingRateSys Samp // Convert a NetFlow datastructure to a FlowMessage protobuf // Does not put sampling rate -func ProcessMessageNetFlowV9Config(packet *netflow.NFv9Packet, samplingRateSys SamplingRateSystem, config *producerConfigMapped) (flowMessageSet []producer.ProducerMessage, err error) { +func ProcessMessageNetFlowV9Config(packet *netflow.NFv9Packet, samplingRateSys SamplingRateSystem, config ProtoProducerConfig) (flowMessageSet []producer.ProducerMessage, err error) { dataFlowSet, _, _, optionDataFlowSet := SplitNetFlowSets(*packet) seqnum := packet.SequenceNumber @@ -766,11 +766,11 @@ func ProcessMessageNetFlowV9Config(packet *netflow.NFv9Packet, samplingRateSys S uptime := packet.SystemUptime obsDomainId := packet.SourceId - var cfg *NetFlowMapper + var cfgNetFlow TemplateMapper if config != nil { - cfg = config.NetFlowV9 + cfgNetFlow = config.GetNetFlowMapper() } - flowMessageSet, err = SearchNetFlowDataSets(9, baseTime, uptime, dataFlowSet, cfg, nil) + flowMessageSet, err = SearchNetFlowDataSets(9, baseTime, uptime, dataFlowSet, cfgNetFlow, nil) if err != nil { return flowMessageSet, err } diff --git a/producer/proto/producer_packet.go b/producer/proto/producer_packet.go new file mode 100644 index 00000000..2d032d3a --- /dev/null +++ b/producer/proto/producer_packet.go @@ -0,0 +1,860 @@ +package protoproducer + +import ( + "encoding/binary" + "fmt" + "sync" +) + +type ParserEnvironment interface { + NextParserEtype(etherType []byte) (ParserInfo, error) + NextParserProto(proto byte) (ParserInfo, error) + NextParserPort(proto string, srcPort, dstPort uint16) (ParserInfo, error) +} + +type RegPortDir string + +var ( + PortDirSrc RegPortDir = "src" + PortDirDst RegPortDir = "dst" + PortDirBoth RegPortDir = "both" + + errParserEmpty = fmt.Errorf("parser is nil") + + parserNone = ParserInfo{ + nil, + "none", + nil, + 100, + 9999, + false, + } + parserPayload = ParserInfo{ + nil, + "payload", + []string{"payload", "7"}, + 100, + 9998, + false, + } + + parserEthernet = ParserInfo{ + nil, //ParseEthernet2, + "ethernet", + []string{"ethernet", "2"}, + 20, + 1, + false, + } + parser8021Q = ParserInfo{ + nil, //Parse8021Q2, + "dot1q", + []string{"dot1q"}, + 25, + 2, + true, + } + parserMPLS = ParserInfo{ + nil, //ParseMPLS2, + "mpls", + []string{"mpls"}, + 25, + 3, + true, + } + parserIPv4 = ParserInfo{ + nil, //ParseIPv42, + "ipv4", + []string{"ipv4", "ip", "3"}, + 30, + 4, + false, + } + parserIPv6 = ParserInfo{ + nil, //ParseIPv62, + "ipv6", + []string{"ipv6", "ip", "3"}, + 30, + 5, + false, + } + parserIPv6HeaderRouting = ParserInfo{ + nil, //ParseIPv6HeaderRouting2, + "ipv6-route", + []string{"ipv6eh_routing", "ipv6-route", "ipv6eh"}, + 35, + 7, + false, + } + parserIPv6HeaderFragment = ParserInfo{ + nil, //ParseIPv6HeaderFragment2, + "ipv6-frag", + []string{"ipv6eh_fragment", "ipv6-frag", "ipv6eh"}, + 35, + 6, + true, + } + parserTCP = ParserInfo{ + nil, //ParseTCP2, + "tcp", + []string{"tcp", "4"}, + 40, + 8, + false, + } + parserUDP = ParserInfo{ + nil, //ParseUDP2, + "udp", + []string{"udp", "4"}, + 40, + 9, + false, + } + parserICMP = ParserInfo{ + nil, //ParseICMP2, + "icmp", + []string{"icmp"}, + 70, + 10, + false, + } + parserICMPv6 = ParserInfo{ + nil, //ParseICMPv62, + "ipv6-icmp", + []string{"icmpv6", "ipv6-icmp"}, + 70, + 11, + false, + } + parserGRE = ParserInfo{ + nil, //ParseGRE2, + "gre", + []string{"gre"}, + 40, + 12, + false, + } + parserTeredoDst = ParserInfo{ + nil, //ParseTeredoDst, + "teredo-dst", + []string{"teredo-dst", "teredo"}, + 40, + 13, + false, + } + parserGeneve = ParserInfo{ + nil, //ParseTeredoDst, + "geneve", + []string{"geneve"}, + 40, + 14, + false, + } + + DefaultEnvironment *BaseParserEnvironment +) + +func init() { + // necessary to set here otherwise initialization loop compilation error + parserEthernet.Parser = ParseEthernet + parser8021Q.Parser = Parse8021Q + parserMPLS.Parser = ParseMPLS + parserIPv4.Parser = ParseIPv4 + parserIPv6.Parser = ParseIPv6 + parserIPv6HeaderRouting.Parser = ParseIPv6HeaderRouting + parserIPv6HeaderFragment.Parser = ParseIPv6HeaderFragment + parserTCP.Parser = ParseTCP + parserUDP.Parser = ParseUDP + parserICMP.Parser = ParseICMP + parserICMPv6.Parser = ParseICMPv6 + parserGRE.Parser = ParseGRE + parserTeredoDst.Parser = ParseTeredoDst + parserGeneve.Parser = ParseGeneve + + DefaultEnvironment = NewBaseParserEnvironment() +} + +type BaseParserEnvironment struct { + nameToParser *sync.Map + customEtype *sync.Map + customProto *sync.Map + customPort *sync.Map +} + +func NewBaseParserEnvironment() *BaseParserEnvironment { + e := &BaseParserEnvironment{} + e.nameToParser = &sync.Map{} + e.customEtype = &sync.Map{} + e.customProto = &sync.Map{} + e.customPort = &sync.Map{} + + // Load initial parsers by name + for _, p := range []ParserInfo{ + parserEthernet, + parser8021Q, + parserMPLS, + parserIPv4, + parserIPv6, + parserIPv6HeaderRouting, + parserIPv6HeaderFragment, + parserTCP, + parserUDP, + parserICMP, + parserICMPv6, + parserGRE, + parserTeredoDst, + parserGeneve, + } { + e.nameToParser.Store(p.Name, p) + } + + return e +} + +// GetParser returns a parser by name +func (e *BaseParserEnvironment) GetParser(name string) (info ParserInfo, ok bool) { + parser, ok := e.nameToParser.Load(name) + if ok { + return parser.(ParserInfo), ok + } + return info, ok +} + +// RegisterEtype adds or replace a parser used when decoding a protocol on top of layer 2 (eg: Ethernet). +func (e *BaseParserEnvironment) RegisterEtype(eType uint16, parser ParserInfo) error { + if parser.Parser == nil { + return errParserEmpty + } + e.customEtype.Store(eType, parser) // parser can be invoked to decode certain etypes + return nil +} + +// RegisterProto adds or replace a parser used when decoding a protocol on top of layer 3 (eg: IP). +func (e *BaseParserEnvironment) RegisterProto(proto byte, parser ParserInfo) error { + if parser.Parser == nil { + return errParserEmpty + } + e.customProto.Store(proto, parser) // parser can be invoked to decode certain protocols + return nil +} + +// RegisterPort adds or replace a parser used when decoding a protocol on top of layer 4 (eg: UDP). Port is used for source and destination +func (e *BaseParserEnvironment) RegisterPort(proto string, dir RegPortDir, port uint16, parser ParserInfo) error { + if parser.Parser == nil { + return errParserEmpty + } + switch dir { + case PortDirBoth: + e.customPort.Store(fmt.Sprintf("%s-src-%d", proto, port), parser) + e.customPort.Store(fmt.Sprintf("%s-dst-%d", proto, port), parser) + case PortDirSrc: + e.customPort.Store(fmt.Sprintf("%s-src-%d", proto, port), parser) + case PortDirDst: + e.customPort.Store(fmt.Sprintf("%s-dst-%d", proto, port), parser) + default: + return fmt.Errorf("unknown direction %s", dir) + } + + return nil +} + +func (e *BaseParserEnvironment) NextParserEtype(etherType []byte) (ParserInfo, error) { + info, err := e.innerNextParserEtype(etherType) + etypeNum := uint16(etherType[0]<<8) | uint16(etherType[1]) + info.ConfigKeyList = append(info.ConfigKeyList, fmt.Sprintf("etype%d", etypeNum), fmt.Sprintf("etype0x%.4x", etypeNum)) + return info, err +} + +func (e *BaseParserEnvironment) innerNextParserEtype(etherType []byte) (ParserInfo, error) { + if len(etherType) != 2 { + return parserNone, fmt.Errorf("wrong ether type") + } + + eType := uint16(etherType[0])<<8 | uint16(etherType[1]) + if cParser, ok := e.customEtype.Load(eType); ok { + return cParser.(ParserInfo), nil + } + + switch { + case eType == 0x199e: + return parserEthernet, nil // Transparent Ether Bridging (GRE) + case eType == 0x6558: + return parserEthernet, nil // Transparent Ether Bridging (Geneve) + case eType == 0x8847: + return parserMPLS, nil // MPLS + case eType == 0x8100: + return parser8021Q, nil // 802.1q + case eType == 0x0800: + return parserIPv4, nil // IPv4 + case eType == 0x86dd: + return parserIPv6, nil // IPv6 + case eType == 0x0806: + // ARP + } + return parserNone, nil +} + +func (e *BaseParserEnvironment) NextParserProto(proto byte) (ParserInfo, error) { + info, err := e.innerNextParserProto(proto) + info.ConfigKeyList = append(info.ConfigKeyList, fmt.Sprintf("proto%d", proto)) + return info, err +} + +func (e *BaseParserEnvironment) innerNextParserProto(proto byte) (ParserInfo, error) { + if cParser, ok := e.customProto.Load(proto); ok { + return cParser.(ParserInfo), nil + } + + switch { + case proto == 1: + return parserICMP, nil // ICMP + case proto == 4: + return parserIPv4, nil // IPIP + case proto == 6: + return parserTCP, nil // TCP + case proto == 17: + return parserUDP, nil // UDP + case proto == 41: + return parserIPv6, nil // IPv6IP + case proto == 43: + return parserIPv6HeaderRouting, nil // IPv6 EH Routing + case proto == 44: + return parserIPv6HeaderFragment, nil // IPv6 EH Fragment + case proto == 47: + return parserGRE, nil // GRE + case proto == 58: + return parserICMPv6, nil // ICMPv6 + case proto == 115: + // L2TP + } + return parserNone, nil +} + +func (e *BaseParserEnvironment) NextParserPort(proto string, srcPort, dstPort uint16) (ParserInfo, error) { + // Parser for GRE, Teredo, Geneve, etc. + + dir, info, err := e.innerNextParserPort(proto, srcPort, dstPort) + // a custom parser must be present in order to expand the keys array + if dir == 1 { + info.ConfigKeyList = append(info.ConfigKeyList, fmt.Sprintf("%s%d", proto, dstPort)) + } else if dir == 2 { + info.ConfigKeyList = append(info.ConfigKeyList, fmt.Sprintf("%s%d", proto, srcPort)) + } + return info, err +} + +func (e *BaseParserEnvironment) innerNextParserPort(proto string, srcPort, dstPort uint16) (byte, ParserInfo, error) { + if cParser, ok := e.customPort.Load(fmt.Sprintf("%s-dst-%d", proto, dstPort)); ok { + return 1, cParser.(ParserInfo), nil + } + if cParser, ok := e.customPort.Load(fmt.Sprintf("%s-src-%d", proto, srcPort)); ok { + return 2, cParser.(ParserInfo), nil + } + + return 0, parserNone, nil +} + +func (e *BaseParserEnvironment) ParsePacket(flowMessage ProtoProducerMessageIf, data []byte) (err error) { + return ParsePacket(flowMessage, data, nil, e) +} + +// Stores information about the current state of parsing +type ParseConfig struct { + Environment ParserEnvironment // parser configuration to customize chained calls + Layer int // absolute index of the layer + Calls int // number of times the function was called (using parser index) + LayerCall int // number of times a function in a layer (eg: Transport) was called (using layer index) + Encapsulated bool // indicates if outside the typical mac-network-transport +} + +// BaseLayer indicates if the parser should map to the top-level fields of the protobuf +func (c *ParseConfig) BaseLayer() bool { + return !c.Encapsulated +} + +// ParseResult contains information about the next +type ParseResult struct { + NextParser ParserInfo // Next parser to be called + Size int // Size of the layer +} + +type ParserInfo struct { + Parser Parser + Name string + ConfigKeyList []string // keys to match for custom parsing + LayerIndex int // index to group + ParserIndex int // unique parser index + EncapSkip bool // indicates if should skip encapsulation calculations +} + +// Parser is a function that maps various items of a layer to a ProtoProducerMessage +type Parser func(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) + +func ParsePacket(flowMessage ProtoProducerMessageIf, data []byte, config PacketLayerMapper, pe ParserEnvironment) (err error) { + var offset int + + var nextParser ParserInfo + var parseConfig ParseConfig + + if pe != nil { + parseConfig.Environment = pe + } else { + parseConfig.Environment = DefaultEnvironment + } + + nextParser = parserEthernet // initial parser + callsLayer := make(map[int]int) + calls := make(map[int]int) + + for nextParser.Parser != nil && len(data) >= offset { // check that a next parser exists and there is enough data to read + parseConfig.Calls = calls[nextParser.ParserIndex] + parseConfig.LayerCall = callsLayer[nextParser.LayerIndex] + res, err := nextParser.Parser(flowMessage.GetFlowMessage(), data[offset:], parseConfig) + parseConfig.Layer += 1 + if err != nil { + return err + } + + // Map custom fields + for _, key := range nextParser.ConfigKeyList { + if config != nil { + layerIterator := config.Map(key) + for layerIterator != nil { + configLayer := layerIterator.Next() + if configLayer == nil { + break + } + if configLayer.IsEncapsulated() != parseConfig.Encapsulated { + continue + } + + extracted := GetBytes(data, offset*8+configLayer.GetOffset(), configLayer.GetLength(), true) + if err := flowMessage.MapCustom(key, extracted, configLayer); err != nil { + return err + } + } + } + } + + fm := flowMessage.GetFlowMessage() + fm.LayerSize = append(fm.LayerSize, uint32(res.Size)) + + // compares the next layer index with current to determine if it's an encapsulation + // IP over IP is the equals case + // except if layer is skipping comparison (will be compared after). For instance IPv6 Fragment Header, dot1q and MPLS cannot trigger encap + if !res.NextParser.EncapSkip && res.NextParser.LayerIndex <= nextParser.LayerIndex { + parseConfig.Encapsulated = true + } + + nextParser = res.NextParser + calls[nextParser.ParserIndex] += 1 + callsLayer[nextParser.LayerIndex] += 1 + + offset += res.Size + } + return nil +} + +func ParseEthernet(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) { + if len(data) < 14 { + return res, nil + } + + res.Size = 14 + + flowMessage.AddLayer("Ethernet") + + dstMac := binary.BigEndian.Uint64(append([]byte{0, 0}, data[0:6]...)) + srcMac := binary.BigEndian.Uint64(append([]byte{0, 0}, data[6:12]...)) + + eType := data[12:14] + + if pc.BaseLayer() { // first time calling + flowMessage.SrcMac = srcMac + flowMessage.DstMac = dstMac + flowMessage.Etype = uint32(binary.BigEndian.Uint16(eType)) + } + // add to list of macs + + if pc.Environment == nil { + return res, err + } + // get next parser + res.NextParser, err = pc.Environment.NextParserEtype(eType) + + return res, err +} + +func Parse8021Q(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) { + if len(data) < 4 { + return res, nil + } + + res.Size = 4 + + flowMessage.AddLayer("Dot1Q") + + eType := data[2:4] + + if pc.BaseLayer() { // first time calling + flowMessage.VlanId = uint32(binary.BigEndian.Uint16(data[0:2])) + flowMessage.Etype = uint32(binary.BigEndian.Uint16(eType)) + } + + if pc.Environment == nil { + return res, err + } + // get next parser + res.NextParser, err = pc.Environment.NextParserEtype(eType) + + return res, err +} + +func ParseMPLS(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) { + if len(data) < 4 { + return res, nil + } + + flowMessage.AddLayer("MPLS") + + var eType []byte + var mplsLabel, mplsTtl []uint32 + + iterateMpls := true + var offset int + for iterateMpls { + if len(data) < offset+4 { + // stop iterating mpls, not enough payload left + break + } + label := binary.BigEndian.Uint32(append([]byte{0}, data[offset:offset+3]...)) >> 4 + //exp := data[offset+2] > 1 + bottom := data[offset+2] & 1 + ttl := data[offset+3] + offset += 4 + + if bottom == 1 || label <= 15 || offset > len(data) { + + if len(data) > offset { + // peak at next byte + if data[offset]&0xf0>>4 == 4 { + eType = []byte{0x8, 0x0} + } else if data[offset]&0xf0>>4 == 6 { + eType = []byte{0x86, 0xdd} + } + } + + iterateMpls = false // stop iterating mpls, bottom of stack + } + + mplsLabel = append(mplsLabel, label) + mplsTtl = append(mplsTtl, uint32(ttl)) + } + + res.Size = offset + + if pc.BaseLayer() { // first time calling + if len(eType) == 2 { + flowMessage.Etype = uint32(binary.BigEndian.Uint16(eType)) + } + + flowMessage.MplsLabel = mplsLabel + flowMessage.MplsTtl = mplsTtl + } + + // get next parser + if len(eType) == 2 { + if pc.Environment == nil { + return res, err + } + res.NextParser, err = pc.Environment.NextParserEtype(eType) + } + + return res, err +} + +func ParseIPv4(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) { + if len(data) < 20 { + return res, nil + } + + res.Size = 20 + + flowMessage.AddLayer("IPv4") + + nextHeader := data[9] + + if pc.BaseLayer() { // first time calling + flowMessage.SrcAddr = data[12:16] + flowMessage.DstAddr = data[16:20] + + tos := data[1] + ttl := data[8] + + flowMessage.IpTos = uint32(tos) + flowMessage.IpTtl = uint32(ttl) + + identification := binary.BigEndian.Uint16(data[4:6]) + fragOffset := binary.BigEndian.Uint16(data[6:8]) // also includes flag + + flowMessage.FragmentId = uint32(identification) + flowMessage.FragmentOffset = uint32(fragOffset) & 8191 + flowMessage.IpFlags = uint32(fragOffset) >> 13 + + flowMessage.Proto = uint32(nextHeader) + } + + if pc.Environment == nil { + return res, err + } + // get next parser + res.NextParser, err = pc.Environment.NextParserProto(nextHeader) + + return res, err +} + +func ParseIPv6(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) { + if len(data) < 40 { + return res, nil + } + + res.Size = 40 + + flowMessage.AddLayer("IPv6") + + nextHeader := data[6] + + if pc.BaseLayer() { // first time calling + flowMessage.SrcAddr = data[8:24] + flowMessage.DstAddr = data[24:40] + + tostmp := uint32(binary.BigEndian.Uint16(data[0:2])) + tos := uint8(tostmp & 0x0ff0 >> 4) + ttl := data[7] + + flowMessage.IpTos = uint32(tos) + flowMessage.IpTtl = uint32(ttl) + + flowLabel := binary.BigEndian.Uint32(data[0:4]) + flowMessage.Ipv6FlowLabel = flowLabel & 0xFFFFF + + flowMessage.Proto = uint32(nextHeader) + } + if pc.Environment == nil { + return res, err + } + // get next parser + res.NextParser, err = pc.Environment.NextParserProto(nextHeader) + + return res, err +} + +func ParseIPv6HeaderFragment(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) { + if len(data) < 8 { + return res, nil + } + + res.Size = 8 + + flowMessage.AddLayer("IPv6HeaderFragment") + + nextHeader := data[0] + + if pc.BaseLayer() { // first time calling + fragOffset := binary.BigEndian.Uint16(data[2:4]) // also includes flag + identification := binary.BigEndian.Uint32(data[4:8]) + + flowMessage.FragmentId = identification + flowMessage.FragmentOffset = uint32(fragOffset) >> 3 + flowMessage.IpFlags = uint32(fragOffset) & 7 + } + if pc.Environment == nil { + return res, err + } + // get next parser + res.NextParser, err = pc.Environment.NextParserProto(nextHeader) + + return res, err +} + +func ParseIPv6HeaderRouting(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) { + if len(data) < 8 { + return res, nil + } + + nextHeader := data[0] + length := data[1] + + res.Size = 8 + 8*int(length) + + flowMessage.AddLayer("IPv6HeaderRouting") + + if pc.BaseLayer() { // first time calling + + routingType := data[2] + segLeft := data[3] + + flowMessage.Ipv6RoutingHeaderSegLeft = uint32(segLeft) + + if routingType == 4 { // Segment Routing + + lastEntry := data[4] + var offset int + var entry int + + for 8+offset < res.Size && + 8+offset+16 <= len(data) && + entry <= int(lastEntry) { + + addr := data[8+offset : 8+offset+16] + + flowMessage.Ipv6RoutingHeaderAddresses = append(flowMessage.Ipv6RoutingHeaderAddresses, addr) + + offset += 16 + entry++ + } + } + + } + if pc.Environment == nil { + return res, err + } + // get next parser + res.NextParser, err = pc.Environment.NextParserProto(nextHeader) + + return res, err +} + +func ParseTCP(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) { + if len(data) < 20 { + return res, nil + } + + length := int(data[13]>>4) * 4 + + res.Size = 20 + length + + flowMessage.AddLayer("TCP") + + srcPort := binary.BigEndian.Uint16(data[0:2]) + dstPort := binary.BigEndian.Uint16(data[2:4]) + + if pc.BaseLayer() { // first time calling + flowMessage.SrcPort = uint32(srcPort) + flowMessage.DstPort = uint32(dstPort) + + tcpflags := data[13] + flowMessage.TcpFlags = uint32(tcpflags) + } + if pc.Environment == nil { + return res, err + } + res.NextParser, err = pc.Environment.NextParserPort("tcp", srcPort, dstPort) + + return res, err +} + +func ParseUDP(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) { + if len(data) < 8 { + return res, nil + } + + res.Size = 8 + + flowMessage.AddLayer("UDP") + + srcPort := binary.BigEndian.Uint16(data[0:2]) + dstPort := binary.BigEndian.Uint16(data[2:4]) + + if pc.BaseLayer() { // first time calling + flowMessage.SrcPort = uint32(srcPort) + flowMessage.DstPort = uint32(dstPort) + } + if pc.Environment == nil { + return res, err + } + res.NextParser, err = pc.Environment.NextParserPort("udp", srcPort, dstPort) + + return res, err +} + +func ParseGRE(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) { + if len(data) < 4 { + return res, nil + } + + res.Size = 4 + + flowMessage.AddLayer("GRE") + + eType := data[2:4] + if pc.Environment == nil { + return res, err + } + // get next parser + res.NextParser, err = pc.Environment.NextParserEtype(eType) + + return res, err +} + +func ParseTeredoDst(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) { + flowMessage.AddLayer("Teredo") + + // get next parser + res.NextParser = parserIPv6 + + return res, err +} + +func ParseGeneve(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) { + if len(data) < 8 { + return res, nil + } + + res.Size = int(data[0]&0x3f)*4 + 8 + + flowMessage.AddLayer("Geneve") + + eType := data[2:4] + if pc.Environment == nil { + return res, err + } + // get next parser + res.NextParser, err = pc.Environment.NextParserEtype(eType) + + return res, err +} + +func ParseICMP(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) { + if len(data) < 2 { + return res, nil + } + + res.Size = 8 + + flowMessage.AddLayer("ICMP") + + if pc.Calls == 0 { // first time calling + flowMessage.IcmpType = uint32(data[0]) + flowMessage.IcmpCode = uint32(data[1]) + } + + return res, err +} + +func ParseICMPv6(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) { + if len(data) < 2 { + return res, nil + } + + res.Size = 8 + + flowMessage.AddLayer("ICMPv6") + + if pc.Calls == 0 { // first time calling + flowMessage.IcmpType = uint32(data[0]) + flowMessage.IcmpCode = uint32(data[1]) + } + + return res, err +} diff --git a/producer/proto/producer_packet_test.go b/producer/proto/producer_packet_test.go new file mode 100644 index 00000000..2c6b9807 --- /dev/null +++ b/producer/proto/producer_packet_test.go @@ -0,0 +1,494 @@ +package protoproducer + +import ( + "encoding/base64" + "encoding/hex" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestProcessEthernet(t *testing.T) { + dataStr := "005300000001" + // src mac + "005300000002" + // dst mac + "86dd" // etype + data, _ := hex.DecodeString(dataStr) + var flowMessage ProtoProducerMessage + _, err := ParseEthernet(&flowMessage, data, ParseConfig{}) + assert.NoError(t, err) + + b, _ := json.Marshal(flowMessage.FlowMessage) + t.Log(string(b)) + + assert.Equal(t, uint32(0x86dd), flowMessage.Etype) +} + +func TestProcessDot1Q(t *testing.T) { + dataStr := "00140800" + data, _ := hex.DecodeString(dataStr) + var flowMessage ProtoProducerMessage + _, err := Parse8021Q(&flowMessage, data, ParseConfig{}) + assert.NoError(t, err) + + b, _ := json.Marshal(flowMessage.FlowMessage) + t.Log(string(b)) + + assert.Equal(t, uint32(20), flowMessage.VlanId) + assert.Equal(t, uint32(0x0800), flowMessage.Etype) +} + +func TestProcessMPLS(t *testing.T) { + dataStr := "000120ff" + // label 1 + "000101ff" // label 2 + data, _ := hex.DecodeString(dataStr) + var flowMessage ProtoProducerMessage + _, err := ParseMPLS(&flowMessage, data, ParseConfig{}) + assert.NoError(t, err) + + b, _ := json.Marshal(flowMessage.FlowMessage) + t.Log(string(b)) + + assert.Equal(t, []uint32{18, 16}, flowMessage.MplsLabel) + assert.Equal(t, []uint32{255, 255}, flowMessage.MplsTtl) + //assert.Equal(t, uint32(0x800), flowMessage.Etype) // tested with next byte in whole packet +} + +func TestProcessIPv4(t *testing.T) { + dataStr := "45000064" + + "abab" + // id + "0000ff01" + // flag, ttl, proto + "aaaa" + // csum + "0a000001" + // src + "0a000002" // dst + data, _ := hex.DecodeString(dataStr) + var flowMessage ProtoProducerMessage + _, err := ParseIPv4(&flowMessage, data, ParseConfig{}) + assert.NoError(t, err) + + b, _ := json.Marshal(flowMessage.FlowMessage) + t.Log(string(b)) + + assert.Equal(t, []byte{10, 0, 0, 1}, flowMessage.SrcAddr) + assert.Equal(t, []byte{10, 0, 0, 2}, flowMessage.DstAddr) + assert.Equal(t, uint32(0xabab), flowMessage.FragmentId) + assert.Equal(t, uint32(0xff), flowMessage.IpTtl) + assert.Equal(t, uint32(1), flowMessage.Proto) +} + +func TestProcessIPv6(t *testing.T) { + dataStr := "6001010104d83a40" + // ipv6 + "fd010000000000000000000000000001" + // src + "fd010000000000000000000000000002" // dst + data, _ := hex.DecodeString(dataStr) + var flowMessage ProtoProducerMessage + _, err := ParseIPv6(&flowMessage, data, ParseConfig{}) + assert.NoError(t, err) + + b, _ := json.Marshal(flowMessage.FlowMessage) + t.Log(string(b)) + + assert.Equal(t, []byte{0xfd, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, flowMessage.SrcAddr) + assert.Equal(t, []byte{0xfd, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02}, flowMessage.DstAddr) + assert.Equal(t, uint32(0x40), flowMessage.IpTtl) + assert.Equal(t, uint32(0x3a), flowMessage.Proto) + assert.Equal(t, uint32(0x010101), flowMessage.Ipv6FlowLabel) +} + +func TestProcessIPv6HeaderFragment(t *testing.T) { + dataStr := "3a000001a7882ea9" + + data, _ := hex.DecodeString(dataStr) + var flowMessage ProtoProducerMessage + _, err := ParseIPv6HeaderFragment(&flowMessage, data, ParseConfig{}) + assert.NoError(t, err) + + b, _ := json.Marshal(flowMessage.FlowMessage) + t.Log(string(b)) + + assert.Equal(t, uint32(2810719913), flowMessage.FragmentId) + assert.Equal(t, uint32(0), flowMessage.FragmentOffset) +} + +func TestProcessIPv6HeaderRouting(t *testing.T) { + dataStr := "29060401020300102001baba0002e00200000000000000002001baba0001000000000000000000002001baba0003e0070000000000000000" + + data, _ := hex.DecodeString(dataStr) + var flowMessage ProtoProducerMessage + _, err := ParseIPv6HeaderRouting(&flowMessage, data, ParseConfig{}) + assert.NoError(t, err) + + b, _ := json.Marshal(flowMessage.FlowMessage) + t.Log(string(b)) +} + +func TestProcessICMP(t *testing.T) { + dataStr := "01018cf7000627c4" + + data, _ := hex.DecodeString(dataStr) + var flowMessage ProtoProducerMessage + _, err := ParseICMP(&flowMessage, data, ParseConfig{}) + assert.NoError(t, err) + + b, _ := json.Marshal(flowMessage.FlowMessage) + t.Log(string(b)) + + assert.Equal(t, uint32(1), flowMessage.IcmpType) + assert.Equal(t, uint32(1), flowMessage.IcmpCode) +} + +func TestProcessICMPv6(t *testing.T) { + dataStr := "8080f96508a4" + + data, _ := hex.DecodeString(dataStr) + var flowMessage ProtoProducerMessage + _, err := ParseICMPv6(&flowMessage, data, ParseConfig{}) + assert.NoError(t, err) + + b, _ := json.Marshal(flowMessage.FlowMessage) + t.Log(string(b)) + + assert.Equal(t, uint32(128), flowMessage.IcmpType) + assert.Equal(t, uint32(128), flowMessage.IcmpCode) +} + +func TestProcessPacketBase(t *testing.T) { + dataStr := "005300000001" + // src mac + "005300000002" + // dst mac + "8100" + // etype + "00008847" + // 8021q + "000120ff" + // mpls label 1 + "000101ff" + // mpls label 2 + "6000000004d83a40" + // ipv6 + "fd010000000000000000000000000001" + // src + "fd010000000000000000000000000002" + // dst + "8000f96508a4" // icmpv6 + + data, _ := hex.DecodeString(dataStr) + var flowMessage ProtoProducerMessage + + err := ParsePacket(&flowMessage, data, nil, nil) + assert.NoError(t, err) + + b, _ := json.Marshal(flowMessage.FlowMessage) + t.Log(string(b)) + + layers := []uint32{0, 6, 5, 2, 8} + assert.Equal(t, len(layers), len(flowMessage.LayerStack)) + + for i, layer := range layers { + assert.Equal(t, layer, uint32(flowMessage.LayerStack[i])) + } + + assert.Equal(t, uint32(0x86dd), flowMessage.Etype) + +} + +func TestProcessPacketGRE(t *testing.T) { + dataStr := "005300000001" + // src mac + "005300000002" + // dst mac + "86dd" + // etype + + "6000000004d82f40" + // ipv6 + "fd010000000000000000000000000001" + // src + "fd010000000000000000000000000002" + // dst + + "00000800" + // gre + + "45000064" + // ipv4 + "abab" + // id + "0000ff01" + // flag, ttl, proto + "aaaa" + // csum + "0a000001" + // src + "0a000002" + // dst + + "01018cf7000627c4" // icmp + + data, _ := hex.DecodeString(dataStr) + var flowMessage ProtoProducerMessage + err := ParsePacket(&flowMessage, data, nil, nil) + assert.NoError(t, err) + + b, _ := json.Marshal(flowMessage.FlowMessage) + t.Log(string(b)) + + layers := []uint32{0, 2, 9, 1, 7} + assert.Equal(t, len(layers), len(flowMessage.LayerStack)) + + for i, layer := range layers { + assert.Equal(t, layer, uint32(flowMessage.LayerStack[i])) + } + + assert.Equal(t, uint32(0x86dd), flowMessage.Etype) + assert.Equal(t, uint32(47), flowMessage.Proto) + // todo: check addresses + +} + +type testProtoProducerMessage struct { + ProtoProducerMessage + t *testing.T +} + +func (m *testProtoProducerMessage) MapCustom(key string, v []byte, cfg MappableField) error { + m.t.Log("mapping", key, v) + mc := MapConfigBase{ + Endianness: BigEndian, + ProtoIndex: 999, + ProtoType: ProtoVarint, + ProtoArray: false, + } + return m.ProtoProducerMessage.MapCustom(key, v, &mc) +} + +func TestProcessPacketMapping(t *testing.T) { + dataStr := "005300000001" + // src mac + "005300000002" + // dst mac + "0800" + // etype + + "45000064" + // ipv4 + "abab" + // id + "0000ff11" + // flag, ttl, proto + "aaaa" + // csum + "0a000001" + // src + "0a000002" + // dst + + // udp + "ff00" + // src port + "0035" + // dst port + "0010" + // length + "ffff" + // csum + + "0000000000000000" // payload + + config := SFlowProducerConfig{ + Mapping: []SFlowMapField{ + SFlowMapField{ + Layer: "udp", + Offset: 48, + Length: 16, + + Destination: "csum", + }, + }, + } + configm := mapFieldsSFlow(config.Mapping) + + data, _ := hex.DecodeString(dataStr) + flowMessage := testProtoProducerMessage{ + t: t, + } + + err := ParsePacket(&flowMessage, data, configm, nil) + assert.NoError(t, err) + + b, _ := json.Marshal(flowMessage.FlowMessage) + t.Log(string(b)) +} + +func TestProcessPacketMappingEncap(t *testing.T) { + dataStr := "005300000001" + // src mac + "005300000002" + // dst mac + "86dd" + // etype + + "6001010104d82b40" + // ipv6 + "fd010000000000000000000000000001" + // src + "fd010000000000000000000000000002" + // dst + + "04060401020300102001baba0002e00200000000000000002001baba0001000000000000000000002001baba0003e0070000000000000000" + // srv6 + + "45000064" + // ipv4 + "abab" + // id + "0000ff11" + // flag, ttl, proto + "aaaa" + // csum + "0a000001" + // src + "0a000002" + // dst + + // udp + "ff00" + // src port + "0035" + // dst port + "0010" + // length + "ffff" + // csum + + "0000000000000000" // payload + + config := ProducerConfig{ + Formatter: FormatterConfig{ + Render: map[string]RendererID{ + "src_ip_encap": RendererIP, + "dst_ip_encap": RendererIP, + }, + Fields: []string{ + "src_ip_encap", + "dst_ip_encap", + }, + Protobuf: []ProtobufFormatterConfig{ + ProtobufFormatterConfig{ + Name: "src_ip_encap", + Index: 998, + Type: "string", + Array: true, + }, + ProtobufFormatterConfig{ + Name: "dst_ip_encap", + Index: 999, + Type: "string", + Array: true, + }, + }, + }, + SFlow: SFlowProducerConfig{ + Mapping: []SFlowMapField{ + SFlowMapField{ + Layer: "ipv6", + Offset: 64, + Length: 128, + Encapsulated: true, + + Destination: "src_ip_encap", + }, + SFlowMapField{ + Layer: "ipv6", + Offset: 192, + Length: 128, + Encapsulated: true, + + Destination: "dst_ip_encap", + }, + + SFlowMapField{ + Layer: "ipv4", + Offset: 96, + Length: 32, + Encapsulated: true, + + Destination: "src_ip_encap", + }, + SFlowMapField{ + Layer: "ipv4", + Offset: 128, + Length: 32, + Encapsulated: true, + + Destination: "dst_ip_encap", + }, + }, + }, + } + configm, _ := config.Compile() + + data, _ := hex.DecodeString(dataStr) + var flowMessage ProtoProducerMessage + flowMessage.formatter = configm.GetFormatter() + + err := configm.GetPacketMapper().ParsePacket(&flowMessage, data) + assert.NoError(t, err) + + b, _ := json.Marshal(flowMessage.FlowMessage) + t.Log(string(b)) + + flowMessage.skipDelimiter = true + b, _ = flowMessage.MarshalBinary() + t.Log(base64.StdEncoding.EncodeToString(b)) + + b, _ = flowMessage.MarshalJSON() + t.Log(string(b)) +} + +func TestProcessPacketMappingPort(t *testing.T) { + dataStr := "005300000001" + // src mac + "005300000002" + // dst mac + "0800" + // etype + + "45000064" + // ipv4 + "abab" + // id + "0000ff11" + // flag, ttl, proto + "aaaa" + // csum + "0a000001" + // src + "0a000002" + // dst + + // udp + "ff00" + // src port + "0035" + // dst port + "0015" + // length + "ffff" + // csum + + "02a901000001000000000000146578616d706c6503636f6d0000010001" // dns packet + + data, _ := hex.DecodeString(dataStr) + var flowMessage ProtoProducerMessage + + var domain []byte + + pe := NewBaseParserEnvironment() + + pe.RegisterPort("udp", PortDirDst, 53, ParserInfo{ + Parser: func(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) { + domain = data[13 : 13+11] + flowMessage.AddLayer("Custom") + t.Log("read DNS packet", string(domain)) + res.Size = len(data) + return res, err + }, + }) + + err := ParsePacket(&flowMessage, data, nil, pe) + assert.NoError(t, err) + + assert.Equal(t, []byte{0x65, 0x78, 0x61, 0x6D, 0x70, 0x6C, 0x65, 0x03, 0x63, 0x6F, 0x6D}, domain) + assert.Equal(t, 4, len(flowMessage.LayerSize)) + assert.Equal(t, uint32(29), flowMessage.LayerSize[3]) +} + +func TestProcessPacketMappingGeneve(t *testing.T) { + dataStr := "005300000001" + // src mac + "005300000002" + // dst mac + "0800" + // etype + + "45000064" + // ipv4 + "abab" + // id + "0000ff11" + // flag, ttl, proto + "aaaa" + // csum + "0a000001" + // src + "0a000002" + // dst + + // udp + "ff00" + // src port + "17c1" + // dst port + "0015" + // length + "ffff" + // csum + + "0240655800000a00000080010000000c" + // geneve + + "005300000001" + // src mac + "005300000002" + // dst mac + "0800" + // etype + + "45000064" + // ipv4 + "abab" + // id + "0000ff01" + // flag, ttl, proto + "aaaa" + // csum + "0a000001" + // src + "0a000002" + // dst + + "01018cf7000627c4" // icmp + + data, _ := hex.DecodeString(dataStr) + var flowMessage ProtoProducerMessage + + pe := NewBaseParserEnvironment() + + gp, _ := pe.GetParser("geneve") + + pe.RegisterPort("udp", PortDirBoth, 6081, gp) + + err := ParsePacket(&flowMessage, data, nil, pe) + assert.NoError(t, err) + + layers := []uint32{0, 1, 4, 12, 0, 1, 7} + assert.Equal(t, len(layers), len(flowMessage.LayerStack)) + + for i, layer := range layers { + assert.Equal(t, layer, uint32(flowMessage.LayerStack[i])) + } +} diff --git a/producer/proto/producer_sf.go b/producer/proto/producer_sf.go index a9c5320a..acddb675 100644 --- a/producer/proto/producer_sf.go +++ b/producer/proto/producer_sf.go @@ -1,8 +1,6 @@ package protoproducer import ( - "encoding/binary" - "github.com/netsampler/goflow2/v2/decoders/sflow" flowmessage "github.com/netsampler/goflow2/v2/pb" "github.com/netsampler/goflow2/v2/producer" @@ -25,391 +23,22 @@ func ParseSampledHeader(flowMessage *ProtoProducerMessage, sampledHeader *sflow. return ParseSampledHeaderConfig(flowMessage, sampledHeader, nil) } -func ParseEthernet(offset int, flowMessage *ProtoProducerMessage, data []byte) (etherType []byte, newOffset int, err error) { - if len(data) >= offset+14 { - etherType = data[offset+12 : offset+14] - - dstMac := binary.BigEndian.Uint64(append([]byte{0, 0}, data[offset+0:offset+6]...)) - srcMac := binary.BigEndian.Uint64(append([]byte{0, 0}, data[offset+6:offset+12]...)) - flowMessage.SrcMac = srcMac - flowMessage.DstMac = dstMac - - offset += 14 - } - return etherType, offset, err -} - -func Parse8021Q(offset int, flowMessage *ProtoProducerMessage, data []byte) (etherType []byte, newOffset int, err error) { - if len(data) >= offset+4 { - flowMessage.VlanId = uint32(binary.BigEndian.Uint16(data[offset : offset+2])) - etherType = data[offset+2 : offset+4] - - offset += 4 - } - return etherType, offset, err -} - -func ParseMPLS(offset int, flowMessage *ProtoProducerMessage, data []byte) (etherType []byte, newOffset int, err error) { - var mplsLabel []uint32 - var mplsTtl []uint32 - - iterateMpls := true - for iterateMpls { - if len(data) < offset+5 { - // stop iterating mpls, not enough payload left - break - } - label := binary.BigEndian.Uint32(append([]byte{0}, data[offset:offset+3]...)) >> 4 - //exp := data[offset+2] > 1 - bottom := data[offset+2] & 1 - ttl := data[offset+3] - offset += 4 - - if bottom == 1 || label <= 15 || offset > len(data) { - if data[offset]&0xf0>>4 == 4 { - etherType = []byte{0x8, 0x0} - } else if data[offset]&0xf0>>4 == 6 { - etherType = []byte{0x86, 0xdd} - } - iterateMpls = false // stop iterating mpls, bottom of stack - } - - mplsLabel = append(mplsLabel, label) - mplsTtl = append(mplsTtl, uint32(ttl)) - } - // if multiple MPLS headers, will reset existing values - flowMessage.MplsLabel = mplsLabel - flowMessage.MplsTtl = mplsTtl - return etherType, offset, err -} - -func ParseIPv4(offset int, flowMessage *ProtoProducerMessage, data []byte) (nextHeader byte, newOffset int, err error) { - if len(data) >= offset+20 { - nextHeader = data[offset+9] - flowMessage.SrcAddr = data[offset+12 : offset+16] - flowMessage.DstAddr = data[offset+16 : offset+20] - - tos := data[offset+1] - ttl := data[offset+8] - - flowMessage.IpTos = uint32(tos) - flowMessage.IpTtl = uint32(ttl) - - identification := binary.BigEndian.Uint16(data[offset+4 : offset+6]) - fragOffset := binary.BigEndian.Uint16(data[offset+6 : offset+8]) // also includes flag - - flowMessage.FragmentId = uint32(identification) - flowMessage.FragmentOffset = uint32(fragOffset) & 8191 - flowMessage.IpFlags = uint32(fragOffset) >> 13 - - offset += 20 - } - return nextHeader, offset, err -} - -func ParseIPv6(offset int, flowMessage *ProtoProducerMessage, data []byte) (nextHeader byte, newOffset int, err error) { - if len(data) >= offset+40 { - nextHeader = data[offset+6] - flowMessage.SrcAddr = data[offset+8 : offset+24] - flowMessage.DstAddr = data[offset+24 : offset+40] - - tostmp := uint32(binary.BigEndian.Uint16(data[offset : offset+2])) - tos := uint8(tostmp & 0x0ff0 >> 4) - ttl := data[offset+7] - - flowMessage.IpTos = uint32(tos) - flowMessage.IpTtl = uint32(ttl) - - flowLabel := binary.BigEndian.Uint32(data[offset : offset+4]) - flowMessage.Ipv6FlowLabel = flowLabel & 0xFFFFF - - offset += 40 - } - return nextHeader, offset, err -} - -func ParseIPv6Headers(nextHeader byte, offset int, flowMessage *ProtoProducerMessage, data []byte) (newNextHeader byte, newOffset int, err error) { - for { - if nextHeader == 44 && len(data) >= offset+8 { - nextHeader = data[offset] - - fragOffset := binary.BigEndian.Uint16(data[offset+2 : offset+4]) // also includes flag - identification := binary.BigEndian.Uint32(data[offset+4 : offset+8]) - - flowMessage.FragmentId = identification - flowMessage.FragmentOffset = uint32(fragOffset) >> 3 - flowMessage.IpFlags = uint32(fragOffset) & 7 - - offset += 8 - } else { - break - } - } - return nextHeader, offset, err -} - -func ParseTCP(offset int, flowMessage *ProtoProducerMessage, data []byte) (newOffset int, err error) { - if len(data) >= offset+13 { - srcPort := binary.BigEndian.Uint16(data[offset+0 : offset+2]) - dstPort := binary.BigEndian.Uint16(data[offset+2 : offset+4]) - - flowMessage.SrcPort = uint32(srcPort) - flowMessage.DstPort = uint32(dstPort) - - tcpflags := data[offset+13] - flowMessage.TcpFlags = uint32(tcpflags) - - length := int(data[13]>>4) * 4 - - offset += length - } - return offset, err -} - -func ParseUDP(offset int, flowMessage *ProtoProducerMessage, data []byte) (newOffset int, err error) { - if len(data) >= offset+4 { - srcPort := binary.BigEndian.Uint16(data[offset+0 : offset+2]) - dstPort := binary.BigEndian.Uint16(data[offset+2 : offset+4]) - - flowMessage.SrcPort = uint32(srcPort) - flowMessage.DstPort = uint32(dstPort) - - offset += 8 - } - return offset, err -} - -func ParseICMP(offset int, flowMessage *ProtoProducerMessage, data []byte) (newOffset int, err error) { - if len(data) >= offset+2 { - flowMessage.IcmpType = uint32(data[offset+0]) - flowMessage.IcmpCode = uint32(data[offset+1]) - - offset += 8 - } - return offset, err -} - -func ParseICMPv6(offset int, flowMessage *ProtoProducerMessage, data []byte) (newOffset int, err error) { - if len(data) >= offset+2 { - flowMessage.IcmpType = uint32(data[offset+0]) - flowMessage.IcmpCode = uint32(data[offset+1]) - - offset += 8 - } - return offset, err -} - -func IsMPLS(etherType []byte) bool { - if len(etherType) != 2 { - return false - } - return etherType[0] == 0x88 && etherType[1] == 0x47 -} - -func Is8021Q(etherType []byte) bool { - if len(etherType) != 2 { - return false - } - return etherType[0] == 0x81 && etherType[1] == 0x0 -} - -func IsIPv4(etherType []byte) bool { - if len(etherType) != 2 { - return false - } - return etherType[0] == 0x8 && etherType[1] == 0x0 -} - -func IsIPv6(etherType []byte) bool { - if len(etherType) != 2 { - return false - } - return etherType[0] == 0x86 && etherType[1] == 0xdd -} - -func IsARP(etherType []byte) bool { - if len(etherType) != 2 { - return false - } - return etherType[0] == 0x8 && etherType[1] == 0x6 -} - -// Parses an entire stream consisting of multiple layers of protocols -// It picks the best field to map when multiple encapsulation of the same layer (eg: tunnels, extension headers, etc.) -func ParseEthernetHeader(flowMessage *ProtoProducerMessage, data []byte, config *SFlowMapper) (err error) { - var nextHeader byte - var offset int - - var etherType []byte - - for _, configLayer := range GetSFlowConfigLayer(config, "0") { - extracted := GetBytes(data, offset+configLayer.Offset, configLayer.Length) - if err := MapCustom(flowMessage, extracted, configLayer.MapConfigBase); err != nil { - return err - } - } - - if etherType, offset, err = ParseEthernet(offset, flowMessage, data); err != nil { - return err - } - - if len(etherType) != 2 { - return nil - } - - encap := true - iterations := 0 - for encap && iterations <= 1 { - encap = false - - if Is8021Q(etherType) { // VLAN 802.1Q - if etherType, offset, err = Parse8021Q(offset, flowMessage, data); err != nil { - return err - } - } - - if IsMPLS(etherType) { // MPLS - if etherType, offset, err = ParseMPLS(offset, flowMessage, data); err != nil { - return err - } - } - - for _, configLayer := range GetSFlowConfigLayer(config, "3") { - extracted := GetBytes(data, offset*8+configLayer.Offset, configLayer.Length) - if err := MapCustom(flowMessage, extracted, configLayer.MapConfigBase); err != nil { - return err - } - } - - if IsIPv4(etherType) { // IPv4 - prevOffset := offset - if nextHeader, offset, err = ParseIPv4(offset, flowMessage, data); err != nil { - return err - } - - for _, configLayer := range GetSFlowConfigLayer(config, "ipv4") { - extracted := GetBytes(data, prevOffset*8+configLayer.Offset, configLayer.Length) - if err := MapCustom(flowMessage, extracted, configLayer.MapConfigBase); err != nil { - return err - } - } - } else if IsIPv6(etherType) { // IPv6 - prevOffset := offset - if nextHeader, offset, err = ParseIPv6(offset, flowMessage, data); err != nil { - return err - } - if nextHeader, offset, err = ParseIPv6Headers(nextHeader, offset, flowMessage, data); err != nil { - return err - } - - for _, configLayer := range GetSFlowConfigLayer(config, "ipv6") { - extracted := GetBytes(data, prevOffset*8+configLayer.Offset, configLayer.Length) - if err := MapCustom(flowMessage, extracted, configLayer.MapConfigBase); err != nil { - return err - } - } - } else if IsARP(etherType) { // ARP - for _, configLayer := range GetSFlowConfigLayer(config, "arp") { - extracted := GetBytes(data, offset*8+configLayer.Offset, configLayer.Length) - if err := MapCustom(flowMessage, extracted, configLayer.MapConfigBase); err != nil { - return err - } - } - } - - for _, configLayer := range GetSFlowConfigLayer(config, "4") { - extracted := GetBytes(data, offset*8+configLayer.Offset, configLayer.Length) - if err := MapCustom(flowMessage, extracted, configLayer.MapConfigBase); err != nil { - return err - } - } - - var appOffset int // keeps track of the user payload - - // Transport protocols - if nextHeader == 17 || nextHeader == 6 || nextHeader == 1 || nextHeader == 58 { - prevOffset := offset - if flowMessage.FragmentOffset == 0 { - if nextHeader == 17 { // UDP - if offset, err = ParseUDP(offset, flowMessage, data); err != nil { - return err - } - for _, configLayer := range GetSFlowConfigLayer(config, "udp") { - extracted := GetBytes(data, prevOffset*8+configLayer.Offset, configLayer.Length) - if err := MapCustom(flowMessage, extracted, configLayer.MapConfigBase); err != nil { - return err - } - } - } else if nextHeader == 6 { // TCP - if offset, err = ParseTCP(offset, flowMessage, data); err != nil { - return err - } - for _, configLayer := range GetSFlowConfigLayer(config, "tcp") { - extracted := GetBytes(data, prevOffset*8+configLayer.Offset, configLayer.Length) - if err := MapCustom(flowMessage, extracted, configLayer.MapConfigBase); err != nil { - return err - } - } - } else if nextHeader == 1 { // ICMP - if offset, err = ParseICMP(offset, flowMessage, data); err != nil { - return err - } - for _, configLayer := range GetSFlowConfigLayer(config, "icmp") { - extracted := GetBytes(data, prevOffset*8+configLayer.Offset, configLayer.Length) - if err := MapCustom(flowMessage, extracted, configLayer.MapConfigBase); err != nil { - return err - } - } - } else if nextHeader == 58 { // ICMPv6 - if offset, err = ParseICMPv6(offset, flowMessage, data); err != nil { - return err - } - for _, configLayer := range GetSFlowConfigLayer(config, "icmp6") { - extracted := GetBytes(data, prevOffset*8+configLayer.Offset, configLayer.Length) - if err := MapCustom(flowMessage, extracted, configLayer.MapConfigBase); err != nil { - return err - } - } - } - } - appOffset = offset - } - - // fetch data from the application/payload - if appOffset > 0 { - for _, configLayer := range GetSFlowConfigLayer(config, "7") { - customOffset := appOffset*8 + configLayer.Offset - int(flowMessage.FragmentOffset)*8 // allows user to get data from a fragment as well - // todo: check the calculation (might be off due to various header size) - extracted := GetBytes(data, customOffset, configLayer.Length) - if err := MapCustom(flowMessage, extracted, configLayer.MapConfigBase); err != nil { - return err - } - } - } - - iterations++ - } - - if len(etherType) >= 2 { - flowMessage.Etype = uint32(binary.BigEndian.Uint16(etherType[0:2])) - } - flowMessage.Proto = uint32(nextHeader) - - return nil -} - -func ParseSampledHeaderConfig(flowMessage *ProtoProducerMessage, sampledHeader *sflow.SampledHeader, config *SFlowMapper) error { +func ParseSampledHeaderConfig(flowMessage *ProtoProducerMessage, sampledHeader *sflow.SampledHeader, config PacketMapper) error { data := (*sampledHeader).HeaderData switch (*sampledHeader).Protocol { case 1: // Ethernet - if err := ParseEthernetHeader(flowMessage, data, config); err != nil { + if config == nil { + config = DefaultEnvironment + } + + if err := config.ParsePacket(flowMessage, data); err != nil { return err } } return nil } -func SearchSFlowSampleConfig(flowMessage *ProtoProducerMessage, flowSample interface{}, config *SFlowMapper) error { +func SearchSFlowSampleConfig(flowMessage *ProtoProducerMessage, flowSample interface{}, config PacketMapper) error { var records []sflow.FlowRecord flowMessage.Type = flowmessage.FlowMessage_SFLOW_5 @@ -487,7 +116,7 @@ func SearchSFlowSampleConfig(flowMessage *ProtoProducerMessage, flowSample inter } -func SearchSFlowSamplesConfig(samples []interface{}, config *SFlowMapper) (flowMessageSet []producer.ProducerMessage, err error) { +func SearchSFlowSamplesConfig(samples []interface{}, config PacketMapper) (flowMessageSet []producer.ProducerMessage, err error) { for _, flowSample := range samples { fmsg := protoMessagePool.Get().(*ProtoProducerMessage) fmsg.Reset() @@ -500,17 +129,17 @@ func SearchSFlowSamplesConfig(samples []interface{}, config *SFlowMapper) (flowM } // Converts an sFlow message -func ProcessMessageSFlowConfig(packet *sflow.Packet, config *producerConfigMapped) (flowMessageSet []producer.ProducerMessage, err error) { +func ProcessMessageSFlowConfig(packet *sflow.Packet, config ProtoProducerConfig) (flowMessageSet []producer.ProducerMessage, err error) { seqnum := packet.SequenceNumber agent := packet.AgentIP - var cfg *SFlowMapper + var cfgSFlow PacketMapper if config != nil { - cfg = config.SFlow + cfgSFlow = config.GetPacketMapper() } flowSamples := GetSFlowFlowSamples(packet) - flowMessageSet, err = SearchSFlowSamplesConfig(flowSamples, cfg) + flowMessageSet, err = SearchSFlowSamplesConfig(flowSamples, cfgSFlow) if err != nil { return flowMessageSet, err } diff --git a/producer/proto/producer_test.go b/producer/proto/producer_test.go index fe510bcf..0d009f3f 100644 --- a/producer/proto/producer_test.go +++ b/producer/proto/producer_test.go @@ -1,8 +1,6 @@ package protoproducer import ( - "encoding/hex" - "encoding/json" "testing" "github.com/netsampler/goflow2/v2/decoders/netflow" @@ -194,75 +192,6 @@ func getSflowPacket() *sflow.Packet { return &pkt } -func TestProcessEthernet(t *testing.T) { - dataStr := "005300000001" + // src mac - "005300000002" + // dst mac - "86dd" + // etype - "6000000004d83a40" + // ipv6 - "fd010000000000000000000000000001" + // src - "fd010000000000000000000000000002" + // dst - "8000f96508a4" // icmpv6 - data, _ := hex.DecodeString(dataStr) - var flowMessage ProtoProducerMessage - err := ParseEthernetHeader(&flowMessage, data, nil) - assert.Nil(t, err) - - b, _ := json.Marshal(flowMessage.FlowMessage) - t.Log(string(b)) - - assert.Equal(t, uint32(0x86dd), flowMessage.Etype) - assert.Equal(t, uint32(58), flowMessage.Proto) - assert.Equal(t, uint32(128), flowMessage.IcmpType) -} - -func TestProcessIPv6Headers(t *testing.T) { - dataStr := "6000000004d82c40" + - "fd010000000000000000000000000001" + // src - "fd010000000000000000000000000002" + // dst - "3a000001a7882ea9" + // fragment header - "8000f96508a4" // icmpv6 - data, _ := hex.DecodeString(dataStr) - var flowMessage ProtoProducerMessage - nextHeader, offset, err := ParseIPv6(0, &flowMessage, data) - assert.Nil(t, err) - assert.Equal(t, byte(44), nextHeader) - nextHeader, offset, err = ParseIPv6Headers(nextHeader, offset, &flowMessage, data) - assert.Nil(t, err) - assert.Equal(t, byte(58), nextHeader) - - offset, err = ParseICMPv6(offset, &flowMessage, data) - assert.Nil(t, err) - - b, _ := json.Marshal(flowMessage.FlowMessage) - t.Log(string(b), nextHeader, offset) - - assert.Equal(t, uint32(1), flowMessage.IpFlags) - assert.Equal(t, uint32(64), flowMessage.IpTtl) - assert.Equal(t, uint32(2810719913), flowMessage.FragmentId) - assert.Equal(t, uint32(0), flowMessage.FragmentOffset) - assert.Equal(t, uint32(128), flowMessage.IcmpType) -} - -func TestProcessIPv4Fragment(t *testing.T) { - dataStr := "450002245dd900b94001ffe1" + - "c0a80101" + // src - "c0a80102" + // dst - "0809" // continued payload - data, _ := hex.DecodeString(dataStr) - var flowMessage ProtoProducerMessage - nextHeader, offset, err := ParseIPv4(0, &flowMessage, data) - assert.Nil(t, err) - assert.Equal(t, byte(1), nextHeader) - - b, _ := json.Marshal(flowMessage.FlowMessage) - t.Log(string(b), nextHeader, offset) - - assert.Equal(t, uint32(0), flowMessage.IpFlags) - assert.Equal(t, uint32(64), flowMessage.IpTtl) - assert.Equal(t, uint32(24025), flowMessage.FragmentId) - assert.Equal(t, uint32(185), flowMessage.FragmentOffset) -} - func TestNetFlowV9Time(t *testing.T) { // This test ensures the NetFlow v9 timestamps are properly calculated. // It passes a baseTime = 2024-01-01 00:00:00 (in seconds) and an uptime of 2 seconds (in milliseconds). diff --git a/producer/proto/proto.go b/producer/proto/proto.go index 47e15971..9a5d6ea0 100644 --- a/producer/proto/proto.go +++ b/producer/proto/proto.go @@ -11,7 +11,7 @@ import ( ) type ProtoProducer struct { - cfgMapped *producerConfigMapped + cfg ProtoProducerConfig samplinglock *sync.RWMutex sampling map[string]SamplingRateSystem samplingRateSystem func() SamplingRateSystem @@ -55,7 +55,7 @@ func (p *ProtoProducer) Produce(msg interface{}, args *producer.ProduceArgs) (fl }) case *netflow.NFv9Packet: samplingRateSystem := p.getSamplingRateSystem(args) - flowMessageSet, err = ProcessMessageNetFlowV9Config(msgConv, samplingRateSystem, p.cfgMapped) + flowMessageSet, err = ProcessMessageNetFlowV9Config(msgConv, samplingRateSystem, p.cfg) p.enrich(flowMessageSet, func(fmsg *ProtoProducerMessage) { fmsg.TimeReceivedNs = tr @@ -63,14 +63,14 @@ func (p *ProtoProducer) Produce(msg interface{}, args *producer.ProduceArgs) (fl }) case *netflow.IPFIXPacket: samplingRateSystem := p.getSamplingRateSystem(args) - flowMessageSet, err = ProcessMessageIPFIXConfig(msgConv, samplingRateSystem, p.cfgMapped) + flowMessageSet, err = ProcessMessageIPFIXConfig(msgConv, samplingRateSystem, p.cfg) p.enrich(flowMessageSet, func(fmsg *ProtoProducerMessage) { fmsg.TimeReceivedNs = tr fmsg.SamplerAddress = sa }) case *sflow.Packet: - flowMessageSet, err = ProcessMessageSFlowConfig(msgConv, p.cfgMapped) + flowMessageSet, err = ProcessMessageSFlowConfig(msgConv, p.cfg) p.enrich(flowMessageSet, func(fmsg *ProtoProducerMessage) { fmsg.TimeReceivedNs = tr @@ -82,7 +82,7 @@ func (p *ProtoProducer) Produce(msg interface{}, args *producer.ProduceArgs) (fl } p.enrich(flowMessageSet, func(fmsg *ProtoProducerMessage) { - fmsg.formatter = p.cfgMapped.Formatter + fmsg.formatter = p.cfg.GetFormatter() }) return flowMessageSet, err } @@ -95,12 +95,11 @@ func (p *ProtoProducer) Commit(flowMessageSet []producer.ProducerMessage) { func (p *ProtoProducer) Close() {} -func CreateProtoProducer(cfg *ProducerConfig, samplingRateSystem func() SamplingRateSystem) (producer.ProducerInterface, error) { - cfgMapped, err := mapConfig(cfg) +func CreateProtoProducer(cfg ProtoProducerConfig, samplingRateSystem func() SamplingRateSystem) (producer.ProducerInterface, error) { return &ProtoProducer{ - cfgMapped: cfgMapped, + cfg: cfg, samplinglock: &sync.RWMutex{}, sampling: make(map[string]SamplingRateSystem), samplingRateSystem: samplingRateSystem, - }, err + }, nil } diff --git a/producer/proto/reflect.go b/producer/proto/reflect.go index 6874a70c..d836d295 100644 --- a/producer/proto/reflect.go +++ b/producer/proto/reflect.go @@ -9,54 +9,92 @@ import ( "github.com/netsampler/goflow2/v2/decoders/netflow" ) -type EndianType string -type ProtoType string +// Using a data slice, returns a chunk corresponding +func GetBytes(d []byte, offset, length int, shift bool) []byte { -var ( - BigEndian EndianType = "big" - LittleEndian EndianType = "little" + /* - ProtoString ProtoType = "string" - ProtoVarint ProtoType = "varint" + Example with an offset of 4 and length of 6 - ProtoTypeMap = map[string]ProtoType{ - string(ProtoString): ProtoString, - string(ProtoVarint): ProtoVarint, - "bytes": ProtoString, - } -) + initial data: + 0xAA 0x55 + 1010 1010.0101 0101 + ^--- -^ + + with shift + 0x29 + 0010 1001 + + without shift (bitwise AND) + 0xa4 + 1010 0100 -func GetBytes(d []byte, offset int, length int) []byte { - if length == 0 || offset < 0 { + + */ + + if len(d)*8 < offset { return nil } - leftBytes := offset / 8 - rightBytes := (offset + length) / 8 - if (offset+length)%8 != 0 { - rightBytes += 1 - } - if leftBytes >= len(d) { + if length == 0 { return nil } - if rightBytes > len(d) { - rightBytes = len(d) + + shiftSize := offset % 8 // how much to shift to the left due to offset + shiftRightSize := length % 8 // for final step + + start := offset / 8 + end := (offset + length) / 8 + if (offset+length)%8 > 0 { + end += 1 + } + + lengthB := length / 8 + if shiftRightSize > 0 { + lengthB += 1 + } + + missing := end - len(d) // calculate how many missing bytes + if missing > 0 { + end = len(d) + } + + dUsed := d[start:end] + + if shiftSize == 0 && length%8 == 0 { // simple case + if missing > 0 { + dFinal := make([]byte, lengthB) + + copy(dFinal, dUsed) + return dFinal + } + return dUsed } - chunk := make([]byte, rightBytes-leftBytes) - offsetMod8 := (offset % 8) - shiftAnd := byte(0xff >> (8 - offsetMod8)) + dFinal := make([]byte, lengthB) + + // first pass, apply offset + for i := range dFinal { + if i >= len(dUsed) { + break + } + left := dUsed[i] << shiftSize - var shifted byte - for i := range chunk { - j := len(chunk) - 1 - i - cur := d[j+leftBytes] - chunk[j] = (cur << offsetMod8) | shifted - shifted = shiftAnd & cur + dFinal[i] = left + if i+1 >= len(dUsed) { + break + } + right := dUsed[i+1] >> (8 - shiftSize) + dFinal[i] |= right + } + + // final pass + if shift { + dFinal[len(dFinal)-1] >>= (8 - shiftRightSize) % 8 + } else { + dFinal[len(dFinal)-1] &= (0xFF << ((8 - shiftRightSize) % 8)) } - last := len(chunk) - 1 - shiftAndLast := byte(0xff << ((8 - ((offset + length) % 8)) % 8)) - chunk[last] = chunk[last] & shiftAndLast - return chunk + + return dFinal } func IsUInt(k reflect.Kind) bool { @@ -67,41 +105,25 @@ func IsInt(k reflect.Kind) bool { return k == reflect.Int8 || k == reflect.Int16 || k == reflect.Int32 || k == reflect.Int64 } -// Structure to help the MapCustom functions -// populate the protobuf data -type MapConfigBase struct { - // Used if the field inside the protobuf exists - // also serves as the field when rendering with text - Destination string - Endianness EndianType - - // The following fields are used for mapping - // when the destination field does not exist - // inside the protobuf - ProtoIndex int32 - ProtoType ProtoType - ProtoArray bool -} - -func MapCustomNetFlow(flowMessage *ProtoProducerMessage, df netflow.DataField, mapper *NetFlowMapper) error { +func MapCustomNetFlow(flowMessage *ProtoProducerMessage, df netflow.DataField, mapper TemplateMapper) error { if mapper == nil { return nil } mapped, ok := mapper.Map(df) if ok { v := df.Value.([]byte) - if err := MapCustom(flowMessage, v, mapped.MapConfigBase); err != nil { + if err := MapCustom(flowMessage, v, mapped); err != nil { return err } } return nil } -func MapCustom(flowMessage *ProtoProducerMessage, v []byte, cfg MapConfigBase) error { +func MapCustom(flowMessage *ProtoProducerMessage, v []byte, cfg MappableField) error { vfm := reflect.ValueOf(flowMessage) vfm = reflect.Indirect(vfm) - fieldValue := vfm.FieldByName(cfg.Destination) + fieldValue := vfm.FieldByName(cfg.GetDestination()) if fieldValue.IsValid() { typeDest := fieldValue.Type() @@ -115,7 +137,7 @@ func MapCustom(flowMessage *ProtoProducerMessage, v []byte, cfg MapConfigBase) e item := reflect.New(typeDest.Elem()) if IsUInt(typeDest.Elem().Kind()) { - if cfg.Endianness == LittleEndian { + if cfg.GetEndianness() == LittleEndian { if err := DecodeUNumberLE(v, item.Interface()); err != nil { return err } @@ -125,7 +147,7 @@ func MapCustom(flowMessage *ProtoProducerMessage, v []byte, cfg MapConfigBase) e } } } else if IsInt(typeDest.Elem().Kind()) { - if cfg.Endianness == LittleEndian { + if cfg.GetEndianness() == LittleEndian { if err := DecodeNumberLE(v, item.Interface()); err != nil { return err } @@ -143,7 +165,7 @@ func MapCustom(flowMessage *ProtoProducerMessage, v []byte, cfg MapConfigBase) e } else if fieldValueAddr.IsValid() { if IsUInt(typeDest.Kind()) { - if cfg.Endianness == LittleEndian { + if cfg.GetEndianness() == LittleEndian { if err := DecodeUNumberLE(v, fieldValueAddr.Interface()); err != nil { return err } @@ -153,7 +175,7 @@ func MapCustom(flowMessage *ProtoProducerMessage, v []byte, cfg MapConfigBase) e } } } else if IsInt(typeDest.Kind()) { - if cfg.Endianness == LittleEndian { + if cfg.GetEndianness() == LittleEndian { if err := DecodeNumberLE(v, fieldValueAddr.Interface()); err != nil { return err } @@ -165,17 +187,17 @@ func MapCustom(flowMessage *ProtoProducerMessage, v []byte, cfg MapConfigBase) e } } - } else if cfg.ProtoIndex > 0 { + } else if cfg.GetProtoIndex() > 0 { fmr := flowMessage.ProtoReflect() unk := fmr.GetUnknown() - if !cfg.ProtoArray { + if !cfg.IsArray() { var offset int for offset < len(unk) { num, _, length := protowire.ConsumeField(unk[offset:]) offset += length - if int32(num) == cfg.ProtoIndex { + if int32(num) == cfg.GetProtoIndex() { // only one allowed break } @@ -183,8 +205,8 @@ func MapCustom(flowMessage *ProtoProducerMessage, v []byte, cfg MapConfigBase) e } var dstVar uint64 - if cfg.ProtoType == ProtoVarint { - if cfg.Endianness == LittleEndian { + if cfg.GetProtoType() == ProtoVarint { + if cfg.GetEndianness() == LittleEndian { if err := DecodeUNumberLE(v, &dstVar); err != nil { return err } @@ -194,10 +216,10 @@ func MapCustom(flowMessage *ProtoProducerMessage, v []byte, cfg MapConfigBase) e } } // support signed int? - unk = protowire.AppendTag(unk, protowire.Number(cfg.ProtoIndex), protowire.VarintType) + unk = protowire.AppendTag(unk, protowire.Number(cfg.GetProtoIndex()), protowire.VarintType) unk = protowire.AppendVarint(unk, dstVar) - } else if cfg.ProtoType == ProtoString { - unk = protowire.AppendTag(unk, protowire.Number(cfg.ProtoIndex), protowire.BytesType) + } else if cfg.GetProtoType() == ProtoString { + unk = protowire.AppendTag(unk, protowire.Number(cfg.GetProtoIndex()), protowire.BytesType) unk = protowire.AppendString(unk, string(v)) } else { return fmt.Errorf("could not insert into protobuf unknown") diff --git a/producer/proto/reflect_test.go b/producer/proto/reflect_test.go new file mode 100644 index 00000000..dee8b286 --- /dev/null +++ b/producer/proto/reflect_test.go @@ -0,0 +1,80 @@ +package protoproducer + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetBytes(t *testing.T) { + d := []byte{0xAA, 0x55, 0xAB, 0x56} + + // Simple case + r := GetBytes(d, 16, 16, true) + assert.Equal(t, []byte{0xAB, 0x56}, r) + + r = GetBytes(d, 24, 8, true) + assert.Equal(t, []byte{0x56}, r) + + r = GetBytes(d, 24, 32, true) + assert.Equal(t, []byte{0x56, 0x00, 0x00, 0x00}, r) + + // Trying to break + r = GetBytes(d, 32, 0, true) + assert.Nil(t, r) + + r = GetBytes(d, 32, 16, true) + assert.Equal(t, []byte{0x00, 0x00}, r) + + // Offset to shift + r = GetBytes(d, 4, 16, true) + assert.Equal(t, []byte{0xA5, 0x5A}, r) + + r = GetBytes(d, 4, 16, false) + assert.Equal(t, []byte{0xA5, 0x5A}, r) + + r = GetBytes(d, 4, 4, true) + assert.Equal(t, []byte{0x0A}, r) + + r = GetBytes(d, 4, 4, false) + assert.Equal(t, []byte{0xA0}, r) + + r = GetBytes(d, 4, 6, true) + assert.Equal(t, []byte{0x29}, r) + + r = GetBytes(d, 4, 6, false) + assert.Equal(t, []byte{0xA4}, r) + + r = GetBytes(d, 20, 6, true) + assert.Equal(t, []byte{0x2D}, r) + + r = GetBytes(d, 20, 6, false) + assert.Equal(t, []byte{0xB4}, r) + + r = GetBytes(d, 5, 10, true) + assert.Equal(t, []byte{0x4A, 0x02}, r) + + // Trying to break + r = GetBytes(d, 30, 10, true) + assert.Equal(t, []byte{0x80, 0x00}, r) + + r = GetBytes(d, 30, 10, false) + assert.Equal(t, []byte{0x80, 0x00}, r) + + r = GetBytes(d, 30, 2, true) + assert.Equal(t, []byte{0x02}, r) + + r = GetBytes(d, 30, 2, false) + assert.Equal(t, []byte{0x80}, r) + + r = GetBytes(d, 32, 1, true) + assert.Equal(t, []byte{0}, r) + +} + +func BenchmarkGetBytes(b *testing.B) { + d := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + for i := 0; i < b.N; i++ { + GetBytes(d, 2, 10, false) + } +} diff --git a/producer/proto/render.go b/producer/proto/render.go index 28da75c7..c3e5cbcd 100644 --- a/producer/proto/render.go +++ b/producer/proto/render.go @@ -53,6 +53,8 @@ var ( "DstNet": NetworkRenderer, "icmp_name": ICMPRenderer, + + "Ipv6RoutingHeaderAddresses": IPRenderer, } etypeName = map[uint32]string{ diff --git a/transport/file/transport.go b/transport/file/transport.go index ab495555..9be431b7 100644 --- a/transport/file/transport.go +++ b/transport/file/transport.go @@ -71,7 +71,6 @@ func (d *FileDriver) Init() error { } } }() - } return nil } diff --git a/transport/kafka/kafka.go b/transport/kafka/kafka.go index 407a72cc..a96af035 100644 --- a/transport/kafka/kafka.go +++ b/transport/kafka/kafka.go @@ -47,7 +47,7 @@ func (e *KafkaTransportError) Error() string { return fmt.Sprintf("kafka transport %s", e.Err.Error()) } func (e *KafkaTransportError) Unwrap() []error { - return []error{transport.ErrorTransport, e.Err} + return []error{transport.ErrTransport, e.Err} } type KafkaSASLAlgorithm string diff --git a/transport/transport.go b/transport/transport.go index c862d659..1ae49c78 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -9,7 +9,7 @@ var ( transportDrivers = make(map[string]TransportDriver) lock = &sync.RWMutex{} - ErrorTransport = fmt.Errorf("transport error") + ErrTransport = fmt.Errorf("transport error") ) type DriverTransportError struct { @@ -22,7 +22,7 @@ func (e *DriverTransportError) Error() string { } func (e *DriverTransportError) Unwrap() []error { - return []error{ErrorTransport, e.Err} + return []error{ErrTransport, e.Err} } type TransportDriver interface { @@ -70,7 +70,7 @@ func FindTransport(name string) (*Transport, error) { t, ok := transportDrivers[name] lock.RUnlock() if !ok { - return nil, fmt.Errorf("%w %s not found", ErrorTransport, name) + return nil, fmt.Errorf("%w %s not found", ErrTransport, name) } err := t.Init() diff --git a/utils/debug/decoder.go b/utils/debug/decoder.go index 3c61f42e..691a1eef 100644 --- a/utils/debug/decoder.go +++ b/utils/debug/decoder.go @@ -8,10 +8,8 @@ import ( func PanicDecoderWrapper(wrapped utils.DecoderFunc) utils.DecoderFunc { return func(msg interface{}) (err error) { - defer func() { if pErr := recover(); pErr != nil { - pErrC, _ := pErr.(string) err = &PanicErrorMessage{Msg: msg, Inner: pErrC, Stacktrace: debug.Stack()} } diff --git a/utils/debug/producer.go b/utils/debug/producer.go index c185178a..b69fe992 100644 --- a/utils/debug/producer.go +++ b/utils/debug/producer.go @@ -14,7 +14,6 @@ func (p *PanicProducerWrapper) Produce(msg interface{}, args *producer.ProduceAr defer func() { if pErr := recover(); pErr != nil { - pErrC, _ := pErr.(string) err = &PanicErrorMessage{Msg: msg, Inner: pErrC, Stacktrace: debug.Stack()} }