Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run GetPipelineTransportAction on local node #120445

Merged
merged 5 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/120445.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120445
summary: Run `GetPipelineTransportAction` on local node
area: Ingest Node
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesAction;
import org.elasticsearch.action.admin.indices.template.post.SimulateIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.post.SimulateTemplateAction;
import org.elasticsearch.action.ingest.GetPipelineAction;
import org.elasticsearch.action.support.CancellableActionTestPlugin;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener;
Expand Down Expand Up @@ -103,6 +104,10 @@ public void testClusterGetSettingsCancellation() {
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_cluster/settings"), ClusterGetSettingsAction.NAME);
}

public void testGetPipelineCancellation() {
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_ingest/pipeline"), GetPipelineAction.NAME);
}

private void runRestActionCancellationTest(Request request, String actionName) {
final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@
package org.elasticsearch.action.ingest;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Map;

public class GetPipelineRequest extends MasterNodeReadRequest<GetPipelineRequest> {
public class GetPipelineRequest extends LocalClusterStateRequest {

private final String[] ids;
private final boolean summary;
Expand All @@ -35,19 +39,17 @@ public GetPipelineRequest(TimeValue masterNodeTimeout, String... ids) {
this(masterNodeTimeout, false, ids);
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
public GetPipelineRequest(StreamInput in) throws IOException {
super(in);
ids = in.readStringArray();
summary = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(ids);
out.writeBoolean(summary);
}

public String[] getIds() {
return ids;
}
Expand All @@ -60,4 +62,9 @@ public boolean isSummary() {
public ActionRequestValidationException validate() {
return null;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -30,16 +29,6 @@ public class GetPipelineResponse extends ActionResponse implements ToXContentObj
private final List<PipelineConfiguration> pipelines;
private final boolean summary;

public GetPipelineResponse(StreamInput in) throws IOException {
super(in);
int size = in.readVInt();
pipelines = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
pipelines.add(PipelineConfiguration.readFrom(in));
}
summary = in.readBoolean();
}

public GetPipelineResponse(List<PipelineConfiguration> pipelines, boolean summary) {
this.pipelines = pipelines;
this.summary = summary;
Expand All @@ -58,6 +47,11 @@ public List<PipelineConfiguration> pipelines() {
return Collections.unmodifiableList(pipelines);
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(pipelines);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,56 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

public class GetPipelineTransportAction extends TransportMasterNodeReadAction<GetPipelineRequest, GetPipelineResponse> {
public class GetPipelineTransportAction extends TransportLocalClusterStateAction<GetPipelineRequest, GetPipelineResponse> {

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@SuppressWarnings("this-escape")
@Inject
public GetPipelineTransportAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
public GetPipelineTransportAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
super(
GetPipelineAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
GetPipelineRequest::new,
indexNameExpressionResolver,
GetPipelineResponse::new,
transportService.getTaskManager(),
clusterService,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);

transportService.registerRequestHandler(
actionName,
executor,
false,
true,
GetPipelineRequest::new,
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
);
}

@Override
protected void masterOperation(Task task, GetPipelineRequest request, ClusterState state, ActionListener<GetPipelineResponse> listener)
throws Exception {
protected void localClusterStateOperation(
Task task,
GetPipelineRequest request,
ClusterState state,
ActionListener<GetPipelineResponse> listener
) throws Exception {
((CancellableTask) task).ensureNotCancelled();
listener.onResponse(new GetPipelineResponse(IngestService.getPipelines(state, request.getIds()), request.isSummary()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener;

import java.io.IOException;
Expand Down Expand Up @@ -46,7 +47,7 @@ public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient cl
restRequest.paramAsBoolean("summary", false),
Strings.splitStringByCommaToArray(restRequest.param("id"))
);
return channel -> client.execute(
return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute(
GetPipelineAction.INSTANCE,
request,
new RestToXContentListener<>(channel, GetPipelineResponse::status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,23 @@
package org.elasticsearch.action.ingest;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.test.AbstractXContentSerializingTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

public class GetPipelineResponseTests extends AbstractXContentSerializingTestCase<GetPipelineResponse> {
public class GetPipelineResponseTests extends ESTestCase {

private XContentBuilder getRandomXContentBuilder() throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
Expand Down Expand Up @@ -83,7 +80,6 @@ public void testXContentDeserialization() throws IOException {
}
}

@Override
protected GetPipelineResponse doParseInstance(XContentParser parser) throws IOException {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
List<PipelineConfiguration> pipelines = new ArrayList<>();
Expand All @@ -104,24 +100,4 @@ protected GetPipelineResponse doParseInstance(XContentParser parser) throws IOEx
return new GetPipelineResponse(pipelines);
}

@Override
protected GetPipelineResponse createTestInstance() {
try {
return new GetPipelineResponse(new ArrayList<>(createPipelineConfigMap().values()));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
protected Writeable.Reader<GetPipelineResponse> instanceReader() {
return GetPipelineResponse::new;
}

@Override
protected GetPipelineResponse mutateInstance(GetPipelineResponse response) throws IOException {
return new GetPipelineResponse(
CollectionUtils.appendToCopy(response.pipelines(), createRandomPipeline("pipeline_" + response.pipelines().size() + 1))
);
}
}

This file was deleted.