Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add reroute processor #76511

Merged
merged 40 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
2d80949
Add data_stream_router processor
felixbarny Apr 15, 2022
1ede9bc
Don't override event.dataset
felixbarny Apr 27, 2022
eba46c6
Merge remote-tracking branch 'origin/main' into data_stream_router_pr…
felixbarny Jun 27, 2022
39b57ee
Update docs/changelog/76511.yaml
felixbarny Jun 27, 2022
6751fc2
Merge branch 'main' into data_stream_router_processor
felixbarny Feb 21, 2023
3e25d0e
Use IngestDocument#redirect
felixbarny Feb 22, 2023
c31b51f
Extract data stream parsing methods
felixbarny Feb 22, 2023
ac5f282
Add changelog
felixbarny Feb 22, 2023
d4bc1ad
Revert "Add changelog"
felixbarny Feb 22, 2023
afa1527
Merge remote-tracking branch 'origin/main' into data_stream_router_pr…
felixbarny Mar 1, 2023
c76d5da
Rename processor to reroute and add destination option
felixbarny Mar 1, 2023
b514a6c
Fix docs build
felixbarny Mar 1, 2023
6c44e35
Add support for field references and multiple fallbacks
felixbarny Mar 4, 2023
4888843
Add skip_if_target_unchanged option
felixbarny Mar 9, 2023
26b3c79
Use mustache scripts instead of custom field reference syntax
felixbarny Mar 19, 2023
88a7d2a
Merge remote-tracking branch 'origin/main' into data_stream_router_pr…
felixbarny Mar 22, 2023
2d02830
Revert "Use mustache scripts instead of custom field reference syntax"
felixbarny Mar 27, 2023
584f8c5
Set event.dataset to be consistent with data_stream.dataset
felixbarny Mar 27, 2023
60df140
Revert "Add skip_if_target_unchanged option"
felixbarny Mar 27, 2023
a312432
Field references to use same syntax as mustache templates
felixbarny Mar 28, 2023
7dbd47e
Add comments to RerouteProcessor
felixbarny Mar 30, 2023
c8cdaae
Use {{data_stream.dataset}} and {{data_sream.namespace}} as default v…
felixbarny Mar 30, 2023
24aaab9
Update docs
felixbarny Mar 30, 2023
19344bb
Merge remote-tracking branch 'origin/main' into data_stream_router_pr…
felixbarny Mar 30, 2023
1d454b6
Apply spotless suggestions
felixbarny Mar 30, 2023
5904db6
Merge branch 'main' into data_stream_router_processor
felixbarny Apr 11, 2023
d043ae3
Update docs/reference/ingest/processors/reroute.asciidoc
felixbarny Apr 11, 2023
b2b9e7a
Merge branch 'main' into data_stream_router_processor
joegallo Apr 12, 2023
d5491de
Tidy up imports
joegallo Apr 11, 2023
e3c3ad1
Prefer toList
joegallo Apr 11, 2023
7cc0a99
Add a comment
joegallo Apr 11, 2023
7647f9e
Drop a utility method, add a test
joegallo Apr 11, 2023
e43ac5f
Merge branch 'main' into data_stream_router_processor
elasticmachine Apr 12, 2023
0ef9503
Add sanitization tests
felixbarny Apr 12, 2023
7553638
Move sanitization constants
felixbarny Apr 13, 2023
c49e70b
Use regex for sanitization
felixbarny Apr 13, 2023
0c29f73
Drop toString
joegallo Apr 11, 2023
55177e1
Merge remote-tracking branch 'origin/main' into data_stream_router_pr…
felixbarny Apr 13, 2023
e13451e
Be more strict regarding non-string field references
felixbarny Apr 18, 2023
dbb7dd6
Merge remote-tracking branch 'origin/main' into data_stream_router_pr…
felixbarny Apr 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/76511.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 76511
summary: Add `reroute` processor
area: Ingest Node
type: enhancement
issues: []
1 change: 1 addition & 0 deletions docs/reference/ingest/processors.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ include::processors/redact.asciidoc[]
include::processors/registered-domain.asciidoc[]
include::processors/remove.asciidoc[]
include::processors/rename.asciidoc[]
include::processors/reroute.asciidoc[]
include::processors/script.asciidoc[]
include::processors/set.asciidoc[]
include::processors/set-security-user.asciidoc[]
Expand Down
93 changes: 93 additions & 0 deletions docs/reference/ingest/processors/reroute.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
[[reroute-processor]]
=== Reroute processor
++++
<titleabbrev>Reroute</titleabbrev>
++++

experimental::[]

The `reroute` processor allows to route a document to another target index or data stream.
It has two main modes:

When setting the `destination` option, the target is explicitly specified and the `dataset` and `namespace` options can't be set.

