Skip to content

Commit

Permalink
Merge pull request #128 from iSwepston/fix_pinning_breaks
Browse files Browse the repository at this point in the history
Added better handling of commits / file changes so that warming is pe…
  • Loading branch information
payammeyer authored Oct 10, 2024
2 parents dac5e78 + 140e4d7 commit f24774e
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ public class ShardReader implements AutoCloseable {
private final ServerIndexConfig indexConfig;
private final String indexName;
private final int shardNumber;
private final long creationTime;
private final ZuliaPerFieldAnalyzer zuliaPerFieldAnalyzer;
private final Cache<QueryCacheKey, ZuliaQuery.ShardQueryResponse.Builder> queryResultCache;
private final Cache<QueryCacheKey, ZuliaQuery.ShardQueryResponse.Builder> pinnedQueryResultCache;

public ShardReader(int shardNumber, DirectoryReader indexReader, DirectoryTaxonomyReader taxoReader, ServerIndexConfig indexConfig,
ZuliaPerFieldAnalyzer zuliaPerFieldAnalyzer) {
this.creationTime = System.currentTimeMillis();
this.shardNumber = shardNumber;
this.indexReader = indexReader;
this.taxoReader = taxoReader;
Expand All @@ -83,6 +85,10 @@ public void close() throws Exception {
taxoReader.close();
}

public long getCreationTime() {
return creationTime;
}

public int getTotalFacets() {
return taxoReader.getSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@

public class ShardReaderManager extends ReferenceManager<ShardReader> {

private long latestShardTime;

public ShardReaderManager(ShardReader initial) {
this.current = initial;
this.latestShardTime = initial.getCreationTime();
}

@Override
Expand All @@ -17,7 +20,12 @@ protected void decRef(ShardReader reference) throws IOException {

@Override
protected ShardReader refreshIfNeeded(ShardReader referenceToRefresh) throws IOException {
return referenceToRefresh.refreshIfNeeded();
// Evaluate last build time for outside decision making
ShardReader next = referenceToRefresh.refreshIfNeeded();
if (next != null) {
latestShardTime = next.getCreationTime();
}
return next;
}

@Override
Expand All @@ -30,4 +38,7 @@ protected int getRefCount(ShardReader reference) {
return reference.getRefCount();
}

public long getLatestShardTime() {
return latestShardTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,28 +148,24 @@ public boolean needsIdleCommit() {
return false;
}

public WarmInfo needsSearchWarming() {
public WarmInfo needsSearchWarming(long lastestShardTime) {
long currentTime = System.currentTimeMillis();

long msAfterCommitToWarm = indexConfig.getIndexSettings().getIdleTimeWithoutCommit() * 1000L;

//reassign so it can't change in the middle of the logic
Long lastChange = this.lastChange;
Long lastCommit = this.lastCommit;
Long lastWarm = this.lastWarm;

if (lastWarm == null) {
// never warmed so needs warmed
return new WarmInfo(true, lastChange, lastCommit);
// never warmed so needs warmed and the index is idle
boolean idle = lastChange == null || (Math.abs(currentTime - lastChange) > 1000L && (lastCommit == null || (lastCommit > lastChange)));
return new WarmInfo(idle, lastChange, lastCommit);
}

if (lastCommit != null && lastChange != null) { // if there has been a change to the index and a commit
if (lastChange < lastCommit) { // no changes since last commit
long timeSinceLastCommit = currentTime - lastCommit;
if (timeSinceLastCommit > msAfterCommitToWarm) {
//if the last commit is after the last warming
boolean needsWarm = lastCommit > lastWarm;
return new WarmInfo(needsWarm, lastChange, lastCommit);
if (lastWarm < lastestShardTime) { // Change is committed AND shard reader has been reopened
return new WarmInfo(true, lastChange, lastCommit);
}
}
}
Expand All @@ -185,8 +181,12 @@ public Long getLastCommit() {
return lastCommit;
}

public void searchesWarmed() {
lastWarm = System.currentTimeMillis();
public Long getLastWarm() {
return lastWarm;
}

public void searchesWarmed(long time) {
lastWarm = time;
}

public boolean markedChangedCheckIfCommitNeeded() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ public UpdateIndexResponse updateIndex(UpdateIndexRequest request) throws Except
}
}

//TODO(Ian): Check if there was a change before resetting the warming (not compatible w/
List<QueryRequest> warmingSearches = updateWithAction(updateIndexSettings.getWarmingSearchesOperation(), existingWarmingSearch, warmingSearch,
QueryRequest::getSearchLabel);
existingSettings.clearWarmingSearches();
Expand Down
36 changes: 32 additions & 4 deletions zulia-server/src/main/java/io/zulia/server/index/ZuliaShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ public void forceCommit() throws IOException {
}

shardWriteManager.commit();
shardReaderManager.maybeRefresh();

shardReaderManager.maybeRefreshBlocking();
}

public void tryIdleCommit() throws IOException {
Expand All @@ -98,8 +97,18 @@ public void tryWarmSearches(ZuliaIndex zuliaIndex, boolean primary) {
EnumSet<MasterSlaveSettings> usesPrimary = EnumSet.of(MasterSlaveSettings.MASTER_ONLY, MasterSlaveSettings.MASTER_IF_AVAILABLE);
EnumSet<MasterSlaveSettings> usesReplica = EnumSet.of(MasterSlaveSettings.SLAVE_ONLY, MasterSlaveSettings.MASTER_IF_AVAILABLE);

WarmInfo warmInfo = shardWriteManager.needsSearchWarming();
long lastestShardTime = shardReaderManager.getLatestShardTime();
WarmInfo warmInfo = shardWriteManager.needsSearchWarming(lastestShardTime);
if (warmInfo.needsWarming()) {

try {
shardReaderManager.maybeRefreshBlocking();
}
catch (Exception e) {
LOG.error("Failed to refresh shard reader: ", e);
throw new RuntimeException(e);
}

LOG.info("Started warming searching for index {}", indexName);
List<ZuliaServiceOuterClass.QueryRequest> warmingSearches = shardWriteManager.getIndexConfig().getWarmingSearches();

Expand All @@ -120,6 +129,10 @@ public void tryWarmSearches(ZuliaIndex zuliaIndex, boolean primary) {
LOG.info("Index {} commited: canceling warming", indexName);
return;
}
if (lastestShardTime != shardReaderManager.getLatestShardTime()) {
LOG.info("Index {} reloaded: canceling warming", indexName);
return;
}

if (shardNeedsWarmForSearch) {
try {
Expand All @@ -133,10 +146,25 @@ public void tryWarmSearches(ZuliaIndex zuliaIndex, boolean primary) {
}
}

}

// has the index changed since we made the decision to start warming ?
if (!Objects.equals(warmInfo.lastChanged(), shardWriteManager.getLastChanged())) {
LOG.info("Index {} changed: canceling warming", indexName);
return;
}
if (!Objects.equals(warmInfo.lastCommit(), shardWriteManager.getLastCommit())) {
LOG.info("Index {} commited: canceling warming", indexName);
return;
}

long warmTime = System.currentTimeMillis();
if (lastestShardTime != shardReaderManager.getLatestShardTime()) {
LOG.info("Index {} reloaded: canceling warming", indexName);
return;
}

shardWriteManager.searchesWarmed();
shardWriteManager.searchesWarmed(warmTime);
LOG.info("Finished warming searching for index {}", indexName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ private void indexRecord(String index, int id, String title, String description,
@Order(3)
public void searchTest() throws Exception {
ZuliaWorkPool zuliaWorkPool = nodeExtension.getClient();
Thread.sleep(2000);
// Sleep needed because post-update warming takes a long time now since it can (and does) reset after writes based on file io
Thread.sleep(10000);

Search search;
SearchResult searchResult;
Expand Down Expand Up @@ -272,7 +273,8 @@ public void searchTest() throws Exception {
.addCountFacet(new CountFacet("rating").setTopN(3)));
zuliaWorkPool.updateIndex(updateIndex);

Thread.sleep(2000);
// Sleep needed because post-update warming takes a long time now since it can (and does) reset after writes based on file io
Thread.sleep(10000);

search = new Search(CACHE_TEST);
search.addQuery(new FilterQuery("rating:[2.0 TO 3.5]"));
Expand All @@ -294,6 +296,8 @@ public void restart() throws Exception {
@Test
@Order(6)
public void confirm() throws Exception {
// Sleep needed because post-update warming takes a long time now since it can (and does) reset after writes based on file io
Thread.sleep(10000);
ZuliaWorkPool zuliaWorkPool = nodeExtension.getClient();
Search search;
SearchResult searchResult;
Expand Down

0 comments on commit f24774e

Please sign in to comment.