Skip to content

Commit

Permalink
[7.x][ML] Make PUT data frame analytics action a master node action (e…
Browse files Browse the repository at this point in the history
…lastic#47394)

While it seemed like the PUT data frame analytics action did not
have to be a master node action as the config is stored in an index
rather than the cluster state, there are other subtle nuances which
make it worthwhile to convert it. In particular, it helps maintain
order of execution for put actions which are anyhow user driven and
are expected to have low volume.

This commit converts `TransportPutDataFrameAnalyticsAction` from
a handled transport action to a master node action.

Note this means that the action might fail in a mixed cluster
but as the API is still experimental and not widely used there will
be few moments more suitable to make this change than now.
  • Loading branch information
dimitris-athanasiou committed Oct 2, 2019
1 parent 42453ae commit 37d1758
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public Response(DataFrameAnalyticsConfig config) {

Response() {}

Response(StreamInput in) throws IOException {
public Response(StreamInput in) throws IOException {
super(in);
config = new DataFrameAnalyticsConfig(in);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -54,17 +57,14 @@
import java.util.Objects;

public class TransportPutDataFrameAnalyticsAction
extends HandledTransportAction<PutDataFrameAnalyticsAction.Request, PutDataFrameAnalyticsAction.Response> {
extends TransportMasterNodeAction<PutDataFrameAnalyticsAction.Request, PutDataFrameAnalyticsAction.Response> {

private static final Logger logger = LogManager.getLogger(TransportPutDataFrameAnalyticsAction.class);

private final XPackLicenseState licenseState;
private final DataFrameAnalyticsConfigProvider configProvider;
private final ThreadPool threadPool;
private final SecurityContext securityContext;
private final Client client;
private final ClusterService clusterService;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final DataFrameAnalyticsAuditor auditor;

private volatile ByteSizeValue maxModelMemoryLimit;
Expand All @@ -74,15 +74,13 @@ public TransportPutDataFrameAnalyticsAction(Settings settings, TransportService
XPackLicenseState licenseState, Client client, ThreadPool threadPool,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
DataFrameAnalyticsConfigProvider configProvider, DataFrameAnalyticsAuditor auditor) {
super(PutDataFrameAnalyticsAction.NAME, transportService, actionFilters, PutDataFrameAnalyticsAction.Request::new);
super(PutDataFrameAnalyticsAction.NAME, transportService, clusterService, threadPool, actionFilters,
PutDataFrameAnalyticsAction.Request::new, indexNameExpressionResolver);
this.licenseState = licenseState;
this.configProvider = configProvider;
this.threadPool = threadPool;
this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ?
new SecurityContext(settings, threadPool.getThreadContext()) : null;
this.client = client;
this.clusterService = clusterService;
this.indexNameExpressionResolver = Objects.requireNonNull(indexNameExpressionResolver);
this.auditor = Objects.requireNonNull(auditor);

maxModelMemoryLimit = MachineLearningField.MAX_MODEL_MEMORY_LIMIT.get(settings);
Expand All @@ -95,12 +93,23 @@ private void setMaxModelMemoryLimit(ByteSizeValue maxModelMemoryLimit) {
}

@Override
protected void doExecute(Task task, PutDataFrameAnalyticsAction.Request request,
ActionListener<PutDataFrameAnalyticsAction.Response> listener) {
if (licenseState.isMachineLearningAllowed() == false) {
listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING));
return;
}
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected PutDataFrameAnalyticsAction.Response read(StreamInput in) throws IOException {
return new PutDataFrameAnalyticsAction.Response(in);
}

@Override
protected ClusterBlockException checkBlock(PutDataFrameAnalyticsAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

@Override
protected void masterOperation(Task task, PutDataFrameAnalyticsAction.Request request, ClusterState state,
ActionListener<PutDataFrameAnalyticsAction.Response> listener) {
validateConfig(request.getConfig());
DataFrameAnalyticsConfig memoryCappedConfig =
new DataFrameAnalyticsConfig.Builder(request.getConfig(), maxModelMemoryLimit)
Expand Down Expand Up @@ -137,7 +146,7 @@ protected void doExecute(Task task, PutDataFrameAnalyticsAction.Request request,
ActionListener.wrap(
indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)),
listener::onFailure
));
));
}
}

Expand Down Expand Up @@ -204,6 +213,15 @@ private void validateConfig(DataFrameAnalyticsConfig config) {
}
config.getDest().validate();
new SourceDestValidator(clusterService.state(), indexNameExpressionResolver).check(config);
}

@Override
protected void doExecute(Task task, PutDataFrameAnalyticsAction.Request request,
ActionListener<PutDataFrameAnalyticsAction.Response> listener) {
if (licenseState.isMachineLearningAllowed()) {
super.doExecute(task, request, listener);
} else {
listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING));
}
}
}

0 comments on commit 37d1758

Please sign in to comment.