Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache completion stats between refreshes #51991

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
---
setup:

- do:
indices.create:
index: test1
wait_for_active_shards: all
body:
settings:
# Limit the number of shards so that shards are unlikely
# to be relocated or being initialized between the test
# set up and the test execution
index.number_of_shards: 3
index.number_of_replicas: 0
mappings:
properties:
bar:
type: text
fielddata: true
fields:
completion:
type: completion

- do:
cluster.health:
wait_for_no_relocating_shards: true
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved

- do:
index:
index: test1
id: 1
body: { "bar": "bar" }

- do:
index:
index: test1
id: 2
body: { "bar": "foo" }

- do:
indices.refresh: {}

---
"Completion stats":
- do:
indices.stats: { completion_fields: "*" }

- match: { _shards.failed: 0}
- gt: { _all.total.completion.fields.bar\.completion.size_in_bytes: 0 }
- gt: { _all.total.completion.size_in_bytes: 0 }
- set: { _all.total.completion.size_in_bytes: original_size }

- do:
index:
index: test1
id: 3
body: { "bar": "foo", "baz": "foo" }

- do:
indices.refresh: {}

- do:
indices.stats: { completion_fields: "*" }

- match: { _shards.failed: 0}
- gt: { _all.total.completion.size_in_bytes: $original_size }
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;

import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Terms;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.suggest.document.CompletionTerms;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.FieldMemoryStats;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.search.suggest.completion.CompletionStats;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class CompletionStatsCache implements ReferenceManager.RefreshListener {

private final Supplier<Engine.Searcher> searcherSupplier;

/**
* Contains a future (i.e. non-null) if another thread is already computing stats, in which case wait for this computation to
* complete. Contains null otherwise, in which case compute the stats ourselves and save them here for other threads to use.
* Futures are eventually completed with stats that include all fields, requiring further filtering (see
* {@link CompletionStatsCache#filterCompletionStatsByFieldName}).
*/
private final AtomicReference<PlainActionFuture<CompletionStats>> completionStatsFutureRef = new AtomicReference<>();

CompletionStatsCache(Supplier<Engine.Searcher> searcherSupplier) {
this.searcherSupplier = searcherSupplier;
}

CompletionStats get(String... fieldNamePatterns) {
final PlainActionFuture<CompletionStats> newFuture = new PlainActionFuture<>();
final PlainActionFuture<CompletionStats> oldFuture = completionStatsFutureRef.compareAndExchange(null, newFuture);

if (oldFuture != null) {
// we lost the race, someone else is already computing stats, so we wait for that to finish
return filterCompletionStatsByFieldName(fieldNamePatterns, oldFuture.actionGet());
}

// we won the race, nobody else is already computing stats, so it's up to us
ActionListener.completeWith(newFuture, () -> {
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
long sizeInBytes = 0;
final ObjectLongHashMap<String> completionFields = new ObjectLongHashMap<>();

try (Engine.Searcher currentSearcher = searcherSupplier.get()) {
for (LeafReaderContext atomicReaderContext : currentSearcher.getIndexReader().leaves()) {
LeafReader atomicReader = atomicReaderContext.reader();
for (FieldInfo info : atomicReader.getFieldInfos()) {
Terms terms = atomicReader.terms(info.name);
if (terms instanceof CompletionTerms) {
// TODO: currently we load up the suggester for reporting its size
final long fstSize = ((CompletionTerms) terms).suggester().ramBytesUsed();
completionFields.addTo(info.name, fstSize);
sizeInBytes += fstSize;
}
}
}
}

return new CompletionStats(sizeInBytes, new FieldMemoryStats(completionFields));
});

return filterCompletionStatsByFieldName(fieldNamePatterns, newFuture.actionGet());
}

private static CompletionStats filterCompletionStatsByFieldName(String[] fieldNamePatterns, CompletionStats fullCompletionStats) {
final FieldMemoryStats fieldMemoryStats;
if (fieldNamePatterns != null && fieldNamePatterns.length > 0) {
final ObjectLongHashMap<String> completionFields = new ObjectLongHashMap<>(fieldNamePatterns.length);
for (ObjectLongCursor<String> fieldCursor : fullCompletionStats.getFields()) {
if (Regex.simpleMatch(fieldNamePatterns, fieldCursor.key)) {
completionFields.addTo(fieldCursor.key, fieldCursor.value);
}
}
fieldMemoryStats = new FieldMemoryStats(completionFields);
} else {
fieldMemoryStats = null;
}
return new CompletionStats(fullCompletionStats.getSizeInBytes(), fieldMemoryStats);
}

@Override
public void beforeRefresh() {
}

@Override
public void afterRefresh(boolean didRefresh) {
if (didRefresh) {
completionStatsFutureRef.set(null);
Copy link
Member

@dnhatn dnhatn Feb 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of invalidating the entire current cache, we can mark the current cache as outdated (i.e., need to refresh), then we can reuse the stats of some LeafReader that haven't changed between refreshes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we re-use a LeafReader across a refresh, does it keep its suggester loaded? If so, do we not already avoid most of the work of recomputing stats?

Note that we need to break the stats down by field, because the user can select the fields in the API. If I understand correctly I think re-using stats on a per-segment basis too would require tracking everything on a per-segment-per-field basis which seems unnecessary.

}
}
}
35 changes: 5 additions & 30 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,22 @@

