Skip to content

Commit

Permalink
Always use CacheService for caching metadata blobs (elastic#70668) (e…
Browse files Browse the repository at this point in the history
…lastic#70795)

This PR unifies CachedBlobContainerIndexInput and FrozenIndexInput so that they share the same infrastructure for
caching metadata blobs as well as header and footer ranges for data blobs. The idea is to always use CacheService for
this, which does not evict the metadata, and which efficiently stores the information on disk (using sparse file support).

This also allows us to align writes in FrozenCacheService to 4KB block sizes in this PR, which addresses an issue when
reusing regions from the shared cache, as writes that are not aligned on page cache boundaries causes the existing data
(which we don't care about) to be loaded from disk, which comes with a dramatic performance penalty.

Closes elastic#70728
Closes elastic#70763
  • Loading branch information
ywelsch committed Mar 26, 2021
1 parent 83f97a1 commit 822db8c
Show file tree
Hide file tree
Showing 12 changed files with 669 additions and 880 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,11 @@ public void testBlobStoreCache() throws Exception {
() -> systemClient().admin().indices().prepareGetIndex().addIndices(SNAPSHOT_BLOB_CACHE_INDEX).get()
);

final Storage storage = randomFrom(Storage.values());
final Storage storage1 = randomFrom(Storage.values());
logger.info(
"--> mount snapshot [{}] as an index for the first time [storage={}, max length={}]",
snapshot,
storage,
storage1,
blobCacheMaxLength.getStringRep()
);
final String restoredIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
Expand All @@ -175,7 +175,7 @@ public void testBlobStoreCache() throws Exception {
.put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), false)
.put(SearchableSnapshots.SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH, blobCacheMaxLength)
.build(),
storage
storage1
);
ensureGreen(restoredIndex);

Expand Down Expand Up @@ -244,7 +244,8 @@ public void testBlobStoreCache() throws Exception {
);
});

logger.info("--> mount snapshot [{}] as an index for the second time [storage={}]", snapshot, storage);
final Storage storage2 = randomFrom(Storage.values());
logger.info("--> mount snapshot [{}] as an index for the second time [storage={}]", snapshot, storage2);
final String restoredAgainIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
mountSnapshot(
repositoryName,
Expand All @@ -256,7 +257,7 @@ public void testBlobStoreCache() throws Exception {
.put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), false)
.put(SearchableSnapshots.SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH, blobCacheMaxLength)
.build(),
storage
storage2
);
ensureGreen(restoredAgainIndex);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
import org.junit.After;

import java.io.FilterInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -180,14 +181,7 @@ public void testConcurrentPrewarming() throws Exception {
.setSettings(repositorySettings.build())
);

TrackingRepositoryPlugin tracker = null;
for (RepositoryPlugin plugin : getInstanceFromNode(PluginsService.class).filterPlugins(RepositoryPlugin.class)) {
if (plugin instanceof TrackingRepositoryPlugin) {
tracker = ((TrackingRepositoryPlugin) plugin);
}
}

assertThat(tracker, notNullValue());
TrackingRepositoryPlugin tracker = getTrackingRepositoryPlugin();
assertThat(tracker.totalFilesRead(), equalTo(0L));
assertThat(tracker.totalBytesRead(), equalTo(0L));

Expand Down Expand Up @@ -305,6 +299,20 @@ public void testConcurrentPrewarming() throws Exception {
}
}

@After
public void resetTracker() {
getTrackingRepositoryPlugin().clear();
}

private TrackingRepositoryPlugin getTrackingRepositoryPlugin() {
for (RepositoryPlugin plugin : getInstanceFromNode(PluginsService.class).filterPlugins(RepositoryPlugin.class)) {
if (plugin instanceof TrackingRepositoryPlugin) {
return ((TrackingRepositoryPlugin) plugin);
}
}
throw new IllegalStateException("tracking repository missing");
}

/**
* A plugin that allows to track the read operations on blobs
*/
Expand All @@ -313,6 +321,11 @@ public static class TrackingRepositoryPlugin extends Plugin implements Repositor
private final ConcurrentHashMap<String, Long> files = new ConcurrentHashMap<>();
private final AtomicBoolean enabled = new AtomicBoolean(true);

void clear() {
files.clear();
enabled.set(true);
}

long totalFilesRead() {
return files.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.SnapshotRestoreException;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;

Expand All @@ -33,6 +34,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
public class SearchableSnapshotsUuidValidationIntegTests extends BaseSearchableSnapshotsIntegTestCase {

public static class TestPlugin extends Plugin implements ActionPlugin {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ private boolean maybeReadChecksumFromFileInfo(ByteBuffer b) throws IOException {
return success;
}

protected ByteRange maybeReadFromBlobCache(long position, int length) {
protected ByteRange rangeToReadFromBlobCache(long position, int length) {
final long end = position + length;
if (headerBlobCacheByteRange.contains(position, end)) {
return headerBlobCacheByteRange;
Expand Down
Loading

0 comments on commit 822db8c

Please sign in to comment.