Skip to content

Commit

Permalink
add port parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
lspgn committed Aug 12, 2024
1 parent c5b2737 commit 4d6e0fd
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 41 deletions.
15 changes: 9 additions & 6 deletions cmd/goflow2/mapping.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,17 @@ netflowv9:
sflow:
ports:
- proto: "udp"
dstport: 6081
next: "geneve"
dir: "dst"
port: 3544
parser: "teredo-dst"
- proto: "udp"
dstport: 3544
next: "teredo"
dir: "both"
port: 4754
parser: "gre"
- proto: "udp"
dstport: 4754
next: "gre"
dir: "both"
port: 6081
parser: "geneve"
mapping:
- layer: "udp"
offset: 48
Expand Down
28 changes: 21 additions & 7 deletions producer/proto/config_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ type SFlowMapField struct {
}

type SFlowProtocolParse struct {
Proto string `yaml:"proto"`
DstPort uint16 `yaml:"dstport"`
Next string `yaml:"next"`
Proto string `yaml:"proto"`
Dir RegPortDir `yaml:"dir"`
Port uint16 `yaml:"port"`
Parser string `yaml:"parser"`
}

type SFlowProducerConfig struct {
Expand Down Expand Up @@ -199,10 +200,6 @@ func (m *SFlowMapper) Map(layer string) MapLayerIterator {
return &sflowMapperIterator{data: m.data[strings.ToLower(layer)], n: 0}
}

func (m *SFlowMapper) NextParser(proto string, srcPort, dstPort uint16) (parser string, err error) {
return "", nil
}

// Structure to help the MapCustom functions
// populate the protobuf data
type MapConfigBase struct {
Expand Down Expand Up @@ -276,6 +273,20 @@ func mapFieldsSFlow(fields []SFlowMapField) *SFlowMapper {
return &SFlowMapper{data: ret}
}

func mapPortsSFlow(ports []SFlowProtocolParse) error {
for _, port := range ports {
parser, ok := GetParser(port.Parser)
if !ok {
return fmt.Errorf("parser %s not found", port.Parser)
}
if err := RegisterPort(port.Proto, port.Dir, port.Port, parser); err != nil {
return err
}

}
return nil
}

func mapFieldsNetFlow(fields []NetFlowMapField) *NetFlowMapper {
ret := make(map[string]*DataMap)
for _, field := range fields {
Expand Down Expand Up @@ -451,6 +462,9 @@ func mapConfig(cfg *ProducerConfig) (*producerConfigMapped, error) {
newCfg.IPFIX = mapFieldsNetFlow(cfg.IPFIX.Mapping)
newCfg.NetFlowV9 = mapFieldsNetFlow(cfg.NetFlowV9.Mapping)
newCfg.SFlow = mapFieldsSFlow(cfg.SFlow.Mapping)
if err := mapPortsSFlow(cfg.SFlow.Ports); err != nil {
return nil, err
}
}
var err error
if newCfg.Formatter, err = mapFormat(cfg); err != nil {
Expand Down
116 changes: 89 additions & 27 deletions producer/proto/producer_packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@ import (
"sync"
)

type RegPortDir string

var (
PortDirSrc RegPortDir = "src"
PortDirDst RegPortDir = "dst"
PortDirBoth RegPortDir = "both"

errParserEmpty = fmt.Errorf("parser is nil")

parserNone = ParserInfo{
Expand Down Expand Up @@ -122,6 +128,22 @@ var (
12,
false,
}
parserTeredoDst = ParserInfo{
nil, //ParseTeredoDst,
"teredo-dst",
[]string{"teredo-dst", "teredo"},
40,
13,
false,
}
parserGeneve = ParserInfo{
nil, //ParseTeredoDst,
"geneve",
[]string{"geneve"},
40,
14,
false,
}

nameToParser = sync.Map{}
customEtype = sync.Map{}
Expand All @@ -143,21 +165,26 @@ func init() {
parserICMP.Parser = ParseICMP
parserICMPv6.Parser = ParseICMPv6
parserGRE.Parser = ParseGRE
parserTeredoDst.Parser = ParseTeredoDst
parserGeneve.Parser = ParseGeneve

// Load initial parsers by name
for _, p := range []*ParserInfo{
&parserEthernet,
&parser8021Q,
&parserMPLS,
&parserIPv4,
&parserIPv6,
&parserIPv6HeaderRouting,
&parserIPv6HeaderFragment,
&parserTCP,
&parserUDP,
&parserICMP,
&parserICMPv6,
&parserGRE} {
for _, p := range []ParserInfo{
parserEthernet,
parser8021Q,
parserMPLS,
parserIPv4,
parserIPv6,
parserIPv6HeaderRouting,
parserIPv6HeaderFragment,
parserTCP,
parserUDP,
parserICMP,
parserICMPv6,
parserGRE,
parserTeredoDst,
parserGeneve,
} {
nameToParser.Store(p.Name, p)
}

Expand All @@ -167,35 +194,46 @@ func init() {
func GetParser(name string) (info ParserInfo, ok bool) {
parser, ok := nameToParser.Load(name)
if ok {
return *parser.(*ParserInfo), ok
return parser.(ParserInfo), ok
}
return info, ok
}

// RegisterEtype adds or replace a parser used when decoding a protocol on top of layer 2 (eg: Ethernet).
func RegisterEtype(eType uint16, parser *ParserInfo) error {
if parser == nil {
func RegisterEtype(eType uint16, parser ParserInfo) error {
if parser.Parser == nil {
return errParserEmpty
}
customEtype.Store(eType, parser) // parser can be invoked to decode certain etypes
return nil
}

// RegisterProto adds or replace a parser used when decoding a protocol on top of layer 3 (eg: IP).
func RegisterProto(proto byte, parser *ParserInfo) error {
if parser == nil {
func RegisterProto(proto byte, parser ParserInfo) error {
if parser.Parser == nil {
return errParserEmpty
}
customProto.Store(proto, parser) // parser can be invoked to decode certain protocols
return nil
}

// RegisterPort adds or replace a parser used when decoding a protocol on top of layer 4 (eg: UDP). Port is used for source and destination
func RegisterPort(proto string, port uint16, parser *ParserInfo) error {
if parser == nil {
func RegisterPort(proto string, dir RegPortDir, port uint16, parser ParserInfo) error {
if parser.Parser == nil {
return errParserEmpty
}
customPort.Store(fmt.Sprintf("%s-%d", proto, port), parser) // parser can be invoked to decode certain encapsulated protocols
switch dir {
case PortDirBoth:
customPort.Store(fmt.Sprintf("%s-src-%d", proto, port), parser)
customPort.Store(fmt.Sprintf("%s-dst-%d", proto, port), parser)
case PortDirSrc:
customPort.Store(fmt.Sprintf("%s-src-%d", proto, port), parser)
case PortDirDst:
customPort.Store(fmt.Sprintf("%s-dst-%d", proto, port), parser)
default:
return fmt.Errorf("unknown direction %s", dir)
}

return nil
}

Expand Down Expand Up @@ -246,12 +284,14 @@ func innerNextParserEtype(etherType []byte) (ParserInfo, error) {

eType := uint16(etherType[0])<<8 | uint16(etherType[1])
if cParser, ok := customEtype.Load(eType); ok {
return *cParser.(*ParserInfo), nil
return cParser.(ParserInfo), nil
}

switch {
case eType == 0x199e:
return parserEthernet, nil // Transparent Ether Bridging (GRE)
case eType == 0x6558:
return parserEthernet, nil // Transparent Ether Bridging (Geneve)
case eType == 0x8847:
return parserMPLS, nil // MPLS
case eType == 0x8100:
Expand All @@ -274,7 +314,7 @@ func NextProtocolParser(proto byte) (ParserInfo, error) {

func innerNextProtocolParser(proto byte) (ParserInfo, error) {
if cParser, ok := customProto.Load(proto); ok {
return *cParser.(*ParserInfo), nil
return cParser.(ParserInfo), nil
}

switch {
Expand Down Expand Up @@ -316,11 +356,11 @@ func NextPortParser(proto string, srcPort, dstPort uint16) (ParserInfo, error) {
}

func innerNextPortParser(proto string, srcPort, dstPort uint16) (byte, ParserInfo, error) {
if cParser, ok := customPort.Load(fmt.Sprintf("%s-%d", proto, dstPort)); ok {
return 1, *cParser.(*ParserInfo), nil
if cParser, ok := customPort.Load(fmt.Sprintf("%s-dst-%d", proto, dstPort)); ok {
return 1, cParser.(ParserInfo), nil
}
if cParser, ok := customPort.Load(fmt.Sprintf("%s-%d", proto, srcPort)); ok {
return 2, *cParser.(*ParserInfo), nil
if cParser, ok := customPort.Load(fmt.Sprintf("%s-src-%d", proto, srcPort)); ok {
return 2, cParser.(ParserInfo), nil
}
return 0, parserNone, nil
}
Expand Down Expand Up @@ -702,6 +742,28 @@ func ParseGRE(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (r
return res, err
}

func ParseTeredoDst(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) {
// get next parser
res.NextParser = parserIPv6

return res, err
}

func ParseGeneve(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) {
if len(data) < 8 {
return res, nil
}

res.Size = int(data[0]&0x3f)*4 + 8

eType := data[2:4]

// get next parser
res.NextParser, err = NextParserEtype(eType)

return res, err
}

func ParseICMP(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) {
if len(data) < 2 {
return res, nil
Expand Down
2 changes: 1 addition & 1 deletion producer/proto/producer_packet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func TestProcessPacketMappingPort(t *testing.T) {

var domain []byte

RegisterPort("udp", 53, &ParserInfo{
RegisterPort("udp", PortDirDst, 53, ParserInfo{
Parser: func(flowMessage *ProtoProducerMessage, data []byte, pc ParseConfig) (res ParseResult, err error) {
domain = data[13 : 13+11]
flowMessage.AddLayer("Custom")
Expand Down

0 comments on commit 4d6e0fd

Please sign in to comment.