Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replica recovery could go into an endless flushing loop #28350

Merged
merged 13 commits into from
Jan 25, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,12 @@ public final boolean refreshNeeded() {
// NOTE: do NOT rename this to something containing flush or refresh!
public abstract void writeIndexingBuffer() throws EngineException;

/**
* Checks if this engine should be flushed periodically.
* This check is mainly based on the uncommitted translog size and the translog flush threshold setting.
*
* @return {@code true} if the engine should be flushed
*/
public abstract boolean shouldPeriodicallyFlush();

/**
* Flushes the state of the engine including the transaction log, clearing memory.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1462,6 +1462,31 @@ final boolean tryRenewSyncCommit() {
return renewed;
}

@Override
public boolean shouldPeriodicallyFlush() {
    ensureOpen();
    final long thresholdInBytes = config().getIndexSettings().getFlushThresholdSize().getBytes();
    final long currentUncommittedBytes = translog.uncommittedSizeInBytes();
    if (currentUncommittedBytes >= thresholdInBytes) {
        /*
         * Reaching the threshold alone is not sufficient: a flush is only worthwhile if this
         * condition can become false afterwards, i.e. if the uncommitted size of the new commit
         * would be smaller than the uncommitted size of the current commit. This method exists to
         * maintain the translog only, so IndexWriter#hasUncommittedChanges is deliberately not
         * taken into account.
         */
        final long firstSeqNoAboveCheckpoint = localCheckpointTracker.getCheckpoint() + 1;
        final long uncommittedBytesAfterFlush = translog.sizeOfGensAboveSeqNoInBytes(firstSeqNoAboveCheckpoint);
        if (uncommittedBytesAfterFlush < currentUncommittedBytes) {
            /*
             * With a very small flush threshold we could otherwise flush repeatedly even though
             * there is no uncommitted operation, because #sizeOfGensAboveSeqNoInBytes and
             * #uncommittedSizeInBytes can return different values: an empty translog file still has
             * a non-zero uncommitted size (the translog header), and #sizeOfGensAboveSeqNoInBytes
             * can return 0 now (no translog generation contains ops above the local checkpoint)
             * while #uncommittedSizeInBytes returns a non-zero value after rolling a new translog
             * generation. Checking the actual number of uncommitted operations avoids this.
             */
            return translog.uncommittedOperations() > 0;
        }
    }
    return false;
}

