Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use exact numDocs in synced-flush and metadata snapshot #30228

Merged
merged 20 commits into from
May 15, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.FieldDoc;
Expand Down Expand Up @@ -139,14 +141,12 @@ public static Iterable<String> files(SegmentInfos infos) throws IOException {
}

/**
* Returns the number of documents in the index referenced by this {@link SegmentInfos}
* Returns the number of live documents in the index referenced by this {@link SegmentInfos}
*/
public static int getNumDocs(SegmentInfos info) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep this as it is. Rather, we should do this conditionally in the engine, only when soft-deletes are enabled. Also, please make sure that we load this only once and cache it. We don't commit very often and commit stats are fetched rarely, so we can compute this when it's needed and cache it per commit.

int numDocs = 0;
for (SegmentCommitInfo si : info) {
numDocs += si.info.maxDoc() - si.getDelCount();
public static int getNumDocs(Directory directory, SegmentInfos sis) throws IOException {
try (DirectoryReader reader = StandardDirectoryReader.open(directory, sis, Collections.emptyList())) {
return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETE_FIELD).numDocs();
}
return numDocs;
}

/**
Expand Down
10 changes: 8 additions & 2 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.Base64;
Expand Down Expand Up @@ -577,8 +578,13 @@ protected final void ensureOpen() {

/** get commits stats for the last commit */
public CommitStats commitStats() {
try (Engine.Searcher searcher = acquireSearcher("commit_stats", Engine.SearcherScope.INTERNAL)) {
return new CommitStats(getLastCommittedSegmentInfos(), searcher.reader().numDocs());
// Need to retain the commit as we will open it.
try (IndexCommitRef commitRef = acquireLastIndexCommit(false)) {
final SegmentInfos sis = Lucene.readSegmentInfos(commitRef.getIndexCommit());
return new CommitStats(sis, Lucene.getNumDocs(store.directory(), sis));
} catch (IOException ex) {
maybeFailEngine("commit_stats", ex);
throw new UncheckedIOException(ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
Map<String, String> commitUserDataBuilder = new HashMap<>();
try {
final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
numDocs = Lucene.getNumDocs(segmentCommitInfos);
numDocs = Lucene.getNumDocs(directory, segmentCommitInfos);
commitUserDataBuilder.putAll(segmentCommitInfos.getUserData());
Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion(); // we don't know which version was used to write so we take the max version.
for (SegmentCommitInfo info : segmentCommitInfos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public void testNumDocs() throws IOException {
writer.addDocument(doc);
writer.commit();
SegmentInfos segmentCommitInfos = Lucene.readSegmentInfos(dir);
assertEquals(1, Lucene.getNumDocs(segmentCommitInfos));
assertEquals(1, Lucene.getNumDocs(dir, segmentCommitInfos));

doc = new Document();
doc.add(new TextField("id", "2", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
Expand All @@ -293,14 +293,14 @@ public void testNumDocs() throws IOException {
doc.add(new TextField("id", "3", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
segmentCommitInfos = Lucene.readSegmentInfos(dir);
assertEquals(1, Lucene.getNumDocs(segmentCommitInfos));
assertEquals(1, Lucene.getNumDocs(dir, segmentCommitInfos));
writer.commit();
segmentCommitInfos = Lucene.readSegmentInfos(dir);
assertEquals(3, Lucene.getNumDocs(segmentCommitInfos));
assertEquals(3, Lucene.getNumDocs(dir, segmentCommitInfos));
writer.deleteDocuments(new Term("id", "2"));
writer.commit();
segmentCommitInfos = Lucene.readSegmentInfos(dir);
assertEquals(2, Lucene.getNumDocs(segmentCommitInfos));
assertEquals(2, Lucene.getNumDocs(dir, segmentCommitInfos));

int numDocsToIndex = randomIntBetween(10, 50);
List<Term> deleteTerms = new ArrayList<>();
Expand All @@ -318,7 +318,7 @@ public void testNumDocs() throws IOException {
}
writer.commit();
segmentCommitInfos = Lucene.readSegmentInfos(dir);
assertEquals(2 + deleteTerms.size(), Lucene.getNumDocs(segmentCommitInfos));
assertEquals(2 + deleteTerms.size(), Lucene.getNumDocs(dir, segmentCommitInfos));
writer.close();
dir.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,38 @@ public long getCheckpoint() {
}
}

/**
 * Checks that {@code engine.commitStats().getNumDocs()} equals the number of documents the test
 * tracked as pending at the time of the most recent flush, while randomly interleaving
 * index, delete, re-index, flush and refresh operations.
 */
public void testCommitStatsNumDocs() throws Exception {
// Wraps the configured merge policy with a match-all retention query on the soft-deletes field.
// NOTE(review): presumably this keeps every soft-deleted document across merges — confirm against
// SoftDeletesRetentionMergePolicy. The `engine` read here is the one in scope before the try block
// below declares its own local `engine`.
final MergePolicy keepSoftDeleteDocsMP = new SoftDeletesRetentionMergePolicy(
Lucene.SOFT_DELETE_FIELD, () -> new MatchAllDocsQuery(), engine.config().getMergePolicy());
try (Store store = createStore();
Engine engine = createEngine(config(defaultSettings, store, createTempDir(), keepSoftDeleteDocsMP, null))) {
// Ids that have been indexed and not subsequently deleted, as tracked by this test.
final Set<String> pendingDocs = new HashSet<>();
// Size of pendingDocs at the last flush; stays 0 until the first flush happens.
int flushedDocs = 0;
final int iters = scaledRandomIntBetween(10, 100);
for (int i = 0; i < iters; i++) {
// Each iteration uses a fresh id (the loop counter), so the first index call adds a new document.
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
pendingDocs.add(doc.id());
// Randomly delete the document that was just indexed.
if (randomBoolean()) {
engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get()));
pendingDocs.remove(doc.id());
}
// Randomly index the same document again (a re-add after a delete, or a second index of the same id).
if (randomBoolean()) {
engine.index(indexForDoc(doc));
pendingDocs.add(doc.id());
}
// Only a flush updates the expected count.
if (randomBoolean()) {
engine.flush();
flushedDocs = pendingDocs.size();
}
// A refresh does not change the expected count: the assertion below still compares against flushedDocs.
if (randomBoolean()) {
engine.refresh("test");
}
assertThat(engine.commitStats().getNumDocs(), equalTo(flushedDocs));
}
}
}

public void testIndexSearcherWrapper() throws Exception {
final AtomicInteger counter = new AtomicInteger();
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,8 @@ private void pruneOldDeleteGenerations(Set<Path> files) {
}

public List<Path> listShardFiles(ShardRouting routing) throws IOException {
NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(routing.currentNodeId()).setFs(true).get();
NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(routing.currentNodeId())
.clear().setIndices(false).setFs(true).get();
ClusterState state = client().admin().cluster().prepareState().get().getState();
final Index test = state.metaData().index("test").getIndex();
assertThat(routing.toString(), nodeStatses.getNodes().size(), equalTo(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,12 @@ private void assertSameSyncIdSameDocs() {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
CommitStats commitStats = indexShard.commitStats();
CommitStats commitStats = null;
try {
commitStats = indexShard.commitStats();
} catch (Exception ex) {
logger.warn("Failed to read commit stats", ex);
}
if (commitStats != null) { // null if the engine is closed or if the shard is recovering
String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID);
if (syncId != null) {
Expand Down