From 66e160372bbd6272f05ef9967b29e34b0e41c419 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 20 Jul 2018 22:32:50 +0200 Subject: [PATCH] INGEST: Extend KV Processor (#31789) (#32232) * INGEST: Extend KV Processor (#31789) Added more capabilities supported by LS to the KV processor: * Stripping of brackets and quotes from values (`include_brackets` in corresponding LS filter) * Adding key prefixes * Trimming specified chars from keys and values Refactored the way the filter is configured to avoid conditionals during execution. Refactored Tests a little to not have to add more redundant getters for new parameters. Relates #31786 * Add documentation --- docs/reference/ingest/ingest-node.asciidoc | 4 + .../ingest/common/KeyValueProcessor.java | 131 ++++++++++++++---- .../ingest/common/KeyValueProcessorTests.java | 116 ++++++++++++++-- 3 files changed, 214 insertions(+), 37 deletions(-) diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index aa72c1e868a96..700f72850169d 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -1732,6 +1732,10 @@ For example, if you have a log message which contains `ip=1.2.3.4 error=REFUSED` | `include_keys` | no | `null` | List of keys to filter and insert into document. Defaults to including all keys | `exclude_keys` | no | `null` | List of keys to exclude from document | `ignore_missing` | no | `false` | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document +| `prefix` | no | `null` | Prefix to be added to extracted keys +| `trim_key` | no | `null` | String of characters to trim from extracted keys +| `trim_value` | no | `null` | String of characters to trim from extracted values +| `strip_brackets` | no | `false` | If `true` strip brackets `()`, `<>`, `[]` as well as quotes `'` and `"` from extracted values |====== diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java index 6ed065926d60f..9cce3cedf3d02 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java @@ -25,11 +25,14 @@ import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.regex.Pattern; /** * The KeyValueProcessor parses and extracts messages of the `key=value` variety into fields with values of the keys. @@ -38,6 +41,8 @@ public final class KeyValueProcessor extends AbstractProcessor { public static final String TYPE = "kv"; + private static final Pattern STRIP_BRACKETS = Pattern.compile("(^[\\(\\[<\"'])|([\\]\\)>\"']$)"); + private final String field; private final String fieldSplit; private final String valueSplit; @@ -45,9 +50,11 @@ public final class KeyValueProcessor extends AbstractProcessor { private final Set excludeKeys; private final String targetField; private final boolean ignoreMissing; + private final Consumer execution; KeyValueProcessor(String tag, String field, String fieldSplit, String valueSplit, Set includeKeys, - Set excludeKeys, String targetField, boolean ignoreMissing) { + Set excludeKeys, String targetField, boolean ignoreMissing, + String trimKey, String trimValue, boolean stripBrackets, String prefix) { super(tag); this.field = field; this.targetField = targetField; @@ -56,6 +63,92 @@ public final class KeyValueProcessor extends AbstractProcessor { this.includeKeys = includeKeys; this.excludeKeys = excludeKeys; this.ignoreMissing = ignoreMissing; + this.execution = buildExecution( + fieldSplit, valueSplit, field, includeKeys, excludeKeys, targetField, ignoreMissing, trimKey, trimValue, + stripBrackets, prefix + ); + } + + private static Consumer buildExecution(String fieldSplit, String valueSplit, String field, + Set includeKeys, Set excludeKeys, + String targetField, boolean ignoreMissing, + String trimKey, String trimValue, boolean stripBrackets, + String prefix) { + final Predicate keyFilter; + if (includeKeys == null) { + if (excludeKeys == null) { + keyFilter = key -> true; + } else { + keyFilter = key -> excludeKeys.contains(key) == false; + } + } else { + if (excludeKeys == null) { + keyFilter = includeKeys::contains; + } else { + keyFilter = key -> includeKeys.contains(key) && excludeKeys.contains(key) == false; + } + } + final String fieldPathPrefix; + String keyPrefix = prefix == null ? "" : prefix; + if (targetField == null) { + fieldPathPrefix = keyPrefix; + } else { + fieldPathPrefix = targetField + "." + keyPrefix; + } + final Function keyPrefixer; + if (fieldPathPrefix.isEmpty()) { + keyPrefixer = val -> val; + } else { + keyPrefixer = val -> fieldPathPrefix + val; + } + final Function fieldSplitter = buildSplitter(fieldSplit, true); + Function valueSplitter = buildSplitter(valueSplit, false); + final Function keyTrimmer = buildTrimmer(trimKey); + final Function bracketStrip; + if (stripBrackets) { + bracketStrip = val -> STRIP_BRACKETS.matcher(val).replaceAll(""); + } else { + bracketStrip = val -> val; + } + final Function valueTrimmer = buildTrimmer(trimValue); + return document -> { + String value = document.getFieldValue(field, String.class, ignoreMissing); + if (value == null) { + if (ignoreMissing) { + return; + } + throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs."); + } + for (String part : fieldSplitter.apply(value)) { + String[] kv = valueSplitter.apply(part); + if (kv.length != 2) { + throw new IllegalArgumentException("field [" + field + "] does not contain value_split [" + valueSplit + "]"); + } + String key = keyTrimmer.apply(kv[0]); + if (keyFilter.test(key)) { + append(document, keyPrefixer.apply(key), valueTrimmer.apply(bracketStrip.apply(kv[1]))); + } + } + }; + } + + private static Function buildTrimmer(String trim) { + if (trim == null) { + return val -> val; + } else { + Pattern pattern = Pattern.compile("(^([" + trim + "]+))|([" + trim + "]+$)"); + return val -> pattern.matcher(val).replaceAll(""); + } + } + + private static Function buildSplitter(String split, boolean fields) { + int limit = fields ? 0 : 2; + if (split.length() > 2 || split.length() == 2 && split.charAt(0) != '\\') { + Pattern splitPattern = Pattern.compile(split); + return val -> splitPattern.split(val, limit); + } else { + return val -> val.split(split, limit); + } } String getField() { @@ -86,7 +179,7 @@ boolean isIgnoreMissing() { return ignoreMissing; } - public void append(IngestDocument document, String targetField, String value) { + private static void append(IngestDocument document, String targetField, String value) { if (document.hasField(targetField)) { document.appendFieldValue(targetField, value); } else { @@ -96,27 +189,7 @@ public void append(IngestDocument document, String targetField, String value) { @Override public void execute(IngestDocument document) { - String oldVal = document.getFieldValue(field, String.class, ignoreMissing); - - if (oldVal == null && ignoreMissing) { - return; - } else if (oldVal == null) { - throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs."); - } - - String fieldPathPrefix = (targetField == null) ? "" : targetField + "."; - Arrays.stream(oldVal.split(fieldSplit)) - .map((f) -> { - String[] kv = f.split(valueSplit, 2); - if (kv.length != 2) { - throw new IllegalArgumentException("field [" + field + "] does not contain value_split [" + valueSplit + "]"); - } - return kv; - }) - .filter((p) -> - (includeKeys == null || includeKeys.contains(p[0])) && - (excludeKeys == null || excludeKeys.contains(p[0]) == false)) - .forEach((p) -> append(document, fieldPathPrefix + p[0], p[1])); + execution.accept(document); } @Override @@ -132,6 +205,11 @@ public KeyValueProcessor create(Map registry, String String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field"); String fieldSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field_split"); String valueSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "value_split"); + String trimKey = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "trim_key"); + String trimValue = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "trim_value"); + String prefix = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "prefix"); + boolean stripBrackets = + ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "strip_brackets", false); Set includeKeys = null; Set excludeKeys = null; List includeKeysList = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "include_keys"); @@ -143,7 +221,10 @@ public KeyValueProcessor create(Map registry, String excludeKeys = Collections.unmodifiableSet(Sets.newHashSet(excludeKeysList)); } boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); - return new KeyValueProcessor(processorTag, field, fieldSplit, valueSplit, includeKeys, excludeKeys, targetField, ignoreMissing); + return new KeyValueProcessor( + processorTag, field, fieldSplit, valueSplit, includeKeys, excludeKeys, targetField, ignoreMissing, + trimKey, trimValue, stripBrackets, prefix + ); } } } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/KeyValueProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/KeyValueProcessorTests.java index 380af44c2515f..591f9994c60b1 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/KeyValueProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/KeyValueProcessorTests.java @@ -25,19 +25,25 @@ import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.test.ESTestCase; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.equalTo; public class KeyValueProcessorTests extends ESTestCase { + private static final KeyValueProcessor.Factory FACTORY = new KeyValueProcessor.Factory(); + public void test() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe"); - Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=", null, null, "target", false); + Processor processor = createKvProcessor(fieldName, "&", "=", null, null, "target", false); processor.execute(ingestDocument); assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello")); assertThat(ingestDocument.getFieldValue("target.second", List.class), equalTo(Arrays.asList("world", "universe"))); @@ -46,7 +52,7 @@ public void test() throws Exception { public void testRootTarget() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); ingestDocument.setFieldValue("myField", "first=hello&second=world&second=universe"); - Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "myField", "&", "=", null, null,null, false); + Processor processor = createKvProcessor("myField", "&", "=", null, null,null, false); processor.execute(ingestDocument); assertThat(ingestDocument.getFieldValue("first", String.class), equalTo("hello")); assertThat(ingestDocument.getFieldValue("second", List.class), equalTo(Arrays.asList("world", "universe"))); @@ -55,7 +61,7 @@ public void testRootTarget() throws Exception { public void testKeySameAsSourceField() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); ingestDocument.setFieldValue("first", "first=hello"); - Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "first", "&", "=", null, null,null, false); + Processor processor = createKvProcessor("first", "&", "=", null, null,null, false); processor.execute(ingestDocument); assertThat(ingestDocument.getFieldValue("first", List.class), equalTo(Arrays.asList("first=hello", "hello"))); } @@ -63,7 +69,7 @@ public void testKeySameAsSourceField() throws Exception { public void testIncludeKeys() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe"); - Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=", + Processor processor = createKvProcessor(fieldName, "&", "=", Sets.newHashSet("first"), null, "target", false); processor.execute(ingestDocument); assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello")); @@ -73,7 +79,7 @@ public void testIncludeKeys() throws Exception { public void testExcludeKeys() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe"); - Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=", + Processor processor = createKvProcessor(fieldName, "&", "=", null, Sets.newHashSet("second"), "target", false); processor.execute(ingestDocument); assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello")); @@ -84,7 +90,7 @@ public void testIncludeAndExcludeKeys() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe&third=bar"); - Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=", + Processor processor = createKvProcessor(fieldName, "&", "=", Sets.newHashSet("first", "second"), Sets.newHashSet("first", "second"), "target", false); processor.execute(ingestDocument); assertFalse(ingestDocument.hasField("target.first")); @@ -92,9 +98,9 @@ public void testIncludeAndExcludeKeys() throws Exception { assertFalse(ingestDocument.hasField("target.third")); } - public void testMissingField() { + public void testMissingField() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); - Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "unknown", "&", + Processor processor = createKvProcessor("unknown", "&", "=", null, null, "target", false); IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument)); assertThat(exception.getMessage(), equalTo("field [unknown] not present as part of path [unknown]")); @@ -105,7 +111,7 @@ public void testNullValueWithIgnoreMissing() throws Exception { IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap(fieldName, null)); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); - Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "", "", null, null, "target", true); + Processor processor = createKvProcessor(fieldName, "", "", null, null, "target", true); processor.execute(ingestDocument); assertIngestDocument(originalIngestDocument, ingestDocument); } @@ -113,7 +119,7 @@ public void testNullValueWithIgnoreMissing() throws Exception { public void testNonExistentWithIgnoreMissing() throws Exception { IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); - Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "unknown", "", "", null, null, "target", true); + Processor processor = createKvProcessor("unknown", "", "", null, null, "target", true); processor.execute(ingestDocument); assertIngestDocument(originalIngestDocument, ingestDocument); } @@ -121,7 +127,7 @@ public void testNonExistentWithIgnoreMissing() throws Exception { public void testFailFieldSplitMatch() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello|second=world|second=universe"); - Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=", null, null, "target", false); + Processor processor = createKvProcessor(fieldName, "&", "=", null, null, "target", false); processor.execute(ingestDocument); assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello|second=world|second=universe")); assertFalse(ingestDocument.hasField("target.second")); @@ -129,8 +135,94 @@ public void testFailFieldSplitMatch() throws Exception { public void testFailValueSplitMatch() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("foo", "bar")); - Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "foo", "&", "=", null, null, "target", false); + Processor processor = createKvProcessor("foo", "&", "=", null, null, "target", false); Exception exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument)); assertThat(exception.getMessage(), equalTo("field [foo] does not contain value_split [=]")); } + + public void testTrimKeyAndValue() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first= hello &second=world& second =universe"); + Processor processor = createKvProcessor(fieldName, "&", "=", null, null, "target", false, " ", " ", false, null); + processor.execute(ingestDocument); + assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello")); + assertThat(ingestDocument.getFieldValue("target.second", List.class), equalTo(Arrays.asList("world", "universe"))); + } + + public void testTrimMultiCharSequence() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, + "to=, orig_to=, %+relay=mail.example.com[private/dovecot-lmtp]," + + " delay=2.2, delays=1.9/0.01/0.01/0.21, dsn=2.0.0, status=sent " + ); + Processor processor = createKvProcessor(fieldName, " ", "=", null, null, "target", false, "%+", "<>,", false, null); + processor.execute(ingestDocument); + assertThat(ingestDocument.getFieldValue("target.to", String.class), equalTo("foo@example.com")); + assertThat(ingestDocument.getFieldValue("target.orig_to", String.class), equalTo("bar@example.com")); + assertThat(ingestDocument.getFieldValue("target.relay", String.class), equalTo("mail.example.com[private/dovecot-lmtp]")); + assertThat(ingestDocument.getFieldValue("target.delay", String.class), equalTo("2.2")); + assertThat(ingestDocument.getFieldValue("target.delays", String.class), equalTo("1.9/0.01/0.01/0.21")); + assertThat(ingestDocument.getFieldValue("target.dsn", String.class), equalTo("2.0.0")); + assertThat(ingestDocument.getFieldValue("target.status", String.class), equalTo("sent")); + } + + public void testStripBrackets() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + String fieldName = RandomDocumentPicks.addRandomField( + random(), ingestDocument, "first=&second=\"world\"&second=(universe)&third=&fourth=[bar]&fifth='last'" + ); + Processor processor = createKvProcessor(fieldName, "&", "=", null, null, "target", false, null, null, true, null); + processor.execute(ingestDocument); + assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello")); + assertThat(ingestDocument.getFieldValue("target.second", List.class), equalTo(Arrays.asList("world", "universe"))); + assertThat(ingestDocument.getFieldValue("target.third", String.class), equalTo("foo")); + assertThat(ingestDocument.getFieldValue("target.fourth", String.class), equalTo("bar")); + assertThat(ingestDocument.getFieldValue("target.fifth", String.class), equalTo("last")); + } + + public void testAddPrefix() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe"); + Processor processor = createKvProcessor(fieldName, "&", "=", null, null, "target", false, null, null, false, "arg_"); + processor.execute(ingestDocument); + assertThat(ingestDocument.getFieldValue("target.arg_first", String.class), equalTo("hello")); + assertThat(ingestDocument.getFieldValue("target.arg_second", List.class), equalTo(Arrays.asList("world", "universe"))); + } + + private static KeyValueProcessor createKvProcessor(String field, String fieldSplit, String valueSplit, Set includeKeys, + Set excludeKeys, String targetField, + boolean ignoreMissing) throws Exception { + return createKvProcessor( + field, fieldSplit, valueSplit, includeKeys, excludeKeys, targetField, ignoreMissing, null, null, false, null + ); + } + + private static KeyValueProcessor createKvProcessor(String field, String fieldSplit, String valueSplit, Set includeKeys, + Set excludeKeys, String targetField, boolean ignoreMissing, + String trimKey, String trimValue, boolean stripBrackets, + String prefix) throws Exception { + Map config = new HashMap<>(); + config.put("field", field); + config.put("field_split", fieldSplit); + config.put("value_split", valueSplit); + config.put("target_field", targetField); + if (includeKeys != null) { + config.put("include_keys", new ArrayList<>(includeKeys)); + } + if (excludeKeys != null) { + config.put("exclude_keys", new ArrayList<>(excludeKeys)); + } + config.put("ignore_missing", ignoreMissing); + if (trimKey != null) { + config.put("trim_key", trimKey); + } + if (trimValue != null) { + config.put("trim_value", trimValue); + } + config.put("strip_brackets", stripBrackets); + if (prefix != null) { + config.put("prefix", prefix); + } + return FACTORY.create(null, randomAlphaOfLength(10), config); + } }