Skip to content

Commit

Permalink
Reject multiple data paths on frozen capable nodes (#71896)
Browse files Browse the repository at this point in the history
Nodes with a frozen cache no longer supports multiple data paths. This
simplifies cache sizing and avoids the need to support multiple cache
files.

Relates #71844
  • Loading branch information
henningandersen authored Apr 20, 2021
1 parent 59c364c commit 67c748e
Show file tree
Hide file tree
Showing 17 changed files with 672 additions and 530 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public abstract class AbstractFrozenAutoscalingIntegTestCase extends AbstractSna
protected final String snapshotName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
protected final String policyName = "frozen";

@Override
protected boolean forceSingleDataPath() {
return true;
}

@Override
protected boolean addMockInternalEngine() {
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.searchablesnapshots;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xpack.searchablesnapshots.cache.shared.FrozenCacheService;

public class BaseFrozenSearchableSnapshotsIntegTestCase extends BaseSearchableSnapshotsIntegTestCase {
@Override
protected boolean forceSingleDataPath() {
return true;
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
if (DiscoveryNode.canContainData(otherSettings)) {
builder.put(
FrozenCacheService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(),
rarely()
? randomBoolean()
? new ByteSizeValue(randomIntBetween(1, 10), ByteSizeUnit.KB)
: new ByteSizeValue(randomIntBetween(1, 1000), ByteSizeUnit.BYTES)
: new ByteSizeValue(randomIntBetween(1, 10), ByteSizeUnit.MB)
);
}

return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,31 @@
*/
package org.elasticsearch.xpack.searchablesnapshots;

import org.apache.lucene.search.TotalHits;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.ReadOnlyEngine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase;
Expand All @@ -34,18 +50,26 @@
import org.elasticsearch.xpack.searchablesnapshots.cache.shared.FrozenCacheService;
import org.junit.After;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.license.LicenseService.SELF_GENERATED_LICENSE_TYPE;
import static org.elasticsearch.test.NodeRoles.addRoles;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.searchablesnapshots.cache.shared.SharedBytes.pageAligned;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;

@ESIntegTestCase.ClusterScope(supportsDedicatedMasters = false, numClientNodes = 0)
public abstract class BaseSearchableSnapshotsIntegTestCase extends AbstractSnapshotIntegTestCase {
Expand Down Expand Up @@ -79,15 +103,8 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
: new ByteSizeValue(randomIntBetween(1, 10), ByteSizeUnit.MB)
);
}
if (DiscoveryNode.canContainData(otherSettings)) {
builder.put(
FrozenCacheService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(),
rarely()
? randomBoolean()
? new ByteSizeValue(randomIntBetween(1, 10), ByteSizeUnit.KB)
: new ByteSizeValue(randomIntBetween(1, 1000), ByteSizeUnit.BYTES)
: new ByteSizeValue(randomIntBetween(1, 10), ByteSizeUnit.MB)
);
if (DiscoveryNode.canContainData(otherSettings) && randomBoolean()) {
builder.put(FrozenCacheService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ZERO.getStringRep());
}
builder.put(
FrozenCacheService.SNAPSHOT_CACHE_REGION_SIZE_SETTING.getKey(),
Expand Down Expand Up @@ -185,4 +202,111 @@ protected void populateIndex(String indexName, int maxIndexRequests) throws Inte
);
}
}

protected void checkSoftDeletesNotEagerlyLoaded(String restoredIndexName) {
for (IndicesService indicesService : internalCluster().getDataNodeInstances(IndicesService.class)) {
for (IndexService indexService : indicesService) {
if (indexService.index().getName().equals(restoredIndexName)) {
for (IndexShard indexShard : indexService) {
try {
Engine engine = IndexShardTestCase.getEngine(indexShard);
assertThat(engine, instanceOf(ReadOnlyEngine.class));
EngineTestCase.checkNoSoftDeletesLoaded((ReadOnlyEngine) engine);
} catch (AlreadyClosedException ace) {
// ok to ignore these
}
}
}
}
}
}

protected void assertShardFolders(String indexName, boolean snapshotDirectory) throws IOException {
final Index restoredIndex = resolveIndex(indexName);
final String customDataPath = resolveCustomDataPath(indexName);
final ShardId shardId = new ShardId(restoredIndex, 0);
boolean shardFolderFound = false;
for (String node : internalCluster().getNodeNames()) {
final NodeEnvironment service = internalCluster().getInstance(NodeEnvironment.class, node);
final ShardPath shardPath = ShardPath.loadShardPath(logger, service, shardId, customDataPath);
if (shardPath != null && Files.exists(shardPath.getDataPath())) {
shardFolderFound = true;
assertEquals(snapshotDirectory, Files.notExists(shardPath.resolveIndex()));

assertTrue(Files.exists(shardPath.resolveTranslog()));
try (Stream<Path> dir = Files.list(shardPath.resolveTranslog())) {
final long translogFiles = dir.filter(path -> path.getFileName().toString().contains("translog")).count();
if (snapshotDirectory) {
assertEquals(2L, translogFiles);
} else {
assertThat(translogFiles, greaterThanOrEqualTo(2L));
}
}
}
}
assertTrue("no shard folder found for index " + indexName, shardFolderFound);
}

protected void assertTotalHits(String indexName, TotalHits originalAllHits, TotalHits originalBarHits) throws Exception {
final Thread[] threads = new Thread[between(1, 5)];
final AtomicArray<TotalHits> allHits = new AtomicArray<>(threads.length);
final AtomicArray<TotalHits> barHits = new AtomicArray<>(threads.length);

final CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < threads.length; i++) {
int t = i;
threads[i] = new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
allHits.set(t, client().prepareSearch(indexName).setTrackTotalHits(true).get().getHits().getTotalHits());
barHits.set(
t,
client().prepareSearch(indexName)
.setTrackTotalHits(true)
.setQuery(matchQuery("foo", "bar"))
.get()
.getHits()
.getTotalHits()
);
});
threads[i].start();
}

ensureGreen(indexName);
latch.countDown();

for (int i = 0; i < threads.length; i++) {
threads[i].join();

final TotalHits allTotalHits = allHits.get(i);
final TotalHits barTotalHits = barHits.get(i);

logger.info("--> thread #{} has [{}] hits in total, of which [{}] match the query", i, allTotalHits, barTotalHits);
assertThat(allTotalHits, equalTo(originalAllHits));
assertThat(barTotalHits, equalTo(originalBarHits));
}
}

protected void assertRecoveryStats(String indexName, boolean preWarmEnabled) throws Exception {
int shardCount = getNumShards(indexName).totalNumShards;
assertBusy(() -> {
final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(indexName).get();
assertThat(recoveryResponse.toString(), recoveryResponse.shardRecoveryStates().get(indexName).size(), equalTo(shardCount));

for (List<RecoveryState> recoveryStates : recoveryResponse.shardRecoveryStates().values()) {
for (RecoveryState recoveryState : recoveryStates) {
RecoveryState.Index index = recoveryState.getIndex();
assertThat(
Strings.toString(recoveryState, true, true),
index.recoveredFileCount(),
preWarmEnabled ? equalTo(index.totalRecoverFiles()) : greaterThanOrEqualTo(0)
);
assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.DONE));
}
}
}, 30L, TimeUnit.SECONDS);
}
}
Loading

0 comments on commit 67c748e

Please sign in to comment.