Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Execute EnrichPolicyRunner on a non dedicated master node. #77164

Merged
merged 2 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
Expand All @@ -20,6 +22,7 @@
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -48,12 +51,17 @@
import java.util.Set;

import static org.elasticsearch.test.NodeRoles.ingestOnlyNode;
import static org.elasticsearch.test.NodeRoles.masterOnlyNode;
import static org.elasticsearch.test.NodeRoles.nonIngestNode;
import static org.elasticsearch.test.NodeRoles.nonMasterNode;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.enrich.MatchProcessorTests.mapOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class EnrichMultiNodeIT extends ESIntegTestCase {
Expand Down Expand Up @@ -150,6 +158,67 @@ public void testEnrichNoIngestNodes() {
assertThat(e.getMessage(), equalTo("no ingest nodes in this cluster"));
}

public void testExecutePolicyWithDedicatedMasterNodes() throws Exception {
internalCluster().startNodes(3, masterOnlyNode());
internalCluster().startNodes(2, nonMasterNode());
ensureStableCluster(5, (String) null);

assertAcked(prepareCreate(SOURCE_INDEX_NAME).addMapping("_doc", MATCH_FIELD, "type=keyword"));
EnrichPolicy enrichPolicy = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
Collections.singletonList(SOURCE_INDEX_NAME),
MATCH_FIELD,
Arrays.asList(DECORATE_FIELDS)
);
PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy);
assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
ExecuteEnrichPolicyAction.Request executePolicyRequest = new ExecuteEnrichPolicyAction.Request(POLICY_NAME);
executePolicyRequest.setWaitForCompletion(false); // From tne returned taks id the node that executes the policy can be determined
ExecuteEnrichPolicyAction.Response executePolicyResponse = client().execute(
ExecuteEnrichPolicyAction.INSTANCE,
executePolicyRequest
).actionGet();
assertThat(executePolicyResponse.getStatus(), nullValue());
assertThat(executePolicyResponse.getTaskId(), notNullValue());

GetTaskRequest getTaskRequest = new GetTaskRequest().setTaskId(executePolicyResponse.getTaskId()).setWaitForCompletion(true);
client().admin().cluster().getTask(getTaskRequest).actionGet();

DiscoveryNodes discoNodes = client().admin().cluster().state(new ClusterStateRequest()).actionGet().getState().nodes();
assertThat(discoNodes.get(executePolicyResponse.getTaskId().getNodeId()).isMasterNode(), is(false));
}

public void testExecutePolicyNeverOnElectedMaster() throws Exception {
internalCluster().startNodes(3);
ensureStableCluster(3, (String) null);

assertAcked(prepareCreate(SOURCE_INDEX_NAME).addMapping("_doc", MATCH_FIELD, "type=keyword"));
EnrichPolicy enrichPolicy = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
Collections.singletonList(SOURCE_INDEX_NAME),
MATCH_FIELD,
Arrays.asList(DECORATE_FIELDS)
);
PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy);
assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
ExecuteEnrichPolicyAction.Request executePolicyRequest = new ExecuteEnrichPolicyAction.Request(POLICY_NAME);
executePolicyRequest.setWaitForCompletion(false); // From tne returned taks id the node that executes the policy can be determined
ExecuteEnrichPolicyAction.Response executePolicyResponse = client().execute(
ExecuteEnrichPolicyAction.INSTANCE,
executePolicyRequest
).actionGet();
assertThat(executePolicyResponse.getStatus(), nullValue());
assertThat(executePolicyResponse.getTaskId(), notNullValue());

GetTaskRequest getTaskRequest = new GetTaskRequest().setTaskId(executePolicyResponse.getTaskId()).setWaitForCompletion(true);
client().admin().cluster().getTask(getTaskRequest).actionGet();

DiscoveryNodes discoNodes = client().admin().cluster().state(new ClusterStateRequest()).actionGet().getState().nodes();
assertThat(executePolicyResponse.getTaskId().getNodeId(), not(equalTo(discoNodes.getMasterNodeId())));
}

