From 0521f70c7ae29a8af5c76c2c104b8fd4590a5bea Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Fri, 29 Jun 2018 09:30:54 -0700 Subject: [PATCH 1/8] Introduce a Hashing Processor (#31087) It is useful to have a processor similar to logstash-filter-fingerprint in Elasticsearch. A processor that leverages a variety of hashing algorithms to create cryptographically-secure one-way hashes of values in documents. This processor introduces a pbkdf2hmac hashing scheme to fields in documents for indexing --- .../xpack/security/Security.java | 10 +- .../xpack/security/ingest/HashProcessor.java | 200 ++++++++++++++++++ .../ingest/HashProcessorFactoryTests.java | 136 ++++++++++++ .../security/ingest/HashProcessorTests.java | 130 ++++++++++++ .../test/hash_processor/10_basic.yml | 51 +++++ 5 files changed, 526 insertions(+), 1 deletion(-) create mode 100644 x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java create mode 100644 x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorFactoryTests.java create mode 100644 x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorTests.java create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/test/hash_processor/10_basic.yml diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 533897bac4466..20337d5ed5a04 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -196,6 +196,7 @@ import org.elasticsearch.xpack.security.authz.store.FileRolesStore; import org.elasticsearch.xpack.security.authz.store.NativePrivilegeStore; import org.elasticsearch.xpack.security.authz.store.NativeRolesStore; +import org.elasticsearch.xpack.security.ingest.HashProcessor; import org.elasticsearch.xpack.security.ingest.SetSecurityUserProcessor; import org.elasticsearch.xpack.security.rest.SecurityRestFilter; import org.elasticsearch.xpack.security.rest.action.RestAuthenticateAction; @@ -615,6 +616,10 @@ public static List> getSettings(List securityExten // hide settings settingsList.add(Setting.listSetting(SecurityField.setting("hide_settings"), Collections.emptyList(), Function.identity(), Property.NodeScope, Property.Filtered)); + + // ingest processor settings + settingsList.add(HashProcessor.HMAC_KEY_SETTING); + return settingsList; } @@ -790,7 +795,10 @@ public List getRestHandlers(Settings settings, RestController restC @Override public Map getProcessors(Processor.Parameters parameters) { - return Collections.singletonMap(SetSecurityUserProcessor.TYPE, new SetSecurityUserProcessor.Factory(parameters.threadContext)); + Map processors = new HashMap<>(); + processors.put(SetSecurityUserProcessor.TYPE, new SetSecurityUserProcessor.Factory(parameters.threadContext)); + processors.put(HashProcessor.TYPE, new HashProcessor.Factory(parameters.env.settings())); + return processors; } /** diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java new file mode 100644 index 0000000000000..fa49b843847ee --- /dev/null +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java @@ -0,0 +1,200 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.security.ingest; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.SecureSetting; +import org.elasticsearch.common.settings.SecureString; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.xpack.core.security.SecurityField; + +import javax.crypto.Mac; +import javax.crypto.SecretKeyFactory; +import javax.crypto.spec.PBEKeySpec; +import javax.crypto.spec.SecretKeySpec; +import java.nio.charset.StandardCharsets; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; +import java.security.spec.InvalidKeySpecException; +import java.util.Arrays; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; + +/** + * A processor that hashes the contents of a field (or fields) using various hashing algorithms + */ +public final class HashProcessor extends AbstractProcessor { + public static final String TYPE = "hash"; + public static final Setting.AffixSetting HMAC_KEY_SETTING = SecureSetting + .affixKeySetting(SecurityField.setting("ingest." + TYPE) + ".", "key", + (key) -> SecureSetting.secureString(key, null)); + + private final List fields; + private final String targetField; + private final Method method; + private final Mac mac; + private final byte[] salt; + private final boolean ignoreMissing; + + HashProcessor(String tag, List fields, String targetField, byte[] salt, Method method, @Nullable Mac mac, + boolean ignoreMissing) { + super(tag); + this.fields = fields; + this.targetField = targetField; + this.method = method; + this.mac = mac; + this.salt = salt; + this.ignoreMissing = ignoreMissing; + } + + List getFields() { + return fields; + } + + String getTargetField() { + return targetField; + } + + byte[] getSalt() { + return salt; + } + + @Override + public void execute(IngestDocument document) { + Map hashedFieldValues = fields.stream().map(f -> { + String value = document.getFieldValue(f, String.class, ignoreMissing); + if (value == null && ignoreMissing) { + return new Tuple(null, null); + } + try { + return new Tuple<>(f, method.hash(mac, salt, value)); + } catch (Exception e) { + throw new IllegalArgumentException("field[" + f + "] could not be hashed", e); + } + }).filter(tuple -> Objects.nonNull(tuple.v1())).collect(Collectors.toMap(Tuple::v1, Tuple::v2)); + if (fields.size() == 1) { + document.setFieldValue(targetField, hashedFieldValues.values().iterator().next()); + } else { + document.setFieldValue(targetField, hashedFieldValues); + } + } + + @Override + public String getType() { + return TYPE; + } + + public static final class Factory implements Processor.Factory { + + private final Settings settings; + private final Map secureKeys; + + public Factory(Settings settings) { + this.settings = settings; + this.secureKeys = new HashMap<>(); + HMAC_KEY_SETTING.getAllConcreteSettings(settings).forEach(k -> { + secureKeys.put(k.getKey(), k.get(settings)); + }); + } + + private static Mac createMac(Method method, SecureString password, byte[] salt, int iterations) { + try { + SecretKeyFactory secretKeyFactory = SecretKeyFactory.getInstance("PBKDF2With" + method.getAlgorithm()); + PBEKeySpec keySpec = new PBEKeySpec(password.getChars(), salt, iterations, 128); + byte[] pbkdf2 = secretKeyFactory.generateSecret(keySpec).getEncoded(); + Mac mac = Mac.getInstance(method.getAlgorithm()); + mac.init(new SecretKeySpec(pbkdf2, method.getAlgorithm())); + return mac; + } catch (NoSuchAlgorithmException | InvalidKeySpecException | InvalidKeyException e) { + throw new IllegalArgumentException("invalid settings", e); + } + } + + @Override + public HashProcessor create(Map registry, String processorTag, Map config) { + boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); + List fields = ConfigurationUtils.readList(TYPE, processorTag, config, "fields"); + if (fields.isEmpty()) { + throw ConfigurationUtils.newConfigurationException(TYPE, processorTag, "fields", "must specify at least one field"); + } else if (fields.stream().anyMatch(Strings::isNullOrEmpty)) { + throw ConfigurationUtils.newConfigurationException(TYPE, processorTag, "fields", + "a field-name entry is either empty or null"); + } + String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field"); + String keySettingName = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "key_setting"); + SecureString key = secureKeys.get(keySettingName); + if (key == null) { + throw ConfigurationUtils.newConfigurationException(TYPE, processorTag, "key_setting", + "key [" + keySettingName + "] must match [xpack.security.ingest.hash.*.key]. It is not set"); + } + String saltString = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "salt"); + byte[] salt = saltString.getBytes(StandardCharsets.UTF_8); + String methodProperty = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "method", "SHA256"); + Method method = Method.fromString(processorTag, "method", methodProperty); + int iterations = ConfigurationUtils.readIntProperty(TYPE, processorTag, config, "iterations", 5); + Mac mac = createMac(method, key, salt, iterations); + return new HashProcessor(processorTag, fields, targetField, salt, method, mac, ignoreMissing); + } + } + + enum Method { + SHA1("HmacSHA1"), + SHA256("HmacSHA256"), + SHA384("HmacSHA384"), + SHA512("HmacSHA512"); + + private final String algorithm; + + Method(String algorithm) { + this.algorithm = algorithm; + } + + public String getAlgorithm() { + return algorithm; + } + + @Override + public String toString() { + return name().toLowerCase(Locale.ROOT); + } + + public String hash(Mac mac, byte[] salt, String input) { + try { + byte[] encrypted = mac.doFinal(input.getBytes(StandardCharsets.UTF_8)); + byte[] messageWithSalt = new byte[salt.length + encrypted.length]; + System.arraycopy(salt, 0, messageWithSalt, 0, salt.length); + System.arraycopy(encrypted, 0, messageWithSalt, salt.length, encrypted.length); + return Base64.getEncoder().encodeToString(messageWithSalt); + } catch (IllegalStateException e) { + throw new ElasticsearchException("error hashing data", e); + } + } + + public static Method fromString(String processorTag, String propertyName, String type) { + try { + return Method.valueOf(type.toUpperCase(Locale.ROOT)); + } catch(IllegalArgumentException e) { + throw newConfigurationException(TYPE, processorTag, propertyName, "type [" + type + + "] not supported, cannot convert field. Valid hash methods: " + Arrays.toString(Method.values())); + } + } + } +} diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorFactoryTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorFactoryTests.java new file mode 100644 index 0000000000000..e9dda488e7216 --- /dev/null +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorFactoryTests.java @@ -0,0 +1,136 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.security.ingest; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.settings.MockSecureSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class HashProcessorFactoryTests extends ESTestCase { + + public void testProcessor() { + MockSecureSettings mockSecureSettings = new MockSecureSettings(); + mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); + Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); + HashProcessor.Factory factory = new HashProcessor.Factory(settings); + Map config = new HashMap<>(); + config.put("fields", Collections.singletonList("_field")); + config.put("target_field", "_target"); + config.put("salt", "_salt"); + config.put("key_setting", "xpack.security.ingest.hash.processor.key"); + for (HashProcessor.Method method : HashProcessor.Method.values()) { + config.put("method", method.toString()); + HashProcessor processor = factory.create(null, "_tag", new HashMap<>(config)); + assertThat(processor.getFields(), equalTo(Collections.singletonList("_field"))); + assertThat(processor.getTargetField(), equalTo("_target")); + assertArrayEquals(processor.getSalt(), "_salt".getBytes(StandardCharsets.UTF_8)); + } + } + + public void testProcessorNoFields() { + MockSecureSettings mockSecureSettings = new MockSecureSettings(); + mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); + Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); + HashProcessor.Factory factory = new HashProcessor.Factory(settings); + Map config = new HashMap<>(); + config.put("target_field", "_target"); + config.put("salt", "_salt"); + config.put("key_setting", "xpack.security.ingest.hash.processor.key"); + config.put("method", HashProcessor.Method.SHA1.toString()); + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> factory.create(null, "_tag", config)); + assertThat(e.getMessage(), equalTo("[fields] required property is missing")); + } + + public void testProcessorNoTargetField() { + MockSecureSettings mockSecureSettings = new MockSecureSettings(); + mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); + Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); + HashProcessor.Factory factory = new HashProcessor.Factory(settings); + Map config = new HashMap<>(); + config.put("fields", Collections.singletonList("_field")); + config.put("salt", "_salt"); + config.put("key_setting", "xpack.security.ingest.hash.processor.key"); + config.put("method", HashProcessor.Method.SHA1.toString()); + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> factory.create(null, "_tag", config)); + assertThat(e.getMessage(), equalTo("[target_field] required property is missing")); + } + + public void testProcessorFieldsIsEmpty() { + MockSecureSettings mockSecureSettings = new MockSecureSettings(); + mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); + Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); + HashProcessor.Factory factory = new HashProcessor.Factory(settings); + Map config = new HashMap<>(); + config.put("fields", Collections.singletonList(randomBoolean() ? "" : null)); + config.put("salt", "_salt"); + config.put("target_field", "_target"); + config.put("key_setting", "xpack.security.ingest.hash.processor.key"); + config.put("method", HashProcessor.Method.SHA1.toString()); + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> factory.create(null, "_tag", config)); + assertThat(e.getMessage(), equalTo("[fields] a field-name entry is either empty or null")); + } + + public void testProcessorMissingSalt() { + MockSecureSettings mockSecureSettings = new MockSecureSettings(); + mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); + Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); + HashProcessor.Factory factory = new HashProcessor.Factory(settings); + Map config = new HashMap<>(); + config.put("fields", Collections.singletonList("_field")); + config.put("target_field", "_target"); + config.put("key_setting", "xpack.security.ingest.hash.processor.key"); + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> factory.create(null, "_tag", config)); + assertThat(e.getMessage(), equalTo("[salt] required property is missing")); + } + + public void testProcessorInvalidMethod() { + MockSecureSettings mockSecureSettings = new MockSecureSettings(); + mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); + Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); + HashProcessor.Factory factory = new HashProcessor.Factory(settings); + Map config = new HashMap<>(); + config.put("fields", Collections.singletonList("_field")); + config.put("salt", "_salt"); + config.put("target_field", "_target"); + config.put("key_setting", "xpack.security.ingest.hash.processor.key"); + config.put("method", "invalid"); + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> factory.create(null, "_tag", config)); + assertThat(e.getMessage(), equalTo("[method] type [invalid] not supported, cannot convert field. " + + "Valid hash methods: [sha1, sha256, sha384, sha512]")); + } + + public void testProcessorInvalidOrMissingKeySetting() { + Settings settings = Settings.builder().setSecureSettings(new MockSecureSettings()).build(); + HashProcessor.Factory factory = new HashProcessor.Factory(settings); + Map config = new HashMap<>(); + config.put("fields", Collections.singletonList("_field")); + config.put("salt", "_salt"); + config.put("target_field", "_target"); + config.put("key_setting", "invalid"); + config.put("method", HashProcessor.Method.SHA1.toString()); + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> factory.create(null, "_tag", new HashMap<>(config))); + assertThat(e.getMessage(), + equalTo("[key_setting] key [invalid] must match [xpack.security.ingest.hash.*.key]. It is not set")); + config.remove("key_setting"); + ElasticsearchException ex = expectThrows(ElasticsearchException.class, + () -> factory.create(null, "_tag", config)); + assertThat(ex.getMessage(), equalTo("[key_setting] required property is missing")); + } +} diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorTests.java new file mode 100644 index 0000000000000..b3890600592f5 --- /dev/null +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorTests.java @@ -0,0 +1,130 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.security.ingest; + +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.security.ingest.HashProcessor.Method; + +import javax.crypto.Mac; +import javax.crypto.SecretKeyFactory; +import javax.crypto.spec.PBEKeySpec; +import javax.crypto.spec.SecretKeySpec; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class HashProcessorTests extends ESTestCase { + + @SuppressWarnings("unchecked") + public void testIgnoreMissing() throws Exception { + Method method = randomFrom(Method.values()); + Mac mac = createMac(method); + Map fields = new HashMap<>(); + fields.put("one", "foo"); + HashProcessor processor = new HashProcessor("_tag", Arrays.asList("one", "two"), + "target", "_salt".getBytes(StandardCharsets.UTF_8), Method.SHA1, mac, true); + IngestDocument ingestDocument = new IngestDocument(fields, new HashMap<>()); + processor.execute(ingestDocument); + Map target = ingestDocument.getFieldValue("target", Map.class); + assertThat(target.size(), equalTo(1)); + assertNotNull(target.get("one")); + + HashProcessor failProcessor = new HashProcessor("_tag", Arrays.asList("one", "two"), + "target", "_salt".getBytes(StandardCharsets.UTF_8), Method.SHA1, mac, false); + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> failProcessor.execute(ingestDocument)); + assertThat(exception.getMessage(), equalTo("field [two] not present as part of path [two]")); + } + + public void testStaticKeyAndSalt() throws Exception { + byte[] salt = "_salt".getBytes(StandardCharsets.UTF_8); + SecretKeyFactory secretKeyFactory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA1"); + PBEKeySpec keySpec = new PBEKeySpec("hmackey".toCharArray(), salt, 5, 128); + byte[] pbkdf2 = secretKeyFactory.generateSecret(keySpec).getEncoded(); + Mac mac = Mac.getInstance(Method.SHA1.getAlgorithm()); + mac.init(new SecretKeySpec(pbkdf2, Method.SHA1.getAlgorithm())); + Map fields = new HashMap<>(); + fields.put("field", "0123456789"); + HashProcessor processor = new HashProcessor("_tag", Collections.singletonList("field"), + "target", salt, Method.SHA1, mac, false); + IngestDocument ingestDocument = new IngestDocument(fields, new HashMap<>()); + processor.execute(ingestDocument); + assertThat(ingestDocument.getFieldValue("target", String.class), equalTo("X3NhbHQMW0oHJGEEE9obGcGv5tGd7HFyDw==")); + } + + public void testProcessorSingleField() throws Exception { + List fields = Collections.singletonList(randomAlphaOfLength(6)); + Map docFields = new HashMap<>(); + for (String field : fields) { + docFields.put(field, randomAlphaOfLengthBetween(2, 10)); + } + + String targetField = randomAlphaOfLength(6); + Method method = randomFrom(Method.values()); + Mac mac = createMac(method); + byte[] salt = randomByteArrayOfLength(5); + HashProcessor processor = new HashProcessor("_tag", fields, targetField, salt, method, mac, false); + IngestDocument ingestDocument = new IngestDocument(docFields, new HashMap<>()); + processor.execute(ingestDocument); + + String targetFieldValue = ingestDocument.getFieldValue(targetField, String.class); + Object expectedTargetFieldValue = method.hash(mac, salt, ingestDocument.getFieldValue(fields.get(0), String.class)); + assertThat(targetFieldValue, equalTo(expectedTargetFieldValue)); + byte[] bytes = Base64.getDecoder().decode(targetFieldValue); + byte[] actualSaltPrefix = new byte[salt.length]; + System.arraycopy(bytes, 0, actualSaltPrefix, 0, salt.length); + assertArrayEquals(salt, actualSaltPrefix); + } + + @SuppressWarnings("unchecked") + public void testProcessorMultipleFields() throws Exception { + List fields = new ArrayList<>(); + for (int i = 0; i < randomIntBetween(2, 10); i++) { + fields.add(randomAlphaOfLength(5 + i)); + } + Map docFields = new HashMap<>(); + for (String field : fields) { + docFields.put(field, randomAlphaOfLengthBetween(2, 10)); + } + + String targetField = randomAlphaOfLength(6); + Method method = randomFrom(Method.values()); + Mac mac = createMac(method); + byte[] salt = randomByteArrayOfLength(5); + HashProcessor processor = new HashProcessor("_tag", fields, targetField, salt, method, mac, false); + IngestDocument ingestDocument = new IngestDocument(docFields, new HashMap<>()); + processor.execute(ingestDocument); + + Map targetFieldMap = ingestDocument.getFieldValue(targetField, Map.class); + for (Map.Entry entry : targetFieldMap.entrySet()) { + Object expectedTargetFieldValue = method.hash(mac, salt, ingestDocument.getFieldValue(entry.getKey(), String.class)); + assertThat(entry.getValue(), equalTo(expectedTargetFieldValue)); + byte[] bytes = Base64.getDecoder().decode(entry.getValue()); + byte[] actualSaltPrefix = new byte[salt.length]; + System.arraycopy(bytes, 0, actualSaltPrefix, 0, salt.length); + assertArrayEquals(salt, actualSaltPrefix); + } + } + + private Mac createMac(Method method) throws Exception { + char[] password = randomAlphaOfLengthBetween(1, 10).toCharArray(); + byte[] salt = randomAlphaOfLength(5).getBytes(StandardCharsets.UTF_8); + int iterations = randomIntBetween(1, 10); + SecretKeyFactory secretKeyFactory = SecretKeyFactory.getInstance("PBKDF2With" + method.getAlgorithm()); + PBEKeySpec keySpec = new PBEKeySpec(password, salt, iterations, 128); + byte[] pbkdf2 = secretKeyFactory.generateSecret(keySpec).getEncoded(); + Mac mac = Mac.getInstance(method.getAlgorithm()); + mac.init(new SecretKeySpec(pbkdf2, method.getAlgorithm())); + return mac; + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/hash_processor/10_basic.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/hash_processor/10_basic.yml new file mode 100644 index 0000000000000..ee84e02d2f498 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/hash_processor/10_basic.yml @@ -0,0 +1,51 @@ +--- +teardown: + - do: + ingest.delete_pipeline: + id: "my_pipeline" + ignore: 404 + +--- +"Test Hash Processor": + + - do: + cluster.health: + wait_for_status: yellow + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "processors": [ + { + "hash" : { + "fields" : ["user_ssid"], + "target_field" : "anonymized", + "salt": "_salt", + "iterations": 5, + "method": "sha1", + "key_setting": "xpack.security.ingest.hash.processor.key" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + type: test + id: 1 + pipeline: "my_pipeline" + body: > + { + "user_ssid": "0123456789" + } + + - do: + get: + index: test + type: test + id: 1 + - match: { _source.anonymized: "X3NhbHQMW0oHJGEEE9obGcGv5tGd7HFyDw==" } + From d23376e467fd164168b425da2f3893b35186e961 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Mon, 9 Sep 2019 15:26:55 -0500 Subject: [PATCH 2/8] update to new processor interface --- .../org/elasticsearch/xpack/security/ingest/HashProcessor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java index fa49b843847ee..e249f507293b6 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java @@ -78,7 +78,7 @@ byte[] getSalt() { } @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { Map hashedFieldValues = fields.stream().map(f -> { String value = document.getFieldValue(f, String.class, ignoreMissing); if (value == null && ignoreMissing) { @@ -95,6 +95,7 @@ public void execute(IngestDocument document) { } else { document.setFieldValue(targetField, hashedFieldValues); } + return document; } @Override From 5a0f9e8fe218330f023c658c34d94bf58a67158e Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Wed, 11 Sep 2019 20:57:38 -0500 Subject: [PATCH 3/8] check for key consistency --- .../xpack/security/Security.java | 3 +- .../xpack/security/ingest/HashProcessor.java | 60 +++++++++++++------ 2 files changed, 45 insertions(+), 18 deletions(-) diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 20337d5ed5a04..a52195c537d9f 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -797,7 +797,8 @@ public List getRestHandlers(Settings settings, RestController restC public Map getProcessors(Processor.Parameters parameters) { Map processors = new HashMap<>(); processors.put(SetSecurityUserProcessor.TYPE, new SetSecurityUserProcessor.Factory(parameters.threadContext)); - processors.put(HashProcessor.TYPE, new HashProcessor.Factory(parameters.env.settings())); + processors.put(HashProcessor.TYPE, new HashProcessor.Factory(parameters.env.settings(), + parameters.ingestService.getClusterService())); return processors; } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java index e249f507293b6..153dc7539cba8 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java @@ -6,9 +6,12 @@ package org.elasticsearch.xpack.security.ingest; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.ConsistentSettingsService; import org.elasticsearch.common.settings.SecureSetting; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Setting; @@ -29,19 +32,22 @@ import java.security.spec.InvalidKeySpecException; import java.util.Arrays; import java.util.Base64; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.stream.Collectors; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; /** - * A processor that hashes the contents of a field (or fields) using various hashing algorithms + * A processor that hashes the contents of a field or fields using various hashing algorithms */ -public final class HashProcessor extends AbstractProcessor { +public final class HashProcessor extends AbstractProcessor implements Consumer { public static final String TYPE = "hash"; public static final Setting.AffixSetting HMAC_KEY_SETTING = SecureSetting .affixKeySetting(SecurityField.setting("ingest." + TYPE) + ".", "key", @@ -53,6 +59,7 @@ public final class HashProcessor extends AbstractProcessor { private final Mac mac; private final byte[] salt; private final boolean ignoreMissing; + private final AtomicBoolean consistentHashes = new AtomicBoolean(true); HashProcessor(String tag, List fields, String targetField, byte[] salt, Method method, @Nullable Mac mac, boolean ignoreMissing) { @@ -79,23 +86,27 @@ byte[] getSalt() { @Override public IngestDocument execute(IngestDocument document) { - Map hashedFieldValues = fields.stream().map(f -> { - String value = document.getFieldValue(f, String.class, ignoreMissing); - if (value == null && ignoreMissing) { - return new Tuple(null, null); + if (consistentHashes.get()) { + Map hashedFieldValues = fields.stream().map(f -> { + String value = document.getFieldValue(f, String.class, ignoreMissing); + if (value == null && ignoreMissing) { + return new Tuple(null, null); + } + try { + return new Tuple<>(f, method.hash(mac, salt, value)); + } catch (Exception e) { + throw new IllegalArgumentException("field[" + f + "] could not be hashed", e); + } + }).filter(tuple -> Objects.nonNull(tuple.v1())).collect(Collectors.toMap(Tuple::v1, Tuple::v2)); + if (fields.size() == 1) { + document.setFieldValue(targetField, hashedFieldValues.values().iterator().next()); + } else { + document.setFieldValue(targetField, hashedFieldValues); } - try { - return new Tuple<>(f, method.hash(mac, salt, value)); - } catch (Exception e) { - throw new IllegalArgumentException("field[" + f + "] could not be hashed", e); - } - }).filter(tuple -> Objects.nonNull(tuple.v1())).collect(Collectors.toMap(Tuple::v1, Tuple::v2)); - if (fields.size() == 1) { - document.setFieldValue(targetField, hashedFieldValues.values().iterator().next()); + return document; } else { - document.setFieldValue(targetField, hashedFieldValues); + throw new IllegalArgumentException("inconsistent hash key"); } - return document; } @Override @@ -103,13 +114,20 @@ public String getType() { return TYPE; } + @Override + public void accept(ClusterState clusterState) { + // check hash keys for consistency and unset consistentHashes flag if inconsistent + } + public static final class Factory implements Processor.Factory { private final Settings settings; + private final ClusterService clusterService; private final Map secureKeys; - public Factory(Settings settings) { + public Factory(Settings settings, ClusterService clusterService) { this.settings = settings; + this.clusterService = clusterService; this.secureKeys = new HashMap<>(); HMAC_KEY_SETTING.getAllConcreteSettings(settings).forEach(k -> { secureKeys.put(k.getKey(), k.get(settings)); @@ -146,6 +164,13 @@ public HashProcessor create(Map registry, String proc throw ConfigurationUtils.newConfigurationException(TYPE, processorTag, "key_setting", "key [" + keySettingName + "] must match [xpack.security.ingest.hash.*.key]. It is not set"); } + + Collection> consistentSettings = HMAC_KEY_SETTING.getAllConcreteSettings(settings).collect(Collectors.toList()); + ConsistentSettingsService consistentSettingsService = new ConsistentSettingsService(settings, clusterService, consistentSettings); + if (consistentSettingsService.areAllConsistent() == false) { + throw ConfigurationUtils.newConfigurationException(TYPE, processorTag, "key_setting", "inconsistent hash key [" + keySettingName + "]"); + } + String saltString = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "salt"); byte[] salt = saltString.getBytes(StandardCharsets.UTF_8); String methodProperty = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "method", "SHA256"); @@ -154,6 +179,7 @@ public HashProcessor create(Map registry, String proc Mac mac = createMac(method, key, salt, iterations); return new HashProcessor(processorTag, fields, targetField, salt, method, mac, ignoreMissing); } + } enum Method { From 750890f5e36d7c930bbf1e20058e64a003c51561 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Wed, 12 Feb 2020 10:49:52 -0600 Subject: [PATCH 4/8] don't use ConsistentSettingsService for ensuring consistent keys across ingest nodes --- .../ingest/common/ForEachProcessor.java | 3 +- .../ingest/ConfigurableProcessor.java | 26 +++++++ .../ingest/ConfigurationUtils.java | 23 ++++--- .../elasticsearch/ingest/IngestService.java | 27 +++++--- .../org/elasticsearch/ingest/Pipeline.java | 25 +++++-- .../ingest/PipelineConfiguration.java | 28 +++++++- .../org/elasticsearch/ingest/Processor.java | 5 ++ .../ingest/ConfigurationUtilsTests.java | 12 ++-- .../ingest/IngestServiceTests.java | 46 ++++++------- .../xpack/security/Security.java | 3 +- .../xpack/security/ingest/HashProcessor.java | 67 +++++++++++++------ .../ingest/HashProcessorFactoryTests.java | 21 ++++-- .../security/ingest/HashProcessorTests.java | 10 +-- 13 files changed, 201 insertions(+), 95 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/ingest/ConfigurableProcessor.java diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index d79a64626b611..86968e81077cf 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -27,6 +27,7 @@ import org.elasticsearch.script.ScriptService; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -144,7 +145,7 @@ public ForEachProcessor create(Map factories, String } Map.Entry> entry = entries.iterator().next(); Processor processor = - ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue()); + ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue(), Collections.emptyMap()); return new ForEachProcessor(tag, field, processor, ignoreMissing); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/ConfigurableProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConfigurableProcessor.java new file mode 100644 index 0000000000000..b74d6c0677f66 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/ingest/ConfigurableProcessor.java @@ -0,0 +1,26 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import java.util.Map; + +public interface ConfigurableProcessor { + Map getMetadata(); +} diff --git a/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java b/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java index c2a5b09cd5352..d77129dd38391 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java @@ -326,14 +326,16 @@ public static ElasticsearchException newConfigurationException(String processorT public static List readProcessorConfigs(List> processorConfigs, ScriptService scriptService, - Map processorFactories) throws Exception { + Map processorFactories, + Map pipelineMetadata) throws Exception { Exception exception = null; List processors = new ArrayList<>(); if (processorConfigs != null) { for (Map processorConfigWithKey : processorConfigs) { for (Map.Entry entry : processorConfigWithKey.entrySet()) { try { - processors.add(readProcessor(processorFactories, scriptService, entry.getKey(), entry.getValue())); + processors.add(readProcessor(processorFactories, scriptService, entry.getKey(), entry.getValue(), + pipelineMetadata)); } catch (Exception e) { exception = ExceptionsHelper.useOrSuppress(exception, e); } @@ -393,13 +395,13 @@ private static void addMetadataToException(ElasticsearchException exception, Str @SuppressWarnings("unchecked") public static Processor readProcessor(Map processorFactories, ScriptService scriptService, - String type, Object config) throws Exception { + String type, Object config, Map pipelineMetadata) throws Exception { if (config instanceof Map) { - return readProcessor(processorFactories, scriptService, type, (Map) config); + return readProcessor(processorFactories, scriptService, type, (Map) config, pipelineMetadata); } else if (config instanceof String && "script".equals(type)) { Map normalizedScript = new HashMap<>(1); normalizedScript.put(ScriptType.INLINE.getParseField().getPreferredName(), config); - return readProcessor(processorFactories, scriptService, type, normalizedScript); + return readProcessor(processorFactories, scriptService, type, normalizedScript, pipelineMetadata); } else { throw newConfigurationException(type, null, null, "property isn't a map, but of type [" + config.getClass().getName() + "]"); @@ -407,8 +409,10 @@ public static Processor readProcessor(Map processorFa } public static Processor readProcessor(Map processorFactories, - ScriptService scriptService, - String type, Map config) throws Exception { + ScriptService scriptService, + String type, + Map config, + Map pipelineMetadata) throws Exception { String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); Script conditionalScript = extractConditional(config); Processor.Factory factory = processorFactories.get(type); @@ -417,7 +421,8 @@ public static Processor readProcessor(Map processorFa List> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY); - List onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, scriptService, processorFactories); + List onFailureProcessors = + readProcessorConfigs(onFailureProcessorConfigs, scriptService, processorFactories, pipelineMetadata); if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) { throw newConfigurationException(type, tag, Pipeline.ON_FAILURE_KEY, @@ -425,7 +430,7 @@ public static Processor readProcessor(Map processorFa } try { - Processor processor = factory.create(processorFactories, tag, config); + Processor processor = factory.create(processorFactories, tag, config, pipelineMetadata); if (config.isEmpty() == false) { throw new ElasticsearchParseException("processor [{}] doesn't support one or more provided configuration parameters {}", type, Arrays.toString(config.keySet().toArray())); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 939443d05a856..d39aae03177bb 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -222,7 +222,7 @@ static List innerGetPipelines(IngestMetadata ingestMetada public void putPipeline(Map ingestInfos, PutPipelineRequest request, ActionListener listener) throws Exception { // validates the pipeline and processor configuration before submitting a cluster update task: - validatePipeline(ingestInfos, request); + final Map pipelineMetadata = validatePipeline(ingestInfos, request); clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(), new AckedClusterStateUpdateTask(request, listener) { @@ -233,7 +233,7 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) { @Override public ClusterState execute(ClusterState currentState) { - return innerPut(request, currentState); + return innerPut(request, currentState, pipelineMetadata); } }); } @@ -293,7 +293,7 @@ private static List> getProcessorMetrics(Compound return processorMetrics; } - static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) { + static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState, Map pipelineMetadata) { IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE); Map pipelines; if (currentIngestMetadata != null) { @@ -302,7 +302,8 @@ static ClusterState innerPut(PutPipelineRequest request, ClusterState currentSta pipelines = new HashMap<>(); } - pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType())); + pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), pipelineMetadata, + request.getXContentType())); ClusterState.Builder newState = ClusterState.builder(currentState); newState.metaData(MetaData.builder(currentState.getMetaData()) .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)) @@ -310,15 +311,22 @@ static ClusterState innerPut(PutPipelineRequest request, ClusterState currentSta return newState.build(); } - void validatePipeline(Map ingestInfos, PutPipelineRequest request) throws Exception { + Map validatePipeline(Map ingestInfos, PutPipelineRequest request) throws Exception { if (ingestInfos.isEmpty()) { throw new IllegalStateException("Ingest info is empty"); } + Map pipelineMetadata = new HashMap<>(); Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService); List exceptions = new ArrayList<>(); for (Processor processor : pipeline.flattenAllProcessors()) { + if (processor instanceof ConfigurableProcessor) { + final Map processorMetaData = ((ConfigurableProcessor) processor).getMetadata(); + if (processorMetaData.isEmpty() == false) { + pipelineMetadata.putAll(processorMetaData); + } + } for (Map.Entry entry : ingestInfos.entrySet()) { String type = processor.getType(); if (entry.getValue().containsProcessor(type) == false && ConditionalProcessor.TYPE.equals(type) == false) { @@ -330,6 +338,7 @@ void validatePipeline(Map ingestInfos, PutPipelineReq } } ExceptionsHelper.rethrowAndSuppress(exceptions); + return pipelineMetadata; } public void executeBulkRequest(int numberOfActionRequests, @@ -569,12 +578,8 @@ void innerUpdatePipelines(IngestMetadata newIngestMetadata) { newPipelines = new HashMap<>(existingPipelines); } try { - Pipeline newPipeline = - Pipeline.create(newConfiguration.getId(), newConfiguration.getConfigAsMap(), processorFactories, scriptService); - newPipelines.put( - newConfiguration.getId(), - new PipelineHolder(newConfiguration, newPipeline) - ); + Pipeline newPipeline = Pipeline.create(newConfiguration, processorFactories, scriptService); + newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline)); if (previous == null) { continue; diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 3d41d991f3e10..ac29fcbe8177c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -66,25 +66,36 @@ public Pipeline(String id, @Nullable String description, @Nullable Integer versi this.relativeTimeProvider = relativeTimeProvider; } - public static Pipeline create(String id, Map config, - Map processorFactories, ScriptService scriptService) throws Exception { + public static Pipeline create(PipelineConfiguration pc, Map processorFactories, ScriptService scriptService) + throws Exception { + return create(pc.getId(), pc.getConfigAsMap(), processorFactories, scriptService, pc.getMetadata()); + } + + public static Pipeline create(String id, Map config, Map processorFactories, + ScriptService scriptService) throws Exception { + return create(id, config, processorFactories, scriptService, Collections.emptyMap()); + } + + private static Pipeline create(String id, Map config, Map processorFactories, + ScriptService scriptService, Map pipelineMetadata) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); List> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY); - List processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, scriptService, processorFactories); + List processors = + ConfigurationUtils.readProcessorConfigs(processorConfigs, scriptService, processorFactories, pipelineMetadata); List> onFailureProcessorConfigs = - ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY); + ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY); List onFailureProcessors = - ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, scriptService, processorFactories); + ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, scriptService, processorFactories, pipelineMetadata); if (config.isEmpty() == false) { throw new ElasticsearchParseException("pipeline [" + id + - "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); } if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) { throw new ElasticsearchParseException("pipeline [" + id + "] cannot have an empty on_failure option defined"); } CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.unmodifiableList(processors), - Collections.unmodifiableList(onFailureProcessors)); + Collections.unmodifiableList(onFailureProcessors)); return new Pipeline(id, description, version, compoundProcessor); } diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java index 81ef55ecf40a6..de168e27ef239 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; +import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -50,7 +51,10 @@ public final class PipelineConfiguration extends AbstractDiffable { + final Map metadata = parser.mapStrings(); + builder.setMetadata(metadata); + }, new ParseField("metadata"), ObjectParser.ValueType.OBJECT); } public static ContextParser getParser() { @@ -60,6 +64,7 @@ private static class Builder { private String id; private BytesReference config; + private Map metadata; private XContentType xContentType; void setId(String id) { @@ -71,8 +76,12 @@ void setConfig(BytesReference config, XContentType xContentType) { this.xContentType = xContentType; } + void setMetadata(Map metadata) { + this.metadata = metadata; + } + PipelineConfiguration build() { - return new PipelineConfiguration(id, config, xContentType); + return new PipelineConfiguration(id, config, metadata, xContentType); } } @@ -82,11 +91,17 @@ PipelineConfiguration build() { // also the get pipeline api just directly returns this to the caller private final BytesReference config; private final XContentType xContentType; + private final Map metadata; public PipelineConfiguration(String id, BytesReference config, XContentType xContentType) { + this(id, config, Collections.emptyMap(), xContentType); + } + + public PipelineConfiguration(String id, BytesReference config, Map metadata, XContentType xContentType) { this.id = Objects.requireNonNull(id); this.config = Objects.requireNonNull(config); this.xContentType = Objects.requireNonNull(xContentType); + this.metadata = metadata; } public String getId() { @@ -97,6 +112,10 @@ public Map getConfigAsMap() { return XContentHelper.convertToMap(config, true, xContentType).v2(); } + public Map getMetadata() { + return Collections.unmodifiableMap(metadata); + } + // pkg-private for tests XContentType getXContentType() { return xContentType; @@ -112,12 +131,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field("id", id); builder.field("config", getConfigAsMap()); + builder.field("metadata", metadata); builder.endObject(); return builder; } public static PipelineConfiguration readFrom(StreamInput in) throws IOException { - return new PipelineConfiguration(in.readString(), in.readBytesReference(), in.readEnum(XContentType.class)); + return new PipelineConfiguration(in.readString(), in.readBytesReference(), + in.readMap(StreamInput::readString, StreamInput::readString), in.readEnum(XContentType.class)); } public static Diff readDiffFrom(StreamInput in) throws IOException { @@ -133,6 +154,7 @@ public String toString() { public void writeTo(StreamOutput out) throws IOException { out.writeString(id); out.writeBytesReference(config); + out.writeMap(metadata, StreamOutput::writeString, StreamOutput::writeString); out.writeEnum(xContentType); } diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index 671670212ab0a..e5d87fe2de284 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -90,6 +90,11 @@ interface Factory { */ Processor create(Map processorFactories, String tag, Map config) throws Exception; + + default Processor create(Map processorFactories, String tag, + Map config, Map metadata) throws Exception { + return create(processorFactories, tag, config); + } } /** diff --git a/server/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java b/server/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java index 20f67fd10a36d..5ad89c460cbb5 100644 --- a/server/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java @@ -124,7 +124,7 @@ public void testReadProcessors() throws Exception { config.add(Collections.singletonMap("test_processor", emptyConfig)); config.add(Collections.singletonMap("test_processor", emptyConfig)); - List result = ConfigurationUtils.readProcessorConfigs(config, scriptService, registry); + List result = ConfigurationUtils.readProcessorConfigs(config, scriptService, registry, Collections.emptyMap()); assertThat(result.size(), equalTo(2)); assertThat(result.get(0), sameInstance(processor)); assertThat(result.get(1), sameInstance(processor)); @@ -133,7 +133,7 @@ public void testReadProcessors() throws Exception { unknownTaggedConfig.put("tag", "my_unknown"); config.add(Collections.singletonMap("unknown_processor", unknownTaggedConfig)); ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, - () -> ConfigurationUtils.readProcessorConfigs(config, scriptService, registry)); + () -> ConfigurationUtils.readProcessorConfigs(config, scriptService, registry, Collections.emptyMap())); assertThat(e.getMessage(), equalTo("No processor type exists with name [unknown_processor]")); assertThat(e.getMetadata("es.processor_tag"), equalTo(Collections.singletonList("my_unknown"))); assertThat(e.getMetadata("es.processor_type"), equalTo(Collections.singletonList("unknown_processor"))); @@ -148,7 +148,7 @@ public void testReadProcessors() throws Exception { config2.add(Collections.singletonMap("second_unknown_processor", secondUnknownTaggedConfig)); e = expectThrows( ElasticsearchParseException.class, - () -> ConfigurationUtils.readProcessorConfigs(config2, scriptService, registry) + () -> ConfigurationUtils.readProcessorConfigs(config2, scriptService, registry, Collections.emptyMap()) ); assertThat(e.getMessage(), equalTo("No processor type exists with name [unknown_processor]")); assertThat(e.getMetadata("es.processor_tag"), equalTo(Collections.singletonList("my_unknown"))); @@ -173,17 +173,17 @@ public void testReadProcessorFromObjectOrMap() throws Exception { }); Object emptyConfig = Collections.emptyMap(); - Processor processor1 = ConfigurationUtils.readProcessor(registry, scriptService, "script", emptyConfig); + Processor processor1 = ConfigurationUtils.readProcessor(registry, scriptService, "script", emptyConfig, Collections.emptyMap()); assertThat(processor1, sameInstance(processor)); Object inlineScript = "test_script"; - Processor processor2 = ConfigurationUtils.readProcessor(registry, scriptService, "script", inlineScript); + Processor processor2 = ConfigurationUtils.readProcessor(registry, scriptService, "script", inlineScript, Collections.emptyMap()); assertThat(processor2, sameInstance(processor)); Object invalidConfig = 12L; ElasticsearchParseException ex = expectThrows(ElasticsearchParseException.class, - () -> ConfigurationUtils.readProcessor(registry, scriptService, "unknown_processor", invalidConfig)); + () -> ConfigurationUtils.readProcessor(registry, scriptService, "unknown_processor", invalidConfig, Collections.emptyMap())); assertThat(ex.getMessage(), equalTo("property isn't a map, but of type [" + invalidConfig.getClass().getName() + "]")); } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 0a16d5001638a..04c17e253e3fa 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -300,7 +300,7 @@ public void testGetProcessorsInPipeline() throws Exception { "{\"remove\" : {\"field\": \"_field\", \"tag\": \"tag2\"}}]}"), XContentType.JSON); ClusterState previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); pipeline = ingestService.getPipeline(id); assertThat(pipeline, notNullValue()); @@ -362,7 +362,7 @@ public void testGetProcessorsInPipelineComplexConditional() throws Exception { PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": [{\"complexSet\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON); ClusterState previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); pipeline = ingestService.getPipeline(id); assertThat(pipeline, notNullValue()); @@ -385,7 +385,7 @@ public void testCrud() throws Exception { PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON); ClusterState previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); pipeline = ingestService.getPipeline(id); assertThat(pipeline, notNullValue()); @@ -412,7 +412,7 @@ public void testPut() { // add a new pipeline: PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": []}"), XContentType.JSON); ClusterState previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); pipeline = ingestService.getPipeline(id); assertThat(pipeline, notNullValue()); @@ -424,7 +424,7 @@ public void testPut() { putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON); previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); pipeline = ingestService.getPipeline(id); assertThat(pipeline, notNullValue()); @@ -443,7 +443,7 @@ public void testPutWithErrorResponse() throws IllegalAccessException { PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"description\": \"empty processors\"}"), XContentType.JSON); ClusterState previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); MockLogAppender mockAppender = new MockLogAppender(); mockAppender.start(); mockAppender.addExpectation( @@ -629,7 +629,7 @@ public String getType() { PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final SetOnce failure = new SetOnce<>(); final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline(id).setFinalPipeline("_none"); @@ -656,7 +656,7 @@ public void testExecuteBulkPipelineDoesNotExist() { new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); BulkRequest bulkRequest = new BulkRequest(); @@ -697,7 +697,7 @@ public void testExecuteSuccess() { new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); @@ -716,7 +716,7 @@ public void testExecuteEmptyPipeline() throws Exception { new PutPipelineRequest("_id", new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); @@ -737,7 +737,7 @@ public void testExecutePropagateAllMetaDataUpdates() throws Exception { new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final long newVersion = randomLong(); final String versionType = randomFrom("internal", "external", "external_gt", "external_gte"); @@ -783,7 +783,7 @@ public void testExecuteFailure() throws Exception { new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); @@ -828,7 +828,7 @@ public void testExecuteSuccessWithOnFailure() throws Exception { new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); @@ -857,7 +857,7 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); @@ -918,7 +918,7 @@ public void testBulkRequestExecutionWithFailures() throws Exception { new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @SuppressWarnings("unchecked") @@ -970,7 +970,7 @@ public void testBulkRequestExecution() throws Exception { new BytesArray("{\"processors\": [{\"mock\": {}}], \"description\": \"_description\"}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); ClusterState previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @SuppressWarnings("unchecked") @@ -1020,12 +1020,12 @@ public void testStats() throws Exception { new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); putRequest = new PutPipelineRequest("_id2", new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @@ -1068,7 +1068,7 @@ public void testStats() throws Exception { putRequest = new PutPipelineRequest("_id1", new BytesArray("{\"processors\": [{\"mock\" : {}}, {\"mock\" : {}}]}"), XContentType.JSON); previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); @@ -1092,7 +1092,7 @@ public void testStats() throws Exception { new BytesArray("{\"processors\": [{\"failure-mock\" : { \"on_failure\": [{\"mock\" : {}}]}}, {\"mock\" : {}}]}"), XContentType.JSON); previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); @@ -1157,7 +1157,7 @@ public String getTag() { new BytesArray("{\"processors\": [{\"drop\" : {}}, {\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); @@ -1203,7 +1203,7 @@ public Map getProcessors(Processor.Parameters paramet new BytesArray("{\"processors\": [{\"test\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); // Sanity check that counter has been updated twice: @@ -1220,7 +1220,7 @@ public void testCBORParsing() throws Exception { ClusterState previousClusterState = clusterState; PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray("{\"processors\": [{\"foo\" : {}}]}"), XContentType.JSON); - clusterState = IngestService.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState, Collections.emptyMap()); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); assertThat(ingestService.getPipeline("_id"), notNullValue()); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 58d0fb77bf95d..f6569fdacdca0 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -813,8 +813,7 @@ public List getRestHandlers(Settings settings, RestController restC public Map getProcessors(Processor.Parameters parameters) { Map processors = new HashMap<>(); processors.put(SetSecurityUserProcessor.TYPE, new SetSecurityUserProcessor.Factory(parameters.threadContext)); - processors.put(HashProcessor.TYPE, new HashProcessor.Factory(parameters.env.settings(), - parameters.ingestService.getClusterService())); + processors.put(HashProcessor.TYPE, new HashProcessor.Factory(parameters)); return processors; } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java index 153dc7539cba8..73a7d3f6a77d3 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java @@ -6,17 +6,17 @@ package org.elasticsearch.xpack.security.ingest; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.Version; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.settings.ConsistentSettingsService; import org.elasticsearch.common.settings.SecureSetting; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.ConfigurableProcessor; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; @@ -28,18 +28,18 @@ import javax.crypto.spec.SecretKeySpec; import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.security.spec.InvalidKeySpecException; import java.util.Arrays; import java.util.Base64; -import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import java.util.stream.Collectors; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; @@ -47,7 +47,7 @@ /** * A processor that hashes the contents of a field or fields using various hashing algorithms */ -public final class HashProcessor extends AbstractProcessor implements Consumer { +public final class HashProcessor extends AbstractProcessor implements ConfigurableProcessor { public static final String TYPE = "hash"; public static final Setting.AffixSetting HMAC_KEY_SETTING = SecureSetting .affixKeySetting(SecurityField.setting("ingest." + TYPE) + ".", "key", @@ -59,10 +59,12 @@ public final class HashProcessor extends AbstractProcessor implements Consumer fields, String targetField, byte[] salt, Method method, @Nullable Mac mac, - boolean ignoreMissing) { + boolean ignoreMissing, String keySettingName, String persistedSecretKeyHash, boolean consistentHashKey) { super(tag); this.fields = fields; this.targetField = targetField; @@ -70,6 +72,9 @@ public final class HashProcessor extends AbstractProcessor implements Consumer getFields() { @@ -105,7 +110,7 @@ public IngestDocument execute(IngestDocument document) { } return document; } else { - throw new IllegalArgumentException("inconsistent hash key"); + throw new IllegalStateException("inconsistent hash key"); } } @@ -115,19 +120,19 @@ public String getType() { } @Override - public void accept(ClusterState clusterState) { - // check hash keys for consistency and unset consistentHashes flag if inconsistent + public Map getMetadata() { + return Collections.singletonMap(keySettingName, persistedSecretKeyHash); } public static final class Factory implements Processor.Factory { private final Settings settings; - private final ClusterService clusterService; private final Map secureKeys; + private final ClusterService clusterService; - public Factory(Settings settings, ClusterService clusterService) { - this.settings = settings; - this.clusterService = clusterService; + public Factory(Parameters parameters) { + this.settings = parameters.env.settings(); + this.clusterService = parameters.ingestService.getClusterService(); this.secureKeys = new HashMap<>(); HMAC_KEY_SETTING.getAllConcreteSettings(settings).forEach(k -> { secureKeys.put(k.getKey(), k.get(settings)); @@ -149,6 +154,12 @@ private static Mac createMac(Method method, SecureString password, byte[] salt, @Override public HashProcessor create(Map registry, String processorTag, Map config) { + return create(registry, processorTag, config, null); + } + + @Override + public HashProcessor create(Map registry, String processorTag, Map config, + Map metadata) { boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); List fields = ConfigurationUtils.readList(TYPE, processorTag, config, "fields"); if (fields.isEmpty()) { @@ -165,19 +176,33 @@ public HashProcessor create(Map registry, String proc "key [" + keySettingName + "] must match [xpack.security.ingest.hash.*.key]. It is not set"); } - Collection> consistentSettings = HMAC_KEY_SETTING.getAllConcreteSettings(settings).collect(Collectors.toList()); - ConsistentSettingsService consistentSettingsService = new ConsistentSettingsService(settings, clusterService, consistentSettings); - if (consistentSettingsService.areAllConsistent() == false) { - throw ConfigurationUtils.newConfigurationException(TYPE, processorTag, "key_setting", "inconsistent hash key [" + keySettingName + "]"); - } - String saltString = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "salt"); byte[] salt = saltString.getBytes(StandardCharsets.UTF_8); String methodProperty = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "method", "SHA256"); Method method = Method.fromString(processorTag, "method", methodProperty); int iterations = ConfigurationUtils.readIntProperty(TYPE, processorTag, config, "iterations", 5); Mac mac = createMac(method, key, salt, iterations); - return new HashProcessor(processorTag, fields, targetField, salt, method, mac, ignoreMissing); + + // check that all nodes are V8+ + if (clusterService.state().nodes().getMinNodeVersion().compareTo(Version.V_8_0_0) < 0) { + throw new ElasticsearchException("hash processor requires minimum node version of " + Version.V_8_0_0); + } + + final boolean consistentKeys; + final String secretKeyHash; + try { + final MessageDigest md = MessageDigest.getInstance("SHA-256"); + secretKeyHash = Base64.getEncoder().encodeToString(md.digest(key.toString().getBytes(StandardCharsets.UTF_8))); + final String configuredSecretKeyHash = metadata != null && metadata.containsKey(keySettingName) + ? metadata.get(keySettingName) + : secretKeyHash; + consistentKeys = secretKeyHash.equals(configuredSecretKeyHash); + } catch (NoSuchAlgorithmException e) { + throw ConfigurationUtils.newConfigurationException(TYPE, processorTag, "key_setting", e); + } + + return new HashProcessor(processorTag, fields, targetField, salt, method, mac, ignoreMissing, keySettingName, secretKeyHash, + consistentKeys); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorFactoryTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorFactoryTests.java index e9dda488e7216..cf2241a8c5131 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorFactoryTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorFactoryTests.java @@ -8,9 +8,12 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.test.ESTestCase; import java.nio.charset.StandardCharsets; +import java.nio.file.Path; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -23,7 +26,7 @@ public void testProcessor() { MockSecureSettings mockSecureSettings = new MockSecureSettings(); mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); - HashProcessor.Factory factory = new HashProcessor.Factory(settings); + HashProcessor.Factory factory = new HashProcessor.Factory(packageSettings(settings)); Map config = new HashMap<>(); config.put("fields", Collections.singletonList("_field")); config.put("target_field", "_target"); @@ -42,7 +45,7 @@ public void testProcessorNoFields() { MockSecureSettings mockSecureSettings = new MockSecureSettings(); mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); - HashProcessor.Factory factory = new HashProcessor.Factory(settings); + HashProcessor.Factory factory = new HashProcessor.Factory(packageSettings(settings)); Map config = new HashMap<>(); config.put("target_field", "_target"); config.put("salt", "_salt"); @@ -57,7 +60,7 @@ public void testProcessorNoTargetField() { MockSecureSettings mockSecureSettings = new MockSecureSettings(); mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); - HashProcessor.Factory factory = new HashProcessor.Factory(settings); + HashProcessor.Factory factory = new HashProcessor.Factory(packageSettings(settings)); Map config = new HashMap<>(); config.put("fields", Collections.singletonList("_field")); config.put("salt", "_salt"); @@ -72,7 +75,7 @@ public void testProcessorFieldsIsEmpty() { MockSecureSettings mockSecureSettings = new MockSecureSettings(); mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); - HashProcessor.Factory factory = new HashProcessor.Factory(settings); + HashProcessor.Factory factory = new HashProcessor.Factory(packageSettings(settings)); Map config = new HashMap<>(); config.put("fields", Collections.singletonList(randomBoolean() ? "" : null)); config.put("salt", "_salt"); @@ -88,7 +91,7 @@ public void testProcessorMissingSalt() { MockSecureSettings mockSecureSettings = new MockSecureSettings(); mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); - HashProcessor.Factory factory = new HashProcessor.Factory(settings); + HashProcessor.Factory factory = new HashProcessor.Factory(packageSettings(settings)); Map config = new HashMap<>(); config.put("fields", Collections.singletonList("_field")); config.put("target_field", "_target"); @@ -102,7 +105,7 @@ public void testProcessorInvalidMethod() { MockSecureSettings mockSecureSettings = new MockSecureSettings(); mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); - HashProcessor.Factory factory = new HashProcessor.Factory(settings); + HashProcessor.Factory factory = new HashProcessor.Factory(packageSettings(settings)); Map config = new HashMap<>(); config.put("fields", Collections.singletonList("_field")); config.put("salt", "_salt"); @@ -117,7 +120,7 @@ public void testProcessorInvalidMethod() { public void testProcessorInvalidOrMissingKeySetting() { Settings settings = Settings.builder().setSecureSettings(new MockSecureSettings()).build(); - HashProcessor.Factory factory = new HashProcessor.Factory(settings); + HashProcessor.Factory factory = new HashProcessor.Factory(packageSettings(settings)); Map config = new HashMap<>(); config.put("fields", Collections.singletonList("_field")); config.put("salt", "_salt"); @@ -133,4 +136,8 @@ public void testProcessorInvalidOrMissingKeySetting() { () -> factory.create(null, "_tag", config)); assertThat(ex.getMessage(), equalTo("[key_setting] required property is missing")); } + + private static Processor.Parameters packageSettings(Settings settings) { + return new Processor.Parameters(new Environment(settings, Path.of("dummy")), null, null, null, null, null, null, null, null); + } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorTests.java index b3890600592f5..3647487dd9a65 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorTests.java @@ -33,7 +33,7 @@ public void testIgnoreMissing() throws Exception { Map fields = new HashMap<>(); fields.put("one", "foo"); HashProcessor processor = new HashProcessor("_tag", Arrays.asList("one", "two"), - "target", "_salt".getBytes(StandardCharsets.UTF_8), Method.SHA1, mac, true); + "target", "_salt".getBytes(StandardCharsets.UTF_8), Method.SHA1, mac, true, "", "", true); IngestDocument ingestDocument = new IngestDocument(fields, new HashMap<>()); processor.execute(ingestDocument); Map target = ingestDocument.getFieldValue("target", Map.class); @@ -41,7 +41,7 @@ public void testIgnoreMissing() throws Exception { assertNotNull(target.get("one")); HashProcessor failProcessor = new HashProcessor("_tag", Arrays.asList("one", "two"), - "target", "_salt".getBytes(StandardCharsets.UTF_8), Method.SHA1, mac, false); + "target", "_salt".getBytes(StandardCharsets.UTF_8), Method.SHA1, mac, false, "", "", true); IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> failProcessor.execute(ingestDocument)); assertThat(exception.getMessage(), equalTo("field [two] not present as part of path [two]")); } @@ -56,7 +56,7 @@ public void testStaticKeyAndSalt() throws Exception { Map fields = new HashMap<>(); fields.put("field", "0123456789"); HashProcessor processor = new HashProcessor("_tag", Collections.singletonList("field"), - "target", salt, Method.SHA1, mac, false); + "target", salt, Method.SHA1, mac, false, "", "", true); IngestDocument ingestDocument = new IngestDocument(fields, new HashMap<>()); processor.execute(ingestDocument); assertThat(ingestDocument.getFieldValue("target", String.class), equalTo("X3NhbHQMW0oHJGEEE9obGcGv5tGd7HFyDw==")); @@ -73,7 +73,7 @@ public void testProcessorSingleField() throws Exception { Method method = randomFrom(Method.values()); Mac mac = createMac(method); byte[] salt = randomByteArrayOfLength(5); - HashProcessor processor = new HashProcessor("_tag", fields, targetField, salt, method, mac, false); + HashProcessor processor = new HashProcessor("_tag", fields, targetField, salt, method, mac, false, "", "", true); IngestDocument ingestDocument = new IngestDocument(docFields, new HashMap<>()); processor.execute(ingestDocument); @@ -101,7 +101,7 @@ public void testProcessorMultipleFields() throws Exception { Method method = randomFrom(Method.values()); Mac mac = createMac(method); byte[] salt = randomByteArrayOfLength(5); - HashProcessor processor = new HashProcessor("_tag", fields, targetField, salt, method, mac, false); + HashProcessor processor = new HashProcessor("_tag", fields, targetField, salt, method, mac, false, "", "", true); IngestDocument ingestDocument = new IngestDocument(docFields, new HashMap<>()); processor.execute(ingestDocument); From b663e21b69778ce35ddcef202e0dbd220b5c1e8c Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Wed, 12 Feb 2020 13:47:40 -0600 Subject: [PATCH 5/8] ForEach processor should pass pipeline metadata to child processors --- .../elasticsearch/ingest/common/ForEachProcessor.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 86968e81077cf..391e15d6118fd 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -135,7 +135,7 @@ public static final class Factory implements Processor.Factory { @Override public ForEachProcessor create(Map factories, String tag, - Map config) throws Exception { + Map config, Map pipelineMetadata) throws Exception { String field = readStringProperty(TYPE, tag, config, "field"); boolean ignoreMissing = readBooleanProperty(TYPE, tag, config, "ignore_missing", false); Map> processorConfig = readMap(TYPE, tag, config, "processor"); @@ -145,8 +145,14 @@ public ForEachProcessor create(Map factories, String } Map.Entry> entry = entries.iterator().next(); Processor processor = - ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue(), Collections.emptyMap()); + ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue(), pipelineMetadata); return new ForEachProcessor(tag, field, processor, ignoreMissing); } + + @Override + public ForEachProcessor create(Map factories, String tag, + Map config) throws Exception { + return create(factories, tag, config, Collections.emptyMap()); + } } } From aec3db133b5375aeb45684aee3559d8c75a5ff96 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Wed, 12 Feb 2020 14:09:30 -0600 Subject: [PATCH 6/8] Unit test to verify that ForEach processor passes pipeline metadata to child processors --- .../common/ForEachProcessorFactoryTests.java | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java index bc522caf9b919..39f4dd8e66ad7 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java @@ -20,6 +20,9 @@ package org.elasticsearch.ingest.common; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.ConfigurableProcessor; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.script.ScriptService; @@ -32,6 +35,7 @@ import java.util.function.Consumer; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; public class ForEachProcessorFactoryTests extends ESTestCase { @@ -55,6 +59,24 @@ public void testCreate() throws Exception { assertFalse(forEachProcessor.isIgnoreMissing()); } + public void testCreatePassesPipelineMetadataToChildProcessors() throws Exception { + Map registry = new HashMap<>(); + registry.put("_name", new TestConfigurableProcessor.Factory()); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); + + Map config = new HashMap<>(); + config.put("field", "_field"); + config.put("processor", Collections.singletonMap("_name", Collections.emptyMap())); + Map pipelineMetadata = new HashMap<>(); + pipelineMetadata.put("foo", "bar"); + ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config, pipelineMetadata); + assertThat(forEachProcessor, Matchers.notNullValue()); + assertThat(forEachProcessor.getField(), equalTo("_field")); + assertThat(forEachProcessor.getInnerProcessor(), Matchers.instanceOf(ConfigurableProcessor.class)); + assertThat(((ConfigurableProcessor) forEachProcessor.getInnerProcessor()).getMetadata(), equalTo(pipelineMetadata)); + assertFalse(forEachProcessor.isIgnoreMissing()); + } + public void testSetIgnoreMissing() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); @@ -118,4 +140,50 @@ public void testCreateWithMissingProcessor() { assertThat(exception.getMessage(), equalTo("[processor] required property is missing")); } + private static class TestConfigurableProcessor extends AbstractProcessor implements ConfigurableProcessor { + + private final Map metadata; + + TestConfigurableProcessor(String tag, Map metadata) { + super(tag); + this.metadata = Collections.unmodifiableMap(metadata); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + return ingestDocument; + } + + @Override + public String getType() { + return null; + } + + @Override + public String getTag() { + return null; + } + + @Override + public Map getMetadata() { + return metadata; + } + + public static final class Factory implements Processor.Factory { + + @Override + public Processor create(Map processorFactories, String tag, Map config) + throws Exception { + return create(processorFactories, tag, config, Collections.emptyMap()); + } + + @Override + public Processor create(Map processorFactories, String tag, Map config, + Map metadata) throws Exception { + return new TestConfigurableProcessor(tag, metadata); + } + } + + } + } From d5f0fe2acddaabe05e87ce144ee5936075e4ece1 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Wed, 12 Feb 2020 17:08:23 -0600 Subject: [PATCH 7/8] isolate pipeline metadata per processor --- .../ingest/common/ForEachProcessor.java | 2 +- .../common/ForEachProcessorFactoryTests.java | 19 +++++-- .../ingest/ConfigurableProcessor.java | 15 ++++++ .../ingest/ConfigurationUtils.java | 6 +-- .../elasticsearch/ingest/IngestService.java | 24 ++++++--- .../org/elasticsearch/ingest/Pipeline.java | 2 +- .../ingest/PipelineConfiguration.java | 18 +++---- .../org/elasticsearch/ingest/Processor.java | 2 +- .../xpack/security/ingest/HashProcessor.java | 53 ++++++++++++------- 9 files changed, 96 insertions(+), 45 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 391e15d6118fd..219316ba3739d 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -135,7 +135,7 @@ public static final class Factory implements Processor.Factory { @Override public ForEachProcessor create(Map factories, String tag, - Map config, Map pipelineMetadata) throws Exception { + Map config, Map pipelineMetadata) throws Exception { String field = readStringProperty(TYPE, tag, config, "field"); boolean ignoreMissing = readBooleanProperty(TYPE, tag, config, "ignore_missing", false); Map> processorConfig = readMap(TYPE, tag, config, "processor"); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java index 39f4dd8e66ad7..074c003553393 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java @@ -35,7 +35,6 @@ import java.util.function.Consumer; import static org.hamcrest.Matchers.equalTo; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; public class ForEachProcessorFactoryTests extends ESTestCase { @@ -67,8 +66,10 @@ public void testCreatePassesPipelineMetadataToChildProcessors() throws Exception Map config = new HashMap<>(); config.put("field", "_field"); config.put("processor", Collections.singletonMap("_name", Collections.emptyMap())); - Map pipelineMetadata = new HashMap<>(); - pipelineMetadata.put("foo", "bar"); + Map processorMetadata = new HashMap<>(); + processorMetadata.put("foo", "bar"); + Map pipelineMetadata = new HashMap<>(); + pipelineMetadata.put(TestConfigurableProcessor.IDENTIFIER, processorMetadata); ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config, pipelineMetadata); assertThat(forEachProcessor, Matchers.notNullValue()); assertThat(forEachProcessor.getField(), equalTo("_field")); @@ -142,6 +143,8 @@ public void testCreateWithMissingProcessor() { private static class TestConfigurableProcessor extends AbstractProcessor implements ConfigurableProcessor { + static final String IDENTIFIER = "testConfigurableProcessor"; + private final Map metadata; TestConfigurableProcessor(String tag, Map metadata) { @@ -164,6 +167,11 @@ public String getTag() { return null; } + @Override + public String getIdentifier() { + return IDENTIFIER; + } + @Override public Map getMetadata() { return metadata; @@ -177,10 +185,11 @@ public Processor create(Map processorFactories, Strin return create(processorFactories, tag, config, Collections.emptyMap()); } + @SuppressWarnings("unchecked") @Override public Processor create(Map processorFactories, String tag, Map config, - Map metadata) throws Exception { - return new TestConfigurableProcessor(tag, metadata); + Map metadata) throws Exception { + return new TestConfigurableProcessor(tag, (Map) metadata.get(IDENTIFIER)); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/ConfigurableProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConfigurableProcessor.java index b74d6c0677f66..e20e002caad9d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConfigurableProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConfigurableProcessor.java @@ -21,6 +21,21 @@ import java.util.Map; +/** + * Ingest processors may implement this interface to supply environment-specific metadata at pipeline creation time. This + * metadata will be persisted in the pipeline's definition in cluster state and supplied to the processor's factory any + * time the processor is created on ingest nodes in the cluster. + */ public interface ConfigurableProcessor { + + /** + * Processors must supply an identifier that uniquely identifies themselves based on their configuration. + * @return Unique identifier + */ + String getIdentifier(); + + /** + * @return Metadata to be persisted in pipeline definition for this processor. + */ Map getMetadata(); } diff --git a/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java b/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java index d77129dd38391..fdf95d067c64a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java @@ -327,7 +327,7 @@ public static ElasticsearchException newConfigurationException(String processorT public static List readProcessorConfigs(List> processorConfigs, ScriptService scriptService, Map processorFactories, - Map pipelineMetadata) throws Exception { + Map pipelineMetadata) throws Exception { Exception exception = null; List processors = new ArrayList<>(); if (processorConfigs != null) { @@ -395,7 +395,7 @@ private static void addMetadataToException(ElasticsearchException exception, Str @SuppressWarnings("unchecked") public static Processor readProcessor(Map processorFactories, ScriptService scriptService, - String type, Object config, Map pipelineMetadata) throws Exception { + String type, Object config, Map pipelineMetadata) throws Exception { if (config instanceof Map) { return readProcessor(processorFactories, scriptService, type, (Map) config, pipelineMetadata); } else if (config instanceof String && "script".equals(type)) { @@ -412,7 +412,7 @@ public static Processor readProcessor(Map processorFa ScriptService scriptService, String type, Map config, - Map pipelineMetadata) throws Exception { + Map pipelineMetadata) throws Exception { String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); Script conditionalScript = extractConditional(config); Processor.Factory factory = processorFactories.get(type); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index d39aae03177bb..9cf23b4e1e6ed 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -222,7 +222,7 @@ static List innerGetPipelines(IngestMetadata ingestMetada public void putPipeline(Map ingestInfos, PutPipelineRequest request, ActionListener listener) throws Exception { // validates the pipeline and processor configuration before submitting a cluster update task: - final Map pipelineMetadata = validatePipeline(ingestInfos, request); + final Map pipelineMetadata = validatePipeline(ingestInfos, request); clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(), new AckedClusterStateUpdateTask(request, listener) { @@ -293,7 +293,7 @@ private static List> getProcessorMetrics(Compound return processorMetrics; } - static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState, Map pipelineMetadata) { + static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState, Map pipelineMetadata) { IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE); Map pipelines; if (currentIngestMetadata != null) { @@ -311,20 +311,22 @@ static ClusterState innerPut(PutPipelineRequest request, ClusterState currentSta return newState.build(); } - Map validatePipeline(Map ingestInfos, PutPipelineRequest request) throws Exception { + Map validatePipeline(Map ingestInfos, PutPipelineRequest request) + throws Exception { if (ingestInfos.isEmpty()) { throw new IllegalStateException("Ingest info is empty"); } - Map pipelineMetadata = new HashMap<>(); + Map pipelineMetadata = new HashMap<>(); Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService); List exceptions = new ArrayList<>(); for (Processor processor : pipeline.flattenAllProcessors()) { if (processor instanceof ConfigurableProcessor) { - final Map processorMetaData = ((ConfigurableProcessor) processor).getMetadata(); + final ConfigurableProcessor cp = (ConfigurableProcessor) processor; + final Map processorMetaData = cp.getMetadata(); if (processorMetaData.isEmpty() == false) { - pipelineMetadata.putAll(processorMetaData); + getOrCreateMetadataNode(pipelineMetadata, cp.getIdentifier()).putAll(processorMetaData); } } for (Map.Entry entry : ingestInfos.entrySet()) { @@ -341,6 +343,16 @@ Map validatePipeline(Map ingestInfos, return pipelineMetadata; } + @SuppressWarnings("unchecked") + private static Map getOrCreateMetadataNode(Map metadata, String identifier) { + if (metadata.containsKey(identifier) == false) { + Map node = new HashMap<>(); + metadata.put(identifier, node); + } + return (Map) metadata.get(identifier); + } + + public void executeBulkRequest(int numberOfActionRequests, Iterable> actionRequests, BiConsumer onFailure, diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index ac29fcbe8177c..00a1b07c60759 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -77,7 +77,7 @@ public static Pipeline create(String id, Map config, Map config, Map processorFactories, - ScriptService scriptService, Map pipelineMetadata) throws Exception { + ScriptService scriptService, Map pipelineMetadata) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); List> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY); diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java index de168e27ef239..a974cd170b43b 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java @@ -52,7 +52,7 @@ public final class PipelineConfiguration extends AbstractDiffable { - final Map metadata = parser.mapStrings(); + final Map metadata = parser.map(); builder.setMetadata(metadata); }, new ParseField("metadata"), ObjectParser.ValueType.OBJECT); } @@ -64,7 +64,7 @@ private static class Builder { private String id; private BytesReference config; - private Map metadata; + private Map metadata; private XContentType xContentType; void setId(String id) { @@ -76,7 +76,7 @@ void setConfig(BytesReference config, XContentType xContentType) { this.xContentType = xContentType; } - void setMetadata(Map metadata) { + void setMetadata(Map metadata) { this.metadata = metadata; } @@ -91,13 +91,13 @@ PipelineConfiguration build() { // also the get pipeline api just directly returns this to the caller private final BytesReference config; private final XContentType xContentType; - private final Map metadata; + private final Map metadata; public PipelineConfiguration(String id, BytesReference config, XContentType xContentType) { this(id, config, Collections.emptyMap(), xContentType); } - public PipelineConfiguration(String id, BytesReference config, Map metadata, XContentType xContentType) { + public PipelineConfiguration(String id, BytesReference config, Map metadata, XContentType xContentType) { this.id = Objects.requireNonNull(id); this.config = Objects.requireNonNull(config); this.xContentType = Objects.requireNonNull(xContentType); @@ -112,7 +112,7 @@ public Map getConfigAsMap() { return XContentHelper.convertToMap(config, true, xContentType).v2(); } - public Map getMetadata() { + public Map getMetadata() { return Collections.unmodifiableMap(metadata); } @@ -137,8 +137,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } public static PipelineConfiguration readFrom(StreamInput in) throws IOException { - return new PipelineConfiguration(in.readString(), in.readBytesReference(), - in.readMap(StreamInput::readString, StreamInput::readString), in.readEnum(XContentType.class)); + return new PipelineConfiguration(in.readString(), in.readBytesReference(), in.readMap(), in.readEnum(XContentType.class)); } public static Diff readDiffFrom(StreamInput in) throws IOException { @@ -154,7 +153,7 @@ public String toString() { public void writeTo(StreamOutput out) throws IOException { out.writeString(id); out.writeBytesReference(config); - out.writeMap(metadata, StreamOutput::writeString, StreamOutput::writeString); + out.writeMap(metadata); out.writeEnum(xContentType); } @@ -167,7 +166,6 @@ public boolean equals(Object o) { if (!id.equals(that.id)) return false; return getConfigAsMap().equals(that.getConfigAsMap()); - } @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index e5d87fe2de284..e373bbf8662dd 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -92,7 +92,7 @@ Processor create(Map processorFactories, String tag, Map config) throws Exception; default Processor create(Map processorFactories, String tag, - Map config, Map metadata) throws Exception { + Map config, Map metadata) throws Exception { return create(processorFactories, tag, config); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java index 73a7d3f6a77d3..3f3e4c7df24f3 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/HashProcessor.java @@ -60,11 +60,11 @@ public final class HashProcessor extends AbstractProcessor implements Configurab private final byte[] salt; private final boolean ignoreMissing; private final AtomicBoolean consistentHashes = new AtomicBoolean(false); - private final String keySettingName; + private final String identifier; private final String persistedSecretKeyHash; HashProcessor(String tag, List fields, String targetField, byte[] salt, Method method, @Nullable Mac mac, - boolean ignoreMissing, String keySettingName, String persistedSecretKeyHash, boolean consistentHashKey) { + boolean ignoreMissing, String identifier, String persistedSecretKeyHash, boolean consistentHashKey) { super(tag); this.fields = fields; this.targetField = targetField; @@ -72,7 +72,7 @@ public final class HashProcessor extends AbstractProcessor implements Configurab this.mac = mac; this.salt = salt; this.ignoreMissing = ignoreMissing; - this.keySettingName = keySettingName; + this.identifier = identifier; this.persistedSecretKeyHash = persistedSecretKeyHash; this.consistentHashes.set(consistentHashKey); } @@ -121,7 +121,12 @@ public String getType() { @Override public Map getMetadata() { - return Collections.singletonMap(keySettingName, persistedSecretKeyHash); + return Collections.singletonMap("keyHash", persistedSecretKeyHash); + } + + @Override + public String getIdentifier() { + return identifier; } public static final class Factory implements Processor.Factory { @@ -154,12 +159,12 @@ private static Mac createMac(Method method, SecureString password, byte[] salt, @Override public HashProcessor create(Map registry, String processorTag, Map config) { - return create(registry, processorTag, config, null); + return create(registry, processorTag, config, Collections.emptyMap()); } @Override public HashProcessor create(Map registry, String processorTag, Map config, - Map metadata) { + Map metadata) { boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); List fields = ConfigurationUtils.readList(TYPE, processorTag, config, "fields"); if (fields.isEmpty()) { @@ -188,23 +193,35 @@ public HashProcessor create(Map registry, String proc throw new ElasticsearchException("hash processor requires minimum node version of " + Version.V_8_0_0); } - final boolean consistentKeys; - final String secretKeyHash; + final MessageDigest md; try { - final MessageDigest md = MessageDigest.getInstance("SHA-256"); - secretKeyHash = Base64.getEncoder().encodeToString(md.digest(key.toString().getBytes(StandardCharsets.UTF_8))); - final String configuredSecretKeyHash = metadata != null && metadata.containsKey(keySettingName) - ? metadata.get(keySettingName) - : secretKeyHash; - consistentKeys = secretKeyHash.equals(configuredSecretKeyHash); + md = MessageDigest.getInstance("SHA-256"); } catch (NoSuchAlgorithmException e) { - throw ConfigurationUtils.newConfigurationException(TYPE, processorTag, "key_setting", e); + throw new ElasticsearchException("Error creating hash", e); } - return new HashProcessor(processorTag, fields, targetField, salt, method, mac, ignoreMissing, keySettingName, secretKeyHash, - consistentKeys); - } + StringBuilder identifierInput = new StringBuilder(Boolean.toString(ignoreMissing)); + for (String field : fields) { + identifierInput.append(field); + } + identifierInput.append(targetField); + identifierInput.append(keySettingName); + identifierInput.append(saltString); + identifierInput.append(methodProperty); + identifierInput.append(iterations); + final String identifier = + TYPE + "_" + Base64.getEncoder().encodeToString(md.digest(identifierInput.toString().getBytes(StandardCharsets.UTF_8))); + + @SuppressWarnings("unchecked") + Map hashMetadata = metadata != null ? (Map) metadata.get(identifier) : null; + final String secretKeyHash = Base64.getEncoder().encodeToString(md.digest(key.toString().getBytes(StandardCharsets.UTF_8))); + final String configuredSecretKeyHash = hashMetadata != null && hashMetadata.containsKey("keyHash") + ? hashMetadata.get("keyHash") + : secretKeyHash; + return new HashProcessor(processorTag, fields, targetField, salt, method, mac, ignoreMissing, identifier, secretKeyHash, + secretKeyHash.equals(configuredSecretKeyHash)); + } } enum Method { From 4ca8f0a81cd33b4759c9ff12ebf943fe7e90af6b Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Thu, 13 Feb 2020 08:30:53 -0600 Subject: [PATCH 8/8] fix tests --- .../common/ForEachProcessorFactoryTests.java | 2 +- .../ingest/PipelineConfigurationTests.java | 2 +- .../ingest/HashProcessorFactoryTests.java | 54 +++++++++++++++---- 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java index 074c003553393..38960f2698c38 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java @@ -74,7 +74,7 @@ public void testCreatePassesPipelineMetadataToChildProcessors() throws Exception assertThat(forEachProcessor, Matchers.notNullValue()); assertThat(forEachProcessor.getField(), equalTo("_field")); assertThat(forEachProcessor.getInnerProcessor(), Matchers.instanceOf(ConfigurableProcessor.class)); - assertThat(((ConfigurableProcessor) forEachProcessor.getInnerProcessor()).getMetadata(), equalTo(pipelineMetadata)); + assertThat(((ConfigurableProcessor) forEachProcessor.getInnerProcessor()).getMetadata(), equalTo(processorMetadata)); assertFalse(forEachProcessor.isIgnoreMissing()); } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java index eb1171f66a597..abdbfa8425e6b 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java @@ -93,6 +93,6 @@ protected boolean supportsUnknownFields() { @Override protected Predicate getRandomFieldsExcludeFilter() { - return field -> field.equals("config"); + return field -> field.equals("config") || field.equals("metadata"); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorFactoryTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorFactoryTests.java index cf2241a8c5131..f0a5d54942fc6 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorFactoryTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/ingest/HashProcessorFactoryTests.java @@ -6,9 +6,14 @@ package org.elasticsearch.xpack.security.ingest; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Processor; import org.elasticsearch.test.ESTestCase; @@ -19,6 +24,9 @@ import java.util.Map; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.startsWith; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class HashProcessorFactoryTests extends ESTestCase { @@ -26,7 +34,7 @@ public void testProcessor() { MockSecureSettings mockSecureSettings = new MockSecureSettings(); mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); - HashProcessor.Factory factory = new HashProcessor.Factory(packageSettings(settings)); + HashProcessor.Factory factory = new HashProcessor.Factory(setupEnvironment(settings)); Map config = new HashMap<>(); config.put("fields", Collections.singletonList("_field")); config.put("target_field", "_target"); @@ -41,11 +49,26 @@ public void testProcessor() { } } + public void testProcessorIsNotCreatedInMixedCluster() { + MockSecureSettings mockSecureSettings = new MockSecureSettings(); + mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); + Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); + HashProcessor.Factory factory = new HashProcessor.Factory(setupEnvironment(settings, Version.V_7_6_0)); + Map config = new HashMap<>(); + config.put("fields", Collections.singletonList("_field")); + config.put("target_field", "_target"); + config.put("salt", "_salt"); + config.put("key_setting", "xpack.security.ingest.hash.processor.key"); + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> factory.create(null, "_tag", config)); + assertThat(e.getMessage(), startsWith("hash processor requires minimum node version")); + } + public void testProcessorNoFields() { MockSecureSettings mockSecureSettings = new MockSecureSettings(); mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); - HashProcessor.Factory factory = new HashProcessor.Factory(packageSettings(settings)); + HashProcessor.Factory factory = new HashProcessor.Factory(setupEnvironment(settings)); Map config = new HashMap<>(); config.put("target_field", "_target"); config.put("salt", "_salt"); @@ -60,7 +83,7 @@ public void testProcessorNoTargetField() { MockSecureSettings mockSecureSettings = new MockSecureSettings(); mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); - HashProcessor.Factory factory = new HashProcessor.Factory(packageSettings(settings)); + HashProcessor.Factory factory = new HashProcessor.Factory(setupEnvironment(settings)); Map config = new HashMap<>(); config.put("fields", Collections.singletonList("_field")); config.put("salt", "_salt"); @@ -75,7 +98,7 @@ public void testProcessorFieldsIsEmpty() { MockSecureSettings mockSecureSettings = new MockSecureSettings(); mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); - HashProcessor.Factory factory = new HashProcessor.Factory(packageSettings(settings)); + HashProcessor.Factory factory = new HashProcessor.Factory(setupEnvironment(settings)); Map config = new HashMap<>(); config.put("fields", Collections.singletonList(randomBoolean() ? "" : null)); config.put("salt", "_salt"); @@ -91,7 +114,7 @@ public void testProcessorMissingSalt() { MockSecureSettings mockSecureSettings = new MockSecureSettings(); mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); - HashProcessor.Factory factory = new HashProcessor.Factory(packageSettings(settings)); + HashProcessor.Factory factory = new HashProcessor.Factory(setupEnvironment(settings)); Map config = new HashMap<>(); config.put("fields", Collections.singletonList("_field")); config.put("target_field", "_target"); @@ -105,7 +128,7 @@ public void testProcessorInvalidMethod() { MockSecureSettings mockSecureSettings = new MockSecureSettings(); mockSecureSettings.setString("xpack.security.ingest.hash.processor.key", "my_key"); Settings settings = Settings.builder().setSecureSettings(mockSecureSettings).build(); - HashProcessor.Factory factory = new HashProcessor.Factory(packageSettings(settings)); + HashProcessor.Factory factory = new HashProcessor.Factory(setupEnvironment(settings)); Map config = new HashMap<>(); config.put("fields", Collections.singletonList("_field")); config.put("salt", "_salt"); @@ -120,7 +143,7 @@ public void testProcessorInvalidMethod() { public void testProcessorInvalidOrMissingKeySetting() { Settings settings = Settings.builder().setSecureSettings(new MockSecureSettings()).build(); - HashProcessor.Factory factory = new HashProcessor.Factory(packageSettings(settings)); + HashProcessor.Factory factory = new HashProcessor.Factory(setupEnvironment(settings)); Map config = new HashMap<>(); config.put("fields", Collections.singletonList("_field")); config.put("salt", "_salt"); @@ -137,7 +160,20 @@ public void testProcessorInvalidOrMissingKeySetting() { assertThat(ex.getMessage(), equalTo("[key_setting] required property is missing")); } - private static Processor.Parameters packageSettings(Settings settings) { - return new Processor.Parameters(new Environment(settings, Path.of("dummy")), null, null, null, null, null, null, null, null); + private static Processor.Parameters setupEnvironment(Settings settings) { + return setupEnvironment(settings, Version.V_8_0_0); + } + + private static Processor.Parameters setupEnvironment(Settings settings, Version minNodeVersion) { + Settings s = Settings.builder().put(settings).put("path.home", "dummy").build(); + final DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class); + final ClusterState clusterState = mock(ClusterState.class); + final ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(clusterState); + when(clusterState.nodes()).thenReturn(discoveryNodes); + when(discoveryNodes.getMinNodeVersion()).thenReturn(minNodeVersion); + final IngestService ingestService = mock(IngestService.class); + when(ingestService.getClusterService()).thenReturn(clusterService); + return new Processor.Parameters(new Environment(s, Path.of("dummy")), null, null, null, null, null, ingestService, null, null); } }