Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
fcofdez committed Jun 28, 2021
1 parent 74c289b commit b80e0e9
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin, RepositoryPlugin, ClusterPlugin {

public static final String CCR_THREAD_POOL_NAME = "ccr";
// Constants have been moved into CcrConstants

public static final String REQUESTED_OPS_MISSING_METADATA_KEY = "es.requested_operations_missing";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2156,7 +2156,6 @@ void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List<String> pa
}

public void testDeprecationWarningsAreEmittedWhenASystemIndexIsAutoFollowed() throws Exception {
// Set up a mock log appender to watch for the log message we expect
final Client client = mock(Client.class);
when(client.getRemoteClusterClient(anyString())).thenReturn(client);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.index.Index;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.xpack.core.ccr.action.ShardFollowTask;

Expand Down Expand Up @@ -56,12 +57,12 @@ public static void getAutoFollowedSystemIndices(Client client, ClusterState stat
final String leaderIndexName = ccrMetadata.get(CcrConstants.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
final String remoteCluster = ccrMetadata.get(CcrConstants.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY);

final String followerIndexName = indexMetadata.getIndex().getName();
if (followedLeaderIndexUUIDs.contains(leaderIndexUUID) && isCurrentlyFollowed(persistentTasks, followerIndexName)) {
final Index followerIndex = indexMetadata.getIndex();
if (followedLeaderIndexUUIDs.contains(leaderIndexUUID) && isCurrentlyFollowed(persistentTasks, followerIndex)) {
List<AutoFollowedIndex> autoFollowedIndices =
remoteClusterAutoFollowedIndices.computeIfAbsent(remoteCluster, unused -> new ArrayList<>());

autoFollowedIndices.add(new AutoFollowedIndex(leaderIndexName, followerIndexName));
autoFollowedIndices.add(new AutoFollowedIndex(leaderIndexName, followerIndex.getName()));
}
}

Expand Down Expand Up @@ -118,10 +119,10 @@ public void onFailure(Exception e) {
}
}

private static boolean isCurrentlyFollowed(PersistentTasksCustomMetadata persistentTasks, String indexName) {
private static boolean isCurrentlyFollowed(PersistentTasksCustomMetadata persistentTasks, Index index) {
return persistentTasks != null && persistentTasks.findTasks(ShardFollowTask.NAME, task -> true).stream()
.map(task -> (ShardFollowTask) task.getParams())
.anyMatch(shardFollowTask -> indexName.equals(shardFollowTask.getFollowShardId().getIndexName()));
.anyMatch(shardFollowTask -> index.equals(shardFollowTask.getFollowShardId().getIndex()));
}

private static boolean areShardFollowTasksRunning(PersistentTasksCustomMetadata persistentTasks) {
Expand Down

0 comments on commit b80e0e9

Please sign in to comment.