diff --git a/docs/changelog/76511.yaml b/docs/changelog/76511.yaml
new file mode 100644
index 0000000000000..ef98c99a03f95
--- /dev/null
+++ b/docs/changelog/76511.yaml
@@ -0,0 +1,5 @@
+pr: 76511
+summary: Add `reroute` processor
+area: Ingest Node
+type: enhancement
+issues: []
diff --git a/docs/reference/ingest/processors.asciidoc b/docs/reference/ingest/processors.asciidoc
index e97e2dfa0c1a4..4132773e3d427 100644
--- a/docs/reference/ingest/processors.asciidoc
+++ b/docs/reference/ingest/processors.asciidoc
@@ -64,6 +64,7 @@ include::processors/redact.asciidoc[]
include::processors/registered-domain.asciidoc[]
include::processors/remove.asciidoc[]
include::processors/rename.asciidoc[]
+include::processors/reroute.asciidoc[]
include::processors/script.asciidoc[]
include::processors/set.asciidoc[]
include::processors/set-security-user.asciidoc[]
diff --git a/docs/reference/ingest/processors/reroute.asciidoc b/docs/reference/ingest/processors/reroute.asciidoc
new file mode 100644
index 0000000000000..eb7eb211cd62f
--- /dev/null
+++ b/docs/reference/ingest/processors/reroute.asciidoc
@@ -0,0 +1,94 @@
+[[reroute-processor]]
+=== Reroute processor
+++++
+Reroute
+++++
+
+experimental::[]
+
+The `reroute` processor allows routing a document to another target index or data stream.
+It has two main modes:
+
+When setting the `destination` option, the target is explicitly specified and the `dataset` and `namespace` options can't be set.
+
+When the `destination` option is not set, this processor is in a data stream mode.
+Note that in this mode, the `reroute` processor can only be used on data streams that follow the {fleet-guide}/data-streams.html#data-streams-naming-scheme[data stream naming scheme].
+Trying to use this processor on a data stream with a non-compliant name will raise an exception.
+
+The name of a data stream consists of three parts: `<type>-<dataset>-<namespace>`.
+See the {fleet-guide}/data-streams.html#data-streams-naming-scheme[data stream naming scheme] documentation for more details.
+
+This processor can use either static values or field references from the document to determine the `dataset` and `namespace` components of the new target.
+See <<reroute-options>> for more details.
+
+NOTE: It's not possible to change the `type` of the data stream with the `reroute` processor.
+
+After a `reroute` processor has been executed, all the other processors of the current pipeline are skipped, including the final pipeline.
+If the current pipeline is executed in the context of a <<pipeline-processor,Pipeline processor>>, the calling pipeline will be skipped, too.
+This means that at most one `reroute` processor is ever executed within a pipeline,
+allowing to define mutually exclusive routing conditions,
+similar to an if, else-if, else-if, … condition.
+
+The reroute processor ensures that the `data_stream.` fields are set according to the new target.
+If the document contains a `event.dataset` value, it will be updated to reflect the same value as `data_stream.dataset`.
+
+Note that the client needs to have permissions to the final target.
+Otherwise, the document will be rejected with a security exception which looks like this:
+
+[source,js]
+--------------------------------------------------
+{"type":"security_exception","reason":"action [indices:admin/auto_create] is unauthorized for API key id [8-dt9H8BqGblnY2uSI--] of user [elastic/fleet-server] on indices [logs-foo-default], this action is granted by the index privileges [auto_configure,create_index,manage,all]"}
+--------------------------------------------------
+// NOTCONSOLE
+
+[[reroute-options]]
+.Reroute options
+[options="header"]
+|======
+| Name | Required | Default | Description
+| `destination` | no | - | A static value for the target. Can't be set when the `dataset` or `namespace` option is set.
+| `dataset` | no | `{{data_stream.dataset}}` a| Field references or a static value for the dataset part of the data stream name. In addition to the criteria for <<indices-create-api-path-params,index names>>, cannot contain `-` and must be no longer than 100 characters. Example values are `nginx.access` and `nginx.error`.
+
+Supports field references with a mustache-like syntax (denoted as `{{double}}` or `{{{triple}}}` curly braces). When resolving field references, the processor replaces invalid characters with `_`. Uses the `<dataset>` part of the index name as a fallback if all field references resolve to a `null`, missing, or non-string value.
+| `namespace` | no | `{{data_stream.namespace}}` a| Field references or a static value for the namespace part of the data stream name. See the criteria for <<indices-create-api-path-params,index names>> for allowed characters. Must be no longer than 100 characters.
+
+Supports field references with a mustache-like syntax (denoted as `{{double}}` or `{{{triple}}}` curly braces). When resolving field references, the processor replaces invalid characters with `_`. Uses the `<namespace>` part of the index name as a fallback if all field references resolve to a `null`, missing, or non-string value.
+include::common-options.asciidoc[]
+|======
+
+The `if` option can be used to define the condition in which the document should be rerouted to a new target.
+
+[source,js]
+--------------------------------------------------
+{
+ "reroute": {
+ "tag": "nginx",
+ "if" : "ctx?.log?.file?.path?.contains('nginx')",
+ "dataset": "nginx"
+ }
+}
+--------------------------------------------------
+// NOTCONSOLE
+
+The dataset and namespace options can contain either a single value or a list of values that are used as a fallback.
+If a field reference evaluates to `null` or is not present in the document, the next value or field reference is used.
+If a field reference evaluates to a non-`String` value, the processor fails.
+
+In the following example, the processor would first try to resolve the value for the `service.name` field to determine the value for `dataset`.
+If that field resolves to `null`, is missing, or is a non-string value, it would try the next element in the list.
+In this case, this is the static value `generic`.
+The `namespace` option is configured with just a single static value.
+
+[source,js]
+--------------------------------------------------
+{
+ "reroute": {
+ "dataset": [
+ "{{service.name}}",
+ "generic"
+ ],
+ "namespace": "default"
+ }
+}
+--------------------------------------------------
+// NOTCONSOLE
diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java
index 43cb72a2ac37a..ce20f184d1cd4 100644
--- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java
+++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java
@@ -79,6 +79,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
entry(RegisteredDomainProcessor.TYPE, new RegisteredDomainProcessor.Factory()),
entry(RemoveProcessor.TYPE, new RemoveProcessor.Factory(parameters.scriptService)),
entry(RenameProcessor.TYPE, new RenameProcessor.Factory(parameters.scriptService)),
+ entry(RerouteProcessor.TYPE, new RerouteProcessor.Factory()),
entry(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService)),
entry(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService)),
entry(SortProcessor.TYPE, new SortProcessor.Factory()),
diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java
new file mode 100644
index 0000000000000..6c2b321112821
--- /dev/null
+++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java
@@ -0,0 +1,262 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.ingest.common;
+
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.ingest.AbstractProcessor;
+import org.elasticsearch.ingest.ConfigurationUtils;
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import static org.elasticsearch.core.Strings.format;
+import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
+import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.DATASET_VALUE_SOURCE;
+import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.NAMESPACE_VALUE_SOURCE;
+
+public final class RerouteProcessor extends AbstractProcessor {
+
+ public static final String TYPE = "reroute";
+
+ private static final String NAMING_SCHEME_ERROR_MESSAGE =
+        "invalid data stream name: [%s]; must follow naming scheme <type>-<dataset>-<namespace>";
+
+ private static final String DATA_STREAM_PREFIX = "data_stream.";
+ private static final String DATA_STREAM_TYPE = DATA_STREAM_PREFIX + "type";
+ private static final String DATA_STREAM_DATASET = DATA_STREAM_PREFIX + "dataset";
+ private static final String DATA_STREAM_NAMESPACE = DATA_STREAM_PREFIX + "namespace";
+ private static final String EVENT_DATASET = "event.dataset";
+    private final List<DataStreamValueSource> dataset;
+    private final List<DataStreamValueSource> namespace;
+ private final String destination;
+
+ RerouteProcessor(
+ String tag,
+ String description,
+        List<DataStreamValueSource> dataset,
+        List<DataStreamValueSource> namespace,
+ String destination
+ ) {
+ super(tag, description);
+ if (dataset.isEmpty()) {
+ this.dataset = List.of(DATASET_VALUE_SOURCE);
+ } else {
+ this.dataset = dataset;
+ }
+ if (namespace.isEmpty()) {
+ this.namespace = List.of(NAMESPACE_VALUE_SOURCE);
+ } else {
+ this.namespace = namespace;
+ }
+ this.destination = destination;
+ }
+
+ @Override
+ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+ if (destination != null) {
+ ingestDocument.reroute(destination);
+ return ingestDocument;
+ }
+ final String indexName = ingestDocument.getFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), String.class);
+ final String type;
+ final String currentDataset;
+ final String currentNamespace;
+
+        // parse out the <type>-<dataset>-<namespace> components from _index
+ int indexOfFirstDash = indexName.indexOf('-');
+ if (indexOfFirstDash < 0) {
+ throw new IllegalArgumentException(format(NAMING_SCHEME_ERROR_MESSAGE, indexName));
+ }
+ int indexOfSecondDash = indexName.indexOf('-', indexOfFirstDash + 1);
+ if (indexOfSecondDash < 0) {
+ throw new IllegalArgumentException(format(NAMING_SCHEME_ERROR_MESSAGE, indexName));
+ }
+ type = parseDataStreamType(indexName, indexOfFirstDash);
+ currentDataset = parseDataStreamDataset(indexName, indexOfFirstDash, indexOfSecondDash);
+ currentNamespace = parseDataStreamNamespace(indexName, indexOfSecondDash);
+
+ String dataset = determineDataStreamField(ingestDocument, this.dataset, currentDataset);
+ String namespace = determineDataStreamField(ingestDocument, this.namespace, currentNamespace);
+ String newTarget = type + "-" + dataset + "-" + namespace;
+ ingestDocument.reroute(newTarget);
+ ingestDocument.setFieldValue(DATA_STREAM_TYPE, type);
+ ingestDocument.setFieldValue(DATA_STREAM_DATASET, dataset);
+ ingestDocument.setFieldValue(DATA_STREAM_NAMESPACE, namespace);
+ if (ingestDocument.hasField(EVENT_DATASET)) {
+ // ECS specifies that "event.dataset should have the same value as data_stream.dataset"
+ // not eagerly set event.dataset but only if the doc contains it already to ensure it's consistent with data_stream.dataset
+ ingestDocument.setFieldValue(EVENT_DATASET, dataset);
+ }
+ return ingestDocument;
+ }
+
+ private static String parseDataStreamType(String dataStreamName, int indexOfFirstDash) {
+ return dataStreamName.substring(0, indexOfFirstDash);
+ }
+
+ private static String parseDataStreamDataset(String dataStreamName, int indexOfFirstDash, int indexOfSecondDash) {
+ return dataStreamName.substring(indexOfFirstDash + 1, indexOfSecondDash);
+ }
+
+ private static String parseDataStreamNamespace(String dataStreamName, int indexOfSecondDash) {
+ return dataStreamName.substring(indexOfSecondDash + 1);
+ }
+
+ private String determineDataStreamField(
+ IngestDocument ingestDocument,
+        List<DataStreamValueSource> valueSources,
+ String fallbackFromCurrentTarget
+ ) {
+ // first try to get value from the configured dataset/namespace field references
+ // if this contains a static value rather than a field reference, this is guaranteed to return
+ for (DataStreamValueSource value : valueSources) {
+ String result = value.resolve(ingestDocument);
+ if (result != null) {
+ return result;
+ }
+ }
+ // use the dataset/namespace value we parsed out from the current target (_index) as a fallback
+ return fallbackFromCurrentTarget;
+ }
+
+ @Override
+ public String getType() {
+ return TYPE;
+ }
+
+    List<DataStreamValueSource> getDataStreamDataset() {
+ return dataset;
+ }
+
+    List<DataStreamValueSource> getDataStreamNamespace() {
+ return namespace;
+ }
+
+ String getDestination() {
+ return destination;
+ }
+
+ public static final class Factory implements Processor.Factory {
+
+ @Override
+ public RerouteProcessor create(
+            Map<String, Processor.Factory> processorFactories,
+ String tag,
+ String description,
+            Map<String, Object> config
+ ) throws Exception {
+            List<DataStreamValueSource> dataset;
+ try {
+ dataset = ConfigurationUtils.readOptionalListOrString(TYPE, tag, config, "dataset")
+ .stream()
+ .map(DataStreamValueSource::dataset)
+ .toList();
+ } catch (IllegalArgumentException e) {
+ throw newConfigurationException(TYPE, tag, "dataset", e.getMessage());
+ }
+            List<DataStreamValueSource> namespace;
+ try {
+ namespace = ConfigurationUtils.readOptionalListOrString(TYPE, tag, config, "namespace")
+ .stream()
+ .map(DataStreamValueSource::namespace)
+ .toList();
+ } catch (IllegalArgumentException e) {
+ throw newConfigurationException(TYPE, tag, "namespace", e.getMessage());
+ }
+
+ String destination = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "destination");
+ if (destination != null && (dataset.isEmpty() == false || namespace.isEmpty() == false)) {
+ throw newConfigurationException(TYPE, tag, "destination", "can only be set if dataset and namespace are not set");
+ }
+
+ return new RerouteProcessor(tag, description, dataset, namespace, destination);
+ }
+ }
+
+ /**
+ * Contains either a {{field reference}} or a static value for a dataset or a namespace field
+ */
+ static final class DataStreamValueSource {
+
+ private static final int MAX_LENGTH = 100;
+ private static final String REPLACEMENT = "_";
+ private static final Pattern DISALLOWED_IN_DATASET = Pattern.compile("[\\\\/*?\"<>| ,#:-]");
+ private static final Pattern DISALLOWED_IN_NAMESPACE = Pattern.compile("[\\\\/*?\"<>| ,#:]");
+ static final DataStreamValueSource DATASET_VALUE_SOURCE = dataset("{{" + DATA_STREAM_DATASET + "}}");
+ static final DataStreamValueSource NAMESPACE_VALUE_SOURCE = namespace("{{" + DATA_STREAM_NAMESPACE + "}}");
+
+ private final String value;
+ private final String fieldReference;
+        private final Function<String, String> sanitizer;
+
+ public static DataStreamValueSource dataset(String dataset) {
+ return new DataStreamValueSource(dataset, ds -> sanitizeDataStreamField(ds, DISALLOWED_IN_DATASET));
+ }
+
+ public static DataStreamValueSource namespace(String namespace) {
+ return new DataStreamValueSource(namespace, nsp -> sanitizeDataStreamField(nsp, DISALLOWED_IN_NAMESPACE));
+ }
+
+ private static String sanitizeDataStreamField(String s, Pattern disallowedInDataset) {
+ if (s == null) {
+ return null;
+ }
+ s = s.toLowerCase(Locale.ROOT);
+ s = s.substring(0, Math.min(s.length(), MAX_LENGTH));
+ return disallowedInDataset.matcher(s).replaceAll(REPLACEMENT);
+ }
+
+        private DataStreamValueSource(String value, Function<String, String> sanitizer) {
+ this.sanitizer = sanitizer;
+ this.value = value;
+ if (value.contains("{{") || value.contains("}}")) {
+ if (value.startsWith("{{") == false || value.endsWith("}}") == false) {
+ throw new IllegalArgumentException("'" + value + "' is not a valid field reference");
+ }
+ String fieldReference = value.substring(2, value.length() - 2);
+ // field references may have two or three curly braces
+ if (fieldReference.startsWith("{") && fieldReference.endsWith("}")) {
+ fieldReference = fieldReference.substring(1, fieldReference.length() - 1);
+ }
+ // only a single field reference is allowed
+ // so something like this is disallowed: {{foo}}-{{bar}}
+ if (fieldReference.contains("{") || fieldReference.contains("}")) {
+ throw new IllegalArgumentException("'" + value + "' is not a valid field reference");
+ }
+ this.fieldReference = fieldReference;
+ } else {
+ this.fieldReference = null;
+ if (Objects.equals(sanitizer.apply(value), value) == false) {
+ throw new IllegalArgumentException("'" + value + "' contains disallowed characters");
+ }
+ }
+ }
+
+ /**
+ * Resolves the field reference from the provided ingest document or returns the static value if this value source doesn't represent
+ * a field reference.
+ * @param ingestDocument
+ * @return the resolved field reference or static value
+ */
+ @Nullable
+ public String resolve(IngestDocument ingestDocument) {
+ if (fieldReference != null) {
+ return sanitizer.apply(ingestDocument.getFieldValue(fieldReference, String.class, true));
+ } else {
+ return value;
+ }
+ }
+ }
+}
diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java
new file mode 100644
index 0000000000000..580645a4ef46a
--- /dev/null
+++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.ingest.common;
+
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class RerouteProcessorFactoryTests extends ESTestCase {
+
+ public void testDefaults() throws Exception {
+ RerouteProcessor processor = create(null, null);
+ assertThat(processor.getDataStreamDataset(), equalTo(List.of(DataStreamValueSource.DATASET_VALUE_SOURCE)));
+ assertThat(processor.getDataStreamNamespace(), equalTo(List.of(DataStreamValueSource.NAMESPACE_VALUE_SOURCE)));
+ }
+
+ public void testInvalidDataset() throws Exception {
+ ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> create("my-service", null));
+ assertThat(e.getMessage(), equalTo("[dataset] 'my-service' contains disallowed characters"));
+ }
+
+ public void testInvalidNamespace() throws Exception {
+ ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> create("generic", "foo:bar"));
+ assertThat(e.getMessage(), equalTo("[namespace] 'foo:bar' contains disallowed characters"));
+ }
+
+ public void testDestinationSuccess() throws Exception {
+ RerouteProcessor processor = create(Map.of("destination", "foo"));
+ assertThat(processor.getDestination(), equalTo("foo"));
+ }
+
+ public void testDestinationAndDataset() {
+ ElasticsearchParseException e = expectThrows(
+ ElasticsearchParseException.class,
+ () -> create(Map.of("destination", "foo", "dataset", "bar"))
+ );
+ assertThat(e.getMessage(), equalTo("[destination] can only be set if dataset and namespace are not set"));
+ }
+
+ public void testFieldReference() throws Exception {
+ create("{{foo}}", "{{{bar}}}");
+ }
+
+ public void testInvalidFieldReference() throws Exception {
+ ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> create("{{foo}}-{{bar}}", "foo"));
+ assertThat(e.getMessage(), equalTo("[dataset] '{{foo}}-{{bar}}' is not a valid field reference"));
+
+ e = expectThrows(ElasticsearchParseException.class, () -> create("{{{{foo}}}}", "foo"));
+ assertThat(e.getMessage(), equalTo("[dataset] '{{{{foo}}}}' is not a valid field reference"));
+ }
+
+ private static RerouteProcessor create(String dataset, String namespace) throws Exception {
+        Map<String, Object> config = new HashMap<>();
+ if (dataset != null) {
+ config.put("dataset", dataset);
+ }
+ if (namespace != null) {
+ config.put("namespace", namespace);
+ }
+ return create(config);
+ }
+
+    private static RerouteProcessor create(Map<String, Object> config) throws Exception {
+ return new RerouteProcessor.Factory().create(null, null, null, new HashMap<>(config));
+ }
+}
diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java
new file mode 100644
index 0000000000000..3da394575d625
--- /dev/null
+++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java
@@ -0,0 +1,303 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.ingest.common;
+
+import org.elasticsearch.ingest.CompoundProcessor;
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
+import org.elasticsearch.ingest.RandomDocumentPicks;
+import org.elasticsearch.ingest.TestProcessor;
+import org.elasticsearch.ingest.WrappingProcessor;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class RerouteProcessorTests extends ESTestCase {
+
+ public void testDefaults() throws Exception {
+ IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+
+ RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
+ processor.execute(ingestDocument);
+ assertDataSetFields(ingestDocument, "logs", "generic", "default");
+ }
+
+ public void testEventDataset() throws Exception {
+ IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+ ingestDocument.setFieldValue("event.dataset", "foo");
+
+ RerouteProcessor processor = createRerouteProcessor(List.of("{{event.dataset}}"), List.of());
+ processor.execute(ingestDocument);
+ assertDataSetFields(ingestDocument, "logs", "foo", "default");
+ assertThat(ingestDocument.getFieldValue("event.dataset", String.class), equalTo("foo"));
+ }
+
+ public void testNoDataset() throws Exception {
+ IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+ ingestDocument.setFieldValue("ds", "foo");
+
+ RerouteProcessor processor = createRerouteProcessor(List.of("{{ds}}"), List.of());
+ processor.execute(ingestDocument);
+ assertDataSetFields(ingestDocument, "logs", "foo", "default");
+ assertFalse(ingestDocument.hasField("event.dataset"));
+ }
+
+ public void testSkipFirstProcessor() throws Exception {
+ IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+
+ RerouteProcessor skippedProcessor = createRerouteProcessor(List.of("skip"), List.of());
+ RerouteProcessor executedProcessor = createRerouteProcessor(List.of("executed"), List.of());
+ CompoundProcessor processor = new CompoundProcessor(new SkipProcessor(skippedProcessor), executedProcessor);
+ processor.execute(ingestDocument);
+ assertDataSetFields(ingestDocument, "logs", "executed", "default");
+ }
+
+ public void testSkipLastProcessor() throws Exception {
+ IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+
+ RerouteProcessor executedProcessor = createRerouteProcessor(List.of("executed"), List.of());
+ RerouteProcessor skippedProcessor = createRerouteProcessor(List.of("skip"), List.of());
+ CompoundProcessor processor = new CompoundProcessor(executedProcessor, skippedProcessor);
+ processor.execute(ingestDocument);
+ assertDataSetFields(ingestDocument, "logs", "executed", "default");
+ }
+
+ public void testDataStreamFieldsFromDocument() throws Exception {
+ IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+ ingestDocument.setFieldValue("data_stream.dataset", "foo");
+ ingestDocument.setFieldValue("data_stream.namespace", "bar");
+
+ RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
+ processor.execute(ingestDocument);
+ assertDataSetFields(ingestDocument, "logs", "foo", "bar");
+ }
+
+ public void testInvalidDataStreamFieldsFromDocument() throws Exception {
+ IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+ ingestDocument.setFieldValue("data_stream.dataset", "foo-bar");
+ ingestDocument.setFieldValue("data_stream.namespace", "baz#qux");
+
+ RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
+ processor.execute(ingestDocument);
+ assertDataSetFields(ingestDocument, "logs", "foo_bar", "baz_qux");
+ }
+
+ public void testDestination() throws Exception {
+ IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+
+ RerouteProcessor processor = createRerouteProcessor("foo");
+ processor.execute(ingestDocument);
+ assertFalse(ingestDocument.hasField("data_stream"));
+ assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo("foo"));
+ }
+
+ public void testFieldReference() throws Exception {
+ IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+ ingestDocument.setFieldValue("service.name", "opbeans-java");
+ ingestDocument.setFieldValue("service.environment", "dev");
+
+ RerouteProcessor processor = createRerouteProcessor(List.of("{{service.name}}"), List.of("{{service.environment}}"));
+ processor.execute(ingestDocument);
+ assertDataSetFields(ingestDocument, "logs", "opbeans_java", "dev");
+ }
+
+ public void testRerouteToCurrentTarget() throws Exception {
+ IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+
+ RerouteProcessor reroute = createRerouteProcessor(List.of("generic"), List.of("default"));
+ CompoundProcessor processor = new CompoundProcessor(
+ reroute,
+ new TestProcessor(doc -> doc.setFieldValue("pipeline_is_continued", true))
+ );
+ processor.execute(ingestDocument);
+ assertDataSetFields(ingestDocument, "logs", "generic", "default");
+ assertFalse(ingestDocument.hasField("pipeline_is_continued"));
+ }
+
+ public void testFieldReferenceWithMissingReroutesToCurrentTarget() throws Exception {
+ IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+
+ RerouteProcessor reroute = createRerouteProcessor(List.of("{{service.name}}"), List.of("{{service.environment}}"));
+ CompoundProcessor processor = new CompoundProcessor(
+ reroute,
+ new TestProcessor(doc -> doc.setFieldValue("pipeline_is_continued", true))
+ );
+ processor.execute(ingestDocument);
+ assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo("logs-generic-default"));
+ assertDataSetFields(ingestDocument, "logs", "generic", "default");
+ assertFalse(ingestDocument.hasField("pipeline_is_continued"));
+ }
+
+ public void testDataStreamFieldReference() throws Exception {
+ IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+ ingestDocument.setFieldValue("data_stream.dataset", "dataset_from_doc");
+ ingestDocument.setFieldValue("data_stream.namespace", "namespace_from_doc");
+
+ RerouteProcessor processor = createRerouteProcessor(
+ List.of("{{{data_stream.dataset}}}", "fallback"),
+ List.of("{{data_stream.namespace}}", "fallback")
+ );
+ processor.execute(ingestDocument);
+ assertDataSetFields(ingestDocument, "logs", "dataset_from_doc", "namespace_from_doc");
+ }
+
+ public void testDatasetFieldReferenceMissingValue() throws Exception {
+ IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+
+ RerouteProcessor processor = createRerouteProcessor(
+ List.of("{{data_stream.dataset}}", "fallback"),
+ List.of("{{data_stream.namespace}}", "fallback")
+ );
+ processor.execute(ingestDocument);
+ assertDataSetFields(ingestDocument, "logs", "fallback", "fallback");
+ }
+
+ public void testDatasetFieldReference() throws Exception {
+ IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+ ingestDocument.setFieldValue("data_stream.dataset", "generic");
+ ingestDocument.setFieldValue("data_stream.namespace", "default");
+
+ RerouteProcessor processor = createRerouteProcessor(
+ List.of("{{data_stream.dataset}}", "fallback"),
+ List.of("{{{data_stream.namespace}}}", "fallback")
+ );
+ processor.execute(ingestDocument);
+ assertDataSetFields(ingestDocument, "logs", "generic", "default");
+ }
+
+ public void testFallbackToValuesFrom_index() throws Exception {
+ IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+ ingestDocument.setFieldValue("data_stream.dataset", "foo");
+ ingestDocument.setFieldValue("data_stream.namespace", "bar");
+
+ RerouteProcessor processor = createRerouteProcessor(List.of("{{foo}}"), List.of("{{bar}}"));
+ processor.execute(ingestDocument);
+ assertDataSetFields(ingestDocument, "logs", "generic", "default");
+ }
+
+ public void testInvalidDataStreamName() throws Exception {
+ {
+ IngestDocument ingestDocument = createIngestDocument("foo");
+ RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
+ IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
+            assertThat(e.getMessage(), equalTo("invalid data stream name: [foo]; must follow naming scheme <type>-<dataset>-<namespace>"));
+ }
+
+ {
+ // naturally, though, a plain destination doesn't have to match the data stream naming convention
+ IngestDocument ingestDocument = createIngestDocument("foo");
+ RerouteProcessor processor = createRerouteProcessor("bar");
+ processor.execute(ingestDocument);
+ assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo("bar"));
+ }
+ }
+
+ public void testRouteOnNonStringFieldFails() {
+ IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+ ingestDocument.setFieldValue("numeric_field", 42);
+ RerouteProcessor processor = createRerouteProcessor(List.of("{{numeric_field}}"), List.of());
+ IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
+ assertThat(e.getMessage(), equalTo("field [numeric_field] of type [java.lang.Integer] cannot be cast to [java.lang.String]"));
+ }
+
+ public void testDatasetSanitization() {
+ assertDatasetSanitization("\\/*?\"<>| ,#:-", "_____________");
+ assertDatasetSanitization("foo*bar", "foo_bar");
+ }
+
+ public void testNamespaceSanitization() {
+ assertNamespaceSanitization("\\/*?\"<>| ,#:-", "____________-");
+ assertNamespaceSanitization("foo*bar", "foo_bar");
+ }
+
+ private static void assertDatasetSanitization(String dataset, String sanitizedDataset) {
+ assertThat(
+ RerouteProcessor.DataStreamValueSource.dataset("{{foo}}")
+ .resolve(RandomDocumentPicks.randomIngestDocument(random(), Map.of("foo", dataset))),
+ equalTo(sanitizedDataset)
+ );
+ }
+
+ private static void assertNamespaceSanitization(String namespace, String sanitizedNamespace) {
+ assertThat(
+ RerouteProcessor.DataStreamValueSource.namespace("{{foo}}")
+ .resolve(RandomDocumentPicks.randomIngestDocument(random(), Map.of("foo", namespace))),
+ equalTo(sanitizedNamespace)
+ );
+ }
+
+    private RerouteProcessor createRerouteProcessor(List<String> dataset, List<String> namespace) {
+ return new RerouteProcessor(
+ null,
+ null,
+ dataset.stream().map(RerouteProcessor.DataStreamValueSource::dataset).toList(),
+ namespace.stream().map(RerouteProcessor.DataStreamValueSource::namespace).toList(),
+ null
+ );
+ }
+
+ private RerouteProcessor createRerouteProcessor(String destination) {
+ return new RerouteProcessor(null, null, List.of(), List.of(), destination);
+ }
+
+ private void assertDataSetFields(IngestDocument ingestDocument, String type, String dataset, String namespace) {
+ assertThat(ingestDocument.getFieldValue("data_stream.type", String.class), equalTo(type));
+ assertThat(ingestDocument.getFieldValue("data_stream.dataset", String.class), equalTo(dataset));
+ assertThat(ingestDocument.getFieldValue("data_stream.namespace", String.class), equalTo(namespace));
+ assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo(type + "-" + dataset + "-" + namespace));
+ if (ingestDocument.hasField("event.dataset")) {
+ assertThat(
+ ingestDocument.getFieldValue("event.dataset", String.class),
+ equalTo(ingestDocument.getFieldValue("data_stream.dataset", String.class))
+ );
+ }
+ }
+
+ private static IngestDocument createIngestDocument(String dataStream) {
+ IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
+ ingestDocument.setFieldValue("_index", dataStream);
+ return ingestDocument;
+ }
+
+ private static class SkipProcessor implements WrappingProcessor {
+ private final Processor processor;
+
+ SkipProcessor(Processor processor) {
+ this.processor = processor;
+ }
+
+ @Override
+ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+ return ingestDocument;
+ }
+
+ @Override
+ public Processor getInnerProcessor() {
+ return processor;
+ }
+
+ @Override
+ public String getType() {
+ return "skip";
+ }
+
+ @Override
+ public String getTag() {
+ return null;
+ }
+
+ @Override
+ public String getDescription() {
+ return null;
+ }
+ }
+}
diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/310_reroute_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/310_reroute_processor.yml
new file mode 100644
index 0000000000000..dbdd9b9d7e519
--- /dev/null
+++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/310_reroute_processor.yml
@@ -0,0 +1,140 @@
+---
+teardown:
+  # Best-effort cleanup of everything the tests below create; "ignore: 404"
+  # makes each delete a no-op when the corresponding test did not run.
+  # Fixed: the pipeline ids created below are "logs-router" and "logs-nginx"
+  # (see the put_pipeline calls), not "logs-router-default"/"logs-nginx-default" —
+  # the previous ids never matched anything and leaked the pipelines.
+  - do:
+      ingest.delete_pipeline:
+        id: "pipeline-with-two-data-stream-processors"
+        ignore: 404
+  - do:
+      ingest.delete_pipeline:
+        id: "logs-router"
+        ignore: 404
+  - do:
+      ingest.delete_pipeline:
+        id: "logs-nginx"
+        ignore: 404
+  - do:
+      indices.delete_index_template:
+        name: logs-router
+        ignore: 404
+  - do:
+      indices.delete_index_template:
+        name: logs-nginx
+        ignore: 404
+---
+"Test first matching router terminates pipeline":
+ # A pipeline with two unconditional reroute processors. Reroute ends pipeline
+ # execution for the document it matches, so only the first ("dataset": "first")
+ # should take effect; "second" must never be applied.
+ - do:
+ ingest.put_pipeline:
+ id: "pipeline-with-two-data-stream-processors"
+ body: >
+ {
+ "processors": [
+ {
+ "reroute" : {
+ "dataset" : "first"
+ }
+ },
+ {
+ "reroute" : {
+ "dataset" : "second"
+ }
+ }
+ ]
+ }
+ - match: { acknowledged: true }
+
+ # Index into logs-generic-default; the pipeline reroutes the document by
+ # rewriting the dataset part of the data stream name.
+ - do:
+ index:
+ index: logs-generic-default
+ id: "1"
+ pipeline: "pipeline-with-two-data-stream-processors"
+ body: {
+ foo: "bar"
+ }
+
+ # The document must land in logs-first-default (not logs-second-default).
+ - do:
+ get:
+ index: logs-first-default
+ id: "1"
+ - match: { _source.foo: "bar" }
+---
+"Test two stage routing":
+ - skip:
+ features: allowed_warnings
+ # Stage 1: a router pipeline that sends any document whose log.file.path
+ # mentions "nginx" to the "nginx" dataset.
+ - do:
+ ingest.put_pipeline:
+ id: "logs-router"
+ body: >
+ {
+ "processors": [
+ {
+ "reroute" : {
+ "tag": "nginx",
+ "if" : "ctx?.log?.file?.path?.contains('nginx')",
+ "dataset": "nginx"
+ }
+ }
+ ]
+ }
+ - match: { acknowledged: true }
+ # Install the stage-1 pipeline as the default pipeline of logs-router-*.
+ - do:
+ allowed_warnings:
+ - "index template [logs-router] has index patterns [logs-router-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [logs-router] will take precedence during new index creation"
+ indices.put_index_template:
+ name: logs-router
+ body:
+ index_patterns: [ "logs-router-*" ]
+ template:
+ settings:
+ index.default_pipeline: "logs-router"
+ # Stage 2: a pipeline on the nginx data stream that fans documents out into
+ # nginx.access / nginx.error datasets based on the log file path.
+ - do:
+ ingest.put_pipeline:
+ id: "logs-nginx"
+ body: >
+ {
+ "processors": [
+ {
+ "reroute": {
+ "tag": "nginx.access",
+ "if": "ctx?.log?.file?.path?.contains('access')",
+ "dataset": "nginx.access"
+ }
+ },
+ {
+ "reroute": {
+ "tag": "nginx.error",
+ "if": "ctx?.log?.file?.path?.contains('error')",
+ "dataset": "nginx.error"
+ }
+ }
+ ]
+ }
+ - match: { acknowledged: true }
+ # Install the stage-2 pipeline as the default pipeline of logs-nginx-*.
+ - do:
+ allowed_warnings:
+ - "index template [logs-nginx] has index patterns [logs-nginx-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [logs-nginx] will take precedence during new index creation"
+ indices.put_index_template:
+ name: logs-nginx
+ body:
+ index_patterns: [ "logs-nginx-*" ]
+ template:
+ settings:
+ index.default_pipeline: "logs-nginx"
+
+ # Index into logs-nginx-default: the default pipeline (stage 2) should reroute
+ # the document to the nginx.error dataset because the path contains "error".
+ - do:
+ index:
+ index: logs-nginx-default
+ id: "example-log"
+ op_type: create
+ body:
+ "@timestamp": "2022-04-13"
+ message: "this is an error log"
+ log:
+ file:
+ path: "nginx-error.log"
+
+ # The document must be retrievable from the final rerouted data stream.
+ - do:
+ get:
+ index: logs-nginx.error-default
+ id: "example-log"
+ - match: { _source.message: "this is an error log" }
diff --git a/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java b/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java
index e8dce119d343b..df5104e9164ab 100644
--- a/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java
+++ b/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java
@@ -318,6 +318,27 @@ public static List readOptionalList(
return readList(processorType, processorTag, propertyName, value);
}
+    /**
+     * Returns and removes the specified property of type list or string from the specified configuration map.
+     *
+     * If the property is absent, an empty list is returned. If the value is a single string, it is wrapped
+     * in a singleton list. If the property value isn't of type list or string an
+     * {@link ElasticsearchParseException} is thrown.
+     */
+    public static List<String> readOptionalListOrString(
+        String processorType,
+        String processorTag,
+        Map<String, Object> configuration,
+        String propertyName
+    ) {
+        Object value = configuration.remove(propertyName);
+        if (value == null) {
+            // Absent is valid for an optional property: normalize to an immutable empty list.
+            return List.of();
+        }
+        if (value instanceof String) {
+            // A bare string is accepted as shorthand for a one-element list.
+            return List.of(readString(processorType, processorTag, propertyName, value));
+        }
+        return readList(processorType, processorTag, propertyName, value);
+    }
+
/**
* Returns and removes the specified property of type list from the specified configuration map.
*