Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Re-introduce hash processor #47047

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.script.ScriptService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -134,7 +135,7 @@ public static final class Factory implements Processor.Factory {

@Override
public ForEachProcessor create(Map<String, Processor.Factory> factories, String tag,
Map<String, Object> config) throws Exception {
Map<String, Object> config, Map<String, Object> pipelineMetadata) throws Exception {
String field = readStringProperty(TYPE, tag, config, "field");
boolean ignoreMissing = readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
Map<String, Map<String, Object>> processorConfig = readMap(TYPE, tag, config, "processor");
Expand All @@ -144,8 +145,14 @@ public ForEachProcessor create(Map<String, Processor.Factory> factories, String
}
Map.Entry<String, Map<String, Object>> entry = entries.iterator().next();
Processor processor =
ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue());
ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue(), pipelineMetadata);
return new ForEachProcessor(tag, field, processor, ignoreMissing);
}

/**
 * Creates a {@link ForEachProcessor} when the caller has no pipeline metadata to offer. Delegates to the
 * metadata-aware overload, passing an empty metadata map.
 *
 * @param factories the registered processor factories, keyed by processor type
 * @param tag       the tag of the processor being created; forwarded unchanged
 * @param config    the processor configuration; forwarded unchanged
 * @return the processor built by the metadata-aware overload
 * @throws Exception if the metadata-aware overload fails
 */
