Skip to content

Commit

Permalink
Avoid unneeded refresh with concurrent realtime gets (elastic#47895)
Browse files Browse the repository at this point in the history
This change should reduce refreshes for a use-case where we perform 
multiple realtime gets at the same time on an active index. Currently,
we only call refresh if the index operation is still on the versionMap.
However, at the time we call refresh, that operation might be already or
will be included in the latest reader. Hence, we do not need to refresh.
Adding another lock here is not an issue as the refresh is already
sequential.
  • Loading branch information
dnhatn authored and howardhuanghua committed Oct 14, 2019
1 parent 38ce11c commit 36dd4c1
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,8 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Engine.Searcher>
trackTranslogLocation.set(true);
}
}
refresh("realtime_get", SearcherScope.INTERNAL, true);
assert versionValue.seqNo >= 0 : versionValue;
refreshIfNeeded("realtime_get", versionValue.seqNo);
}
scope = SearcherScope.INTERNAL;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Phaser;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -6102,4 +6103,58 @@ private void runTestDeleteFailure(
}
}

public void testRealtimeGetOnlyRefreshIfNeeded() throws Exception {
final AtomicInteger refreshCount = new AtomicInteger();
final ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() {
@Override
public void beforeRefresh() {

}

@Override
public void afterRefresh(boolean didRefresh) {
if (didRefresh) {
refreshCount.incrementAndGet();
}
}
};
try (Store store = createStore()) {
final EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null,
refreshListener, null, null, engine.config().getCircuitBreakerService());
try (InternalEngine engine = createEngine(config)) {
int numDocs = randomIntBetween(10, 100);
Set<String> ids = new HashSet<>();
for (int i = 0; i < numDocs; i++) {
String id = Integer.toString(i);
engine.index(indexForDoc(createParsedDoc(id, null)));
ids.add(id);
}
final int refreshCountBeforeGet = refreshCount.get();
Thread[] getters = new Thread[randomIntBetween(1, 4)];
Phaser phaser = new Phaser(getters.length + 1);
for (int t = 0; t < getters.length; t++) {
getters[t] = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
int iters = randomIntBetween(1, 10);
for (int i = 0; i < iters; i++) {
ParsedDocument doc = createParsedDoc(randomFrom(ids), null);
try (Engine.GetResult getResult = engine.get(newGet(true, doc), engine::acquireSearcher)) {
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
}
}
});
getters[t].start();
}
phaser.arriveAndAwaitAdvance();
for (int i = 0; i < numDocs; i++) {
engine.index(indexForDoc(createParsedDoc("more-" + i, null)));
}
for (Thread getter : getters) {
getter.join();
}
assertThat(refreshCount.get(), lessThanOrEqualTo(refreshCountBeforeGet + 1));
}
}
}
}

0 comments on commit 36dd4c1

Please sign in to comment.