Skip to content

Commit

Permalink
Remove changes for legacy mixed-cluster support (#5228)
Browse files Browse the repository at this point in the history
This change removes code that was written to support rolling upgrade from Elasticsearch 7.10.2 to OpenSearch 1.x, as part of PR #865

Signed-off-by: Rabi Panda <[email protected]>

Signed-off-by: Rabi Panda <[email protected]>
  • Loading branch information
adnapibar authored Nov 14, 2022
1 parent 671820d commit b43e985
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
public class RefreshVersionInClusterStateIT extends AbstractRollingTestCase {

/*
This test ensures that after the upgrade from ElasticSearch/ OpenSearch all nodes report the version on and after 1.0.0
* This test ensures that after the upgrade, all nodes report the current version
*/
public void testRefresh() throws IOException {
switch (CLUSTER_TYPE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public class JoinHelper {
this.nodeHealthService = nodeHealthService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.nodeCommissioned = nodeCommissioned;
this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService, transportService) {
this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService) {

private final long term = currentTermSupplier.getAsLong();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.persistent.PersistentTasksCustomMetadata;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -74,7 +73,6 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut

private final Logger logger;
private final RerouteService rerouteService;
private final TransportService transportService;

/**
* Task for the join task executor.
Expand Down Expand Up @@ -127,17 +125,10 @@ public boolean isFinishElectionTask() {
private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
}

public JoinTaskExecutor(
Settings settings,
AllocationService allocationService,
Logger logger,
RerouteService rerouteService,
TransportService transportService
) {
public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger, RerouteService rerouteService) {
this.allocationService = allocationService;
this.logger = logger;
this.rerouteService = rerouteService;
this.transportService = transportService;
}

@Override
Expand Down Expand Up @@ -261,9 +252,7 @@ protected ClusterState.Builder becomeClusterManagerAndTrimConflictingNodes(Clust
nodesBuilder.clusterManagerNodeId(currentState.nodes().getLocalNodeId());

for (final Task joinTask : joiningNodes) {
if (joinTask.isBecomeClusterManagerTask()) {
refreshDiscoveryNodeVersionAfterUpgrade(currentNodes, nodesBuilder);
} else if (joinTask.isFinishElectionTask()) {
if (joinTask.isBecomeClusterManagerTask() || joinTask.isFinishElectionTask()) {
// no-op
} else {
final DiscoveryNode joiningNode = joinTask.node();
Expand Down Expand Up @@ -300,20 +289,6 @@ protected ClusterState.Builder becomeClusterManagerAndTrimConflictingNodes(Clust
return ClusterState.builder(allocationService.disassociateDeadNodes(tmpState, false, "removed dead nodes on election"));
}

private void refreshDiscoveryNodeVersionAfterUpgrade(DiscoveryNodes currentNodes, DiscoveryNodes.Builder nodesBuilder) {
// During the upgrade from Elasticsearch, OpenSearch node send their version as 7.10.2 to Elasticsearch master
// in order to successfully join the cluster. But as soon as OpenSearch node becomes the master, cluster state
// should show the OpenSearch nodes version as 1.x. As the cluster state was carry forwarded from ES master,
// version in DiscoveryNode is stale 7.10.2. As soon as OpenSearch node becomes master, it can refresh the
// DiscoveryNodes version and publish the updated state while finishing the election. This helps in atomically
// updating the version of those node which have connection with the new master.
// Note: This should get deprecated with BWC mode logic
if (null == transportService) {
// this logic is only applicable when OpenSearch node is cluster-manager and is noop for zen discovery node
return;
}
}

@Override
public boolean runOnlyOnClusterManager() {
// we validate that we are allowed to change the cluster state during cluster state processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable;
import org.opensearch.common.Strings;
import org.opensearch.common.component.AbstractLifecycleComponent;
Expand Down Expand Up @@ -72,7 +71,6 @@
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -793,18 +791,6 @@ public Transport.Connection getConnection(DiscoveryNode node) {
}
}

public Map<String, Version> getChannelVersion(DiscoveryNodes nodes) {
Map<String, Version> nodeChannelVersions = new HashMap<>(nodes.getSize());
for (DiscoveryNode node : nodes) {
try {
nodeChannelVersions.putIfAbsent(node.getId(), connectionManager.getConnection(node).getVersion());
} catch (Exception e) {
// ignore in case node is not connected
}
}
return nodeChannelVersions;
}

public final <T extends TransportResponse> void sendChildRequest(
final DiscoveryNode node,
final String action,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void testUpdatesNodeWithNewRoles() throws Exception {
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);

final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, rerouteService, null);
final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, rerouteService);

final DiscoveryNode clusterManagerNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);

Expand Down Expand Up @@ -270,7 +270,7 @@ public void testJoinFailedForDecommissionedNode() throws Exception {
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);

final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, rerouteService, null);
final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, rerouteService);

final DiscoveryNode clusterManagerNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public IndexMetadata upgradeIndexMetadata(IndexMetadata indexMetadata, Version m
);

nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, (s, p, r) -> {}, transportService);
joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, (s, p, r) -> {});
}

public ClusterState createIndex(ClusterState state, CreateIndexRequest request) {
Expand Down

0 comments on commit b43e985

Please sign in to comment.