Use exact numDocs in synced-flush and metadata snapshot (#30228)
Since #29458, we have used a searcher to calculate the number of documents
for CommitStats. Sadly, that approach is flawed: the searcher may no longer
point to the last commit once it has been refreshed. Synced-flush requires an
exact numDocs to work correctly, and that count must also exclude all
soft-deleted docs.

This commit makes synced-flush stop using CommitStats and instead read an
exact numDocs directly from the index commit.
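
To see the flaw concretely, here is a minimal standalone sketch (not part of this change; the class name, analyzer, and directory choices are illustrative) showing how a near-real-time reader, which a refreshed searcher is built on, can disagree with the last commit:

import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class SearcherDriftDemo {
    public static void main(String[] args) throws Exception {
        try (Directory dir = new RAMDirectory();
             IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new WhitespaceAnalyzer()))) {
            writer.addDocument(new Document());
            writer.commit();                    // the last commit now contains 1 doc
            writer.addDocument(new Document()); // indexed but not yet committed
            try (DirectoryReader nrt = DirectoryReader.open(writer);      // near-real-time view
                 DirectoryReader committed = DirectoryReader.open(dir)) { // last-commit view
                System.out.println(nrt.numDocs());       // 2: counts the uncommitted doc
                System.out.println(committed.numDocs()); // 1: exactly what the commit contains
            }
        }
    }
}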

Relates #29458
Relates #29530
dnhatn authored May 15, 2018
1 parent 2a2c23b commit b12c2f6
Showing 6 changed files with 98 additions and 21 deletions.
11 changes: 11 additions & 0 deletions server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
@@ -44,6 +44,7 @@
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
+ import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.FieldDoc;
@@ -149,6 +150,16 @@ public static int getNumDocs(SegmentInfos info) {
return numDocs;
}

+ /**
+  * Unlike {@link #getNumDocs(SegmentInfos)}, this method returns a numDocs that always excludes soft-deleted docs.
+  * This method is expensive; prefer {@link #getNumDocs(SegmentInfos)} unless an exact numDocs is required.
+  */
+ public static int getExactNumDocs(IndexCommit commit) throws IOException {
+     try (DirectoryReader reader = DirectoryReader.open(commit)) {
+         return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETE_FIELD).numDocs();
+     }
+ }

/**
* Reads the segments infos from the given commit, failing if it fails to load
*/
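
A hypothetical caller of the new helper might look like the following sketch; the last-commit lookup and the wrapper class are assumptions for illustration, not part of this diff:

import java.io.IOException;
import java.util.List;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.lucene.Lucene;

class ExactNumDocsUsage {
    // Counts the live (non-soft-deleted) docs of the most recent commit.
    static int exactNumDocsOfLastCommit(Directory directory) throws IOException {
        List<IndexCommit> commits = DirectoryReader.listCommits(directory); // sorted oldest to newest
        return Lucene.getExactNumDocs(commits.get(commits.size() - 1));
    }
}

Because getExactNumDocs opens a fresh DirectoryReader on every call, it is noticeably more expensive than getNumDocs(SegmentInfos); that is why the javadoc steers routine stats collection to the cheaper variant.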
12 changes: 11 additions & 1 deletion server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -862,7 +862,7 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
Map<String, String> commitUserDataBuilder = new HashMap<>();
try {
final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
- numDocs = Lucene.getNumDocs(segmentCommitInfos);
+ numDocs = Lucene.getExactNumDocs(commit != null ? commit : findIndexCommit(directory, segmentCommitInfos));
commitUserDataBuilder.putAll(segmentCommitInfos.getUserData());
Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion(); // we don't know which version was used to write so we take the max version.
for (SegmentCommitInfo info : segmentCommitInfos) {
@@ -945,6 +945,16 @@ public static void hashFile(BytesRefBuilder fileHash, InputStream in, long size)
assert fileHash.length() == len : Integer.toString(fileHash.length()) + " != " + Integer.toString(len);
}

+ private static IndexCommit findIndexCommit(Directory directory, SegmentInfos sis) throws IOException {
+     List<IndexCommit> commits = DirectoryReader.listCommits(directory);
+     for (IndexCommit commit : commits) {
+         if (commit.getSegmentsFileName().equals(sis.getSegmentsFileName())) {
+             return commit;
+         }
+     }
+     throw new IOException("Index commit [" + sis.getSegmentsFileName() + "] is not found");
+ }

@Override
public Iterator<StoreFileMetaData> iterator() {
return metadata.values().iterator();
server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java
@@ -19,6 +19,7 @@
package org.elasticsearch.indices.flush;

import org.apache.logging.log4j.message.ParameterizedMessage;
+ import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
@@ -41,13 +42,13 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
- import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
@@ -467,15 +468,19 @@ public String executor() {
}
}

- private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) {
+ private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) throws IOException {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true);
logger.trace("{} performing pre sync flush", request.shardId());
indexShard.flush(flushRequest);
- final CommitStats commitStats = indexShard.commitStats();
- final Engine.CommitId commitId = commitStats.getRawCommitId();
- logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs());
- return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs(), commitStats.syncId());
+ try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) {
+     final SegmentInfos segmentInfos = Lucene.readSegmentInfos(commitRef.getIndexCommit());
+     final int numDocs = Lucene.getExactNumDocs(commitRef.getIndexCommit());
+     final Engine.CommitId commitId = new Engine.CommitId(segmentInfos.getId());
+     final String syncId = segmentInfos.userData.get(Engine.SYNC_COMMIT_ID);
+     logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, numDocs);
+     return new PreSyncedFlushResponse(commitId, numDocs, syncId);
+ }
}

private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {
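
The sync id is now read straight from the commit rather than from CommitStats. Below is a minimal sketch of that read, assuming the user-data key matches Engine.SYNC_COMMIT_ID (its literal value, "sync_id", is stated here as an assumption):

import java.io.IOException;
import java.util.Map;

import org.apache.lucene.index.IndexCommit;

class SyncIdSketch {
    // Returns the sync id recorded by a synced flush, or null if none exists.
    static String syncId(IndexCommit commit) throws IOException {
        Map<String, String> userData = commit.getUserData();
        return userData.get("sync_id"); // key assumed to match Engine.SYNC_COMMIT_ID
    }
}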
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -289,7 +289,7 @@ public RecoveryResponse newInstance() {
* @param recoveryTarget the target of the recovery
* @return a snapshot of the store metadata
*/
- private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) {
+ static Store.MetadataSnapshot getStoreMetadataSnapshot(final Logger logger, final RecoveryTarget recoveryTarget) {
try {
return recoveryTarget.indexShard().snapshotStoreMetadata();
} catch (final org.apache.lucene.index.IndexNotFoundException e) {
@@ -312,7 +312,7 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
final StartRecoveryRequest request;
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());

- final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
+ final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(logger, recoveryTarget);
logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());

final long startingSeqNo;
server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -24,15 +24,18 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
+ import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.translog.Translog;

import java.util.HashMap;
+ import java.util.HashSet;
import java.util.List;
import java.util.Map;
+ import java.util.Set;

import static org.hamcrest.Matchers.equalTo;

@@ -108,4 +111,33 @@ public void testGetStartingSeqNo() throws Exception {
closeShards(replica);
}
}

+ public void testExactNumDocsInStoreMetadataSnapshot() throws Exception {
+     final IndexShard replica = newShard(false);
+     recoveryEmptyReplica(replica);
+     long flushedDocs = 0;
+     final int numDocs = scaledRandomIntBetween(1, 20);
+     final Set<String> docIds = new HashSet<>();
+     for (int i = 0; i < numDocs; i++) {
+         String id = Integer.toString(i);
+         docIds.add(id);
+         indexDoc(replica, "_doc", id);
+         if (randomBoolean()) {
+             replica.flush(new FlushRequest());
+             flushedDocs = docIds.size();
+         }
+     }
+     for (String id : randomSubsetOf(docIds)) {
+         deleteDoc(replica, "_doc", id);
+         docIds.remove(id);
+         if (randomBoolean()) {
+             replica.flush(new FlushRequest());
+             flushedDocs = docIds.size();
+         }
+     }
+     final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
+     assertThat(PeerRecoveryTargetService.getStoreMetadataSnapshot(logger, recoveryTarget).getNumDocs(), equalTo(flushedDocs));
+     recoveryTarget.decRef();
+     closeShards(replica);
+ }
}
test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
@@ -26,6 +26,8 @@
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.AlreadyClosedException;
+ import org.elasticsearch.common.collect.Tuple;
+ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -76,7 +78,9 @@
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineTestCase;
+ import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
+ import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
@@ -1104,8 +1108,7 @@ public void beforeIndexDeletion() throws Exception {
// ElasticsearchIntegrationTest must override beforeIndexDeletion() to avoid failures.
assertNoPendingIndexOperations();
//check that shards that have same sync id also contain same number of documents
- // norelease - AwaitsFix: https://github.com/elastic/elasticsearch/pull/30228
- // assertSameSyncIdSameDocs();
+ assertSameSyncIdSameDocs();
assertOpenTranslogReferences();
}

@@ -1116,23 +1119,39 @@ private void assertSameSyncIdSameDocs() {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
- CommitStats commitStats = indexShard.commitStats();
- if (commitStats != null) { // null if the engine is closed or if the shard is recovering
-     String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID);
-     if (syncId != null) {
-         long liveDocsOnShard = commitStats.getNumDocs();
-         if (docsOnShards.get(syncId) != null) {
-             assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name + ". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId), equalTo(liveDocsOnShard));
-         } else {
-             docsOnShards.put(syncId, liveDocsOnShard);
-         }
+ Tuple<String, Integer> commitStats = commitStats(indexShard);
+ if (commitStats != null) {
+     String syncId = commitStats.v1();
+     long liveDocsOnShard = commitStats.v2();
+     if (docsOnShards.get(syncId) != null) {
+         assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name +
+             ". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId),
+             equalTo(liveDocsOnShard));
+     } else {
+         docsOnShards.put(syncId, liveDocsOnShard);
+     }
}
}
}
}

+ private Tuple<String, Integer> commitStats(IndexShard indexShard) {
+     try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) {
+         final String syncId = commitRef.getIndexCommit().getUserData().get(Engine.SYNC_COMMIT_ID);
+         // Only read the exact numDocs if a sync id exists
+         if (Strings.hasText(syncId)) {
+             return Tuple.tuple(syncId, Lucene.getExactNumDocs(commitRef.getIndexCommit()));
+         } else {
+             return null;
+         }
+     } catch (IllegalIndexShardStateException ex) {
+         return null; // Shard is closed or not started yet.
+     } catch (IOException ex) {
+         throw new AssertionError(ex);
+     }
+ }

private void assertNoPendingIndexOperations() throws Exception {
assertBusy(() -> {
final Collection<NodeAndClient> nodesAndClients = nodes.values();