@Override
public ForEachProcessor create(Map<String, Processor.Factory> factories, String tag,
                               Map<String, Object> config) throws Exception {
    final Map<String, Object> noPipelineMetadata = Collections.emptyMap();
    return create(factories, tag, config, noPipelineMetadata);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
package org.elasticsearch.ingest.common;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurableProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.script.ScriptService;
Expand Down Expand Up @@ -55,6 +58,26 @@ public void testCreate() throws Exception {
assertFalse(forEachProcessor.isIgnoreMissing());
}

/**
 * Checks that pipeline metadata given to the for-each factory is handed on to the factory of the wrapped
 * processor, so the wrapped processor exposes exactly the metadata stored under its identifier.
 */
public void testCreatePassesPipelineMetadataToChildProcessors() throws Exception {
    // Pipeline metadata is keyed by the child processor's identifier.
    Map<String, String> expectedMetadata = new HashMap<>();
    expectedMetadata.put("foo", "bar");
    Map<String, Object> pipelineMetadata = new HashMap<>();
    pipelineMetadata.put(TestConfigurableProcessor.IDENTIFIER, expectedMetadata);

    // for-each configuration wrapping a single "_name" processor with an empty configuration
    Map<String, Object> forEachConfig = new HashMap<>();
    forEachConfig.put("field", "_field");
    forEachConfig.put("processor", Collections.singletonMap("_name", Collections.emptyMap()));

    Map<String, Processor.Factory> factories = new HashMap<>();
    factories.put("_name", new TestConfigurableProcessor.Factory());

    ForEachProcessor.Factory factoryUnderTest = new ForEachProcessor.Factory(scriptService);
    ForEachProcessor created = factoryUnderTest.create(factories, null, forEachConfig, pipelineMetadata);

    assertThat(created, Matchers.notNullValue());
    assertThat(created.getField(), equalTo("_field"));
    assertThat(created.getInnerProcessor(), Matchers.instanceOf(ConfigurableProcessor.class));
    assertThat(((ConfigurableProcessor) created.getInnerProcessor()).getMetadata(), equalTo(expectedMetadata));
    assertFalse(created.isIgnoreMissing());
}

public void testSetIgnoreMissing() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
Expand Down Expand Up @@ -118,4 +141,58 @@ public void testCreateWithMissingProcessor() {
assertThat(exception.getMessage(), equalTo("[processor] required property is missing"));
}

/**
 * Minimal {@link ConfigurableProcessor} used by these tests to observe which metadata its factory received.
 * Executing it leaves the document untouched.
 */
private static class TestConfigurableProcessor extends AbstractProcessor implements ConfigurableProcessor {

    /** Key under which this processor's metadata is looked up in the pipeline metadata. */
    static final String IDENTIFIER = "testConfigurableProcessor";

    private final Map<String, String> metadata;

    /**
     * @param tag      processor tag, passed to {@link AbstractProcessor}
     * @param metadata metadata to expose through {@link #getMetadata()}; {@code null} is treated as "no metadata"
     */
    TestConfigurableProcessor(String tag, Map<String, String> metadata) {
        super(tag);
        // The pipeline metadata may have no entry for IDENTIFIER (e.g. when the factory is invoked without
        // metadata). Collections.unmodifiableMap(null) throws a NullPointerException, so fall back to an empty map.
        this.metadata = metadata == null
            ? Collections.<String, String>emptyMap()
            : Collections.unmodifiableMap(metadata);
    }

    /** Returns the given document unchanged. */
    @Override
    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
        return ingestDocument;
    }

    /** This test processor reports no type. */
    @Override
    public String getType() {
        return null;
    }

    /**
     * Always returns {@code null}.
     * NOTE(review): this hides the tag passed to the super constructor; confirm that is intended.
     */
    @Override
    public String getTag() {
        return null;
    }

    @Override
    public String getIdentifier() {
        return IDENTIFIER;
    }

    /** Returns an unmodifiable view of the metadata supplied at construction; never {@code null}. */
    @Override
    public Map<String, String> getMetadata() {
        return metadata;
    }

    /** Factory that builds the processor from the metadata stored under {@link #IDENTIFIER}. */
    public static final class Factory implements Processor.Factory {

        /** Creates the processor without any pipeline metadata. */
        @Override
        public Processor create(Map<String, Processor.Factory> processorFactories, String tag, Map<String, Object> config)
            throws Exception {
            return create(processorFactories, tag, config, Collections.emptyMap());
        }

        /**
         * Creates the processor with whatever {@code metadata} holds under {@link #IDENTIFIER}; a missing entry
         * yields a processor with empty metadata.
         */
        @SuppressWarnings("unchecked")
        @Override
        public Processor create(Map<String, Processor.Factory> processorFactories, String tag, Map<String, Object> config,
                                Map<String, Object> metadata) throws Exception {
            return new TestConfigurableProcessor(tag, (Map<String, String>) metadata.get(IDENTIFIER));
        }
    }

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.ingest;

import java.util.Map;

/**
 * Ingest processors may implement this interface to supply environment-specific metadata at pipeline creation time.
 * This metadata will be persisted in the pipeline's definition in cluster state and supplied to the processor's
 * factory any time the processor is created on ingest nodes in the cluster.
 * <p>
 * When a pipeline is validated, non-empty metadata from each configurable processor is collected into the pipeline
 * metadata under the key returned by {@link #getIdentifier()}. Entries from processors that report the same
 * identifier are merged into a single map.
 */
public interface ConfigurableProcessor {

    /**
     * Returns the identifier of this processor. Implementations must derive it from their configuration so that it
     * uniquely identifies the processor; it is used as the key under which the processor's metadata is stored.
     *
     * @return Unique identifier
     */
    String getIdentifier();

    /**
     * Returns the metadata of this processor. An empty map means there is nothing to persist.
     *
     * @return Metadata to be persisted in pipeline definition for this processor.
     */
    Map<String, String> getMetadata();
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,14 +326,16 @@ public static ElasticsearchException newConfigurationException(String processorT

public static List<Processor> readProcessorConfigs(List<Map<String, Object>> processorConfigs,
ScriptService scriptService,
Map<String, Processor.Factory> processorFactories) throws Exception {
Map<String, Processor.Factory> processorFactories,
Map<String, Object> pipelineMetadata) throws Exception {
Exception exception = null;
List<Processor> processors = new ArrayList<>();
if (processorConfigs != null) {
for (Map<String, Object> processorConfigWithKey : processorConfigs) {
for (Map.Entry<String, Object> entry : processorConfigWithKey.entrySet()) {
try {
processors.add(readProcessor(processorFactories, scriptService, entry.getKey(), entry.getValue()));
processors.add(readProcessor(processorFactories, scriptService, entry.getKey(), entry.getValue(),
pipelineMetadata));
} catch (Exception e) {
exception = ExceptionsHelper.useOrSuppress(exception, e);
}
Expand Down Expand Up @@ -393,22 +395,24 @@ private static void addMetadataToException(ElasticsearchException exception, Str
@SuppressWarnings("unchecked")
public static Processor readProcessor(Map<String, Processor.Factory> processorFactories,
ScriptService scriptService,
String type, Object config) throws Exception {
String type, Object config, Map<String, Object> pipelineMetadata) throws Exception {
if (config instanceof Map) {
return readProcessor(processorFactories, scriptService, type, (Map<String, Object>) config);
return readProcessor(processorFactories, scriptService, type, (Map<String, Object>) config, pipelineMetadata);
} else if (config instanceof String && "script".equals(type)) {
Map<String, Object> normalizedScript = new HashMap<>(1);
normalizedScript.put(ScriptType.INLINE.getParseField().getPreferredName(), config);
return readProcessor(processorFactories, scriptService, type, normalizedScript);
return readProcessor(processorFactories, scriptService, type, normalizedScript, pipelineMetadata);
} else {
throw newConfigurationException(type, null, null,
"property isn't a map, but of type [" + config.getClass().getName() + "]");
}
}

public static Processor readProcessor(Map<String, Processor.Factory> processorFactories,
ScriptService scriptService,
String type, Map<String, Object> config) throws Exception {
ScriptService scriptService,
String type,
Map<String, Object> config,
Map<String, Object> pipelineMetadata) throws Exception {
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
Script conditionalScript = extractConditional(config);
Processor.Factory factory = processorFactories.get(type);
Expand All @@ -417,15 +421,16 @@ public static Processor readProcessor(Map<String, Processor.Factory> processorFa
List<Map<String, Object>> onFailureProcessorConfigs =
ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY);

List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, scriptService, processorFactories);
List<Processor> onFailureProcessors =
readProcessorConfigs(onFailureProcessorConfigs, scriptService, processorFactories, pipelineMetadata);

if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) {
throw newConfigurationException(type, tag, Pipeline.ON_FAILURE_KEY,
"processors list cannot be empty");
}

try {
Processor processor = factory.create(processorFactories, tag, config);
Processor processor = factory.create(processorFactories, tag, config, pipelineMetadata);
if (config.isEmpty() == false) {
throw new ElasticsearchParseException("processor [{}] doesn't support one or more provided configuration parameters {}",
type, Arrays.toString(config.keySet().toArray()));
Expand Down
39 changes: 28 additions & 11 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ static List<PipelineConfiguration> innerGetPipelines(IngestMetadata ingestMetada
public void putPipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineRequest request,
ActionListener<AcknowledgedResponse> listener) throws Exception {
// validates the pipeline and processor configuration before submitting a cluster update task:
validatePipeline(ingestInfos, request);
final Map<String, Object> pipelineMetadata = validatePipeline(ingestInfos, request);
clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(),
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {

Expand All @@ -233,7 +233,7 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) {

@Override
public ClusterState execute(ClusterState currentState) {
return innerPut(request, currentState);
return innerPut(request, currentState, pipelineMetadata);
}
});
}
Expand Down Expand Up @@ -293,7 +293,7 @@ private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(Compound
return processorMetrics;
}

static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) {
static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState, Map<String, Object> pipelineMetadata) {
IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE);
Map<String, PipelineConfiguration> pipelines;
if (currentIngestMetadata != null) {
Expand All @@ -302,23 +302,33 @@ static ClusterState innerPut(PutPipelineRequest request, ClusterState currentSta
pipelines = new HashMap<>();
}

pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType()));
pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), pipelineMetadata,
request.getXContentType()));
ClusterState.Builder newState = ClusterState.builder(currentState);
newState.metaData(MetaData.builder(currentState.getMetaData())
.putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines))
.build());
return newState.build();
}

