From bc39114097ba92f24a9a69147fcb45d1c863da07 Mon Sep 17 00:00:00 2001 From: zane-neo Date: Tue, 14 Nov 2023 11:24:38 +0800 Subject: [PATCH 1/6] Fix cluster level restart model not auto redeploy issue Signed-off-by: zane-neo --- .../MLCommonsClusterManagerEventListener.java | 12 +++++++- .../opensearch/ml/cluster/MLSyncUpCron.java | 29 ++++++++++++------- .../ml/plugin/MachineLearningPlugin.java | 3 +- 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java b/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java index c3012fdade..6e98e6414d 100644 --- a/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java +++ b/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java @@ -14,6 +14,7 @@ import org.opensearch.common.lifecycle.LifecycleListener; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.ml.autoredeploy.MLModelAutoReDeployer; import org.opensearch.ml.engine.encryptor.Encryptor; import org.opensearch.ml.indices.MLIndicesHandler; import org.opensearch.threadpool.Scheduler; @@ -21,6 +22,9 @@ import lombok.extern.log4j.Log4j2; +import java.util.Arrays; +import java.util.List; + @Log4j2 public class MLCommonsClusterManagerEventListener implements LocalNodeClusterManagerListener { @@ -35,6 +39,8 @@ public class MLCommonsClusterManagerEventListener implements LocalNodeClusterMan private volatile Integer jobInterval; + private final MLModelAutoReDeployer mlModelAutoReDeployer; + public MLCommonsClusterManagerEventListener( ClusterService clusterService, Client client, @@ -42,7 +48,8 @@ public MLCommonsClusterManagerEventListener( ThreadPool threadPool, DiscoveryNodeHelper nodeHelper, MLIndicesHandler mlIndicesHandler, - Encryptor encryptor + Encryptor encryptor, + MLModelAutoReDeployer modelAutoReDeployer ) { 
this.clusterService = clusterService; this.client = client; @@ -51,6 +58,7 @@ public MLCommonsClusterManagerEventListener( this.nodeHelper = nodeHelper; this.mlIndicesHandler = mlIndicesHandler; this.encryptor = encryptor; + this.mlModelAutoReDeployer = modelAutoReDeployer; this.jobInterval = ML_COMMONS_SYNC_UP_JOB_INTERVAL_IN_SECONDS.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_SYNC_UP_JOB_INTERVAL_IN_SECONDS, it -> { @@ -62,6 +70,8 @@ public MLCommonsClusterManagerEventListener( @Override public void onClusterManager() { + String localNodeId = clusterService.localNode().getId(); + mlModelAutoReDeployer.buildAutoReloadArrangement(List.of(localNodeId), localNodeId); if (syncModelRoutingCron == null) { startSyncModelRoutingCron(); } diff --git a/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java b/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java index 3a5ea83347..10f8b9502f 100644 --- a/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java +++ b/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java @@ -32,8 +32,10 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.TermsQueryBuilder; +import org.opensearch.ml.autoredeploy.MLModelAutoReDeployer; import org.opensearch.ml.common.FunctionName; import org.opensearch.ml.common.MLModel; import org.opensearch.ml.common.model.MLModelState; @@ -63,6 +65,7 @@ public class MLSyncUpCron implements Runnable { private volatile Boolean mlConfigInited; @VisibleForTesting Semaphore updateModelStateSemaphore; + private MLModelAutoReDeployer mlModelAutoReDeployer; public MLSyncUpCron( Client client, @@ -116,6 +119,8 @@ public void run() { Set workerNodes = 
deployingModels.computeIfAbsent(modelId, it -> new HashSet<>()); workerNodes.add(nodeId); } + } else { + } String[] runningDeployModelTaskIds = response.getRunningDeployModelTaskIds(); @@ -270,17 +275,19 @@ void refreshModelState(Map> modelWorkerNodes, Map createComponents( threadPool, nodeHelper, mlIndicesHandler, - encryptor + encryptor, + mlModelAutoRedeployer ); // TODO move this into MLFeatureEnabledSetting From 9a50c61d2b68ddcb8989eb819918cf3e7ba21cae Mon Sep 17 00:00:00 2001 From: zane-neo Date: Tue, 14 Nov 2023 19:43:34 +0800 Subject: [PATCH 2/6] Fix cluster level restart model not auto redeploy issue Signed-off-by: zane-neo --- .../autoredeploy/MLModelAutoReDeployer.java | 22 ++++++++++++++++--- .../MLCommonsClusterManagerEventListener.java | 19 +++++++++++----- .../opensearch/ml/cluster/MLSyncUpCron.java | 2 -- 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java b/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java index dc322f2836..01b4078984 100644 --- a/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java +++ b/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java @@ -18,6 +18,7 @@ import java.util.function.Consumer; import java.util.stream.Collectors; +import lombok.Setter; import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchRequestBuilder; import org.opensearch.action.search.SearchResponse; @@ -68,6 +69,9 @@ public class MLModelAutoReDeployer { private final SearchRequestBuilderFactory searchRequestBuilderFactory; + @Setter + private ActionListener startCronJobListener; + public MLModelAutoReDeployer( ClusterService clusterService, Client client, @@ -126,6 +130,7 @@ Consumer undeployModelsOnDataNodesConsumer() { public void buildAutoReloadArrangement(List addedNodes, String clusterManagerNodeId) { if (!enableAutoReDeployModel) { log.info("Model 
auto reload configuration is false, not performing auto reloading!"); + startCronjobAndClearListener(); return; } String localNodeId = clusterService.localNode().getId(); @@ -142,10 +147,12 @@ public void buildAutoReloadArrangement(List addedNodes, String clusterMa public void redeployAModel() { if (!enableAutoReDeployModel) { log.info("Model auto reload configuration is false, not performing auto reloading!"); + startCronjobAndClearListener(); return; } if (modelAutoRedeployArrangements.size() == 0) { log.info("No models needs to be auto redeployed!"); + startCronjobAndClearListener(); return; } ModelAutoRedeployArrangement modelAutoRedeployArrangement = modelAutoRedeployArrangements.poll(); @@ -176,9 +183,10 @@ private void triggerAutoDeployModels(List addedNodes) { }); redeployAModel(); } - }, - e -> { log.error("Failed to query need auto redeploy models, no action will be performed, addedNodes are: {}", addedNodes, e); } - ); + }, e -> { + log.error("Failed to query need auto redeploy models, no action will be performed, addedNodes are: {}", addedNodes, e); + startCronjobAndClearListener(); + }); queryRunningModels(listener); } @@ -296,6 +304,14 @@ private void triggerModelRedeploy(ModelAutoRedeployArrangement modelAutoRedeploy client.execute(MLDeployModelAction.INSTANCE, deployModelRequest, listener); } + private void startCronjobAndClearListener() { + boolean managerNode = clusterService.localNode().isClusterManagerNode(); + if (managerNode && startCronJobListener != null) { + startCronJobListener.onResponse(true); + startCronJobListener = null; + } + } + @Data @Builder static class ModelAutoRedeployArrangement { diff --git a/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java b/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java index 6e98e6414d..a741571187 100644 --- a/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java +++ 
b/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java @@ -14,6 +14,7 @@ import org.opensearch.common.lifecycle.LifecycleListener; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; import org.opensearch.ml.autoredeploy.MLModelAutoReDeployer; import org.opensearch.ml.engine.encryptor.Encryptor; import org.opensearch.ml.indices.MLIndicesHandler; @@ -22,7 +23,6 @@ import lombok.extern.log4j.Log4j2; -import java.util.Arrays; import java.util.List; @Log4j2 @@ -70,11 +70,20 @@ public MLCommonsClusterManagerEventListener( @Override public void onClusterManager() { + ActionListener listener = ActionListener.wrap(r -> { + if (syncModelRoutingCron == null) { + startSyncModelRoutingCron(); + } + }, e -> { + if (syncModelRoutingCron == null) { + startSyncModelRoutingCron(); + } + }); + mlModelAutoReDeployer.setStartCronJobListener(listener); String localNodeId = clusterService.localNode().getId(); - mlModelAutoReDeployer.buildAutoReloadArrangement(List.of(localNodeId), localNodeId); - if (syncModelRoutingCron == null) { - startSyncModelRoutingCron(); - } + threadPool.schedule(() -> mlModelAutoReDeployer.buildAutoReloadArrangement(List.of(localNodeId), localNodeId), + TimeValue.timeValueSeconds(jobInterval), + GENERAL_THREAD_POOL); } private void startSyncModelRoutingCron() { diff --git a/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java b/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java index 10f8b9502f..afeacaf4cd 100644 --- a/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java +++ b/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java @@ -119,8 +119,6 @@ public void run() { Set workerNodes = deployingModels.computeIfAbsent(modelId, it -> new HashSet<>()); workerNodes.add(nodeId); } - } else { - } String[] runningDeployModelTaskIds = response.getRunningDeployModelTaskIds(); From 
a8c71f1fdbbcc129bdc6eb3b3da261cc3f950786 Mon Sep 17 00:00:00 2001 From: zane-neo Date: Wed, 15 Nov 2023 10:26:54 +0800 Subject: [PATCH 3/6] remove unuseful changes Signed-off-by: zane-neo --- .../opensearch/ml/cluster/MLSyncUpCron.java | 51 +++++++++---------- 1 file changed, 23 insertions(+), 28 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java b/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java index afeacaf4cd..801b7f0961 100644 --- a/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java +++ b/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java @@ -32,10 +32,8 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.TermsQueryBuilder; -import org.opensearch.ml.autoredeploy.MLModelAutoReDeployer; import org.opensearch.ml.common.FunctionName; import org.opensearch.ml.common.MLModel; import org.opensearch.ml.common.model.MLModelState; @@ -65,7 +63,6 @@ public class MLSyncUpCron implements Runnable { private volatile Boolean mlConfigInited; @VisibleForTesting Semaphore updateModelStateSemaphore; - private MLModelAutoReDeployer mlModelAutoReDeployer; public MLSyncUpCron( Client client, @@ -247,20 +244,20 @@ void refreshModelState(Map> modelWorkerNodes, Map planningWorkNodes = sourceAsMap.containsKey(MLModel.PLANNING_WORKER_NODES_FIELD) - ? (List) sourceAsMap.get(MLModel.PLANNING_WORKER_NODES_FIELD) - : new ArrayList<>(); + ? 
(List) sourceAsMap.get(MLModel.PLANNING_WORKER_NODES_FIELD) + : new ArrayList<>(); if (deployToAllNodes) { DiscoveryNode[] eligibleNodes = nodeHelper.getEligibleNodes(functionName); planningWorkerNodeCount = eligibleNodes.length; @@ -273,19 +270,17 @@ void refreshModelState(Map> modelWorkerNodes, Map Instant.now().toEpochMilli())) { + && lastUpdateTime != null + && lastUpdateTime + DEPLOY_MODEL_TASK_GRACE_TIME_IN_MS > Instant.now().toEpochMilli())) { // If model not deployed to any node and no node is deploying the model, then set model state as DEPLOY_FAILED return MLModelState.DEPLOY_FAILED; } From 1b8a75ce8f8817d4512345c3b70ff4338492b986 Mon Sep 17 00:00:00 2001 From: zane-neo Date: Wed, 15 Nov 2023 10:28:07 +0800 Subject: [PATCH 4/6] format code Signed-off-by: zane-neo --- .../autoredeploy/MLModelAutoReDeployer.java | 2 +- .../MLCommonsClusterManagerEventListener.java | 13 ++++++---- .../opensearch/ml/cluster/MLSyncUpCron.java | 24 +++++++++---------- 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java b/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java index 01b4078984..ff774ac0be 100644 --- a/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java +++ b/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java @@ -18,7 +18,6 @@ import java.util.function.Consumer; import java.util.stream.Collectors; -import lombok.Setter; import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchRequestBuilder; import org.opensearch.action.search.SearchResponse; @@ -51,6 +50,7 @@ import lombok.Builder; import lombok.Data; +import lombok.Setter; import lombok.extern.log4j.Log4j2; @Log4j2 diff --git a/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java b/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java index 
a741571187..71ccfb9569 100644 --- a/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java +++ b/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java @@ -8,6 +8,8 @@ import static org.opensearch.ml.plugin.MachineLearningPlugin.GENERAL_THREAD_POOL; import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_SYNC_UP_JOB_INTERVAL_IN_SECONDS; +import java.util.List; + import org.opensearch.client.Client; import org.opensearch.cluster.LocalNodeClusterManagerListener; import org.opensearch.cluster.service.ClusterService; @@ -23,8 +25,6 @@ import lombok.extern.log4j.Log4j2; -import java.util.List; - @Log4j2 public class MLCommonsClusterManagerEventListener implements LocalNodeClusterManagerListener { @@ -81,9 +81,12 @@ public void onClusterManager() { }); mlModelAutoReDeployer.setStartCronJobListener(listener); String localNodeId = clusterService.localNode().getId(); - threadPool.schedule(() -> mlModelAutoReDeployer.buildAutoReloadArrangement(List.of(localNodeId), localNodeId), - TimeValue.timeValueSeconds(jobInterval), - GENERAL_THREAD_POOL); + threadPool + .schedule( + () -> mlModelAutoReDeployer.buildAutoReloadArrangement(List.of(localNodeId), localNodeId), + TimeValue.timeValueSeconds(jobInterval), + GENERAL_THREAD_POOL + ); } private void startSyncModelRoutingCron() { diff --git a/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java b/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java index 801b7f0961..3a5ea83347 100644 --- a/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java +++ b/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java @@ -244,20 +244,20 @@ void refreshModelState(Map> modelWorkerNodes, Map planningWorkNodes = sourceAsMap.containsKey(MLModel.PLANNING_WORKER_NODES_FIELD) - ? (List) sourceAsMap.get(MLModel.PLANNING_WORKER_NODES_FIELD) - : new ArrayList<>(); + ? 
(List) sourceAsMap.get(MLModel.PLANNING_WORKER_NODES_FIELD) + : new ArrayList<>(); if (deployToAllNodes) { DiscoveryNode[] eligibleNodes = nodeHelper.getEligibleNodes(functionName); planningWorkerNodeCount = eligibleNodes.length; @@ -312,8 +312,8 @@ private MLModelState getNewModelState( if (currentWorkerNodeCount == 0 && state != MLModelState.DEPLOY_FAILED && !(state == MLModelState.DEPLOYING - && lastUpdateTime != null - && lastUpdateTime + DEPLOY_MODEL_TASK_GRACE_TIME_IN_MS > Instant.now().toEpochMilli())) { + && lastUpdateTime != null + && lastUpdateTime + DEPLOY_MODEL_TASK_GRACE_TIME_IN_MS > Instant.now().toEpochMilli())) { // If model not deployed to any node and no node is deploying the model, then set model state as DEPLOY_FAILED return MLModelState.DEPLOY_FAILED; } From 459c438734d9dce3ae173d5ca2cb6d48ef8db544 Mon Sep 17 00:00:00 2001 From: zane-neo Date: Wed, 15 Nov 2023 14:25:57 +0800 Subject: [PATCH 5/6] Add auto redeploy success ratio configuration Signed-off-by: zane-neo --- .../java/org/opensearch/ml/plugin/MachineLearningPlugin.java | 1 + 1 file changed, 1 insertion(+) diff --git a/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java b/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java index 5c294de64d..c83c212e88 100644 --- a/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java +++ b/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java @@ -686,6 +686,7 @@ public List> getSettings() { MLCommonsSettings.ML_COMMONS_ENABLE_INHOUSE_PYTHON_MODEL, MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_ENABLE, MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_LIFETIME_RETRY_TIMES, + MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_SUCCESS_RATIO, MLCommonsSettings.ML_COMMONS_ALLOW_MODEL_URL, MLCommonsSettings.ML_COMMONS_ALLOW_LOCAL_FILE_UPLOAD, MLCommonsSettings.ML_COMMONS_MODEL_ACCESS_CONTROL_ENABLED, From 226fae3240f001386f9937d1ae4d0097607c92da Mon Sep 17 00:00:00 2001 From: 
zane-neo Date: Thu, 16 Nov 2023 14:20:25 +0800 Subject: [PATCH 6/6] Add start cron job log Signed-off-by: zane-neo --- .../ml/cluster/MLCommonsClusterManagerEventListener.java | 1 + 1 file changed, 1 insertion(+) diff --git a/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java b/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java index 71ccfb9569..c4cdd9f899 100644 --- a/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java +++ b/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java @@ -91,6 +91,7 @@ public void onClusterManager() { private void startSyncModelRoutingCron() { if (jobInterval > 0) { + log.info("Starting ML sync up job..."); syncModelRoutingCron = threadPool .scheduleWithFixedDelay( new MLSyncUpCron(client, clusterService, nodeHelper, mlIndicesHandler, encryptor),