package org.elasticsearch.index.engine;

import com.carrotsearch.hppc.ObjectLongHashMap;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.search.suggest.document.CompletionTerms;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
Expand All @@ -49,7 +44,6 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.FieldMemoryStats;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
Expand All @@ -62,7 +56,6 @@
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.VersionType;
Expand Down Expand Up @@ -177,32 +170,14 @@ public MergeStats getMergeStats() {
/** Returns how many bytes we are currently moving from heap to disk */
public abstract long getWritingBytes();


final CompletionStatsCache completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
henningandersen marked this conversation as resolved.
Show resolved Hide resolved

/**
* Returns the {@link CompletionStats} for this engine
*/
public CompletionStats completionStats(String... fieldNamePatterns) throws IOException {
try (Searcher currentSearcher = acquireSearcher("completion_stats", SearcherScope.INTERNAL)) {
long sizeInBytes = 0;
ObjectLongHashMap<String> completionFields = null;
if (fieldNamePatterns != null && fieldNamePatterns.length > 0) {
completionFields = new ObjectLongHashMap<>(fieldNamePatterns.length);
}
for (LeafReaderContext atomicReaderContext : currentSearcher.getIndexReader().leaves()) {
LeafReader atomicReader = atomicReaderContext.reader();
for (FieldInfo info : atomicReader.getFieldInfos()) {
Terms terms = atomicReader.terms(info.name);
if (terms instanceof CompletionTerms) {
// TODO: currently we load up the suggester for reporting its size
long fstSize = ((CompletionTerms) terms).suggester().ramBytesUsed();
if (Regex.simpleMatch(fieldNamePatterns, info.name)) {
completionFields.addTo(info.name, fstSize);
}
sizeInBytes += fstSize;
}
}
}
return new CompletionStats(sizeInBytes, completionFields == null ? null : new FieldMemoryStats(completionFields));
}
public CompletionStats completionStats(String... fieldNamePatterns) {
return completionStatsCache.get(fieldNamePatterns);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ public InternalEngine(EngineConfig engineConfig) {
}
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint());
this.internalReaderManager.addListener(lastRefreshedCheckpointListener);
this.externalReaderManager.addListener(completionStatsCache);
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo()));
if (localCheckpointTracker.getPersistedCheckpoint() < localCheckpointTracker.getMaxSeqNo()) {
try (Searcher searcher =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1026,11 +1026,7 @@ public TranslogStats translogStats() {

public CompletionStats completionStats(String... fields) {
readAllowed();
try {
return getEngine().completionStats(fields);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return getEngine().completionStats(fields);
}

/**
Expand Down
Loading