private static void enrich(List<String> keys, String coordinatingNode) {
int numDocs = 256;
BulkRequest bulkRequest = new BulkRequest("my-index");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorStatsAction;
import org.elasticsearch.xpack.enrich.action.EnrichReindexAction;
import org.elasticsearch.xpack.enrich.action.EnrichShardMultiSearchAction;
import org.elasticsearch.xpack.enrich.action.InternalExecutePolicyAction;
import org.elasticsearch.xpack.enrich.action.TransportDeleteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.action.TransportEnrichReindexAction;
import org.elasticsearch.xpack.enrich.action.TransportEnrichStatsAction;
Expand Down Expand Up @@ -155,7 +156,8 @@ protected XPackLicenseState getLicenseState() {
new ActionHandler<>(EnrichCoordinatorProxyAction.INSTANCE, EnrichCoordinatorProxyAction.TransportAction.class),
new ActionHandler<>(EnrichShardMultiSearchAction.INSTANCE, EnrichShardMultiSearchAction.TransportAction.class),
new ActionHandler<>(EnrichCoordinatorStatsAction.INSTANCE, EnrichCoordinatorStatsAction.TransportAction.class),
new ActionHandler<>(EnrichReindexAction.INSTANCE, TransportEnrichReindexAction.class)
new ActionHandler<>(EnrichReindexAction.INSTANCE, TransportEnrichReindexAction.class),
new ActionHandler<>(InternalExecutePolicyAction.INSTANCE, InternalExecutePolicyAction.Transport.class)
);
}

Expand Down Expand Up @@ -196,6 +198,15 @@ public Collection<Object> createComponents(
}

EnrichPolicyLocks enrichPolicyLocks = new EnrichPolicyLocks();
EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor(
settings,
clusterService,
client,
threadPool,
expressionResolver,
enrichPolicyLocks,
System::currentTimeMillis
);
EnrichPolicyMaintenanceService enrichPolicyMaintenanceService = new EnrichPolicyMaintenanceService(
settings,
client,
Expand All @@ -207,7 +218,8 @@ public Collection<Object> createComponents(
return Arrays.asList(
enrichPolicyLocks,
new EnrichCoordinatorProxyAction.Coordinator(client, settings),
enrichPolicyMaintenanceService
enrichPolicyMaintenanceService,
enrichPolicyExecutor
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,19 @@
package org.elasticsearch.xpack.enrich;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskListener;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
import org.elasticsearch.xpack.enrich.action.InternalExecutePolicyAction;

import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;

public class EnrichPolicyExecutor {
Expand All @@ -34,7 +29,6 @@ public class EnrichPolicyExecutor {

private final ClusterService clusterService;
private final Client client;
private final TaskManager taskManager;
private final ThreadPool threadPool;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final LongSupplier nowSupplier;
Expand All @@ -48,15 +42,13 @@ public EnrichPolicyExecutor(
Settings settings,
ClusterService clusterService,
Client client,
TaskManager taskManager,
ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver,
EnrichPolicyLocks policyLocks,
LongSupplier nowSupplier
) {
this.clusterService = clusterService;
this.client = client;
this.taskManager = taskManager;
this.threadPool = threadPool;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.nowSupplier = nowSupplier;
Expand All @@ -67,6 +59,43 @@ public EnrichPolicyExecutor(
this.policyExecutionPermits = new Semaphore(maximumConcurrentPolicyExecutions);
}

public void coordinatePolicyExecution(
ExecuteEnrichPolicyAction.Request request,
ActionListener<ExecuteEnrichPolicyAction.Response> listener
) {
tryLockingPolicy(request.getName());
try {
client.execute(InternalExecutePolicyAction.INSTANCE, request, ActionListener.wrap(response -> {
if (response.getStatus() != null) {
releasePolicy(request.getName());
listener.onResponse(response);
} else {
waitAndThenRelease(request.getName(), response);
listener.onResponse(response);
}
}, e -> {
releasePolicy(request.getName());
listener.onFailure(e);
}));
} catch (Exception e) {
// Be sure to unlock if submission failed.
releasePolicy(request.getName());
throw e;
}
}

public void runPolicyLocally(ExecuteEnrichPolicyTask task, String policyName, ActionListener<ExecuteEnrichPolicyStatus> listener) {
try {
EnrichPolicy policy = EnrichStore.getPolicy(policyName, clusterService.state());
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.SCHEDULED));
Runnable runnable = createPolicyRunner(policyName, policy, task, listener);
threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable);
} catch (Exception e) {
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED));
throw e;
}
}

private void tryLockingPolicy(String policyName) {
policyLocks.lockPolicy(policyName);
if (policyExecutionPermits.tryAcquire() == false) {
Expand All @@ -91,49 +120,14 @@ private void releasePolicy(String policyName) {
}
}

private class PolicyCompletionListener implements ActionListener<ExecuteEnrichPolicyStatus> {
private final String policyName;
private final ExecuteEnrichPolicyTask task;
private final BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse;
private final BiConsumer<Task, Exception> onFailure;

PolicyCompletionListener(
String policyName,
ExecuteEnrichPolicyTask task,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse,
BiConsumer<Task, Exception> onFailure
) {
this.policyName = policyName;
this.task = task;
this.onResponse = onResponse;
this.onFailure = onFailure;
}

@Override
public void onResponse(ExecuteEnrichPolicyStatus status) {
assert ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE.equals(status.getPhase()) : "incomplete task returned";
releasePolicy(policyName);
try {
taskManager.unregister(task);
} finally {
onResponse.accept(task, status);
}
}

@Override
public void onFailure(Exception e) {
// Set task status to failed to avoid having to catch and rethrow exceptions everywhere
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED));
releasePolicy(policyName);
try {
taskManager.unregister(task);
} finally {
onFailure.accept(task, e);
}
}
private void waitAndThenRelease(String policyName, ExecuteEnrichPolicyAction.Response response) {
GetTaskRequest getTaskRequest = new GetTaskRequest();
getTaskRequest.setTaskId(response.getTaskId());
getTaskRequest.setWaitForCompletion(true);
client.admin().cluster().getTask(getTaskRequest, ActionListener.wrap(() -> releasePolicy(policyName)));
}

protected Runnable createPolicyRunner(
private Runnable createPolicyRunner(
String policyName,
EnrichPolicy policy,
ExecuteEnrichPolicyTask task,
Expand All @@ -153,94 +147,4 @@ protected Runnable createPolicyRunner(
);
}

private EnrichPolicy getPolicy(ExecuteEnrichPolicyAction.Request request) {
// Look up policy in policy store and execute it
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), clusterService.state());
if (policy == null) {
throw new IllegalArgumentException("Policy execution failed. Could not locate policy with id [" + request.getName() + "]");
}
return policy;
}