void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineRequest request) throws Exception {
Map<String, Object> validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineRequest request)
throws Exception {
if (ingestInfos.isEmpty()) {
throw new IllegalStateException("Ingest info is empty");
}

Map<String, Object> pipelineMetadata = new HashMap<>();
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService);
List<Exception> exceptions = new ArrayList<>();
for (Processor processor : pipeline.flattenAllProcessors()) {
if (processor instanceof ConfigurableProcessor) {
final ConfigurableProcessor cp = (ConfigurableProcessor) processor;
final Map<String, String> processorMetaData = cp.getMetadata();
if (processorMetaData.isEmpty() == false) {
getOrCreateMetadataNode(pipelineMetadata, cp.getIdentifier()).putAll(processorMetaData);
}
}
for (Map.Entry<DiscoveryNode, IngestInfo> entry : ingestInfos.entrySet()) {
String type = processor.getType();
if (entry.getValue().containsProcessor(type) == false && ConditionalProcessor.TYPE.equals(type) == false) {
Expand All @@ -330,8 +340,19 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineReq
}
}
ExceptionsHelper.rethrowAndSuppress(exceptions);
return pipelineMetadata;
}

/**
 * Returns the metadata node stored under {@code identifier}, first creating and registering an empty node when
 * the identifier has no node yet.
 *
 * @param metadata   the pipeline metadata, keyed by processor identifier; modified when a node is created
 * @param identifier the processor identifier whose node is requested
 * @return the (possibly newly created) node for {@code identifier}; never {@code null}
 * @throws ClassCastException at the call site if an existing value under {@code identifier} is not a map
 */
@SuppressWarnings("unchecked")
private static Map<String, String> getOrCreateMetadataNode(Map<String, Object> metadata, String identifier) {
    // computeIfAbsent does a single lookup and also replaces a null mapping, so callers can always
    // dereference the result. The cast is unchecked because the outer map stores its values as Object.
    return (Map<String, String>) metadata.computeIfAbsent(identifier, key -> new HashMap<String, String>());
}


public void executeBulkRequest(int numberOfActionRequests,
Iterable<DocWriteRequest<?>> actionRequests,
BiConsumer<Integer, Exception> onFailure,
Expand Down Expand Up @@ -569,12 +590,8 @@ void innerUpdatePipelines(IngestMetadata newIngestMetadata) {
newPipelines = new HashMap<>(existingPipelines);
}
try {
Pipeline newPipeline =
Pipeline.create(newConfiguration.getId(), newConfiguration.getConfigAsMap(), processorFactories, scriptService);
newPipelines.put(
newConfiguration.getId(),
new PipelineHolder(newConfiguration, newPipeline)
);
Pipeline newPipeline = Pipeline.create(newConfiguration, processorFactories, scriptService);
newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline));

if (previous == null) {
continue;
Expand Down
Loading