Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add templating support to pipeline processor. #49030

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/reference/ingest/processors/pipeline.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Executes another pipeline.
[options="header"]
|======
| Name | Required | Default | Description
| `name` | yes | - | The name of the pipeline to execute
| `name` | yes | - | The name of the pipeline to execute. Supports <<accessing-template-fields,template snippets>>.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have these links changed? accessing-template-fields points to common-options and template snippets to the accessing template fields paragraph on the pipeline.asciidoc page but there isn't such a paragraph

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a direct copy from the set processor docs page and that links correctly. So I think it is good?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, might be the github navigation that's not able to follow the links correctly.

include::common-options.asciidoc[]
|======

Expand Down
7 changes: 6 additions & 1 deletion modules/ingest-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,9 @@ dependencies {
compileOnly project(':modules:lang-painless')
compile project(':libs:elasticsearch-grok')
compile project(':libs:elasticsearch-dissect')
}
}

testClusters.integTest {
// Needed in order to test ingest pipeline templating:
module file(project(':modules:lang-mustache').tasks.bundlePlugin.archiveFile)
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,97 @@ teardown:
body: {}
- match: { error.root_cause.0.type: "exception" }
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: inner" }

---
"Test Pipeline Processor with templating":
- do:
ingest.put_pipeline:
id: "engineering-department"
body: >
{
"processors" : [
{
"set" : {
"field": "manager",
"value": "john"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "sales-department"
body: >
{
"processors" : [
{
"set" : {
"field": "manager",
"value": "jan"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "outer"
body: >
{
"processors" : [
{
"pipeline" : {
"name": "{{org}}-department"
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: 1
pipeline: "outer"
body: >
{
"org": "engineering"
}

- do:
get:
index: test
id: 1
- match: { _source.manager: "john" }

- do:
index:
index: test
id: 2
pipeline: "outer"
body: >
{
"org": "sales"
}

- do:
get:
index: test
id: 2
- match: { _source.manager: "jan" }

- do:
catch: /illegal_state_exception/
index:
index: test
id: 3
pipeline: "outer"
body: >
{
"org": "legal"
}
- match: { error.root_cause.0.type: "exception" }
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Pipeline processor configured for non-existent pipeline [legal-department]" }
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ public void addIngestClusterStateListener(Consumer<ClusterState> listener) {
}

//package private for testing
static String getProcessorName(Processor processor){
static String getProcessorName(Processor processor) {
// conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
if(processor instanceof ConditionalProcessor){
processor = ((ConditionalProcessor) processor).getInnerProcessor();
Expand All @@ -440,7 +440,7 @@ static String getProcessorName(Processor processor){
sb.append(processor.getType());

if(processor instanceof PipelineProcessor){
String pipelineName = ((PipelineProcessor) processor).getPipelineName();
String pipelineName = ((PipelineProcessor) processor).getPipelineName().newInstance(Map.of()).execute();
sb.append(":");
sb.append(pipelineName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,27 @@

package org.elasticsearch.ingest;

import org.elasticsearch.script.TemplateScript;

import java.util.Map;
import java.util.function.BiConsumer;

public class PipelineProcessor extends AbstractProcessor {

public static final String TYPE = "pipeline";

private final String pipelineName;

private final TemplateScript.Factory pipelineName;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to rename this to something that points it to the templating aspect? (eg. pipelineTemplate ?) it's a bit confusing otherwise below when we render the template and yield a pipelineName from another pipelineName

private final IngestService ingestService;

private PipelineProcessor(String tag, String pipelineName, IngestService ingestService) {
private PipelineProcessor(String tag, TemplateScript.Factory pipelineName, IngestService ingestService) {
super(tag);
this.pipelineName = pipelineName;
this.ingestService = ingestService;
}

@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
String pipelineName = ingestDocument.renderTemplate(this.pipelineName);
Pipeline pipeline = ingestService.getPipeline(pipelineName);
if (pipeline != null) {
ingestDocument.executePipeline(pipeline, handler);
Expand All @@ -52,7 +54,8 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException("this method should not get executed");
}

Pipeline getPipeline(){
Pipeline getPipeline(IngestDocument ingestDocument) {
String pipelineName = ingestDocument.renderTemplate(this.pipelineName);
return ingestService.getPipeline(pipelineName);
}

Expand All @@ -61,7 +64,7 @@ public String getType() {
return TYPE;
}

String getPipelineName() {
TemplateScript.Factory getPipelineName() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would avoid a bit of confusion if the getter is renamed to getPipelineNameTemplate or something along those lines (it took me a while to understand we are not instantiating a name here 91d8ada#diff-579dffc1e22e3db13c41f685046b2891R443)

return pipelineName;
}

Expand All @@ -76,9 +79,10 @@ public Factory(IngestService ingestService) {
@Override
public PipelineProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String pipeline =
ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "name");
return new PipelineProcessor(processorTag, pipeline, ingestService);
String pipelineNameValue = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "name");
TemplateScript.Factory pipelineName =
ConfigurationUtils.compileTemplate(TYPE, processorTag, "name", pipelineNameValue, ingestService.getScriptService());
return new PipelineProcessor(processorTag, pipelineName, ingestService);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ public final class TrackingResultProcessor implements Processor {
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
if (actualProcessor instanceof PipelineProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) actualProcessor);
Pipeline pipeline = pipelineProcessor.getPipeline();
Pipeline pipeline = pipelineProcessor.getPipeline(ingestDocument);
//runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines
IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument);
ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(), (result, e) -> {
ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(ingestDocument), (result, e) -> {
// do nothing, let the tracking processors throw the exception while recording the path up to the failure
if (e instanceof ElasticsearchException) {
ElasticsearchException elasticsearchException = (ElasticsearchException) e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,7 @@ public void testStatName(){

PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class);
String pipelineName = randomAlphaOfLength(10);
when(pipelineProcessor.getPipelineName()).thenReturn(pipelineName);
when(pipelineProcessor.getPipelineName()).thenReturn(new TestTemplateService.MockTemplateScript.Factory(pipelineName));
name = PipelineProcessor.TYPE;
when(pipelineProcessor.getType()).thenReturn(name);
assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;

import java.util.Arrays;
Expand All @@ -37,7 +38,7 @@ public class PipelineProcessorTests extends ESTestCase {

public void testExecutesPipeline() throws Exception {
String pipelineId = "pipeline";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
CompletableFuture<IngestDocument> invoked = new CompletableFuture<>();
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Pipeline pipeline = new Pipeline(
Expand Down Expand Up @@ -69,7 +70,7 @@ public String getTag() {
}

public void testThrowsOnMissingPipeline() throws Exception {
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Map<String, Object> config = new HashMap<>();
Expand All @@ -85,7 +86,7 @@ public void testThrowsOnMissingPipeline() throws Exception {
public void testThrowsOnRecursivePipelineInvocations() throws Exception {
String innerPipelineId = "inner";
String outerPipelineId = "outer";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Map<String, Object> outerConfig = new HashMap<>();
outerConfig.put("name", innerPipelineId);
Expand Down Expand Up @@ -113,7 +114,7 @@ public void testThrowsOnRecursivePipelineInvocations() throws Exception {

public void testAllowsRepeatedPipelineInvocations() throws Exception {
String innerPipelineId = "inner";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Map<String, Object> outerConfig = new HashMap<>();
outerConfig.put("name", innerPipelineId);
Expand All @@ -131,7 +132,7 @@ public void testPipelineProcessorWithPipelineChain() throws Exception {
String pipeline1Id = "pipeline1";
String pipeline2Id = "pipeline2";
String pipeline3Id = "pipeline3";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);

Map<String, Object> pipeline1ProcessorConfig = new HashMap<>();
Expand Down Expand Up @@ -203,4 +204,11 @@ pipeline3Id, null, null, new CompoundProcessor(
assertThat(pipeline2Stats.getIngestFailedCount(), equalTo(0L));
assertThat(pipeline3Stats.getIngestFailedCount(), equalTo(1L));
}

private static IngestService createIngestService() {
IngestService ingestService = mock(IngestService.class);
ScriptService scriptService = mock(ScriptService.class);
when(ingestService.getScriptService()).thenReturn(scriptService);
return ingestService;
}
}