Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Invoke default pipeline of new index #85931

Closed
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/85931.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 85931
summary: Invoke default pipeline of new index
area: Ingest Node
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.ingest.Processor;
Expand All @@ -49,6 +50,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -156,7 +158,7 @@ public void testFinalPipelineOfNewDestinationIsInvoked() {
assertEquals(true, target.getHits().getAt(0).getSourceAsMap().get("final"));
}

public void testDefaultPipelineOfNewDestinationIsNotInvoked() {
public void testDefaultPipelineOfNewDestinationIsInvoked() {
Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
createIndex("index", settings);

Expand Down Expand Up @@ -188,6 +190,70 @@ public void testDefaultPipelineOfNewDestinationIsNotInvoked() {
assertFalse(target.getHits().getAt(0).getSourceAsMap().containsKey("final"));
}

public void testDefaultPipelineOfRedirectDestinationIsInvoked() {
Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
createIndex("index", settings);

settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build();
createIndex("target", settings);

BytesReference defaultPipelineBody = new BytesArray("""
{"processors": [{"redirect": {}}]}""");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON))
.actionGet();

BytesReference targetPipeline = new BytesArray("""
{"processors": [{"final": {}}]}""");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON))
.actionGet();

IndexResponse indexResponse = client().prepareIndex("index")
.setId("1")
.setSource(Map.of("field", "value"))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
assertEquals(RestStatus.CREATED, indexResponse.status());
SearchResponse target = client().prepareSearch("target").get();
assertEquals(1, target.getHits().getTotalHits().value);
assertTrue(target.getHits().getAt(0).getSourceAsMap().containsKey("final"));
}

public void testAvoidIndexingLoop() {
Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
createIndex("index", settings);

settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build();
createIndex("target", settings);

BytesReference defaultPipelineBody = new BytesArray("""
{"processors": [{"redirect": {"dest": "target"}}]}""");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON))
.actionGet();

BytesReference targetPipeline = new BytesArray("""
{"processors": [{"redirect": {"dest": "index"}}]}""");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON))
.actionGet();

IllegalStateException exception = expectThrows(
IllegalStateException.class,
() -> client().prepareIndex("index")
.setId("1")
.setSource(Map.of("dest", "index"))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get()
);
assertThat(exception.getMessage(), containsString("index cycle detected while processing pipelines: [index, target, index]"));
}

public void testFinalPipeline() {
final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build();
createIndex("index", settings);
Expand Down Expand Up @@ -394,6 +460,26 @@ public String getType() {
return "changing_dest";
}

},
"redirect",
(processorFactories, tag, description, config) -> {
final String dest = Objects.requireNonNullElse(
ConfigurationUtils.readOptionalStringProperty(description, tag, config, "dest"),
"target"
);
return new AbstractProcessor(tag, description) {
@Override
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
ingestDocument.setFieldValue(IngestDocument.Metadata.REDIRECT.getFieldName(), dest);
return ingestDocument;
}

@Override
public String getType() {
return "redirect";
}

};
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class IngestDocMetadata extends Metadata {
static final Map<String, FieldProperty<?>> PROPERTIES = Map.of(
INDEX,
StringField.withWritable().withNullable(),
REDIRECT,
StringField.withWritable().withNullable(),
ID,
StringField.withWritable().withNullable(),
ROUTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,7 @@ public String toString() {

public enum Metadata {
INDEX(IndexFieldMapper.NAME),
REDIRECT("_redirect"),
TYPE("_type"),
ID(IdFieldMapper.NAME),
ROUTING(RoutingFieldMapper.NAME),
Expand Down
107 changes: 83 additions & 24 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -714,21 +715,8 @@ protected void doRun() {
continue;
}

final String pipelineId = indexRequest.getPipeline();
indexRequest.setPipeline(NOOP_PIPELINE_NAME);
final String finalPipelineId = indexRequest.getFinalPipeline();
indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME);
boolean hasFinalPipeline = true;
final List<String> pipelines;
if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false
&& IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
pipelines = List.of(pipelineId, finalPipelineId);
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) {
pipelines = List.of(pipelineId);
hasFinalPipeline = false;
} else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
pipelines = List.of(finalPipelineId);
} else {
Pipelines pipelines = getPipelines(indexRequest);
if (pipelines.isEmpty()) {
i++;
continue;
}
Expand Down Expand Up @@ -763,21 +751,78 @@ public void onFailure(Exception e) {
});

IngestDocument ingestDocument = newIngestDocument(indexRequest);
executePipelines(pipelines.iterator(), hasFinalPipeline, indexRequest, ingestDocument, documentListener);

LinkedHashSet<String> indexRecursionDetection = new LinkedHashSet<>();
indexRecursionDetection.add(indexRequest.index());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More general question: I assume during the routing, index() will always be the data_stream name for data_streams and the data_stream is only resolved to a write index after the processing happened? Asking it here because the data stream name is the one that should be part of the index recursion detection, not the write index as this could even change during ingest time I assume.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, _index isn't the internal write index of the data stream.

executePipelines(
pipelines.iterator(),
pipelines.hasFinalPipeline(),
indexRequest,
ingestDocument,
documentListener,
indexRecursionDetection
);
i++;
}
}
}
});
}

