Skip to content

Commit

Permalink
Adapt Test
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed Dec 3, 2018
1 parent 1f875b0 commit 587e074
Showing 1 changed file with 48 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3600,59 +3600,63 @@ public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdat
final IndexShard replica = newStartedShard(false);
indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()));

final int operations = scaledRandomIntBetween(10, 64);
final Thread[] threads = new Thread[operations];
final CyclicBarrier startBarrier = new CyclicBarrier(1 + operations);
final CountDownLatch latch = new CountDownLatch(operations);
final int nbTermUpdates = randomIntBetween(1, 5);

final AtomicArray<Tuple<Long, Long>> opsTerms = new AtomicArray<>(operations);
final long futurePrimaryTerm = replica.getOperationPrimaryTerm() + 1;
for (int i = 0; i < nbTermUpdates; i++) {
long opPrimaryTerm = replica.getOperationPrimaryTerm() + 1;
final long globalCheckpoint = replica.getGlobalCheckpoint();
final long maxSeqNoOfUpdatesOrDeletes = replica.getMaxSeqNoOfUpdatesOrDeletes();

for (int i = 0; i < operations; i++) {
final int threadId = i;
threads[threadId] = new Thread(() -> {
try {
startBarrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
replica.acquireAllReplicaOperationsPermits(
futurePrimaryTerm,
replica.getGlobalCheckpoint(),
replica.getMaxSeqNoOfUpdatesOrDeletes(),
new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
try (Releasable ignored = releasable) {
opsTerms.set(threadId, Tuple.tuple(replica.getPendingPrimaryTerm(), replica.getOperationPrimaryTerm()));
latch.countDown();
}
}
final int operations = scaledRandomIntBetween(10, 64);
final CyclicBarrier barrier = new CyclicBarrier(1 + operations);
final CountDownLatch latch = new CountDownLatch(operations);

@Override
public void onFailure(final Exception e) {
latch.countDown();
throw new RuntimeException(e);
}
}, TimeValue.timeValueMinutes(30L));
});
threads[threadId].start();
}
final Thread[] threads = new Thread[operations];
for (int j = 0; j < operations; j++) {
threads[j] = new Thread(() -> {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
replica.acquireAllReplicaOperationsPermits(
opPrimaryTerm,
globalCheckpoint,
maxSeqNoOfUpdatesOrDeletes,
new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
try (Releasable ignored = releasable) {
assertThat(replica.getPendingPrimaryTerm(), greaterThanOrEqualTo(opPrimaryTerm));
assertThat(replica.getOperationPrimaryTerm(), equalTo(opPrimaryTerm));
} finally {
latch.countDown();
}
}

startBarrier.await();
latch.await();
@Override
public void onFailure(final Exception e) {
try {
throw new RuntimeException(e);
} finally {
latch.countDown();
}
}
}, TimeValue.timeValueMinutes(30L));
});
threads[j].start();
}
barrier.await();
latch.await();

for (Tuple<Long, Long> opTerms : opsTerms.asList()) {
assertNotNull(opTerms);
assertEquals("Expected primary term and pending primary term captured during execution must match",
futurePrimaryTerm, opTerms.v1().longValue());
assertEquals("Expected primary term and operation primary term captured during execution must match",
futurePrimaryTerm, opTerms.v2().longValue());
for (Thread thread : threads) {
thread.join();
}
}

closeShard(replica, false);
}


@Override
public Settings threadPoolSettings() {
return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.estimated_time_interval", "5ms").build();
Expand Down

0 comments on commit 587e074

Please sign in to comment.