Skip to content

Commit

Permalink
Limit number of retaining translog files for peer recovery (#47414)
Browse files Browse the repository at this point in the history
Today we control the extra translog (when soft-deletes is disabled) for
peer recoveries by size and age. If users manually (force) flush many
times within a short period, we can keep many small (or empty) translog
files as neither the size nor the age condition is reached. We can protect
the cluster from running out of the file descriptors in such a situation
by limiting the number of retaining translog files.
  • Loading branch information
dnhatn committed Oct 4, 2019
1 parent 08915b6 commit c5b39fd
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 25 deletions.
16 changes: 16 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,14 @@ public final class IndexSettings {
Setting.byteSizeSetting("index.translog.retention.size", settings -> INDEX_SOFT_DELETES_SETTING.get(settings) ? "-1" : "512MB",
Property.Dynamic, Property.IndexScope);

/**
* Controls the maximum number of translog files that are no longer needed for persistence reasons but are kept around
* before being deleted. This is a safeguard making sure that the translog deletion policy won't keep too many translog
* files, especially when they're small. This setting is intentionally not registered; it is only used in tests.
**/
public static final Setting<Integer> INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING =
Setting.intSetting("index.translog.retention.total_files", 100, 0, Setting.Property.IndexScope);

/**
* Controls the maximum length of time since a retention lease is created or renewed before it is considered expired.
*/
Expand Down Expand Up @@ -781,6 +789,14 @@ public TimeValue getTranslogRetentionAge() {
return translogRetentionAge;
}

/**
 * Returns the maximum number of translog files that are no longer required for persistence but should still be
 * kept for peer recovery when soft-deletes is disabled.
 */
public int getTranslogRetentionTotalFiles() {
    final int maxRetainedFiles = INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.get(getSettings());
    return maxRetainedFiles;
}

/**
* Returns the generation threshold size. As sequence numbers can cause multiple generations to
* be preserved for rollback purposes, we want to keep the size of individual generations from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ public InternalEngine(EngineConfig engineConfig) {
}
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
);
store.incRef();
IndexWriter writer = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void trimUnreferencedTranslogFiles() {

if (minTranslogGeneration < lastCommitGeneration) {
// a translog deletion policy that retains nothing but the last translog generation from safe commit
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1);
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1, 0);
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
final TranslogConfig translogConfig = config.getTranslogConfig();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
config.getIndexSettings().getTranslogRetentionSize().getBytes(),
config.getIndexSettings().getTranslogRetentionAge().getMillis()
config.getIndexSettings().getTranslogRetentionAge().getMillis(),
config.getIndexSettings().getTranslogRetentionTotalFiles()
);
translogDeletionPolicy.setTranslogGenerationOfLastCommit(translogGenOfLastCommit);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ public void assertNoOpenTranslogRefs() {

private long retentionAgeInMillis;

public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis) {
private int retentionTotalFiles;

public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) {
this.retentionSizeInBytes = retentionSizeInBytes;
this.retentionAgeInMillis = retentionAgeInMillis;
this.retentionTotalFiles = retentionTotalFiles;
if (Assertions.ENABLED) {
openTranslogRef = new ConcurrentHashMap<>();
} else {
Expand Down Expand Up @@ -100,6 +103,10 @@ public synchronized void setRetentionAgeInMillis(long ageInMillis) {
retentionAgeInMillis = ageInMillis;
}

/**
* Updates the limit on the total number of translog files used by this policy's total-files retention check.
*
* @param retentionTotalFiles the new maximum number of translog files to retain
*/
synchronized void setRetentionTotalFiles(int retentionTotalFiles) {
this.retentionTotalFiles = retentionTotalFiles;
}

/**
* acquires the basis generation for a new snapshot. Any translog generation above, and including, the returned generation
* will not be deleted until the returned {@link Releasable} is closed.
Expand Down Expand Up @@ -164,7 +171,8 @@ synchronized long minTranslogGenRequired(List<TranslogReader> readers, TranslogW
} else {
minByAgeAndSize = Math.max(minByAge, minBySize);
}
return Math.min(minByAgeAndSize, Math.min(minByLocks, minTranslogGenerationForRecovery));
long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles);
return Math.min(Math.max(minByAgeAndSize, minByNumFiles), Math.min(minByLocks, minTranslogGenerationForRecovery));
}

static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) {
Expand Down Expand Up @@ -196,6 +204,16 @@ static long getMinTranslogGenByAge(List<TranslogReader> readers, TranslogWriter
}
}

/**
 * Computes the minimum translog generation to keep so that no more than {@code maxTotalFiles} files are retained.
 * The current writer always counts as one file; the remaining budget is spent on readers taken from the end of
 * {@code readers} backwards.
 *
 * @param readers       the existing translog readers
 * @param writer        the current translog writer, which is always retained
 * @param maxTotalFiles the maximum number of files (writer included) to retain; any value of one or less retains
 *                      the writer only
 * @return the generation of the earliest retained reader, or the writer's generation if no reader is retained
 */
static long getMinTranslogGenByTotalFiles(List<TranslogReader> readers, TranslogWriter writer, final int maxTotalFiles) {
    // one slot is always taken by the writer; use long arithmetic so Integer.MIN_VALUE cannot wrap around
    final long readerBudget = Math.max(0L, (long) maxTotalFiles - 1);
    final int readerCount = readers.size();
    final int retainedReaders = (int) Math.min(readerCount, readerBudget);
    if (retainedReaders == 0) {
        return writer.generation;
    }
    return readers.get(readerCount - retainedReaders).generation;
}

protected long currentTime() {
return System.currentTimeMillis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ private boolean isTranslogClean(ShardPath shardPath, String translogUUID) throws
indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardPath.getShardId().id());
// We open translog to check for corruption, do not clean anything.
final TranslogDeletionPolicy retainAllTranslogPolicy = new TranslogDeletionPolicy(Long.MAX_VALUE, Long.MAX_VALUE) {
final TranslogDeletionPolicy retainAllTranslogPolicy = new TranslogDeletionPolicy(
Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE) {
@Override
long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter writer) {
long minGen = writer.generation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.BrokenBarrierException;
Expand Down Expand Up @@ -136,6 +137,7 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;

public class IndexShardIT extends ESSingleNodeTestCase {
Expand Down Expand Up @@ -991,6 +993,43 @@ public void testNoOpEngineFactoryTakesPrecedence() {
}
}

/**
* Indexes documents into a single-shard index with soft-deletes disabled while randomly flushing and rolling the
* translog generation, and checks after each flush that the number of translog files on disk does not exceed the
* configured {@code index.translog.retention.total_files} limit (a single file is always tolerated).
*/
public void testLimitNumberOfRetainingTranslogFiles() throws Exception {
String indexName = "test";
int translogRetentionTotalFiles = randomIntBetween(0, 50);
Settings.Builder settings = Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false)
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.getKey(), translogRetentionTotalFiles);
// the size and age retention settings are randomly set too; the file-count assertion below does not depend on them
if (randomBoolean()) {
settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1024 * 1024)));
}
if (randomBoolean()) {
settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), TimeValue.timeValueMillis(between(1, 10_000)));
}
IndexService indexService = createIndex(indexName, settings.build());
IndexShard shard = indexService.getShard(0);
shard.rollTranslogGeneration();
// counts the files with the translog suffix in the shard's translog directory and compares the count to the limit
CheckedRunnable<IOException> checkTranslog = () -> {
try (Stream<Path> files = Files.list(getTranslog(shard).location()).sorted(Comparator.reverseOrder())) {
long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count();
// exactly one file is accepted even when the limit is 0 - presumably the current writer's file always exists
assertThat(totalFiles, either(lessThanOrEqualTo((long) translogRetentionTotalFiles)).or(equalTo(1L)));
}
};
for (int i = 0; i < 100; i++) {
client().prepareIndex(indexName, "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get();
// occasionally flush and verify the number of translog files right afterwards
if (randomInt(100) < 10) {
client().admin().indices().prepareFlush(indexName).setWaitIfOngoing(true).get();
checkTranslog.run();
}
// occasionally roll to a new generation so that additional translog files are created
if (randomInt(100) < 10) {
shard.rollTranslogGeneration();
}
}
// final flush followed by a last check
client().admin().indices().prepareFlush(indexName).get();
checkTranslog.run();
}

