Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Bulk Fetch SnapshotInfo API to Repository #73570

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4fb132b
worksish
original-brownbear May 28, 2021
0b2b29f
fix things + docs
original-brownbear May 31, 2021
4d15949
fix concurrency
original-brownbear May 31, 2021
d6f3e3c
adjust
original-brownbear Jun 1, 2021
0ef6f1b
snapshot status API fetch logic
original-brownbear Jun 1, 2021
c02244a
some additonal docs
original-brownbear Jun 1, 2021
9b27b0b
Merge remote-tracking branch 'elastic/master' into more-efficient-sna…
original-brownbear Jun 2, 2021
24de437
fix todo
original-brownbear Jun 2, 2021
97a7082
Merge remote-tracking branch 'elastic/master' into more-efficient-sna…
original-brownbear Jun 2, 2021
749f882
Merge remote-tracking branch 'elastic/master' into more-efficient-sna…
original-brownbear Jun 2, 2021
bc4515e
cleanup
original-brownbear Jun 2, 2021
2b082fd
Merge remote-tracking branch 'elastic/master' into more-efficient-sna…
original-brownbear Jun 2, 2021
f4355fa
Merge remote-tracking branch 'elastic/master' into more-efficient-sna…
original-brownbear Jun 2, 2021
854f443
tests fixed
original-brownbear Jun 2, 2021
7c3c422
Merge remote-tracking branch 'elastic/master' into more-efficient-sna…
original-brownbear Jun 2, 2021
a09ab69
Merge remote-tracking branch 'elastic/master' into more-efficient-sna…
original-brownbear Jun 7, 2021
df02ed9
spotless
original-brownbear Jun 10, 2021
2e7b50c
Merge remote-tracking branch 'elastic/master' into more-efficient-sna…
original-brownbear Jun 10, 2021
666d5f2
Merge remote-tracking branch 'elastic/master' into more-efficient-sna…
original-brownbear Jun 10, 2021
b38e4dc
Merge remote-tracking branch 'elastic/master' into more-efficient-sna…
original-brownbear Jun 14, 2021
fe4e0ce
CR: comments
original-brownbear Jun 14, 2021
b874ef5
CR: comments
original-brownbear Jun 14, 2021
54e9dea
Merge remote-tracking branch 'elastic/master' into more-efficient-sna…
original-brownbear Jun 14, 2021
d875e8c
docs and assertions
original-brownbear Jun 14, 2021
94f44ed
fix ccr
original-brownbear Jun 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
Expand All @@ -29,12 +28,12 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.repositories.GetSnapshotInfoContext;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.snapshots.SnapshotException;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotMissingException;
Expand All @@ -53,8 +52,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

