From 6dd716b0c40032537c8bc2910f6e35c5ae8ece04 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 3 Oct 2018 21:03:24 -0400 Subject: [PATCH] Replace version with reader cache key in IndicesRequestCache (#34189) Today we use the version of a DirectoryReader as a component of the key of IndicesRequestCache. This usage is perfectly fine since the version is advanced every time a new change is made into IndexWriter. In other words, two DirectoryReaders with the same version should have the same content. However, this invariant is only guaranteed in the context of a single IndexWriter because the version is reset to the committed version value when IndexWriter is re-opened. Since #33473, each IndexShard may have more than one IndexWriter, and using the version of a DirectoryReader as a part of the cache key can cause IndicesRequestCache to return stale cached values. For example, in #27650, we rollback the engine (i.e., re-open IndexWriter), index new documents, refresh, then make a count request, but the search layer mistakenly returns the count of the DirectoryReader of the previous IndexWriter because the current DirectoryReader has the same version of the old DirectoryReader even their documents are different. This is possible because these two readers come from different IndexWriters. This commit replaces the the version with the reader cache key of IndexReader as a component of the cache key of IndicesRequestCache. Closes #27650 Relates #33473 --- .../indices/IndicesRequestCache.java | 37 ++++++++++--------- .../elasticsearch/indices/IndicesService.java | 7 ++-- .../index/shard/IndexShardIT.java | 35 ++++++++++++++++++ .../indices/IndicesRequestCacheTests.java | 29 +++++++++++---- 4 files changed, 80 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesRequestCache.java b/server/src/main/java/org/elasticsearch/indices/IndicesRequestCache.java index da80a07abc1c8..626d6e8df17f5 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesRequestCache.java @@ -47,6 +47,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.function.Supplier; @@ -54,7 +55,7 @@ /** * The indices request cache allows to cache a shard level request stage responses, helping with improving * similar requests that are potentially expensive (because of aggs for example). The cache is fully coherent - * with the semantics of NRT (the index reader version is part of the cache key), and relies on size based + * with the semantics of NRT (the index reader cache key is part of the cache key), and relies on size based * eviction to evict old reader associated cache entries as well as scheduler reaper to clean readers that * are no longer used or closed shards. *

@@ -105,7 +106,7 @@ public void close() { } void clear(CacheEntity entity) { - keysToClean.add(new CleanupKey(entity, -1)); + keysToClean.add(new CleanupKey(entity, null)); cleanCache(); } @@ -119,7 +120,8 @@ public void onRemoval(RemovalNotification notification) { // removed when this issue is solved BytesReference getOrCompute(CacheEntity cacheEntity, Supplier loader, DirectoryReader reader, BytesReference cacheKey, Supplier cacheKeyRenderer) throws Exception { - final Key key = new Key(cacheEntity, reader.getVersion(), cacheKey); + assert reader.getReaderCacheHelper() != null; + final Key key = new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey); Loader cacheLoader = new Loader(cacheEntity, loader); BytesReference value = cache.computeIfAbsent(key, cacheLoader); if (cacheLoader.isLoaded()) { @@ -128,7 +130,7 @@ BytesReference getOrCompute(CacheEntity cacheEntity, Supplier lo logger.trace("Cache miss for reader version [{}] and request:\n {}", reader.getVersion(), cacheKeyRenderer.get()); } // see if its the first time we see this reader, and make sure to register a cleanup key - CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getVersion()); + CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getReaderCacheHelper().getKey()); if (!registeredClosedListeners.containsKey(cleanupKey)) { Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE); if (previous == null) { @@ -151,7 +153,8 @@ BytesReference getOrCompute(CacheEntity cacheEntity, Supplier lo * @param cacheKey the cache key to invalidate */ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { - cache.invalidate(new Key(cacheEntity, reader.getVersion(), cacheKey)); + assert reader.getReaderCacheHelper() != null; + cache.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey)); } private static class Loader implements CacheLoader { @@ -220,12 +223,12 @@ static class Key implements Accountable { private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); public final CacheEntity entity; // use as identity equality - public final long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped + public final IndexReader.CacheKey readerCacheKey; public final BytesReference value; - Key(CacheEntity entity, long readerVersion, BytesReference value) { + Key(CacheEntity entity, IndexReader.CacheKey readerCacheKey, BytesReference value) { this.entity = entity; - this.readerVersion = readerVersion; + this.readerCacheKey = Objects.requireNonNull(readerCacheKey); this.value = value; } @@ -245,7 +248,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Key key = (Key) o; - if (readerVersion != key.readerVersion) return false; + if (Objects.equals(readerCacheKey, key.readerCacheKey) == false) return false; if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false; if (!value.equals(key.value)) return false; return true; @@ -254,7 +257,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + Long.hashCode(readerVersion); + result = 31 * result + readerCacheKey.hashCode(); result = 31 * result + value.hashCode(); return result; } @@ -262,11 +265,11 @@ public int hashCode() { private class CleanupKey implements IndexReader.ClosedListener { final CacheEntity entity; - final long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped + final IndexReader.CacheKey readerCacheKey; - private CleanupKey(CacheEntity entity, long readerVersion) { + private CleanupKey(CacheEntity entity, IndexReader.CacheKey readerCacheKey) { this.entity = entity; - this.readerVersion = readerVersion; + this.readerCacheKey = readerCacheKey; } @Override @@ -284,7 +287,7 @@ public boolean equals(Object o) { return false; } CleanupKey that = (CleanupKey) o; - if (readerVersion != that.readerVersion) return false; + if (Objects.equals(readerCacheKey, that.readerCacheKey) == false) return false; if (!entity.getCacheIdentity().equals(that.entity.getCacheIdentity())) return false; return true; } @@ -292,7 +295,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + Long.hashCode(readerVersion); + result = 31 * result + Objects.hashCode(readerCacheKey); return result; } } @@ -307,7 +310,7 @@ synchronized void cleanCache() { for (Iterator iterator = keysToClean.iterator(); iterator.hasNext(); ) { CleanupKey cleanupKey = iterator.next(); iterator.remove(); - if (cleanupKey.readerVersion == -1 || cleanupKey.entity.isOpen() == false) { + if (cleanupKey.readerCacheKey == null || cleanupKey.entity.isOpen() == false) { // -1 indicates full cleanup, as does a closed shard currentFullClean.add(cleanupKey.entity.getCacheIdentity()); } else { @@ -320,7 +323,7 @@ synchronized void cleanCache() { if (currentFullClean.contains(key.entity.getCacheIdentity())) { iterator.remove(); } else { - if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerVersion))) { + if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKey))) { iterator.remove(); } } diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 05dcf0b2d010c..83d358424de32 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -1166,10 +1166,9 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) { } else if (request.requestCache() == false) { return false; } - // if the reader is not a directory reader, we can't get the version from it - if ((context.searcher().getIndexReader() instanceof DirectoryReader) == false) { - return false; - } + // We use the cacheKey of the index reader as a part of a key of the IndicesRequestCache. + assert context.searcher().getIndexReader().getReaderCacheHelper() != null; + // if now in millis is used (or in the future, a more generic "isDeterministic" flag // then we can't cache based on "now" key within the search request, as it is not deterministic if (context.getQueryShardContext().isCachable() == false) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 5c610feba1922..579764d671cba 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterInfoService; @@ -69,6 +70,7 @@ import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.IndexSettingsModule; @@ -809,4 +811,37 @@ public void testGlobalCheckpointListenerTimeout() throws InterruptedException { assertTrue(notified.get()); } + public void testInvalidateIndicesRequestCacheWhenRollbackEngine() throws Exception { + createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0) + .put("index.refresh_interval", -1).build()); + ensureGreen(); + final IndicesService indicesService = getInstanceFromNode(IndicesService.class); + final IndexShard shard = indicesService.getShardOrNull(new ShardId(resolveIndex("test"), 0)); + final SearchRequest countRequest = new SearchRequest("test").source(new SearchSourceBuilder().size(0)); + final long numDocs = between(10, 20); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test", "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get(); + if (randomBoolean()) { + shard.refresh("test"); + } + } + shard.refresh("test"); + assertThat(client().search(countRequest).actionGet().getHits().totalHits, equalTo(numDocs)); + assertThat(shard.getLocalCheckpoint(), equalTo(shard.seqNoStats().getMaxSeqNo())); + shard.resetEngineToGlobalCheckpoint(); + final long moreDocs = between(10, 20); + for (int i = 0; i < moreDocs; i++) { + client().prepareIndex("test", "_doc", Long.toString(i + numDocs)).setSource("{}", XContentType.JSON).get(); + if (randomBoolean()) { + shard.refresh("test"); + } + } + shard.refresh("test"); + try (Engine.Searcher searcher = shard.acquireSearcher("test")) { + assertThat("numDocs=" + numDocs + " moreDocs=" + moreDocs, (long) searcher.reader().numDocs(), equalTo(numDocs + moreDocs)); + } + assertThat("numDocs=" + numDocs + " moreDocs=" + moreDocs, + client().search(countRequest).actionGet().getHits().totalHits, equalTo(numDocs + moreDocs)); + } + } diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java index 08bf43b91bbc6..4411d3f3e934a 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java @@ -23,7 +23,9 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.StringField; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.TermQuery; @@ -119,7 +121,11 @@ public void testCacheDifferentReaders() throws Exception { DirectoryReader reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, XContentType.JSON, false); - + if (randomBoolean()) { + writer.flush(); + IOUtils.close(writer); + writer = new IndexWriter(dir, newIndexWriterConfig()); + } writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); DirectoryReader secondReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); @@ -424,14 +430,23 @@ public void testInvalidate() throws Exception { assertEquals(0, cache.numRegisteredCloseListeners()); } - public void testEqualsKey() { + public void testEqualsKey() throws IOException { AtomicBoolean trueBoolean = new AtomicBoolean(true); AtomicBoolean falseBoolean = new AtomicBoolean(false); - IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 1L, new TestBytesReference(1)); - IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 1L, new TestBytesReference(1)); - IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(new TestEntity(null, falseBoolean), 1L, new TestBytesReference(1)); - IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 2L, new TestBytesReference(1)); - IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 1L, new TestBytesReference(2)); + Directory dir = newDirectory(); + IndexWriterConfig config = newIndexWriterConfig(); + IndexWriter writer = new IndexWriter(dir, config); + IndexReader reader1 = DirectoryReader.open(writer); + IndexReader.CacheKey rKey1 = reader1.getReaderCacheHelper().getKey(); + writer.addDocument(new Document()); + IndexReader reader2 = DirectoryReader.open(writer); + IndexReader.CacheKey rKey2 = reader2.getReaderCacheHelper().getKey(); + IOUtils.close(reader1, reader2, writer, dir); + IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1)); + IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1)); + IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(new TestEntity(null, falseBoolean), rKey1, new TestBytesReference(1)); + IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey2, new TestBytesReference(1)); + IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(2)); String s = "Some other random object"; assertEquals(key1, key1); assertEquals(key1, key2);