Skip to content

Commit

Permalink
Cluster restart model auto redeploy (opensearch-project#1627)
Browse files Browse the repository at this point in the history
* Fix cluster level restart model not auto redeploy issue

Signed-off-by: zane-neo <[email protected]>

* Fix cluster level restart model not auto redeploy issuee

Signed-off-by: zane-neo <[email protected]>

* remove unuseful changes

Signed-off-by: zane-neo <[email protected]>

* format code

Signed-off-by: zane-neo <[email protected]>

* Add auto redeploy success ratio configuration

Signed-off-by: zane-neo <[email protected]>

* Add start cron job log

Signed-off-by: zane-neo <[email protected]>

---------

Signed-off-by: zane-neo <[email protected]>
  • Loading branch information
zane-neo authored Nov 22, 2023
1 parent 56f1663 commit 034a212
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

import lombok.Builder;
import lombok.Data;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;

@Log4j2
Expand All @@ -68,6 +69,9 @@ public class MLModelAutoReDeployer {

private final SearchRequestBuilderFactory searchRequestBuilderFactory;

@Setter
private ActionListener<Boolean> startCronJobListener;

public MLModelAutoReDeployer(
ClusterService clusterService,
Client client,
Expand Down Expand Up @@ -126,6 +130,7 @@ Consumer<Boolean> undeployModelsOnDataNodesConsumer() {
public void buildAutoReloadArrangement(List<String> addedNodes, String clusterManagerNodeId) {
if (!enableAutoReDeployModel) {
log.info("Model auto reload configuration is false, not performing auto reloading!");
startCronjobAndClearListener();
return;
}
String localNodeId = clusterService.localNode().getId();
Expand All @@ -142,10 +147,12 @@ public void buildAutoReloadArrangement(List<String> addedNodes, String clusterMa
public void redeployAModel() {
if (!enableAutoReDeployModel) {
log.info("Model auto reload configuration is false, not performing auto reloading!");
startCronjobAndClearListener();
return;
}
if (modelAutoRedeployArrangements.size() == 0) {
log.info("No models needs to be auto redeployed!");
startCronjobAndClearListener();
return;
}
ModelAutoRedeployArrangement modelAutoRedeployArrangement = modelAutoRedeployArrangements.poll();
Expand Down Expand Up @@ -176,9 +183,10 @@ private void triggerAutoDeployModels(List<String> addedNodes) {
});
redeployAModel();
}
},
e -> { log.error("Failed to query need auto redeploy models, no action will be performed, addedNodes are: {}", addedNodes, e); }
);
}, e -> {
log.error("Failed to query need auto redeploy models, no action will be performed, addedNodes are: {}", addedNodes, e);
startCronjobAndClearListener();
});

queryRunningModels(listener);
}
Expand Down Expand Up @@ -296,6 +304,14 @@ private void triggerModelRedeploy(ModelAutoRedeployArrangement modelAutoRedeploy
client.execute(MLDeployModelAction.INSTANCE, deployModelRequest, listener);
}

private void startCronjobAndClearListener() {
boolean managerNode = clusterService.localNode().isClusterManagerNode();
if (managerNode && startCronJobListener != null) {
startCronJobListener.onResponse(true);
startCronJobListener = null;
}
}

@Data
@Builder
static class ModelAutoRedeployArrangement {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@
import static org.opensearch.ml.plugin.MachineLearningPlugin.GENERAL_THREAD_POOL;
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_SYNC_UP_JOB_INTERVAL_IN_SECONDS;

import java.util.List;

import org.opensearch.client.Client;
import org.opensearch.cluster.LocalNodeClusterManagerListener;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lifecycle.LifecycleListener;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.ml.autoredeploy.MLModelAutoReDeployer;
import org.opensearch.ml.engine.encryptor.Encryptor;
import org.opensearch.ml.indices.MLIndicesHandler;
import org.opensearch.threadpool.Scheduler;
Expand All @@ -35,14 +39,17 @@ public class MLCommonsClusterManagerEventListener implements LocalNodeClusterMan

private volatile Integer jobInterval;

private final MLModelAutoReDeployer mlModelAutoReDeployer;

public MLCommonsClusterManagerEventListener(
ClusterService clusterService,
Client client,
Settings settings,
ThreadPool threadPool,
DiscoveryNodeHelper nodeHelper,
MLIndicesHandler mlIndicesHandler,
Encryptor encryptor
Encryptor encryptor,
MLModelAutoReDeployer modelAutoReDeployer
) {
this.clusterService = clusterService;
this.client = client;
Expand All @@ -51,6 +58,7 @@ public MLCommonsClusterManagerEventListener(
this.nodeHelper = nodeHelper;
this.mlIndicesHandler = mlIndicesHandler;
this.encryptor = encryptor;
this.mlModelAutoReDeployer = modelAutoReDeployer;

this.jobInterval = ML_COMMONS_SYNC_UP_JOB_INTERVAL_IN_SECONDS.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_SYNC_UP_JOB_INTERVAL_IN_SECONDS, it -> {
Expand All @@ -62,13 +70,28 @@ public MLCommonsClusterManagerEventListener(

@Override
public void onClusterManager() {
if (syncModelRoutingCron == null) {
startSyncModelRoutingCron();
}
ActionListener<Boolean> listener = ActionListener.wrap(r -> {
if (syncModelRoutingCron == null) {
startSyncModelRoutingCron();
}
}, e -> {
if (syncModelRoutingCron == null) {
startSyncModelRoutingCron();
}
});
mlModelAutoReDeployer.setStartCronJobListener(listener);
String localNodeId = clusterService.localNode().getId();
threadPool
.schedule(
() -> mlModelAutoReDeployer.buildAutoReloadArrangement(List.of(localNodeId), localNodeId),
TimeValue.timeValueSeconds(jobInterval),
GENERAL_THREAD_POOL
);
}

private void startSyncModelRoutingCron() {
if (jobInterval > 0) {
log.info("Starting ML sync up job...");
syncModelRoutingCron = threadPool
.scheduleWithFixedDelay(
new MLSyncUpCron(client, clusterService, nodeHelper, mlIndicesHandler, encryptor),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,8 @@ public Collection<Object> createComponents(
threadPool,
nodeHelper,
mlIndicesHandler,
encryptor
encryptor,
mlModelAutoRedeployer
);

// TODO move this into MLFeatureEnabledSetting
Expand Down Expand Up @@ -685,6 +686,7 @@ public List<Setting<?>> getSettings() {
MLCommonsSettings.ML_COMMONS_ENABLE_INHOUSE_PYTHON_MODEL,
MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_ENABLE,
MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_LIFETIME_RETRY_TIMES,
MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_SUCCESS_RATIO,
MLCommonsSettings.ML_COMMONS_ALLOW_MODEL_URL,
MLCommonsSettings.ML_COMMONS_ALLOW_LOCAL_FILE_UPLOAD,
MLCommonsSettings.ML_COMMONS_MODEL_ACCESS_CONTROL_ENABLED,
Expand Down

0 comments on commit 034a212

Please sign in to comment.