Skip to content

Commit

Permalink
[ML] Extract persistent task methods from MlMetadata (#32319)
Browse files Browse the repository at this point in the history
Move ML persistent task helper functions to the new class MlTasks
and remove MLMetadataField after moving the string constant to
MlMetadata.
  • Loading branch information
davidkyle committed Jul 24, 2018
1 parent aed3a53 commit 690a603
Show file tree
Hide file tree
Showing 40 changed files with 262 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import org.elasticsearch.xpack.core.action.TransportXPackUsageAction;
import org.elasticsearch.xpack.core.action.XPackInfoAction;
import org.elasticsearch.xpack.core.action.XPackUsageAction;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.rest.action.RestXPackInfoAction;
import org.elasticsearch.xpack.core.rest.action.RestXPackUsageAction;
import org.elasticsearch.xpack.core.security.authc.TokenMetaData;
Expand Down Expand Up @@ -197,7 +197,7 @@ public static List<DiscoveryNode> nodesNotReadyForXPackCustomMetadata(ClusterSta
private static boolean alreadyContainsXPackCustomMetadata(ClusterState clusterState) {
final MetaData metaData = clusterState.metaData();
return metaData.custom(LicensesMetaData.TYPE) != null ||
metaData.custom(MLMetadataField.TYPE) != null ||
metaData.custom(MlMetadata.TYPE) != null ||
metaData.custom(WatcherMetaData.TYPE) != null ||
clusterState.custom(TokenMetaData.TYPE) != null;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {

public static final String TYPE = "ml";
private static final ParseField JOBS_FIELD = new ParseField("jobs");
private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds");

Expand Down Expand Up @@ -119,7 +120,7 @@ public Version getMinimalSupportedVersion() {

@Override
public String getWriteableName() {
return MLMetadataField.TYPE;
return TYPE;
}

@Override
Expand Down Expand Up @@ -213,7 +214,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public String getWriteableName() {
return MLMetadataField.TYPE;
return TYPE;
}

static Diff<Job> readJobDiffFrom(StreamInput in) throws IOException {
Expand Down Expand Up @@ -277,7 +278,7 @@ public Builder putJob(Job job, boolean overwrite) {
public Builder deleteJob(String jobId, PersistentTasksCustomMetaData tasks) {
checkJobHasNoDatafeed(jobId);

JobState jobState = MlMetadata.getJobState(jobId, tasks);
JobState jobState = MlTasks.getJobState(jobId, tasks);
if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
throw ExceptionsHelper.conflictStatusException("Unexpected job state [" + jobState + "], expected [" +
JobState.CLOSED + " or " + JobState.FAILED + "]");
Expand Down Expand Up @@ -362,7 +363,7 @@ private Optional<DatafeedConfig> getDatafeedByJobId(String jobId) {

private void checkDatafeedIsStopped(Supplier<String> msg, String datafeedId, PersistentTasksCustomMetaData persistentTasks) {
if (persistentTasks != null) {
if (persistentTasks.getTask(MLMetadataField.datafeedTaskId(datafeedId)) != null) {
if (persistentTasks.getTask(MlTasks.datafeedTaskId(datafeedId)) != null) {
throw ExceptionsHelper.conflictStatusException(msg.get());
}
}
Expand Down Expand Up @@ -399,7 +400,7 @@ public void markJobAsDeleted(String jobId, PersistentTasksCustomMetaData tasks,
checkJobHasNoDatafeed(jobId);

if (allowDeleteOpenJob == false) {
PersistentTask<?> jobTask = getJobTask(jobId, tasks);
PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
if (jobTask != null) {
JobTaskState jobTaskState = (JobTaskState) jobTask.getState();
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because the job is "
Expand All @@ -420,56 +421,10 @@ void checkJobHasNoDatafeed(String jobId) {
}
}

/**
* Namespaces the task ids for jobs.
* A datafeed id can be used as a job id, because they are stored separately in cluster state.
*/
public static String jobTaskId(String jobId) {
return "job-" + jobId;
}

@Nullable
public static PersistentTask<?> getJobTask(String jobId, @Nullable PersistentTasksCustomMetaData tasks) {
if (tasks == null) {
return null;
}
return tasks.getTask(jobTaskId(jobId));
}

@Nullable
public static PersistentTask<?> getDatafeedTask(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) {
if (tasks == null) {
return null;
}
return tasks.getTask(MLMetadataField.datafeedTaskId(datafeedId));
}

public static JobState getJobState(String jobId, @Nullable PersistentTasksCustomMetaData tasks) {
PersistentTask<?> task = getJobTask(jobId, tasks);
if (task != null) {
JobTaskState jobTaskState = (JobTaskState) task.getState();
if (jobTaskState == null) {
return JobState.OPENING;
}
return jobTaskState.getState();
}
// If we haven't opened a job than there will be no persistent task, which is the same as if the job was closed
return JobState.CLOSED;
}

public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) {
PersistentTask<?> task = getDatafeedTask(datafeedId, tasks);
if (task != null && task.getState() != null) {
return (DatafeedState) task.getState();
} else {
// If we haven't started a datafeed then there will be no persistent task,
// which is the same as if the datafeed was't started
return DatafeedState.STOPPED;
}
}

public static MlMetadata getMlMetadata(ClusterState state) {
MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(MLMetadataField.TYPE);
MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(TYPE);
if (mlMetadata == null) {
return EMPTY_METADATA;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.ml;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;

public final class MlTasks {

private MlTasks() {
}

/**
* Namespaces the task ids for jobs.
* A datafeed id can be used as a job id, because they are stored separately in cluster state.
*/
public static String jobTaskId(String jobId) {
return "job-" + jobId;
}

/**
* Namespaces the task ids for datafeeds.
* A job id can be used as a datafeed id, because they are stored separately in cluster state.
*/
public static String datafeedTaskId(String datafeedId) {
return "datafeed-" + datafeedId;
}

@Nullable
public static PersistentTasksCustomMetaData.PersistentTask<?> getJobTask(String jobId, @Nullable PersistentTasksCustomMetaData tasks) {
return tasks == null ? null : tasks.getTask(jobTaskId(jobId));
}

@Nullable
public static PersistentTasksCustomMetaData.PersistentTask<?> getDatafeedTask(String datafeedId,
@Nullable PersistentTasksCustomMetaData tasks) {
return tasks == null ? null : tasks.getTask(datafeedTaskId(datafeedId));
}

public static JobState getJobState(String jobId, @Nullable PersistentTasksCustomMetaData tasks) {
PersistentTasksCustomMetaData.PersistentTask<?> task = getJobTask(jobId, tasks);
if (task != null) {
JobTaskState jobTaskState = (JobTaskState) task.getState();
if (jobTaskState == null) {
return JobState.OPENING;
}
return jobTaskState.getState();
}
// If we haven't opened a job than there will be no persistent task, which is the same as if the job was closed
return JobState.CLOSED;
}

public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) {
PersistentTasksCustomMetaData.PersistentTask<?> task = getDatafeedTask(datafeedId, tasks);
if (task != null && task.getState() != null) {
return (DatafeedState) task.getState();
} else {
// If we haven't started a datafeed then there will be no persistent task,
// which is the same as if the datafeed was't started
return DatafeedState.STOPPED;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

Expand Down Expand Up @@ -90,11 +90,8 @@ public String getDatafeedId() {

@Override
public boolean match(Task task) {
String expectedDescription = MLMetadataField.datafeedTaskId(datafeedId);
if (task instanceof StartDatafeedAction.DatafeedTaskMatcher && expectedDescription.equals(task.getDescription())){
return true;
}
return false;
String expectedDescription = MlTasks.datafeedTaskId(datafeedId);
return task instanceof StartDatafeedAction.DatafeedTaskMatcher && expectedDescription.equals(task.getDescription());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

Expand Down Expand Up @@ -131,7 +131,7 @@ public void setAllowNoDatafeeds(boolean allowNoDatafeeds) {
@Override
public boolean match(Task task) {
for (String id : resolvedStartedDatafeedIds) {
String expectedDescription = MLMetadataField.datafeedTaskId(id);
String expectedDescription = MlTasks.datafeedTaskId(id);
if (task instanceof StartDatafeedAction.DatafeedTaskMatcher && expectedDescription.equals(task.getDescription())){
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
Expand Down Expand Up @@ -121,7 +122,7 @@ static void resolveAndValidateJobId(CloseJobAction.Request request, ClusterState
private static void addJobAccordingToState(String jobId, PersistentTasksCustomMetaData tasksMetaData,
List<String> openJobs, List<String> closingJobs, List<String> failedJobs) {

JobState jobState = MlMetadata.getJobState(jobId, tasksMetaData);
JobState jobState = MlTasks.getJobState(jobId, tasksMetaData);
switch (jobState) {
case CLOSING:
closingJobs.add(jobId);
Expand All @@ -143,15 +144,15 @@ static TransportCloseJobAction.WaitForCloseRequest buildWaitForCloseRequest(List
TransportCloseJobAction.WaitForCloseRequest waitForCloseRequest = new TransportCloseJobAction.WaitForCloseRequest();

for (String jobId : openJobIds) {
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
if (jobTask != null) {
auditor.info(jobId, Messages.JOB_AUDIT_CLOSING);
waitForCloseRequest.persistentTaskIds.add(jobTask.getId());
waitForCloseRequest.jobsToFinalize.add(jobId);
}
}
for (String jobId : closingJobIds) {
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
if (jobTask != null) {
waitForCloseRequest.persistentTaskIds.add(jobTask.getId());
}
Expand Down Expand Up @@ -180,7 +181,7 @@ static void validateJobAndTaskState(String jobId, MlMetadata mlMetadata, Persist

Optional<DatafeedConfig> datafeed = mlMetadata.getDatafeedByJobId(jobId);
if (datafeed.isPresent()) {
DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeed.get().getId(), tasks);
DatafeedState datafeedState = MlTasks.getDatafeedState(datafeed.get().getId(), tasks);
if (datafeedState != DatafeedState.STOPPED) {
throw ExceptionsHelper.conflictStatusException("cannot close job [{}], datafeed hasn't been stopped", jobId);
}
Expand Down Expand Up @@ -230,7 +231,7 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen
Set<String> executorNodes = new HashSet<>();
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
for (String resolvedJobId : request.getOpenJobIds()) {
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(resolvedJobId, tasks);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(resolvedJobId, tasks);
if (jobTask == null || jobTask.isAssigned() == false) {
String message = "Cannot close job [" + resolvedJobId + "] because the job does not have an assigned node." +
" Use force close to close the job";
Expand Down Expand Up @@ -312,7 +313,7 @@ private void forceCloseJob(ClusterState currentState, CloseJobAction.Request req
final AtomicArray<Exception> failures = new AtomicArray<>(numberOfJobs);

for (String jobId : jobIdsToForceClose) {
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
if (jobTask != null) {
auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING);
persistentTasksService.sendRemoveRequest(jobTask.getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
Expand Down Expand Up @@ -86,7 +86,7 @@ private void forceDeleteDatafeed(DeleteDatafeedAction.Request request, ClusterSt

private void removeDatafeedTask(DeleteDatafeedAction.Request request, ClusterState state, ActionListener<Boolean> listener) {
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasks);
PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(request.getDatafeedId(), tasks);
if (datafeedTask == null) {
listener.onResponse(true);
} else {
Expand Down Expand Up @@ -128,7 +128,7 @@ public ClusterState execute(ClusterState currentState) {
MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)
.removeDatafeed(request.getDatafeedId(), persistentTasks).build();
return ClusterState.builder(currentState).metaData(
MetaData.builder(currentState.getMetaData()).putCustom(MLMetadataField.TYPE, newMetadata).build())
MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMetadata).build())
.build();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
import org.elasticsearch.xpack.core.ml.job.persistence.JobStorageDeletionTask;
Expand Down Expand Up @@ -177,7 +177,7 @@ private void removePersistentTask(String jobId, ClusterState currentState,
ActionListener<Boolean> listener) {
PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);

PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
if (jobTask == null) {
listener.onResponse(null);
} else {
Expand Down Expand Up @@ -251,7 +251,7 @@ static boolean jobIsDeletedFromState(String jobId, ClusterState clusterState) {

private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) {
ClusterState.Builder newState = ClusterState.builder(currentState);
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MLMetadataField.TYPE, builder.build()).build());
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build());
return newState.build();
}
}
Loading

0 comments on commit 690a603

Please sign in to comment.