When the `destination` option is not set, this processor is in a data stream mode.
Note that in this mode, the `reroute` processor can only be used on data streams that follow the {fleet-guide}/data-streams.html#data-streams-naming-scheme[data stream naming scheme].
Trying to use this processor on a data stream with a non-compliant name will raise an exception.

The name of a data stream consists of three parts: `<type>-<dataset>-<namespace>`.
See the {fleet-guide}/data-streams.html#data-streams-naming-scheme[data stream naming scheme] documentation for more details.

This processor can use both static values or reference fields from the document to determine the `dataset` and `namespace` components of the new target.
See <<reroute-options>> for more details.

NOTE: It's not possible to change the `type` of the data stream with the `reroute` processor.

After a `reroute` processor has been executed, all the other processors of the current pipeline are skipped, including the final pipeline.
If the current pipeline is executed in the context of a <<pipeline-processor>>, the calling pipeline will be skipped, too.
This means that at most one `reroute` processor is ever executed within a pipeline,
allowing to define mutually exclusive routing conditions,
similar to a if, else-if, else-if, … condition.

The reroute processor ensures that the `data_stream.<type|dataset|namespace>` fields are set according to the new target.
If the document contains a `event.dataset` value, it will be updated to reflect the same value as `data_stream.dataset`.

Note that the client needs to have permissions to the final target.
Otherwise, the document will be rejected with a security exception which looks like this:

[source,js]
--------------------------------------------------
{"type":"security_exception","reason":"action [indices:admin/auto_create] is unauthorized for API key id [8-dt9H8BqGblnY2uSI--] of user [elastic/fleet-server] on indices [logs-foo-default], this action is granted by the index privileges [auto_configure,create_index,manage,all]"}
--------------------------------------------------
// NOTCONSOLE

[[reroute-options]]
.Reroute options
[options="header"]
|======
| Name | Required | Default | Description
| `destination` | no | - | A static value for the target. Can't be set when the `dataset` or `namespace` option is set.
| `dataset` | no | `{{data_stream.dataset}}` a| Field references or a static value for the dataset part of the data stream name. In addition to the criteria for <<indices-create-api-path-params, index names>>, cannot contain `-` and must be no longer than 100 characters. Example values are `nginx.access` and `nginx.error`.

Supports field references with a mustache-like syntax (denoted as `{{double}}` or `{{{triple}}}` curly braces). When resolving field references, the processor replaces invalid characters with `_`. Uses the `<dataset>` part of the index name as a fallback if all field references resolve to a `null`, missing, or non-string value.
| `namespace` | no | `{{data_stream.namespace}}` a| Field references or a static value for the namespace part of the data stream name. See the criteria for <<indices-create-api-path-params, index names>> for allowed characters. Must be no longer than 100 characters.

Supports field references with a mustache-like syntax (denoted as `{{double}}` or `{{{triple}}}` curly braces). When resolving field references, the processor replaces invalid characters with `_`. Uses the `<namespace>` part of the index name as a fallback if all field references resolve to a `null`, missing, or non-string value.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use a * and write it below just once? There you could also add the invalid chars list.

include::common-options.asciidoc[]
|======

The `if` option can be used to define the condition in which the document should be rerouted to a new target.

[source,js]
--------------------------------------------------
{
"reroute": {
"tag": "nginx",
"if" : "ctx?.log?.file?.path?.contains('nginx')",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to describe a little bit more around how this if condition work. Why ctx is there and all the ?. If you are not used to write ingest pipeline (conditions), this might not be obvious. I would do a very quick description here and then link to the ES docs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docs contain a description of the if option and a link to the corresponding docs. Isn't that enough?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what we do in most places of our docs and I really don't like it. The link is important but for a user to be successful, a user should not have to jump through 3 doc pages to get a single task completed. Instead I would rather repeat some of the docs that the basic use case is all covered in on flow.

"dataset": "nginx"
}
}
--------------------------------------------------
// NOTCONSOLE

The dataset and namespace options can contain either a single value or a list of values that are used as a fallback.
If a field reference evaluates to `null`, is not present in the document, or if the value is not a `String`, the next value or field reference is used.
felixbarny marked this conversation as resolved.
Show resolved Hide resolved

In the following example, the processor would first try to resolve the value for the `service.name` field to determine the value for `dataset`.
If that field resolves to `null`, is missing, or is a non-string value, it would try the next element in the list.
In this case, this is the static value `"generic`".
The `namespace` option is configured with just a single static value.