/**
* Asserts that there are no files in the specified path
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void testNoRetention() throws IOException {
List<BaseTranslogReader> allGens = new ArrayList<>(readersAndWriter.v1());
allGens.add(readersAndWriter.v2());
try {
TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, 0, 0);
TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, 0, 0, 0);
assertMinGenRequired(deletionPolicy, readersAndWriter, 1L);
final int committedReader = randomIntBetween(0, allGens.size() - 1);
final long committedGen = allGens.get(committedReader).generation;
Expand Down Expand Up @@ -98,6 +98,25 @@ public void testAgeRetention() throws IOException {
}
}

/**
 * Checks {@link TranslogDeletionPolicy#getMinTranslogGenByTotalFiles} for three kinds of limits: one that leaves room
 * for the writer only, one that covers every generation, and a random limit in between.
 */
public void testTotalFilesRetention() throws Exception {
    final Tuple<List<TranslogReader>, TranslogWriter> readersAndWriter = createReadersAndWriter(System.currentTimeMillis());
    final List<TranslogReader> readers = readersAndWriter.v1();
    final TranslogWriter writer = readersAndWriter.v2();
    // all generations: the readers followed by the writer
    final List<BaseTranslogReader> allGens = new ArrayList<>(readers);
    allGens.add(writer);
    try {
        // a limit of one file or less retains nothing but the writer
        final int writerOnlyLimit = randomIntBetween(Integer.MIN_VALUE, 1);
        final long minGenForWriterOnly = TranslogDeletionPolicy.getMinTranslogGenByTotalFiles(readers, writer, writerOnlyLimit);
        assertThat(minGenForWriterOnly, equalTo(writer.generation));

        // a limit of at least the number of existing generations retains all of them
        final int unboundedLimit = randomIntBetween(allGens.size(), Integer.MAX_VALUE);
        final long minGenForUnbounded = TranslogDeletionPolicy.getMinTranslogGenByTotalFiles(readers, writer, unboundedLimit);
        assertThat(minGenForUnbounded, equalTo(allGens.get(0).generation));

        // any limit in between retains exactly that many generations counted from the end
        final int numFiles = randomIntBetween(1, allGens.size());
        final long selectedGeneration = allGens.get(allGens.size() - numFiles).generation;
        final long minGenForLimit = TranslogDeletionPolicy.getMinTranslogGenByTotalFiles(readers, writer, numFiles);
        assertThat(minGenForLimit, equalTo(selectedGeneration));
    } finally {
        IOUtils.close(readers);
        IOUtils.close(writer);
    }
}

