Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate class 'MasterService' and create alternative class 'ClusterManagerService' #4022

Merged
merged 8 commits into from
Jul 30, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ public void testDiscoveryStats() throws Exception {
ensureGreen(); // ensures that all events are processed (in particular state recovery fully completed)
assertBusy(
() -> assertThat(
internalCluster().clusterService(internalCluster().getClusterManagerName()).getMasterService().numberOfPendingTasks(),
internalCluster().clusterService(internalCluster().getClusterManagerName())
.getClusterManagerService()
.numberOfPendingTasks(),
equalTo(0)
)
); // see https://github.com/elastic/elasticsearch/issues/24388
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS

// The tasks can be re-ordered, so we need to check out-of-order
Set<String> controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"));
List<PendingClusterTask> pendingClusterTasks = clusterService.getMasterService().pendingTasks();
List<PendingClusterTask> pendingClusterTasks = clusterService.getClusterManagerService().pendingTasks();
assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(10));
assertThat(pendingClusterTasks.get(0).getSource().string(), equalTo("1"));
assertThat(pendingClusterTasks.get(0).isExecuting(), equalTo(true));
Expand All @@ -413,7 +413,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
invoked2.await();

// whenever we test for no tasks, we need to wait since this is a live node
assertBusy(() -> assertTrue("Pending tasks not empty", clusterService.getMasterService().pendingTasks().isEmpty()));
assertBusy(() -> assertTrue("Pending tasks not empty", clusterService.getClusterManagerService().pendingTasks().isEmpty()));
waitNoPendingTasksOnAll();

final CountDownLatch block2 = new CountDownLatch(1);
Expand Down Expand Up @@ -453,7 +453,7 @@ public void onFailure(String source, Exception e) {
}
Thread.sleep(100);