[source,js]
--------------------------------------------------
{
"reroute": {
"dataset": [
"{{service.name}}",
"generic"
],
"namespace": "default"
}
}
--------------------------------------------------
// NOTCONSOLE
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
entry(RegisteredDomainProcessor.TYPE, new RegisteredDomainProcessor.Factory()),
entry(RemoveProcessor.TYPE, new RemoveProcessor.Factory(parameters.scriptService)),
entry(RenameProcessor.TYPE, new RenameProcessor.Factory(parameters.scriptService)),
entry(RerouteProcessor.TYPE, new RerouteProcessor.Factory()),
entry(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService)),
entry(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService)),
entry(SortProcessor.TYPE, new SortProcessor.Factory()),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.core.Nullable;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.DATASET_VALUE_SOURCE;
import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.NAMESPACE_VALUE_SOURCE;

public final class RerouteProcessor extends AbstractProcessor {

public static final String TYPE = "reroute";

private static final String NAMING_SCHEME_ERROR_MESSAGE =
"invalid data stream name: [%s]; must follow naming scheme <type>-<dataset>-<namespace>";

private static final String DATA_STREAM_PREFIX = "data_stream.";
private static final String DATA_STREAM_TYPE = DATA_STREAM_PREFIX + "type";
private static final String DATA_STREAM_DATASET = DATA_STREAM_PREFIX + "dataset";
private static final String DATA_STREAM_NAMESPACE = DATA_STREAM_PREFIX + "namespace";
private static final String EVENT_DATASET = "event.dataset";
private static final int MAX_LENGTH = 100;
private static final char REPLACEMENT_CHAR = '_';
private final List<DataStreamValueSource> dataset;
private final List<DataStreamValueSource> namespace;
private final String destination;

RerouteProcessor(
String tag,
String description,
List<DataStreamValueSource> dataset,
List<DataStreamValueSource> namespace,
String destination
) {
super(tag, description);
if (dataset.isEmpty()) {
this.dataset = List.of(DATASET_VALUE_SOURCE);
} else {
this.dataset = dataset;
}
if (namespace.isEmpty()) {
this.namespace = List.of(NAMESPACE_VALUE_SOURCE);
} else {
this.namespace = namespace;
}
this.destination = destination;
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
if (destination != null) {
ingestDocument.reroute(destination);
return ingestDocument;
}
final String indexName = ingestDocument.getFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), String.class);
final String type;
final String currentDataset;
final String currentNamespace;

// parse out the <type>-<dataset>-<namespace> components from _index
int indexOfFirstDash = indexName.indexOf('-');
if (indexOfFirstDash < 0) {
throw new IllegalArgumentException(format(NAMING_SCHEME_ERROR_MESSAGE, indexName));
}
int indexOfSecondDash = indexName.indexOf('-', indexOfFirstDash + 1);
if (indexOfSecondDash < 0) {
throw new IllegalArgumentException(format(NAMING_SCHEME_ERROR_MESSAGE, indexName));
}
type = parseDataStreamType(indexName, indexOfFirstDash);
currentDataset = parseDataStreamDataset(indexName, indexOfFirstDash, indexOfSecondDash);
currentNamespace = parseDataStreamNamespace(indexName, indexOfSecondDash);

String dataset = determineDataStreamField(ingestDocument, this.dataset, currentDataset);
String namespace = determineDataStreamField(ingestDocument, this.namespace, currentNamespace);
String newTarget = type + "-" + dataset + "-" + namespace;
ingestDocument.reroute(newTarget);
ingestDocument.setFieldValue(DATA_STREAM_TYPE, type);
ingestDocument.setFieldValue(DATA_STREAM_DATASET, dataset);
ingestDocument.setFieldValue(DATA_STREAM_NAMESPACE, namespace);
if (ingestDocument.hasField(EVENT_DATASET)) {
// ECS specifies that "event.dataset should have the same value as data_stream.dataset"
// not eagerly set event.dataset but only if the doc contains it already to ensure it's consistent with data_stream.dataset
ingestDocument.setFieldValue(EVENT_DATASET, dataset);
}
return ingestDocument;
}

private static String parseDataStreamType(String dataStreamName, int indexOfFirstDash) {
return dataStreamName.substring(0, indexOfFirstDash);
}

private static String parseDataStreamDataset(String dataStreamName, int indexOfFirstDash, int indexOfSecondDash) {
return dataStreamName.substring(indexOfFirstDash + 1, indexOfSecondDash);
}

private static String parseDataStreamNamespace(String dataStreamName, int indexOfSecondDash) {
return dataStreamName.substring(indexOfSecondDash + 1);
}

private String determineDataStreamField(
IngestDocument ingestDocument,
List<DataStreamValueSource> valueSources,
String fallbackFromCurrentTarget
) {
// first try to get value from the configured dataset/namespace field references
// if this contains a static value rather than a field reference, this is guaranteed to return
for (DataStreamValueSource value : valueSources) {
String result = value.resolve(ingestDocument);
if (result != null) {
return result;
}
}
// use the dataset/namespace value we parsed out from the current target (_index) as a fallback
return fallbackFromCurrentTarget;
}

