Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix shard follow task cleaner under security #52347

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions x-pack/plugin/ccr/qa/security/follower-roles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,10 @@ ccruser:
- read
- write
- manage_follow_index
- names: [ 'clean-follower' ]
privileges:
- monitor
- read
- write
- delete_index
- manage_follow_index
2 changes: 1 addition & 1 deletion x-pack/plugin/ccr/qa/security/leader-roles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ ccruser:
cluster:
- read_ccr
indices:
- names: [ 'allowed-index', 'forget-leader', 'logs-eu-*' ]
- names: [ 'allowed-index', 'clean-leader', 'forget-leader', 'logs-eu-*' ]
privileges:
- monitor
- read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,32 @@ public void testForgetFollower() throws IOException {
}
}

public void testCleanShardFollowTaskAfterDeleteFollower() throws Exception {
final String cleanLeader = "clean-leader";
final String cleanFollower = "clean-follower";
if ("leader".equals(targetCluster)) {
logger.info("running against leader cluster");
final Settings indexSettings = Settings.builder()
.put("index.number_of_replicas", 0)
.put("index.number_of_shards", 1)
.put("index.soft_deletes.enabled", true)
.build();
createIndex(cleanLeader, indexSettings);
} else {
logger.info("running against follower cluster");
followIndex(client(), "leader_cluster", cleanLeader, cleanFollower);

final Request request = new Request("DELETE", "/" + cleanFollower);
final Response response = client().performRequest(request);
assertOK(response);
// the shard follow task should have been cleaned up on behalf of the user, see ShardFollowTaskCleaner
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks.size(), equalTo(0));
assertThat(countCcrNodeTasks(), equalTo(0));
});
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public void clusterChanged(final ClusterChangedEvent event) {
CompletionPersistentTaskAction.Request request =
new CompletionPersistentTaskAction.Request(persistentTask.getId(), persistentTask.getAllocationId(), infe);
threadPool.generic().submit(() -> {
/*
* We are executing under the system context, on behalf of the user to clean up the shard follow task after the follower
* index was deleted. This is why the system role includes the privilege for persistent task completion.
*/
assert threadPool.getThreadContext().isSystemContext();
client.execute(CompletionPersistentTaskAction.INSTANCE, request, new ActionListener<>() {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseSyncAction;
import org.elasticsearch.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.xpack.core.security.support.Automatons;

Expand All @@ -33,7 +34,8 @@ public final class SystemPrivilege extends Privilege {
RetentionLeaseActions.Add.ACTION_NAME + "*", // needed for CCR to add retention leases
RetentionLeaseActions.Remove.ACTION_NAME + "*", // needed for CCR to remove retention leases
RetentionLeaseActions.Renew.ACTION_NAME + "*", // needed for CCR to renew retention leases
"indices:admin/settings/update" // needed for DiskThresholdMonitor.markIndicesReadOnly
"indices:admin/settings/update", // needed for DiskThresholdMonitor.markIndicesReadOnly
CompletionPersistentTaskAction.NAME // needed for ShardFollowTaskCleaner
);

private static final Predicate<String> PREDICATE = (action) -> {
Expand Down