Skip to content

Commit

Permalink
Ingest: Add conditional per processor (#32398)
Browse files Browse the repository at this point in the history
* Ingest: Add conditional per processor
* closes #21248
  • Loading branch information
original-brownbear authored Aug 30, 2018
1 parent d93b2a2 commit cc4d705
Show file tree
Hide file tree
Showing 15 changed files with 788 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.elasticsearch.script.ScriptService;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
Expand Down Expand Up @@ -96,6 +97,13 @@ Processor getProcessor() {
}

public static final class Factory implements Processor.Factory {

private final ScriptService scriptService;

Factory(ScriptService scriptService) {
this.scriptService = scriptService;
}

@Override
public ForEachProcessor create(Map<String, Processor.Factory> factories, String tag,
Map<String, Object> config) throws Exception {
Expand All @@ -107,7 +115,8 @@ public ForEachProcessor create(Map<String, Processor.Factory> factories, String
throw newConfigurationException(TYPE, tag, "processor", "Must specify exactly one processor type");
}
Map.Entry<String, Map<String, Object>> entry = entries.iterator().next();
Processor processor = ConfigurationUtils.readProcessor(factories, entry.getKey(), entry.getValue());
Processor processor =
ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue());
return new ForEachProcessor(tag, field, processor, ignoreMissing);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
processors.put(ConvertProcessor.TYPE, new ConvertProcessor.Factory());
processors.put(GsubProcessor.TYPE, new GsubProcessor.Factory());
processors.put(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService));
processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory());
processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService));
processors.put(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService));
processors.put(SortProcessor.TYPE, new SortProcessor.Factory());
processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;

Expand All @@ -30,14 +31,17 @@
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;

public class ForEachProcessorFactoryTests extends ESTestCase {

private final ScriptService scriptService = mock(ScriptService.class);

public void testCreate() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Expand All @@ -53,7 +57,7 @@ public void testSetIgnoreMissing() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Expand All @@ -71,7 +75,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception {
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_first", (r, t, c) -> processor);
registry.put("_second", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Expand All @@ -84,7 +88,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception {
}

public void testCreateWithNonExistingProcessorType() throws Exception {
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("processor", Collections.singletonMap("_name", Collections.emptyMap()));
Expand All @@ -97,15 +101,15 @@ public void testCreateWithMissingField() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
Map<String, Object> config = new HashMap<>();
config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap())));
Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config));
assertThat(exception.getMessage(), equalTo("[field] required property is missing"));
}

public void testCreateWithMissingProcessor() {
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
---
teardown:
- do:
ingest.delete_pipeline:
id: "my_pipeline"
ignore: 404

---
"Test conditional processor fulfilled condition":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"bytes" : {
"if" : "ctx.conditional_field == 'bar'",
"field" : "bytes_source_field",
"target_field" : "bytes_target_field"
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
type: test
id: 1
pipeline: "my_pipeline"
body: {bytes_source_field: "1kb", conditional_field: "bar"}

- do:
get:
index: test
type: test
id: 1
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.conditional_field: "bar" }
- match: { _source.bytes_target_field: 1024 }

---
"Test conditional processor unfulfilled condition":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"bytes" : {
"if" : "ctx.conditional_field == 'foo'",
"field" : "bytes_source_field",
"target_field" : "bytes_target_field"
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
type: test
id: 1
pipeline: "my_pipeline"
body: {bytes_source_field: "1kb", conditional_field: "bar"}

- do:
get:
index: test
type: test
id: 1
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.conditional_field: "bar" }
- is_false: _source.bytes_target_field

Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,11 @@ static Parsed parseWithPipelineId(String pipelineId, Map<String, Object> config,
return new Parsed(pipeline, ingestDocumentList, verbose);
}

static Parsed parse(Map<String, Object> config, boolean verbose, IngestService pipelineStore) throws Exception {
static Parsed parse(Map<String, Object> config, boolean verbose, IngestService ingestService) throws Exception {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
Pipeline pipeline = Pipeline.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories());
Pipeline pipeline = Pipeline.create(
SIMULATED_PIPELINE_ID, pipelineConfig, ingestService.getProcessorFactories(), ingestService.getScriptService()
);
List<IngestDocument> ingestDocumentList = parseDocs(config);
return new Parsed(pipeline, ingestDocumentList, verbose);
}
Expand Down
Loading

0 comments on commit cc4d705

Please sign in to comment.