Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReadOnlyEngine should update translog recovery state information #39238

Merged
merged 1 commit into from
Feb 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SeqNoStats;
Expand Down Expand Up @@ -287,18 +288,7 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS

@Override
public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
return new Translog.Snapshot() {
@Override
public void close() { }
@Override
public int totalOperations() {
return 0;
}
@Override
public Translog.Operation next() {
return null;
}
};
return newEmptySnapshot();
}

@Override
Expand Down Expand Up @@ -429,7 +419,15 @@ public int fillSeqNoGaps(long primaryTerm) {
}

@Override
public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) {
public Engine recoverFromTranslog(final TranslogRecoveryRunner translogRecoveryRunner, final long recoverUpToSeqNo) {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
try (Translog.Snapshot snapshot = newEmptySnapshot()) {
translogRecoveryRunner.run(this, snapshot);
} catch (final Exception e) {
throw new EngineException(shardId, "failed to recover from empty translog snapshot", e);
}
}
return this;
}

Expand Down Expand Up @@ -468,4 +466,22 @@ protected void processReaders(IndexReader reader, IndexReader previousReader) {
public boolean refreshNeeded() {
return false;
}

private Translog.Snapshot newEmptySnapshot() {
return new Translog.Snapshot() {
@Override
public void close() {
}

@Override
public int totalOperations() {
return 0;
}

@Override
public Translog.Operation next() {
return null;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,36 @@ public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException {
}
}
}

public void testRecoverFromTranslogAppliesNoOperations() throws IOException {
IOUtils.close(engine, store);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
int numDocs = scaledRandomIntBetween(10, 1000);
try (InternalEngine engine = createEngine(config)) {
for (int i = 0; i < numDocs; i++) {
if (rarely()) {
continue; // gap in sequence number
}
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
if (rarely()) {
engine.flush();
}
globalCheckpoint.set(i);
}
engine.syncTranslog();
engine.flushAndClose();
}
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings());
readOnlyEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong());

assertThat(translogHandler.appliedOperations(), equalTo(0L));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -27,6 +29,7 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchService;
Expand All @@ -49,8 +52,10 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

public class FrozenIndexTests extends ESSingleNodeTestCase {

Expand Down Expand Up @@ -372,4 +377,36 @@ public void testFreezeEmptyIndexWithTranslogOps() throws Exception {
assertAcked(new XPackClient(client()).freeze(new TransportFreezeIndexAction.FreezeRequest(indexName)));
assertIndexFrozen(indexName);
}

public void testRecoveryState() throws ExecutionException, InterruptedException {
final String indexName = "index_recovery_state";
createIndex(indexName, Settings.builder()
.put("index.number_of_replicas", 0)
.build());

final long nbDocs = randomIntBetween(0, 50);
for (long i = 0; i < nbDocs; i++) {
final IndexResponse indexResponse = client().prepareIndex(indexName, "_doc", Long.toString(i)).setSource("field", i).get();
assertThat(indexResponse.status(), is(RestStatus.CREATED));
}

assertAcked(new XPackClient(client()).freeze(new TransportFreezeIndexAction.FreezeRequest(indexName)));
assertIndexFrozen(indexName);

final IndexMetaData indexMetaData = client().admin().cluster().prepareState().get().getState().metaData().index(indexName);
final IndexService indexService = getInstanceFromNode(IndicesService.class).indexService(indexMetaData.getIndex());
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
final IndexShard indexShard = indexService.getShardOrNull(i);
assertThat("Shard [" + i + "] is missing for index " + indexMetaData.getIndex(), indexShard, notNullValue());
final RecoveryState recoveryState = indexShard.recoveryState();
assertThat(recoveryState.getRecoverySource(), is(RecoverySource.ExistingStoreRecoverySource.INSTANCE));
assertThat(recoveryState.getStage(), is(RecoveryState.Stage.DONE));
assertThat(recoveryState.getTargetNode(), notNullValue());
assertThat(recoveryState.getIndex().totalFileCount(), greaterThan(0));
assertThat(recoveryState.getIndex().reusedFileCount(), greaterThan(0));
assertThat(recoveryState.getTranslog().recoveredOperations(), equalTo(0));
assertThat(recoveryState.getTranslog().totalOperations(), equalTo(0));
assertThat(recoveryState.getTranslog().recoveredPercent(), equalTo(100.0f));
}
}
}