Cache completion stats between refreshes (#51991)
Computing the stats for completion fields may involve a significant amount of
work, since doing so walks every field of every segment looking for completion fields.
Innocuous-looking APIs like `GET _stats` or `GET _cluster/stats` do this for
every shard in the cluster. This repeated work is unnecessary since these stats
do not change between refreshes; in many indices they remain constant for a
long time.

This commit introduces a cache for these stats which is invalidated on a
refresh, allowing most stats calls to bypass the work needed to compute them on
most shards.

Closes #51915
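
A minimal sketch of the caching pattern, using only JDK types (the class and names below are illustrative, not part of this change; the actual implementation is the new CompletionStatsCache class in the diff, which is built on PlainActionFuture and registered as a refresh listener on the engine):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
 * Simplified, hypothetical analogue of the caching pattern: at most one caller
 * computes the expensive stats, concurrent callers wait on the same future,
 * and a refresh clears the cache so the next call recomputes.
 */
final class RefreshInvalidatedCache<T> {
    private final Supplier<T> compute; // the expensive per-shard computation
    private final AtomicReference<CompletableFuture<T>> cached = new AtomicReference<>();

    RefreshInvalidatedCache(Supplier<T> compute) {
        this.compute = compute;
    }

    T get() {
        final CompletableFuture<T> newFuture = new CompletableFuture<>();
        final CompletableFuture<T> existing = cached.compareAndExchange(null, newFuture);
        if (existing != null) {
            return existing.join(); // lost the race: reuse the other caller's result
        }
        try {
            newFuture.complete(compute.get()); // won the race: compute once, publish for everyone
        } catch (RuntimeException e) {
            cached.compareAndSet(newFuture, null); // let later callers retry after a failure
            newFuture.completeExceptionally(e);
            throw e;
        }
        return newFuture.join();
    }

    // in the real change this is driven by the engine's refresh listener
    void afterRefresh(boolean didRefresh) {
        if (didRefresh) {
            cached.set(null); // stats may have changed; recompute lazily on the next call
        }
    }
}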
DaveCTurner authored Feb 27, 2020
1 parent 416dc46 commit a3a98c7
Showing 7 changed files with 458 additions and 37 deletions.
@@ -0,0 +1,67 @@
---
setup:

  - do:
      indices.create:
        index: test1
        wait_for_active_shards: all
        body:
          settings:
            # Limit the number of shards so that shards are unlikely
            # to be relocated or being initialized between the test
            # set up and the test execution
            index.number_of_shards: 3
            index.number_of_replicas: 0
          mappings:
            properties:
              bar:
                type: text
                fielddata: true
                fields:
                  completion:
                    type: completion

  - do:
      cluster.health:
        wait_for_no_relocating_shards: true
        wait_for_events: languid

  - do:
      index:
        index: test1
        id: 1
        body: { "bar": "bar" }

  - do:
      index:
        index: test1
        id: 2
        body: { "bar": "foo" }

  - do:
      indices.refresh: {}

---
"Completion stats":
  - do:
      indices.stats: { completion_fields: "*" }

  - match: { _shards.failed: 0}
  - gt: { _all.total.completion.fields.bar\.completion.size_in_bytes: 0 }
  - gt: { _all.total.completion.size_in_bytes: 0 }
  - set: { _all.total.completion.size_in_bytes: original_size }

  - do:
      index:
        index: test1
        id: 3
        body: { "bar": "foo", "baz": "foo" }

  - do:
      indices.refresh: {}

  - do:
      indices.stats: { completion_fields: "*" }

  - match: { _shards.failed: 0}
  - gt: { _all.total.completion.size_in_bytes: $original_size }
@@ -0,0 +1,127 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.index.engine;

import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Terms;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.suggest.document.CompletionTerms;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.FieldMemoryStats;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.search.suggest.completion.CompletionStats;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class CompletionStatsCache implements ReferenceManager.RefreshListener {

    private final Supplier<Engine.Searcher> searcherSupplier;

    /**
     * Contains a future (i.e. non-null) if another thread is already computing stats, in which case wait for this computation to
     * complete. Contains null otherwise, in which case compute the stats ourselves and save them here for other threads to use.
     * Futures are eventually completed with stats that include all fields, requiring further filtering (see
     * {@link CompletionStatsCache#filterCompletionStatsByFieldName}).
     */
    private final AtomicReference<PlainActionFuture<CompletionStats>> completionStatsFutureRef = new AtomicReference<>();

    CompletionStatsCache(Supplier<Engine.Searcher> searcherSupplier) {
        this.searcherSupplier = searcherSupplier;
    }

    CompletionStats get(String... fieldNamePatterns) {
        final PlainActionFuture<CompletionStats> newFuture = new PlainActionFuture<>();
        final PlainActionFuture<CompletionStats> oldFuture = completionStatsFutureRef.compareAndExchange(null, newFuture);

        if (oldFuture != null) {
            // we lost the race, someone else is already computing stats, so we wait for that to finish
            return filterCompletionStatsByFieldName(fieldNamePatterns, oldFuture.actionGet());
        }

        // we won the race, nobody else is already computing stats, so it's up to us
        ActionListener.completeWith(newFuture, () -> {
            long sizeInBytes = 0;
            final ObjectLongHashMap<String> completionFields = new ObjectLongHashMap<>();

            try (Engine.Searcher currentSearcher = searcherSupplier.get()) {
                for (LeafReaderContext atomicReaderContext : currentSearcher.getIndexReader().leaves()) {
                    LeafReader atomicReader = atomicReaderContext.reader();
                    for (FieldInfo info : atomicReader.getFieldInfos()) {
                        Terms terms = atomicReader.terms(info.name);
                        if (terms instanceof CompletionTerms) {
                            // TODO: currently we load up the suggester for reporting its size
                            final long fstSize = ((CompletionTerms) terms).suggester().ramBytesUsed();
                            completionFields.addTo(info.name, fstSize);
                            sizeInBytes += fstSize;
                        }
                    }
                }
            }

            return new CompletionStats(sizeInBytes, new FieldMemoryStats(completionFields));
        });

        boolean success = false;
        final CompletionStats completionStats;
        try {
            completionStats = newFuture.actionGet();
            success = true;
        } finally {
            if (success == false) {
                // invalidate the cache (if not already invalidated) so that future calls will retry
                completionStatsFutureRef.compareAndSet(newFuture, null);
            }
        }

        return filterCompletionStatsByFieldName(fieldNamePatterns, completionStats);
    }

    private static CompletionStats filterCompletionStatsByFieldName(String[] fieldNamePatterns, CompletionStats fullCompletionStats) {
        final FieldMemoryStats fieldMemoryStats;
        if (fieldNamePatterns != null && fieldNamePatterns.length > 0) {
            final ObjectLongHashMap<String> completionFields = new ObjectLongHashMap<>(fieldNamePatterns.length);
            for (ObjectLongCursor<String> fieldCursor : fullCompletionStats.getFields()) {
                if (Regex.simpleMatch(fieldNamePatterns, fieldCursor.key)) {
                    completionFields.addTo(fieldCursor.key, fieldCursor.value);
                }
            }
            fieldMemoryStats = new FieldMemoryStats(completionFields);
        } else {
            fieldMemoryStats = null;
        }
        return new CompletionStats(fullCompletionStats.getSizeInBytes(), fieldMemoryStats);
    }

    @Override
    public void beforeRefresh() {
    }

    @Override
    public void afterRefresh(boolean didRefresh) {
        if (didRefresh) {
            completionStatsFutureRef.set(null);
        }
    }
}
32 changes: 1 addition & 31 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -19,27 +19,22 @@

package org.elasticsearch.index.engine;

import com.carrotsearch.hppc.ObjectLongHashMap;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.search.suggest.document.CompletionTerms;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
@@ -49,7 +44,6 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.FieldMemoryStats;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -62,7 +56,6 @@
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.VersionType;
@@ -180,30 +173,7 @@ public MergeStats getMergeStats() {
    /**
     * Returns the {@link CompletionStats} for this engine
     */
    public CompletionStats completionStats(String... fieldNamePatterns) throws IOException {
        try (Searcher currentSearcher = acquireSearcher("completion_stats", SearcherScope.INTERNAL)) {
            long sizeInBytes = 0;
            ObjectLongHashMap<String> completionFields = null;
            if (fieldNamePatterns != null && fieldNamePatterns.length > 0) {
                completionFields = new ObjectLongHashMap<>(fieldNamePatterns.length);
            }
            for (LeafReaderContext atomicReaderContext : currentSearcher.getIndexReader().leaves()) {
                LeafReader atomicReader = atomicReaderContext.reader();
                for (FieldInfo info : atomicReader.getFieldInfos()) {
                    Terms terms = atomicReader.terms(info.name);
                    if (terms instanceof CompletionTerms) {
                        // TODO: currently we load up the suggester for reporting its size
                        long fstSize = ((CompletionTerms) terms).suggester().ramBytesUsed();
                        if (Regex.simpleMatch(fieldNamePatterns, info.name)) {
                            completionFields.addTo(info.name, fstSize);
                        }
                        sizeInBytes += fstSize;
                    }
                }
            }
            return new CompletionStats(sizeInBytes, completionFields == null ? null : new FieldMemoryStats(completionFields));
        }
    }
    public abstract CompletionStats completionStats(String... fieldNamePatterns);

    /**
     * Returns the {@link DocsStats} for this engine
@@ -97,6 +97,7 @@
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
@@ -174,6 +175,8 @@ public class InternalEngine extends Engine {
    private final SoftDeletesPolicy softDeletesPolicy;
    private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;

    private final CompletionStatsCache completionStatsCache;

    private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false);
    private final KeyedLock<Long> noOpKeyedLock = new KeyedLock<>();
    private final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false);
@@ -258,6 +261,8 @@ public InternalEngine(EngineConfig engineConfig) {
"failed to restore version map and local checkpoint tracker", e);
}
}
completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
this.externalReaderManager.addListener(completionStatsCache);
success = true;
} finally {
if (success == false) {
@@ -298,6 +303,11 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
            engineConfig.retentionLeasesSupplier());
    }

    @Override
    public CompletionStats completionStats(String... fieldNamePatterns) {
        return completionStatsCache.get(fieldNamePatterns);
    }

    /**
     * This reference manager delegates all it's refresh calls to another (internal) ReaderManager
     * The main purpose for this is that if we have external refreshes happening we don't issue extra
@@ -42,6 +42,7 @@
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.search.suggest.completion.CompletionStats;

import java.io.Closeable;
import java.io.IOException;
@@ -78,6 +79,7 @@ public class ReadOnlyEngine extends Engine {
    private final DocsStats docsStats;
    private final RamAccountingRefreshListener refreshListener;
    private final SafeCommitInfo safeCommitInfo;
    private final CompletionStatsCache completionStatsCache;

    protected volatile TranslogStats translogStats;

@@ -122,6 +124,10 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
                this.translogStats = translogStats != null ? translogStats : translogStats(config, lastCommittedSegmentInfos);
                this.indexWriterLock = indexWriterLock;
                this.safeCommitInfo = new SafeCommitInfo(seqNoStats.getLocalCheckpoint(), lastCommittedSegmentInfos.totalMaxDoc());

                completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
                // no need to register a refresh listener to invalidate completionStatsCache since this engine is readonly

                success = true;
            } finally {
                if (success == false) {
@@ -513,4 +519,9 @@ protected static DirectoryReader openDirectory(Directory directory, boolean wrap
            return reader;
        }
    }

    @Override
    public CompletionStats completionStats(String... fieldNamePatterns) {
        return completionStatsCache.get(fieldNamePatterns);
    }
}
@@ -151,7 +151,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -1026,11 +1025,7 @@ public TranslogStats translogStats() {

    public CompletionStats completionStats(String... fields) {
        readAllowed();
        try {
            return getEngine().completionStats(fields);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return getEngine().completionStats(fields);
    }

    /**