Skip to content

Commit

Permalink
Improve snapshot creation and deletion performance on repositories wi…
Browse files Browse the repository at this point in the history
…th large number of snapshots

 Fixes elastic#8958
  • Loading branch information
imotov committed Jan 23, 2015
1 parent c617af3 commit ee3c3d1
Show file tree
Hide file tree
Showing 4 changed files with 385 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.snapshots.blobstore;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
Expand All @@ -40,6 +41,7 @@
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Lucene;
Expand Down Expand Up @@ -97,6 +99,10 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements

private static final String SNAPSHOT_PREFIX = "snapshot-";

private static final String SNAPSHOT_INDEX_PREFIX = "index-";

private static final String SNAPSHOT_TEMP_PREFIX = "pending-";

@Inject
public BlobStoreIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService, ClusterService clusterService) {
super(settings);
Expand Down Expand Up @@ -288,7 +294,9 @@ public void delete() {
throw new IndexShardSnapshotException(shardId, "Failed to list content of gateway", e);
}

BlobStoreIndexShardSnapshots snapshots = buildBlobStoreIndexShardSnapshots(blobs);
Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs);
BlobStoreIndexShardSnapshots snapshots = tuple.v1();
int fileListGeneration = tuple.v2();

String commitPointName = snapshotBlobName(snapshotId);