public Task runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener<ExecuteEnrichPolicyStatus> listener) {
return runPolicy(request, getPolicy(request), listener);
}

public Task runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener<ExecuteEnrichPolicyStatus> listener) {
return runPolicy(request, getPolicy(request), listener);
}

public Task runPolicy(
ExecuteEnrichPolicyAction.Request request,
EnrichPolicy policy,
ActionListener<ExecuteEnrichPolicyStatus> listener
) {
return runPolicy(request, policy, (t, r) -> listener.onResponse(r), (t, e) -> listener.onFailure(e));
}

public Task runPolicy(
ExecuteEnrichPolicyAction.Request request,
EnrichPolicy policy,
TaskListener<ExecuteEnrichPolicyStatus> listener
) {
return runPolicy(request, policy, listener::onResponse, listener::onFailure);
}

private Task runPolicy(
ExecuteEnrichPolicyAction.Request request,
EnrichPolicy policy,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse,
BiConsumer<Task, Exception> onFailure
) {
tryLockingPolicy(request.getName());
try {
return runPolicyTask(request, policy, onResponse, onFailure);
} catch (Exception e) {
// Be sure to unlock if submission failed.
releasePolicy(request.getName());
throw e;
}
}

private Task runPolicyTask(
final ExecuteEnrichPolicyAction.Request request,
EnrichPolicy policy,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse,
BiConsumer<Task, Exception> onFailure
) {
Task asyncTask = taskManager.register("enrich", TASK_ACTION, new TaskAwareRequest() {
@Override
public void setParentTask(TaskId taskId) {
request.setParentTask(taskId);
}

@Override
public TaskId getParentTask() {
return request.getParentTask();
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ExecuteEnrichPolicyTask(id, type, action, getDescription(), parentTaskId, headers);
}

@Override
public String getDescription() {
return request.getName();
}
});
ExecuteEnrichPolicyTask task = (ExecuteEnrichPolicyTask) asyncTask;
try {
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.SCHEDULED));
PolicyCompletionListener completionListener = new PolicyCompletionListener(request.getName(), task, onResponse, onFailure);
Runnable runnable = createPolicyRunner(request.getName(), policy, task, completionListener);
threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable);
return asyncTask;
} catch (Exception e) {
// Unregister task in case of exception
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED));
taskManager.unregister(asyncTask);
throw e;
}
}
}
Loading