diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index dbee2be60b60..f68d2ffca926 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -215,6 +215,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Update Logstash module's Grok patterns to support Logstash 7.4 logs. {pull}14743[14743] - Fix a problem in Filebeat input httpjson where interval is not used as time.Duration. {issue}14752[14752] {pull}14753[14753] - Fix SSL config in input.yml for Filebeat httpjson input in the MISP module. {pull}14767[14767] +- Fix session reset detection and a crash in Netflow input. {pull}14904[14904] *Heartbeat* diff --git a/filebeat/docs/modules/netflow.asciidoc b/filebeat/docs/modules/netflow.asciidoc index 934c6e456d08..f23088c70feb 100644 --- a/filebeat/docs/modules/netflow.asciidoc +++ b/filebeat/docs/modules/netflow.asciidoc @@ -68,6 +68,10 @@ These allow to update the NetFlow/IPFIX fields with vendor extensions and to override existing fields. See <> for details. +`var.detect_sequence_reset`:: Flag controlling whether {beatname_uc} should +monitor sequence numbers in the Netflow packets to detect an Exporting Process +reset. See <> for details. + :has-dashboards!: :fileset_ex!: diff --git a/x-pack/filebeat/docs/inputs/input-netflow.asciidoc b/x-pack/filebeat/docs/inputs/input-netflow.asciidoc index 4525ce3cdf40..840ad70ec053 100644 --- a/x-pack/filebeat/docs/inputs/input-netflow.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-netflow.asciidoc @@ -29,6 +29,7 @@ Example configuration: queue_size: 8192 custom_definitions: - path/to/fields.yml + detect_sequence_reset: true ---- @@ -107,6 +108,18 @@ IPFIX PEN zero are equivalent to changes to NetFlow fields. Overriding the names and/or types of standard fields can prevent mapping of ECS fields to function properly. +[float] +[[detect_sequence_reset]] +==== `detect_sequence_reset` + +Flag controlling whether {beatname_uc} should monitor sequence numbers in the +Netflow packets to detect an Exporting Process reset. When this condition is +detected, record templates for the given exporter will be dropped. This will +cause flow loss until the exporter provides new templates. If set to `false`, +{beatname_uc} will ignore sequence numbers, which can cause some invalid flows +if the exporter process is reset. This option is only applicable to Netflow V9 +and IPFIX. Default is `true`. + [id="{beatname_lc}-input-{type}-common-options"] include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[] diff --git a/x-pack/filebeat/input/netflow/config.go b/x-pack/filebeat/input/netflow/config.go index e6250dd27dbb..a73897653445 100644 --- a/x-pack/filebeat/input/netflow/config.go +++ b/x-pack/filebeat/input/netflow/config.go @@ -20,6 +20,7 @@ type config struct { ExpirationTimeout time.Duration `config:"expiration_timeout"` PacketQueueSize int `config:"queue_size"` CustomDefinitions []string `config:"custom_definitions"` + DetectSequenceReset bool `config:"detect_sequence_reset"` } var defaultConfig = config{ @@ -31,7 +32,8 @@ var defaultConfig = config{ ForwarderConfig: harvester.ForwarderConfig{ Type: inputName, }, - Protocols: []string{"v5", "v9", "ipfix"}, - ExpirationTimeout: time.Minute * 30, - PacketQueueSize: 8192, + Protocols: []string{"v5", "v9", "ipfix"}, + ExpirationTimeout: time.Minute * 30, + PacketQueueSize: 8192, + DetectSequenceReset: true, } diff --git a/x-pack/filebeat/input/netflow/decoder/v9/session.go b/x-pack/filebeat/input/netflow/decoder/v9/session.go index a9f164213d06..734524cc2442 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/session.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/session.go @@ -101,16 +101,21 @@ func (s *SessionState) ExpireTemplates() (alive int, removed int) { // CheckReset returns if the session must be reset after the receipt of the // given sequence number. -func (s *SessionState) CheckReset(seqNum uint32) (reset bool) { +func (s *SessionState) CheckReset(seqNum uint32) (prev uint32, reset bool) { s.mutex.Lock() defer s.mutex.Unlock() - if reset = seqNum < s.lastSequence && seqNum-s.lastSequence > MaxSequenceDifference; reset { + prev = s.lastSequence + if reset = !isValidSequence(prev, seqNum); reset { s.Templates = make(map[TemplateKey]*TemplateWrapper) } s.lastSequence = seqNum return } +func isValidSequence(current, next uint32) bool { + return next-current < MaxSequenceDifference || current-next < MaxSequenceDifference +} + // SessionMap manages all the sessions for a collector. type SessionMap struct { mutex sync.RWMutex diff --git a/x-pack/filebeat/input/netflow/decoder/v9/session_test.go b/x-pack/filebeat/input/netflow/decoder/v9/session_test.go index 1883e90972d1..6a7a999087c4 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/session_test.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/session_test.go @@ -7,6 +7,7 @@ package v9 import ( "io/ioutil" "log" + "math" "sync" "testing" "time" @@ -224,3 +225,50 @@ func TestTemplateExpiration(t *testing.T) { assert.Nil(t, s.GetTemplate(256)) } + +func TestSessionCheckReset(t *testing.T) { + for _, testCase := range []struct { + title string + current, next uint32 + reset bool + }{ + { + title: "Regular advance", + current: 12345, + next: 12385, + reset: false, + }, + { + title: "Out of order packet", + current: 12388, + next: 12345, + reset: false, + }, + { + title: "Actual reset", + current: 12345, + next: 9, + reset: true, + }, + { + title: "32-bit Wrap around", + current: math.MaxUint32, + next: 9, + reset: false, + }, + { + title: "Non-sequential stream", + current: 12345, + next: 78910, + reset: true, + }, + } { + t.Run(testCase.title, func(t *testing.T) { + s := NewSession(logger) + s.lastSequence = testCase.current + prev, isReset := s.CheckReset(testCase.next) + assert.Equal(t, prev, testCase.current) + assert.Equal(t, testCase.reset, isReset) + }) + } +} diff --git a/x-pack/filebeat/input/netflow/decoder/v9/v9.go b/x-pack/filebeat/input/netflow/decoder/v9/v9.go index 1a8ca59edd59..edfe65db21d8 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/v9.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/v9.go @@ -21,7 +21,7 @@ const ( ProtocolName = "v9" LogPrefix = "[netflow-v9] " ProtocolID uint16 = 9 - MaxSequenceDifference = 100 + MaxSequenceDifference = 1000 ) type NetflowV9Protocol struct { @@ -83,8 +83,10 @@ func (p *NetflowV9Protocol) OnPacket(buf *bytes.Buffer, source net.Addr) (flows remote := source.String() p.logger.Printf("Packet from:%s src:%d seq:%d", remote, header.SourceID, header.SequenceNo) - if p.detectReset && session.CheckReset(header.SequenceNo) { - p.logger.Printf("Session %s reset (sequence=%d last=%d)", remote, header.SequenceNo, session.lastSequence) + if p.detectReset { + if prev, reset := session.CheckReset(header.SequenceNo); reset { + p.logger.Printf("Session %s reset (sequence=%d last=%d)", remote, header.SequenceNo, prev) + } } for ; numFlowSets > 0; numFlowSets-- { diff --git a/x-pack/filebeat/input/netflow/input.go b/x-pack/filebeat/input/netflow/input.go index 636f56ef666f..77e1ac8fb071 100644 --- a/x-pack/filebeat/input/netflow/input.go +++ b/x-pack/filebeat/input/netflow/input.go @@ -65,12 +65,15 @@ func init() { // An adapter so that logp.Logger can be used as a log.Logger. type logDebugWrapper struct { + sync.Mutex Logger *logp.Logger buf []byte } // Write writes messages to the log. func (w *logDebugWrapper) Write(p []byte) (n int, err error) { + w.Lock() + defer w.Unlock() n = len(p) w.buf = append(w.buf, p...) for endl := bytes.IndexByte(w.buf, '\n'); endl != -1; endl = bytes.IndexByte(w.buf, '\n') { @@ -116,7 +119,8 @@ func NewInput( WithProtocols(config.Protocols...). WithExpiration(config.ExpirationTimeout). WithLogOutput(&logDebugWrapper{Logger: logger}). - WithCustomFields(customFields...)) + WithCustomFields(customFields...). + WithSequenceResetEnabled(config.DetectSequenceReset)) if err != nil { return nil, errors.Wrapf(err, "error initializing netflow decoder") } diff --git a/x-pack/filebeat/module/netflow/_meta/docs.asciidoc b/x-pack/filebeat/module/netflow/_meta/docs.asciidoc index abb548233e4e..c92f6de7c160 100644 --- a/x-pack/filebeat/module/netflow/_meta/docs.asciidoc +++ b/x-pack/filebeat/module/netflow/_meta/docs.asciidoc @@ -63,6 +63,10 @@ These allow to update the NetFlow/IPFIX fields with vendor extensions and to override existing fields. See <> for details. +`var.detect_sequence_reset`:: Flag controlling whether {beatname_uc} should +monitor sequence numbers in the Netflow packets to detect an Exporting Process +reset. See <> for details. + :has-dashboards!: :fileset_ex!: diff --git a/x-pack/filebeat/module/netflow/log/config/netflow.yml b/x-pack/filebeat/module/netflow/log/config/netflow.yml index e49564547072..460bd498a344 100644 --- a/x-pack/filebeat/module/netflow/log/config/netflow.yml +++ b/x-pack/filebeat/module/netflow/log/config/netflow.yml @@ -19,3 +19,7 @@ custom_definitions: - '{{ . }}' {{end}} {{end}} + +{{ if .detect_sequence_reset}} +detect_sequence_reset: {{.detect_sequence_reset}} +{{end}} diff --git a/x-pack/filebeat/module/netflow/log/manifest.yml b/x-pack/filebeat/module/netflow/log/manifest.yml index f263b485bb2a..8e1e1b727399 100644 --- a/x-pack/filebeat/module/netflow/log/manifest.yml +++ b/x-pack/filebeat/module/netflow/log/manifest.yml @@ -14,7 +14,7 @@ var: - name: read_buffer - name: timeout - name: custom_definitions - + - name: detect_sequence_reset ingest_pipeline: ingest/pipeline.yml input: config/netflow.yml