the dev of [FEATURE]Auto reload model when cluster rebooted/node rejoin #711
Conversation
Codecov Report
@@ Coverage Diff @@
## 2.x #711 +/- ##
============================================
+ Coverage 84.95% 85.27% +0.32%
- Complexity 1076 1105 +29
============================================
Files 100 101 +1
Lines 3922 4055 +133
Branches 370 378 +8
============================================
+ Hits 3332 3458 +126
- Misses 433 440 +7
Partials 157 157
 * the main method: model auto reloading
 */
public void autoReLoadModel() {
    log.info("enableAutoReLoadModel: {} ", enableAutoReLoadModel);
Log message is not very readable. How about changing it to "Auto reload model enabled: {}"?
done
String localNodeId = clusterService.localNode().getId();
// auto reload all models of this local ml node
threadPool.generic().submit(() -> {
I think it is a different function; if I use the load-model thread pool, it may affect the original model-load process.
threadPool.generic() is not dedicated to ML. Using this thread pool may impact other OpenSearch tasks. Suggest changing to the ML-dedicated load-model thread pool.
OK, I'll use threadPool.executor(LOAD_THREAD_POOL).
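For illustration, a minimal sketch of the agreed change (LOAD_THREAD_POOL is the pool-name constant mentioned in the reply above; the surrounding variables come from the snippet under review):

// Submit the reload task to the ML-dedicated load-model pool instead of the shared generic pool.
threadPool.executor(LOAD_THREAD_POOL).execute(() -> {
    try {
        autoReLoadModelByNodeId(localNodeId);
    } catch (ExecutionException | InterruptedException e) {
        throw new RuntimeException(e);
    }
});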
try {
    autoReLoadModelByNodeId(localNodeId);
} catch (ExecutionException | InterruptedException e) {
    throw new RuntimeException(e);
Add error log here?
OK, added it.
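A sketch of what the added logging plausibly looks like; passing the exception as the last logger argument records the full stack trace before rethrowing:

} catch (ExecutionException | InterruptedException e) {
    // Log first so the failure is visible even if the wrapping RuntimeException is swallowed upstream.
    log.error("Model auto reload failed on node {}", localNodeId, e);
    throw new RuntimeException(e);
}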
    indexName
);

indicesExistsRequestBuilder.execute(ActionListener.wrap(actionListener::onResponse, actionListener::onFailure));
How about just checking whether the index exists in cluster metadata? Refer to PR #717.
OK, that's a very useful suggestion; it reduces the implementation complexity. Thanks.
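A minimal sketch of the metadata-based check suggested above (cf. PR #717), assuming clusterService and indexName are in scope:

// Consult cluster metadata directly instead of issuing an indices-exists request.
boolean indexExists = clusterService.state().metadata().hasIndex(indexName);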
    indexResponseActionListener.onResponse(indexResponse);
    return;
}
indexResponseActionListener.onFailure(new RuntimeException("node id:" + localNodeId + " insert retry times unsuccessfully"));
Change to MLException?
done
log.error("Can't auto reload model in node id {} ,has tried {} times\nThe reason is:{}", localNodeId, reTryTimes, e); | ||
} | ||
|
||
// Store the latest value of the reTryTimes and node id under the index ".plugins-ml-model-reload" |
Any reason we have to persist retryTimes to an index? Is it OK to just cache retry times in memory?
Because when the ML node reboots, the retry-count info would be lost if we kept it only in an in-memory cache.
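An illustrative sketch of persisting the per-node retry counter with the CommonValue constants from this PR; client is assumed to be an OpenSearch Client in scope, and the exact document layout is an assumption:

// Upsert one document per node, keyed by node id, holding the current retry count.
Map<String, Object> source = new HashMap<>();
source.put(NODE_ID_FIELD, localNodeId);
source.put(MODEL_LOAD_RETRY_TIMES_FIELD, reTryTimes);
IndexRequest indexRequest = new IndexRequest(ML_MODEL_RELOAD_INDEX).id(localNodeId).source(source);
client.index(indexRequest, ActionListener.wrap(
    r -> log.info("Persisted retry times {} for node {}", reTryTimes, localNodeId),
    e -> log.error("Failed to persist retry times for node {}", localNodeId, e)
));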
autoReLoadModelByNodeAndModelId(localNodeId, mlTask.getModelId());

// if the model reloads successfully, the number of unsuccessful reloads should be reset to zero.
result.setReTryTimes(reTryTimes);
Load model is async; I don't think we can be sure the model loaded successfully even if line 188, autoReLoadModelByNodeAndModelId(localNodeId, mlTask.getModelId()), doesn't throw an exception.
Do you need to update the retry times in the ML_MODEL_RELOAD_INDEX to 0 after loading successfully? For instance, the reload succeeded at the 3rd time so you need to reset the retry value in the index from 2 to 0?
Yes, the code already has the "if success, reset to zero" behavior.
mlLoadModelRequest,
ActionListener
    .wrap(response -> log.info("the model {} is auto reloading under the node {} ", modelId, localNodeId), exception -> {
        log.error("fail to reload model " + modelId + " under the node " + localNodeId + "\nthe reason is: " + exception);
I don't see retryTimes + 1 here; is that correct? I think we should count this failure in the retry times.
Because the method autoReLoadModelByNodeAndModelId throws the exception up to its caller, and line 188 mentioned in the previous comment catches it, retryTimes + 1 is written in the catch statement (line 195).
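A sketch of the control flow described in this thread (line numbers refer to the diff under review; the MLException type follows the change agreed elsewhere in this PR):

try {
    // line 188: throws if the reload request could not be issued
    autoReLoadModelByNodeAndModelId(localNodeId, mlTask.getModelId());
    reTryTimes = 0; // reload succeeded: reset the persisted counter
} catch (MLException e) {
    reTryTimes++;   // line 195: count this failure toward the retry limit
    log.error("Failed to reload model {} on node {}", mlTask.getModelId(), localNodeId, e);
}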
}

int reTryTimes = 0;
try (XContentParser parser = createXContentParserFromRegistry(xContentRegistry, result.getHits()[0].getSourceRef())) {
From result.getHits()[0], it seems this will only reload the first model?
This retryTimes is defined per node, not per node + model. In other words, there is only one retryTimes value for each ML node.
QueryBuilder queryBuilder = QueryBuilders
    .boolQuery()
    .must(QueryBuilders.matchPhraseQuery("task_type", "LOAD_MODEL"))
    .must(QueryBuilders.matchPhraseQuery("state", "COMPLETED"))
I think we should also consider COMPLETED_WITH_ERROR, which means the model isn't loaded on all worker nodes but was loaded on some worker nodes successfully.
OK, there may be a timing gap: when I wrote the code, the enum MLTaskState did not have COMPLETED_WITH_ERROR. I'll add it to my code.
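A sketch of the broadened query, matching either terminal task state:

QueryBuilder queryBuilder = QueryBuilders
    .boolQuery()
    .must(QueryBuilders.matchPhraseQuery("task_type", "LOAD_MODEL"))
    // terms query matches fully completed as well as partially successful loads
    .must(QueryBuilders.termsQuery("state", "COMPLETED", "COMPLETED_WITH_ERROR"));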
public static final String ML_MODEL_RELOAD_INDEX = ".plugins-ml-model-reload";
public static final String NODE_ID_FIELD = "node_id";
public static final String MODEL_LOAD_RETRY_TIMES_FIELD = "retry_times";
public static final Integer ML_MODEL_RELOAD_MAX_RETRY_TIMES = 2;
Do we still need this constant, since we have the setting MLCommonsSettings.ML_MODEL_RELOAD_MAX_RETRY_TIMES?
Yes, you're right. We don't need it in class CommonValue, but we do need it in opensearch.yml so users can define the value themselves.
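For illustration, a dynamic, user-overridable setting in MLCommonsSettings could look like this (the setting key shown is an assumption, not the merged name); users would then override it in opensearch.yml under that key:

public static final Setting<Integer> ML_MODEL_RELOAD_MAX_RETRY_TIMES = Setting
    .intSetting(
        "plugins.ml_commons.model_auto_reload.max_retry_times", // assumed key
        2, // default retry limit
        0, // minimum allowed value
        Setting.Property.NodeScope,
        Setting.Property.Dynamic
    );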
Is it possible to test this auto reload in a local cluster? For example, create a 2-node cluster and kill the OpenSearch process on one node?
}

@Data
static class Result {
This class name "Result" looks too general and confusing. Let's rename it to something more meaningful, e.g. ModelsToRestore?
After referring to PR #717, I found we don't need this Result any more, so I refactored it.
private final NamedXContentRegistry xContentRegistry;
private final DiscoveryNodeHelper nodeHelper;
private final ThreadPool threadPool;
private volatile Boolean enableAutoReLoadModel;
reload is a full word; maybe we don't need camel casing for it. enableAutoReloadModel? Same comment for retry.
OK, I will search for reload and retry and refactor these two names everywhere.
// According to the node id to get retry times, if more than the max retry times, don't need to retry
// that the number of unsuccessful reload has reached the maximum number of times, do not need to reload
if (result.getReTryTimes() > autoReLoadMaxReTryTimes) {
    log.info("have exceeded max retry times, always failure");
How about: log.info("Node: {} has reached to the max retry limit, failed to load models", localNodeId)
that's cool
I have committed the code.
@@ -32,6 +32,11 @@ public class CommonValue {
public static final String ML_TASK_INDEX = ".plugins-ml-task";
public static final Integer ML_MODEL_INDEX_SCHEMA_VERSION = 3;
public static final Integer ML_TASK_INDEX_SCHEMA_VERSION = 1;

public static final String ML_MODEL_RELOAD_INDEX = ".plugins-ml-model-reload";
A general question: Is it possible to avoid using a new index to achieve auto reload? Can we just query the task index, find all the loaded models on the current node, and reload them all after OpenSearch starts? I may have missed some earlier discussion, but it looks like the retry number and search results can be stored locally in memory.
If this ML node has been restarted for some unknown reason, I can still use the persisted retryTimes value to know how many times the models on this node have been auto-reloaded before, then decide whether to auto-reload this time. If it is kept only in cache, I can't get this info and have to auto-reload again. Comparing the two, the former may give some performance improvement.
Yes, but this is a trade off for performance improvement by using a lot more resources. Is it possible to define this auto_reload as a ml_task and reuse the ml_task index to store the retry_times? Adding 2 new fields in ml_task may be much cheaper than using a new index. Thoughts?
After communicating with you and Charlie, we will elaborate.
…713) Signed-off-by: Yaliang Wu <[email protected]>
…rameter (#714) * Enhance profile API to add model centric result controlled by view parameter Signed-off-by: Zan Niu <[email protected]>
* add planning work nodes to model * add test Signed-off-by: Yaliang Wu <[email protected]>
* refactor: add DL model class * fix model url in example doc * address comments * fix failed ut Signed-off-by: Yaliang Wu <[email protected]>
…in cluster metadata Signed-off-by: wujunshen <[email protected]>
…nValue.java Signed-off-by: wujunshen <[email protected]>
log
    .error(
        "the model auto-reloading has exception,and the root cause message is: {}",
        ExceptionUtils.getRootCauseMessage(e)
How about printing the full exception stack trace here? Printing only the root cause message makes it hard to debug.
Can I use ExceptionUtils.getMessage(e)?
I thought about it for a while and, in the end, changed it to ExceptionUtils.getStackTrace(e).
Printing the entire exception stack is useful and convenient for locating and debugging issues; you can change it to log.error("the model auto-reloading has exception,and the root cause message is: {}", e).
Cool, I will modify it as you said.
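For reference, the two-argument (message, throwable) logger overload is the form that reliably records the full stack trace:

// The throwable passed as the final argument is logged with its complete stack trace.
log.error("the model auto-reloading has an exception", e);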
 */
@VisibleForTesting
void autoReloadModelByNodeAndModelId(String localNodeId, String modelId) throws MLException {
    String[] allNodeIds = nodeHelper.getAllNodeIds();
nodeHelper.getAllNodeIds() will return all nodes, not just ML nodes. Should we reload the model on all nodes?
OK, I will modify it so the collection only has the IDs of ML nodes.
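An illustrative sketch of restricting the collection to ML nodes by role; the "ml" role name is an assumption here, and DiscoveryNodeHelper may already expose an equivalent helper:

List<String> mlNodeIds = new ArrayList<>();
for (DiscoveryNode node : clusterService.state().nodes()) {
    // keep only nodes that carry the ML role
    if (node.getRoles().stream().anyMatch(role -> "ml".equals(role.roleName()))) {
        mlNodeIds.add(node.getId());
    }
}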
1. Let the collection just have all ids of ml node 2. print out full exception stack trace Signed-off-by: wujunshen <[email protected]>
// that the number of unsuccessful reload has reached the maximum number of times, do not need to reload
if (retryTimes > autoReloadMaxRetryTimes) {
    log.info("Node: {} has reached to the max retry limit, failed to load models", localNodeId);
    return;
Before returning, should we check how long the node has been at the max retry count and reset it to 0 after a substantial time? It looks to me like the node will never reload again once it has reached the maximum retry times.
On your first comment: I found the ml_task index and ml_model index share their definition, so if I add 2 new fields to ml_task, the ml_model index will have these 2 fields too. That would give the ml_model index redundant attributes.
On your second comment: when we discussed the design earlier, we agreed that if the maximum retry count is reached, the model needs to be loaded manually instead of being reloaded automatically.
Cool. Let's keep this logic then. But we should try to define a new type of ML Task for auto reload, and reuse MLTask to store the max_retry field, etc.
Thanks!
searchRequestBuilder.execute(ActionListener.wrap(searchResponseActionListener::onResponse, exception -> {
    log.error("index {} not found, the reason is {}", ML_TASK_INDEX, exception);
    throw new IndexNotFoundException("index " + ML_TASK_INDEX + " not found");
We can't confirm this is an IndexNotFoundException; please throw an MLException instead, and please wrap the original exception into the MLException like this: throw new MLException(exception).
OK, I have changed it.
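One way to apply the suggestion above, wrapping the original cause instead of asserting a specific exception type:

searchRequestBuilder.execute(ActionListener.wrap(searchResponseActionListener::onResponse, exception -> {
    // log with the throwable so the full stack trace is kept, then propagate as MLException
    log.error("Failed to search index {}", ML_TASK_INDEX, exception);
    searchResponseActionListener.onFailure(new MLException(exception));
}));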
    return;
}
indexResponseActionListener.onFailure(new MLException("node id:" + localNodeId + " insert retry times unsuccessfully"));
}, indexResponseActionListener::onFailure));
Please add logs here when receiving an indexRequestBuilder.execute exception.
Done.
String localNodeId = clusterService.localNode().getId();
// auto reload all models of this local ml node
threadPool.generic().submit(() -> {
Please change this thread pool to UPLOAD_THREAD_POOL in MachineLearningPlugin, since it is dedicated to uploading models.
Yes, Yaliang already mentioned it. I have committed the latest code.
use LOAD_THREAD_POOL to replace generic Signed-off-by: wujunshen <[email protected]>
print the whole exception stack Signed-off-by: wujunshen <[email protected]>
change the IndexNotFoundException to MLException Signed-off-by: wujunshen <[email protected]>
add logs when receiving indexRequestBuilder.execute exception Signed-off-by: wujunshen <[email protected]>
change the test code after code review Signed-off-by: wujunshen <[email protected]>
 */
@VisibleForTesting
void queryTask(String localNodeId, ActionListener<SearchResponse> searchResponseActionListener) {
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().from(0).size(1);
This query only returns the latest load-model task. If a user has 3 models, it returns only the single latest load task for 1 model; the other 2 models' tasks won't be returned. So we can't reload all 3 models, just 1. Is that correct?
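One possible direction for this concern, as a sketch only (not the merged fix; the create_time field name is an assumption): raise the page size so the query can return a load task per model.

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
    .from(0)
    .size(10_000)                         // large enough for all load tasks on the node
    .sort("create_time", SortOrder.DESC); // newest tasks first; field name assumed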
private volatile Integer autoReloadMaxRetryTimes;

/**
 * constructor method, init all the params necessary for model auto reloading
This "," after "constructor method" is not US-ASCII; it will cause the infra team's CI workflow to fail. #736
…ode rejoin (opensearch-project#711)" This reverts commit 7a51dcc. Signed-off-by: Yaliang Wu <[email protected]>
* fix unmappable character for encoding US-ASCII Signed-off-by: Yaliang Wu <[email protected]> * Revert "the dev of [FEATURE]Auto reload model when cluster rebooted/node rejoin (#711)" This reverts commit 7a51dcc. Signed-off-by: Yaliang Wu <[email protected]> --------- Signed-off-by: Yaliang Wu <[email protected]>
Signed-off-by: JunShen Wu [email protected]
Description
The new feature:
Auto reload model when cluster rebooted/node rejoin
When an ML node in the OpenSearch cluster goes down for some unknown reason, the models on that node become unavailable, breaking inference or degrading performance. So we add a new feature: when a failed ML node is rebooted, OpenSearch on that node automatically reloads all the models that were on it, so the user does not have to reload them manually. Even in extreme cases where the reload is still unsuccessful, OpenSearch will tell the user via logs that the reload failed.
Issues Resolved
please see: #577
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.