Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(decoders.sflow): Add decoding of drop packets #337

Merged
merged 4 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions decoders/sflow/datastructure.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ type ExtendedGateway struct {
LocalPref uint32 `json:"local-pref"`
}

type EgressQueue struct {
Queue uint32 `json:"queue"`
}

type ExtendedACL struct {
Number uint32 `json:"number"`
Name string `json:"name"`
Direction uint32 `json:"direction"` // 0:unknown, 1:ingress, 2:egress
}

type ExtendedFunction struct {
Symbol string `json:"symbol"`
}

type IfCounters struct {
IfIndex uint32 `json:"if-index"`
IfType uint32 `json:"if-type"`
Expand Down
12 changes: 12 additions & 0 deletions decoders/sflow/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ type ExpandedFlowSample struct {
Records []FlowRecord `json:"records"`
}

// DropSample data structure according to https://sflow.org/sflow_drops.txt
type DropSample struct {
Header SampleHeader `json:"header"`

Drops uint32 `json:"drops"`
Input uint32 `json:"input"`
Output uint32 `json:"output"`
Reason uint32 `json:"reason"`
FlowRecordsCount uint32 `json:"flow-records-count"`
Records []FlowRecord `json:"records"`
}

type RecordHeader struct {
DataFormat uint32 `json:"data-format"`
Length uint32 `json:"length"`
Expand Down
52 changes: 50 additions & 2 deletions decoders/sflow/sflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
SAMPLE_FORMAT_COUNTER = 2
SAMPLE_FORMAT_EXPANDED_FLOW = 3
SAMPLE_FORMAT_EXPANDED_COUNTER = 4
SAMPLE_FORMAT_DROP = 5
)

// Opaque flow_data types according to https://sflow.org/SFLOW-STRUCTS5.txt
Expand All @@ -33,6 +34,11 @@ const (
FLOW_TYPE_EXT_MPLS_FEC = 1010
FLOW_TYPE_EXT_MPLS_LVP_FEC = 1011
FLOW_TYPE_EXT_VLAN_TUNNEL = 1012

// According to https://sflow.org/sflow_drops.txt
FLOW_TYPE_EGRESS_QUEUE = 1036
FLOW_TYPE_EXT_ACL = 1037
FLOW_TYPE_EXT_FUNCTION = 1038
)

// Opaque counter_data types according to https://sflow.org/SFLOW-STRUCTS5.txt
Expand Down Expand Up @@ -319,6 +325,24 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
extendedGateway.Communities = communities

flowRecord.Data = extendedGateway
case FLOW_TYPE_EGRESS_QUEUE:
var queue EgressQueue
if err := utils.BinaryDecoder(payload, &queue.Queue); err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
}
flowRecord.Data = queue
case FLOW_TYPE_EXT_ACL:
var acl ExtendedACL
if err := utils.BinaryDecoder(payload, &acl.Number, &acl.Name, &acl.Direction); err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
}
flowRecord.Data = acl
case FLOW_TYPE_EXT_FUNCTION:
var function ExtendedFunction
if err := utils.BinaryDecoder(payload, &function.Symbol); err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
}
flowRecord.Data = function
default:
var rawRecord RawRecord
rawRecord.Data = payload.Bytes()
Expand All @@ -344,10 +368,9 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
if err := utils.BinaryDecoder(payload, &sourceId); err != nil {
return sample, &FlowError{format, seq, fmt.Errorf("header source [%w]", err)}
}

