Move DocsStats into Engine #33835

Merged · 4 commits · Sep 19, 2018
36 changes: 36 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -66,6 +66,7 @@
 import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.index.shard.DocsStats;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.Translog;
@@ -175,6 +176,41 @@ public MergeStats getMergeStats() {
     /** Returns how many bytes we are currently moving from heap to disk */
     public abstract long getWritingBytes();
 
+    /**
+     * Returns the {@link DocsStats} for this engine
+     */
+    public DocsStats docStats() {
+        // We base the doc stats on the internal reader, which is more up-to-date and not subject to external
+        // refreshes. For instance, we don't refresh an external reader on flush, so indices with
+        // index.refresh_interval=-1 would otherwise not see any doc stats updates at all. This gives more
+        // accurate statistics when indexing without refreshing. If a refresh does happen, the internal reader
+        // is refreshed as well, so we are safe here.
+        try (Engine.Searcher searcher = acquireSearcher("docStats", Engine.SearcherScope.INTERNAL)) {
+            return docsStats(searcher.reader());
+        }
+    }
+
+    protected final DocsStats docsStats(IndexReader indexReader) {
+        long numDocs = 0;
+        long numDeletedDocs = 0;
+        long sizeInBytes = 0;
+        // We don't wait for pending refreshes here since this is a stats call; instead the searcher is only
+        // marked as accessed, which causes the next scheduled refresh to go through and refresh the stats as well.
+        for (LeafReaderContext readerContext : indexReader.leaves()) {
+            // we go down to the segment level here to get accurate numbers
+            final SegmentReader segmentReader = Lucene.segmentReader(readerContext.reader());
+            SegmentCommitInfo info = segmentReader.getSegmentInfo();
+            numDocs += readerContext.reader().numDocs();
+            numDeletedDocs += readerContext.reader().numDeletedDocs();
+            try {
+                sizeInBytes += info.sizeInBytes();
+            } catch (IOException e) {
+                logger.trace(() -> new ParameterizedMessage("failed to get size for [{}]", info.info.name), e);
+            }
+        }
+        return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
+    }
+
     /**
      * A throttling class that can be activated, causing the
      * {@code acquireThrottle} method to block on a lock when throttling
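The new helper walks the reader's leaves and unwraps each one to its SegmentReader so the on-disk size of every segment can be read from its commit metadata. Below is a minimal, standalone Lucene sketch of the same per-segment aggregation. It is an illustration under assumptions, not Elasticsearch code: it assumes Lucene 8+ on the classpath (ByteBuffersDirectory), and it casts each leaf straight to SegmentReader, which holds for a plain DirectoryReader, whereas the production code calls Lucene.segmentReader(...) so that wrapped readers are unwrapped correctly as well.

import java.io.IOException;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.store.ByteBuffersDirectory;

public class DocsStatsSketch {
    public static void main(String[] args) throws IOException {
        try (ByteBuffersDirectory dir = new ByteBuffersDirectory();
             IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
            for (int i = 0; i < 100; i++) {
                writer.addDocument(new Document()); // empty documents are enough for counting
            }
            writer.commit();
            long numDocs = 0, numDeletedDocs = 0, sizeInBytes = 0;
            try (DirectoryReader reader = DirectoryReader.open(dir)) {
                for (LeafReaderContext ctx : reader.leaves()) {
                    // a plain DirectoryReader exposes SegmentReader leaves directly
                    SegmentReader segmentReader = (SegmentReader) ctx.reader();
                    SegmentCommitInfo info = segmentReader.getSegmentInfo();
                    numDocs += ctx.reader().numDocs();
                    numDeletedDocs += ctx.reader().numDeletedDocs();
                    sizeInBytes += info.sizeInBytes(); // may throw IOException per segment
                }
            }
            System.out.printf("docs=%d deleted=%d bytes=%d%n", numDocs, numDeletedDocs, sizeInBytes);
        }
    }
}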
server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
@@ -34,6 +34,7 @@
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.index.shard.DocsStats;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogStats;
@@ -63,6 +64,7 @@ public final class ReadOnlyEngine extends Engine {
     private final SearcherManager searcherManager;
     private final IndexCommit indexCommit;
     private final Lock indexWriterLock;
+    private final DocsStats docsStats;
 
     /**
      * Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened
@@ -101,6 +103,7 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
             this.indexCommit = reader.getIndexCommit();
             this.searcherManager = new SearcherManager(reader,
                 new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
+            this.docsStats = docsStats(reader);
             this.indexWriterLock = indexWriterLock;
             success = true;
         } finally {
@@ -365,4 +368,9 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) {
     @Override
     public void maybePruneDeletes() {
     }
+
+    @Override
+    public DocsStats docStats() {
+        return docsStats;
+    }
 }
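A note on the design: since a ReadOnlyEngine's underlying reader never changes after construction, the doc stats can be computed once, eagerly, from the initial reader, and docStats() can then return the cached value without acquiring a searcher per call. That is presumably why the field is populated in the constructor rather than recomputed on every stats request; the PR itself does not spell this out.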
33 changes: 3 additions & 30 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -21,13 +21,9 @@
 
 import com.carrotsearch.hppc.ObjectLongMap;
 import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.index.CheckIndex;
 import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.index.SegmentCommitInfo;
 import org.apache.lucene.index.SegmentInfos;
-import org.apache.lucene.index.SegmentReader;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.QueryCachingPolicy;
@@ -879,32 +875,9 @@ public FlushStats flushStats() {
     }
 
     public DocsStats docStats() {
-        // we calculate the doc stats based on the internal reader that is more up-to-date and not subject
-        // to external refreshes. For instance we don't refresh an external reader if we flush and indices with
-        // index.refresh_interval=-1 won't see any doc stats updates at all. This change will give more accurate statistics
-        // when indexing but not refreshing in general. Yet, if a refresh happens the internal reader is refresh as well so we are
-        // safe here.
-        long numDocs = 0;
-        long numDeletedDocs = 0;
-        long sizeInBytes = 0;
-        try (Engine.Searcher searcher = acquireSearcher("docStats", Engine.SearcherScope.INTERNAL)) {
-            // we don't wait for a pending refreshes here since it's a stats call instead we mark it as accessed only which will cause
-            // the next scheduled refresh to go through and refresh the stats as well
-            markSearcherAccessed();
-            for (LeafReaderContext reader : searcher.reader().leaves()) {
-                // we go on the segment level here to get accurate numbers
-                final SegmentReader segmentReader = Lucene.segmentReader(reader.reader());
-                SegmentCommitInfo info = segmentReader.getSegmentInfo();
-                numDocs += reader.reader().numDocs();
-                numDeletedDocs += reader.reader().numDeletedDocs();
-                try {
-                    sizeInBytes += info.sizeInBytes();
-                } catch (IOException e) {
-                    logger.trace(() -> new ParameterizedMessage("failed to get size for [{}]", info.info.name), e);
-                }
-            }
-        }
-        return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
+        DocsStats docsStats = getEngine().docStats();
+        markSearcherAccessed();
Member:
Is there a test for this? It could have gotten lost here. I looked (IndexShardTests#testDocStats) but did not see one. Would you add one?

Contributor Author:
done
+        return docsStats;
     }
 
     /**
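With the computation moved into the engine, the shard-level method reduces to delegation plus markSearcherAccessed(). Keeping the access bookkeeping in IndexShard rather than Engine looks deliberate, though the PR does not say so explicitly: the last-searcher-access timestamp (getLastSearcherAccess(), exercised by the test below) belongs to the shard's search-idle tracking, and per the original comment a stats call should not force a refresh, only count as an access so that the next scheduled refresh picks the change up.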
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -2439,7 +2439,7 @@ public void testRecoverFromLocalShard() throws IOException {
         closeShards(sourceShard, targetShard);
     }
 
-    public void testDocStats() throws IOException, InterruptedException {
+    public void testDocStats() throws Exception {
         IndexShard indexShard = null;
         try {
             indexShard = newStartedShard(
@@ -2456,7 +2456,14 @@ public void testDocStats() throws IOException, InterruptedException {
             indexShard.flush(new FlushRequest());
         }
         {
+            IndexShard shard = indexShard;
+            assertBusy(() -> {
+                ThreadPool threadPool = shard.getThreadPool();
+                assertThat(threadPool.relativeTimeInMillis(), greaterThan(shard.getLastSearcherAccess()));
+            });
+            long prevAccessTime = shard.getLastSearcherAccess();
             final DocsStats docsStats = indexShard.docStats();
+            assertThat("searcher was not marked as accessed", shard.getLastSearcherAccess(), greaterThan(prevAccessTime));
             assertThat(docsStats.getCount(), equalTo(numDocs));
             try (Engine.Searcher searcher = indexShard.acquireSearcher("test")) {
                 assertTrue(searcher.reader().numDocs() <= docsStats.getCount());
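The assertBusy wrapper retries its assertion until it holds or a timeout elapses, which is needed here because ThreadPool#relativeTimeInMillis() serves a cached clock that only ticks forward periodically. A simplified sketch of the idiom follows; it is illustrative only (the real ESTestCase#assertBusy uses an increasing backoff and its own defaults):

final class AssertBusySketch {
    // Retry an assertion until it passes or the deadline is reached,
    // rethrowing the most recent failure. Stand-in for ESTestCase#assertBusy.
    static void assertBusy(Runnable assertion, long timeoutMillis) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            try {
                assertion.run();
                return; // the assertion finally held
            } catch (AssertionError e) {
                if (System.currentTimeMillis() >= deadline) {
                    throw e; // give up, surface the last failure
                }
                Thread.sleep(10); // brief pause before retrying
            }
        }
    }
}

A call mirroring the test above would look like:
assertBusy(() -> assertThat(threadPool.relativeTimeInMillis(), greaterThan(shard.getLastSearcherAccess())), 10_000);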
@@ -3428,4 +3435,9 @@ public void testResetEngine() throws Exception {
         assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations()));
         closeShard(shard, false);
     }
+
+    @Override
+    public Settings threadPoolSettings() {
+        return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.estimated_time_interval", "5ms").build();
+    }
 }
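The threadPoolSettings() override lowers thread_pool.estimated_time_interval, the refresh period of the thread pool's cached clock, to 5ms. Since ThreadPool#relativeTimeInMillis() serves time from that cache, a coarser interval could leave the clock stalled between ticks relative to the shard's recorded searcher access; 5ms makes it advance quickly enough for the test's greaterThan assertions to hold within assertBusy's timeout. (This is a reading of the diff; the PR does not comment on the setting.)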