Skip to content

Commit

Permalink
changed usages of "master" to "clusterManager" in variable names (#504)
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz authored Apr 18, 2022
1 parent b5bebb2 commit fc1a6e2
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 47 deletions.
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import org.opensearch.ad.caching.PriorityCache;
import org.opensearch.ad.cluster.ADClusterEventListener;
import org.opensearch.ad.cluster.ADDataMigrator;
import org.opensearch.ad.cluster.ClusterManagerEventListener;
import org.opensearch.ad.cluster.HashRing;
import org.opensearch.ad.cluster.MasterEventListener;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.dataprocessor.IntegerSensitiveSingleFeatureLinearUniformInterpolator;
import org.opensearch.ad.dataprocessor.Interpolator;
Expand Down Expand Up @@ -731,7 +731,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
new ADClusterEventListener(clusterService, hashRing),
adCircuitBreakerService,
adStats,
new MasterEventListener(clusterService, threadPool, client, getClock(), clientUtil, nodeFilter),
new ClusterManagerEventListener(clusterService, threadPool, client, getClock(), clientUtil, nodeFilter),
nodeFilter,
multiEntityResultHandler,
checkpoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

import com.google.common.annotations.VisibleForTesting;

public class MasterEventListener implements LocalNodeMasterListener {
public class ClusterManagerEventListener implements LocalNodeMasterListener {

private Cancellable checkpointIndexRetentionCron;
private Cancellable hourlyCron;
Expand All @@ -39,7 +39,7 @@ public class MasterEventListener implements LocalNodeMasterListener {
private ClientUtil clientUtil;
private DiscoveryNodeFilterer nodeFilter;

public MasterEventListener(
public ClusterManagerEventListener(
ClusterService clusterService,
ThreadPool threadPool,
Client client,
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/cluster/HashRing.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class HashRing {
// This field records AD version hash ring in realtime way. Historical detection will use this hash ring.
// Key: AD version; Value: hash ring which only contains eligible data nodes
private TreeMap<Version, TreeMap<Integer, DiscoveryNode>> circles;
// Track if hash ring inited or not. If not inited, the first master event will try to init it.
// Track if hash ring inited or not. If not inited, the first clusterManager event will try to init it.
private AtomicBoolean hashRingInited;

// the UTC epoch milliseconds of the most recent successful update of AD circles for realtime AD.
Expand Down Expand Up @@ -130,7 +130,7 @@ public boolean isHashRingInited() {
}

/**
* Build AD version based circles with discovery node delta change. Listen to master event in
* Build AD version based circles with discovery node delta change. Listen to clusterManager event in
* {@link ADClusterEventListener#clusterChanged(ClusterChangedEvent)}.
* Will remove the removed nodes from cache and send request to newly added nodes to get their
* plugin information; then add new nodes to AD version hash ring.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ public void onMaster() {
.scheduleWithFixedDelay(() -> rolloverAndDeleteHistoryIndex(), historyRolloverPeriod, executorName());
} catch (Exception e) {
// This should be run on cluster startup
logger.error("Error rollover AD result indices. " + "Can't rollover AD result until master node is restarted.", e);
logger.error("Error rollover AD result indices. " + "Can't rollover AD result until clusterManager node is restarted.", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ private AnomalyDetectorSettings() {}
);

// Opensearch-only setting. Doesn't plan to use the value of the legacy setting
// AD_RESULT_HISTORY_MAX_DOCS as that's too low. If the master node uses opendistro code,
// it uses the legacy setting. If the master node uses opensearch code, it uses the new setting.
// AD_RESULT_HISTORY_MAX_DOCS as that's too low. If the clusterManager node uses opendistro code,
// it uses the legacy setting. If the clusterManager node uses opensearch code, it uses the new setting.
public static final Setting<Long> AD_RESULT_HISTORY_MAX_DOCS_PER_SHARD = Setting
.longSetting(
"plugins.anomaly_detection.ad_result_history_max_docs_per_shard",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.opensearch.gateway.GatewayService;

public class ADClusterEventListenerTests extends AbstractADTest {
private final String masterNodeId = "masterNode";
private final String clusterManagerNodeId = "clusterManagerNode";
private final String dataNode1Id = "dataNode1";
private final String clusterName = "multi-node-cluster";

Expand All @@ -49,7 +49,7 @@ public class ADClusterEventListenerTests extends AbstractADTest {
private HashRing hashRing;
private ClusterState oldClusterState;
private ClusterState newClusterState;
private DiscoveryNode masterNode;
private DiscoveryNode clusterManagerNode;
private DiscoveryNode dataNode1;

@BeforeClass
Expand All @@ -70,15 +70,29 @@ public void setUp() throws Exception {
clusterService = createClusterService(threadPool);
hashRing = mock(HashRing.class);

masterNode = new DiscoveryNode(masterNodeId, buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
clusterManagerNode = new DiscoveryNode(
clusterManagerNodeId,
buildNewFakeTransportAddress(),
emptyMap(),
emptySet(),
Version.CURRENT
);
dataNode1 = new DiscoveryNode(dataNode1Id, buildNewFakeTransportAddress(), emptyMap(), BUILT_IN_ROLES, Version.CURRENT);
oldClusterState = ClusterState
.builder(new ClusterName(clusterName))
.nodes(new DiscoveryNodes.Builder().masterNodeId(masterNodeId).localNodeId(masterNodeId).add(masterNode))
.nodes(
new DiscoveryNodes.Builder().masterNodeId(clusterManagerNodeId).localNodeId(clusterManagerNodeId).add(clusterManagerNode)
)
.build();
newClusterState = ClusterState
.builder(new ClusterName(clusterName))
.nodes(new DiscoveryNodes.Builder().masterNodeId(masterNodeId).localNodeId(dataNode1Id).add(masterNode).add(dataNode1))
.nodes(
new DiscoveryNodes.Builder()
.masterNodeId(clusterManagerNodeId)
.localNodeId(dataNode1Id)
.add(clusterManagerNode)
.add(dataNode1)
)
.build();

listener = new ADClusterEventListener(clusterService, hashRing);
Expand Down Expand Up @@ -107,7 +121,13 @@ public void testIsWarmNode() {

ClusterState warmNodeClusterState = ClusterState
.builder(new ClusterName(clusterName))
.nodes(new DiscoveryNodes.Builder().masterNodeId(masterNodeId).localNodeId(dataNode1Id).add(masterNode).add(dataNode1))
.nodes(
new DiscoveryNodes.Builder()
.masterNodeId(clusterManagerNodeId)
.localNodeId(dataNode1Id)
.add(clusterManagerNode)
.add(dataNode1)
)
.blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK))
.build();
listener.clusterChanged(new ClusterChangedEvent("foo", warmNodeClusterState, oldClusterState));
Expand All @@ -117,7 +137,13 @@ public void testIsWarmNode() {
public void testNotRecovered() {
ClusterState blockedClusterState = ClusterState
.builder(new ClusterName(clusterName))
.nodes(new DiscoveryNodes.Builder().masterNodeId(masterNodeId).localNodeId(dataNode1Id).add(masterNode).add(dataNode1))
.nodes(
new DiscoveryNodes.Builder()
.masterNodeId(clusterManagerNodeId)
.localNodeId(dataNode1Id)
.add(clusterManagerNode)
.add(dataNode1)
)
.blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK))
.build();
listener.clusterChanged(new ClusterChangedEvent("foo", blockedClusterState, oldClusterState));
Expand Down Expand Up @@ -151,7 +177,9 @@ public void testNodeAdded() {
return null;
}).when(hashRing).buildCircles(any(), any());

doAnswer(invocation -> Optional.of(masterNode)).when(hashRing).getOwningNodeWithSameLocalAdVersionForRealtimeAD(any(String.class));
doAnswer(invocation -> Optional.of(clusterManagerNode))
.when(hashRing)
.getOwningNodeWithSameLocalAdVersionForRealtimeAD(any(String.class));

listener.clusterChanged(new ClusterChangedEvent("foo", newClusterState, oldClusterState));
assertTrue(testAppender.containsMessage(ADClusterEventListener.NODE_CHANGED_MSG));
Expand All @@ -163,9 +191,9 @@ public void testNodeRemoved() {
.builder(new ClusterName(clusterName))
.nodes(
new DiscoveryNodes.Builder()
.masterNodeId(masterNodeId)
.masterNodeId(clusterManagerNodeId)
.localNodeId(dataNode1Id)
.add(new DiscoveryNode(masterNodeId, buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT))
.add(new DiscoveryNode(clusterManagerNodeId, buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT))
.add(dataNode1)
.add(new DiscoveryNode("dataNode2", buildNewFakeTransportAddress(), emptyMap(), BUILT_IN_ROLES, Version.CURRENT))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@
import org.opensearch.threadpool.Scheduler.Cancellable;
import org.opensearch.threadpool.ThreadPool;

public class MasterEventListenerTests extends AbstractADTest {
public class ClusterManagerEventListenerTests extends AbstractADTest {
private ClusterService clusterService;
private ThreadPool threadPool;
private Client client;
private Clock clock;
private Cancellable hourlyCancellable;
private Cancellable checkpointIndexRetentionCancellable;
private MasterEventListener masterService;
private ClusterManagerEventListener clusterManagerService;
private ClientUtil clientUtil;
private DiscoveryNodeFilterer nodeFilter;

Expand All @@ -67,18 +67,18 @@ public void setUp() throws Exception {
ignoredAttributes.put(CommonName.BOX_TYPE_KEY, CommonName.WARM_BOX_TYPE);
nodeFilter = new DiscoveryNodeFilterer(clusterService);

masterService = new MasterEventListener(clusterService, threadPool, client, clock, clientUtil, nodeFilter);
clusterManagerService = new ClusterManagerEventListener(clusterService, threadPool, client, clock, clientUtil, nodeFilter);
}

public void testOnOffMaster() {
masterService.onMaster();
clusterManagerService.onMaster();
assertThat(hourlyCancellable, is(notNullValue()));
assertThat(checkpointIndexRetentionCancellable, is(notNullValue()));
assertTrue(!masterService.getHourlyCron().isCancelled());
assertTrue(!masterService.getCheckpointIndexRetentionCron().isCancelled());
masterService.offMaster();
assertThat(masterService.getCheckpointIndexRetentionCron(), is(nullValue()));
assertThat(masterService.getHourlyCron(), is(nullValue()));
assertTrue(!clusterManagerService.getHourlyCron().isCancelled());
assertTrue(!clusterManagerService.getCheckpointIndexRetentionCron().isCancelled());
clusterManagerService.offMaster();
assertThat(clusterManagerService.getCheckpointIndexRetentionCron(), is(nullValue()));
assertThat(clusterManagerService.getHourlyCron(), is(nullValue()));
}

public void testBeforeStop() {
Expand All @@ -100,11 +100,11 @@ public void testBeforeStop() {
return null;
}).when(clusterService).addLifecycleListener(any());

masterService.onMaster();
assertThat(masterService.getCheckpointIndexRetentionCron(), is(nullValue()));
assertThat(masterService.getHourlyCron(), is(nullValue()));
masterService.offMaster();
assertThat(masterService.getCheckpointIndexRetentionCron(), is(nullValue()));
assertThat(masterService.getHourlyCron(), is(nullValue()));
clusterManagerService.onMaster();
assertThat(clusterManagerService.getCheckpointIndexRetentionCron(), is(nullValue()));
assertThat(clusterManagerService.getHourlyCron(), is(nullValue()));
clusterManagerService.offMaster();
assertThat(clusterManagerService.getCheckpointIndexRetentionCron(), is(nullValue()));
assertThat(clusterManagerService.getHourlyCron(), is(nullValue()));
}
}
27 changes: 16 additions & 11 deletions src/test/java/test/org/opensearch/ad/util/ClusterCreation.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
package test.org.opensearch.ad.util;

import static org.mockito.Mockito.mock;
import static org.opensearch.cluster.node.DiscoveryNodeRole.CLUSTER_MANAGER_ROLE;
import static org.opensearch.cluster.node.DiscoveryNodeRole.DATA_ROLE;
import static org.opensearch.cluster.node.DiscoveryNodeRole.MASTER_ROLE;

import java.net.InetAddress;
import java.util.ArrayList;
Expand All @@ -31,20 +31,25 @@

public class ClusterCreation {
/**
* Creates a cluster state where local node and master node can be specified
* Creates a cluster state where local node and clusterManager node can be specified
*
* @param localNode node in allNodes that is the local node
* @param masterNode node in allNodes that is the master node. Can be null if no master exists
* @param clusterManagerNode node in allNodes that is the clusterManager node. Can be null if no clusterManager exists
* @param allNodes all nodes in the cluster
* @return cluster state
*/
public static ClusterState state(ClusterName name, DiscoveryNode localNode, DiscoveryNode masterNode, List<DiscoveryNode> allNodes) {
public static ClusterState state(
ClusterName name,
DiscoveryNode localNode,
DiscoveryNode clusterManagerNode,
List<DiscoveryNode> allNodes
) {
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
for (DiscoveryNode node : allNodes) {
discoBuilder.add(node);
}
if (masterNode != null) {
discoBuilder.masterNodeId(masterNode.getId());
if (clusterManagerNode != null) {
discoBuilder.masterNodeId(clusterManagerNode.getId());
}
discoBuilder.localNodeId(localNode.getId());

Expand All @@ -70,21 +75,21 @@ public static ImmutableOpenMap<String, DiscoveryNode> createDataNodes(int numDat
}

/**
* Create a cluster state with 1 master node and a few data nodes
* Create a cluster state with 1 clusterManager node and a few data nodes
* @param numDataNodes the number of data nodes
* @return the cluster state
*/
public static ClusterState state(int numDataNodes) {
DiscoveryNode masterNode = new DiscoveryNode(
DiscoveryNode clusterManagerNode = new DiscoveryNode(
"foo0",
"foo0",
new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
Collections.emptyMap(),
Collections.singleton(MASTER_ROLE),
Collections.singleton(CLUSTER_MANAGER_ROLE),
Version.CURRENT
);
List<DiscoveryNode> allNodes = new ArrayList<>();
allNodes.add(masterNode);
allNodes.add(clusterManagerNode);
for (int i = 1; i <= numDataNodes - 1; i++) {
allNodes
.add(
Expand All @@ -98,6 +103,6 @@ public static ClusterState state(int numDataNodes) {
)
);
}
return state(new ClusterName("test"), masterNode, masterNode, allNodes);
return state(new ClusterName("test"), clusterManagerNode, clusterManagerNode, allNodes);
}
}
7 changes: 5 additions & 2 deletions src/test/java/test/org/opensearch/ad/util/FakeNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,16 @@ public DiscoveryNode discoveryNode() {

public static void connectNodes(FakeNode... nodes) {
List<DiscoveryNode> discoveryNodes = new ArrayList<DiscoveryNode>(nodes.length);
DiscoveryNode master = nodes[0].discoveryNode();
DiscoveryNode clusterManager = nodes[0].discoveryNode();
for (int i = 0; i < nodes.length; i++) {
discoveryNodes.add(nodes[i].discoveryNode());
}

for (FakeNode node : nodes) {
setState(node.clusterService, ClusterCreation.state(new ClusterName("test"), node.discoveryNode(), master, discoveryNodes));
setState(
node.clusterService,
ClusterCreation.state(new ClusterName("test"), node.discoveryNode(), clusterManager, discoveryNodes)
);
}
for (FakeNode nodeA : nodes) {
for (FakeNode nodeB : nodes) {
Expand Down

0 comments on commit fc1a6e2

Please sign in to comment.