header.SourceIdType = sourceId >> 24
header.SourceIdValue = sourceId & 0x00ffffff
case SAMPLE_FORMAT_EXPANDED_FLOW, SAMPLE_FORMAT_EXPANDED_COUNTER:
case SAMPLE_FORMAT_EXPANDED_FLOW, SAMPLE_FORMAT_EXPANDED_COUNTER, SAMPLE_FORMAT_DROP:
// Explicit data-source format
if err := utils.BinaryDecoder(payload,
&header.SourceIdType,
Expand All @@ -363,6 +386,8 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
var flowSample FlowSample
var counterSample CounterSample
var expandedFlowSample ExpandedFlowSample
var dropSample DropSample

switch format {
case SAMPLE_FORMAT_FLOW:
flowSample.Header = *header
Expand Down Expand Up @@ -410,6 +435,23 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
recordsCount = expandedFlowSample.FlowRecordsCount
expandedFlowSample.Records = make([]FlowRecord, recordsCount)
sample = expandedFlowSample
case SAMPLE_FORMAT_DROP:
dropSample.Header = *header
if err := utils.BinaryDecoder(payload,
&dropSample.Drops,
&dropSample.Input,
&dropSample.Output,
&dropSample.Reason,
&dropSample.FlowRecordsCount,
); err != nil {
return sample, &FlowError{format, seq, fmt.Errorf("raw [%w]", err)}
}
recordsCount = dropSample.FlowRecordsCount
if recordsCount > 1000 { // protection against ddos
return sample, &FlowError{format, seq, fmt.Errorf("too many flow records: %d", recordsCount)}
}
dropSample.Records = make([]FlowRecord, recordsCount) // max size of 1000 for protection
sample = dropSample
}
for i := 0; i < int(recordsCount) && payload.Len() >= 8; i++ {
recordHeader := RecordHeader{}
Expand Down Expand Up @@ -442,6 +484,12 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
return sample, &FlowError{format, seq, fmt.Errorf("record [%w]", err)}
}
expandedFlowSample.Records[i] = record
case SAMPLE_FORMAT_DROP:
record, err := DecodeFlowRecord(&recordHeader, recordReader)
if err != nil {
return sample, &FlowError{format, seq, fmt.Errorf("record [%w]", err)}
}
dropSample.Records[i] = record
}
}
return sample, nil
Expand Down
62 changes: 62 additions & 0 deletions decoders/sflow/sflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,65 @@ func TestExpandedSFlowDecode(t *testing.T) {
var packet Packet
assert.NoError(t, DecodeMessageVersion(buf, &packet))
}

func TestSFlowDecodeDropEgressQueue(t *testing.T) {
data := []byte{
0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x01, 0xc0, 0xa8, 0x77, 0xb8, 0x00, 0x01, 0x86, 0xa0,
0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x30, 0x7e, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x2C, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x04, 0x0c, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x2a,
}

buf := bytes.NewBuffer(data)
var packet Packet
assert.NoError(t, DecodeMessageVersion(buf, &packet))
assert.Len(t, packet.Samples, 1)
assert.NotNil(t, packet.Samples[0])
sample, ok := packet.Samples[0].(DropSample)
assert.True(t, ok)
assert.Len(t, sample.Records, 1)
assert.Equal(t, EgressQueue{Queue: 42}, sample.Records[0].Data)
}

func TestSFlowDecodeDropExtendedACL(t *testing.T) {
data := []byte{
0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x01, 0xc0, 0xa8, 0x77, 0xb8, 0x00, 0x01, 0x86, 0xa0,
0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x30, 0x7e, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x38, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x04, 0x0d, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x2a,
0x00, 0x00, 0x00, 0x04, 0x66, 0x6f, 0x6f, 0x21, 0x00, 0x00, 0x00, 0x02,
}

buf := bytes.NewBuffer(data)
var packet Packet
assert.NoError(t, DecodeMessageVersion(buf, &packet))
assert.Len(t, packet.Samples, 1)
assert.NotNil(t, packet.Samples[0])
sample, ok := packet.Samples[0].(DropSample)
assert.True(t, ok)
assert.Len(t, sample.Records, 1)
assert.Equal(t, ExtendedACL{Number: 42, Name: "foo!", Direction: 2}, sample.Records[0].Data)
}

func TestSFlowDecodeDropExtendedFunction(t *testing.T) {
data := []byte{
0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x01, 0xc0, 0xa8, 0x77, 0xb8, 0x00, 0x01, 0x86, 0xa0,
0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x30, 0x7e, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x32, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x04, 0x0e, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x06,
0x66, 0x6f, 0x6f, 0x62, 0x61, 0x72,
}

buf := bytes.NewBuffer(data)
var packet Packet
assert.NoError(t, DecodeMessageVersion(buf, &packet))
assert.Len(t, packet.Samples, 1)
assert.NotNil(t, packet.Samples[0])
sample, ok := packet.Samples[0].(DropSample)
assert.True(t, ok)
assert.Len(t, sample.Records, 1)
assert.Equal(t, ExtendedFunction{Symbol: "foobar"}, sample.Records[0].Data)
}
9 changes: 9 additions & 0 deletions decoders/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ func BinaryRead(payload BytesBuffer, order binary.ByteOrder, data any) error {
*data = int64(order.Uint64(bs))
case *uint64:
*data = order.Uint64(bs)
case *string:
strlen := int(order.Uint32(bs))
buf := payload.Next(strlen)
if len(buf) < strlen {
return io.ErrUnexpectedEOF
}
*data = string(buf)
case []bool:
for i, x := range bs { // Easier to loop over the input for 8-bit values.
data[i] = x != 0
Expand Down Expand Up @@ -121,6 +128,8 @@ func intDataSize(data any) int {
return 2 * len(data)
case int32, uint32, *int32, *uint32:
return 4
case *string: // return the length field
return 4
case []int32:
return 4 * len(data)
case []uint32:
Expand Down
Loading