From c8bf1708a7bf8cafcfe26bb2f67307f2de73e260 Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Thu, 19 Nov 2015 23:53:21 -0800 Subject: [PATCH] Adding a test of concurrent HystrixObservableCollapsers running --- .../HystrixObservableCollapserTest.java | 80 ++++++++++++++++++- 1 file changed, 77 insertions(+), 3 deletions(-) diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCollapserTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCollapserTest.java index 5a3dd9033..0eb9bb4d8 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCollapserTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCollapserTest.java @@ -24,14 +24,20 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; import com.netflix.hystrix.collapser.CollapserTimer; import com.netflix.hystrix.collapser.RealCollapserTimer; import com.netflix.hystrix.strategy.HystrixPlugins; +import com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable; import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler; import com.netflix.hystrix.strategy.properties.HystrixPropertiesCollapserDefault; import com.netflix.hystrix.util.HystrixRollingNumberEvent; @@ -563,6 +569,74 @@ public void testTwoRequestsWithValuesForWrongArgs() { testSubscriber2.assertValues("2:2", "2:4", "2:6"); } + @Test + public void testCollapserUnderConcurrency() throws InterruptedException { + final CollapserTimer timer = new RealCollapserTimer(); + final int NUM_THREADS_SUBMITTING_WORK = 4; + final int NUM_REQUESTS_PER_THREAD = 2; + + final CountDownLatch latch = new CountDownLatch(NUM_THREADS_SUBMITTING_WORK); + + List runnables = new ArrayList(); + final ConcurrentLinkedQueue> subscribers = new ConcurrentLinkedQueue>(); + + HystrixRequestContext context = HystrixRequestContext.initializeContext(); + + final AtomicInteger uniqueInt = new AtomicInteger(0); + + for (int i = 0; i < NUM_THREADS_SUBMITTING_WORK; i++) { + runnables.add(new Runnable() { + @Override + public void run() { + //System.out.println("Runnable starting on thread : " + Thread.currentThread().getName()); + + for (int j = 0; j < NUM_REQUESTS_PER_THREAD; j++) { + HystrixObservableCollapser collapser = + new TestCollapserWithMultipleResponses(timer, uniqueInt.getAndIncrement(), 3, false); + Observable o = collapser.observe(); + TestSubscriber subscriber = new TestSubscriber(); + o.subscribe(subscriber); + subscribers.offer(subscriber); + } + //System.out.println("Runnable done on thread : " + Thread.currentThread().getName()); + latch.countDown(); + } + }); + } + + ExecutorService threadPool = Executors.newFixedThreadPool(NUM_THREADS_SUBMITTING_WORK); + for (Runnable r: runnables) { + threadPool.submit(new HystrixContextRunnable(r)); + } + + latch.await(); + + for (TestSubscriber subscriber: subscribers) { + subscriber.awaitTerminalEvent(); + if (subscriber.getOnErrorEvents().size() > 0) { + System.out.println("ERROR : " + subscriber.getOnErrorEvents()); + for (Throwable ex: subscriber.getOnErrorEvents()) { + ex.printStackTrace(); + } + } + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + //System.out.println("Received : " + subscriber.getOnNextEvents()); + subscriber.assertValueCount(3); + } + + context.shutdown(); + threadPool.shutdown(); + } + + @Test + public void testConcurrencyInTightLoop() throws InterruptedException { + for (int i = 0; i < 1000; i++) { + System.out.println("TRIAL : " + i); + testCollapserUnderConcurrency(); + } + } + private static class TestRequestCollapser extends HystrixObservableCollapser { private final String value; @@ -719,13 +793,13 @@ public void call(Subscriber s) { private static class TestCollapserWithMultipleResponses extends HystrixObservableCollapser { private final String arg; - private final static Map emitsPerArg; + private final static ConcurrentMap emitsPerArg; private final boolean commandConstructionFails; private final Func1 keyMapper; private final Action1> onMissingResponseHandler; static { - emitsPerArg = new HashMap(); + emitsPerArg = new ConcurrentHashMap(); } public TestCollapserWithMultipleResponses(CollapserTimer timer, int arg, int numEmits, boolean commandConstructionFails) { @@ -835,7 +909,7 @@ protected Observable construct() { public void call(Subscriber subscriber) { try { assertNotNull("Executing the Batch command should have a HystrixRequestContext", HystrixRequestContext.getContextForCurrentThread()); - Thread.sleep(100); + Thread.sleep(30); for (Integer arg: args) { int numEmits = emitsPerArg.get(arg.toString()); for (int j = 1; j < numEmits + 1; j++) {