Expand All @@ -300,13 +308,13 @@ public void delete() {

// delete all files that are not referenced by any commit point
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
List<BlobStoreIndexShardSnapshot> newSnapshotsList = Lists.newArrayList();
for (BlobStoreIndexShardSnapshot point : snapshots) {
List<SnapshotFiles> newSnapshotsList = Lists.newArrayList();
for (SnapshotFiles point : snapshots) {
if (!point.snapshot().equals(snapshotId.getSnapshot())) {
newSnapshotsList.add(point);
}
}
cleanup(newSnapshotsList, blobs);
finalize(newSnapshotsList, fileListGeneration + 1, blobs);
}

/**
Expand All @@ -323,23 +331,48 @@ public BlobStoreIndexShardSnapshot loadSnapshot() {
}

/**
* Removes all unreferenced files from the repository
* Removes all unreferenced files from the repository and writes new index file
*
* @param snapshots list of active snapshots in the container
* @param blobs list of blobs in the container
*/
protected void cleanup(List<BlobStoreIndexShardSnapshot> snapshots, ImmutableMap<String, BlobMetaData> blobs) {
protected void finalize(List<SnapshotFiles> snapshots, int fileListGeneration, ImmutableMap<String, BlobMetaData> blobs) {
BlobStoreIndexShardSnapshots newSnapshots = new BlobStoreIndexShardSnapshots(snapshots);
String newSnapshotIndexName = SNAPSHOT_INDEX_PREFIX + fileListGeneration;
// If we deleted all snapshots - we don't need to create the index file
if (snapshots.size() > 0) {
try (OutputStream output = blobContainer.createOutput(SNAPSHOT_TEMP_PREFIX + fileListGeneration)) {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, output);
newSnapshots.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.flush();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write file list", e);
}
try {
blobContainer.move(SNAPSHOT_TEMP_PREFIX + fileListGeneration, newSnapshotIndexName);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to rename file list", e);
}
}

// now go over all the blobs, and if they don't exist in a snapshot, delete them
for (String blobName : blobs.keySet()) {
if (!blobName.startsWith("__")) {
continue;
}
if (newSnapshots.findNameFile(FileInfo.canonicalName(blobName)) == null) {
try {
blobContainer.deleteBlob(blobName);
} catch (IOException e) {
logger.debug("[{}] [{}] error deleting blob [{}] during cleanup", e, snapshotId, shardId, blobName);
// delete old file lists
if (blobName.startsWith(SNAPSHOT_TEMP_PREFIX) || blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) {
if (!newSnapshotIndexName.equals(blobName)) {
try {
blobContainer.deleteBlob(blobName);
} catch (IOException e) {
logger.debug("[{}] [{}] error deleting blob [{}] during cleanup", e, snapshotId, shardId, blobName);
}
}
} else if (blobName.startsWith("__")) {
if (newSnapshots.findNameFile(FileInfo.canonicalName(blobName)) == null) {
try {
blobContainer.deleteBlob(blobName);
} catch (IOException e) {
logger.debug("[{}] [{}] error deleting blob [{}] during cleanup", e, snapshotId, shardId, blobName);
}
}
}
}
Expand Down Expand Up @@ -386,20 +419,45 @@ protected long findLatestFileNameGeneration(ImmutableMap<String, BlobMetaData> b
* @param blobs list of blobs in repository
* @return BlobStoreIndexShardSnapshots
*/
protected BlobStoreIndexShardSnapshots buildBlobStoreIndexShardSnapshots(ImmutableMap<String, BlobMetaData> blobs) {
List<BlobStoreIndexShardSnapshot> snapshots = Lists.newArrayList();
protected Tuple<BlobStoreIndexShardSnapshots, Integer> buildBlobStoreIndexShardSnapshots(ImmutableMap<String, BlobMetaData> blobs) {
int latest = -1;
for (String name : blobs.keySet()) {
if (name.startsWith(SNAPSHOT_INDEX_PREFIX)) {
try {
int gen = Integer.parseInt(name.substring(SNAPSHOT_INDEX_PREFIX.length()));
if (gen > latest) {
latest = gen;
}
} catch (NumberFormatException ex) {
logger.warn("failed to parse index file name [{}]", name);
}
}
}
if (latest >= 0) {
try (InputStream stream = blobContainer.openInput(SNAPSHOT_INDEX_PREFIX + latest)) {
try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(stream)) {
parser.nextToken();
return new Tuple<>(BlobStoreIndexShardSnapshots.fromXContent(parser), latest);
}
} catch (IOException e) {
logger.warn("failed to read index file [{}]", e, SNAPSHOT_INDEX_PREFIX + latest);
}
}


List<SnapshotFiles> snapshots = Lists.newArrayList();
for (String name : blobs.keySet()) {
if (name.startsWith(SNAPSHOT_PREFIX)) {
try (InputStream stream = blobContainer.openInput(name)) {
snapshots.add(readSnapshot(stream));
BlobStoreIndexShardSnapshot snapshot = readSnapshot(stream);
snapshots.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
} catch (IOException e) {
logger.warn("failed to read commit point [{}]", e, name);
}
}
}
return new BlobStoreIndexShardSnapshots(snapshots);
return new Tuple<>(new BlobStoreIndexShardSnapshots(snapshots), -1);
}

}

/**
Expand Down Expand Up @@ -443,7 +501,9 @@ public void snapshot(SnapshotIndexCommit snapshotIndexCommit) {
}

long generation = findLatestFileNameGeneration(blobs);
BlobStoreIndexShardSnapshots snapshots = buildBlobStoreIndexShardSnapshots(blobs);
Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs);
BlobStoreIndexShardSnapshots snapshots = tuple.v1();
int fileListGeneration = tuple.v2();

final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<>();
final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = newArrayList();
Expand All @@ -465,31 +525,35 @@ public void snapshot(SnapshotIndexCommit snapshotIndexCommit) {
}
logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
final StoreFileMetaData md = metadata.get(fileName);
boolean snapshotRequired = false;
BlobStoreIndexShardSnapshot.FileInfo fileInfo = snapshots.findPhysicalIndexFile(fileName);
try {
// in 1.3.3 we added additional hashes for .si / segments_N files
// to ensure we don't double the space in the repo since old snapshots
// don't have this hash we try to read that hash from the blob store
// in a bwc compatible way.
maybeRecalculateMetadataHash(blobContainer, fileInfo, metadata);
} catch (Throwable e) {
logger.warn("{} Can't calculate hash from blob for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata());
}
if (fileInfo == null || !fileInfo.isSame(md) || !snapshotFileExistsInBlobs(fileInfo, blobs)) {
// commit point file does not exists in any commit point, or has different length, or does not fully exists in the listed blobs
snapshotRequired = true;
FileInfo existingFileInfo = null;
ImmutableList<FileInfo> filesInfo = snapshots.findPhysicalIndexFiles(fileName);
if (filesInfo != null) {
for (FileInfo fileInfo : filesInfo) {
try {
// in 1.3.3 we added additional hashes for .si / segments_N files
// to ensure we don't double the space in the repo since old snapshots
// don't have this hash we try to read that hash from the blob store
// in a bwc compatible way.
maybeRecalculateMetadataHash(blobContainer, fileInfo, metadata);
} catch (Throwable e) {
logger.warn("{} Can't calculate hash from blob for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata());
}
if (fileInfo.isSame(md) && snapshotFileExistsInBlobs(fileInfo, blobs)) {
// an identical file already exists in a previous snapshot and is fully present in the listed blobs - reuse it
existingFileInfo = fileInfo;
break;
}
}
}

if (snapshotRequired) {
if (existingFileInfo == null) {
indexNumberOfFiles++;
indexTotalFilesSize += md.length();
// create a new FileInfo
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = new BlobStoreIndexShardSnapshot.FileInfo(fileNameFromGeneration(++generation), md, chunkSize);
indexCommitPointFiles.add(snapshotFileInfo);
filesToSnapshot.add(snapshotFileInfo);
} else {
indexCommitPointFiles.add(fileInfo);
indexCommitPointFiles.add(existingFileInfo);
}
}

Expand Down Expand Up @@ -524,12 +588,12 @@ public void snapshot(SnapshotIndexCommit snapshotIndexCommit) {

// delete all files that are not referenced by any commit point
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
List<BlobStoreIndexShardSnapshot> newSnapshotsList = Lists.newArrayList();
newSnapshotsList.add(snapshot);
for (BlobStoreIndexShardSnapshot point : snapshots) {
List<SnapshotFiles> newSnapshotsList = Lists.newArrayList();
newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
for (SnapshotFiles point : snapshots) {
newSnapshotsList.add(point);
}
cleanup(newSnapshotsList, blobs);
finalize(newSnapshotsList, fileListGeneration + 1, blobs);
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE);
} finally {
store.decRef();
Expand Down Expand Up @@ -710,6 +774,7 @@ public void restore() throws IOException {
try {
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
BlobStoreIndexShardSnapshot snapshot = loadSnapshot();
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());

recoveryState.setStage(RecoveryState.Stage.INDEX);
int numberOfFiles = 0;
Expand Down Expand Up @@ -803,7 +868,7 @@ public void restore() throws IOException {
/// now, go over and clean files that are in the store, but were not in the snapshot
try {
for (String storeFile : store.directory().listAll()) {
if (!Store.isChecksum(storeFile) && !snapshot.containPhysicalIndexFile(storeFile)) {
if (!Store.isChecksum(storeFile) && !snapshotFiles.containPhysicalIndexFile(storeFile)) {
try {
store.deleteFile("restore", storeFile);
store.directory().deleteFile(storeFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,20 @@ public boolean isSame(StoreFileMetaData md) {
return metadata.isSame(md);
}

/**
 * Compares this file with another {@link FileInfo} stored in the repository.
 * <p>
 * Two files are considered the same when they have the same name, an identical
 * part layout (number of parts, part size and bytes per part) and matching
 * metadata (which covers checksum and length).
 *
 * @param fileInfo the repository file to compare against
 * @return true if both files have the same name, part layout, checksum and length
 */
public boolean isSame(FileInfo fileInfo) {
    // All part-layout attributes must match before comparing store metadata.
    boolean sameLayout = numberOfParts == fileInfo.numberOfParts
            && partBytes == fileInfo.partBytes
            && name.equals(fileInfo.name)
            && partSize.equals(fileInfo.partSize);
    return sameLayout && metadata.isSame(fileInfo.metadata);
}

static final class Fields {
static final XContentBuilderString NAME = new XContentBuilderString("name");
static final XContentBuilderString PHYSICAL_NAME = new XContentBuilderString("physical_name");
Expand Down Expand Up @@ -484,38 +498,4 @@ public static BlobStoreIndexShardSnapshot fromXContent(XContentParser parser) th
startTime, time, numberOfFiles, totalSize);
}

/**
 * Checks whether this snapshot references an index file with the given
 * original (physical) name.
 *
 * @param physicalName original file name as it appears in the store
 * @return true if such a file is part of this snapshot, false otherwise
 */
public boolean containPhysicalIndexFile(String physicalName) {
    FileInfo found = findPhysicalIndexFile(physicalName);
    return found != null;
}

/**
 * Looks up the file in this snapshot whose original (physical) store name
 * matches the given name.
 *
 * @param physicalName original file name as it appears in the store
 * @return the matching {@link FileInfo}, or null if this snapshot does not contain it
 */
public FileInfo findPhysicalIndexFile(String physicalName) {
    for (FileInfo candidate : indexFiles) {
        if (candidate.physicalName().equals(physicalName)) {
            return candidate;
        }
    }
    // Not part of this snapshot.
    return null;
}

/**
 * Looks up the file in this snapshot with the given name.
 * <p>
 * Note: this searches by the snapshot-internal file name (see
 * {@code FileInfo.name()}), not by the original store file name —
 * for that, use {@link #findPhysicalIndexFile(String)}.
 *
 * @param name file name
 * @return the matching {@link FileInfo}, or null if no file with that name
 *         exists in this snapshot
 */
public FileInfo findNameFile(String name) {
    for (FileInfo file : indexFiles) {
        if (file.name().equals(name)) {
            return file;
        }
    }
    // No file with this name in the snapshot.
    return null;
}

}
Loading

0 comments on commit ee3c3d1

Please sign in to comment.