import static java.util.Collections.unmodifiableList;
Expand Down Expand Up @@ -327,76 +324,43 @@ private void snapshots(
} else {
snapshotInfos = Collections.synchronizedList(new ArrayList<>());
}
final ActionListener<Collection<Void>> allDoneListener = listener.delegateFailure((l, v) -> {
final ActionListener<Void> allDoneListener = listener.delegateFailure((l, v) -> {
final ArrayList<SnapshotInfo> snapshotList = new ArrayList<>(snapshotInfos);
snapshotList.addAll(snapshotSet);
CollectionUtil.timSort(snapshotList);
listener.onResponse(unmodifiableList(snapshotList));
});
if (snapshotIdsToIterate.isEmpty()) {
allDoneListener.onResponse(Collections.emptyList());
allDoneListener.onResponse(null);
return;
}
// put snapshot info downloads into a task queue instead of pushing them all into the queue to not completely monopolize the
// snapshot meta pool for a single request
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_META).getMax(), snapshotIdsToIterate.size());
final BlockingQueue<SnapshotId> queue = new LinkedBlockingQueue<>(snapshotIdsToIterate);
final ActionListener<Void> workerDoneListener = new GroupedActionListener<>(allDoneListener, workers).delegateResponse((l, e) -> {
queue.clear(); // Stop fetching the remaining snapshots once we've failed fetching one since the response is an error response
// anyway in this case
l.onFailure(e);
});
final Repository repository;
try {
repository = repositoriesService.repository(repositoryName);
} catch (RepositoryMissingException e) {
listener.onFailure(e);
return;
}
for (int i = 0; i < workers; i++) {
getOneSnapshotInfo(ignoreUnavailable, repository, queue, snapshotInfos, task, workerDoneListener);
}
}
repository.getSnapshotInfo(
new GetSnapshotInfoContext(
snapshotIdsToIterate,
ignoreUnavailable == false,
task::isCancelled,
(context, snapshotInfo) -> snapshotInfos.add(snapshotInfo),
ignoreUnavailable ? ActionListener.runAfter(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.trace("done fetching snapshot infos [{}]", snapshotIdsToIterate);
}

/**
* Tries to poll a {@link SnapshotId} to load {@link SnapshotInfo} for from the given {@code queue}. If it finds one in the queue,
* loads the snapshot info from the repository and adds it to the given {@code snapshotInfos} collection, then invokes itself again to
* try and poll another task from the queue.
* If the queue is empty resolves {@code} listener.
*/
private void getOneSnapshotInfo(
boolean ignoreUnavailable,
Repository repository,
BlockingQueue<SnapshotId> queue,
Collection<SnapshotInfo> snapshotInfos,
CancellableTask task,
ActionListener<Void> listener
) {
final SnapshotId snapshotId = queue.poll();
if (snapshotId == null) {
listener.onResponse(null);
return;
}
threadPool.executor(ThreadPool.Names.SNAPSHOT_META).execute(() -> {
if (task.isCancelled()) {
listener.onFailure(new TaskCancelledException("task cancelled"));
return;
}
try {
snapshotInfos.add(repository.getSnapshotInfo(snapshotId));
} catch (Exception ex) {
if (ignoreUnavailable) {
logger.warn(() -> new ParameterizedMessage("failed to get snapshot [{}]", snapshotId), ex);
} else {
listener.onFailure(
ex instanceof SnapshotException
? ex
: new SnapshotException(repository.getMetadata().name(), snapshotId, "Snapshot could not be read", ex)
);
}
}
getOneSnapshotInfo(ignoreUnavailable, repository, queue, snapshotInfos, task, listener);
});
@Override
public void onFailure(Exception e) {
assert false : new AssertionError("listener should always complete successfully for ignoreUnavailable=true", e);
logger.warn("failed to fetch snapshot info for some snapshots", e);
}
}, () -> allDoneListener.onResponse(null)) : allDoneListener
)
);
}

private boolean isAllSnapshots(String[] snapshots) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.node.NodeClient;
Expand All @@ -27,10 +27,10 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.repositories.GetSnapshotInfoContext;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
Expand All @@ -52,6 +52,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -62,6 +63,7 @@
import java.util.stream.Collectors;

import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.cluster.SnapshotsInProgress.ShardState.SUCCESS;