/**
* Tests that age trumps size but recovery trumps both.
*/
Expand All @@ -107,7 +126,7 @@ public void testRetentionHierarchy() throws IOException {
List<BaseTranslogReader> allGens = new ArrayList<>(readersAndWriter.v1());
allGens.add(readersAndWriter.v2());
try {
TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE);
TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE);
deletionPolicy.setTranslogGenerationOfLastCommit(Long.MAX_VALUE);
deletionPolicy.setMinTranslogGenerationForRecovery(Long.MAX_VALUE);
int selectedReader = randomIntBetween(0, allGens.size() - 1);
Expand All @@ -116,33 +135,41 @@ public void testRetentionHierarchy() throws IOException {
selectedReader = randomIntBetween(0, allGens.size() - 1);
final long selectedGenerationBySize = allGens.get(selectedReader).generation;
long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get();
selectedReader = randomIntBetween(0, allGens.size() - 1);
final long selectedGenerationByTotalFiles = allGens.get(selectedReader).generation;
deletionPolicy.setRetentionAgeInMillis(maxAge);
deletionPolicy.setRetentionSizeInBytes(size);
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.max(selectedGenerationByAge, selectedGenerationBySize));
final int totalFiles = allGens.size() - selectedReader;
deletionPolicy.setRetentionTotalFiles(totalFiles);
assertMinGenRequired(deletionPolicy, readersAndWriter,
max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles));
// make a new policy as committed gen can't go backwards (for now)
deletionPolicy = new MockDeletionPolicy(now, size, maxAge);
deletionPolicy = new MockDeletionPolicy(now, size, maxAge, totalFiles);
long committedGen = randomFrom(allGens).generation;
deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(committedGen, Long.MAX_VALUE));
deletionPolicy.setMinTranslogGenerationForRecovery(committedGen);
assertMinGenRequired(deletionPolicy, readersAndWriter,
Math.min(committedGen, Math.max(selectedGenerationByAge, selectedGenerationBySize)));
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen,
max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles)));
long viewGen = randomFrom(allGens).generation;
try (Releasable ignored = deletionPolicy.acquireTranslogGen(viewGen)) {
assertMinGenRequired(deletionPolicy, readersAndWriter,
Math.min(
Math.min(committedGen, viewGen),
Math.max(selectedGenerationByAge, selectedGenerationBySize)));
min3(committedGen, viewGen, max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles)));
// disable age
deletionPolicy.setRetentionAgeInMillis(-1);
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationBySize));
assertMinGenRequired(deletionPolicy, readersAndWriter,
min3(committedGen, viewGen, Math.max(selectedGenerationBySize, selectedGenerationByTotalFiles)));
// disable size
deletionPolicy.setRetentionAgeInMillis(maxAge);
deletionPolicy.setRetentionSizeInBytes(-1);
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationByAge));
// disable both
assertMinGenRequired(deletionPolicy, readersAndWriter,
min3(committedGen, viewGen, Math.max(selectedGenerationByAge, selectedGenerationByTotalFiles)));
// disable age and size
deletionPolicy.setRetentionAgeInMillis(-1);
deletionPolicy.setRetentionSizeInBytes(-1);
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen));
// disable total files
deletionPolicy.setRetentionTotalFiles(0);
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen));
}
} finally {
IOUtils.close(readersAndWriter.v1());
Expand Down Expand Up @@ -191,8 +218,8 @@ private static class MockDeletionPolicy extends TranslogDeletionPolicy {

long now;

MockDeletionPolicy(long now, long retentionSizeInBytes, long maxRetentionAgeInMillis) {
super(retentionSizeInBytes, maxRetentionAgeInMillis);
MockDeletionPolicy(long now, long retentionSizeInBytes, long maxRetentionAgeInMillis, int maxRetentionTotalFiles) {
super(retentionSizeInBytes, maxRetentionAgeInMillis, maxRetentionTotalFiles);
this.now = now;
}

Expand All @@ -201,4 +228,12 @@ protected long currentTime() {
return now;
}
}

/**
 * Returns the largest of the three given values.
 */
private static long max3(long x1, long x2, long x3) {
    long largest = x1;
    if (x2 > largest) {
        largest = x2;
    }
    if (x3 > largest) {
        largest = x3;
    }
    return largest;
}

/**
 * Returns the smallest of the three given values.
 */
private static long min3(long x1, long x2, long x3) {
    long smallest = x1;
    if (x2 < smallest) {
        smallest = x2;
    }
    if (x3 < smallest) {
        smallest = x3;
    }
    return smallest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2265,7 +2265,7 @@ public void testRecoveryFromAFutureGenerationCleansUp() throws IOException {
// engine blows up, after committing the above generation
translog.close();
TranslogConfig config = translog.getConfig();
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1);
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1, 0);
deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE));
deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration);
translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy,
Expand Down Expand Up @@ -2324,7 +2324,7 @@ public void testRecoveryFromFailureOnTrimming() throws IOException {
// expected...
}
}
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1);
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1, 0);
deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE));
deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration);
try (Translog translog = new Translog(config, translogUUID, deletionPolicy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ public class TranslogDeletionPolicies {
public static TranslogDeletionPolicy createTranslogDeletionPolicy() {
return new TranslogDeletionPolicy(
IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(),
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis()
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis(),
IndexSettings.INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.getDefault(Settings.EMPTY)
);
}

public static TranslogDeletionPolicy createTranslogDeletionPolicy(IndexSettings indexSettings) {
return new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(),
indexSettings.getTranslogRetentionAge().getMillis());
indexSettings.getTranslogRetentionAge().getMillis(), indexSettings.getTranslogRetentionTotalFiles());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public List<Setting<?>> getSettings() {
IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING,
IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING,
IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING,
IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING
IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING
);
}
}

0 comments on commit c5b39fd

Please sign in to comment.