Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INGEST: Extend KV Processor (#31789) #32232

Merged
merged 2 commits into from
Jul 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/reference/ingest/ingest-node.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1732,6 +1732,10 @@ For example, if you have a log message which contains `ip=1.2.3.4 error=REFUSED`
| `include_keys` | no | `null` | List of keys to filter and insert into document. Defaults to including all keys
| `exclude_keys` | no | `null` | List of keys to exclude from document
| `ignore_missing` | no | `false` | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document
| `prefix` | no | `null` | Prefix to be added to extracted keys
| `trim_key` | no | `null` | String of characters to trim from extracted keys
Copy link
Contributor

@jakelandis jakelandis Jul 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you want to call out these are regex character classes ?

edit: clarified question

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jakelandis I'd rather not talk about regex patterns here I think. If I do that instead of simply saying I'm trimming all the chars in the given string, then that's probably not really true if you look at the code (I'm building a pattern from the string, arbitrary patterns won't work here).

| `trim_value` | no | `null` | String of characters to trim from extracted values
| `strip_brackets` | no | `false` | If `true` strip brackets `()`, `<>`, `[]` as well as quotes `'` and `"` from extracted values
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be renamed to be more general than just brackets. maybe strip_surrounding

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@talevy yea I agree, brackets isn't a good term here, I went with it because that's what LS uses (see https://github.com/logstash-plugins/logstash-filter-kv/blob/master/lib/logstash/filters/kv.rb#L290). surrounding sounds kinda strange too, not that I have a better suggestion ... maybe strip_delimiters but then that's somewhat weird too since we're already splitting twice elsewhere :P

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is part of the hidden magic that I didn't love in that filter. But I agree, I cannot think of a better word, and having a separate property just for quotes feels like too much variation

|======


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Pattern;

/**
* The KeyValueProcessor parses and extracts messages of the `key=value` variety into fields with values of the keys.
Expand All @@ -38,16 +41,20 @@ public final class KeyValueProcessor extends AbstractProcessor {

public static final String TYPE = "kv";

private static final Pattern STRIP_BRACKETS = Pattern.compile("(^[\\(\\[<\"'])|([\\]\\)>\"']$)");

private final String field;
private final String fieldSplit;
private final String valueSplit;
private final Set<String> includeKeys;
private final Set<String> excludeKeys;
private final String targetField;
private final boolean ignoreMissing;
private final Consumer<IngestDocument> execution;

KeyValueProcessor(String tag, String field, String fieldSplit, String valueSplit, Set<String> includeKeys,
Set<String> excludeKeys, String targetField, boolean ignoreMissing) {
Set<String> excludeKeys, String targetField, boolean ignoreMissing,
String trimKey, String trimValue, boolean stripBrackets, String prefix) {
super(tag);
this.field = field;
this.targetField = targetField;
Expand All @@ -56,6 +63,92 @@ public final class KeyValueProcessor extends AbstractProcessor {
this.includeKeys = includeKeys;
this.excludeKeys = excludeKeys;
this.ignoreMissing = ignoreMissing;
this.execution = buildExecution(
fieldSplit, valueSplit, field, includeKeys, excludeKeys, targetField, ignoreMissing, trimKey, trimValue,
stripBrackets, prefix
);
}

private static Consumer<IngestDocument> buildExecution(String fieldSplit, String valueSplit, String field,
Set<String> includeKeys, Set<String> excludeKeys,
String targetField, boolean ignoreMissing,
String trimKey, String trimValue, boolean stripBrackets,
String prefix) {
final Predicate<String> keyFilter;
if (includeKeys == null) {
if (excludeKeys == null) {
keyFilter = key -> true;
} else {
keyFilter = key -> excludeKeys.contains(key) == false;
}
} else {
if (excludeKeys == null) {
keyFilter = includeKeys::contains;
} else {
keyFilter = key -> includeKeys.contains(key) && excludeKeys.contains(key) == false;
}
}
final String fieldPathPrefix;
String keyPrefix = prefix == null ? "" : prefix;
if (targetField == null) {
fieldPathPrefix = keyPrefix;
} else {
fieldPathPrefix = targetField + "." + keyPrefix;
}
final Function<String, String> keyPrefixer;
if (fieldPathPrefix.isEmpty()) {
keyPrefixer = val -> val;
} else {
keyPrefixer = val -> fieldPathPrefix + val;
}
final Function<String, String[]> fieldSplitter = buildSplitter(fieldSplit, true);
Function<String, String[]> valueSplitter = buildSplitter(valueSplit, false);
final Function<String, String> keyTrimmer = buildTrimmer(trimKey);
final Function<String, String> bracketStrip;
if (stripBrackets) {
bracketStrip = val -> STRIP_BRACKETS.matcher(val).replaceAll("");
} else {
bracketStrip = val -> val;
}
final Function<String, String> valueTrimmer = buildTrimmer(trimValue);
return document -> {
String value = document.getFieldValue(field, String.class, ignoreMissing);
if (value == null) {
if (ignoreMissing) {
return;
}
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs.");
}
for (String part : fieldSplitter.apply(value)) {
String[] kv = valueSplitter.apply(part);
if (kv.length != 2) {
throw new IllegalArgumentException("field [" + field + "] does not contain value_split [" + valueSplit + "]");
}
String key = keyTrimmer.apply(kv[0]);
if (keyFilter.test(key)) {
append(document, keyPrefixer.apply(key), valueTrimmer.apply(bracketStrip.apply(kv[1])));
}
}
};
}

private static Function<String, String> buildTrimmer(String trim) {
if (trim == null) {
return val -> val;
} else {
Pattern pattern = Pattern.compile("(^([" + trim + "]+))|([" + trim + "]+$)");
return val -> pattern.matcher(val).replaceAll("");
}
}

private static Function<String, String[]> buildSplitter(String split, boolean fields) {
int limit = fields ? 0 : 2;
if (split.length() > 2 || split.length() == 2 && split.charAt(0) != '\\') {
Pattern splitPattern = Pattern.compile(split);
return val -> splitPattern.split(val, limit);
} else {
return val -> val.split(split, limit);
}
}

String getField() {
Expand Down Expand Up @@ -86,7 +179,7 @@ boolean isIgnoreMissing() {
return ignoreMissing;
}

public void append(IngestDocument document, String targetField, String value) {
private static void append(IngestDocument document, String targetField, String value) {
if (document.hasField(targetField)) {
document.appendFieldValue(targetField, value);
} else {
Expand All @@ -96,27 +189,7 @@ public void append(IngestDocument document, String targetField, String value) {

@Override
public void execute(IngestDocument document) {
String oldVal = document.getFieldValue(field, String.class, ignoreMissing);

if (oldVal == null && ignoreMissing) {
return;
} else if (oldVal == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs.");
}

String fieldPathPrefix = (targetField == null) ? "" : targetField + ".";
Arrays.stream(oldVal.split(fieldSplit))
.map((f) -> {
String[] kv = f.split(valueSplit, 2);
if (kv.length != 2) {
throw new IllegalArgumentException("field [" + field + "] does not contain value_split [" + valueSplit + "]");
}
return kv;
})
.filter((p) ->
(includeKeys == null || includeKeys.contains(p[0])) &&
(excludeKeys == null || excludeKeys.contains(p[0]) == false))
.forEach((p) -> append(document, fieldPathPrefix + p[0], p[1]));
execution.accept(document);
}

@Override
Expand All @@ -132,6 +205,11 @@ public KeyValueProcessor create(Map<String, Processor.Factory> registry, String
String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
String fieldSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field_split");
String valueSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "value_split");
String trimKey = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "trim_key");
String trimValue = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "trim_value");
String prefix = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "prefix");
boolean stripBrackets =
ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "strip_brackets", false);
Set<String> includeKeys = null;
Set<String> excludeKeys = null;
List<String> includeKeysList = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "include_keys");
Expand All @@ -143,7 +221,10 @@ public KeyValueProcessor create(Map<String, Processor.Factory> registry, String
excludeKeys = Collections.unmodifiableSet(Sets.newHashSet(excludeKeysList));
}
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
return new KeyValueProcessor(processorTag, field, fieldSplit, valueSplit, includeKeys, excludeKeys, targetField, ignoreMissing);
return new KeyValueProcessor(
processorTag, field, fieldSplit, valueSplit, includeKeys, excludeKeys, targetField, ignoreMissing,
trimKey, trimValue, stripBrackets, prefix
);
}
}
}
Loading