From 36b8dbd6732a2ec44a984b5bbdd7373b6455ecd7 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Mon, 5 Nov 2018 17:38:24 +0000 Subject: [PATCH 1/7] [ML] Reimplement established model memory This is the 6.6/6.7 implementation of a master node service to keep track of the native process memory requirement of each ML job with an associated native process. The new ML memory tracker service works when the whole cluster is upgraded to at least version 6.6. For mixed version clusters the old mechanism of established model memory stored on the job in cluster state is used. This means that the old (and complex) code to keep established model memory up to date on the job object cannot yet be removed. When this change is forward ported to 7.0 the old way of keeping established model memory updated will be removed. --- .../xpack/core/XPackClientPlugin.java | 2 + .../RefreshJobMemoryRequirementAction.java | 134 +++++++++++ .../xpack/core/ml/job/config/Job.java | 7 +- ...obMemoryRequirementActionRequestTests.java | 33 +++ .../xpack/core/ml/job/config/JobTests.java | 2 +- .../xpack/ml/MachineLearning.java | 14 +- .../ml/action/TransportDeleteJobAction.java | 7 +- .../ml/action/TransportOpenJobAction.java | 124 ++++++++-- ...portRefreshJobMemoryRequirementAction.java | 64 +++++ .../output/AutoDetectResultProcessor.java | 1 + .../xpack/ml/process/MlMemoryTracker.java | 219 ++++++++++++++++++ .../action/TransportOpenJobActionTests.java | 33 +-- ...efreshJobMemoryRequirementActionTests.java | 12 + .../xpack/ml/integration/TooManyJobsIT.java | 2 - .../ml/process/MlMemoryTrackerTests.java | 153 ++++++++++++ 15 files changed, 763 insertions(+), 44 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/RefreshJobMemoryRequirementAction.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/RefreshJobMemoryRequirementActionRequestTests.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRefreshJobMemoryRequirementAction.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportRefreshJobMemoryRequirementActionTests.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 480bd1f4534d3..1bf673e0c3e9d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -84,6 +84,7 @@ import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.PutFilterAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.action.RefreshJobMemoryRequirementAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; @@ -231,6 +232,7 @@ public List getClientActions() { UpdateFilterAction.INSTANCE, DeleteFilterAction.INSTANCE, KillProcessAction.INSTANCE, + RefreshJobMemoryRequirementAction.INSTANCE, GetBucketsAction.INSTANCE, GetInfluencersAction.INSTANCE, GetOverallBucketsAction.INSTANCE, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/RefreshJobMemoryRequirementAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/RefreshJobMemoryRequirementAction.java new file mode 100644 index 0000000000000..76025bebfa8b8 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/RefreshJobMemoryRequirementAction.java @@ -0,0 +1,134 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.ml.job.config.Job; + +import java.io.IOException; +import java.util.Objects; + +public class RefreshJobMemoryRequirementAction extends Action { + + public static final RefreshJobMemoryRequirementAction INSTANCE = new RefreshJobMemoryRequirementAction(); + public static final String NAME = "cluster:internal/xpack/ml/job/refresh_memory_requirement"; + + private RefreshJobMemoryRequirementAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + public static class Request extends AcknowledgedRequest implements ToXContentObject { + + public static ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); + + static { + PARSER.declareString(Request::setJobId, Job.ID); + } + + public static Request parseRequest(String jobId, XContentParser parser) { + Request request = PARSER.apply(parser, null); + if (jobId != null) { + request.setJobId(jobId); + } + return request; + } + + private String jobId; + + public Request(String jobId) { + + this.jobId = jobId; + } + + public Request() { + } + + public void setJobId(String jobId) { + this.jobId = jobId; + } + + public String getJobId() { + return jobId; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + jobId = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(jobId); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(Job.ID.getPreferredName(), jobId); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Request request = (Request) o; + return Objects.equals(jobId, request.jobId); + } + + @Override + public int hashCode() { + return Objects.hash(jobId); + } + + @Override + public final String toString() { + return Strings.toString(this); + } + } + + public static class RequestBuilder extends MasterNodeOperationRequestBuilder { + + public RequestBuilder(ElasticsearchClient client, RefreshJobMemoryRequirementAction action) { + super(client, action, new Request()); + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index ffe24dff8ced0..fd7fa70bded43 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -142,6 +142,7 @@ private static ObjectParser createParser(boolean ignoreUnknownFie private final Date createTime; private final Date finishedTime; private final Date lastDataTime; + // TODO: Remove in 7.0 private final Long establishedModelMemory; private final AnalysisConfig analysisConfig; private final AnalysisLimits analysisLimits; @@ -439,6 +440,7 @@ public Collection allInputFields() { * program code and stack. * @return an estimate of the memory requirement of this job, in bytes */ + // TODO: remove this method in 7.0 public long estimateMemoryFootprint() { if (establishedModelMemory != null && establishedModelMemory > 0) { return establishedModelMemory + PROCESS_MEMORY_OVERHEAD.getBytes(); @@ -658,6 +660,7 @@ public static class Builder implements Writeable, ToXContentObject { private Date createTime; private Date finishedTime; private Date lastDataTime; + // TODO: remove in 7.0 private Long establishedModelMemory; private ModelPlotConfig modelPlotConfig; private Long renormalizationWindowDays; @@ -1102,10 +1105,6 @@ private void validateGroups() { public Job build(Date createTime) { setCreateTime(createTime); setJobVersion(Version.CURRENT); - // TODO: Maybe we _could_ accept a value for this supplied at create time - it would - // mean cloned jobs that hadn't been edited much would start with an accurate expected size. - // But on the other hand it would mean jobs that were cloned and then completely changed - // would start with a size that was completely wrong. setEstablishedModelMemory(null); return build(); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/RefreshJobMemoryRequirementActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/RefreshJobMemoryRequirementActionRequestTests.java new file mode 100644 index 0000000000000..60ee1aeb5f2e5 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/RefreshJobMemoryRequirementActionRequestTests.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractStreamableXContentTestCase; + +public class RefreshJobMemoryRequirementActionRequestTests + extends AbstractStreamableXContentTestCase { + + @Override + protected RefreshJobMemoryRequirementAction.Request createTestInstance() { + return new RefreshJobMemoryRequirementAction.Request(randomAlphaOfLengthBetween(1, 20)); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } + + @Override + protected RefreshJobMemoryRequirementAction.Request createBlankInstance() { + return new RefreshJobMemoryRequirementAction.Request(); + } + + @Override + protected RefreshJobMemoryRequirementAction.Request doParseInstance(XContentParser parser) { + return RefreshJobMemoryRequirementAction.Request.parseRequest(null, parser); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java index 62340ba6cf63c..0fae85f6d6b5c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java @@ -561,7 +561,7 @@ public void testEstimateMemoryFootprint_GivenNoLimitAndNotEstablished() { builder.setEstablishedModelMemory(0L); } assertEquals(ByteSizeUnit.MB.toBytes(AnalysisLimits.PRE_6_1_DEFAULT_MODEL_MEMORY_LIMIT_MB) - + Job.PROCESS_MEMORY_OVERHEAD.getBytes(), builder.build().estimateMemoryFootprint()); + + Job.PROCESS_MEMORY_OVERHEAD.getBytes(), builder.build().estimateMemoryFootprint()); } public void testEarliestValidTimestamp_GivenEmptyDataCounts() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index e0c44faec7471..153e0883a1f1f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -95,6 +95,7 @@ import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.PutFilterAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.action.RefreshJobMemoryRequirementAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; @@ -149,6 +150,7 @@ import org.elasticsearch.xpack.ml.action.TransportPutDatafeedAction; import org.elasticsearch.xpack.ml.action.TransportPutFilterAction; import org.elasticsearch.xpack.ml.action.TransportPutJobAction; +import org.elasticsearch.xpack.ml.action.TransportRefreshJobMemoryRequirementAction; import org.elasticsearch.xpack.ml.action.TransportRevertModelSnapshotAction; import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction; import org.elasticsearch.xpack.ml.action.TransportStopDatafeedAction; @@ -181,6 +183,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.NativeControllerHolder; import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction; @@ -278,6 +281,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu private final SetOnce autodetectProcessManager = new SetOnce<>(); private final SetOnce datafeedManager = new SetOnce<>(); + private final SetOnce memoryTracker = new SetOnce<>(); public MachineLearning(Settings settings, Path configPath) { this.settings = settings; @@ -299,6 +303,7 @@ public List> getSettings() { MachineLearningField.MAX_MODEL_MEMORY_LIMIT, MAX_LAZY_ML_NODES, MAX_MACHINE_MEMORY_PERCENT, + MlMemoryTracker.ML_MEMORY_UPDATE_FREQUENCY, AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC, @@ -420,6 +425,8 @@ public Collection createComponents(Client client, ClusterService cluster this.datafeedManager.set(datafeedManager); MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager, autodetectProcessManager); + MlMemoryTracker memoryTracker = new MlMemoryTracker(settings, clusterService, threadPool, jobManager, jobResultsProvider); + this.memoryTracker.set(memoryTracker); // This object's constructor attaches to the license state, so there's no need to retain another reference to it new InvalidLicenseEnforcer(settings, getLicenseState(), threadPool, datafeedManager, autodetectProcessManager); @@ -438,7 +445,8 @@ public Collection createComponents(Client client, ClusterService cluster jobDataCountsPersister, datafeedManager, auditor, - new MlAssignmentNotifier(settings, auditor, clusterService) + new MlAssignmentNotifier(settings, auditor, clusterService), + memoryTracker ); } @@ -449,7 +457,8 @@ public List> getPersistentTasksExecutor(ClusterServic } return Arrays.asList( - new TransportOpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get()), + new TransportOpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get(), + memoryTracker.get()), new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, datafeedManager.get()) ); } @@ -543,6 +552,7 @@ public List getRestHandlers(Settings settings, RestController restC new ActionHandler<>(UpdateFilterAction.INSTANCE, TransportUpdateFilterAction.class), new ActionHandler<>(DeleteFilterAction.INSTANCE, TransportDeleteFilterAction.class), new ActionHandler<>(KillProcessAction.INSTANCE, TransportKillProcessAction.class), + new ActionHandler<>(RefreshJobMemoryRequirementAction.INSTANCE, TransportRefreshJobMemoryRequirementAction.class), new ActionHandler<>(GetBucketsAction.INSTANCE, TransportGetBucketsAction.class), new ActionHandler<>(GetInfluencersAction.INSTANCE, TransportGetInfluencersAction.class), new ActionHandler<>(GetOverallBucketsAction.INSTANCE, TransportGetOverallBucketsAction.class), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index 761c21b63f165..73e98b7b0c67b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -69,6 +69,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import org.elasticsearch.xpack.ml.utils.MlIndicesUtils; import java.util.ArrayList; @@ -94,6 +95,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction(); } @@ -210,6 +214,7 @@ private void notifyListeners(String jobId, @Nullable AcknowledgedResponse ack, @ private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request, ActionListener listener) { String jobId = request.getJobId(); + memoryTracker.removeJob(jobId); // Step 4. When the job has been removed from the cluster state, return a response // ------- diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index edd61ec595123..124fcc43fcf93 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -55,6 +55,7 @@ import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.action.RefreshJobMemoryRequirementAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; @@ -69,6 +70,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; +import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import java.io.IOException; import java.util.ArrayList; @@ -144,6 +146,7 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j int maxConcurrentJobAllocations, int fallbackMaxNumberOfOpenJobs, int maxMachineMemoryPercent, + MlMemoryTracker memoryTracker, Logger logger) { String resultsIndexName = job != null ? job.getResultsIndexName() : null; List unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsIndexName, clusterState); @@ -156,8 +159,12 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j List reasons = new LinkedList<>(); long maxAvailableCount = Long.MIN_VALUE; + long maxAvailableMemory = Long.MIN_VALUE; DiscoveryNode minLoadedNodeByCount = null; - + DiscoveryNode minLoadedNodeByMemory = null; + // Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe + // because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs + boolean allocateByMemory = true; PersistentTasksCustomMetaData persistentTasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); for (DiscoveryNode node : clusterState.getNodes()) { Map nodeAttributes = node.getAttributes(); @@ -197,10 +204,9 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j } } - long numberOfAssignedJobs = 0; int numberOfAllocatingJobs = 0; - + long assignedJobMemory = 0; if (persistentTasks != null) { // find all the job tasks assigned to this node Collection> assignedTasks = persistentTasks.findTasks( @@ -231,6 +237,15 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) { // Don't count CLOSED or FAILED jobs, as they don't consume native memory ++numberOfAssignedJobs; + OpenJobAction.JobParams params = (OpenJobAction.JobParams) assignedTask.getParams(); + Long jobMemoryRequirement = memoryTracker.getJobMemoryRequirement(params.getJobId()); + if (jobMemoryRequirement == null) { + allocateByMemory = false; + logger.debug("Falling back to allocating job [{}] by job counts because " + + "the memory requirement for job [{}] was not available", jobId, params.getJobId()); + } else { + assignedJobMemory += jobMemoryRequirement; + } } } } @@ -271,10 +286,62 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j maxAvailableCount = availableCount; minLoadedNodeByCount = node; } + + String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR); + long machineMemory = -1; + // TODO: remove leniency and reject the node if the attribute is null in 7.0 + if (machineMemoryStr != null) { + try { + machineMemory = Long.parseLong(machineMemoryStr); + } catch (NumberFormatException e) { + String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " + + MachineLearning.MACHINE_MEMORY_NODE_ATTR + " attribute [" + machineMemoryStr + "] is not a long"; + logger.trace(reason); + reasons.add(reason); + continue; + } + } + + if (allocateByMemory) { + if (machineMemory > 0) { + long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100; + Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(jobId); + if (estimatedMemoryFootprint != null) { + long availableMemory = maxMlMemory - assignedJobMemory; + if (estimatedMemoryFootprint > availableMemory) { + String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + + "], because this node has insufficient available memory. Available memory for ML [" + maxMlMemory + + "], memory required by existing jobs [" + assignedJobMemory + + "], estimated memory required for this job [" + estimatedMemoryFootprint + "]"; + logger.trace(reason); + reasons.add(reason); + continue; + } + + if (maxAvailableMemory < availableMemory) { + maxAvailableMemory = availableMemory; + minLoadedNodeByMemory = node; + } + } else { + // If we cannot get the job memory requirement, + // fall back to simply allocating by job count + allocateByMemory = false; + logger.debug("Falling back to allocating job [{}] by job counts because its memory requirement was not available", + jobId); + } + } else { + // If we cannot get the available memory on any machine in + // the cluster, fall back to simply allocating by job count + allocateByMemory = false; + logger.debug("Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]", + jobId, nodeNameAndMlAttributes(node)); + } + } } - if (minLoadedNodeByCount != null) { - logger.debug("selected node [{}] for job [{}]", minLoadedNodeByCount, jobId); - return new PersistentTasksCustomMetaData.Assignment(minLoadedNodeByCount.getId(), ""); + DiscoveryNode minLoadedNode = allocateByMemory ? minLoadedNodeByMemory : minLoadedNodeByCount; + if (minLoadedNode != null) { + logger.debug("selected node [{}] for job [{}]", minLoadedNode, jobId); + return new PersistentTasksCustomMetaData.Assignment(minLoadedNode.getId(), ""); } else { String explanation = String.join("|", reasons); logger.debug("no node selected for job [{}], reasons [{}]", jobId, explanation); @@ -415,6 +482,11 @@ protected void masterOperation(OpenJobAction.Request request, ClusterState state OpenJobAction.JobParams jobParams = request.getJobParams(); if (licenseState.isMachineLearningAllowed()) { + // If the whole cluster supports the ML memory tracker then we don't need + // to worry about updating established model memory on the job objects + // TODO: remove in 7.0 as it will always be true + boolean clusterSupportsMlMemoryTracker = state.getNodes().getMinNodeVersion().onOrAfter(Version.V_6_6_0); + // Clear job finished time once the job is started and respond ActionListener clearJobFinishTime = ActionListener.wrap( response -> { @@ -446,15 +518,21 @@ public void onFailure(Exception e) { }; // Start job task - ActionListener jobUpateListener = ActionListener.wrap( - response -> { - persistentTasksService.sendStartRequest(MlTasks.jobTaskId(jobParams.getJobId()), - MlTasks.JOB_TASK_NAME, jobParams, waitForJobToStart); - }, + ActionListener memoryRequirementRefreshListener = ActionListener.wrap( + response -> persistentTasksService.sendStartRequest(MlTasks.jobTaskId(jobParams.getJobId()), MlTasks.JOB_TASK_NAME, + jobParams, waitForJobToStart), listener::onFailure ); - // Update established model memory for pre-6.1 jobs that haven't had it set + // Tell the job tracker to refresh the job memory requirement if the cluster supports this (TODO: simplify in 7.0) + ActionListener jobUpdateListener = clusterSupportsMlMemoryTracker ? ActionListener.wrap( + response -> executeAsyncWithOrigin(client, ML_ORIGIN, RefreshJobMemoryRequirementAction.INSTANCE, + new RefreshJobMemoryRequirementAction.Request(jobParams.getJobId()), memoryRequirementRefreshListener), + listener::onFailure + ) : ActionListener.wrap(response -> memoryRequirementRefreshListener.onResponse(new AcknowledgedResponse(true)), + listener::onFailure); + + // Update established model memory for pre-6.1 jobs that haven't had it set (TODO: remove in 7.0) // and increase the model memory limit for 6.1 - 6.3 jobs ActionListener missingMappingsListener = ActionListener.wrap( response -> { @@ -462,8 +540,9 @@ public void onFailure(Exception e) { if (job != null) { Version jobVersion = job.getJobVersion(); Long jobEstablishedModelMemory = job.getEstablishedModelMemory(); - if ((jobVersion == null || jobVersion.before(Version.V_6_1_0)) + if (clusterSupportsMlMemoryTracker == false && (jobVersion == null || jobVersion.before(Version.V_6_1_0)) && (jobEstablishedModelMemory == null || jobEstablishedModelMemory == 0)) { + // TODO: remove in 7.0 - established model memory no longer needs to be set on the job object // Set the established memory usage for pre 6.1 jobs jobResultsProvider.getEstablishedMemoryUsage(job.getId(), null, null, establishedModelMemory -> { if (establishedModelMemory != null && establishedModelMemory > 0) { @@ -472,9 +551,9 @@ public void onFailure(Exception e) { UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(job.getId(), update); executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, - jobUpateListener); + jobUpdateListener); } else { - jobUpateListener.onResponse(null); + jobUpdateListener.onResponse(null); } }, listener::onFailure); } else if (jobVersion != null && @@ -491,16 +570,16 @@ public void onFailure(Exception e) { .setAnalysisLimits(limits).build(); UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(job.getId(), update); executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, - jobUpateListener); + jobUpdateListener); } else { - jobUpateListener.onResponse(null); + jobUpdateListener.onResponse(null); } } else { - jobUpateListener.onResponse(null); + jobUpdateListener.onResponse(null); } } else { - jobUpateListener.onResponse(null); + jobUpdateListener.onResponse(null); } }, listener::onFailure ); @@ -644,6 +723,7 @@ private void addDocMappingIfMissing(String alias, CheckedSupplier { private final AutodetectProcessManager autodetectProcessManager; + private final MlMemoryTracker memoryTracker; /** * The maximum number of open jobs can be different on each node. However, nodes on older versions @@ -657,9 +737,10 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut private volatile int maxLazyMLNodes; public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService, - AutodetectProcessManager autodetectProcessManager) { + AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker) { super(settings, MlTasks.JOB_TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME); this.autodetectProcessManager = autodetectProcessManager; + this.memoryTracker = memoryTracker; this.fallbackMaxNumberOfOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings); this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings); this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings); @@ -679,10 +760,11 @@ public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobP maxConcurrentJobAllocations, fallbackMaxNumberOfOpenJobs, maxMachineMemoryPercent, + memoryTracker, logger); if (assignment.getExecutorNode() == null) { int numMlNodes = 0; - for(DiscoveryNode node : clusterState.getNodes()) { + for (DiscoveryNode node : clusterState.getNodes()) { if (Boolean.valueOf(node.getAttributes().get(MachineLearning.ML_ENABLED_NODE_ATTR))) { numMlNodes++; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRefreshJobMemoryRequirementAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRefreshJobMemoryRequirementAction.java new file mode 100644 index 0000000000000..ddb5c99de1fb7 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRefreshJobMemoryRequirementAction.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.action.RefreshJobMemoryRequirementAction; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.process.MlMemoryTracker; + +public class TransportRefreshJobMemoryRequirementAction + extends TransportMasterNodeAction { + + private final MlMemoryTracker memoryTracker; + + @Inject + public TransportRefreshJobMemoryRequirementAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + MlMemoryTracker memoryTracker) { + super(settings, RefreshJobMemoryRequirementAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, RefreshJobMemoryRequirementAction.Request::new); + this.memoryTracker = memoryTracker; + } + + @Override + protected String executor() { + return MachineLearning.UTILITY_THREAD_POOL_NAME; + } + + @Override + protected AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + @Override + protected void masterOperation(RefreshJobMemoryRequirementAction.Request request, ClusterState state, + ActionListener listener) { + try { + memoryTracker.refreshJobMemory(request.getJobId(), mem -> listener.onResponse(new AcknowledgedResponse(mem != null))); + } catch (Exception e) { + listener.onFailure(e); + } + } + + @Override + protected ClusterBlockException checkBlock(RefreshJobMemoryRequirementAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.READ); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index a3614a2353e90..72e4a60963f48 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -111,6 +111,7 @@ public class AutoDetectResultProcessor { * New model size stats are read as the process is running */ private volatile ModelSizeStats latestModelSizeStats; + // TODO: remove in 7.0, along with all established model memory functionality in this class private volatile Date latestDateForEstablishedModelMemoryCalc; private volatile long latestEstablishedModelMemory; private volatile boolean haveNewLatestModelSizeStats; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java new file mode 100644 index 0000000000000..f8474c45df480 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java @@ -0,0 +1,219 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.process; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.LocalNodeMasterListener; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.JobManager; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class keeps track of the memory requirement of ML jobs. + * It only functions on the master node - for this reason it should only be used by master node actions. + * The memory requirement for open ML jobs is updated at the following times: + * 1. When a master node is elected the memory requirement for all non-closed ML jobs is updated + * 2. The memory requirement for all non-closed ML jobs is updated periodically thereafter - every 30 seconds by default + * 3. When a job is opened the memory requirement for that single job is updated + * As a result of this every open job should have a value for its memory requirement that is no more than 30 seconds out-of-date. + */ +public class MlMemoryTracker extends AbstractComponent implements LocalNodeMasterListener { + + public static final Setting ML_MEMORY_UPDATE_FREQUENCY = + Setting.timeSetting("xpack.ml.memory_update_frequency", TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(10), + Setting.Property.Dynamic, Setting.Property.NodeScope); + + private final ThreadPool threadPool; + private final ClusterService clusterService; + private final JobManager jobManager; + private final JobResultsProvider jobResultsProvider; + private final ConcurrentHashMap memoryRequirementByJob; + private volatile boolean isMaster; + private volatile TimeValue updateFrequency; + + public MlMemoryTracker(Settings settings, ClusterService clusterService, ThreadPool threadPool, JobManager jobManager, + JobResultsProvider jobResultsProvider) { + super(settings); + this.threadPool = threadPool; + this.clusterService = clusterService; + this.jobManager = jobManager; + this.jobResultsProvider = jobResultsProvider; + this.updateFrequency = ML_MEMORY_UPDATE_FREQUENCY.get(settings); + memoryRequirementByJob = new ConcurrentHashMap<>(); + clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_MEMORY_UPDATE_FREQUENCY, + this::setUpdateFrequency); + clusterService.addLocalNodeMasterListener(this); + } + + @Override + public void onMaster() { + isMaster = true; + logger.trace("Elected master - scheduling ML memory update"); + try { + // Submit a job that will start after updateFrequency, and reschedule itself after running + threadPool.schedule(updateFrequency, executorName(), new SubmitReschedulingMlMemoryUpdate()); + threadPool.executor(executorName()).execute( + () -> refresh(clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE))); + } catch (EsRejectedExecutionException e) { + logger.debug("Couldn't schedule ML memory update - node might be shutting down", e); + } + } + + @Override + public void offMaster() { + isMaster = false; + memoryRequirementByJob.clear(); + } + + @Override + public String executorName() { + return MachineLearning.UTILITY_THREAD_POOL_NAME; + } + + public Long getJobMemoryRequirement(String jobId) { + if (isMaster == false) { + return null; + } + + Long memoryRequirement = memoryRequirementByJob.get(jobId); + if (memoryRequirement != null) { + return memoryRequirement; + } + + // Fallback for mixed version 6.6+/pre-6.6 cluster - TODO: remove in 7.0 + Job job = MlMetadata.getMlMetadata(clusterService.state()).getJobs().get(jobId); + if (job != null) { + return job.estimateMemoryFootprint(); + } + + return null; + } + + public void removeJob(String jobId) { + memoryRequirementByJob.remove(jobId); + } + + void setUpdateFrequency(TimeValue updateFrequency) { + this.updateFrequency = updateFrequency; + } + + /** + * This refreshes the memory requirement for every ML job that has a corresponding persistent task. + */ + void refresh(PersistentTasksCustomMetaData persistentTasks) { + + // persistentTasks will be null if there's never been a persistent task created in this cluster + if (isMaster == false || persistentTasks == null) { + return; + } + + List> mlJobTasks = persistentTasks.tasks().stream() + .filter(task -> MlTasks.JOB_TASK_NAME.equals(task.getTaskName())).collect(Collectors.toList()); + for (PersistentTasksCustomMetaData.PersistentTask mlJobTask : mlJobTasks) { + OpenJobAction.JobParams jobParams = (OpenJobAction.JobParams) mlJobTask.getParams(); + refreshJobMemory(jobParams.getJobId(), mem -> {}); + } + } + + /** + * Refresh the memory requirement for a single job. + * @param jobId The ID of the job to refresh the memory requirement for + * @param listener A callback that will receive the memory requirement, + * or null if it cannot be calculated + */ + public void refreshJobMemory(String jobId, Consumer listener) { + if (isMaster == false) { + listener.accept(null); + return; + } + + jobResultsProvider.getEstablishedMemoryUsage(jobId, null, null, establishedModelMemoryBytes -> { + if (establishedModelMemoryBytes <= 0L) { + setJobMemoryToLimit(jobId, listener); + } else { + Long memoryRequirementBytes = establishedModelMemoryBytes + Job.PROCESS_MEMORY_OVERHEAD.getBytes(); + memoryRequirementByJob.put(jobId, memoryRequirementBytes); + listener.accept(memoryRequirementBytes); + } + }, e -> { + logger.error("[" + jobId + "] failed to calculate job established model memory requirement", e); + setJobMemoryToLimit(jobId, listener); + }); + } + + private void setJobMemoryToLimit(String jobId, Consumer listener) { + jobManager.getJob(jobId, ActionListener.wrap(job -> { + Long memoryLimitMb = job.getAnalysisLimits().getModelMemoryLimit(); + if (memoryLimitMb != null) { + Long memoryRequirementBytes = ByteSizeUnit.MB.toBytes(memoryLimitMb) + Job.PROCESS_MEMORY_OVERHEAD.getBytes(); + memoryRequirementByJob.put(jobId, memoryRequirementBytes); + listener.accept(memoryRequirementBytes); + } else { + memoryRequirementByJob.remove(jobId); + listener.accept(null); + } + }, e -> { + if (e instanceof ResourceNotFoundException) { + // TODO: does this also happen if the .ml-config index exists but is unavailable? + logger.trace("[{}] job deleted during ML memory update", jobId); + } else { + logger.error("[" + jobId + "] failed to get job during ML memory update", e); + } + memoryRequirementByJob.remove(jobId); + listener.accept(null); + })); + } + + /** + * Class used to submit {@link #refresh} on the {@link MlMemoryTracker} threadpool, these jobs will + * reschedule themselves by placing a new instance of this class onto the scheduled threadpool. + */ + private class SubmitReschedulingMlMemoryUpdate implements Runnable { + @Override + public void run() { + try { + threadPool.executor(executorName()).execute(() -> { + try { + refresh(clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); + } finally { + // schedule again if still on master + if (isMaster) { + if (logger.isTraceEnabled()) { + logger.trace("Scheduling next run for updating ML memory in: {}", updateFrequency); + } + try { + threadPool.schedule(updateFrequency, executorName(), this); + } catch (EsRejectedExecutionException e) { + logger.debug("Couldn't schedule ML memory update - node might be shutting down", e); + } + } + } + }); + } catch (EsRejectedExecutionException e) { + logger.debug("Couldn't execute ML memory update - node might be shutting down", e); + } + } + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index 4a98b380b0929..886c4a7521b4d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -50,6 +50,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import java.io.IOException; @@ -71,6 +72,8 @@ public class TransportOpenJobActionTests extends ESTestCase { + private MlMemoryTracker memoryTracker = mock(MlMemoryTracker.class); + public void testValidate_jobMissing() { expectThrows(ResourceNotFoundException.class, () -> TransportOpenJobAction.validate("job_id2", null)); } @@ -125,7 +128,7 @@ public void testSelectLeastLoadedMlNode_byCount() { jobBuilder.setJobVersion(Version.CURRENT); Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id4", jobBuilder.build(), - cs.build(), 2, 10, 30, logger); + cs.build(), 2, 10, 30, memoryTracker, logger); assertEquals("", result.getExplanation()); assertEquals("_node_id3", result.getExecutorNode()); } @@ -161,7 +164,7 @@ public void testSelectLeastLoadedMlNode_maxCapacity() { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id0", new ByteSizeValue(150, ByteSizeUnit.MB)).build(new Date()); Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id0", job, cs.build(), 2, - maxRunningJobsPerNode, 30, logger); + maxRunningJobsPerNode, 30, memoryTracker, logger); assertNull(result.getExecutorNode()); assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode + "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]")); @@ -187,7 +190,7 @@ public void testSelectLeastLoadedMlNode_noMlNodes() { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id2", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 10, 30, memoryTracker, logger); assertTrue(result.getExplanation().contains("because this node isn't a ml node")); assertNull(result.getExecutorNode()); } @@ -221,7 +224,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id6", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); ClusterState cs = csBuilder.build(); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 10, 30, memoryTracker, logger); assertEquals("_node_id3", result.getExecutorNode()); tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); @@ -231,7 +234,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -242,7 +245,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger); assertNull("no node selected, because stale task", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -253,7 +256,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger); assertNull("no node selected, because null state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -291,7 +294,7 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id7", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); // Allocation won't be possible if the stale failed job is treated as opening - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger); assertEquals("_node_id1", result.getExecutorNode()); tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); @@ -301,7 +304,7 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 10, 30, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 10, 30, memoryTracker, logger); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -332,7 +335,8 @@ public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() { cs.nodes(nodes); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 10, 30, + memoryTracker, logger); assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]")); assertNull(result.getExecutorNode()); } @@ -359,7 +363,8 @@ public void testSelectLeastLoadedMlNode_noNodesPriorTo_V_5_5() { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id7", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 10, 30, + memoryTracker, logger); assertThat(result.getExplanation(), containsString("because this node does not support machine learning jobs")); assertNull(result.getExecutorNode()); } @@ -385,7 +390,8 @@ public void testSelectLeastLoadedMlNode_jobWithRulesButNoNodeMeetsRequiredVersio cs.metaData(metaData); Job job = jobWithRules("job_with_rules"); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 10, 30, memoryTracker, + logger); assertThat(result.getExplanation(), containsString( "because jobs using custom_rules require a node of version [6.4.0] or higher")); assertNull(result.getExecutorNode()); @@ -412,7 +418,8 @@ public void testSelectLeastLoadedMlNode_jobWithRulesAndNodeMeetsRequiredVersion( cs.metaData(metaData); Job job = jobWithRules("job_with_rules"); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 10, 30, memoryTracker, + logger); assertNotNull(result.getExecutorNode()); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportRefreshJobMemoryRequirementActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportRefreshJobMemoryRequirementActionTests.java new file mode 100644 index 0000000000000..1b231adea7170 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportRefreshJobMemoryRequirementActionTests.java @@ -0,0 +1,12 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.test.ESTestCase; + +public class TransportRefreshJobMemoryRequirementActionTests extends ESTestCase { + +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java index 87aa3c5b926e3..c4150d633a8f0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java @@ -123,12 +123,10 @@ public void testLazyNodeValidation() throws Exception { }); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/34084") public void testSingleNode() throws Exception { verifyMaxNumberOfJobsLimit(1, randomIntBetween(1, 100)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/34084") public void testMultipleNodes() throws Exception { verifyMaxNumberOfJobsLimit(3, randomIntBetween(1, 100)); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java new file mode 100644 index 0000000000000..64d6bbad36ae1 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java @@ -0,0 +1,153 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.process; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.JobManager; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; +import org.junit.Before; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class MlMemoryTrackerTests extends ESTestCase { + + private ClusterService clusterService; + private ThreadPool threadPool; + private JobManager jobManager; + private JobResultsProvider jobResultsProvider; + private MlMemoryTracker memoryTracker; + + @Before + public void setup() { + + clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = + new ClusterSettings(Settings.EMPTY, Collections.singleton(MlMemoryTracker.ML_MEMORY_UPDATE_FREQUENCY)); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + threadPool = mock(ThreadPool.class); + ExecutorService executorService = mock(ExecutorService.class); + when(threadPool.executor(anyString())).thenReturn(executorService); + jobManager = mock(JobManager.class); + jobResultsProvider = mock(JobResultsProvider.class); + memoryTracker = new MlMemoryTracker(Settings.EMPTY, clusterService, threadPool, jobManager, jobResultsProvider); + } + + public void testRefreshAll() { + + boolean isMaster = randomBoolean(); + if (isMaster) { + memoryTracker.onMaster(); + } else { + memoryTracker.offMaster(); + } + + int numMlJobTasks = randomIntBetween(2, 5); + Map> tasks = new HashMap<>(); + for (int i = 1; i <= numMlJobTasks; ++i) { + String jobId = "job" + i; + PersistentTasksCustomMetaData.PersistentTask task = makeTestTask(jobId); + tasks.put(task.getId(), task); + } + PersistentTasksCustomMetaData persistentTasks = new PersistentTasksCustomMetaData(numMlJobTasks, tasks); + + memoryTracker.refresh(persistentTasks); + + if (isMaster) { + for (int i = 1; i <= numMlJobTasks; ++i) { + String jobId = "job" + i; + verify(jobResultsProvider, times(1)).getEstablishedMemoryUsage(eq(jobId), any(), any(), any(), any()); + } + } else { + verify(jobResultsProvider, never()).getEstablishedMemoryUsage(anyString(), any(), any(), any(), any()); + } + } + + @SuppressWarnings("unchecked") + public void testRefreshOne() throws InterruptedException { + + boolean isMaster = randomBoolean(); + if (isMaster) { + memoryTracker.onMaster(); + } else { + memoryTracker.offMaster(); + } + + String jobId = "job"; + boolean haveEstablishedModelMemory = randomBoolean(); + + long modelBytes = 1024 * 1024; + doAnswer(invocation -> { + ((Consumer) invocation.getArguments()[3]).accept(haveEstablishedModelMemory ? modelBytes : 0L); + return null; + }).when(jobResultsProvider).getEstablishedMemoryUsage(eq(jobId), any(), any(), any(), any()); + + long modelMemoryLimitMb = 2; + Job job = mock(Job.class); + when(job.getAnalysisLimits()).thenReturn(new AnalysisLimits(modelMemoryLimitMb, 4L)); + doAnswer(invocation -> { + ((ActionListener) invocation.getArguments()[1]).onResponse(job); + return null; + }).when(jobManager).getJob(eq(jobId), any()); + + AtomicReference refreshedMemoryRequirement = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + memoryTracker.refreshJobMemory(jobId, mem -> { + refreshedMemoryRequirement.set(mem); + latch.countDown(); + }); + assertTrue(latch.await(1, TimeUnit.SECONDS)); + + if (isMaster) { + if (haveEstablishedModelMemory) { + assertEquals(Long.valueOf(modelBytes + Job.PROCESS_MEMORY_OVERHEAD.getBytes()), + memoryTracker.getJobMemoryRequirement(jobId)); + } else { + assertEquals(Long.valueOf(ByteSizeUnit.MB.toBytes(modelMemoryLimitMb) + Job.PROCESS_MEMORY_OVERHEAD.getBytes()), + memoryTracker.getJobMemoryRequirement(jobId)); + } + } else { + assertNull(memoryTracker.getJobMemoryRequirement(jobId)); + } + + assertEquals(memoryTracker.getJobMemoryRequirement(jobId), refreshedMemoryRequirement.get()); + + memoryTracker.removeJob(jobId); + assertNull(memoryTracker.getJobMemoryRequirement(jobId)); + } + + private PersistentTasksCustomMetaData.PersistentTask makeTestTask(String jobId) { + return new PersistentTasksCustomMetaData.PersistentTask<>("job-" + jobId, MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(jobId), + 0, PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT); + } +} From f860da1d92fcf48c494e8ae114b1c0ccd063726d Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 7 Nov 2018 16:45:21 +0000 Subject: [PATCH 2/7] Modifications based on initial review comments - not complete! --- .../xpack/core/XPackClientPlugin.java | 2 - .../RefreshJobMemoryRequirementAction.java | 134 ------------ ...obMemoryRequirementActionRequestTests.java | 33 --- .../xpack/ml/MachineLearning.java | 4 - .../ml/action/TransportOpenJobAction.java | 59 +++-- ...portRefreshJobMemoryRequirementAction.java | 64 ------ .../xpack/ml/process/MlMemoryTracker.java | 206 +++++++++++------- .../action/TransportOpenJobActionTests.java | 9 +- ...efreshJobMemoryRequirementActionTests.java | 12 - .../ml/process/MlMemoryTrackerTests.java | 18 +- 10 files changed, 180 insertions(+), 361 deletions(-) delete mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/RefreshJobMemoryRequirementAction.java delete mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/RefreshJobMemoryRequirementActionRequestTests.java delete mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRefreshJobMemoryRequirementAction.java delete mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportRefreshJobMemoryRequirementActionTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 1bf673e0c3e9d..480bd1f4534d3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -84,7 +84,6 @@ import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.PutFilterAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; -import org.elasticsearch.xpack.core.ml.action.RefreshJobMemoryRequirementAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; @@ -232,7 +231,6 @@ public List getClientActions() { UpdateFilterAction.INSTANCE, DeleteFilterAction.INSTANCE, KillProcessAction.INSTANCE, - RefreshJobMemoryRequirementAction.INSTANCE, GetBucketsAction.INSTANCE, GetInfluencersAction.INSTANCE, GetOverallBucketsAction.INSTANCE, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/RefreshJobMemoryRequirementAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/RefreshJobMemoryRequirementAction.java deleted file mode 100644 index 76025bebfa8b8..0000000000000 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/RefreshJobMemoryRequirementAction.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.core.ml.action; - -import org.elasticsearch.action.Action; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.master.AcknowledgedRequest; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; -import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ObjectParser; -import org.elasticsearch.common.xcontent.ToXContentObject; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.xpack.core.ml.job.config.Job; - -import java.io.IOException; -import java.util.Objects; - -public class RefreshJobMemoryRequirementAction extends Action { - - public static final RefreshJobMemoryRequirementAction INSTANCE = new RefreshJobMemoryRequirementAction(); - public static final String NAME = "cluster:internal/xpack/ml/job/refresh_memory_requirement"; - - private RefreshJobMemoryRequirementAction() { - super(NAME); - } - - @Override - public RequestBuilder newRequestBuilder(ElasticsearchClient client) { - return new RequestBuilder(client, this); - } - - @Override - public AcknowledgedResponse newResponse() { - return new AcknowledgedResponse(); - } - - public static class Request extends AcknowledgedRequest implements ToXContentObject { - - public static ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); - - static { - PARSER.declareString(Request::setJobId, Job.ID); - } - - public static Request parseRequest(String jobId, XContentParser parser) { - Request request = PARSER.apply(parser, null); - if (jobId != null) { - request.setJobId(jobId); - } - return request; - } - - private String jobId; - - public Request(String jobId) { - - this.jobId = jobId; - } - - public Request() { - } - - public void setJobId(String jobId) { - this.jobId = jobId; - } - - public String getJobId() { - return jobId; - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - jobId = in.readString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(jobId); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(Job.ID.getPreferredName(), jobId); - builder.endObject(); - return builder; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Request request = (Request) o; - return Objects.equals(jobId, request.jobId); - } - - @Override - public int hashCode() { - return Objects.hash(jobId); - } - - @Override - public final String toString() { - return Strings.toString(this); - } - } - - public static class RequestBuilder extends MasterNodeOperationRequestBuilder { - - public RequestBuilder(ElasticsearchClient client, RefreshJobMemoryRequirementAction action) { - super(client, action, new Request()); - } - } -} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/RefreshJobMemoryRequirementActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/RefreshJobMemoryRequirementActionRequestTests.java deleted file mode 100644 index 60ee1aeb5f2e5..0000000000000 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/RefreshJobMemoryRequirementActionRequestTests.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.core.ml.action; - -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.test.AbstractStreamableXContentTestCase; - -public class RefreshJobMemoryRequirementActionRequestTests - extends AbstractStreamableXContentTestCase { - - @Override - protected RefreshJobMemoryRequirementAction.Request createTestInstance() { - return new RefreshJobMemoryRequirementAction.Request(randomAlphaOfLengthBetween(1, 20)); - } - - @Override - protected boolean supportsUnknownFields() { - return false; - } - - @Override - protected RefreshJobMemoryRequirementAction.Request createBlankInstance() { - return new RefreshJobMemoryRequirementAction.Request(); - } - - @Override - protected RefreshJobMemoryRequirementAction.Request doParseInstance(XContentParser parser) { - return RefreshJobMemoryRequirementAction.Request.parseRequest(null, parser); - } -} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 153e0883a1f1f..8796373ab5264 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -95,7 +95,6 @@ import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.PutFilterAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; -import org.elasticsearch.xpack.core.ml.action.RefreshJobMemoryRequirementAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; @@ -150,7 +149,6 @@ import org.elasticsearch.xpack.ml.action.TransportPutDatafeedAction; import org.elasticsearch.xpack.ml.action.TransportPutFilterAction; import org.elasticsearch.xpack.ml.action.TransportPutJobAction; -import org.elasticsearch.xpack.ml.action.TransportRefreshJobMemoryRequirementAction; import org.elasticsearch.xpack.ml.action.TransportRevertModelSnapshotAction; import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction; import org.elasticsearch.xpack.ml.action.TransportStopDatafeedAction; @@ -303,7 +301,6 @@ public List> getSettings() { MachineLearningField.MAX_MODEL_MEMORY_LIMIT, MAX_LAZY_ML_NODES, MAX_MACHINE_MEMORY_PERCENT, - MlMemoryTracker.ML_MEMORY_UPDATE_FREQUENCY, AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC, @@ -552,7 +549,6 @@ public List getRestHandlers(Settings settings, RestController restC new ActionHandler<>(UpdateFilterAction.INSTANCE, TransportUpdateFilterAction.class), new ActionHandler<>(DeleteFilterAction.INSTANCE, TransportDeleteFilterAction.class), new ActionHandler<>(KillProcessAction.INSTANCE, TransportKillProcessAction.class), - new ActionHandler<>(RefreshJobMemoryRequirementAction.INSTANCE, TransportRefreshJobMemoryRequirementAction.class), new ActionHandler<>(GetBucketsAction.INSTANCE, TransportGetBucketsAction.class), new ActionHandler<>(GetInfluencersAction.INSTANCE, TransportGetInfluencersAction.class), new ActionHandler<>(GetOverallBucketsAction.INSTANCE, TransportGetOverallBucketsAction.class), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 124fcc43fcf93..1a17b786db945 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -55,7 +55,6 @@ import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; -import org.elasticsearch.xpack.core.ml.action.RefreshJobMemoryRequirementAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; @@ -97,20 +96,23 @@ To ensure that a subsequent close job call will see that same task status (and s */ public class TransportOpenJobAction extends TransportMasterNodeAction { + private static final PersistentTasksCustomMetaData.Assignment AWAITING_LAZY_ASSIGNMENT = + new PersistentTasksCustomMetaData.Assignment(null, "persistent task is awaiting node assignment."); + private final XPackLicenseState licenseState; private final PersistentTasksService persistentTasksService; private final Client client; private final JobResultsProvider jobResultsProvider; private final JobConfigProvider jobConfigProvider; - private static final PersistentTasksCustomMetaData.Assignment AWAITING_LAZY_ASSIGNMENT = - new PersistentTasksCustomMetaData.Assignment(null, "persistent task is awaiting node assignment."); + private final MlMemoryTracker memoryTracker; @Inject public TransportOpenJobAction(Settings settings, TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState, ClusterService clusterService, PersistentTasksService persistentTasksService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client, - JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider) { + JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider, + MlMemoryTracker memoryTracker) { super(settings, OpenJobAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, OpenJobAction.Request::new); this.licenseState = licenseState; @@ -118,6 +120,7 @@ public TransportOpenJobAction(Settings settings, TransportService transportServi this.client = client; this.jobResultsProvider = jobResultsProvider; this.jobConfigProvider = jobConfigProvider; + this.memoryTracker = memoryTracker; } /** @@ -157,14 +160,38 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j return new PersistentTasksCustomMetaData.Assignment(null, reason); } + // Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe + // because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs + boolean allocateByMemory = true; + + if (memoryTracker.isRecentlyRefreshed() == false) { + + boolean scheduledRefresh = memoryTracker.asyncRefresh(ActionListener.wrap( + aVoid -> { + // TODO: find a way to get the persistent task framework to do another reassignment check BLOCKER! + // Persistent task allocation reacts to custom metadata changes, so one way would be to retain the + // MlMetadata as a single number that we increment when we want to kick persistent tasks. + // A less sneaky way would be to introduce an internal action specifically for the purpose of + // asking persistent tasks to re-check whether unallocated tasks can be allocated. + }, + e -> logger.error("Failed to refresh job memory requirements", e) + )); + if (scheduledRefresh) { + String reason = "Not opening job [" + jobId + "] because job memory requirements are stale - refresh requested"; + logger.debug(reason); + return new PersistentTasksCustomMetaData.Assignment(null, reason); + } else { + allocateByMemory = false; + logger.warn("Falling back to allocating job [{}] by job counts because a memory requirement refresh could not be scheduled", + jobId); + } + } + List reasons = new LinkedList<>(); long maxAvailableCount = Long.MIN_VALUE; long maxAvailableMemory = Long.MIN_VALUE; DiscoveryNode minLoadedNodeByCount = null; DiscoveryNode minLoadedNodeByMemory = null; - // Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe - // because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs - boolean allocateByMemory = true; PersistentTasksCustomMetaData persistentTasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); for (DiscoveryNode node : clusterState.getNodes()) { Map nodeAttributes = node.getAttributes(); @@ -518,19 +545,17 @@ public void onFailure(Exception e) { }; // Start job task - ActionListener memoryRequirementRefreshListener = ActionListener.wrap( - response -> persistentTasksService.sendStartRequest(MlTasks.jobTaskId(jobParams.getJobId()), MlTasks.JOB_TASK_NAME, - jobParams, waitForJobToStart), - listener::onFailure + ActionListener memoryRequirementRefreshListener = ActionListener.wrap( + mem -> persistentTasksService.sendStartRequest(MlTasks.jobTaskId(jobParams.getJobId()), MlTasks.JOB_TASK_NAME, jobParams, + waitForJobToStart), + listener::onFailure ); - // Tell the job tracker to refresh the job memory requirement if the cluster supports this (TODO: simplify in 7.0) - ActionListener jobUpdateListener = clusterSupportsMlMemoryTracker ? ActionListener.wrap( - response -> executeAsyncWithOrigin(client, ML_ORIGIN, RefreshJobMemoryRequirementAction.INSTANCE, - new RefreshJobMemoryRequirementAction.Request(jobParams.getJobId()), memoryRequirementRefreshListener), + // Tell the job tracker to refresh the memory requirement for this job and all other jobs that have persistent tasks + ActionListener jobUpdateListener = ActionListener.wrap( + response -> memoryTracker.refreshJobMemoryAndAllOthers(jobParams.getJobId(), memoryRequirementRefreshListener), listener::onFailure - ) : ActionListener.wrap(response -> memoryRequirementRefreshListener.onResponse(new AcknowledgedResponse(true)), - listener::onFailure); + ); // Update established model memory for pre-6.1 jobs that haven't had it set (TODO: remove in 7.0) // and increase the model memory limit for 6.1 - 6.3 jobs diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRefreshJobMemoryRequirementAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRefreshJobMemoryRequirementAction.java deleted file mode 100644 index ddb5c99de1fb7..0000000000000 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRefreshJobMemoryRequirementAction.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.action; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.action.RefreshJobMemoryRequirementAction; -import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.process.MlMemoryTracker; - -public class TransportRefreshJobMemoryRequirementAction - extends TransportMasterNodeAction { - - private final MlMemoryTracker memoryTracker; - - @Inject - public TransportRefreshJobMemoryRequirementAction(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, - MlMemoryTracker memoryTracker) { - super(settings, RefreshJobMemoryRequirementAction.NAME, transportService, clusterService, threadPool, actionFilters, - indexNameExpressionResolver, RefreshJobMemoryRequirementAction.Request::new); - this.memoryTracker = memoryTracker; - } - - @Override - protected String executor() { - return MachineLearning.UTILITY_THREAD_POOL_NAME; - } - - @Override - protected AcknowledgedResponse newResponse() { - return new AcknowledgedResponse(); - } - - @Override - protected void masterOperation(RefreshJobMemoryRequirementAction.Request request, ClusterState state, - ActionListener listener) { - try { - memoryTracker.refreshJobMemory(request.getJobId(), mem -> listener.onResponse(new AcknowledgedResponse(mem != null))); - } catch (Exception e) { - listener.onFailure(e); - } - } - - @Override - protected ClusterBlockException checkBlock(RefreshJobMemoryRequirementAction.Request request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.READ); - } -} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java index f8474c45df480..b3e2edbc36dab 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java @@ -10,10 +10,8 @@ import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.threadpool.ThreadPool; @@ -25,9 +23,10 @@ import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Consumer; import java.util.stream.Collectors; /** @@ -41,17 +40,16 @@ */ public class MlMemoryTracker extends AbstractComponent implements LocalNodeMasterListener { - public static final Setting ML_MEMORY_UPDATE_FREQUENCY = - Setting.timeSetting("xpack.ml.memory_update_frequency", TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(10), - Setting.Property.Dynamic, Setting.Property.NodeScope); + private static final Long RECENT_UPDATE_THRESHOLD_NS = 30_000_000_000L; // 30 seconds private final ThreadPool threadPool; private final ClusterService clusterService; private final JobManager jobManager; private final JobResultsProvider jobResultsProvider; private final ConcurrentHashMap memoryRequirementByJob; + private final List> fullRefreshCompletionListeners; private volatile boolean isMaster; - private volatile TimeValue updateFrequency; + private volatile Long lastUpdateNanotime; public MlMemoryTracker(Settings settings, ClusterService clusterService, ThreadPool threadPool, JobManager jobManager, JobResultsProvider jobResultsProvider) { @@ -60,31 +58,22 @@ public MlMemoryTracker(Settings settings, ClusterService clusterService, ThreadP this.clusterService = clusterService; this.jobManager = jobManager; this.jobResultsProvider = jobResultsProvider; - this.updateFrequency = ML_MEMORY_UPDATE_FREQUENCY.get(settings); memoryRequirementByJob = new ConcurrentHashMap<>(); - clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_MEMORY_UPDATE_FREQUENCY, - this::setUpdateFrequency); + fullRefreshCompletionListeners = new ArrayList<>(); clusterService.addLocalNodeMasterListener(this); } @Override public void onMaster() { isMaster = true; - logger.trace("Elected master - scheduling ML memory update"); - try { - // Submit a job that will start after updateFrequency, and reschedule itself after running - threadPool.schedule(updateFrequency, executorName(), new SubmitReschedulingMlMemoryUpdate()); - threadPool.executor(executorName()).execute( - () -> refresh(clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE))); - } catch (EsRejectedExecutionException e) { - logger.debug("Couldn't schedule ML memory update - node might be shutting down", e); - } + logger.trace("ML memory tracker on master"); } @Override public void offMaster() { isMaster = false; memoryRequirementByJob.clear(); + lastUpdateNanotime = null; } @Override @@ -92,6 +81,22 @@ public String executorName() { return MachineLearning.UTILITY_THREAD_POOL_NAME; } + /** + * Is the information in this object sufficiently up to date + * for valid allocation decisions to be made using it? + */ + public boolean isRecentlyRefreshed() { + Long localLastUpdateNanotime = lastUpdateNanotime; + return localLastUpdateNanotime != null && System.nanoTime() - localLastUpdateNanotime < RECENT_UPDATE_THRESHOLD_NS; + } + + /** + * Get the memory requirement for a job. + * This method only works on the master node. + * @param jobId The job ID. + * @return The memory requirement of the job specified by {@code jobId}, + * or null if it cannot be calculated. + */ public Long getJobMemoryRequirement(String jobId) { if (isMaster == false) { return null; @@ -115,64 +120,138 @@ public void removeJob(String jobId) { memoryRequirementByJob.remove(jobId); } - void setUpdateFrequency(TimeValue updateFrequency) { - this.updateFrequency = updateFrequency; + /** + * Uses a separate thread to refresh the memory requirement for every ML job that has + * a corresponding persistent task. This method only works on the master node. + * @param listener Will be called when the async refresh completes or fails. + * @return true if the async refresh is scheduled, and false + * if this is not possible for some reason. + */ + public boolean asyncRefresh(ActionListener listener) { + + if (isMaster) { + try { + threadPool.executor(executorName()).execute( + () -> refresh(clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE), listener)); + return true; + } catch (EsRejectedExecutionException e) { + logger.debug("Couldn't schedule ML memory update - node might be shutting down", e); + } + } + + return false; + } + + /** + * This refreshes the memory requirement for every ML job that has a corresponding + * persistent task and, in addition, one job that doesn't have a persistent task. + * This method only works on the master node. + * @param jobId The job ID of the job whose memory requirement is to be refreshed + * despite not having a corresponding persistent task. + * @param listener Receives the memory requirement of the job specified by {@code jobId}, + * or null if it cannot be calculated. + */ + public void refreshJobMemoryAndAllOthers(String jobId, ActionListener listener) { + + if (isMaster == false) { + listener.onResponse(null); + return; + } + + PersistentTasksCustomMetaData persistentTasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + refresh(persistentTasks, ActionListener.wrap(aVoid -> refreshJobMemory(jobId, listener), listener::onFailure)); } /** * This refreshes the memory requirement for every ML job that has a corresponding persistent task. */ - void refresh(PersistentTasksCustomMetaData persistentTasks) { + void refresh(PersistentTasksCustomMetaData persistentTasks, ActionListener onCompletion) { + + synchronized (fullRefreshCompletionListeners) { + fullRefreshCompletionListeners.add(onCompletion); + if (fullRefreshCompletionListeners.size() > 1) { + // A refresh is already in progress, so don't do another + return; + } + } + + ActionListener refreshComplete = ActionListener.wrap(aVoid -> { + lastUpdateNanotime = System.nanoTime(); + synchronized (fullRefreshCompletionListeners) { + assert fullRefreshCompletionListeners.isEmpty() == false; + for (ActionListener listener : fullRefreshCompletionListeners) { + listener.onResponse(null); + } + fullRefreshCompletionListeners.clear(); + } + }, onCompletion::onFailure); // persistentTasks will be null if there's never been a persistent task created in this cluster - if (isMaster == false || persistentTasks == null) { - return; + if (persistentTasks == null) { + refreshComplete.onResponse(null); + } else { + List> mlJobTasks = persistentTasks.tasks().stream() + .filter(task -> MlTasks.JOB_TASK_NAME.equals(task.getTaskName())).collect(Collectors.toList()); + iterateMlJobTasks(mlJobTasks.iterator(), refreshComplete); } + } - List> mlJobTasks = persistentTasks.tasks().stream() - .filter(task -> MlTasks.JOB_TASK_NAME.equals(task.getTaskName())).collect(Collectors.toList()); - for (PersistentTasksCustomMetaData.PersistentTask mlJobTask : mlJobTasks) { - OpenJobAction.JobParams jobParams = (OpenJobAction.JobParams) mlJobTask.getParams(); - refreshJobMemory(jobParams.getJobId(), mem -> {}); + private void iterateMlJobTasks(Iterator> iterator, + ActionListener refreshComplete) { + if (iterator.hasNext()) { + OpenJobAction.JobParams jobParams = (OpenJobAction.JobParams) iterator.next().getParams(); + refreshJobMemory(jobParams.getJobId(), + ActionListener.wrap(mem -> iterateMlJobTasks(iterator, refreshComplete), refreshComplete::onFailure)); + } else { + refreshComplete.onResponse(null); } } /** * Refresh the memory requirement for a single job. - * @param jobId The ID of the job to refresh the memory requirement for - * @param listener A callback that will receive the memory requirement, - * or null if it cannot be calculated + * This method only works on the master node. + * @param jobId The ID of the job to refresh the memory requirement for. + * @param listener Receives the job's memory requirement, or null + * if it cannot be calculated. */ - public void refreshJobMemory(String jobId, Consumer listener) { + public void refreshJobMemory(String jobId, ActionListener listener) { if (isMaster == false) { - listener.accept(null); + listener.onResponse(null); return; } - jobResultsProvider.getEstablishedMemoryUsage(jobId, null, null, establishedModelMemoryBytes -> { - if (establishedModelMemoryBytes <= 0L) { - setJobMemoryToLimit(jobId, listener); - } else { - Long memoryRequirementBytes = establishedModelMemoryBytes + Job.PROCESS_MEMORY_OVERHEAD.getBytes(); - memoryRequirementByJob.put(jobId, memoryRequirementBytes); - listener.accept(memoryRequirementBytes); - } - }, e -> { + try { + jobResultsProvider.getEstablishedMemoryUsage(jobId, null, null, + establishedModelMemoryBytes -> { + if (establishedModelMemoryBytes <= 0L) { + setJobMemoryToLimit(jobId, listener); + } else { + Long memoryRequirementBytes = establishedModelMemoryBytes + Job.PROCESS_MEMORY_OVERHEAD.getBytes(); + memoryRequirementByJob.put(jobId, memoryRequirementBytes); + listener.onResponse(memoryRequirementBytes); + } + }, + e -> { + logger.error("[" + jobId + "] failed to calculate job established model memory requirement", e); + setJobMemoryToLimit(jobId, listener); + } + ); + } catch (Exception e) { logger.error("[" + jobId + "] failed to calculate job established model memory requirement", e); setJobMemoryToLimit(jobId, listener); - }); + } } - private void setJobMemoryToLimit(String jobId, Consumer listener) { + private void setJobMemoryToLimit(String jobId, ActionListener listener) { jobManager.getJob(jobId, ActionListener.wrap(job -> { Long memoryLimitMb = job.getAnalysisLimits().getModelMemoryLimit(); if (memoryLimitMb != null) { Long memoryRequirementBytes = ByteSizeUnit.MB.toBytes(memoryLimitMb) + Job.PROCESS_MEMORY_OVERHEAD.getBytes(); memoryRequirementByJob.put(jobId, memoryRequirementBytes); - listener.accept(memoryRequirementBytes); + listener.onResponse(memoryRequirementBytes); } else { memoryRequirementByJob.remove(jobId); - listener.accept(null); + listener.onResponse(null); } }, e -> { if (e instanceof ResourceNotFoundException) { @@ -182,38 +261,7 @@ private void setJobMemoryToLimit(String jobId, Consumer listener) { logger.error("[" + jobId + "] failed to get job during ML memory update", e); } memoryRequirementByJob.remove(jobId); - listener.accept(null); + listener.onResponse(null); })); } - - /** - * Class used to submit {@link #refresh} on the {@link MlMemoryTracker} threadpool, these jobs will - * reschedule themselves by placing a new instance of this class onto the scheduled threadpool. - */ - private class SubmitReschedulingMlMemoryUpdate implements Runnable { - @Override - public void run() { - try { - threadPool.executor(executorName()).execute(() -> { - try { - refresh(clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); - } finally { - // schedule again if still on master - if (isMaster) { - if (logger.isTraceEnabled()) { - logger.trace("Scheduling next run for updating ML memory in: {}", updateFrequency); - } - try { - threadPool.schedule(updateFrequency, executorName(), this); - } catch (EsRejectedExecutionException e) { - logger.debug("Couldn't schedule ML memory update - node might be shutting down", e); - } - } - } - }); - } catch (EsRejectedExecutionException e) { - logger.debug("Couldn't execute ML memory update - node might be shutting down", e); - } - } - } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index 886c4a7521b4d..393fc492f5d63 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -52,6 +52,7 @@ import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; +import org.junit.Before; import java.io.IOException; import java.net.InetAddress; @@ -72,7 +73,13 @@ public class TransportOpenJobActionTests extends ESTestCase { - private MlMemoryTracker memoryTracker = mock(MlMemoryTracker.class); + private MlMemoryTracker memoryTracker; + + @Before + public void setup() { + memoryTracker = mock(MlMemoryTracker.class); + when(memoryTracker.isRecentlyRefreshed()).thenReturn(true); + } public void testValidate_jobMissing() { expectThrows(ResourceNotFoundException.class, () -> TransportOpenJobAction.validate("job_id2", null)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportRefreshJobMemoryRequirementActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportRefreshJobMemoryRequirementActionTests.java deleted file mode 100644 index 1b231adea7170..0000000000000 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportRefreshJobMemoryRequirementActionTests.java +++ /dev/null @@ -1,12 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.action; - -import org.elasticsearch.test.ESTestCase; - -public class TransportRefreshJobMemoryRequirementActionTests extends ESTestCase { - -} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java index 64d6bbad36ae1..1efaeff123e19 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java @@ -7,7 +7,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -21,12 +20,9 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.junit.Before; -import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -52,9 +48,6 @@ public class MlMemoryTrackerTests extends ESTestCase { public void setup() { clusterService = mock(ClusterService.class); - ClusterSettings clusterSettings = - new ClusterSettings(Settings.EMPTY, Collections.singleton(MlMemoryTracker.ML_MEMORY_UPDATE_FREQUENCY)); - when(clusterService.getClusterSettings()).thenReturn(clusterSettings); threadPool = mock(ThreadPool.class); ExecutorService executorService = mock(ExecutorService.class); when(threadPool.executor(anyString())).thenReturn(executorService); @@ -81,7 +74,7 @@ public void testRefreshAll() { } PersistentTasksCustomMetaData persistentTasks = new PersistentTasksCustomMetaData(numMlJobTasks, tasks); - memoryTracker.refresh(persistentTasks); + memoryTracker.refresh(persistentTasks, ActionListener.wrap(aVoid -> {}, ESTestCase::assertNull)); if (isMaster) { for (int i = 1; i <= numMlJobTasks; ++i) { @@ -94,7 +87,7 @@ public void testRefreshAll() { } @SuppressWarnings("unchecked") - public void testRefreshOne() throws InterruptedException { + public void testRefreshOne() { boolean isMaster = randomBoolean(); if (isMaster) { @@ -121,12 +114,7 @@ public void testRefreshOne() throws InterruptedException { }).when(jobManager).getJob(eq(jobId), any()); AtomicReference refreshedMemoryRequirement = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); - memoryTracker.refreshJobMemory(jobId, mem -> { - refreshedMemoryRequirement.set(mem); - latch.countDown(); - }); - assertTrue(latch.await(1, TimeUnit.SECONDS)); + memoryTracker.refreshJobMemory(jobId, ActionListener.wrap(refreshedMemoryRequirement::set, ESTestCase::assertNull)); if (isMaster) { if (haveEstablishedModelMemory) { From f30c8cc98778e127de68541385641f16f99f97f0 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Thu, 8 Nov 2018 13:54:55 +0000 Subject: [PATCH 3/7] Kick persistent tasks after memory requirement refresh Persistent tasks will recheck allocations when custom metadata changes, so updating a timestamp in the ML metadata will enable persistent tasks whose allocation was deferred to allow for a memory refresh to have another go at selecting an ML node. --- .../xpack/core/ml/MlMetadata.java | 73 ++++++++++++++-- .../ml/action/TransportOpenJobAction.java | 12 +-- .../xpack/ml/process/MlMemoryTracker.java | 83 +++++++++++++++---- .../xpack/ml/MlMetadataTests.java | 16 +++- .../ml/process/MlMemoryTrackerTests.java | 15 ++-- 5 files changed, 164 insertions(+), 35 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 81762def4cc35..d15e056e251c3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -40,6 +40,7 @@ import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import java.io.IOException; +import java.time.Instant; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -57,8 +58,9 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { public static final String TYPE = "ml"; private static final ParseField JOBS_FIELD = new ParseField("jobs"); private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds"); + private static final ParseField LAST_MEMORY_REFRESH_TIME_FIELD = new ParseField("last_memory_refresh_time"); - public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap()); + public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap(), null); // This parser follows the pattern that metadata is parsed leniently (to allow for enhancements) public static final ObjectParser LENIENT_PARSER = new ObjectParser<>("ml_metadata", true, Builder::new); @@ -66,15 +68,18 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { LENIENT_PARSER.declareObjectArray(Builder::putJobs, (p, c) -> Job.LENIENT_PARSER.apply(p, c).build(), JOBS_FIELD); LENIENT_PARSER.declareObjectArray(Builder::putDatafeeds, (p, c) -> DatafeedConfig.LENIENT_PARSER.apply(p, c).build(), DATAFEEDS_FIELD); + LENIENT_PARSER.declareLong(Builder::setLastMemoryRefreshTimeMs, LAST_MEMORY_REFRESH_TIME_FIELD); } private final SortedMap jobs; private final SortedMap datafeeds; + private final Instant lastMemoryRefreshTime; private final GroupOrJobLookup groupOrJobLookup; - private MlMetadata(SortedMap jobs, SortedMap datafeeds) { + private MlMetadata(SortedMap jobs, SortedMap datafeeds, Instant lastMemoryRefreshTime) { this.jobs = Collections.unmodifiableSortedMap(jobs); this.datafeeds = Collections.unmodifiableSortedMap(datafeeds); + this.lastMemoryRefreshTime = lastMemoryRefreshTime; this.groupOrJobLookup = new GroupOrJobLookup(jobs.values()); } @@ -112,6 +117,10 @@ public Set expandDatafeedIds(String expression, boolean allowNoDatafeeds .expand(expression, allowNoDatafeeds); } + public Instant getLastMemoryRefreshTime() { + return lastMemoryRefreshTime; + } + @Override public Version getMinimalSupportedVersion() { return Version.V_5_4_0; @@ -145,7 +154,11 @@ public MlMetadata(StreamInput in) throws IOException { datafeeds.put(in.readString(), new DatafeedConfig(in)); } this.datafeeds = datafeeds; - + if (in.getVersion().onOrAfter(Version.V_6_6_0)) { + lastMemoryRefreshTime = in.readBoolean() ? Instant.ofEpochSecond(in.readVLong(), in.readVInt()) : null; + } else { + lastMemoryRefreshTime = null; + } this.groupOrJobLookup = new GroupOrJobLookup(jobs.values()); } @@ -153,6 +166,15 @@ public MlMetadata(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { writeMap(jobs, out); writeMap(datafeeds, out); + if (out.getVersion().onOrAfter(Version.V_6_6_0)) { + if (lastMemoryRefreshTime == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeVLong(lastMemoryRefreshTime.getEpochSecond()); + out.writeVInt(lastMemoryRefreshTime.getNano()); + } + } } private static void writeMap(Map map, StreamOutput out) throws IOException { @@ -169,6 +191,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params); mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams); mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams); + if (lastMemoryRefreshTime != null) { + // We lose precision lower than milliseconds here - OK as millisecond precision is adequate for this use case + builder.timeField(LAST_MEMORY_REFRESH_TIME_FIELD.getPreferredName(), + LAST_MEMORY_REFRESH_TIME_FIELD.getPreferredName() + "_string", lastMemoryRefreshTime.toEpochMilli()); + } return builder; } @@ -185,10 +212,12 @@ public static class MlMetadataDiff implements NamedDiff { final Diff> jobs; final Diff> datafeeds; + final Instant lastMemoryRefreshTime; MlMetadataDiff(MlMetadata before, MlMetadata after) { this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer()); this.datafeeds = DiffableUtils.diff(before.datafeeds, after.datafeeds, DiffableUtils.getStringKeySerializer()); + this.lastMemoryRefreshTime = after.lastMemoryRefreshTime; } public MlMetadataDiff(StreamInput in) throws IOException { @@ -196,19 +225,34 @@ public MlMetadataDiff(StreamInput in) throws IOException { MlMetadataDiff::readJobDiffFrom); this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new, MlMetadataDiff::readSchedulerDiffFrom); + if (in.getVersion().onOrAfter(Version.V_6_6_0)) { + lastMemoryRefreshTime = in.readBoolean() ? Instant.ofEpochSecond(in.readVLong(), in.readVInt()) : null; + } else { + lastMemoryRefreshTime = null; + } } @Override public MetaData.Custom apply(MetaData.Custom part) { TreeMap newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs)); TreeMap newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds)); - return new MlMetadata(newJobs, newDatafeeds); + Instant lastMemoryRefreshTime = ((MlMetadata) part).lastMemoryRefreshTime; + return new MlMetadata(newJobs, newDatafeeds, lastMemoryRefreshTime); } @Override public void writeTo(StreamOutput out) throws IOException { jobs.writeTo(out); datafeeds.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_6_6_0)) { + if (lastMemoryRefreshTime == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeVLong(lastMemoryRefreshTime.getEpochSecond()); + out.writeVInt(lastMemoryRefreshTime.getNano()); + } + } } @Override @@ -233,7 +277,8 @@ public boolean equals(Object o) { return false; MlMetadata that = (MlMetadata) o; return Objects.equals(jobs, that.jobs) && - Objects.equals(datafeeds, that.datafeeds); + Objects.equals(datafeeds, that.datafeeds) && + Objects.equals(lastMemoryRefreshTime, that.lastMemoryRefreshTime); } @Override @@ -243,13 +288,14 @@ public final String toString() { @Override public int hashCode() { - return Objects.hash(jobs, datafeeds); + return Objects.hash(jobs, datafeeds, lastMemoryRefreshTime); } public static class Builder { private TreeMap jobs; private TreeMap datafeeds; + private Instant lastMemoryRefreshTime; public Builder() { jobs = new TreeMap<>(); @@ -263,6 +309,7 @@ public Builder(@Nullable MlMetadata previous) { } else { jobs = new TreeMap<>(previous.jobs); datafeeds = new TreeMap<>(previous.datafeeds); + lastMemoryRefreshTime = previous.lastMemoryRefreshTime; } } @@ -382,8 +429,18 @@ private Builder putDatafeeds(Collection datafeeds) { return this; } + Builder setLastMemoryRefreshTimeMs(long lastMemoryRefreshTimeMs) { + lastMemoryRefreshTime = Instant.ofEpochMilli(lastMemoryRefreshTimeMs); + return this; + } + + public Builder setLastMemoryRefreshTime(Instant lastMemoryRefreshTime) { + this.lastMemoryRefreshTime = lastMemoryRefreshTime; + return this; + } + public MlMetadata build() { - return new MlMetadata(jobs, datafeeds); + return new MlMetadata(jobs, datafeeds, lastMemoryRefreshTime); } public void markJobAsDeleting(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) { @@ -420,8 +477,6 @@ void checkJobHasNoDatafeed(String jobId) { } } - - public static MlMetadata getMlMetadata(ClusterState state) { MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(TYPE); if (mlMetadata == null) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 993cf16abecb7..ce820db0babec 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -167,12 +167,12 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j if (memoryTracker.isRecentlyRefreshed() == false) { boolean scheduledRefresh = memoryTracker.asyncRefresh(ActionListener.wrap( - aVoid -> { - // TODO: find a way to get the persistent task framework to do another reassignment check BLOCKER! - // Persistent task allocation reacts to custom metadata changes, so one way would be to retain the - // MlMetadata as a single number that we increment when we want to kick persistent tasks. - // A less sneaky way would be to introduce an internal action specifically for the purpose of - // asking persistent tasks to re-check whether unallocated tasks can be allocated. + acknowledged -> { + if (acknowledged) { + logger.trace("Job memory requirement refresh request completed successfully"); + } else { + logger.warn("Job memory requirement refresh request completed but did not set time in cluster state"); + } }, e -> logger.error("Failed to refresh job memory requirements", e) )); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java index d3ab9b0193afe..65d2e1053804e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java @@ -9,9 +9,15 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.LocalNodeMasterListener; +import org.elasticsearch.cluster.ack.AckedRequest; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.threadpool.ThreadPool; @@ -23,6 +29,8 @@ import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -32,15 +40,27 @@ /** * This class keeps track of the memory requirement of ML jobs. * It only functions on the master node - for this reason it should only be used by master node actions. - * The memory requirement for open ML jobs is updated at the following times: - * 1. When a master node is elected the memory requirement for all non-closed ML jobs is updated - * 2. The memory requirement for all non-closed ML jobs is updated periodically thereafter - every 30 seconds by default - * 3. When a job is opened the memory requirement for that single job is updated - * As a result of this every open job should have a value for its memory requirement that is no more than 30 seconds out-of-date. + * The memory requirement for ML jobs can be updated in 3 ways: + * 1. For all open ML jobs (via {@link #asyncRefresh}) + * 2. For all open ML jobs, plus one named ML job that is not open (via {@link #refreshJobMemoryAndAllOthers}) + * 3. For one named ML job (via {@link #refreshJobMemory}) + * In all cases a listener informs the caller when the requested updates are complete. */ public class MlMemoryTracker implements LocalNodeMasterListener { - private static final Long RECENT_UPDATE_THRESHOLD_NS = 30_000_000_000L; // 30 seconds + private static final AckedRequest ACKED_REQUEST = new AckedRequest() { + @Override + public TimeValue ackTimeout() { + return AcknowledgedRequest.DEFAULT_ACK_TIMEOUT; + } + + @Override + public TimeValue masterNodeTimeout() { + return AcknowledgedRequest.DEFAULT_ACK_TIMEOUT; + } + }; + + private static final Duration RECENT_UPDATE_THRESHOLD = Duration.ofMinutes(1); private final Logger logger = LogManager.getLogger(MlMemoryTracker.class); private final ConcurrentHashMap memoryRequirementByJob = new ConcurrentHashMap<>(); @@ -51,7 +71,7 @@ public class MlMemoryTracker implements LocalNodeMasterListener { private final JobManager jobManager; private final JobResultsProvider jobResultsProvider; private volatile boolean isMaster; - private volatile Long lastUpdateNanotime; + private volatile Instant lastUpdateTime; public MlMemoryTracker(ClusterService clusterService, ThreadPool threadPool, JobManager jobManager, JobResultsProvider jobResultsProvider) { @@ -72,7 +92,7 @@ public void onMaster() { public void offMaster() { isMaster = false; memoryRequirementByJob.clear(); - lastUpdateNanotime = null; + lastUpdateTime = null; } @Override @@ -85,8 +105,8 @@ public String executorName() { * for valid allocation decisions to be made using it? */ public boolean isRecentlyRefreshed() { - Long localLastUpdateNanotime = lastUpdateNanotime; - return localLastUpdateNanotime != null && System.nanoTime() - localLastUpdateNanotime < RECENT_UPDATE_THRESHOLD_NS; + Instant localLastUpdateTime = lastUpdateTime; + return localLastUpdateTime != null && localLastUpdateTime.plus(RECENT_UPDATE_THRESHOLD).isAfter(Instant.now()); } /** @@ -122,16 +142,24 @@ public void removeJob(String jobId) { /** * Uses a separate thread to refresh the memory requirement for every ML job that has * a corresponding persistent task. This method only works on the master node. - * @param listener Will be called when the async refresh completes or fails. + * @param listener Will be called when the async refresh completes or fails. The + * boolean value indicates whether the cluster state was updated + * with the refresh completion time. (If it was then this will in + * cause the persistent tasks framework to check if any persistent + * tasks are awaiting allocation.) * @return true if the async refresh is scheduled, and false * if this is not possible for some reason. */ - public boolean asyncRefresh(ActionListener listener) { + public boolean asyncRefresh(ActionListener listener) { if (isMaster) { try { + ActionListener mlMetaUpdateListener = ActionListener.wrap( + aVoid -> recordUpdateTimeInClusterState(listener), + listener::onFailure + ); threadPool.executor(executorName()).execute( - () -> refresh(clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE), listener)); + () -> refresh(clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE), mlMetaUpdateListener)); return true; } catch (EsRejectedExecutionException e) { logger.debug("Couldn't schedule ML memory update - node might be shutting down", e); @@ -175,7 +203,7 @@ void refresh(PersistentTasksCustomMetaData persistentTasks, ActionListener } ActionListener refreshComplete = ActionListener.wrap(aVoid -> { - lastUpdateNanotime = System.nanoTime(); + lastUpdateTime = Instant.now(); synchronized (fullRefreshCompletionListeners) { assert fullRefreshCompletionListeners.isEmpty() == false; for (ActionListener listener : fullRefreshCompletionListeners) { @@ -195,6 +223,33 @@ void refresh(PersistentTasksCustomMetaData persistentTasks, ActionListener } } + void recordUpdateTimeInClusterState(ActionListener listener) { + + clusterService.submitStateUpdateTask("ml-memory-last-update-time", + new AckedClusterStateUpdateTask(ACKED_REQUEST, listener) { + @Override + protected Boolean newResponse(boolean acknowledged) { + return acknowledged; + } + + @Override + public ClusterState execute(ClusterState currentState) { + MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState); + MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata); + MlMetadata newMlMetadata = builder.build(); + builder.setLastMemoryRefreshTime(lastUpdateTime); + if (newMlMetadata.equals(currentMlMetadata)) { + // Return same reference if nothing has changed + return currentState; + } else { + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMlMetadata).build()); + return newState.build(); + } + } + }); + } + private void iterateMlJobTasks(Iterator> iterator, ActionListener refreshComplete) { if (iterator.hasNext()) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index c7ca2ff805eba..1e018227f5c01 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField; +import java.time.Instant; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -69,6 +70,11 @@ protected MlMetadata createTestInstance() { builder.putJob(job, false); } } + if (randomBoolean()) { + // Round-tripping to JSON loses time precision beyond milliseconds, + // so restrict the instant to a whole number of milliseconds + builder.setLastMemoryRefreshTime(Instant.ofEpochMilli(System.currentTimeMillis())); + } return builder.build(); } @@ -438,8 +444,9 @@ protected MlMetadata mutateInstance(MlMetadata instance) { for (Map.Entry entry : datafeeds.entrySet()) { metadataBuilder.putDatafeed(entry.getValue(), Collections.emptyMap()); } + metadataBuilder.setLastMemoryRefreshTime(instance.getLastMemoryRefreshTime()); - switch (between(0, 1)) { + switch (between(0, 2)) { case 0: metadataBuilder.putJob(JobTests.createRandomizedJob(), true); break; @@ -459,6 +466,13 @@ protected MlMetadata mutateInstance(MlMetadata instance) { metadataBuilder.putJob(randomJob, false); metadataBuilder.putDatafeed(datafeedConfig, Collections.emptyMap()); break; + case 2: + if (instance.getLastMemoryRefreshTime() == null) { + metadataBuilder.setLastMemoryRefreshTime(Instant.now()); + } else { + metadataBuilder.setLastMemoryRefreshTime(null); + } + break; default: throw new AssertionError("Illegal randomisation branch"); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java index 4205a216918c0..39a43c763b136 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -54,7 +55,6 @@ public void setup() { memoryTracker = new MlMemoryTracker(clusterService, threadPool, jobManager, jobResultsProvider); } - @SuppressWarnings("unchecked") public void testRefreshAll() { boolean isMaster = randomBoolean(); @@ -74,7 +74,9 @@ public void testRefreshAll() { PersistentTasksCustomMetaData persistentTasks = new PersistentTasksCustomMetaData(numMlJobTasks, tasks); doAnswer(invocation -> { - ((ActionListener) invocation.getArguments()[3]).onResponse(randomLongBetween(1000, 1000000)); + @SuppressWarnings("unchecked") + Consumer listener = (Consumer) invocation.getArguments()[3]; + listener.accept(randomLongBetween(1000, 1000000)); return null; }).when(jobResultsProvider).getEstablishedMemoryUsage(any(), any(), any(), any(), any()); @@ -90,7 +92,6 @@ public void testRefreshAll() { } } - @SuppressWarnings("unchecked") public void testRefreshOne() { boolean isMaster = randomBoolean(); @@ -105,7 +106,9 @@ public void testRefreshOne() { long modelBytes = 1024 * 1024; doAnswer(invocation -> { - ((ActionListener) invocation.getArguments()[3]).onResponse(haveEstablishedModelMemory ? modelBytes : 0L); + @SuppressWarnings("unchecked") + Consumer listener = (Consumer) invocation.getArguments()[3]; + listener.accept(haveEstablishedModelMemory ? modelBytes : 0L); return null; }).when(jobResultsProvider).getEstablishedMemoryUsage(eq(jobId), any(), any(), any(), any()); @@ -113,7 +116,9 @@ public void testRefreshOne() { Job job = mock(Job.class); when(job.getAnalysisLimits()).thenReturn(new AnalysisLimits(modelMemoryLimitMb, 4L)); doAnswer(invocation -> { - ((ActionListener) invocation.getArguments()[1]).onResponse(job); + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation.getArguments()[1]; + listener.onResponse(job); return null; }).when(jobManager).getJob(eq(jobId), any()); From 10a1467680452a30dcb4f9403e8d8931b0fa4bf2 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Thu, 8 Nov 2018 15:02:44 +0000 Subject: [PATCH 4/7] Improve comments --- .../xpack/ml/action/TransportDeleteJobAction.java | 2 ++ .../elasticsearch/xpack/ml/process/MlMemoryTracker.java | 8 ++++++++ 2 files changed, 10 insertions(+) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index 73e98b7b0c67b..10e6b8093f763 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -214,6 +214,8 @@ private void notifyListeners(String jobId, @Nullable AcknowledgedResponse ack, @ private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request, ActionListener listener) { String jobId = request.getJobId(); + + // We clean up the memory tracker on delete rather than close as close is not a master node action memoryTracker.removeJob(jobId); // Step 4. When the job has been removed from the cluster state, return a response diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java index 65d2e1053804e..a222789793fd6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java @@ -135,6 +135,11 @@ public Long getJobMemoryRequirement(String jobId) { return null; } + /** + * Remove any memory requirement that is stored for the specified job. + * It doesn't matter if this method is called for a job that doesn't have + * a stored memory requirement. + */ public void removeJob(String jobId) { memoryRequirementByJob.remove(jobId); } @@ -191,6 +196,9 @@ public void refreshJobMemoryAndAllOthers(String jobId, ActionListener list /** * This refreshes the memory requirement for every ML job that has a corresponding persistent task. + * It does NOT remove entries for jobs that no longer have a persistent task, because that would + * lead to a race where a job was opened part way through the refresh. (Instead, entries are removed + * when jobs are deleted.) */ void refresh(PersistentTasksCustomMetaData persistentTasks, ActionListener onCompletion) { From 7ac0883d23c8b3c22979e0ddfd15abe9cc09756f Mon Sep 17 00:00:00 2001 From: David Roberts Date: Thu, 8 Nov 2018 16:53:34 +0000 Subject: [PATCH 5/7] Fix bug and add test --- .../xpack/ml/process/MlMemoryTracker.java | 4 +- .../ml/process/MlMemoryTrackerTests.java | 52 +++++++++++++++++-- 2 files changed, 51 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java index a222789793fd6..a769780f435d7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java @@ -231,7 +231,7 @@ void refresh(PersistentTasksCustomMetaData persistentTasks, ActionListener } } - void recordUpdateTimeInClusterState(ActionListener listener) { + private void recordUpdateTimeInClusterState(ActionListener listener) { clusterService.submitStateUpdateTask("ml-memory-last-update-time", new AckedClusterStateUpdateTask(ACKED_REQUEST, listener) { @@ -244,8 +244,8 @@ protected Boolean newResponse(boolean acknowledged) { public ClusterState execute(ClusterState currentState) { MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState); MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata); - MlMetadata newMlMetadata = builder.build(); builder.setLastMemoryRefreshTime(lastUpdateTime); + MlMetadata newMlMetadata = builder.build(); if (newMlMetadata.equals(currentMlMetadata)) { // Return same reference if nothing has changed return currentState; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java index 39a43c763b136..5e35e4d1533a9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java @@ -6,11 +6,14 @@ package org.elasticsearch.xpack.ml.process; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; @@ -19,12 +22,15 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.junit.Before; +import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.anyString; @@ -49,6 +55,12 @@ public void setup() { clusterService = mock(ClusterService.class); threadPool = mock(ThreadPool.class); ExecutorService executorService = mock(ExecutorService.class); + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + Runnable r = (Runnable) invocation.getArguments()[0]; + r.run(); + return null; + }).when(executorService).execute(any(Runnable.class)); when(threadPool.executor(anyString())).thenReturn(executorService); jobManager = mock(JobManager.class); jobResultsProvider = mock(JobResultsProvider.class); @@ -78,7 +90,7 @@ public void testRefreshAll() { Consumer listener = (Consumer) invocation.getArguments()[3]; listener.accept(randomLongBetween(1000, 1000000)); return null; - }).when(jobResultsProvider).getEstablishedMemoryUsage(any(), any(), any(), any(), any()); + }).when(jobResultsProvider).getEstablishedMemoryUsage(anyString(), any(), any(), any(Consumer.class), any()); memoryTracker.refresh(persistentTasks, ActionListener.wrap(aVoid -> {}, ESTestCase::assertNull)); @@ -110,7 +122,7 @@ public void testRefreshOne() { Consumer listener = (Consumer) invocation.getArguments()[3]; listener.accept(haveEstablishedModelMemory ? modelBytes : 0L); return null; - }).when(jobResultsProvider).getEstablishedMemoryUsage(eq(jobId), any(), any(), any(), any()); + }).when(jobResultsProvider).getEstablishedMemoryUsage(eq(jobId), any(), any(), any(Consumer.class), any()); long modelMemoryLimitMb = 2; Job job = mock(Job.class); @@ -120,7 +132,7 @@ public void testRefreshOne() { ActionListener listener = (ActionListener) invocation.getArguments()[1]; listener.onResponse(job); return null; - }).when(jobManager).getJob(eq(jobId), any()); + }).when(jobManager).getJob(eq(jobId), any(ActionListener.class)); AtomicReference refreshedMemoryRequirement = new AtomicReference<>(); memoryTracker.refreshJobMemory(jobId, ActionListener.wrap(refreshedMemoryRequirement::set, ESTestCase::assertNull)); @@ -143,6 +155,40 @@ public void testRefreshOne() { assertNull(memoryTracker.getJobMemoryRequirement(jobId)); } + @SuppressWarnings("unchecked") + public void testRecordUpdateTimeInClusterState() { + + boolean isMaster = randomBoolean(); + if (isMaster) { + memoryTracker.onMaster(); + } else { + memoryTracker.offMaster(); + } + + when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); + + AtomicReference updateTime = new AtomicReference<>(); + + doAnswer(invocation -> { + AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocation.getArguments()[1]; + ClusterState currentClusterState = ClusterState.EMPTY_STATE; + ClusterState newClusterState = task.execute(currentClusterState); + assertThat(currentClusterState, not(equalTo(newClusterState))); + MlMetadata newMlMetadata = MlMetadata.getMlMetadata(newClusterState); + updateTime.set(newMlMetadata.getLastMemoryRefreshTime()); + task.onAllNodesAcked(null); + return null; + }).when(clusterService).submitStateUpdateTask(anyString(), any(AckedClusterStateUpdateTask.class)); + + memoryTracker.asyncRefresh(ActionListener.wrap(ESTestCase::assertTrue, ESTestCase::assertNull)); + + if (isMaster) { + assertNotNull(updateTime.get()); + } else { + assertNull(updateTime.get()); + } + } + private PersistentTasksCustomMetaData.PersistentTask makeTestTask(String jobId) { return new PersistentTasksCustomMetaData.PersistentTask<>("job-" + jobId, MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(jobId), 0, PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT); From 7492b979d033c75b45f43c48fc8f973283d71701 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 9 Nov 2018 17:16:05 +0000 Subject: [PATCH 6/7] Add an integration test Adding this test showed that there was a problem with using Instant.now() in a cluster state update. It is possible that the cluster state update can get applied on more than one master node if the cluster is unstable, and it's essential that they both apply exactly the same change. Therefore I changed the discriminant to the old cluster state version plus one. The actual time is not required. We just need a field whose change will kick persistent tasks. --- .../xpack/core/ml/MlMetadata.java | 72 ++++++--------- .../xpack/ml/process/MlMemoryTracker.java | 4 +- .../xpack/ml/MlMetadataTests.java | 13 ++- .../integration/MlDistributedFailureIT.java | 89 +++++++++++++++++-- .../ml/process/MlMemoryTrackerTests.java | 9 +- 5 files changed, 121 insertions(+), 66 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index d15e056e251c3..bf7ab8b1ced9f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -40,7 +40,6 @@ import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import java.io.IOException; -import java.time.Instant; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -58,7 +57,7 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { public static final String TYPE = "ml"; private static final ParseField JOBS_FIELD = new ParseField("jobs"); private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds"); - private static final ParseField LAST_MEMORY_REFRESH_TIME_FIELD = new ParseField("last_memory_refresh_time"); + private static final ParseField LAST_MEMORY_REFRESH_VERSION_FIELD = new ParseField("last_memory_refresh_version"); public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap(), null); // This parser follows the pattern that metadata is parsed leniently (to allow for enhancements) @@ -68,18 +67,18 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { LENIENT_PARSER.declareObjectArray(Builder::putJobs, (p, c) -> Job.LENIENT_PARSER.apply(p, c).build(), JOBS_FIELD); LENIENT_PARSER.declareObjectArray(Builder::putDatafeeds, (p, c) -> DatafeedConfig.LENIENT_PARSER.apply(p, c).build(), DATAFEEDS_FIELD); - LENIENT_PARSER.declareLong(Builder::setLastMemoryRefreshTimeMs, LAST_MEMORY_REFRESH_TIME_FIELD); + LENIENT_PARSER.declareLong(Builder::setLastMemoryRefreshVersion, LAST_MEMORY_REFRESH_VERSION_FIELD); } private final SortedMap jobs; private final SortedMap datafeeds; - private final Instant lastMemoryRefreshTime; + private final Long lastMemoryRefreshVersion; private final GroupOrJobLookup groupOrJobLookup; - private MlMetadata(SortedMap jobs, SortedMap datafeeds, Instant lastMemoryRefreshTime) { + private MlMetadata(SortedMap jobs, SortedMap datafeeds, Long lastMemoryRefreshVersion) { this.jobs = Collections.unmodifiableSortedMap(jobs); this.datafeeds = Collections.unmodifiableSortedMap(datafeeds); - this.lastMemoryRefreshTime = lastMemoryRefreshTime; + this.lastMemoryRefreshVersion = lastMemoryRefreshVersion; this.groupOrJobLookup = new GroupOrJobLookup(jobs.values()); } @@ -117,8 +116,8 @@ public Set expandDatafeedIds(String expression, boolean allowNoDatafeeds .expand(expression, allowNoDatafeeds); } - public Instant getLastMemoryRefreshTime() { - return lastMemoryRefreshTime; + public Long getLastMemoryRefreshVersion() { + return lastMemoryRefreshVersion; } @Override @@ -155,9 +154,9 @@ public MlMetadata(StreamInput in) throws IOException { } this.datafeeds = datafeeds; if (in.getVersion().onOrAfter(Version.V_6_6_0)) { - lastMemoryRefreshTime = in.readBoolean() ? Instant.ofEpochSecond(in.readVLong(), in.readVInt()) : null; + lastMemoryRefreshVersion = in.readOptionalLong(); } else { - lastMemoryRefreshTime = null; + lastMemoryRefreshVersion = null; } this.groupOrJobLookup = new GroupOrJobLookup(jobs.values()); } @@ -167,13 +166,7 @@ public void writeTo(StreamOutput out) throws IOException { writeMap(jobs, out); writeMap(datafeeds, out); if (out.getVersion().onOrAfter(Version.V_6_6_0)) { - if (lastMemoryRefreshTime == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - out.writeVLong(lastMemoryRefreshTime.getEpochSecond()); - out.writeVInt(lastMemoryRefreshTime.getNano()); - } + out.writeOptionalLong(lastMemoryRefreshVersion); } } @@ -191,10 +184,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params); mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams); mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams); - if (lastMemoryRefreshTime != null) { - // We lose precision lower than milliseconds here - OK as millisecond precision is adequate for this use case - builder.timeField(LAST_MEMORY_REFRESH_TIME_FIELD.getPreferredName(), - LAST_MEMORY_REFRESH_TIME_FIELD.getPreferredName() + "_string", lastMemoryRefreshTime.toEpochMilli()); + if (lastMemoryRefreshVersion != null) { + builder.field(LAST_MEMORY_REFRESH_VERSION_FIELD.getPreferredName(), lastMemoryRefreshVersion); } return builder; } @@ -212,12 +203,12 @@ public static class MlMetadataDiff implements NamedDiff { final Diff> jobs; final Diff> datafeeds; - final Instant lastMemoryRefreshTime; + final Long lastMemoryRefreshVersion; MlMetadataDiff(MlMetadata before, MlMetadata after) { this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer()); this.datafeeds = DiffableUtils.diff(before.datafeeds, after.datafeeds, DiffableUtils.getStringKeySerializer()); - this.lastMemoryRefreshTime = after.lastMemoryRefreshTime; + this.lastMemoryRefreshVersion = after.lastMemoryRefreshVersion; } public MlMetadataDiff(StreamInput in) throws IOException { @@ -226,9 +217,9 @@ public MlMetadataDiff(StreamInput in) throws IOException { this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new, MlMetadataDiff::readSchedulerDiffFrom); if (in.getVersion().onOrAfter(Version.V_6_6_0)) { - lastMemoryRefreshTime = in.readBoolean() ? Instant.ofEpochSecond(in.readVLong(), in.readVInt()) : null; + lastMemoryRefreshVersion = in.readOptionalLong(); } else { - lastMemoryRefreshTime = null; + lastMemoryRefreshVersion = null; } } @@ -236,8 +227,8 @@ public MlMetadataDiff(StreamInput in) throws IOException { public MetaData.Custom apply(MetaData.Custom part) { TreeMap newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs)); TreeMap newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds)); - Instant lastMemoryRefreshTime = ((MlMetadata) part).lastMemoryRefreshTime; - return new MlMetadata(newJobs, newDatafeeds, lastMemoryRefreshTime); + Long lastMemoryRefreshVersion = ((MlMetadata) part).lastMemoryRefreshVersion; + return new MlMetadata(newJobs, newDatafeeds, lastMemoryRefreshVersion); } @Override @@ -245,13 +236,7 @@ public void writeTo(StreamOutput out) throws IOException { jobs.writeTo(out); datafeeds.writeTo(out); if (out.getVersion().onOrAfter(Version.V_6_6_0)) { - if (lastMemoryRefreshTime == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - out.writeVLong(lastMemoryRefreshTime.getEpochSecond()); - out.writeVInt(lastMemoryRefreshTime.getNano()); - } + out.writeOptionalLong(lastMemoryRefreshVersion); } } @@ -278,7 +263,7 @@ public boolean equals(Object o) { MlMetadata that = (MlMetadata) o; return Objects.equals(jobs, that.jobs) && Objects.equals(datafeeds, that.datafeeds) && - Objects.equals(lastMemoryRefreshTime, that.lastMemoryRefreshTime); + Objects.equals(lastMemoryRefreshVersion, that.lastMemoryRefreshVersion); } @Override @@ -288,14 +273,14 @@ public final String toString() { @Override public int hashCode() { - return Objects.hash(jobs, datafeeds, lastMemoryRefreshTime); + return Objects.hash(jobs, datafeeds, lastMemoryRefreshVersion); } public static class Builder { private TreeMap jobs; private TreeMap datafeeds; - private Instant lastMemoryRefreshTime; + private Long lastMemoryRefreshVersion; public Builder() { jobs = new TreeMap<>(); @@ -309,7 +294,7 @@ public Builder(@Nullable MlMetadata previous) { } else { jobs = new TreeMap<>(previous.jobs); datafeeds = new TreeMap<>(previous.datafeeds); - lastMemoryRefreshTime = previous.lastMemoryRefreshTime; + lastMemoryRefreshVersion = previous.lastMemoryRefreshVersion; } } @@ -429,18 +414,13 @@ private Builder putDatafeeds(Collection datafeeds) { return this; } - Builder setLastMemoryRefreshTimeMs(long lastMemoryRefreshTimeMs) { - lastMemoryRefreshTime = Instant.ofEpochMilli(lastMemoryRefreshTimeMs); - return this; - } - - public Builder setLastMemoryRefreshTime(Instant lastMemoryRefreshTime) { - this.lastMemoryRefreshTime = lastMemoryRefreshTime; + public Builder setLastMemoryRefreshVersion(Long lastMemoryRefreshVersion) { + this.lastMemoryRefreshVersion = lastMemoryRefreshVersion; return this; } public MlMetadata build() { - return new MlMetadata(jobs, datafeeds, lastMemoryRefreshTime); + return new MlMetadata(jobs, datafeeds, lastMemoryRefreshVersion); } public void markJobAsDeleting(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java index a769780f435d7..2ea92d98391eb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java @@ -91,6 +91,7 @@ public void onMaster() { @Override public void offMaster() { isMaster = false; + logger.trace("ML memory tracker off master"); memoryRequirementByJob.clear(); lastUpdateTime = null; } @@ -117,6 +118,7 @@ public boolean isRecentlyRefreshed() { * or null if it cannot be calculated. */ public Long getJobMemoryRequirement(String jobId) { + if (isMaster == false) { return null; } @@ -244,7 +246,7 @@ protected Boolean newResponse(boolean acknowledged) { public ClusterState execute(ClusterState currentState) { MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState); MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata); - builder.setLastMemoryRefreshTime(lastUpdateTime); + builder.setLastMemoryRefreshVersion(currentState.getVersion() + 1); MlMetadata newMlMetadata = builder.build(); if (newMlMetadata.equals(currentMlMetadata)) { // Return same reference if nothing has changed diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index 1e018227f5c01..eb58221bf5f35 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField; -import java.time.Instant; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -71,9 +70,7 @@ protected MlMetadata createTestInstance() { } } if (randomBoolean()) { - // Round-tripping to JSON loses time precision beyond milliseconds, - // so restrict the instant to a whole number of milliseconds - builder.setLastMemoryRefreshTime(Instant.ofEpochMilli(System.currentTimeMillis())); + builder.setLastMemoryRefreshVersion(randomNonNegativeLong()); } return builder.build(); } @@ -444,7 +441,7 @@ protected MlMetadata mutateInstance(MlMetadata instance) { for (Map.Entry entry : datafeeds.entrySet()) { metadataBuilder.putDatafeed(entry.getValue(), Collections.emptyMap()); } - metadataBuilder.setLastMemoryRefreshTime(instance.getLastMemoryRefreshTime()); + metadataBuilder.setLastMemoryRefreshVersion(instance.getLastMemoryRefreshVersion()); switch (between(0, 2)) { case 0: @@ -467,10 +464,10 @@ protected MlMetadata mutateInstance(MlMetadata instance) { metadataBuilder.putDatafeed(datafeedConfig, Collections.emptyMap()); break; case 2: - if (instance.getLastMemoryRefreshTime() == null) { - metadataBuilder.setLastMemoryRefreshTime(Instant.now()); + if (instance.getLastMemoryRefreshVersion() == null) { + metadataBuilder.setLastMemoryRefreshVersion(randomNonNegativeLong()); } else { - metadataBuilder.setLastMemoryRefreshTime(null); + metadataBuilder.setLastMemoryRefreshVersion(null); } break; default: diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index 2e14289da705e..37f789fee6067 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -8,10 +8,13 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentHelper; @@ -20,7 +23,6 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction.Response.DatafeedStats; @@ -31,21 +33,32 @@ import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.elasticsearch.persistent.PersistentTasksClusterService.needsReassignment; public class MlDistributedFailureIT extends BaseMlIntegTestCase { + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)) + .put(MachineLearning.CONCURRENT_JOB_ALLOCATIONS.getKey(), 4) + .build(); + } + public void testFailOver() throws Exception { internalCluster().ensureAtLeastNumDataNodes(3); ensureStableClusterOnAllNodes(3); @@ -58,8 +71,6 @@ public void testFailOver() throws Exception { }); } - @TestLogging("org.elasticsearch.xpack.ml.action:DEBUG,org.elasticsearch.xpack.persistent:TRACE," + - "org.elasticsearch.xpack.ml.datafeed:TRACE") public void testLoseDedicatedMasterNode() throws Exception { internalCluster().ensureAtMostNumDataNodes(0); logger.info("Starting dedicated master node..."); @@ -136,12 +147,12 @@ public void testCloseUnassignedJobAndDatafeed() throws Exception { // Job state is opened but the job is not assigned to a node (because we just killed the only ML node) GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(jobId); GetJobsStatsAction.Response jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet(); - assertEquals(jobStatsResponse.getResponse().results().get(0).getState(), JobState.OPENED); + assertEquals(JobState.OPENED, jobStatsResponse.getResponse().results().get(0).getState()); GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId); GetDatafeedsStatsAction.Response datafeedStatsResponse = client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); - assertEquals(datafeedStatsResponse.getResponse().results().get(0).getDatafeedState(), DatafeedState.STARTED); + assertEquals(DatafeedState.STARTED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); // Can't normal stop an unassigned datafeed StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId); @@ -170,6 +181,72 @@ public void testCloseUnassignedJobAndDatafeed() throws Exception { assertTrue(closeJobResponse.isClosed()); } + public void testJobRelocationIsMemoryAware() throws Exception { + + internalCluster().ensureAtLeastNumDataNodes(1); + ensureStableClusterOnAllNodes(1); + + // Open 4 small jobs. Since there is only 1 node in the cluster they'll have to go on that node. + + setupJobWithoutDatafeed("small1", new ByteSizeValue(2, ByteSizeUnit.MB)); + setupJobWithoutDatafeed("small2", new ByteSizeValue(2, ByteSizeUnit.MB)); + setupJobWithoutDatafeed("small3", new ByteSizeValue(2, ByteSizeUnit.MB)); + setupJobWithoutDatafeed("small4", new ByteSizeValue(2, ByteSizeUnit.MB)); + + // Expand the cluster to 3 nodes. The 4 small jobs will stay on the + // same node because we don't rebalance jobs that are happily running. + + internalCluster().ensureAtLeastNumDataNodes(3); + ensureStableClusterOnAllNodes(3); + + // Open a big job. This should go on a different node to the 4 small ones. + + setupJobWithoutDatafeed("big1", new ByteSizeValue(500, ByteSizeUnit.MB)); + + // Stop the current master node - this should be the one with the 4 small jobs on. + + internalCluster().stopCurrentMasterNode(); + ensureStableClusterOnAllNodes(2); + + // If memory requirements are used to reallocate the 4 small jobs (as we expect) then they should + // all reallocate to the same node, that being the one that doesn't have the big job on. If job counts + // are used to reallocate the small jobs then this implies the fallback allocation mechanism has been + // used in a situation we don't want it to be used in, and at least one of the small jobs will be on + // the same node as the big job. (This all relies on xpack.ml.node_concurrent_job_allocations being set + // to at least 4, which we do in the nodeSettings() method.) + + assertBusy(() -> { + GetJobsStatsAction.Response statsResponse = + client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(MetaData.ALL)).actionGet(); + QueryPage jobStats = statsResponse.getResponse(); + assertNotNull(jobStats); + List smallJobNodes = jobStats.results().stream().filter(s -> s.getJobId().startsWith("small") && s.getNode() != null) + .map(s -> s.getNode().getName()).collect(Collectors.toList()); + List bigJobNodes = jobStats.results().stream().filter(s -> s.getJobId().startsWith("big") && s.getNode() != null) + .map(s -> s.getNode().getName()).collect(Collectors.toList()); + logger.info("small job nodes: " + smallJobNodes + ", big job nodes: " + bigJobNodes); + assertEquals(5, jobStats.count()); + assertEquals(4, smallJobNodes.size()); + assertEquals(1, bigJobNodes.size()); + assertEquals(1L, smallJobNodes.stream().distinct().count()); + assertEquals(1L, bigJobNodes.stream().distinct().count()); + assertNotEquals(smallJobNodes, bigJobNodes); + }); + } + + private void setupJobWithoutDatafeed(String jobId, ByteSizeValue modelMemoryLimit) throws Exception { + Job.Builder job = createFareQuoteJob(jobId, modelMemoryLimit); + PutJobAction.Request putJobRequest = new PutJobAction.Request(job); + client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet(); + + client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).actionGet(); + assertBusy(() -> { + GetJobsStatsAction.Response statsResponse = + client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet(); + assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); + }); + } + private void setupJobAndDatafeed(String jobId, String datafeedId) throws Exception { Job.Builder job = createScheduledJob(jobId); PutJobAction.Request putJobRequest = new PutJobAction.Request(job); @@ -183,7 +260,7 @@ private void setupJobAndDatafeed(String jobId, String datafeedId) throws Excepti assertBusy(() -> { GetJobsStatsAction.Response statsResponse = client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet(); - assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED); + assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); }); StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(config.getId(), 0L); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java index 5e35e4d1533a9..cbba7ffa04972 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.junit.Before; -import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -167,7 +166,7 @@ public void testRecordUpdateTimeInClusterState() { when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); - AtomicReference updateTime = new AtomicReference<>(); + AtomicReference updateVersion = new AtomicReference<>(); doAnswer(invocation -> { AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocation.getArguments()[1]; @@ -175,7 +174,7 @@ public void testRecordUpdateTimeInClusterState() { ClusterState newClusterState = task.execute(currentClusterState); assertThat(currentClusterState, not(equalTo(newClusterState))); MlMetadata newMlMetadata = MlMetadata.getMlMetadata(newClusterState); - updateTime.set(newMlMetadata.getLastMemoryRefreshTime()); + updateVersion.set(newMlMetadata.getLastMemoryRefreshVersion()); task.onAllNodesAcked(null); return null; }).when(clusterService).submitStateUpdateTask(anyString(), any(AckedClusterStateUpdateTask.class)); @@ -183,9 +182,9 @@ public void testRecordUpdateTimeInClusterState() { memoryTracker.asyncRefresh(ActionListener.wrap(ESTestCase::assertTrue, ESTestCase::assertNull)); if (isMaster) { - assertNotNull(updateTime.get()); + assertNotNull(updateVersion.get()); } else { - assertNull(updateTime.get()); + assertNull(updateVersion.get()); } } From 4e004e409883b2ec3fa03ec5dd4770adc6381fc1 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Mon, 12 Nov 2018 16:06:34 +0000 Subject: [PATCH 7/7] Fix applying diff Also added test logging in case it fails again --- .../org/elasticsearch/xpack/core/ml/MlMetadata.java | 11 ++++++++--- .../xpack/ml/integration/MlDistributedFailureIT.java | 2 ++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index bf7ab8b1ced9f..febed3d97efbb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -215,7 +215,7 @@ public MlMetadataDiff(StreamInput in) throws IOException { this.jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job::new, MlMetadataDiff::readJobDiffFrom); this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new, - MlMetadataDiff::readSchedulerDiffFrom); + MlMetadataDiff::readDatafeedDiffFrom); if (in.getVersion().onOrAfter(Version.V_6_6_0)) { lastMemoryRefreshVersion = in.readOptionalLong(); } else { @@ -223,11 +223,16 @@ public MlMetadataDiff(StreamInput in) throws IOException { } } + /** + * Merge the diff with the ML metadata. + * @param part The current ML metadata. + * @return The new ML metadata. + */ @Override public MetaData.Custom apply(MetaData.Custom part) { TreeMap newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs)); TreeMap newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds)); - Long lastMemoryRefreshVersion = ((MlMetadata) part).lastMemoryRefreshVersion; + // lastMemoryRefreshVersion always comes from the diff - no need to merge with the old value return new MlMetadata(newJobs, newDatafeeds, lastMemoryRefreshVersion); } @@ -249,7 +254,7 @@ static Diff readJobDiffFrom(StreamInput in) throws IOException { return AbstractDiffable.readDiffFrom(Job::new, in); } - static Diff readSchedulerDiffFrom(StreamInput in) throws IOException { + static Diff readDatafeedDiffFrom(StreamInput in) throws IOException { return AbstractDiffable.readDiffFrom(DatafeedConfig::new, in); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index 37f789fee6067..5e4d8fd06030c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction.Response.DatafeedStats; @@ -181,6 +182,7 @@ public void testCloseUnassignedJobAndDatafeed() throws Exception { assertTrue(closeJobResponse.isClosed()); } + @TestLogging("org.elasticsearch.xpack.ml.action:TRACE,org.elasticsearch.xpack.ml.process:TRACE") public void testJobRelocationIsMemoryAware() throws Exception { internalCluster().ensureAtLeastNumDataNodes(1);