-
Notifications
You must be signed in to change notification settings - Fork 25.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CCR] Added history uuid validation #33546
[CCR] Added history uuid validation #33546
Conversation
For correctness we need to verify whether the history uuid of the leader index shards never changes while that index is being followed. * The history UUIDs are recorded as custom index metadata in the follow index. * The follow api validates whether the current history UUIDs of the leader index shards are the same as the recorded history UUIDs. If not the follow api fails. * While a follow index is following a leader index; shard follow tasks on each shard changes api call verify whether their current history uuid is the same as the recorded history uuid. Relates to elastic#30086
Pinging @elastic/es-distributed |
…hecker to avoid duplication of logic
@martijnvg It seems my latest push from #33496 introduced some merge conflicts here. Would you resolve them? |
for (Map.Entry<Integer, String> entry : leaderIndexHistoryUUID.entrySet()) { | ||
String recordedLeaderIndexHistoryUUID = recordedHistoryUUIDs.get(entry.getKey()); | ||
if (entry.getValue().equals(recordedLeaderIndexHistoryUUID) == false) { | ||
throw new IllegalArgumentException("follow index [" + request.followerIndex + "] should reference [" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe add some info as to how this can happen? (restore from snapshot is a likely cause, I think)
|
||
CheckedConsumer<IndicesStatsResponse, Exception> indicesStatsHandler = indicesStatsResponse -> { | ||
IndexStats indexStats = indicesStatsResponse.getIndices().get(leaderIndex); | ||
Map<Integer, String> historyUUIDs = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
random drive by comment - we need to make sure we got history uuids for all the shards and a map data structure (compared to say, an array) hides it. Also - can we assert that the history uuids of all the shard copies that we got are identical?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree this would be clearer with a String[]
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So then the index of the array indicates to which shard a historyUUID belongs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So then the index of the array indicates to which shard a historyUUID belongs?
Correct. +1.
@@ -82,6 +82,8 @@ | |||
public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin { | |||
|
|||
public static final String CCR_THREAD_POOL_NAME = "ccr"; | |||
public static final String CCR_CUSTOM_METADATA_KEY = "ccr"; | |||
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY = "leader_index_history_uuid"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The history UUID is per shard (as is clear you understand based on your implementation), so I think the name is misleading.
createFollowerIndex(leaderIndexMetadata, request, listener); | ||
final String leaderIndex = request.getFollowRequest().getLeaderIndex(); | ||
final IndexMetaData leaderIndexMetadata = state.getMetaData().index(leaderIndex); | ||
Consumer<Map<Integer, String>> handler = historyUUID -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think historyUUIDs
would be clearer to indicate there is one per shard.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@martijnvg This looks good. I left some comments.
@@ -278,6 +278,11 @@ protected void onOperationsFetched(Translog.Operation[] operations) { | |||
|
|||
synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) { | |||
onOperationsFetched(response.getOperations()); | |||
if (params.getRecordedLeaderIndexHistoryUUID().equals(response.getHistoryUUID()) == false) { | |||
markAsFailed(new IllegalStateException("unexpected history uuid, expected [" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about passing historyUUID to the Request then let IndexShard#newChangesSnapshot
validate the historyUUID before creating a changes snapshot? I think it's less error-prone than examining the Response.
|
||
CheckedConsumer<IndicesStatsResponse, Exception> indicesStatsHandler = indicesStatsResponse -> { | ||
IndexStats indexStats = indicesStatsResponse.getIndices().get(leaderIndex); | ||
Map<Integer, String> historyUUIDs = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So then the index of the array indicates to which shard a historyUUID belongs?
Correct. +1.
@@ -568,4 +598,16 @@ private static Settings filter(Settings originalSettings) { | |||
return settings.build(); | |||
} | |||
|
|||
private static Map<Integer, String> convert(Map<String, String> ccrMetadata) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we name this method a bit less abstract? extractIndexShardHistoryUUIDs
?
* @param leaderIndex the name of the leader index | ||
* @param onFailure the failure consumer | ||
* @param historyUUIDConsumer the leader index history uuid and consumer | ||
* @param <T> the type of response the listener is waiting for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if we need to make this a generic method?
@@ -82,6 +82,8 @@ | |||
public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin { | |||
|
|||
public static final String CCR_THREAD_POOL_NAME = "ccr"; | |||
public static final String CCR_CUSTOM_METADATA_KEY = "ccr"; | |||
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS = "leader_index_shard_history_uuids"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we make SHARDS singular? CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS?
@dnhatn Thanks for reviewing. I've updated this PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @martijnvg. I left some comments around how we handle the corner cases for commit stats.
String[] historyUUIDs = new String[leaderIndexMetaData.getNumberOfShards()]; | ||
for (IndexShardStats indexShardStats : indexStats) { | ||
for (ShardStats shardStats : indexShardStats) { | ||
CommitStats commitStats = shardStats.getCommitStats(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commitStats
might be null if a shard is (being) closed.
CommitStats commitStats = shardStats.getCommitStats(); | ||
String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY); | ||
ShardId shardId = shardStats.getShardRouting().shardId(); | ||
historyUUIDs[shardId.id()] = historyUUID; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might have missed Boaz's comment:
Also - can we assert that the history uuids of all the shard copies that we got are identical
Moreover, not every shard is allocated or associated with a historyUUID. Should we fail if there is no historyUUID for a shardId?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops, I will add the checks.
Moreover, not every shard is allocated or associated with a historyUUID
In what cases does a shard does not have a historyUUID?
can we assert that the history uuids of all the shard copies that we got are identical
In what cases are history uuids not unique between shards?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In what cases does a shard does not have a historyUUID?
I see, not yet started shards have no history uuid, which is more likely for replica shards.
historyUUIDConsumer.accept(historyUUIDs); | ||
}; | ||
IndicesStatsRequest request = new IndicesStatsRequest(); | ||
request.indices(leaderIndex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can "clear" all flags to reduce this stat request.
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java
Outdated
Show resolved
Hide resolved
* | ||
* @param client the client | ||
* @param clusterAlias the remote cluster alias | ||
* @param leaderIndex the name of the leader index | ||
* @param onFailure the failure consumer | ||
* @param leaderIndexMetadataConsumer the leader index metadata consumer | ||
* @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can re-indent the javadocs.
ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0), | ||
new ShardId("leader_index", "", 0), between(1, 64), between(1, 8), Long.MAX_VALUE, between(1, 4), 10240, | ||
TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap()); | ||
ShardFollowTask params = new ShardFollowTask( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have a simple unit test showing that if the request's historyUUID is mismatched, no operation is received and we abort the follow-task?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have added a test here: 9620d7b
I added a method to ESIndexLevelReplicationTestCase
so that I can reinit the primary shard. Which then allowed me to change the history uuid and restart the shard in the test. I think this an okay test approach, what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am ok with this approach. How about just changing a historyUUID in the requests of the follow-task without really bootstrapping new history on the leader?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about just changing a historyUUID in the requests of the follow-task without really bootstrapping new history on the leader?
That would change the recorded history uuid, which is immutable in non test scenarios. It would trigger the same error to be thrown, but it feels reverse to me. In this test now the current history uuid changes, which I think is more realistic.
in case of history uuid changed.
@dnhatn I've updated the PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@martijnvg I left some more comments around the commit stats.
onFailure.accept(new IllegalArgumentException("leader index's commit stats are missing")); | ||
return; | ||
} | ||
String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Martijn, I am sorry. I should have been clearer here:
-
If a commit stats is not null, it should have a valid history UUID. We can remove the null check
historyUUID == null
. -
If a primary is unassigned, the index_shard_stats of that primary is not returned in the response; thus we won't have a history UUID for that shardId in the array. I think we should check that every entry in the historyUUIDs array is not null; otherwise, we should fail the request. WDYT?
Please note the assertion assert new HashSet<>(Arrays.asList(historyUUIDs)).size() == leaderIndexMetaData.getNumberOfShards();
does not guarantee that every entry is non-null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should check that every entry in the historyUUIDs array is not null; otherwise, we should fail the request. WDYT?
Agreed
Please note the assertion assert new HashSet<>(Arrays.asList(historyUUIDs)).size() == leaderIndexMetaData.getNumberOfShards(); does not guarantee that every entry is non-null.
Good point. I will check each entry individually.
@dnhatn I think I resolved the comment around commit stats. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks @martijnvg for extra iterations.
|
||
assertBusy(() -> { | ||
assertThat(shardFollowTask.isStopped(), is(true)); | ||
assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why this test fails on my machine.
java.lang.AssertionError:
Expected: "unexpected history uuid, expected [_x3AmMhzQsa91ee-ah9Aug], actual [ZWUMypAWSEq9P3BEFIAGfA]"
but: was "retrying failed [11] times, aborting..."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that is because it took to long to re-initialise the leader primary shard. Currently we retry up to 10 times and then fail. This will change when I merge #33371, which I will do before merging this PR and then this failure will not occur.
@elasticmachine run gradle build tests |
For correctness we need to verify whether the history uuid of the leader index shards never changes while that index is being followed. * The history UUIDs are recorded as custom index metadata in the follow index. * The follow api validates whether the current history UUIDs of the leader index shards are the same as the recorded history UUIDs. If not the follow api fails. * While a follow index is following a leader index; shard follow tasks on each shard changes api call verify whether their current history uuid is the same as the recorded history uuid. Relates to #30086 Co-authored-by: Nhat Nguyen <[email protected]>
For correctness we need to verify whether the history uuid of the leader
index shards never changes while that index is being followed.
index shards are the same as the recorded history UUIDs.
If not the follow api fails.
on each shard changes api call verify whether their current history uuid
is the same as the recorded history uuid.
Relates to #30086