Skip to content

Commit

Permalink
improve tests based on feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jakelandis committed May 10, 2019
1 parent a9e0f3b commit a1e456c
Showing 1 changed file with 17 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,11 @@ public void testConcurrentExecutions() throws Exception {
int maxBatchSize = 0;
int maxDocuments = 0;
int iterations = 0;
boolean runTest = true;
//find some randoms that allow this test to take under ~ 10 seconds
while (estimatedTimeForTest > 10_000) {
if (iterations++ > 1_000) {
fail("failed to find random values that allows test to run quickly"); //extremely unlikely
if (iterations++ > 1_000) { //extremely unlikely
runTest = false;
}
maxBatchSize = randomIntBetween(1, 100);
maxDocuments = randomIntBetween(maxBatchSize, 1_000_000);
Expand All @@ -128,6 +129,7 @@ public void testConcurrentExecutions() throws Exception {
estimatedTimeForTest = (expectedExecutions * simulateWorkTimeInMillis) /
Math.min(concurrentBulkRequests + 1, concurrentClients);
}
assumeTrue("failed to find random values that allows test to run quickly", runTest);
BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]{new BulkItemResponse()}, 0);
AtomicInteger failureCount = new AtomicInteger(0);
AtomicInteger successCount = new AtomicInteger(0);
Expand All @@ -149,30 +151,34 @@ concurrentBulkRequests, maxBatchSize, new ByteSizeValue(Integer.MAX_VALUE), null
(command, delay, executor) -> null, () -> called.set(true), BulkRequest::new);

ExecutorService executorService = Executors.newFixedThreadPool(concurrentClients);
CountDownLatch startGate = new CountDownLatch(1);

IndexRequest indexRequest = new IndexRequest();
String bulkRequest = "{ \"index\" : { \"_index\" : \"test\", \"_id\" : \"1\" } }\n" + "{ \"field1\" : \"value1\" }\n";
BytesReference bytesReference =
BytesReference.fromByteBuffers(new ByteBuffer[]{ByteBuffer.wrap(bulkRequest.getBytes(StandardCharsets.UTF_8))});
List<Future> futures = new ArrayList<>();
for (final AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < maxDocuments;) {
//alternate between ways to add to the bulk processor
for (final AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < maxDocuments; ) {
futures.add(executorService.submit(() -> {
if(i.get() % 2 == 0) {
bulkProcessor.add(indexRequest);
}else{
try {
try {
//don't start any work until all tasks are submitted
startGate.await();
//alternate between ways to add to the bulk processor
if (i.get() % 2 == 0) {
bulkProcessor.add(indexRequest);
} else {
bulkProcessor.add(bytesReference, null, null, XContentType.JSON);
} catch (Exception e) {
throw new RuntimeException(e);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}));
}
startGate.countDown();

for (Future f : futures) {
try {
f.get(10, TimeUnit.SECONDS);
f.get();
} catch (Exception e){
failureCount.incrementAndGet();
logger.error("failure while getting future", e);
Expand Down

0 comments on commit a1e456c

Please sign in to comment.