Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] prevent assignment to nodes older than 7.4 #48044

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.support.IndicesOptions;
Expand All @@ -33,18 +34,18 @@
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.TransformMessages;
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.transform.Transform;
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex;
import org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil;

import java.util.ArrayList;
Expand Down Expand Up @@ -100,7 +101,10 @@ public PersistentTasksCustomMetaData.Assignment getAssignment(TransformTaskParam
return new PersistentTasksCustomMetaData.Assignment(null, reason);
}
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, (node) ->
node.isDataNode() && node.getVersion().onOrAfter(params.getVersion())
node.isDataNode() &&
// see gh#48019 older nodes might not be able to read documents
node.getVersion().onOrAfter(Version.V_7_4_0) &&
node.getVersion().onOrAfter(params.getVersion())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should not allow assignment at all in a mixed cluster where a <= 7.3 node is present. If the task is running and somebody makes a stats call, that stats call could be made against a 7.3 node which queries the wrong index and incorrectly says that the transform does not exist.

);
return discoveryNode == null ? NO_NODE_FOUND : new PersistentTasksCustomMetaData.Assignment(discoveryNode.getId(), "");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformInternalIndexTests;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.persistence.TransformInternalIndexTests;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -75,7 +75,7 @@ public void testNodeVersionAssignment() {
buildNewFakeTransportAddress(),
Collections.emptyMap(),
Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE),
Version.V_7_2_0))
Version.V_7_4_0))
.add(new DiscoveryNode("current-data-node-with-2-tasks",
buildNewFakeTransportAddress(),
Collections.emptyMap(),
Expand Down Expand Up @@ -122,6 +122,72 @@ transformCheckpointService, mock(SchedulerEngine.class),
equalTo("past-data-node-1"));
}

public void testDoNotSelectOldNodes() {
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addIndices(metaData, routingTable);
PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder()
.addTask("transform-task-1",
TransformTaskParams.NAME,
new TransformTaskParams("transform-task-1", Version.CURRENT, null),
new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-task", ""));

PersistentTasksCustomMetaData pTasks = pTasksBuilder.build();

metaData.putCustom(PersistentTasksCustomMetaData.TYPE, pTasks);

DiscoveryNodes.Builder nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("old-data-node-1",
buildNewFakeTransportAddress(),
Collections.emptyMap(),
Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE),
Version.V_7_2_0))
.add(new DiscoveryNode("current-data-node-with-1-task",
buildNewFakeTransportAddress(),
Collections.emptyMap(),
Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE),
Version.CURRENT))
.add(new DiscoveryNode("non-data-node-1",
buildNewFakeTransportAddress(),
Collections.emptyMap(),
Set.of(DiscoveryNodeRole.MASTER_ROLE),
Version.CURRENT));

ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"))
.nodes(nodes);
csBuilder.routingTable(routingTable.build());
csBuilder.metaData(metaData);

ClusterState cs = csBuilder.build();
Client client = mock(Client.class);
TransformAuditor mockAuditor = mock(TransformAuditor.class);
TransformConfigManager transformsConfigManager = new TransformConfigManager(client, xContentRegistry());
TransformCheckpointService transformCheckpointService = new TransformCheckpointService(client,
transformsConfigManager, mockAuditor);
ClusterSettings cSettings = new ClusterSettings(Settings.EMPTY,
Collections.singleton(TransformTask.NUM_FAILURE_RETRIES_SETTING));
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(cSettings);
when(clusterService.state()).thenReturn(TransformInternalIndexTests.STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE);
TransformPersistentTasksExecutor executor = new TransformPersistentTasksExecutor(client,
transformsConfigManager,
transformCheckpointService, mock(SchedulerEngine.class),
new TransformAuditor(client, ""),
mock(ThreadPool.class),
clusterService,
Settings.EMPTY);

// old-data-node-1 should never be selected
assertThat(executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null), cs).getExecutorNode(),
equalTo("current-data-node-with-1-task"));
assertThat(executor.getAssignment(new TransformTaskParams("new-old-task-id", Version.V_7_2_0, null), cs).getExecutorNode(),
equalTo("current-data-node-with-1-task"));
assertThat(executor.getAssignment(new TransformTaskParams("new-old-task-id-2", Version.V_7_2_0, null), cs).getExecutorNode(),
equalTo("current-data-node-with-1-task"));
assertThat(executor.getAssignment(new TransformTaskParams("new-old-task-id-3", Version.V_7_2_0, null), cs).getExecutorNode(),
equalTo("current-data-node-with-1-task"));
}

public void testVerifyIndicesPrimaryShardsAreActive() {
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
Expand Down