Skip to content

Commit

Permalink
Reset engine for hollow shards (elastic#120649)
Browse files Browse the repository at this point in the history
Introduces a function in IndexShard to reset the engine.
By default, all Engine implementations in core ES will throw when asked to prepare for a reset.

In stateless, we will extend the prepareForEngineReset() function
in order to make a hollow commit or an unhollow commit.
Based on the commit, the subsequent new engine will be a hollow
or unhollow indexing engine.

The reset function takes care to close the engine, which
waits for all operations to drain. This, along with the fact that
ingestion will be blocked in stateless and that there should be no
searches in the indexing tier, should ensure that there
are no unexpected asynchronous side-effects.

Relates ES-10600
  • Loading branch information
kingherc authored Jan 24, 2025
1 parent 5e662c5 commit 39603ec
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 1 deletion.
12 changes: 12 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.DenseVectorStats;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardLongFieldRange;
import org.elasticsearch.index.shard.SparseVectorStats;
Expand Down Expand Up @@ -2334,4 +2335,15 @@ public record FlushResult(boolean flushPerformed, long generation) {
public static final long UNKNOWN_GENERATION = -1L;
public static final FlushResult NO_FLUSH = new FlushResult(false, UNKNOWN_GENERATION);
}

/**
* Ensures the engine is in a state that it can be closed by a call to {@link IndexShard#resetEngine()}.
*
* {@link IndexShard#resetEngine()} calls this method on the current engine before closing it and creating a
* replacement. This base implementation does not support being reset and always throws; engines that can be
* reset are expected to override it.
*
* In general, resetting the engine should be done with care, to consider any
* in-progress operations and listeners (e.g., primary term and generation listeners).
* At the moment, this is implemented in serverless for a special case that ensures the engine is prepared for reset.
*
* @throws UnsupportedOperationException always, in this default implementation
* @throws IOException never thrown by this default implementation; declared so that overrides may throw it
*/
public void prepareForEngineReset() throws IOException {
throw new UnsupportedOperationException("does not support engine reset");
}
}
29 changes: 29 additions & 0 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4307,6 +4307,35 @@ public void afterRefresh(boolean didRefresh) {
}
}

/**
* Reset the current engine to a new one.
*
* Calls {@link Engine#prepareForEngineReset()} on the current engine, then closes it, and loads a new engine without
* doing any translog recovery. The whole swap is performed while holding {@code engineMutex}.
*
* Any exception raised along the way (including the {@link UnsupportedOperationException} thrown by the default
* {@link Engine#prepareForEngineReset()}) is not propagated to the caller; it is handed to {@code failShard} instead.
*
* In general, resetting the engine should be done with care, to consider any in-progress operations and listeners.
* At the moment, this is implemented in serverless for a special case that ensures the engine is prepared for reset.
*/
public void resetEngine() {
// Callers must not hold the shard mutex when resetting the engine (checked only when assertions are enabled).
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
// NOTE(review): presumably guarantees that an engine has already been made available once — confirm.
assert waitForEngineOrClosedShardListeners.isDone();
try {
synchronized (engineMutex) {
final var currentEngine = getEngine();
// Give the current engine the chance to get ready; engines that do not support a reset throw here.
currentEngine.prepareForEngineReset();
// The configuration for the replacement is built before the current engine is closed.
var engineConfig = newEngineConfig(replicationTracker);
// NOTE(review): presumably throws if the shard was closed in the meantime, so that a closed shard
// never gets a fresh engine — confirm against verifyNotClosed().
verifyNotClosed();
IOUtils.close(currentEngine);
var newEngine = createEngine(engineConfig);
// Publish the replacement before notifying via onNewEngine.
currentEngineReference.set(newEngine);
onNewEngine(newEngine);
}
// Runs after engineMutex has been released.
onSettingsChanged();
} catch (Exception e) {
// Failures are not rethrown: the shard is failed with the original exception as the cause.
failShard("unable to reset engine", e);
}
}

/**
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4497,7 +4497,7 @@ public void testSupplyTombstoneDoc() throws Exception {
closeShards(shard);
}

public void testResetEngine() throws Exception {
public void testResetEngineToGlobalCheckpoint() throws Exception {
IndexShard shard = newStartedShard(false);
indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint()));
long maxSeqNoBeforeRollback = shard.seqNoStats().getMaxSeqNo();
Expand Down Expand Up @@ -4559,6 +4559,33 @@ public void testResetEngine() throws Exception {
closeShard(shard, false);
}

/**
 * Checks that {@link IndexShard#resetEngine()} goes through the shard's engine factory again and that a listener
 * registered via {@code waitForEngineOrClosedShard} is notified.
 */
public void testResetEngine() throws Exception {
    // Counted down every time the engine factory below is invoked; the test waits for two invocations
    // (presumably one when the shard is started and one triggered by the reset).
    final CountDownLatch engineCreations = new CountDownLatch(2);
    final IndexShard shard = newStartedShard(true, Settings.EMPTY, config -> {
        try {
            return new ReadOnlyEngine(config, null, null, true, Function.identity(), true, true) {
                @Override
                public void prepareForEngineReset() throws IOException {
                    // Intentionally empty: this engine needs no preparation, it only must not throw like
                    // the base Engine implementation does.
                }
            };
        } finally {
            engineCreations.countDown();
        }
    });

    final CountDownLatch engineAvailable = new CountDownLatch(1);
    shard.waitForEngineOrClosedShard(ActionListener.running(engineAvailable::countDown));

    // Hold all primary operation permits for the duration of the reset.
    final PlainActionFuture<Releasable> permitsFuture = new PlainActionFuture<>();
    shard.acquireAllPrimaryOperationsPermits(permitsFuture, TimeValue.timeValueMinutes(1L));
    try (var ignored = safeGet(permitsFuture)) {
        shard.resetEngine();
    }

    safeAwait(engineCreations);
    safeAwait(engineAvailable);
    closeShard(shard, false);
}

/**
* This test simulates a scenario seen rarely in ConcurrentSeqNoVersioningIT. Closing a shard while engine is inside
* resetEngineToGlobalCheckpoint can lead to check index failure in integration tests.
Expand Down

0 comments on commit 39603ec

Please sign in to comment.