Skip to content

Commit

Permalink
skip running syncup job if no model index (#717)
Browse files Browse the repository at this point in the history
Signed-off-by: Yaliang Wu <[email protected]>
(cherry picked from commit 50116bb)
  • Loading branch information
ylwu-amzn authored and github-actions[bot] committed Feb 1, 2023
1 parent faa935a commit 5d83cca
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 10 deletions.
22 changes: 12 additions & 10 deletions plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ public MLSyncUpCron(Client client, ClusterService clusterService, DiscoveryNodeH

@Override
public void run() {
if (!clusterService.state().metadata().indices().containsKey(ML_MODEL_INDEX)) {
// no need to run sync up job if no model index
return;
}
log.debug("ML sync job starts");
DiscoveryNode[] allNodes = nodeHelper.getAllNodes();
MLSyncUpInput gatherInfoInput = MLSyncUpInput.builder().getLoadedModels(true).build();
Expand Down Expand Up @@ -131,16 +135,14 @@ public void run() {
);

// refresh model status
if (clusterService.state().getRoutingTable().hasIndex(ML_MODEL_INDEX)) {
mlIndicesHandler
.initModelIndexIfAbsent(
ActionListener
.wrap(
res -> { refreshModelState(modelWorkerNodes, loadingModels); },
e -> { log.error("Failed to init model index", e); }
)
);
}
mlIndicesHandler
.initModelIndexIfAbsent(
ActionListener
.wrap(
res -> { refreshModelState(modelWorkerNodes, loadingModels); },
e -> { log.error("Failed to init model index", e); }
)
);
}, e -> { log.error("Failed to sync model routing", e); }));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.mockito.Mockito.when;
import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;
import static org.opensearch.ml.utils.TestHelper.ML_ROLE;
import static org.opensearch.ml.utils.TestHelper.setupTestClusterState;

import java.io.IOException;
import java.time.Instant;
Expand All @@ -26,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.lucene.search.TotalHits;
import org.junit.Before;
Expand All @@ -41,9 +43,16 @@
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.ml.common.MLModel;
import org.opensearch.ml.common.model.MLModelState;
Expand Down Expand Up @@ -77,12 +86,45 @@ public class MLSyncUpCronTests extends OpenSearchTestCase {
private final String mlNode1Id = "mlNode1";
private final String mlNode2Id = "mlNode2";

private ClusterState testState;

@Before
public void setup() throws IOException {
MockitoAnnotations.openMocks(this);
mlNode1 = new DiscoveryNode(mlNode1Id, buildNewFakeTransportAddress(), emptyMap(), ImmutableSet.of(ML_ROLE), Version.CURRENT);
mlNode2 = new DiscoveryNode(mlNode2Id, buildNewFakeTransportAddress(), emptyMap(), ImmutableSet.of(ML_ROLE), Version.CURRENT);
syncUpCron = new MLSyncUpCron(client, clusterService, nodeHelper, null);

testState = setupTestClusterState();
when(clusterService.state()).thenReturn(testState);
}

public void testRun_NoMLModelIndex() {
Metadata metadata = new Metadata.Builder().indices(ImmutableOpenMap.<String, IndexMetadata>builder().build()).build();
DiscoveryNode node = new DiscoveryNode(
"node",
new TransportAddress(TransportAddress.META_ADDRESS, new AtomicInteger().incrementAndGet()),
new HashMap<>(),
ImmutableSet.of(DiscoveryNodeRole.DATA_ROLE),
Version.CURRENT
);
ClusterState state = new ClusterState(
new ClusterName("test cluster"),
123l,
"111111",
metadata,
null,
DiscoveryNodes.builder().add(node).build(),
null,
null,
0,
false
);
;
when(clusterService.state()).thenReturn(state);

syncUpCron.run();
verify(client, never()).execute(eq(MLSyncUpAction.INSTANCE), any(), any());
}

public void testRun() {
Expand Down

0 comments on commit 5d83cca

Please sign in to comment.