Skip to content

Commit

Permalink
[Filebeat] kafka v2 using parsers (#27335) (#27760)
Browse files Browse the repository at this point in the history
* using input v2
* implemented reader and parsers

(cherry picked from commit 20d6038)

Co-authored-by: Michael Bischoff <[email protected]>
  • Loading branch information
mergify[bot] and mjmbischoff authored Sep 6, 2021
1 parent 56c9502 commit 3e7338f
Show file tree
Hide file tree
Showing 7 changed files with 599 additions and 282 deletions.
65 changes: 65 additions & 0 deletions filebeat/docs/inputs/input-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,71 @@ Configuration options for Kerberos authentication.

See <<configuration-kerberos>> for more information.

[float]
===== `parsers`

This option expects a list of parsers that the payload has to go through.

Available parsers:

* `ndjson`
* `multiline`

[float]
===== `ndjson`

These options make it possible for {beatname_uc} to decode the payload as
JSON messages.

Example configuration:

[source,yaml]
----
- ndjson:
keys_under_root: true
add_error_key: true
message_key: log
----

*`keys_under_root`*:: By default, the decoded JSON is placed under a "json" key
in the output document. If you enable this setting, the keys are copied top
level in the output document. The default is false.

*`overwrite_keys`*:: If `keys_under_root` and this setting are enabled, then the
values from the decoded JSON object overwrite the fields that {beatname_uc}
normally adds (type, source, offset, etc.) in case of conflicts.

*`expand_keys`*:: If this setting is enabled, {beatname_uc} will recursively
de-dot keys in the decoded JSON, and expand them into a hierarchical object
structure. For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`.
This setting should be enabled when the input is produced by an
https://github.com/elastic/ecs-logging[ECS logger].

*`add_error_key`*:: If this setting is enabled, {beatname_uc} adds an
"error.message" and "error.type: json" key in case of JSON unmarshalling errors
or when a `message_key` is defined in the configuration but cannot be used.

*`message_key`*:: An optional configuration setting that specifies a JSON key on
which to apply the line filtering and multiline settings. If specified the key
must be at the top level in the JSON object and the value associated with the
key must be a string, otherwise no filtering or multiline aggregation will
occur.

*`document_id`*:: Option configuration setting that specifies the JSON key to
set the document id. If configured, the field will be removed from the original
JSON document and stored in `@metadata._id`

*`ignore_decoding_error`*:: An optional configuration setting that specifies if
JSON decoding errors should be logged or not. If set to true, errors will not
be logged. The default is false.

[float]
===== `multiline`

Options that control how {beatname_uc} deals with log messages that span
multiple lines. See <<multiline-examples>> for more information about
configuring multiline options.

[id="{beatname_lc}-input-{type}-common-options"]
include::../inputs/input-common-options.asciidoc[]

Expand Down
1 change: 0 additions & 1 deletion filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions filebeat/input/default-inputs/inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package inputs
import (
"github.com/elastic/beats/v7/filebeat/beater"
"github.com/elastic/beats/v7/filebeat/input/filestream"
"github.com/elastic/beats/v7/filebeat/input/kafka"
"github.com/elastic/beats/v7/filebeat/input/unix"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -36,6 +37,7 @@ func Init(info beat.Info, log *logp.Logger, components beater.StateStore) []v2.P
func genericInputs(log *logp.Logger, components beater.StateStore) []v2.Plugin {
return []v2.Plugin{
filestream.Plugin(log, components),
kafka.Plugin(),
unix.Plugin(),
}
}
4 changes: 2 additions & 2 deletions filebeat/input/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
"github.com/elastic/beats/v7/libbeat/common/kafka"
"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/monitoring/adapter"
"github.com/elastic/beats/v7/libbeat/reader/parser"
)

type kafkaInputConfig struct {
Expand All @@ -53,6 +53,7 @@ type kafkaInputConfig struct {
Username string `config:"username"`
Password string `config:"password"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
Parsers parser.Config `config:",inline"`
}

type kafkaFetch struct {
Expand Down Expand Up @@ -215,7 +216,6 @@ func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) {
)

if err := k.Validate(); err != nil {
logp.Err("Invalid kafka configuration: %v", err)
return nil, err
}
return k, nil
Expand Down
Loading

0 comments on commit 3e7338f

Please sign in to comment.