Closed shard should never open new engine #47186

Merged 7 commits on Nov 7, 2019 (the diff below shows changes from 5 of the commits).
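In short, the change moves engine lifecycle operations (open, reset, close) under the dedicated `engineMutex` and re-checks the shard state before a new engine is published, so a shard that has already been closed can never end up holding a freshly opened engine. A minimal, hypothetical sketch of that locking pattern (simplified names, not the actual `IndexShard` code):

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

class ShardEngineSketch {

    private final Object engineMutex = new Object();
    private final AtomicReference<Closeable> currentEngine = new AtomicReference<>();
    private volatile boolean closed;

    /** Publish a new engine only if the shard is still open; creation and publication share one lock. */
    void openEngine(Closeable newEngine) throws IOException {
        synchronized (engineMutex) {
            if (closed) {                  // stand-in for IndexShard#verifyNotClosed()
                newEngine.close();         // a closed shard must never keep a new engine
                throw new IllegalStateException("shard is closed");
            }
            assert currentEngine.get() == null : "engine is running";
            currentEngine.set(newEngine);
        }
    }

    /** Closing takes the same lock, so it cannot interleave with openEngine. */
    void close() throws IOException {
        synchronized (engineMutex) {
            closed = true;
            Closeable engine = currentEngine.getAndSet(null);
            if (engine != null) {
                engine.close();
            }
        }
    }
}
```

The diff below applies this pattern to `snapshotStoreMetadata`, `close`, local recovery, `innerOpenEngineAndTranslog`, `performRecoveryRestart`, and `resetEngineToGlobalCheckpoint`.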
72 changes: 23 additions & 49 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1157,11 +1157,9 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
         synchronized (engineMutex) {
             // if the engine is not running, we can access the store directly, but we need to make sure no one starts
             // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
-            synchronized (mutex) {
-                final Engine engine = getEngineOrNull();
-                if (engine != null) {
-                    indexCommit = engine.acquireLastIndexCommit(false);
-                }
+            final Engine engine = getEngineOrNull();
+            if (engine != null) {
+                indexCommit = engine.acquireLastIndexCommit(false);
             }
             if (indexCommit == null) {
                 return store.getMetadata(null, true);
@@ -1285,9 +1283,11 @@ public CacheHelper getReaderCacheHelper() {
     }

     public void close(String reason, boolean flushEngine) throws IOException {
-        synchronized (mutex) {
+        synchronized (engineMutex) {
             try {
-                changeState(IndexShardState.CLOSED, reason);
+                synchronized (mutex) {
+                    changeState(IndexShardState.CLOSED, reason);
+                }
             } finally {
                 final Engine engine = this.currentEngineReference.getAndSet(null);
                 try {
@@ -1393,7 +1393,7 @@ public long recoverLocallyUpToGlobalCheckpoint() {
             getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint);
             logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
         } finally {
-            synchronized (mutex) {
+            synchronized (engineMutex) {
                 IOUtils.close(currentEngineReference.getAndSet(null));
             }
         }
@@ -1568,23 +1568,15 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
: "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
+ "] but got " + getRetentionLeases();
synchronized (engineMutex) {
assert currentEngineReference.get() == null : "engine is running";
verifyNotClosed();
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
final Engine newEngine = engineFactory.newReadWriteEngine(config);
synchronized (mutex) {
try {
verifyNotClosed();
assert currentEngineReference.get() == null : "engine is running";
onNewEngine(newEngine);
currentEngineReference.set(newEngine);
// We set active because we are now writing operations to the engine; this way,
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
active.set(true);
} finally {
if (currentEngineReference.get() != newEngine) {
newEngine.close();
}
}
}
onNewEngine(newEngine);
currentEngineReference.set(newEngine);
// We set active because we are now writing operations to the engine; this way,
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
active.set(true);
}
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
@@ -1615,7 +1607,7 @@ private void onNewEngine(Engine newEngine) {
      * called if recovery has to be restarted after network error / delay **
      */
     public void performRecoveryRestart() throws IOException {
-        synchronized (mutex) {
+        synchronized (engineMutex) {
             assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
             IOUtils.close(currentEngineReference.getAndSet(null));
             resetRecoveryStage();
@@ -3299,7 +3291,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
      * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
      */
     void resetEngineToGlobalCheckpoint() throws IOException {
-        assert Thread.holdsLock(engineMutex) == false : "resetting engine under mutex";
+        assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
         assert getActiveOperationsCount() == OPERATIONS_BLOCKED
             : "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']';
         sync(); // persist the global checkpoint to disk
@@ -3312,6 +3304,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
         final long globalCheckpoint = getLastKnownGlobalCheckpoint();
         assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
         synchronized (engineMutex) {
+            verifyNotClosed();
             // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
             // acquireXXXCommit and close works.
             final Engine readOnlyEngine =
@@ -3339,7 +3332,7 @@ public IndexCommitRef acquireSafeIndexCommit() {

                     @Override
                     public void close() throws IOException {
-                        assert Thread.holdsLock(mutex);
+                        assert Thread.holdsLock(engineMutex);

                         Engine newEngine = newEngineReference.get();
                         if (newEngine == currentEngineReference.get()) {
@@ -3349,36 +3342,17 @@ public void close() throws IOException {
                         IOUtils.close(super::close, newEngine);
                     }
                 };
-            synchronized (mutex) {
-                try {
-                    verifyNotClosed();
-                    IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
-                } finally {
-                    if (currentEngineReference.get() != readOnlyEngine) {
-                        readOnlyEngine.close();
-                    }
-                }
-            }
-            final Engine newReadWriteEngine = engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker));
-            synchronized (mutex) {
-                try {
-                    verifyNotClosed();
-                    newEngineReference.set(newReadWriteEngine);
-                    onNewEngine(newReadWriteEngine);
-                } finally {
-                    if (newEngineReference.get() != newReadWriteEngine) {
-                        newReadWriteEngine.close(); // shard was closed
-                    }
-                }
-            }
+            IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
+            newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
+            onNewEngine(newEngineReference.get());
         }
         final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
             engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
                 // TODO: add a dedicate recovery stats for the reset translog
             });
         newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
         newEngineReference.get().refresh("reset_engine");
-        synchronized (mutex) {
+        synchronized (engineMutex) {
             verifyNotClosed();
             IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
             // We set active because we are now writing operations to the engine; this way,
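The largest hunk above rewrites `resetEngineToGlobalCheckpoint`. The sequence it implements is roughly: swap in a read-only engine, build the replacement read-write engine, replay the local translog up to the global checkpoint outside the lock, then publish the new engine, with each swap happening under `engineMutex` after a closed check. A rough sketch of that shape (hypothetical, heavily simplified engine interface; for brevity both engines are passed in rather than built from the engine factory):

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

interface SketchEngine extends Closeable {
    void recoverFromTranslog(long upToGlobalCheckpoint) throws IOException;
}

class EngineResetSketch {

    private final Object engineMutex = new Object();
    private final AtomicReference<SketchEngine> currentEngine = new AtomicReference<>();
    private volatile boolean closed;

    void resetEngineToGlobalCheckpoint(SketchEngine readOnlyEngine, SketchEngine newEngine,
                                       long globalCheckpoint) throws IOException {
        synchronized (engineMutex) {
            verifyNotClosed();
            // 1. swap in a read-only view so readers keep working while the engine is rebuilt
            closeIfNotNull(currentEngine.getAndSet(readOnlyEngine));
        }
        // 2. replay the local translog outside the lock; this can take a while
        newEngine.recoverFromTranslog(globalCheckpoint);
        synchronized (engineMutex) {
            verifyNotClosed();
            // 3. publish the rebuilt read-write engine, closing the read-only stand-in
            closeIfNotNull(currentEngine.getAndSet(newEngine));
        }
    }

    private void verifyNotClosed() {
        if (closed) {
            throw new IllegalStateException("shard is closed");
        }
    }

    private static void closeIfNotNull(SketchEngine engine) throws IOException {
        if (engine != null) {
            engine.close();
        }
    }
}
```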
@@ -133,6 +133,7 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -511,15 +512,21 @@ public final void ensureAllSearchContextsReleased() throws Exception {
     // TODO: can we do this cleaner???

     /** MockFSDirectoryService sets this: */
-    public static boolean checkIndexFailed;
+    public static final List<Exception> checkIndexFailures = new CopyOnWriteArrayList<>();

     @Before
     public final void resetCheckIndexStatus() throws Exception {
-        checkIndexFailed = false;
+        checkIndexFailures.clear();
     }

     public final void ensureCheckIndexPassed() {
-        assertFalse("at least one shard failed CheckIndex", checkIndexFailed);
+        if (checkIndexFailures.isEmpty() == false) {
+            final AssertionError e = new AssertionError("at least one shard failed CheckIndex");
+            for (Exception failure : checkIndexFailures) {
+                e.addSuppressed(failure);
+            }
+            throw e;
+        }
     }

     // -----------------------------------------------------------------
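On the test side, the single `checkIndexFailed` boolean becomes a thread-safe list of exceptions, so `ensureCheckIndexPassed` can report every underlying CheckIndex failure as a suppressed cause instead of a bare assertion. A small stand-alone sketch of that reporting pattern (hypothetical class, not the actual ESTestCase code):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CheckIndexFailureCollector {

    // thread-safe: shards may be checked, and fail, concurrently
    static final List<Exception> failures = new CopyOnWriteArrayList<>();

    static void record(Exception failure) {
        failures.add(failure);
    }

    static void ensurePassed() {
        if (failures.isEmpty() == false) {
            AssertionError error = new AssertionError("at least one shard failed CheckIndex");
            for (Exception failure : failures) {
                error.addSuppressed(failure);   // each underlying cause survives in the final report
            }
            throw error;
        }
    }
}
```

The diff below makes the corresponding change on the recording side: the CheckIndex output now travels with the exception that is added to the list, rather than only being logged.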
@@ -83,17 +83,19 @@ public static void checkIndex(Logger logger, Store store, ShardId shardId) {
                 CheckIndex.Status status = store.checkIndex(out);
                 out.flush();
                 if (!status.clean) {
-                    ESTestCase.checkIndexFailed = true;
-                    logger.warn("check index [failure] index files={}\n{}", Arrays.toString(dir.listAll()), os.bytes().utf8ToString());
-                    throw new IOException("index check failure");
+                    IOException failure = new IOException("failed to check index for shard " + shardId +
+                        ";index files [" + Arrays.toString(dir.listAll()) + "] os [" + os.bytes().utf8ToString() + "]");
+                    ESTestCase.checkIndexFailures.add(failure);
+                    throw failure;
                 } else {
                     if (logger.isDebugEnabled()) {
                         logger.debug("check index [success]\n{}", os.bytes().utf8ToString());
                     }
                 }
             } catch (LockObtainFailedException e) {
-                ESTestCase.checkIndexFailed = true;
-                throw new IllegalStateException("IndexWriter is still open on shard " + shardId, e);
+                IllegalStateException failure = new IllegalStateException("IndexWriter is still open on shard " + shardId, e);
+                ESTestCase.checkIndexFailures.add(failure);
+                throw failure;
             }
         } catch (Exception e) {
             logger.warn("failed to check index", e);