Skip to content

Commit

Permalink
Revert "Avoid unneeded refresh with concurrent realtime gets (#47895)"
Browse files Browse the repository at this point in the history
This reverts commit 75f6eb7.
  • Loading branch information
dnhatn committed Oct 16, 2019
1 parent 74fe2f4 commit 97e64fe
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -675,8 +675,7 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search
trackTranslogLocation.set(true);
}
}
assert versionValue.seqNo >= 0 : versionValue;
refreshIfNeeded("realtime_get", versionValue.seqNo);
refresh("realtime_get", SearcherScope.INTERNAL);
}
scope = SearcherScope.INTERNAL;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Phaser;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -5897,58 +5896,4 @@ private void runTestDeleteFailure(
}
}

public void testRealtimeGetOnlyRefreshIfNeeded() throws Exception {
final AtomicInteger refreshCount = new AtomicInteger();
final ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() {
@Override
public void beforeRefresh() {

}

@Override
public void afterRefresh(boolean didRefresh) {
if (didRefresh) {
refreshCount.incrementAndGet();
}
}
};
try (Store store = createStore()) {
final EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null,
refreshListener, null, null, engine.config().getCircuitBreakerService());
try (InternalEngine engine = createEngine(config)) {
int numDocs = randomIntBetween(10, 100);
Set<String> ids = new HashSet<>();
for (int i = 0; i < numDocs; i++) {
String id = Integer.toString(i);
engine.index(indexForDoc(createParsedDoc(id, null)));
ids.add(id);
}
final int refreshCountBeforeGet = refreshCount.get();
Thread[] getters = new Thread[randomIntBetween(1, 4)];
Phaser phaser = new Phaser(getters.length + 1);
for (int t = 0; t < getters.length; t++) {
getters[t] = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
int iters = randomIntBetween(1, 10);
for (int i = 0; i < iters; i++) {
ParsedDocument doc = createParsedDoc(randomFrom(ids), null);
try (Engine.GetResult getResult = engine.get(newGet(true, doc), engine::acquireSearcher)) {
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
}
}
});
getters[t].start();
}
phaser.arriveAndAwaitAdvance();
for (int i = 0; i < numDocs; i++) {
engine.index(indexForDoc(createParsedDoc("more-" + i, null)));
}
for (Thread getter : getters) {
getter.join();
}
assertThat(refreshCount.get(), lessThanOrEqualTo(refreshCountBeforeGet + 1));
}
}
}
}

0 comments on commit 97e64fe

Please sign in to comment.