[7.x](backport #27335) [Filebeat] kafka v2 using parsers #27760

Merged 1 commit on Sep 6, 2021
65 changes: 65 additions & 0 deletions filebeat/docs/inputs/input-kafka.asciidoc
@@ -168,6 +168,71 @@ Configuration options for Kerberos authentication.

See <<configuration-kerberos>> for more information.

[float]
===== `parsers`

This option expects a list of parsers that the message payload is run through, in the order given.

Available parsers:

* `ndjson`
* `multiline`
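
For example, a Kafka input that decodes each message as JSON could be configured as follows (host and topic names are illustrative):

[source,yaml]
----
filebeat.inputs:
- type: kafka
  hosts: ["localhost:9092"]
  topics: ["app-logs"]
  group_id: "filebeat"
  parsers:
    - ndjson:
        keys_under_root: true
        add_error_key: true
----

Parsers are applied in order, so a `multiline` parser can be placed before `ndjson` to reassemble multi-line payloads before decoding.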

[float]
===== `ndjson`

These options make it possible for {beatname_uc} to decode the payload as
JSON messages.

Example configuration:

[source,yaml]
----
- ndjson:
keys_under_root: true
add_error_key: true
message_key: log
----

*`keys_under_root`*:: By default, the decoded JSON is placed under a "json" key
in the output document. If you enable this setting, the keys are copied to the
top level of the output document. The default is false.

*`overwrite_keys`*:: If `keys_under_root` and this setting are enabled, then the
values from the decoded JSON object overwrite the fields that {beatname_uc}
normally adds (type, source, offset, etc.) in case of conflicts.

*`expand_keys`*:: If this setting is enabled, {beatname_uc} will recursively
de-dot keys in the decoded JSON, and expand them into a hierarchical object
structure. For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`.
This setting should be enabled when the input is produced by an
https://github.com/elastic/ecs-logging[ECS logger].

*`add_error_key`*:: If this setting is enabled, {beatname_uc} adds an
"error.message" field and sets "error.type: json" in case of JSON unmarshalling
errors or when a `message_key` is defined in the configuration but cannot be used.

*`message_key`*:: An optional configuration setting that specifies a JSON key on
which to apply the line filtering and multiline settings. If specified the key
must be at the top level in the JSON object and the value associated with the
key must be a string, otherwise no filtering or multiline aggregation will
occur.

*`document_id`*:: An optional configuration setting that specifies the JSON key
to use as the document ID. If configured, the field is removed from the original
JSON document and stored in `@metadata._id`.

*`ignore_decoding_error`*:: An optional configuration setting that specifies
whether JSON decoding errors should be logged. If set to true, errors are not
logged. The default is false.
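
The de-dotting behaviour described for `expand_keys` can be sketched in Go. This is an illustrative reimplementation of the idea, not the Beats code itself:

```go
package main

import (
	"fmt"
	"strings"
)

// expandKeys recursively splits dotted keys into nested objects,
// e.g. {"a.b.c": 123} becomes {"a": {"b": {"c": 123}}}.
func expandKeys(in map[string]interface{}) map[string]interface{} {
	out := map[string]interface{}{}
	for key, value := range in {
		// Expand nested objects first, so dotted keys at any depth are handled.
		if nested, ok := value.(map[string]interface{}); ok {
			value = expandKeys(nested)
		}
		parts := strings.Split(key, ".")
		cur := out
		for _, p := range parts[:len(parts)-1] {
			next, ok := cur[p].(map[string]interface{})
			if !ok {
				next = map[string]interface{}{}
				cur[p] = next
			}
			cur = next
		}
		cur[parts[len(parts)-1]] = value
	}
	return out
}

func main() {
	doc := map[string]interface{}{"a.b.c": 123, "a.b.d": "x"}
	fmt.Println(expandKeys(doc)) // map[a:map[b:map[c:123 d:x]]]
}
```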

[float]
===== `multiline`

Options that control how {beatname_uc} deals with log messages that span
multiple lines. See <<multiline-examples>> for more information about
configuring multiline options.
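
For example, the following configuration (the pattern is illustrative) treats lines that do not start with `[` as continuations of the preceding line:

[source,yaml]
----
parsers:
  - multiline:
      type: pattern
      pattern: '^\['
      negate: true
      match: after
----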

[id="{beatname_lc}-input-{type}-common-options"]
include::../inputs/input-common-options.asciidoc[]

1 change: 0 additions & 1 deletion filebeat/include/list.go


2 changes: 2 additions & 0 deletions filebeat/input/default-inputs/inputs.go
@@ -20,6 +20,7 @@ package inputs
import (
"github.com/elastic/beats/v7/filebeat/beater"
"github.com/elastic/beats/v7/filebeat/input/filestream"
"github.com/elastic/beats/v7/filebeat/input/kafka"
"github.com/elastic/beats/v7/filebeat/input/unix"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
@@ -36,6 +37,7 @@ func Init(info beat.Info, log *logp.Logger, components beater.StateStore) []v2.P
func genericInputs(log *logp.Logger, components beater.StateStore) []v2.Plugin {
return []v2.Plugin{
filestream.Plugin(log, components),
kafka.Plugin(),
unix.Plugin(),
}
}
4 changes: 2 additions & 2 deletions filebeat/input/kafka/config.go
@@ -28,9 +28,9 @@ import (
"github.com/elastic/beats/v7/libbeat/common/kafka"
"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/monitoring/adapter"
"github.com/elastic/beats/v7/libbeat/reader/parser"
)

type kafkaInputConfig struct {
@@ -53,6 +53,7 @@ type kafkaInputConfig struct {
Username string `config:"username"`
Password string `config:"password"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
Parsers parser.Config `config:",inline"`
}

type kafkaFetch struct {
@@ -215,7 +216,6 @@ func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) {
)

if err := k.Validate(); err != nil {
logp.Err("Invalid kafka configuration: %v", err)
return nil, err
}
return k, nil