@Override
public CommitId flush() throws EngineException {
return flush(false, false);
Expand Down Expand Up @@ -1492,7 +1517,9 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
logger.trace("acquired flush lock immediately");
}
try {
if (indexWriter.hasUncommittedChanges() || force) {
// Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the
// newly created commit points to a different translog generation (can free translog)
if (indexWriter.hasUncommittedChanges() || force || shouldPeriodicallyFlush()) {
ensureCanFlush();
try {
translog.rollGeneration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1600,17 +1600,16 @@ public boolean restoreFromRepository(Repository repository) {
}

/**
* Tests whether or not the translog should be flushed. This test is based on the current size of the translog compared to the
* configured flush threshold size.
* Tests whether or not the engine should be flushed periodically.
* This test is based on the current size of the translog compared to the configured flush threshold size.
*
* @return {@code true} if the translog should be flushed
* @return {@code true} if the engine should be flushed
*/
boolean shouldFlush() {
boolean shouldPeriodicallyFlush() {
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
final Translog translog = engine.getTranslog();
return translog.shouldFlush();
return engine.shouldPeriodicallyFlush();
} catch (final AlreadyClosedException e) {
// we are already closed, no need to flush or roll
}
Expand Down Expand Up @@ -2364,7 +2363,7 @@ public Translog.Durability getTranslogDurability() {
* executed asynchronously on the flush thread pool.
*/
public void afterWriteOperation() {
if (shouldFlush() || shouldRollTranslogGeneration()) {
if (shouldPeriodicallyFlush() || shouldRollTranslogGeneration()) {
if (flushOrRollRunning.compareAndSet(false, true)) {
/*
* We have to check again since otherwise there is a race when a thread passes the first check next to another thread which
Expand All @@ -2374,7 +2373,7 @@ public void afterWriteOperation() {
* Additionally, a flush implicitly executes a translog generation roll so if we execute a flush then we do not need to
* check if we should roll the translog generation.
*/
if (shouldFlush()) {
if (shouldPeriodicallyFlush()) {
logger.debug("submitting async flush request");
final AbstractRunnable flush = new AbstractRunnable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ private long sizeInBytesByMinGen(long minGeneration) {
/**
* Returns the size in bytes of the translog files with ops above the given seqNo
*/
private long sizeOfGensAboveSeqNoInBytes(long minSeqNo) {
public long sizeOfGensAboveSeqNoInBytes(long minSeqNo) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
return readersAboveMinSeqNo(minSeqNo).mapToLong(BaseTranslogReader::sizeInBytes).sum();
Expand Down Expand Up @@ -523,17 +523,6 @@ public Location add(final Operation operation) throws IOException {
}
}

/**
 * Determines whether the translog is due for a flush by comparing the size of its
 * uncommitted portion with the configured flush threshold size.
 *
 * @return {@code true} if the uncommitted size is strictly greater than the flush threshold size
 */
public boolean shouldFlush() {
    final long uncommittedBytes = this.uncommittedSizeInBytes();
    final long thresholdBytes = this.indexSettings.getFlushThresholdSize().getBytes();
    return thresholdBytes < uncommittedBytes;
}

/**
* Tests whether or not the translog generation should be rolled to a new generation. This test
* is based on the size of the current generation compared to the configured generation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.index.engine;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -46,6 +47,7 @@
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.IndexSearcher;
Expand Down Expand Up @@ -163,6 +165,7 @@
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
Expand Down Expand Up @@ -4439,4 +4442,37 @@ public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1)));
}
}

/**
* Exercises {@code engine.shouldPeriodicallyFlush()} before and after the translog flush threshold is lowered,
* and again after replaying replica operations whose index results are asserted not to be "created".
*/
public void testShouldPeriodicallyFlush() throws Exception {
assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false));
int numDocs = between(10, 100);
for (int id = 0; id < numDocs; id++) {
final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
}
assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false));
// Choose a flush threshold between 100 bytes and the current uncommitted translog size.
long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, engine.getTranslog().uncommittedSizeInBytes());
// Apply the new threshold through the index settings and notify the engine of the change.
final IndexSettings indexSettings = engine.config().getIndexSettings();
final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData())
.settings(Settings.builder().put(indexSettings.getSettings())
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build();
indexSettings.updateIndexMetaData(indexMetaData);
engine.onSettingsChanged();
// All indexed operations are still uncommitted, and the lowered threshold now calls for a flush.
assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs));
assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
engine.flush();
// After the flush no uncommitted operations are expected to remain in the translog.
assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0));
// Stale operations skipped by Lucene but added to translog - still able to flush
for (int id = 0; id < numDocs; id++) {
final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null);
final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 1L, id, false));
assertThat(result.isCreated(), equalTo(false));
}
// Remember the last commit so the assertions below can verify that the flush produced a new one.
SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos();
assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs));
assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
// Non-forced flush: a new commit is still expected, and the translog should have no uncommitted operations afterwards.
engine.flush(false, false);
assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo)));
assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
Expand Down Expand Up @@ -73,7 +72,6 @@
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.junit.annotations.TestLogging;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -332,23 +330,23 @@ public void testMaybeFlush() throws Exception {
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService(resolveIndex("test"));
IndexShard shard = test.getShardOrNull(0);
assertFalse(shard.shouldFlush());
assertFalse(shard.shouldPeriodicallyFlush());
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),
new ByteSizeValue(117 /* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get();
client().prepareIndex("test", "test", "0")
.setSource("{}", XContentType.JSON).setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
assertFalse(shard.shouldFlush());
assertFalse(shard.shouldPeriodicallyFlush());
shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
SourceToParse.source("test", "test", "1", new BytesArray("{}"), XContentType.JSON),
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {});
assertTrue(shard.shouldFlush());
assertTrue(shard.shouldPeriodicallyFlush());
final Translog translog = shard.getEngine().getTranslog();
assertEquals(2, translog.uncommittedOperations());
client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON)
.setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
assertBusy(() -> { // this is async
assertFalse(shard.shouldFlush());
assertFalse(shard.shouldPeriodicallyFlush());
});
assertEquals(0, translog.uncommittedOperations());
translog.sync();
Expand All @@ -364,7 +362,7 @@ public void testMaybeFlush() throws Exception {
assertBusy(() -> { // this is async
logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(),
translog.uncommittedOperations(), translog.getGeneration());
assertFalse(shard.shouldFlush());
assertFalse(shard.shouldPeriodicallyFlush());
});
assertEquals(0, translog.uncommittedOperations());
}
Expand Down Expand Up @@ -408,7 +406,7 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception {
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService(resolveIndex("test"));
final IndexShard shard = test.getShardOrNull(0);
assertFalse(shard.shouldFlush());
assertFalse(shard.shouldPeriodicallyFlush());
final String key;
final boolean flush = randomBoolean();
if (flush) {
Expand All @@ -423,7 +421,7 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception {
.setSource("{}", XContentType.JSON)
.setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE)
.get();
assertFalse(shard.shouldFlush());
assertFalse(shard.shouldPeriodicallyFlush());
final AtomicBoolean running = new AtomicBoolean(true);
final int numThreads = randomIntBetween(2, 4);
final Thread[] threads = new Thread[numThreads];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.indices.recovery;

import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
Expand Down Expand Up @@ -306,4 +307,30 @@ public void testSequenceBasedRecoveryKeepsTranslog() throws Exception {
}
}

/**
* This test makes sure that there is no infinite loop of flushing (the condition `shouldPeriodicallyFlush` eventually is false)
* in peer-recovery if a primary sends a fully-baked index commit.
*/
public void testShouldFlushAfterPeerRecovery() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
shards.startAll();
int numDocs = shards.indexDocs(between(10, 100));
// Capture the primary's uncommitted translog size before flushing; it is the upper bound for the threshold chosen below.
final long translogSizeOnPrimary = shards.getPrimary().getTranslog().uncommittedSizeInBytes();
shards.flush();

final IndexShard replica = shards.addReplica();
IndexMetaData.Builder builder = IndexMetaData.builder(replica.indexSettings().getIndexMetaData());
// Set the replica's flush threshold to a value between 100 bytes and the primary's pre-flush translog size.
long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, translogSizeOnPrimary);
builder.settings(Settings.builder().put(replica.indexSettings().getSettings())
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")
);
replica.indexSettings().updateIndexMetaData(builder.build());
replica.onSettingsChanged();
// Recover the replica only after the lowered threshold has been applied to it.
shards.recoverReplica(replica);
// Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false)
assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false)));
assertThat(replica.getTranslog().totalOperations(), equalTo(numDocs));
shards.assertAllEqual(numDocs);
}
}
}