pendingClusterTasks = clusterService.getMasterService().pendingTasks();
pendingClusterTasks = clusterService.getClusterManagerService().pendingTasks();
assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5));
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
for (PendingClusterTask task : pendingClusterTasks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,9 @@ private boolean validateRequest(final ClusterHealthRequest request, ClusterState
ClusterHealthResponse response = clusterHealth(
request,
clusterState,
clusterService.getMasterService().numberOfPendingTasks(),
clusterService.getClusterManagerService().numberOfPendingTasks(),
allocationService.getNumberOfInFlightFetches(),
clusterService.getMasterService().getMaxTaskWaitTime()
clusterService.getClusterManagerService().getMaxTaskWaitTime()
);
return prepareResponse(request, response, clusterState, indexNameExpressionResolver) == waitCount;
}
Expand All @@ -338,9 +338,9 @@ private ClusterHealthResponse getResponse(
ClusterHealthResponse response = clusterHealth(
request,
clusterState,
clusterService.getMasterService().numberOfPendingTasks(),
clusterService.getClusterManagerService().numberOfPendingTasks(),
allocationService.getNumberOfInFlightFetches(),
clusterService.getMasterService().getMaxTaskWaitTime()
clusterService.getClusterManagerService().getMaxTaskWaitTime()
);
int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
boolean valid = (readyCounter == waitFor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ protected void masterOperation(
ActionListener<PendingClusterTasksResponse> listener
) {
logger.trace("fetching pending tasks from cluster service");
final List<PendingClusterTask> pendingTasks = clusterService.getMasterService().pendingTasks();
final List<PendingClusterTask> pendingTasks = clusterService.getClusterManagerService().pendingTasks();
logger.trace("done fetching pending tasks from cluster service");
listener.onResponse(new PendingClusterTasksResponse(pendingTasks));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

package org.opensearch.cluster;

import org.opensearch.cluster.service.MasterService;
import org.opensearch.cluster.service.ClusterManagerService;

import java.util.List;

Expand All @@ -49,15 +49,15 @@ public interface ClusterStateTaskListener {

/**
* called when the task was rejected because the local node is no longer cluster-manager.
* Used only for tasks submitted to {@link MasterService}.
* Used only for tasks submitted to {@link ClusterManagerService}.
*/
default void onNoLongerClusterManager(String source) {
onFailure(source, new NotClusterManagerException("no longer cluster-manager. source: [" + source + "]"));
}

/**
* called when the task was rejected because the local node is no longer cluster-manager.
* Used only for tasks submitted to {@link MasterService}.
* Used only for tasks submitted to {@link ClusterManagerService}.
*
* @deprecated As of 2.1, because supporting inclusive language, replaced by {@link #onNoLongerClusterManager(String)}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterApplier;
import org.opensearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.opensearch.cluster.service.MasterService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.common.Booleans;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
Expand Down Expand Up @@ -141,7 +141,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final boolean singleNodeDiscovery;
private final ElectionStrategy electionStrategy;
private final TransportService transportService;
private final MasterService masterService;
private final ClusterManagerService clusterManagerService;
private final AllocationService allocationService;
private final JoinHelper joinHelper;
private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
Expand Down Expand Up @@ -191,7 +191,7 @@ public Coordinator(
TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry,
AllocationService allocationService,
MasterService masterService,
ClusterManagerService clusterManagerService,
Supplier<CoordinationState.PersistedState> persistedStateSupplier,
SeedHostsProvider seedHostsProvider,
ClusterApplier clusterApplier,
Expand All @@ -203,15 +203,15 @@ public Coordinator(
) {
this.settings = settings;
this.transportService = transportService;
this.masterService = masterService;
this.clusterManagerService = clusterManagerService;
this.allocationService = allocationService;
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.singleNodeDiscovery = DiscoveryModule.isSingleNodeDiscovery(settings);
this.electionStrategy = electionStrategy;
this.joinHelper = new JoinHelper(
settings,
allocationService,
masterService,
clusterManagerService,
transportService,
this::getCurrentTerm,
this::getStateForClusterManagerService,
Expand Down Expand Up @@ -260,7 +260,7 @@ public Coordinator(
);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
masterService.setClusterStateSupplier(this::getStateForClusterManagerService);
clusterManagerService.setClusterStateSupplier(this::getStateForClusterManagerService);
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
this.clusterBootstrapService = new ClusterBootstrapService(
settings,
Expand Down Expand Up @@ -310,7 +310,7 @@ private void onLeaderFailure(Exception e) {
private void removeNode(DiscoveryNode discoveryNode, String reason) {
synchronized (mutex) {
if (mode == Mode.LEADER) {
masterService.submitStateUpdateTask(
clusterManagerService.submitStateUpdateTask(
"node-left",
new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, reason),
ClusterStateTaskConfig.build(Priority.IMMEDIATE),
Expand Down Expand Up @@ -757,7 +757,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
}

private void cleanClusterManagerService() {
masterService.submitStateUpdateTask("clean-up after stepping down as cluster-manager", new LocalClusterUpdateTask() {
clusterManagerService.submitStateUpdateTask("clean-up after stepping down as cluster-manager", new LocalClusterUpdateTask() {
@Override
public void onFailure(String source, Exception e) {
// ignore
Expand Down Expand Up @@ -1129,7 +1129,7 @@ private void scheduleReconfigurationIfNeeded() {
final ClusterState state = getLastAcceptedState();
if (improveConfiguration(state) != state && reconfigurationTaskScheduled.compareAndSet(false, true)) {
logger.trace("scheduling reconfiguration");
masterService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask(Priority.URGENT) {
clusterManagerService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
reconfigurationTaskScheduled.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.MasterService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.common.Priority;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -106,7 +106,7 @@ public class JoinHelper {
Setting.Property.Deprecated
);

private final MasterService masterService;
private final ClusterManagerService clusterManagerService;
private final TransportService transportService;
private volatile JoinTaskExecutor joinTaskExecutor;

Expand All @@ -122,7 +122,7 @@ public class JoinHelper {
JoinHelper(
Settings settings,
AllocationService allocationService,
MasterService masterService,
ClusterManagerService clusterManagerService,
TransportService transportService,
LongSupplier currentTermSupplier,
Supplier<ClusterState> currentStateSupplier,
Expand All @@ -132,7 +132,7 @@ public class JoinHelper {
RerouteService rerouteService,
NodeHealthService nodeHealthService
) {
this.masterService = masterService;
this.clusterManagerService = clusterManagerService;
this.transportService = transportService;
this.nodeHealthService = nodeHealthService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
Expand Down Expand Up @@ -458,7 +458,7 @@ class LeaderJoinAccumulator implements JoinAccumulator {
public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
final JoinTaskExecutor.Task task = new JoinTaskExecutor.Task(sender, "join existing leader");
assert joinTaskExecutor != null;
masterService.submitStateUpdateTask(
clusterManagerService.submitStateUpdateTask(
"node-join",
task,
ClusterStateTaskConfig.build(Priority.URGENT),
Expand Down Expand Up @@ -543,7 +543,7 @@ public void close(Mode newMode) {
pendingAsTasks.put(JoinTaskExecutor.newBecomeClusterManagerTask(), (source, e) -> {});
pendingAsTasks.put(JoinTaskExecutor.newFinishElectionTask(), (source, e) -> {});
joinTaskExecutor = joinTaskExecutorGenerator.get();
masterService.submitStateUpdateTasks(
clusterManagerService.submitStateUpdateTasks(
stateUpdateSource,
pendingAsTasks,
ClusterStateTaskConfig.build(Priority.URGENT),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;

/**
* Main Cluster Manager Node Service
*
* @opensearch.internal
*/
public class ClusterManagerService extends MasterService {
public ClusterManagerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings, clusterSettings, threadPool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
* @opensearch.internal
*/
public class ClusterService extends AbstractLifecycleComponent {
private final MasterService masterService;
private final ClusterManagerService clusterManagerService;

private final ClusterApplierService clusterApplierService;

Expand Down Expand Up @@ -93,20 +93,20 @@ public ClusterService(Settings settings, ClusterSettings clusterSettings, Thread
this(
settings,
clusterSettings,
new MasterService(settings, clusterSettings, threadPool),
new ClusterManagerService(settings, clusterSettings, threadPool),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool)
);
}

public ClusterService(
Settings settings,
ClusterSettings clusterSettings,
MasterService masterService,
ClusterManagerService clusterManagerService,
ClusterApplierService clusterApplierService
) {
this.settings = settings;
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
this.masterService = masterService;
this.clusterManagerService = clusterManagerService;
this.operationRouting = new OperationRouting(settings, clusterSettings);
this.clusterSettings = clusterSettings;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
Expand All @@ -132,18 +132,18 @@ public RerouteService getRerouteService() {
@Override
protected synchronized void doStart() {
clusterApplierService.start();
masterService.start();
clusterManagerService.start();
}

@Override
protected synchronized void doStop() {
masterService.stop();
clusterManagerService.stop();
clusterApplierService.stop();
}

@Override
protected synchronized void doClose() {
masterService.close();
clusterManagerService.close();
clusterApplierService.close();
}

Expand Down Expand Up @@ -228,8 +228,14 @@ public void addLocalNodeMasterListener(LocalNodeMasterListener listener) {
addLocalNodeClusterManagerListener(listener);
}

public ClusterManagerService getClusterManagerService() {
return clusterManagerService;
}

/** @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #getClusterManagerService()} */
@Deprecated
public MasterService getMasterService() {
return masterService;
return clusterManagerService;
}

/**
Expand All @@ -252,7 +258,7 @@ public ClusterApplierService getClusterApplierService() {

public static boolean assertClusterOrClusterManagerStateThread() {
assert Thread.currentThread().getName().contains(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME)
|| Thread.currentThread().getName().contains(MasterService.MASTER_UPDATE_THREAD_NAME)
|| Thread.currentThread().getName().contains(ClusterManagerService.MASTER_UPDATE_THREAD_NAME)
: "not called from the master/cluster state update thread";
return true;
}
Expand Down Expand Up @@ -349,6 +355,6 @@ public <T> void submitStateUpdateTasks(
final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor
) {
masterService.submitStateUpdateTasks(source, tasks, config, executor);
clusterManagerService.submitStateUpdateTasks(source, tasks, config, executor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@
* Main Master Node Service
*
* @opensearch.internal
* @deprecated As of 2.2, because supporting inclusive language, replaced by {@link ClusterManagerService}.
*/
@Deprecated
public class MasterService extends AbstractLifecycleComponent {
private static final Logger logger = LogManager.getLogger(MasterService.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.cluster.service.MasterService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.network.NetworkService;
Expand Down Expand Up @@ -335,8 +335,8 @@ public void apply(Settings value, Settings current, Settings previous) {
IndexModule.NODE_STORE_ALLOW_MMAP,
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
ClusterService.USER_DEFINED_METADATA,
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, // deprecated
MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, // deprecated
ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
package org.opensearch.common.util.concurrent;

import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.MasterService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.common.Nullable;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Transports;
Expand Down Expand Up @@ -109,7 +109,7 @@ protected boolean blockingAllowed() {
return Transports.assertNotTransportThread(BLOCKING_OP_REASON)
&& ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON)
&& ClusterApplierService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON)
&& MasterService.assertNotMasterUpdateThread(BLOCKING_OP_REASON);
&& ClusterManagerService.assertNotMasterUpdateThread(BLOCKING_OP_REASON);
}

@Override
Expand Down
Loading