diff --git a/zulia-server/src/main/java/io/zulia/server/index/ShardReader.java b/zulia-server/src/main/java/io/zulia/server/index/ShardReader.java index de7316c5..9484ef21 100644 --- a/zulia-server/src/main/java/io/zulia/server/index/ShardReader.java +++ b/zulia-server/src/main/java/io/zulia/server/index/ShardReader.java @@ -61,12 +61,14 @@ public class ShardReader implements AutoCloseable { private final ServerIndexConfig indexConfig; private final String indexName; private final int shardNumber; + private final long creationTime; private final ZuliaPerFieldAnalyzer zuliaPerFieldAnalyzer; private final Cache queryResultCache; private final Cache pinnedQueryResultCache; public ShardReader(int shardNumber, DirectoryReader indexReader, DirectoryTaxonomyReader taxoReader, ServerIndexConfig indexConfig, ZuliaPerFieldAnalyzer zuliaPerFieldAnalyzer) { + this.creationTime = System.currentTimeMillis(); this.shardNumber = shardNumber; this.indexReader = indexReader; this.taxoReader = taxoReader; @@ -83,6 +85,10 @@ public void close() throws Exception { taxoReader.close(); } + public long getCreationTime() { + return creationTime; + } + public int getTotalFacets() { return taxoReader.getSize(); } diff --git a/zulia-server/src/main/java/io/zulia/server/index/ShardReaderManager.java b/zulia-server/src/main/java/io/zulia/server/index/ShardReaderManager.java index 077ccff4..12265530 100644 --- a/zulia-server/src/main/java/io/zulia/server/index/ShardReaderManager.java +++ b/zulia-server/src/main/java/io/zulia/server/index/ShardReaderManager.java @@ -6,8 +6,11 @@ public class ShardReaderManager extends ReferenceManager { + private long latestShardTime; + public ShardReaderManager(ShardReader initial) { this.current = initial; + this.latestShardTime = initial.getCreationTime(); } @Override @@ -17,7 +20,12 @@ protected void decRef(ShardReader reference) throws IOException { @Override protected ShardReader refreshIfNeeded(ShardReader referenceToRefresh) throws IOException { - return referenceToRefresh.refreshIfNeeded(); + // Evaluate last build time for outside decision making + ShardReader next = referenceToRefresh.refreshIfNeeded(); + if (next != null) { + latestShardTime = next.getCreationTime(); + } + return next; } @Override @@ -30,4 +38,7 @@ protected int getRefCount(ShardReader reference) { return reference.getRefCount(); } + public long getLatestShardTime() { + return latestShardTime; + } } diff --git a/zulia-server/src/main/java/io/zulia/server/index/ShardWriteManager.java b/zulia-server/src/main/java/io/zulia/server/index/ShardWriteManager.java index 0796e015..8cad52ac 100644 --- a/zulia-server/src/main/java/io/zulia/server/index/ShardWriteManager.java +++ b/zulia-server/src/main/java/io/zulia/server/index/ShardWriteManager.java @@ -148,28 +148,24 @@ public boolean needsIdleCommit() { return false; } - public WarmInfo needsSearchWarming() { + public WarmInfo needsSearchWarming(long lastestShardTime) { long currentTime = System.currentTimeMillis(); - long msAfterCommitToWarm = indexConfig.getIndexSettings().getIdleTimeWithoutCommit() * 1000L; - //reassign so it can't change in the middle of the logic Long lastChange = this.lastChange; Long lastCommit = this.lastCommit; Long lastWarm = this.lastWarm; if (lastWarm == null) { - // never warmed so needs warmed - return new WarmInfo(true, lastChange, lastCommit); + // never warmed so needs warmed and the index is idle + boolean idle = lastChange == null || (Math.abs(currentTime - lastChange) > 1000L && (lastCommit == null || (lastCommit > lastChange))); + return new WarmInfo(idle, lastChange, lastCommit); } if (lastCommit != null && lastChange != null) { // if there has been a change to the index and a commit if (lastChange < lastCommit) { // no changes since last commit - long timeSinceLastCommit = currentTime - lastCommit; - if (timeSinceLastCommit > msAfterCommitToWarm) { - //if the last commit is after the last warming - boolean needsWarm = lastCommit > lastWarm; - return new WarmInfo(needsWarm, lastChange, lastCommit); + if (lastWarm < lastestShardTime) { // Change is committed AND shard reader has been reopened + return new WarmInfo(true, lastChange, lastCommit); } } } @@ -185,8 +181,12 @@ public Long getLastCommit() { return lastCommit; } - public void searchesWarmed() { - lastWarm = System.currentTimeMillis(); + public Long getLastWarm() { + return lastWarm; + } + + public void searchesWarmed(long time) { + lastWarm = time; } public boolean markedChangedCheckIfCommitNeeded() { diff --git a/zulia-server/src/main/java/io/zulia/server/index/ZuliaIndexManager.java b/zulia-server/src/main/java/io/zulia/server/index/ZuliaIndexManager.java index 0413df55..695d8fb4 100644 --- a/zulia-server/src/main/java/io/zulia/server/index/ZuliaIndexManager.java +++ b/zulia-server/src/main/java/io/zulia/server/index/ZuliaIndexManager.java @@ -519,6 +519,7 @@ public UpdateIndexResponse updateIndex(UpdateIndexRequest request) throws Except } } + //TODO(Ian): Check if there was a change before resetting the warming (not compatible w/ List warmingSearches = updateWithAction(updateIndexSettings.getWarmingSearchesOperation(), existingWarmingSearch, warmingSearch, QueryRequest::getSearchLabel); existingSettings.clearWarmingSearches(); diff --git a/zulia-server/src/main/java/io/zulia/server/index/ZuliaShard.java b/zulia-server/src/main/java/io/zulia/server/index/ZuliaShard.java index 18bb5d66..61651a99 100644 --- a/zulia-server/src/main/java/io/zulia/server/index/ZuliaShard.java +++ b/zulia-server/src/main/java/io/zulia/server/index/ZuliaShard.java @@ -82,8 +82,7 @@ public void forceCommit() throws IOException { } shardWriteManager.commit(); - shardReaderManager.maybeRefresh(); - + shardReaderManager.maybeRefreshBlocking(); } public void tryIdleCommit() throws IOException { @@ -98,8 +97,18 @@ public void tryWarmSearches(ZuliaIndex zuliaIndex, boolean primary) { EnumSet usesPrimary = EnumSet.of(MasterSlaveSettings.MASTER_ONLY, MasterSlaveSettings.MASTER_IF_AVAILABLE); EnumSet usesReplica = EnumSet.of(MasterSlaveSettings.SLAVE_ONLY, MasterSlaveSettings.MASTER_IF_AVAILABLE); - WarmInfo warmInfo = shardWriteManager.needsSearchWarming(); + long lastestShardTime = shardReaderManager.getLatestShardTime(); + WarmInfo warmInfo = shardWriteManager.needsSearchWarming(lastestShardTime); if (warmInfo.needsWarming()) { + + try { + shardReaderManager.maybeRefreshBlocking(); + } + catch (Exception e) { + LOG.error("Failed to refresh shard reader: ", e); + throw new RuntimeException(e); + } + LOG.info("Started warming searching for index {}", indexName); List warmingSearches = shardWriteManager.getIndexConfig().getWarmingSearches(); @@ -120,6 +129,10 @@ public void tryWarmSearches(ZuliaIndex zuliaIndex, boolean primary) { LOG.info("Index {} commited: canceling warming", indexName); return; } + if (lastestShardTime != shardReaderManager.getLatestShardTime()) { + LOG.info("Index {} reloaded: canceling warming", indexName); + return; + } if (shardNeedsWarmForSearch) { try { @@ -133,10 +146,25 @@ public void tryWarmSearches(ZuliaIndex zuliaIndex, boolean primary) { } } + } + + // has the index changed since we made the decision to start warming ? + if (!Objects.equals(warmInfo.lastChanged(), shardWriteManager.getLastChanged())) { + LOG.info("Index {} changed: canceling warming", indexName); + return; + } + if (!Objects.equals(warmInfo.lastCommit(), shardWriteManager.getLastCommit())) { + LOG.info("Index {} commited: canceling warming", indexName); + return; + } + long warmTime = System.currentTimeMillis(); + if (lastestShardTime != shardReaderManager.getLatestShardTime()) { + LOG.info("Index {} reloaded: canceling warming", indexName); + return; } - shardWriteManager.searchesWarmed(); + shardWriteManager.searchesWarmed(warmTime); LOG.info("Finished warming searching for index {}", indexName); } } diff --git a/zulia-server/src/test/java/io/zulia/server/test/node/CacheTest.java b/zulia-server/src/test/java/io/zulia/server/test/node/CacheTest.java index 34dbbbaf..11cf03e3 100644 --- a/zulia-server/src/test/java/io/zulia/server/test/node/CacheTest.java +++ b/zulia-server/src/test/java/io/zulia/server/test/node/CacheTest.java @@ -112,7 +112,8 @@ private void indexRecord(String index, int id, String title, String description, @Order(3) public void searchTest() throws Exception { ZuliaWorkPool zuliaWorkPool = nodeExtension.getClient(); - Thread.sleep(2000); + // Sleep needed because post-update warming takes a long time now since it can (and does) reset after writes based on file io + Thread.sleep(10000); Search search; SearchResult searchResult; @@ -272,7 +273,8 @@ public void searchTest() throws Exception { .addCountFacet(new CountFacet("rating").setTopN(3))); zuliaWorkPool.updateIndex(updateIndex); - Thread.sleep(2000); + // Sleep needed because post-update warming takes a long time now since it can (and does) reset after writes based on file io + Thread.sleep(10000); search = new Search(CACHE_TEST); search.addQuery(new FilterQuery("rating:[2.0 TO 3.5]")); @@ -294,6 +296,8 @@ public void restart() throws Exception { @Test @Order(6) public void confirm() throws Exception { + // Sleep needed because post-update warming takes a long time now since it can (and does) reset after writes based on file io + Thread.sleep(10000); ZuliaWorkPool zuliaWorkPool = nodeExtension.getClient(); Search search; SearchResult searchResult;