Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Netflow input: Improve session reset detection and allow disabling #14904

Merged
merged 5 commits into from
Jan 7, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update Logstash module's Grok patterns to support Logstash 7.4 logs. {pull}14743[14743]
- Fix a problem in Filebeat input httpjson where interval is not used as time.Duration. {issue}14752[14752] {pull}14753[14753]
- Fix SSL config in input.yml for Filebeat httpjson input in the MISP module. {pull}14767[14767]
- Fix session reset detection and a crash in Netflow input {pull}14904[14904]

*Heartbeat*

Expand Down
4 changes: 4 additions & 0 deletions filebeat/docs/modules/netflow.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ These allow to update the NetFlow/IPFIX fields with vendor extensions and to
override existing fields. See <<filebeat-input-netflow,netflow input>> for
details.

`var.detect_sequence_reset`:: Flag controlling whether {beatname_uc} should
monitor sequence numbers in the Netflow packets to detect an Exporting Process
reset. See <<filebeat-input-netflow,netflow input>> for details.

:has-dashboards!:

:fileset_ex!:
Expand Down
13 changes: 13 additions & 0 deletions x-pack/filebeat/docs/inputs/input-netflow.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Example configuration:
queue_size: 8192
custom_definitions:
- path/to/fields.yml
detect_sequence_reset: true
----


Expand Down Expand Up @@ -107,6 +108,18 @@ IPFIX PEN zero are equivalent to changes to NetFlow fields.
Overriding the names and/or types of standard fields can prevent
mapping of ECS fields to function properly.

[float]
[[detect_sequence_reset]]
==== `detect_sequence_reset`

Flag controlling whether {beatname_uc} should monitor sequence numbers in the
Netflow packets to detect an Exporting Process reset. When this condition is
detected, record templates for the given exporter will be dropped. This will
cause flow loss until the exporter provides new templates. If set to `false`,
{beatname_uc} will ignore sequence numbers, which can cause some invalid flows
if the exporter process is reset. This option is only applicable to Netflow V9
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So an invalid flow could occur if the template changes after the Exporter Process reset?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, yes. In practice, I understand the exporter will first send the new templates after a reset as it doesn't make any sense to start sending flows before sending templates for them.

and IPFIX. Default is `true`.

[id="{beatname_lc}-input-{type}-common-options"]
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]

Expand Down
8 changes: 5 additions & 3 deletions x-pack/filebeat/input/netflow/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type config struct {
ExpirationTimeout time.Duration `config:"expiration_timeout"`
PacketQueueSize int `config:"queue_size"`
CustomDefinitions []string `config:"custom_definitions"`
DetectSequenceReset bool `config:"detect_sequence_reset"`
}