public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<SnapshotsStatusRequest, SnapshotsStatusResponse> {

Expand Down Expand Up @@ -90,7 +92,7 @@ public TransportSnapshotsStatusAction(
SnapshotsStatusRequest::new,
indexNameExpressionResolver,
SnapshotsStatusResponse::new,
ThreadPool.Names.GENERIC
ThreadPool.Names.SAME
);
this.repositoriesService = repositoriesService;
this.client = client;
Expand Down Expand Up @@ -142,13 +144,14 @@ protected void masterOperation(
new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(Strings.EMPTY_ARRAY)).snapshots(snapshots)
.timeout(request.masterNodeTimeout()),
ActionListener.wrap(
nodeSnapshotStatuses -> threadPool.generic()
.execute(
ActionRunnable.wrap(
listener,
l -> buildResponse(snapshotsInProgress, request, currentSnapshots, nodeSnapshotStatuses, cancellableTask, l)
)
),
nodeSnapshotStatuses -> buildResponse(
snapshotsInProgress,
request,
currentSnapshots,
nodeSnapshotStatuses,
cancellableTask,
listener
),
listener::onFailure
)
);
Expand Down Expand Up @@ -192,8 +195,7 @@ private void buildResponse(
SnapshotIndexShardStatus shardStatus = shardStatues.get(shardEntry.key);
if (shardStatus != null) {
// We have full information about this shard
if (shardStatus.getStage() == SnapshotIndexShardStage.DONE
&& shardEntry.value.state() != SnapshotsInProgress.ShardState.SUCCESS) {
if (shardStatus.getStage() == SnapshotIndexShardStage.DONE && shardEntry.value.state() != SUCCESS) {
// Unlikely edge case:
// Data node has finished snapshotting the shard but the cluster state has not yet been updated
// to reflect this. We adjust the status to show up as snapshot metadata being written because
Expand Down Expand Up @@ -286,9 +288,10 @@ private void loadRepositoryData(
ActionListener<SnapshotsStatusResponse> listener
) {
final Set<String> requestedSnapshotNames = Sets.newHashSet(request.snapshots());
final ListenableFuture<RepositoryData> repositoryDataListener = new ListenableFuture<>();
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
repositoriesService.getRepositoryData(repositoryName, repositoryDataListener);
repositoryDataListener.addListener(ActionListener.wrap(repositoryData -> {
final Collection<SnapshotId> snapshotIdsToLoad = new ArrayList<>();
repositoryDataListener.whenComplete(repositoryData -> {
ensureNotCancelled(task);
final Map<String, SnapshotId> matchedSnapshotIds = repositoryData.getSnapshotIds()
.stream()
Expand All @@ -314,73 +317,62 @@ private void loadRepositoryData(
throw new SnapshotMissingException(repositoryName, snapshotName);
}
}
SnapshotInfo snapshotInfo = snapshot(snapshotsInProgress, repositoryName, snapshotId);
List<SnapshotIndexShardStatus> shardStatusBuilder = new ArrayList<>();
if (snapshotInfo.state().completed()) {
Map<ShardId, IndexShardSnapshotStatus> shardStatuses = snapshotShards(
repositoryName,
repositoryData,
task,
snapshotInfo
);
for (Map.Entry<ShardId, IndexShardSnapshotStatus> shardStatus : shardStatuses.entrySet()) {
IndexShardSnapshotStatus.Copy lastSnapshotStatus = shardStatus.getValue().asCopy();
shardStatusBuilder.add(new SnapshotIndexShardStatus(shardStatus.getKey(), lastSnapshotStatus));
}
final SnapshotsInProgress.State state;
switch (snapshotInfo.state()) {
case FAILED:
state = SnapshotsInProgress.State.FAILED;
break;
case SUCCESS:
case PARTIAL:
// Translating both PARTIAL and SUCCESS to SUCCESS for now
// TODO: add the differentiation on the metadata level in the next major release
state = SnapshotsInProgress.State.SUCCESS;
break;
default:
throw new IllegalArgumentException("Unknown snapshot state " + snapshotInfo.state());
}
final long startTime = snapshotInfo.startTime();
final long endTime = snapshotInfo.endTime();
assert endTime >= startTime || (endTime == 0L && snapshotInfo.state().completed() == false)
: "Inconsistent timestamps found in SnapshotInfo [" + snapshotInfo + "]";
builder.add(
new SnapshotStatus(
new Snapshot(repositoryName, snapshotId),
state,
Collections.unmodifiableList(shardStatusBuilder),
snapshotInfo.includeGlobalState(),
startTime,
// Use current time to calculate overall runtime for in-progress snapshots that have endTime == 0
(endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime
)
);
if (snapshotsInProgress.snapshot(new Snapshot(repositoryName, snapshotId)) == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure to understand this: we found a snapshot with a matching name in the repository data but we did not found it before in the in progress snapshots, how could it be in snapshotsInProgress?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well spotted :) I think this is an impossible race to run into these days. I'll leave it as is here for now and will open a PR to clean this up from master if possible separately today :)

snapshotIdsToLoad.add(snapshotId);
}
}
listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
}, listener::onFailure), threadPool.generic(), null);
}

/**
* Retrieves snapshot from repository
*
* @param snapshotsInProgress snapshots in progress in the cluster state
* @param repositoryName repository name
* @param snapshotId snapshot id
* @return snapshot
* @throws SnapshotMissingException if snapshot is not found
*/
private SnapshotInfo snapshot(SnapshotsInProgress snapshotsInProgress, String repositoryName, SnapshotId snapshotId) {
List<SnapshotsInProgress.Entry> entries = SnapshotsService.currentSnapshots(
snapshotsInProgress,
repositoryName,
Collections.singletonList(snapshotId.getName())
);
if (entries.isEmpty() == false) {
return new SnapshotInfo(entries.iterator().next());
}
return repositoriesService.repository(repositoryName).getSnapshotInfo(snapshotId);
if (snapshotIdsToLoad.isEmpty()) {
listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
} else {
final List<SnapshotStatus> threadSafeBuilder = Collections.synchronizedList(builder);
repositoriesService.repository(repositoryName)
.getSnapshotInfo(new GetSnapshotInfoContext(snapshotIdsToLoad, true, task::isCancelled, (context, snapshotInfo) -> {
List<SnapshotIndexShardStatus> shardStatusBuilder = new ArrayList<>();
final Map<ShardId, IndexShardSnapshotStatus> shardStatuses;
try {
shardStatuses = snapshotShards(repositoryName, repositoryData, task, snapshotInfo);
} catch (Exception e) {
// stops all further fetches of snapshotInfo since context is fail-fast
context.onFailure(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could turn the GetSnapshotInfoContext's biconsumer into a checked consumer and let `GetSnapshotInfoContext catch exceptions and fail the context? (see other comment below)

return;
}
for (Map.Entry<ShardId, IndexShardSnapshotStatus> shardStatus : shardStatuses.entrySet()) {
IndexShardSnapshotStatus.Copy lastSnapshotStatus = shardStatus.getValue().asCopy();
shardStatusBuilder.add(new SnapshotIndexShardStatus(shardStatus.getKey(), lastSnapshotStatus));
}
final SnapshotsInProgress.State state;
switch (snapshotInfo.state()) {
case FAILED:
state = SnapshotsInProgress.State.FAILED;
break;
case SUCCESS:
case PARTIAL:
// Translating both PARTIAL and SUCCESS to SUCCESS for now
// TODO: add the differentiation on the metadata level in the next major release
state = SnapshotsInProgress.State.SUCCESS;
break;
default:
throw new IllegalArgumentException("Unknown snapshot state " + snapshotInfo.state());
}
final long startTime = snapshotInfo.startTime();
final long endTime = snapshotInfo.endTime();
assert endTime >= startTime || (endTime == 0L && snapshotInfo.state().completed() == false)
: "Inconsistent timestamps found in SnapshotInfo [" + snapshotInfo + "]";
threadSafeBuilder.add(
new SnapshotStatus(
new Snapshot(repositoryName, snapshotInfo.snapshotId()),
state,
Collections.unmodifiableList(shardStatusBuilder),
snapshotInfo.includeGlobalState(),
startTime,
// Use current time to calculate overall runtime for in-progress snapshots that have endTime == 0
(endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime
)
);
}, listener.map(v -> new SnapshotsStatusResponse(List.copyOf(threadSafeBuilder)))));
}
}, listener::onFailure);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public RepositoryMetadata getMetadata() {
}

@Override
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
return in.getSnapshotInfo(snapshotId);
public void getSnapshotInfo(GetSnapshotInfoContext context) {
in.getSnapshotInfo(context);
}

@Override
Expand Down
Loading