[ML] Job in Index: Stop and preview datafeed #34605

Merged 4 commits on Oct 19, 2018
Changes from 2 commits
TransportPreviewDatafeedAction.java
@@ -10,21 +10,19 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;

import java.io.BufferedReader;
import java.io.InputStream;
@@ -37,51 +35,56 @@
public class TransportPreviewDatafeedAction extends HandledTransportAction<PreviewDatafeedAction.Request, PreviewDatafeedAction.Response> {

private final Client client;
private final ClusterService clusterService;
private final JobConfigProvider jobConfigProvider;
private final DatafeedConfigProvider datafeedConfigProvider;

@Inject
public TransportPreviewDatafeedAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Client client, ClusterService clusterService) {
Client client, JobConfigProvider jobConfigProvider,
DatafeedConfigProvider datafeedConfigProvider) {
super(settings, PreviewDatafeedAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
PreviewDatafeedAction.Request::new);
this.client = client;
this.clusterService = clusterService;
this.jobConfigProvider = jobConfigProvider;
this.datafeedConfigProvider = datafeedConfigProvider;
}

@Override
protected void doExecute(PreviewDatafeedAction.Request request, ActionListener<PreviewDatafeedAction.Response> listener) {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state());
DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId());
if (datafeed == null) {
throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId());
}
Job job = mlMetadata.getJobs().get(datafeed.getJobId());
if (job == null) {
throw ExceptionsHelper.missingJobException(datafeed.getJobId());
}

DatafeedConfig.Builder previewDatafeed = buildPreviewDatafeed(datafeed);
Map<String, String> headers = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
previewDatafeed.setHeaders(headers);
// NB: this is using the client from the transport layer, NOT the internal client.
// This is important because it means the datafeed search will fail if the user
// requesting the preview doesn't have permission to search the relevant indices.
DataExtractorFactory.create(client, previewDatafeed.build(), job, new ActionListener<DataExtractorFactory>() {
@Override
public void onResponse(DataExtractorFactory dataExtractorFactory) {
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE);
threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
datafeedConfigProvider.getDatafeedConfig(request.getDatafeedId(), ActionListener.wrap(
datafeedConfigBuilder -> {
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
jobConfigProvider.getJob(datafeedConfig.getJobId(), ActionListener.wrap(
jobBuilder -> {
DatafeedConfig.Builder previewDatafeed = buildPreviewDatafeed(datafeedConfig);
Map<String, String> headers = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
previewDatafeed.setHeaders(headers);
// NB: this is using the client from the transport layer, NOT the internal client.
// This is important because it means the datafeed search will fail if the user
// requesting the preview doesn't have permission to search the relevant indices.
DataExtractorFactory.create(client, previewDatafeed.build(), jobBuilder.build(),
new ActionListener<DataExtractorFactory>() {
@Override
public void onResponse(DataExtractorFactory dataExtractorFactory) {
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE);
threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
},
listener::onFailure
));
},
listener::onFailure
));
}

/** Visible for testing */
…
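A note on the structural change in this file: doExecute previously read both the datafeed and job configs synchronously from MlMetadata in cluster state; it now chains two asynchronous index lookups (the datafeed config, then the job it references) with ActionListener.wrap, and lookup failures reach the caller through listener.onFailure rather than the exceptions thrown directly in the old code. A minimal standalone sketch of that chaining pattern, with hypothetical Lookup interfaces standing in for DatafeedConfigProvider and JobConfigProvider:

import org.elasticsearch.action.ActionListener;

class ChainedLookupSketch {

    // Hypothetical stand-ins for DatafeedConfigProvider / JobConfigProvider.
    interface DatafeedLookup { void getDatafeed(String datafeedId, ActionListener<String> listener); }
    interface JobLookup { void getJob(String jobId, ActionListener<String> listener); }

    static void previewFlow(DatafeedLookup datafeeds, JobLookup jobs, String datafeedId,
                            ActionListener<String> finalListener) {
        // Step 1: fetch the datafeed config; nothing below runs until it arrives.
        datafeeds.getDatafeed(datafeedId, ActionListener.wrap(
                jobId ->
                        // Step 2: fetch the job the datafeed references.
                        jobs.getJob(jobId, ActionListener.wrap(
                                finalListener::onResponse,  // Step 3: both configs resolved.
                                finalListener::onFailure)), // job lookup failed
                finalListener::onFailure));                 // datafeed lookup failed, e.g. unknown id
    }
}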
TransportStopDatafeedAction.java
@@ -23,19 +23,17 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;

import java.io.IOException;
import java.util.ArrayList;
@@ -50,35 +48,36 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
StopDatafeedAction.Response, StopDatafeedAction.Response> {

private final PersistentTasksService persistentTasksService;
private final DatafeedConfigProvider datafeedConfigProvider;

@Inject
public TransportStopDatafeedAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, PersistentTasksService persistentTasksService) {
ClusterService clusterService, PersistentTasksService persistentTasksService,
DatafeedConfigProvider datafeedConfigProvider) {
super(settings, StopDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, StopDatafeedAction.Request::new, StopDatafeedAction.Response::new,
MachineLearning.UTILITY_THREAD_POOL_NAME);
this.persistentTasksService = persistentTasksService;
this.datafeedConfigProvider = datafeedConfigProvider;

}

/**
* Resolve the requested datafeeds and add their IDs to one of the list
* arguments depending on datafeed state.
*
* @param request The stop datafeed request
* @param mlMetadata ML Metadata
* @param expandedDatafeedIds The expanded set of IDs
* @param tasks Persistent task meta data
* @param startedDatafeedIds Started datafeed ids are added to this list
* @param stoppingDatafeedIds Stopping datafeed ids are added to this list
*/
static void resolveDataFeedIds(StopDatafeedAction.Request request, MlMetadata mlMetadata,
static void resolveDataFeedIds(Set<String> expandedDatafeedIds,
Contributor:
As this is no longer expanding ids, we should rename. Or even remove completely?

Member Author:
I'm not sure what you mean. The function still needs to call addDatafeedTaskIdAccordingToState for each id; I've just changed the function so that the set of ids is a parameter rather than passing in the MlMetadata and reading from that.

Contributor:
The "resolve" bit in the name was there to communicate that it was expanding ids. It no longer does that, so we should change the method name accordingly. If we keep the method around, then it'd be more of a sortDatafeedIdsBasedOnState or something.

Member Author:
OK, I see what you mean. I renamed it to sortDatafeedIdsByTaskState.
PersistentTasksCustomMetaData tasks,
List<String> startedDatafeedIds,
List<String> stoppingDatafeedIds) {

Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds());
for (String expandedDatafeedId : expandedDatafeedIds) {
validateDatafeedTask(expandedDatafeedId, mlMetadata);
addDatafeedTaskIdAccordingToState(expandedDatafeedId, MlTasks.getDatafeedState(expandedDatafeedId, tasks),
startedDatafeedIds, stoppingDatafeedIds);
}
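Per the thread above, a later commit in this PR renames the helper to sortDatafeedIdsByTaskState. A sketch of the post-rename shape, together with a plausible reconstruction of addDatafeedTaskIdAccordingToState (whose body is truncated in the next hunk; inferred from the javadoc and the call site, so treat it as illustrative):

static void sortDatafeedIdsByTaskState(Set<String> expandedDatafeedIds,
                                       PersistentTasksCustomMetaData tasks,
                                       List<String> startedDatafeedIds,
                                       List<String> stoppingDatafeedIds) {
    for (String expandedDatafeedId : expandedDatafeedIds) {
        addDatafeedTaskIdAccordingToState(expandedDatafeedId, MlTasks.getDatafeedState(expandedDatafeedId, tasks),
                startedDatafeedIds, stoppingDatafeedIds);
    }
}

private static void addDatafeedTaskIdAccordingToState(String datafeedId, DatafeedState datafeedState,
                                                      List<String> startedDatafeedIds,
                                                      List<String> stoppingDatafeedIds) {
    switch (datafeedState) {
        case STARTED:
            startedDatafeedIds.add(datafeedId);  // has a running persistent task that must be stopped
            break;
        case STOPPING:
            stoppingDatafeedIds.add(datafeedId); // already shutting down; callers only need to wait
            break;
        default:
            break;                               // STOPPED: nothing to do
    }
}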
@@ -102,20 +101,6 @@ private static void addDatafeedTaskIdAccordingToState(String datafeedId,
}
}

/**
* Validate the stop request.
* Throws an {@code ResourceNotFoundException} if there is no datafeed
* with id {@code datafeedId}
* @param datafeedId The datafeed Id
* @param mlMetadata ML meta data
*/
static void validateDatafeedTask(String datafeedId, MlMetadata mlMetadata) {
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
if (datafeed == null) {
throw new ResourceNotFoundException(Messages.getMessage(Messages.DATAFEED_NOT_FOUND, datafeedId));
}
}

@Override
protected void doExecute(Task task, StopDatafeedAction.Request request, ActionListener<StopDatafeedAction.Response> listener) {
final ClusterState state = clusterService.state();
@@ -130,23 +115,27 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi
new ActionListenerResponseHandler<>(listener, StopDatafeedAction.Response::new));
}
} else {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap(
expandedIds -> {
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);

List<String> startedDatafeeds = new ArrayList<>();
List<String> stoppingDatafeeds = new ArrayList<>();
resolveDataFeedIds(request, mlMetadata, tasks, startedDatafeeds, stoppingDatafeeds);
if (startedDatafeeds.isEmpty() && stoppingDatafeeds.isEmpty()) {
listener.onResponse(new StopDatafeedAction.Response(true));
return;
}
request.setResolvedStartedDatafeedIds(startedDatafeeds.toArray(new String[startedDatafeeds.size()]));
List<String> startedDatafeeds = new ArrayList<>();
List<String> stoppingDatafeeds = new ArrayList<>();
resolveDataFeedIds(expandedIds, tasks, startedDatafeeds, stoppingDatafeeds);
if (startedDatafeeds.isEmpty() && stoppingDatafeeds.isEmpty()) {
listener.onResponse(new StopDatafeedAction.Response(true));
return;
}
request.setResolvedStartedDatafeedIds(startedDatafeeds.toArray(new String[startedDatafeeds.size()]));

if (request.isForce()) {
forceStopDatafeed(request, listener, tasks, startedDatafeeds);
} else {
normalStopDatafeed(task, request, listener, tasks, startedDatafeeds, stoppingDatafeeds);
}
if (request.isForce()) {
forceStopDatafeed(request, listener, tasks, startedDatafeeds);
} else {
normalStopDatafeed(task, request, listener, tasks, startedDatafeeds, stoppingDatafeeds);
}
},
listener::onFailure
));
}
}
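The contract of the expansion call above, illustrated as a hypothetical helper (the id pattern, wrapper method, and printing are invented for the example; the call shape matches the diff):

// Assumes a class holding a DatafeedConfigProvider field, as in the action above.
void demoExpand(DatafeedConfigProvider datafeedConfigProvider) {
    datafeedConfigProvider.expandDatafeedIds("cloud-datafeed-*", /* allowNoDatafeeds */ true,
            ActionListener.wrap(
                    expandedIds -> {
                        // e.g. expandedIds = {"cloud-datafeed-1", "cloud-datafeed-2"}.
                        // With allowNoDatafeeds == true an empty match is a valid, empty result;
                        // with false, a no-match is reported through onFailure instead.
                        for (String id : expandedIds) {
                            System.out.println("would stop: " + id);
                        }
                    },
                    e -> System.err.println("expansion failed: " + e.getMessage())));
}

Because the expansion is asynchronous, the task-state sorting and the force/normal stop decision both move inside the listener, which is why the whole tail of doExecute is now nested in ActionListener.wrap.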

…