Skip to content

Commit

Permalink
ReadOnlyEngine should update translog recovery state information (#39238
Browse files Browse the repository at this point in the history
) (#39254)

* ReadOnlyEngine should update translog recovery state information (#39238)

`ReadOnlyEngine` never recovers operations from translog and never
updates translog information in the index shard's recovery state, even
though the recovery state goes through the `TRANSLOG` stage during
the recovery. It means that recovery information for frozen shards indicates
an unkown number of recovered translog ops in the Recovery APIs
(translog_ops: `-1` and translog_ops_percent: `-1.0%`) and this is confusing.

This commit changes the `recoverFromTranslog()` method in `ReadOnlyEngine`
so that it always recover from an empty translog snapshot, allowing the recovery
state translog information to be correctly updated.

Related to #33888
  • Loading branch information
tlrx authored Feb 22, 2019
1 parent 1fe8b74 commit c899f63
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SeqNoStats;
Expand Down Expand Up @@ -287,18 +288,7 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS

@Override
public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
return new Translog.Snapshot() {
@Override
public void close() { }
@Override
public int totalOperations() {
return 0;
}
@Override
public Translog.Operation next() {
return null;
}
};
return newEmptySnapshot();
}

@Override
Expand Down Expand Up @@ -429,7 +419,15 @@ public int fillSeqNoGaps(long primaryTerm) {
}

@Override
public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) {
public Engine recoverFromTranslog(final TranslogRecoveryRunner translogRecoveryRunner, final long recoverUpToSeqNo) {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
try (Translog.Snapshot snapshot = newEmptySnapshot()) {
translogRecoveryRunner.run(this, snapshot);
} catch (final Exception e) {
throw new EngineException(shardId, "failed to recover from empty translog snapshot", e);
}
}
return this;
}

Expand Down Expand Up @@ -468,4 +466,22 @@ protected void processReaders(IndexReader reader, IndexReader previousReader) {
public boolean refreshNeeded() {
return false;
}

private Translog.Snapshot newEmptySnapshot() {
return new Translog.Snapshot() {
@Override
public void close() {
}

@Override
public int totalOperations() {
return 0;
}

@Override
public Translog.Operation next() {
return null;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,36 @@ public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException {
}
}
}

public void testRecoverFromTranslogAppliesNoOperations() throws IOException {
IOUtils.close(engine, store);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
int numDocs = scaledRandomIntBetween(10, 1000);
try (InternalEngine engine = createEngine(config)) {
for (int i = 0; i < numDocs; i++) {
if (rarely()) {
continue; // gap in sequence number
}
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, VersionType.EXTERNAL,
Engine.Operation.Origin.REPLICA, System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
if (rarely()) {
engine.flush();
}
globalCheckpoint.set(i);
}
engine.syncTranslog();
engine.flushAndClose();
}
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings());
readOnlyEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong());

assertThat(translogHandler.appliedOperations(), equalTo(0L));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -26,6 +28,7 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchService;
Expand All @@ -48,8 +51,10 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

public class FrozenIndexTests extends ESSingleNodeTestCase {

Expand Down Expand Up @@ -370,4 +375,36 @@ public void testFreezeEmptyIndexWithTranslogOps() throws Exception {
assertAcked(new XPackClient(client()).freeze(new TransportFreezeIndexAction.FreezeRequest(indexName)));
assertIndexFrozen(indexName);
}

public void testRecoveryState() throws ExecutionException, InterruptedException {
final String indexName = "index_recovery_state";
createIndex(indexName, Settings.builder()
.put("index.number_of_replicas", 0)
.build());

final long nbDocs = randomIntBetween(0, 50);
for (long i = 0; i < nbDocs; i++) {
final IndexResponse indexResponse = client().prepareIndex(indexName, "_doc", Long.toString(i)).setSource("field", i).get();
assertThat(indexResponse.status(), is(RestStatus.CREATED));
}

assertAcked(new XPackClient(client()).freeze(new TransportFreezeIndexAction.FreezeRequest(indexName)));
assertIndexFrozen(indexName);

final IndexMetaData indexMetaData = client().admin().cluster().prepareState().get().getState().metaData().index(indexName);
final IndexService indexService = getInstanceFromNode(IndicesService.class).indexService(indexMetaData.getIndex());
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
final IndexShard indexShard = indexService.getShardOrNull(i);
assertThat("Shard [" + i + "] is missing for index " + indexMetaData.getIndex(), indexShard, notNullValue());
final RecoveryState recoveryState = indexShard.recoveryState();
assertThat(recoveryState.getRecoverySource(), is(RecoverySource.ExistingStoreRecoverySource.INSTANCE));
assertThat(recoveryState.getStage(), is(RecoveryState.Stage.DONE));
assertThat(recoveryState.getTargetNode(), notNullValue());
assertThat(recoveryState.getIndex().totalFileCount(), greaterThan(0));
assertThat(recoveryState.getIndex().reusedFileCount(), greaterThan(0));
assertThat(recoveryState.getTranslog().recoveredOperations(), equalTo(0));
assertThat(recoveryState.getTranslog().totalOperations(), equalTo(0));
assertThat(recoveryState.getTranslog().recoveredPercent(), equalTo(100.0f));
}
}
}

0 comments on commit c899f63

Please sign in to comment.