Skip to content

Commit

Permalink
Some Cleanup in BlobStoreRepository (#43323)
Browse files Browse the repository at this point in the history
* Some Cleanup in BlobStoreRepository

* Extracted from #42833:
  * Dry up index and shard path handling
  * Shorten XContent handling
  • Loading branch information
original-brownbear authored Jun 18, 2019
1 parent 631c63c commit a3a4bd7
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,7 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
/**
* Writes the incompatible snapshot ids to x-content.
*/
public void incompatibleSnapshotsToXContent(final XContentBuilder builder) throws IOException {

public XContentBuilder incompatibleSnapshotsToXContent(XContentBuilder builder) throws IOException {
builder.startObject();
// write the incompatible snapshots list
builder.startArray(INCOMPATIBLE_SNAPSHOTS);
Expand All @@ -452,6 +451,7 @@ public void incompatibleSnapshotsToXContent(final XContentBuilder builder) throw
}
builder.endArray();
builder.endObject();
return builder;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.metrics.CounterMetric;
Expand All @@ -63,7 +61,6 @@
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
Expand Down Expand Up @@ -374,10 +371,7 @@ public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Met

// write the index metadata for each index in the snapshot
for (IndexId index : indices) {
final IndexMetaData indexMetaData = clusterMetaData.index(index.getName());
final BlobPath indexPath = basePath().add("indices").add(index.getId());
final BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
indexMetaDataFormat.write(indexMetaData, indexMetaDataBlobContainer, snapshotId.getUUID());
indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID());
}
} catch (IOException ex) {
throw new SnapshotCreationException(metadata.name(), snapshotId, ex);
Expand Down Expand Up @@ -425,7 +419,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action
snapshotId,
ActionListener.map(listener, v -> {
try {
blobStore().blobContainer(basePath().add("indices")).deleteBlobsIgnoringIfNotExists(
blobStore().blobContainer(indicesPath()).deleteBlobsIgnoringIfNotExists(
unreferencedIndices.stream().map(IndexId::getId).collect(Collectors.toList()));
} catch (IOException e) {
logger.warn(() ->
Expand Down Expand Up @@ -477,9 +471,8 @@ protected void doRun() {
}

private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexId indexId) {
BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId()));
try {
indexMetaDataFormat.delete(indexMetaDataBlobContainer, snapshotId.getUUID());
indexMetaDataFormat.delete(indexContainer(indexId), snapshotId.getUUID());
} catch (IOException ex) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to delete metadata for index [{}]",
snapshotId, indexId.getName()), ex);
Expand Down Expand Up @@ -540,8 +533,19 @@ public MetaData getSnapshotGlobalMetaData(final SnapshotId snapshotId) {

@Override
public IndexMetaData getSnapshotIndexMetaData(final SnapshotId snapshotId, final IndexId index) throws IOException {
final BlobPath indexPath = basePath().add("indices").add(index.getId());
return indexMetaDataFormat.read(blobStore().blobContainer(indexPath), snapshotId.getUUID());
return indexMetaDataFormat.read(indexContainer(index), snapshotId.getUUID());
}

/**
 * Returns the blob path under the repository's base path where all per-index
 * folders are stored (i.e. {@code <base>/indices}).
 */
private BlobPath indicesPath() {
    final BlobPath base = basePath();
    return base.add("indices");
}

/**
 * Resolves the blob container that holds all blobs for the given index,
 * located at {@code <base>/indices/<index-uuid>}.
 *
 * @param indexId identifier of the index whose container to resolve
 */
private BlobContainer indexContainer(IndexId indexId) {
    final BlobPath indexPath = indicesPath().add(indexId.getId());
    return blobStore().blobContainer(indexPath);
}

/**
 * Resolves the blob container for a single shard of the given index,
 * located at {@code <base>/indices/<index-uuid>/<shard-id>}.
 *
 * @param indexId identifier of the index the shard belongs to
 * @param shardId shard whose numeric id names the shard-level folder
 */
private BlobContainer shardContainer(IndexId indexId, ShardId shardId) {
    // Shard folders are named by the shard number rendered as a decimal string.
    final String shardFolder = Integer.toString(shardId.getId());
    final BlobPath shardPath = indicesPath().add(indexId.getId()).add(shardFolder);
    return blobStore().blobContainer(shardPath);
}

/**
Expand Down Expand Up @@ -589,10 +593,9 @@ public String startVerification() {
String seed = UUIDs.randomBase64UUID();
byte[] testBytes = Strings.toUTF8Bytes(seed);
BlobContainer testContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed)));
String blobName = "master.dat";
BytesArray bytes = new BytesArray(testBytes);
try (InputStream stream = bytes.streamInput()) {
testContainer.writeBlobAtomic(blobName, stream, bytes.length(), true);
testContainer.writeBlobAtomic("master.dat", stream, bytes.length(), true);
}
return seed;
}
Expand Down Expand Up @@ -665,7 +668,7 @@ public RepositoryData getRepositoryData() {
}
}

public static String testBlobPrefix(String seed) {
/**
 * Builds the name prefix used for repository-verification test blobs by
 * prepending the {@code TESTS_FILE} constant to the random seed.
 *
 * @param seed random seed string produced during repository verification
 * @return the prefix {@code TESTS_FILE + seed} for the test blob container
 */
private static String testBlobPrefix(String seed) {
    return new StringBuilder(TESTS_FILE).append(seed).toString();
}

Expand All @@ -685,19 +688,10 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long rep
"] - possibly due to simultaneous snapshot deletion requests");
}
final long newGen = currentGen + 1;
final BytesReference snapshotsBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
try (StreamOutput stream = new OutputStreamStreamOutput(bStream)) {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
repositoryData.snapshotsToXContent(builder);
builder.close();
}
snapshotsBytes = bStream.bytes();
}
// write the index file
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
writeAtomic(indexBlob, snapshotsBytes, true);
writeAtomic(indexBlob, BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder())), true);
// write the current generation to the index-latest file
final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
Expand All @@ -724,16 +718,9 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long rep
*/
void writeIncompatibleSnapshots(RepositoryData repositoryData) throws IOException {
assert isReadOnly() == false; // can not write to a read only repository
final BytesReference bytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
try (StreamOutput stream = new OutputStreamStreamOutput(bStream);
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream)) {
repositoryData.incompatibleSnapshotsToXContent(builder);
}
bytes = bStream.bytes();
}
// write the incompatible snapshots blob
writeAtomic(INCOMPATIBLE_SNAPSHOTS_BLOB, bytes, false);
writeAtomic(INCOMPATIBLE_SNAPSHOTS_BLOB,
BytesReference.bytes(repositoryData.incompatibleSnapshotsToXContent(XContentFactory.jsonBuilder())), false);
}

/**
Expand Down Expand Up @@ -826,9 +813,8 @@ public void restoreShard(Store store, SnapshotId snapshotId,
Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
ShardId shardId = store.shardId();
final Context context = new Context(snapshotId, indexId, shardId, snapshotShardId);
BlobPath path = basePath().add("indices").add(indexId.getId()).add(Integer.toString(snapshotShardId.getId()));
BlobContainer blobContainer = blobStore().blobContainer(path);
final RestoreContext snapshotContext = new RestoreContext(shardId, snapshotId, recoveryState, blobContainer);
final RestoreContext snapshotContext =
new RestoreContext(shardId, snapshotId, recoveryState, shardContainer(indexId, snapshotShardId));
try {
BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
Expand Down Expand Up @@ -904,8 +890,7 @@ private class Context {
Context(SnapshotId snapshotId, IndexId indexId, ShardId shardId, ShardId snapshotShardId) {
this.snapshotId = snapshotId;
this.shardId = shardId;
blobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId())
.add(Integer.toString(snapshotShardId.getId())));
blobContainer = shardContainer(indexId, snapshotShardId);
}

/**
Expand Down

0 comments on commit a3a4bd7

Please sign in to comment.