@Override
public String getType() {
return TYPE;
}

List<DataStreamValueSource> getDataStreamDataset() {
return dataset;
}

List<DataStreamValueSource> getDataStreamNamespace() {
return namespace;
}

String getDestination() {
return destination;
}

public static final class Factory implements Processor.Factory {

@Override
public RerouteProcessor create(
Map<String, Processor.Factory> processorFactories,
String tag,
String description,
Map<String, Object> config
) throws Exception {
List<DataStreamValueSource> dataset;
try {
dataset = ConfigurationUtils.readOptionalListOrString(TYPE, tag, config, "dataset")
.stream()
.map(DataStreamValueSource::dataset)
.toList();
} catch (IllegalArgumentException e) {
throw newConfigurationException(TYPE, tag, "dataset", e.getMessage());
}
List<DataStreamValueSource> namespace;
try {
namespace = ConfigurationUtils.readOptionalListOrString(TYPE, tag, config, "namespace")
.stream()
.map(DataStreamValueSource::namespace)
.toList();
} catch (IllegalArgumentException e) {
throw newConfigurationException(TYPE, tag, "namespace", e.getMessage());
}

String destination = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "destination");
if (destination != null && (dataset.isEmpty() == false || namespace.isEmpty() == false)) {
throw newConfigurationException(TYPE, tag, "destination", "can only be set if dataset and namespace are not set");
}

return new RerouteProcessor(tag, description, dataset, namespace, destination);
}
}

/**
* Contains either a {{field reference}} or a static value for a dataset or a namespace field
*/
static final class DataStreamValueSource {

private static final char[] DISALLOWED_IN_DATASET = new char[] { '\\', '/', '*', '?', '"', '<', '>', '|', ' ', ',', '#', ':', '-' };
private static final char[] DISALLOWED_IN_NAMESPACE = new char[] { '\\', '/', '*', '?', '"', '<', '>', '|', ' ', ',', '#', ':' };
static final DataStreamValueSource DATASET_VALUE_SOURCE = dataset("{{" + DATA_STREAM_DATASET + "}}");
static final DataStreamValueSource NAMESPACE_VALUE_SOURCE = namespace("{{" + DATA_STREAM_NAMESPACE + "}}");

private final String value;
private final String fieldReference;
private final Function<String, String> sanitizer;

public static DataStreamValueSource dataset(String dataset) {
return new DataStreamValueSource(dataset, ds -> sanitizeDataStreamField(ds, DISALLOWED_IN_DATASET));
}

public static DataStreamValueSource namespace(String namespace) {
return new DataStreamValueSource(namespace, nsp -> sanitizeDataStreamField(nsp, DISALLOWED_IN_NAMESPACE));
}

private static String sanitizeDataStreamField(String s, char[] disallowedInDataset) {
if (s == null) {
return null;
}
s = s.toLowerCase(Locale.ROOT);
s = s.substring(0, Math.min(s.length(), MAX_LENGTH));
for (char c : disallowedInDataset) {
s = s.replace(c, REPLACEMENT_CHAR);
}
return s;
}

private DataStreamValueSource(String value, Function<String, String> sanitizer) {
this.sanitizer = sanitizer;
this.value = value;
if (value.contains("{{") || value.contains("}}")) {
if (value.startsWith("{{") == false || value.endsWith("}}") == false) {
throw new IllegalArgumentException("'" + value + "' is not a valid field reference");
}
String fieldReference = value.substring(2, value.length() - 2);
// field references may have two or three curly braces
if (fieldReference.startsWith("{") && fieldReference.endsWith("}")) {
fieldReference = fieldReference.substring(1, fieldReference.length() - 1);
}
// only a single field reference is allowed
felixbarny marked this conversation as resolved.
Show resolved Hide resolved
// so something like this is disallowed: {{foo}}-{{bar}}
if (fieldReference.contains("{") || fieldReference.contains("}")) {
throw new IllegalArgumentException("'" + value + "' is not a valid field reference");
}
this.fieldReference = fieldReference;
} else {
this.fieldReference = null;
if (Objects.equals(sanitizer.apply(value), value) == false) {
throw new IllegalArgumentException("'" + value + "' contains disallowed characters");
}
}
}

/**
* Resolves the field reference from the provided ingest document or returns the static value if this value source doesn't represent
* a field reference.
* @param ingestDocument
* @return the resolved field reference or static value
*/
@Nullable
public String resolve(IngestDocument ingestDocument) {
if (fieldReference != null) {
try {
return sanitizer.apply(ingestDocument.getFieldValue(fieldReference, String.class, true));
} catch (IllegalArgumentException e) {
// thrown if fieldReference refers to something that isn't a String
return null;
}
} else {
return value;
}
}

@Override
public String toString() {
return value;
}
}
}
Loading