_flush should block by default (#20597)
This commit changes the default behavior of `_flush` to block if other flushes are ongoing.
It also removes the use of `FlushNotAllowedEngineException`: if blocking is disabled and another
flush is already running, the request now simply returns immediately, skipping the flush. Users
who disable this option should be aware that the flush might or might not flush everything to
disk, i.e. it offers no transactional guarantee.

Closes #20569
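
For illustration, a minimal sketch of what the new default means for Java API callers (the class name, `client` wiring, and index name are illustrative; `prepareFlush` and `setWaitIfOngoing` are the existing builder methods used in the tests below):

import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.client.Client;

public class FlushDefaultsSketch {
    // With this change a plain flush blocks until any concurrent flush
    // finishes, so FlushNotAllowedEngineException can no longer surface.
    static FlushResponse blockingFlush(Client client, String index) {
        return client.admin().indices().prepareFlush(index).get(); // waitIfOngoing defaults to true
    }

    // Opting out restores non-blocking behavior, but a shard that is already
    // flushing is now silently skipped instead of throwing an exception.
    static FlushResponse bestEffortFlush(Client client, String index) {
        return client.admin().indices().prepareFlush(index).setWaitIfOngoing(false).get();
    }
}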
s1monw authored Sep 21, 2016
1 parent 6dc03ec commit 0151974
Showing 24 changed files with 75 additions and 95 deletions.
core/src/main/java/org/elasticsearch/ElasticsearchException.java
@@ -633,8 +633,7 @@ enum ElasticsearchExceptionHandle {
org.elasticsearch.repositories.RepositoryMissingException::new, 107),
DOCUMENT_SOURCE_MISSING_EXCEPTION(org.elasticsearch.index.engine.DocumentSourceMissingException.class,
org.elasticsearch.index.engine.DocumentSourceMissingException::new, 109),
FLUSH_NOT_ALLOWED_ENGINE_EXCEPTION(org.elasticsearch.index.engine.FlushNotAllowedEngineException.class,
org.elasticsearch.index.engine.FlushNotAllowedEngineException::new, 110),
// 110 used to be FlushNotAllowedEngineException
NO_CLASS_SETTINGS_EXCEPTION(org.elasticsearch.common.settings.NoClassSettingsException.class,
org.elasticsearch.common.settings.NoClassSettingsException::new, 111),
BIND_TRANSPORT_EXCEPTION(org.elasticsearch.transport.BindTransportException.class,
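Exception ids such as 110 are wire-format constants shared across node versions, so a removed exception leaves a permanent hole in the registry instead of freeing its id for reuse. A self-contained sketch of that invariant (names are illustrative, not the real registry):

import java.util.HashMap;
import java.util.Map;

class ExceptionIdRegistrySketch {
    private final Map<Integer, String> handles = new HashMap<>();

    ExceptionIdRegistrySketch() {
        handles.put(109, "DocumentSourceMissingException");
        // 110 used to be FlushNotAllowedEngineException -- retired, never reused
        handles.put(111, "NoClassSettingsException");
    }

    // An old node may still send a retired id; resolve defensively.
    String resolve(int id) {
        return handles.getOrDefault(id, "unknown exception id [" + id + "]");
    }
}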
core/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java
@@ -40,7 +40,7 @@
public class FlushRequest extends BroadcastRequest<FlushRequest> {

private boolean force = false;
private boolean waitIfOngoing = false;
private boolean waitIfOngoing = true;

/**
* Constructs a new flush request against one or more indices. If nothing is provided, all indices will
@@ -61,6 +61,7 @@ public boolean waitIfOngoing() {
/**
* if set to <tt>true</tt> the flush will block
* if another flush operation is already running, until the flush can be performed.
* The default is <code>true</code>.
*/
public FlushRequest waitIfOngoing(boolean waitIfOngoing) {
this.waitIfOngoing = waitIfOngoing;
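The same default seen through direct request construction (a sketch; the index name is illustrative):

import org.elasticsearch.action.admin.indices.flush.FlushRequest;

public class FlushRequestDefaultSketch {
    public static void main(String[] args) {
        // A freshly constructed request now blocks by default...
        FlushRequest blocking = new FlushRequest("my-index");
        assert blocking.waitIfOngoing();

        // ...and callers opt out explicitly to get the skip-if-busy behavior.
        FlushRequest bestEffort = new FlushRequest("my-index").waitIfOngoing(false);
        assert bestEffort.waitIfOngoing() == false;
    }
}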
9 changes: 7 additions & 2 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -1105,8 +1105,6 @@ public void flushAndClose() throws IOException {
logger.debug("flushing shard on close - this might take some time to sync files to disk");
try {
flush(); // TODO we might force a flush in the future since we have the write lock already even though recoveries are running.
} catch (FlushNotAllowedEngineException ex) {
logger.debug("flush not allowed during flushAndClose - skipping");
} catch (EngineClosedException ex) {
logger.debug("engine already closed - skipping flushAndClose");
}
@@ -1233,4 +1231,11 @@ public interface Warmer {
* This operation will close the engine if the recovery fails.
*/
public abstract Engine recoverFromTranslog() throws IOException;

/**
* Returns <code>true</code> iff this engine is currently recovering from translog.
*/
public boolean isRecovering() {
return false;
}
}
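
The new hook follows a default-false template pattern: the abstract base answers "not recovering", and only engines that actually replay a translog override it. A condensed sketch (abbreviated names, not the real class bodies):

import java.util.concurrent.atomic.AtomicBoolean;

abstract class EngineSketch {
    public boolean isRecovering() {
        return false; // safe default for engines with no translog replay
    }
}

class InternalEngineSketch extends EngineSketch {
    private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(true);

    @Override
    public boolean isRecovering() {
        return pendingTranslogRecovery.get();
    }

    void translogRecoveryDone() {
        pendingTranslogRecovery.set(false); // flushes become legal again
    }
}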

This file was deleted: core/src/main/java/org/elasticsearch/index/engine/FlushNotAllowedEngineException.java

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -116,7 +116,7 @@ public class InternalEngine extends Engine {
// incoming indexing ops to a single thread:
private final AtomicInteger throttleRequestCount = new AtomicInteger();
private final EngineConfig.OpenMode openMode;
private final AtomicBoolean allowCommits = new AtomicBoolean(true);
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
@@ -163,8 +163,9 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
manager = createSearcherManager();
this.searcherManager = manager;
this.versionMap.setManager(searcherManager);
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
// don't allow commits until we are done with recovering
allowCommits.compareAndSet(true, openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
pendingTranslogRecovery.set(openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
if (engineConfig.getRefreshListeners() != null) {
searcherManager.addListener(engineConfig.getRefreshListeners());
}
@@ -190,14 +191,14 @@ public InternalEngine recoverFromTranslog() throws IOException {
if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
throw new IllegalStateException("Can't recover from translog with open mode: " + openMode);
}
if (allowCommits.get()) {
if (pendingTranslogRecovery.get() == false) {
throw new IllegalStateException("Engine has already been recovered");
}
try {
recoverFromTranslog(engineConfig.getTranslogRecoveryPerformer());
} catch (Exception e) {
try {
allowCommits.set(false); // just play safe and never allow commits on this
pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
failEngine("failed to recover from translog", e);
} catch (Exception inner) {
e.addSuppressed(inner);
@@ -221,8 +222,8 @@ private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOException {
}
// flush if we recovered something or if we have references to older translogs
// note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
assert allowCommits.get() == false : "commits are allowed but shouldn't";
allowCommits.set(true); // we are good - now we can commit
assert pendingTranslogRecovery.get(): "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
if (opsRecovered > 0) {
logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog.currentFileGeneration());
@@ -765,7 +766,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
flushLock.lock();
logger.trace("acquired flush lock after blocking");
} else {
throw new FlushNotAllowedEngineException(shardId, "already flushing...");
return new CommitId(lastCommittedSegmentInfos.getId());
}
} else {
logger.trace("acquired flush lock immediately");
@@ -1287,8 +1288,8 @@ private void ensureCanFlush() {
// if we are in this stage we have to prevent flushes from this
// engine otherwise we might lose documents if the flush succeeds
// and the translog recovery fails when we "commit" the translog on flush.
if (allowCommits.get() == false) {
throw new FlushNotAllowedEngineException(shardId, "flushes are disabled - pending translog recovery");
if (pendingTranslogRecovery.get()) {
throw new IllegalStateException(shardId.toString() + " flushes are disabled - pending translog recovery");
}
}

@@ -1349,4 +1350,9 @@ private boolean incrementIndexVersionLookup() {
boolean indexWriterHasDeletions() {
return indexWriter.hasDeletions();
}

@Override
public boolean isRecovering() {
return pendingTranslogRecovery.get();
}
}
24 changes: 19 additions & 5 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -730,7 +730,12 @@ public CompletionStats completionStats(String... fields) {
public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expectedCommitId) {
verifyStartedOrRecovering();
logger.trace("trying to sync flush. sync id [{}]. expected commit id [{}]]", syncId, expectedCommitId);
return getEngine().syncFlush(syncId, expectedCommitId);
Engine engine = getEngine();
if (engine.isRecovering()) {
throw new IllegalIndexShardStateException(shardId(), state, "syncFlush is only allowed if the engine is not recovery" +
" from translog");
}
return engine.syncFlush(syncId, expectedCommitId);
}

public Engine.CommitId flush(FlushRequest request) throws ElasticsearchException {
@@ -741,11 +746,16 @@ public Engine.CommitId flush(FlushRequest request) throws ElasticsearchException {
}
// we allow flush while recovering, since we allow for operations to happen
// while recovering, and we want to keep the translog at bay (up to deletes, which
// we don't gc).
// we don't gc). Yet, we don't use flush internally to clear deletes and flush the indexwriter since
// we use #writeIndexingBuffer for this now.
verifyStartedOrRecovering();

Engine engine = getEngine();
if (engine.isRecovering()) {
throw new IllegalIndexShardStateException(shardId(), state, "flush is only allowed if the engine is not recovery" +
" from translog");
}
long time = System.nanoTime();
Engine.CommitId commitId = getEngine().flush(force, waitIfOngoing);
Engine.CommitId commitId = engine.flush(force, waitIfOngoing);
flushMetric.inc(System.nanoTime() - time);
return commitId;

@@ -1165,7 +1175,11 @@ public void checkIdle(long inactiveTimeNS) {
boolean wasActive = active.getAndSet(false);
if (wasActive) {
logger.debug("shard is now inactive");
indexEventListener.onShardInactive(this);
try {
indexEventListener.onShardInactive(this);
} catch (Exception e) {
logger.warn("failed to notify index event listener", e);
}
}
}
}
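
The try/catch added around onShardInactive follows a standard defensive-notification pattern: a throwing listener is logged and swallowed so it cannot abort the shard's state change. A self-contained sketch of the pattern (the logging call is illustrative):

import java.util.List;
import java.util.function.Consumer;

class ListenerNotifierSketch<T> {
    void notifyListeners(List<Consumer<T>> listeners, T event) {
        for (Consumer<T> listener : listeners) {
            try {
                listener.accept(event);
            } catch (Exception e) {
                // log and keep going; one bad listener must not starve the rest
                System.err.println("failed to notify listener: " + e);
            }
        }
    }
}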
core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java
@@ -31,7 +31,6 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexingOperationListener;
@@ -52,7 +51,7 @@
public class IndexingMemoryController extends AbstractComponent implements IndexingOperationListener, Closeable {

/** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
public static final Setting<ByteSizeValue> INDEX_BUFFER_SIZE_SETTING =
public static final Setting<ByteSizeValue> INDEX_BUFFER_SIZE_SETTING =
Setting.memorySizeSetting("indices.memory.index_buffer_size", "10%", Property.NodeScope);

/** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a floor on the actual size in bytes (default: 48 MB). */
@@ -386,7 +385,7 @@ private void runUnlocked() {
protected void checkIdle(IndexShard shard, long inactiveTimeNS) {
try {
shard.checkIdle(inactiveTimeNS);
} catch (EngineClosedException | FlushNotAllowedEngineException e) {
} catch (EngineClosedException e) {
logger.trace("ignore exception while checking if shard {} is inactive", e, shard.shardId());
}
}
core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java
@@ -757,7 +757,7 @@ public void testIds() {
ids.put(107, org.elasticsearch.repositories.RepositoryMissingException.class);
ids.put(108, null);
ids.put(109, org.elasticsearch.index.engine.DocumentSourceMissingException.class);
ids.put(110, org.elasticsearch.index.engine.FlushNotAllowedEngineException.class);
ids.put(110, null); // FlushNotAllowedEngineException was removed in 5.0
ids.put(111, org.elasticsearch.common.settings.NoClassSettingsException.class);
ids.put(112, org.elasticsearch.transport.BindTransportException.class);
ids.put(113, org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException.class);
@@ -49,7 +49,7 @@ public void testFlushWithBlocks() {
for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE)) {
try {
enableIndexBlock("test", blockSetting);
FlushResponse response = client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).execute().actionGet();
FlushResponse response = client().admin().indices().prepareFlush("test").execute().actionGet();
assertNoFailures(response);
assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards));
} finally {
@@ -80,4 +80,4 @@ public void testFlushWithBlocks() {
setClusterReadOnly(false);
}
}
}
}
@@ -54,7 +54,7 @@ public void setupIndex() {
String id = Integer.toString(j);
client().prepareIndex("test", "type1", id).setSource("text", "sometext").get();
}
client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).get();
client().admin().indices().prepareFlush("test").get();
}

public void testBasic() {
@@ -213,7 +213,7 @@ private void indexRandomData(String index) throws ExecutionException, InterruptedException {
builders[i] = client().prepareIndex(index, "type").setSource("field", "value");
}
indexRandom(true, builders);
client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet();
client().admin().indices().prepareFlush().setForce(true).execute().actionGet();
}

private static final class IndexNodePredicate implements Predicate<Settings> {
@@ -417,7 +417,7 @@ public void testReusePeerRecovery() throws Exception {
logger.info("Running Cluster Health");
ensureGreen();
client().admin().indices().prepareForceMerge("test").setMaxNumSegments(100).get(); // just wait for merges
client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).get();
client().admin().indices().prepareFlush().setForce(true).get();

boolean useSyncIds = randomBoolean();
if (useSyncIds == false) {
@@ -80,7 +80,7 @@ public static void testCase(Settings indexSettings, Runnable restartCluster, Log
client().admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("30s").get();
// just wait for merges
client().admin().indices().prepareForceMerge("test").setMaxNumSegments(100).get();
client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).get();
client().admin().indices().prepareFlush().setForce(true).get();

if (useSyncIds == false) {
logger.info("--> disabling allocation while the cluster is shut down");
@@ -142,7 +142,7 @@ public void testRestoreToShadow() throws ExecutionException, InterruptedException {
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("foo", "doc", ""+i).setSource("foo", "bar").get();
}
assertNoFailures(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
assertNoFailures(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());

assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
Expand Down
core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -586,6 +586,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
engine.close();

engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
assertTrue(engine.isRecovering());
engine.recoverFromTranslog();
Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test"));
assertThat(counter.get(), equalTo(2));
@@ -594,13 +595,16 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
}

public void testFlushIsDisabledDuringTranslogRecovery() throws IOException {
assertFalse(engine.isRecovering());
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("1"), doc));
engine.close();

engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
expectThrows(FlushNotAllowedEngineException.class, () -> engine.flush(true, true));
expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
assertTrue(engine.isRecovering());
engine.recoverFromTranslog();
assertFalse(engine.isRecovering());
doc = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("2"), doc));
engine.flush();
@@ -2114,6 +2118,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(0)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);

try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG))){
assertFalse(engine.isRecovering());
engine.index(firstIndexRequest);

expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
@@ -2126,6 +2131,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
{
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
assertTrue(engine.isRecovering());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
if (i == 0) {
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));