diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7f09d9716f94..f45c222fb1d6 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -68,6 +68,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix states client support for output options. {pull}33405[33405] - Fix states client reloader under managed mode. {pull}33405[33405] - Fix bug where states.duration_ms was incorrect type. {pull}33563[33563] +- Fix handling of long UDP messages in UDP input. {issue}33836[33836] {pull}33837[33837] *Auditbeat* diff --git a/filebeat/input/tcp/input.go b/filebeat/input/tcp/input.go index ff60ba376de3..0486f7310afb 100644 --- a/filebeat/input/tcp/input.go +++ b/filebeat/input/tcp/input.go @@ -71,7 +71,7 @@ func NewInput( cb := func(data []byte, metadata inputsource.NetworkMetadata) { event := createEvent(data, metadata) - forwarder.Send(event) + _ = forwarder.Send(event) } splitFunc, err := streaming.SplitFunc(config.Framing, []byte(config.LineDelimiter)) @@ -128,15 +128,18 @@ func (p *Input) Wait() { } func createEvent(raw []byte, metadata inputsource.NetworkMetadata) beat.Event { - return beat.Event{ + evt := beat.Event{ Timestamp: time.Now(), Fields: mapstr.M{ "message": string(raw), - "log": mapstr.M{ - "source": mapstr.M{ - "address": metadata.RemoteAddr.String(), - }, - }, }, } + if metadata.RemoteAddr != nil { + evt.Fields["log"] = mapstr.M{ + "source": mapstr.M{ + "address": metadata.RemoteAddr.String(), + }, + } + } + return evt } diff --git a/filebeat/input/udp/input.go b/filebeat/input/udp/input.go index a38055876b95..1f6129f6c23d 100644 --- a/filebeat/input/udp/input.go +++ b/filebeat/input/udp/input.go @@ -65,20 +65,23 @@ func NewInput( forwarder := harvester.NewForwarder(out) callback := func(data []byte, metadata inputsource.NetworkMetadata) { - forwarder.Send(beat.Event{ + evt := beat.Event{ Timestamp: time.Now(), Meta: mapstr.M{ "truncated": metadata.Truncated, }, Fields: mapstr.M{ "message": string(data), - "log": mapstr.M{ - "source": mapstr.M{ - "address": metadata.RemoteAddr.String(), - }, - }, }, - }) + } + if metadata.RemoteAddr != nil { + evt.Fields["log"] = mapstr.M{ + "source": mapstr.M{ + "address": metadata.RemoteAddr.String(), + }, + } + } + _ = forwarder.Send(evt) } udp := udp.New(&config.Config, callback)