Add Bulk Fetch SnapshotInfo API to Repository (#73570)
This PR refactors the `Repository` API for fetching `SnapshotInfo` to enable implementations to optimize for bulk fetching of multiple `SnapshotInfo` at once. This is a prerequisite for a more efficient repository format that does not require loading an individual blob per snapshot to build a snapshot listing. In addition, consuming each `SnapshotInfo` as it is fetched on the snapshot meta thread allows for more memory-efficient snapshot listing.
This commit also uses the new API to make the snapshot status API run with more parallelism when fetching multiple snapshots (further improvements are possible and useful here, such as fetching shard-level metadata in parallel).
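
For illustration, here is a minimal caller-side sketch of the new bulk API. The `GetSnapshotInfoContext` constructor arguments (snapshot ids to fetch, a fail-fast flag, a cancellation check, a per-snapshot consumer, and a completion listener) are inferred from the call sites in this diff; the helper `fetchAllSnapshotInfo` and the surrounding class are hypothetical and not part of this change.

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.repositories.GetSnapshotInfoContext;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public final class BulkSnapshotInfoExample {

    /**
     * Hypothetical helper: fetches SnapshotInfo for all given ids through the new bulk API
     * instead of calling the removed blocking Repository#getSnapshotInfo(SnapshotId) once per id.
     */
    static void fetchAllSnapshotInfo(
        Repository repository,
        Collection<SnapshotId> snapshotIds,
        ActionListener<List<SnapshotInfo>> listener
    ) {
        // synchronized list because the per-snapshot consumer may run concurrently on the snapshot meta pool
        final List<SnapshotInfo> results = Collections.synchronizedList(new ArrayList<>());
        repository.getSnapshotInfo(
            new GetSnapshotInfoContext(
                snapshotIds,
                true,                  // abort on the first failure, mirroring ignoreUnavailable == false above
                () -> false,           // cancellation check, e.g. task::isCancelled in the transport actions
                (context, snapshotInfo) -> results.add(snapshotInfo),  // invoked once per fetched SnapshotInfo
                listener.map(ignored -> List.copyOf(results))          // completes once all ids have been handled
            )
        );
    }
}
```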
original-brownbear authored Jun 14, 2021
1 parent 80593b4 commit dbb626a
Showing 13 changed files with 478 additions and 213 deletions.
@@ -10,7 +10,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
@@ -29,12 +28,12 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.repositories.GetSnapshotInfoContext;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.snapshots.SnapshotException;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotMissingException;
@@ -53,8 +52,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

import static java.util.Collections.unmodifiableList;
@@ -327,76 +324,43 @@ private void snapshots(
} else {
snapshotInfos = Collections.synchronizedList(new ArrayList<>());
}
final ActionListener<Collection<Void>> allDoneListener = listener.delegateFailure((l, v) -> {
final ActionListener<Void> allDoneListener = listener.delegateFailure((l, v) -> {
final ArrayList<SnapshotInfo> snapshotList = new ArrayList<>(snapshotInfos);
snapshotList.addAll(snapshotSet);
CollectionUtil.timSort(snapshotList);
listener.onResponse(unmodifiableList(snapshotList));
});
if (snapshotIdsToIterate.isEmpty()) {
allDoneListener.onResponse(Collections.emptyList());
allDoneListener.onResponse(null);
return;
}
// put snapshot info downloads into a task queue instead of pushing them all into the queue to not completely monopolize the
// snapshot meta pool for a single request
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_META).getMax(), snapshotIdsToIterate.size());
final BlockingQueue<SnapshotId> queue = new LinkedBlockingQueue<>(snapshotIdsToIterate);
final ActionListener<Void> workerDoneListener = new GroupedActionListener<>(allDoneListener, workers).delegateResponse((l, e) -> {
queue.clear(); // Stop fetching the remaining snapshots once we've failed fetching one since the response is an error response
// anyway in this case
l.onFailure(e);
});
final Repository repository;
try {
repository = repositoriesService.repository(repositoryName);
} catch (RepositoryMissingException e) {
listener.onFailure(e);
return;
}
for (int i = 0; i < workers; i++) {
getOneSnapshotInfo(ignoreUnavailable, repository, queue, snapshotInfos, task, workerDoneListener);
}
}
repository.getSnapshotInfo(
new GetSnapshotInfoContext(
snapshotIdsToIterate,
ignoreUnavailable == false,
task::isCancelled,
(context, snapshotInfo) -> snapshotInfos.add(snapshotInfo),
ignoreUnavailable ? ActionListener.runAfter(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.trace("done fetching snapshot infos [{}]", snapshotIdsToIterate);
}

/**
* Tries to poll a {@link SnapshotId} to load {@link SnapshotInfo} for from the given {@code queue}. If it finds one in the queue,
* loads the snapshot info from the repository and adds it to the given {@code snapshotInfos} collection, then invokes itself again to
* try and poll another task from the queue.
* If the queue is empty, resolves {@code listener}.
*/
private void getOneSnapshotInfo(
boolean ignoreUnavailable,
Repository repository,
BlockingQueue<SnapshotId> queue,
Collection<SnapshotInfo> snapshotInfos,
CancellableTask task,
ActionListener<Void> listener
) {
final SnapshotId snapshotId = queue.poll();
if (snapshotId == null) {
listener.onResponse(null);
return;
}
threadPool.executor(ThreadPool.Names.SNAPSHOT_META).execute(() -> {
if (task.isCancelled()) {
listener.onFailure(new TaskCancelledException("task cancelled"));
return;
}
try {
snapshotInfos.add(repository.getSnapshotInfo(snapshotId));
} catch (Exception ex) {
if (ignoreUnavailable) {
logger.warn(() -> new ParameterizedMessage("failed to get snapshot [{}]", snapshotId), ex);
} else {
listener.onFailure(
ex instanceof SnapshotException
? ex
: new SnapshotException(repository.getMetadata().name(), snapshotId, "Snapshot could not be read", ex)
);
}
}
getOneSnapshotInfo(ignoreUnavailable, repository, queue, snapshotInfos, task, listener);
});
@Override
public void onFailure(Exception e) {
assert false : new AssertionError("listener should always complete successfully for ignoreUnavailable=true", e);
logger.warn("failed to fetch snapshot info for some snapshots", e);
}
}, () -> allDoneListener.onResponse(null)) : allDoneListener
)
);
}

private boolean isAllSnapshots(String[] snapshots) {
@@ -13,7 +13,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.node.NodeClient;
@@ -27,10 +27,10 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.repositories.GetSnapshotInfoContext;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
@@ -52,6 +52,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -62,6 +63,7 @@
import java.util.stream.Collectors;

import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.cluster.SnapshotsInProgress.ShardState.SUCCESS;

public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<SnapshotsStatusRequest, SnapshotsStatusResponse> {

@@ -90,7 +92,7 @@ public TransportSnapshotsStatusAction(
SnapshotsStatusRequest::new,
indexNameExpressionResolver,
SnapshotsStatusResponse::new,
ThreadPool.Names.GENERIC
ThreadPool.Names.SAME
);
this.repositoriesService = repositoriesService;
this.client = client;
@@ -142,13 +144,14 @@ protected void masterOperation(
new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(Strings.EMPTY_ARRAY)).snapshots(snapshots)
.timeout(request.masterNodeTimeout()),
ActionListener.wrap(
nodeSnapshotStatuses -> threadPool.generic()
.execute(
ActionRunnable.wrap(
listener,
l -> buildResponse(snapshotsInProgress, request, currentSnapshots, nodeSnapshotStatuses, cancellableTask, l)
)
),
nodeSnapshotStatuses -> buildResponse(
snapshotsInProgress,
request,
currentSnapshots,
nodeSnapshotStatuses,
cancellableTask,
listener
),
listener::onFailure
)
);
@@ -192,8 +195,7 @@ private void buildResponse(
SnapshotIndexShardStatus shardStatus = shardStatues.get(shardEntry.key);
if (shardStatus != null) {
// We have full information about this shard
if (shardStatus.getStage() == SnapshotIndexShardStage.DONE
&& shardEntry.value.state() != SnapshotsInProgress.ShardState.SUCCESS) {
if (shardStatus.getStage() == SnapshotIndexShardStage.DONE && shardEntry.value.state() != SUCCESS) {
// Unlikely edge case:
// Data node has finished snapshotting the shard but the cluster state has not yet been updated
// to reflect this. We adjust the status to show up as snapshot metadata being written because
@@ -286,9 +288,10 @@ private void loadRepositoryData(
ActionListener<SnapshotsStatusResponse> listener
) {
final Set<String> requestedSnapshotNames = Sets.newHashSet(request.snapshots());
final ListenableFuture<RepositoryData> repositoryDataListener = new ListenableFuture<>();
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
repositoriesService.getRepositoryData(repositoryName, repositoryDataListener);
repositoryDataListener.addListener(ActionListener.wrap(repositoryData -> {
final Collection<SnapshotId> snapshotIdsToLoad = new ArrayList<>();
repositoryDataListener.whenComplete(repositoryData -> {
ensureNotCancelled(task);
final Map<String, SnapshotId> matchedSnapshotIds = repositoryData.getSnapshotIds()
.stream()
@@ -314,73 +317,62 @@
throw new SnapshotMissingException(repositoryName, snapshotName);
}
}
SnapshotInfo snapshotInfo = snapshot(snapshotsInProgress, repositoryName, snapshotId);
List<SnapshotIndexShardStatus> shardStatusBuilder = new ArrayList<>();
if (snapshotInfo.state().completed()) {
Map<ShardId, IndexShardSnapshotStatus> shardStatuses = snapshotShards(
repositoryName,
repositoryData,
task,
snapshotInfo
);
for (Map.Entry<ShardId, IndexShardSnapshotStatus> shardStatus : shardStatuses.entrySet()) {
IndexShardSnapshotStatus.Copy lastSnapshotStatus = shardStatus.getValue().asCopy();
shardStatusBuilder.add(new SnapshotIndexShardStatus(shardStatus.getKey(), lastSnapshotStatus));
}
final SnapshotsInProgress.State state;
switch (snapshotInfo.state()) {
case FAILED:
state = SnapshotsInProgress.State.FAILED;
break;
case SUCCESS:
case PARTIAL:
// Translating both PARTIAL and SUCCESS to SUCCESS for now
// TODO: add the differentiation on the metadata level in the next major release
state = SnapshotsInProgress.State.SUCCESS;
break;
default:
throw new IllegalArgumentException("Unknown snapshot state " + snapshotInfo.state());
}
final long startTime = snapshotInfo.startTime();
final long endTime = snapshotInfo.endTime();
assert endTime >= startTime || (endTime == 0L && snapshotInfo.state().completed() == false)
: "Inconsistent timestamps found in SnapshotInfo [" + snapshotInfo + "]";
builder.add(
new SnapshotStatus(
new Snapshot(repositoryName, snapshotId),
state,
Collections.unmodifiableList(shardStatusBuilder),
snapshotInfo.includeGlobalState(),
startTime,
// Use current time to calculate overall runtime for in-progress snapshots that have endTime == 0
(endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime
)
);
if (snapshotsInProgress.snapshot(new Snapshot(repositoryName, snapshotId)) == null) {
snapshotIdsToLoad.add(snapshotId);
}
}
listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
}, listener::onFailure), threadPool.generic(), null);
}

/**
* Retrieves snapshot from repository
*
* @param snapshotsInProgress snapshots in progress in the cluster state
* @param repositoryName repository name
* @param snapshotId snapshot id
* @return snapshot
* @throws SnapshotMissingException if snapshot is not found
*/
private SnapshotInfo snapshot(SnapshotsInProgress snapshotsInProgress, String repositoryName, SnapshotId snapshotId) {
List<SnapshotsInProgress.Entry> entries = SnapshotsService.currentSnapshots(
snapshotsInProgress,
repositoryName,
Collections.singletonList(snapshotId.getName())
);
if (entries.isEmpty() == false) {
return new SnapshotInfo(entries.iterator().next());
}
return repositoriesService.repository(repositoryName).getSnapshotInfo(snapshotId);
if (snapshotIdsToLoad.isEmpty()) {
listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
} else {
final List<SnapshotStatus> threadSafeBuilder = Collections.synchronizedList(builder);
repositoriesService.repository(repositoryName)
.getSnapshotInfo(new GetSnapshotInfoContext(snapshotIdsToLoad, true, task::isCancelled, (context, snapshotInfo) -> {
List<SnapshotIndexShardStatus> shardStatusBuilder = new ArrayList<>();
final Map<ShardId, IndexShardSnapshotStatus> shardStatuses;
try {
shardStatuses = snapshotShards(repositoryName, repositoryData, task, snapshotInfo);
} catch (Exception e) {
// stops all further fetches of snapshotInfo since context is fail-fast
context.onFailure(e);
return;
}
for (Map.Entry<ShardId, IndexShardSnapshotStatus> shardStatus : shardStatuses.entrySet()) {
IndexShardSnapshotStatus.Copy lastSnapshotStatus = shardStatus.getValue().asCopy();
shardStatusBuilder.add(new SnapshotIndexShardStatus(shardStatus.getKey(), lastSnapshotStatus));
}
final SnapshotsInProgress.State state;
switch (snapshotInfo.state()) {
case FAILED:
state = SnapshotsInProgress.State.FAILED;
break;
case SUCCESS:
case PARTIAL:
// Translating both PARTIAL and SUCCESS to SUCCESS for now
// TODO: add the differentiation on the metadata level in the next major release
state = SnapshotsInProgress.State.SUCCESS;
break;
default:
throw new IllegalArgumentException("Unknown snapshot state " + snapshotInfo.state());
}
final long startTime = snapshotInfo.startTime();
final long endTime = snapshotInfo.endTime();
assert endTime >= startTime || (endTime == 0L && snapshotInfo.state().completed() == false)
: "Inconsistent timestamps found in SnapshotInfo [" + snapshotInfo + "]";
threadSafeBuilder.add(
new SnapshotStatus(
new Snapshot(repositoryName, snapshotInfo.snapshotId()),
state,
Collections.unmodifiableList(shardStatusBuilder),
snapshotInfo.includeGlobalState(),
startTime,
// Use current time to calculate overall runtime for in-progress snapshots that have endTime == 0
(endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime
)
);
}, listener.map(v -> new SnapshotsStatusResponse(List.copyOf(threadSafeBuilder)))));
}
}, listener::onFailure);
}

/**
@@ -49,8 +49,8 @@ public RepositoryMetadata getMetadata() {
}

@Override
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
return in.getSnapshotInfo(snapshotId);
public void getSnapshotInfo(GetSnapshotInfoContext context) {
in.getSnapshotInfo(context);
}

@Override