Move CompletionStats into the Engine
By moving CompletionStats into the engine we can easily cache the stats for
read-only engines if necessary. It also moves the responsibility out of IndexShard,
which has quite some complexity already.

Relates to elastic#33835
s1monw committed Sep 19, 2018
1 parent a3e8b83 commit e72514c
Showing 4 changed files with 58 additions and 75 deletions.
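
To make the motivation concrete, here is a minimal sketch of the caching the commit message alludes to. It assumes a hypothetical CompletionStatsSource interface standing in for the Engine#completionStats contract introduced below, plus a wrapper suitable for a read-only engine, whose segments never change; none of these names come from the commit itself.

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.elasticsearch.search.suggest.completion.CompletionStats;

// Hypothetical stand-in for the Engine#completionStats contract introduced below.
interface CompletionStatsSource {
    CompletionStats completionStats(String... fieldNamePatterns) throws IOException;
}

// Hypothetical read-only wrapper: a point-in-time engine never changes its segments,
// so a stats result computed once per pattern set stays valid for the engine's lifetime.
final class CachedCompletionStatsSource implements CompletionStatsSource {
    private final CompletionStatsSource delegate;
    private final Map<String, CompletionStats> cache = new ConcurrentHashMap<>();

    CachedCompletionStatsSource(CompletionStatsSource delegate) {
        this.delegate = delegate;
    }

    @Override
    public CompletionStats completionStats(String... fieldNamePatterns) throws IOException {
        // Key on the patterns: different patterns yield different per-field breakdowns.
        String key = fieldNamePatterns == null ? "" : String.join("\0", fieldNamePatterns);
        CompletionStats cached = cache.get(key);
        if (cached == null) {
            cached = delegate.completionStats(fieldNamePatterns); // compute once on miss
            cache.put(key, cached);                               // benign race: same value
        }
        return cached;
    }
}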
35 changes: 35 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -19,9 +19,11 @@
 
 package org.elasticsearch.index.engine;
 
+import com.carrotsearch.hppc.ObjectLongHashMap;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.IndexReader;
@@ -32,8 +34,10 @@
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.SegmentReader;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.index.Terms;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.ReferenceManager;
+import org.apache.lucene.search.suggest.document.CompletionTerms;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
@@ -42,6 +46,7 @@
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.CheckedRunnable;
+import org.elasticsearch.common.FieldMemoryStats;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -56,6 +61,7 @@
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
 import org.elasticsearch.common.metrics.CounterMetric;
+import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.index.VersionType;
@@ -70,6 +76,7 @@
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogStats;
+import org.elasticsearch.search.suggest.completion.CompletionStats;
 
 import java.io.Closeable;
 import java.io.FileNotFoundException;
@@ -175,6 +182,34 @@ public MergeStats getMergeStats() {
     /** Returns how many bytes we are currently moving from heap to disk */
     public abstract long getWritingBytes();
 
+    /**
+     * Returns the {@link CompletionStats} for this engine
+     */
+    public CompletionStats completionStats(String... fieldNamePatterns) throws IOException {
+        try (Engine.Searcher currentSearcher = acquireSearcher("completion_stats", SearcherScope.INTERNAL)) {
+            long sizeInBytes = 0;
+            ObjectLongHashMap<String> completionFields = null;
+            if (fieldNamePatterns != null && fieldNamePatterns.length > 0) {
+                completionFields = new ObjectLongHashMap<>(fieldNamePatterns.length);
+            }
+            for (LeafReaderContext atomicReaderContext : currentSearcher.reader().leaves()) {
+                LeafReader atomicReader = atomicReaderContext.reader();
+                for (FieldInfo info : atomicReader.getFieldInfos()) {
+                    Terms terms = atomicReader.terms(info.name);
+                    if (terms instanceof CompletionTerms) {
+                        // TODO: currently we load up the suggester for reporting its size
+                        long fstSize = ((CompletionTerms) terms).suggester().ramBytesUsed();
+                        if (fieldNamePatterns != null && fieldNamePatterns.length > 0 && Regex.simpleMatch(fieldNamePatterns, info.name)) {
+                            completionFields.addTo(info.name, fstSize);
+                        }
+                        sizeInBytes += fstSize;
+                    }
+                }
+            }
+            return new CompletionStats(sizeInBytes, completionFields == null ? null : new FieldMemoryStats(completionFields));
+        }
+    }
+
     /**
      * A throttling class that can be activated, causing the
      * {@code acquireThrottle} method to block on a lock when throttling
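
For orientation, a short sketch of how the new method might be exercised; the engine reference and the field names are assumptions for illustration, using the usual CompletionStats getters. Regex.simpleMatch treats * as a wildcard, and note that sizeInBytes is accumulated over all completion fields while the per-field map is only populated for pattern matches:

// Illustrative only: "title_suggest" and "body_*" are assumed completion fields/patterns.
CompletionStats stats = engine.completionStats("title_suggest", "body_*");
long totalBytes = stats.getSizeInBytes();      // FST bytes summed across ALL completion fields
FieldMemoryStats perField = stats.getFields(); // entries only for matching fields; null when no patterns were passed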
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -132,13 +132,13 @@
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.search.suggest.completion.CompletionFieldStats;
 import org.elasticsearch.search.suggest.completion.CompletionStats;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.io.UncheckedIOException;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -981,14 +981,15 @@ public TranslogStats translogStats() {
     }
 
     public CompletionStats completionStats(String... fields) {
-        CompletionStats completionStats = new CompletionStats();
-        try (Engine.Searcher currentSearcher = acquireSearcher("completion_stats")) {
+        try {
+            CompletionStats stats = getEngine().completionStats(fields);
             // we don't wait for pending refreshes here since it's a stats call; instead we mark the searcher as
             // accessed only, which will cause the next scheduled refresh to go through and refresh the stats as well
             markSearcherAccessed();
-            completionStats.add(CompletionFieldStats.completionStats(currentSearcher.reader(), fields));
+            return stats;
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
         }
-        return completionStats;
     }
 
     public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expectedCommitId) {
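
One design choice worth flagging: Engine#completionStats declares a checked IOException because it reads through a Lucene searcher, while IndexShard#completionStats keeps its existing signature without a throws clause for its callers. The checked exception is therefore rethrown as UncheckedIOException, which is what the new java.io.UncheckedIOException import earlier in this file is for.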

server/src/main/java/org/elasticsearch/search/suggest/completion/CompletionFieldStats.java
This file was deleted.

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -2438,6 +2438,23 @@ public void testRecoverFromLocalShard() throws IOException {
         closeShards(sourceShard, targetShard);
     }
 
+    public void testCompletionStatsMarksSearcherAccessed() throws Exception {
+        IndexShard indexShard = null;
+        try {
+            indexShard = newStartedShard();
+            IndexShard shard = indexShard;
+            assertBusy(() -> {
+                ThreadPool threadPool = shard.getThreadPool();
+                assertThat(threadPool.relativeTimeInMillis(), greaterThan(shard.getLastSearcherAccess()));
+            });
+            long prevAccessTime = shard.getLastSearcherAccess();
+            indexShard.completionStats();
+            assertThat("searcher was not marked as accessed", shard.getLastSearcherAccess(), greaterThan(prevAccessTime));
+        } finally {
+            closeShards(indexShard);
+        }
+    }
+
     public void testDocStats() throws IOException, InterruptedException {
         IndexShard indexShard = null;
         try {
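
A word on the assertBusy preamble in the new test: it spins until the thread pool's relative clock has advanced past the shard's last recorded searcher access, so the final assertion can only pass if completionStats() really does advance the access timestamp via markSearcherAccessed().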
