Skip to content

Commit

Permalink
move MINIMAL_SUPPORTED_VERSION_FOR_MODEL_TTL to MLDeploySetting
Browse files Browse the repository at this point in the history
Signed-off-by: Xun Zhang <[email protected]>
  • Loading branch information
Zhangxunmt committed Apr 30, 2024
1 parent 16416ac commit 7c1dc81
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.ml.common.transport.sync.MLSyncUpInput;

import java.io.IOException;

Expand All @@ -28,6 +27,7 @@ public class MLDeploySetting implements ToXContentObject, Writeable {
public static final String IS_AUTO_DEPLOY_ENABLED_FIELD = "is_auto_deploy_enabled";
public static final String MODEL_TTL_MINUTES_FIELD = "model_ttl_minutes";
private static final long DEFAULT_TTL_MINUTES = -1;
public static final Version MINIMAL_SUPPORTED_VERSION_FOR_MODEL_TTL = Version.V_2_14_0;

private Boolean isAutoDeployEnabled;
private Long modelTTLInMinutes; // in minutes
Expand All @@ -44,7 +44,7 @@ public MLDeploySetting(Boolean isAutoDeployEnabled, Long modelTTLInMinutes) {
public MLDeploySetting(StreamInput in) throws IOException {
this.isAutoDeployEnabled = in.readOptionalBoolean();
Version streamInputVersion = in.getVersion();
if (streamInputVersion.onOrAfter(MLSyncUpInput.MINIMAL_SUPPORTED_VERSION_FOR_MODEL_TTL)) {
if (streamInputVersion.onOrAfter(MINIMAL_SUPPORTED_VERSION_FOR_MODEL_TTL)) {
this.modelTTLInMinutes = in.readOptionalLong();
}
}
Expand All @@ -53,7 +53,7 @@ public MLDeploySetting(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
Version streamOutputVersion = out.getVersion();
out.writeOptionalBoolean(isAutoDeployEnabled);
if (streamOutputVersion.onOrAfter(MLSyncUpInput.MINIMAL_SUPPORTED_VERSION_FOR_MODEL_TTL)) {
if (streamOutputVersion.onOrAfter(MINIMAL_SUPPORTED_VERSION_FOR_MODEL_TTL)) {
out.writeOptionalLong(modelTTLInMinutes);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ public class MLSyncUpInput implements Writeable {
// profile API has consistent data with model index.
private Map<String, Boolean> deployToAllNodes;

public static final Version MINIMAL_SUPPORTED_VERSION_FOR_MODEL_TTL = Version.V_2_14_0;

@Builder
public MLSyncUpInput(boolean getDeployedModels,
Map<String, String[]> addedWorkerNodes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.ml.common.model.MLDeploySetting;

import java.io.IOException;

Expand Down Expand Up @@ -42,7 +43,7 @@ public MLSyncUpNodeResponse(StreamInput in) throws IOException {
this.deployedModelIds = in.readOptionalStringArray();
this.runningDeployModelIds = in.readOptionalStringArray();
this.runningDeployModelTaskIds = in.readOptionalStringArray();
if (streamInputVersion.onOrAfter(MLSyncUpInput.MINIMAL_SUPPORTED_VERSION_FOR_MODEL_TTL)) {
if (streamInputVersion.onOrAfter(MLDeploySetting.MINIMAL_SUPPORTED_VERSION_FOR_MODEL_TTL)) {
this.expiredModelIds = in.readOptionalStringArray();
}
}
Expand All @@ -59,7 +60,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalStringArray(deployedModelIds);
out.writeOptionalStringArray(runningDeployModelIds);
out.writeOptionalStringArray(runningDeployModelTaskIds);
if (streamOutputVersion.onOrAfter(MLSyncUpInput.MINIMAL_SUPPORTED_VERSION_FOR_MODEL_TTL)) {
if (streamOutputVersion.onOrAfter(MLDeploySetting.MINIMAL_SUPPORTED_VERSION_FOR_MODEL_TTL)) {
out.writeOptionalStringArray(expiredModelIds);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void run() {

Set<String> modelsToUndeploy = new HashSet<>();
for (String modelId : expiredModelToNodes.keySet()) {
if (expiredModelToNodes.get(modelId).size() == modelWorkerNodes.get(modelId).size()) {
if (modelWorkerNodes.containsKey(modelId) && expiredModelToNodes.get(modelId).size() == modelWorkerNodes.get(modelId).size()) {
// this model has expired in all the nodes
modelWorkerNodes.remove(modelId);
modelsToUndeploy.add(modelId);
Expand Down

0 comments on commit 7c1dc81

Please sign in to comment.