private Pipelines getPipelines(IndexRequest indexRequest) {
final String pipelineId = indexRequest.getPipeline();
indexRequest.setPipeline(NOOP_PIPELINE_NAME);
final String finalPipelineId = indexRequest.getFinalPipeline();
indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME);
return new Pipelines(pipelineId, finalPipelineId);
}

private static class Pipelines implements Iterable<String> {
private String defaultPipeline;
private String finalPipeline;

private Pipelines(String defaultPipeline, String finalPipeline) {
if (NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
this.defaultPipeline = defaultPipeline;
}
if (NOOP_PIPELINE_NAME.equals(finalPipeline) == false) {
this.finalPipeline = finalPipeline;
}
}

public boolean hasFinalPipeline() {
return finalPipeline != null;
}

public boolean isEmpty() {
return defaultPipeline == null && finalPipeline == null;
}

public void withoutDefaultPipeline() {
defaultPipeline = null;
}

@Override
public Iterator<String> iterator() {
if (defaultPipeline != null && finalPipeline != null) {
return List.of(defaultPipeline, finalPipeline).iterator();
}
if (finalPipeline != null) {
return List.of(finalPipeline).iterator();
}
if (defaultPipeline != null) {
return List.of(defaultPipeline).iterator();
}
return Collections.emptyIterator();
}
}

private void executePipelines(
final Iterator<String> pipelineIds,
final boolean hasFinalPipeline,
final IndexRequest indexRequest,
final IngestDocument ingestDocument,
final ActionListener<Boolean> listener
final ActionListener<Boolean> listener,
final Set<String> indexRecursionDetection
) {
assert pipelineIds.hasNext();
final String pipelineId = pipelineIds.next();
Expand Down Expand Up @@ -840,6 +885,14 @@ private void executePipelines(
final String newIndex = indexRequest.indices()[0];

if (Objects.equals(originalIndex, newIndex) == false) {
if (indexRecursionDetection.add(newIndex) == false) {
List<String> indexRoute = new ArrayList<>(indexRecursionDetection);
indexRoute.add(newIndex);
listener.onFailure(
new IllegalStateException(format("index cycle detected while processing pipelines: %s", indexRoute))
);
return; // document failed!
}
if (hasFinalPipeline && pipelineIds.hasNext() == false) {
listener.onFailure(
new IllegalStateException(
Expand All @@ -854,19 +907,21 @@ private void executePipelines(
);
return; // document failed!
} else {
// reset request pipeline that is set to _none which would take precedence over the default pipeline
indexRequest.setPipeline(null);
indexRequest.isPipelineResolved(false);
resolvePipelines(null, indexRequest, state.metadata());
if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) {
newPipelineIds = Collections.singleton(indexRequest.getFinalPipeline()).iterator();
newHasFinalPipeline = true;
} else {
newPipelineIds = Collections.emptyIterator();
Pipelines pipelines = getPipelines(indexRequest);
if (newIndex.equals(ingestDocument.getMetadata().getRedirect()) == false) {
pipelines.withoutDefaultPipeline();
}
newHasFinalPipeline = pipelines.hasFinalPipeline();
newPipelineIds = pipelines.iterator();
}
}

if (newPipelineIds.hasNext()) {
executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener);
executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener, indexRecursionDetection);
} else {
// update the index request's source and (potentially) cache the timestamp for TSDB
updateIndexRequestSource(indexRequest, ingestDocument);
Expand Down Expand Up @@ -972,6 +1027,10 @@ private static void updateIndexRequestMetadata(final IndexRequest request, final
// it's fine to set all metadata fields all the time, as ingest document holds their starting values
// before ingestion, which might also get modified during ingestion.
request.index(metadata.getIndex());
String redirectIndex = metadata.getRedirect();
if (redirectIndex != null) {
request.index(redirectIndex);
}
request.id(metadata.getId());
request.routing(metadata.getRouting());
request.version(metadata.getVersion());
Expand Down
9 changes: 9 additions & 0 deletions server/src/main/java/org/elasticsearch/script/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
*/
public class Metadata {
protected static final String INDEX = "_index";
protected static final String REDIRECT = "_redirect";
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unsure if _redirect should also be added to IndexRequest. I guess that could potentially expose _redirect to the _bulk API where this doesn't make much sense. It's also more like an alias to _index but with slightly different semantics.

@dakrone do you have a suggestion?

protected static final String ID = "_id";
protected static final String ROUTING = "_routing";
protected static final String VERSION_TYPE = "_version_type";
Expand Down Expand Up @@ -118,6 +119,14 @@ public void setIndex(String index) {
put(INDEX, index);
}

public String getRedirect() {
return getString(REDIRECT);
}

public void setRedirect(String redirect) {
put(REDIRECT, redirect);
}

public String getId() {
return getString(ID);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ public void testExecuteIndexPipelineDoesNotExist() {
List.of(DUMMY_PLUGIN),
client
);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState));
final IndexRequest indexRequest = new IndexRequest("_index").id("_id")
.source(Map.of())
.setPipeline("_id")
Expand Down