-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Bulk Fetch SnapshotInfo API to Repository #73570
Add Bulk Fetch SnapshotInfo API to Repository #73570
Conversation
Pinging @elastic/es-distributed (Team:Distributed) |
With work to make repo APIs more async incoming in elastic#73570 we need a non-blocking way to run this check. This adds that async check and removes the need to manually pass executors around as well.
server/src/main/java/org/elasticsearch/repositories/GetSnapshotInfoContext.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/repositories/GetSnapshotInfoContext.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/repositories/GetSnapshotInfoContext.java
Outdated
Show resolved
Hide resolved
(endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime | ||
) | ||
); | ||
if (snapshotsInProgress.snapshot(new Snapshot(repositoryName, snapshotId)) == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure to understand this: we found a snapshot with a matching name in the repository data but we did not found it before in the in progress snapshots, how could it be in snapshotsInProgress
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well spotted :) I think this is an impossible race to run into these days. I'll leave it as is here for now and will open a PR to clean this up from master
if possible separately today :)
public void getSnapshotInfo(GetSnapshotInfoContext context) { | ||
// put snapshot info downloads into a task queue instead of pushing them all into the queue to not completely monopolize the | ||
// snapshot meta pool for a single request | ||
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_META).getMax(), context.snapshotIds().size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe check context.done() and context.isCancelled() first here too and return directly? This would avoid to queue runnables in the snapshot meta thread pool for nothing (yet it is unlikely to happen)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I figured that wasn't really worth it since we haven't forked yet. The chance of a concurrent cancellation is almost 0 and the cannot have been failed yet concurrently ever I think.
} else { | ||
final List<SnapshotStatus> threadSafeBuilder = Collections.synchronizedList(builder); | ||
repositoriesService.repository(repositoryName) | ||
.getSnapshotInfo(new GetSnapshotInfoContext(snapshotIdsToLoad, true, () -> false, (context, snapshotInfo) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe pass task::isCancelled
instead of () -> false
? The task cancellation is verified also in snapshotShards()
few lines after but maybe we could be more explicit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++ right, this PR was opened before the status action became cancellable, will fix :)
shardStatuses = snapshotShards(repositoryName, repositoryData, task, snapshotInfo); | ||
} catch (Exception e) { | ||
// stops all further fetches of snapshotInfo since context is fail-fast | ||
context.onFailure(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we could turn the GetSnapshotInfoContext
's biconsumer into a checked consumer and let `GetSnapshotInfoContext catch exceptions and fail the context? (see other comment below)
* {@link BiConsumer} invoked for each {@link SnapshotInfo} that is fetched with this instance and the {@code SnapshotInfo} as | ||
* arguments. | ||
*/ | ||
private final BiConsumer<GetSnapshotInfoContext, SnapshotInfo> onSnapshotInfo; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this could be a CheckedConsumer<SnapshotInfo, IOException> onSnapshotInfo
and handle failure in case of exception directly in this class rather than the consumer holds a reference to the context in order to be able to fail it.
(Unless this is required by an upcoming change?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Unless this is required by an upcoming change?)
Yes I think I'd like the flexibility for upcoming changes. The fact that we currently fetch the shard level metadata on the thread that resolves the snapshot info listener is not supposed to stick around as a solution so we need this mechanism to fail a forking callback here.
} catch (NoSuchFileException ex) { | ||
failure = new SnapshotMissingException(metadata.name(), snapshotId, ex); | ||
} catch (IOException | NotXContentException ex) { | ||
failure = new SnapshotException(metadata.name(), snapshotId, "failed to get snapshots", ex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
failure = new SnapshotException(metadata.name(), snapshotId, "failed to get snapshots", ex); | |
failure = new SnapshotException(metadata.name(), snapshotId, "failed to get snapshot info for " + snapshotId, ex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to add the snapshot id to the message IMO, we already have it as a field on the snapshot exception and that will be serialized. But it should be singular here in the message I think :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not much to add over Tanguy's thorough review, just a few comments on habitability.
BiConsumer<GetSnapshotInfoContext, SnapshotInfo> onSnapshotInfo, | ||
ActionListener<Void> listener | ||
) { | ||
assert snapshotIds.isEmpty() == false : "no snapshot ids to fetch given"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO we should either forbid this in production too (e.g. throw an IllegalArgumentException
) or else just allow it and deal with the n=0
case ourselves. I'd rather the latter I think but either way is ok.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went with throwing for now. I think we shouldn't create the case where we pass an empty list to this API, just like we don't allow a 0 sized GroupedActionListener
, otherwise we'd have to resolve the listener in the constructor right away I guess, which I find confusing.
server/src/main/java/org/elasticsearch/repositories/GetSnapshotInfoContext.java
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/repositories/Repository.java
Outdated
Show resolved
Hide resolved
server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
Outdated
Show resolved
Hide resolved
test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
Show resolved
Hide resolved
Thanks Tanguy + David! I think I addressed all comments now and this should be good for another round :) |
@@ -182,8 +185,15 @@ public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { | |||
ArrayList<String> indices = new ArrayList<>(indicesMap.size()); | |||
indicesMap.keysIt().forEachRemaining(indices::add); | |||
|
|||
return new SnapshotInfo(snapshotId, indices, new ArrayList<>(metadata.dataStreams().keySet()), Collections.emptyList(), | |||
response.getState().getNodes().getMaxNodeVersion(), SnapshotState.SUCCESS | |||
context.onResponse( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh oops I just saw this. Think we need to take steps to put this on the SNAPSHOT_META
thread now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
timing #73570 (comment) :D on it shortly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 94f44ed now
Ah damn the new assertion breaks the CCR repo which resolves the listener straight away, will fix in ~30 minutes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM; I haven't reviewed the changes in TransportSnapshotsStatusAction
terribly deeply, they look roughly ok but I'd have to spend more time on them to be sure and it looks like Tanguy has already done that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for the extra iterations
Thanks Tanguy + David! |
Pagination and snapshots for get snapshots API, build on top of the current implementation to enable work that needs this API for testing. A follow-up will leverage the changes to make things more efficient via pagination. Relates #73570 which does part of the under-the-hood changes required to efficiently implement this API on the repository layer.
) Backport of the recently introduced snapshot pagination and scalability improvements listed below. Merged as a single backport because the `7.x` and master snapshot status API logic had massively diverged between master and 7.x. With the work in the below PRs, the logic in master and 7.x once again has been aligned very closely again. #72842 #73172 #73199 #73570 #73952 #74236 #74451 (this one is only partly applicable as it was mainly a change to master to align `master` and `7.x` branches)
This PR refactors the
Repository
API for fetchingSnapshotInfo
to enabled implementations to optimize for bulk fetching multipleSnapshotInfo
at once. This is a requirement for making use of a more efficient repository format that does not require loading individual blobs per snapshot to fetch a snapshot listing. Also, by enabling consumingSnapshotInfo
as they are fetched on the snapshot meta thread this allows for some more memory efficient usage of snapshot listing.Also, this commit makes use of the new API to make the snapshot status API run a little more parallel if fetching multiple snapshots (though there's additional improvements possible+useful here as far as fetching shard level metadata in parallel).