var defaultConfig = config{
Expand All @@ -31,7 +32,8 @@ var defaultConfig = config{
ForwarderConfig: harvester.ForwarderConfig{
Type: inputName,
},
Protocols: []string{"v5", "v9", "ipfix"},
ExpirationTimeout: time.Minute * 30,
PacketQueueSize: 8192,
Protocols: []string{"v5", "v9", "ipfix"},
ExpirationTimeout: time.Minute * 30,
PacketQueueSize: 8192,
DetectSequenceReset: true,
}
9 changes: 7 additions & 2 deletions x-pack/filebeat/input/netflow/decoder/v9/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,21 @@ func (s *SessionState) ExpireTemplates() (alive int, removed int) {

// CheckReset returns if the session must be reset after the receipt of the
// given sequence number.
func (s *SessionState) CheckReset(seqNum uint32) (reset bool) {
func (s *SessionState) CheckReset(seqNum uint32) (prev uint32, reset bool) {
s.mutex.Lock()
defer s.mutex.Unlock()
if reset = seqNum < s.lastSequence && seqNum-s.lastSequence > MaxSequenceDifference; reset {
prev = s.lastSequence
if reset = !isValidSequence(prev, seqNum); reset {
s.Templates = make(map[TemplateKey]*TemplateWrapper)
}
s.lastSequence = seqNum
return
}

func isValidSequence(current, next uint32) bool {
return next-current < MaxSequenceDifference || current-next < MaxSequenceDifference
}

// SessionMap manages all the sessions for a collector.
type SessionMap struct {
mutex sync.RWMutex
Expand Down
48 changes: 48 additions & 0 deletions x-pack/filebeat/input/netflow/decoder/v9/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package v9
import (
"io/ioutil"
"log"
"math"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -224,3 +225,50 @@ func TestTemplateExpiration(t *testing.T) {

assert.Nil(t, s.GetTemplate(256))
}

func TestSessionCheckReset(t *testing.T) {
for _, testCase := range []struct {
title string
current, next uint32
reset bool
}{
{
title: "Regular advance",
current: 12345,
next: 12385,
reset: false,
},
{
title: "Out of order packet",
current: 12388,
next: 12345,
reset: false,
},
{
title: "Actual reset",
current: 12345,
next: 9,
reset: true,
},
{
title: "32-bit Wrap around",
current: math.MaxUint32,
next: 9,
reset: false,
},
{
title: "Non-sequential stream",
current: 12345,
next: 78910,
reset: true,
},
} {
t.Run(testCase.title, func(t *testing.T) {
s := NewSession(logger)
s.lastSequence = testCase.current
prev, isReset := s.CheckReset(testCase.next)
assert.Equal(t, prev, testCase.current)
assert.Equal(t, testCase.reset, isReset)
})
}
}
8 changes: 5 additions & 3 deletions x-pack/filebeat/input/netflow/decoder/v9/v9.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
ProtocolName = "v9"
LogPrefix = "[netflow-v9] "
ProtocolID uint16 = 9
MaxSequenceDifference = 100
MaxSequenceDifference = 1000
)

type NetflowV9Protocol struct {
Expand Down Expand Up @@ -83,8 +83,10 @@ func (p *NetflowV9Protocol) OnPacket(buf *bytes.Buffer, source net.Addr) (flows
remote := source.String()

p.logger.Printf("Packet from:%s src:%d seq:%d", remote, header.SourceID, header.SequenceNo)
if p.detectReset && session.CheckReset(header.SequenceNo) {
p.logger.Printf("Session %s reset (sequence=%d last=%d)", remote, header.SequenceNo, session.lastSequence)
if p.detectReset {
if prev, reset := session.CheckReset(header.SequenceNo); reset {
p.logger.Printf("Session %s reset (sequence=%d last=%d)", remote, header.SequenceNo, prev)
}
}

for ; numFlowSets > 0; numFlowSets-- {
Expand Down
6 changes: 5 additions & 1 deletion x-pack/filebeat/input/netflow/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,15 @@ func init() {

// An adapter so that logp.Logger can be used as a log.Logger.
type logDebugWrapper struct {
sync.Mutex
Logger *logp.Logger
buf []byte
}

// Write writes messages to the log.
func (w *logDebugWrapper) Write(p []byte) (n int, err error) {
w.Lock()
defer w.Unlock()
n = len(p)
w.buf = append(w.buf, p...)
for endl := bytes.IndexByte(w.buf, '\n'); endl != -1; endl = bytes.IndexByte(w.buf, '\n') {
Expand Down Expand Up @@ -116,7 +119,8 @@ func NewInput(
WithProtocols(config.Protocols...).
WithExpiration(config.ExpirationTimeout).
WithLogOutput(&logDebugWrapper{Logger: logger}).
WithCustomFields(customFields...))
WithCustomFields(customFields...).
WithSequenceResetEnabled(config.DetectSequenceReset))
if err != nil {
return nil, errors.Wrapf(err, "error initializing netflow decoder")
}
Expand Down
4 changes: 4 additions & 0 deletions x-pack/filebeat/module/netflow/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ These allow to update the NetFlow/IPFIX fields with vendor extensions and to
override existing fields. See <<filebeat-input-netflow,netflow input>> for
details.

`var.detect_sequence_reset`:: Flag controlling whether {beatname_uc} should
monitor sequence numbers in the Netflow packets to detect an Exporting Process
reset. See <<filebeat-input-netflow,netflow input>> for details.

:has-dashboards!:

:fileset_ex!:
Expand Down
4 changes: 4 additions & 0 deletions x-pack/filebeat/module/netflow/log/config/netflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ custom_definitions:
- '{{ . }}'
{{end}}
{{end}}

{{ if .detect_sequence_reset}}
detect_sequence_reset: {{.detect_sequence_reset}}
{{end}}
2 changes: 1 addition & 1 deletion x-pack/filebeat/module/netflow/log/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var:
- name: read_buffer
- name: timeout
- name: custom_definitions

- name: detect_sequence_reset
ingest_pipeline: ingest/pipeline.yml
input: config/netflow.yml

Expand Down