Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a test of concurrent HystrixObservableCollapsers running #989

Merged
merged 1 commit into from
Nov 20, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,20 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

import com.netflix.hystrix.collapser.CollapserTimer;
import com.netflix.hystrix.collapser.RealCollapserTimer;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable;
import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesCollapserDefault;
import com.netflix.hystrix.util.HystrixRollingNumberEvent;
Expand Down Expand Up @@ -563,6 +569,74 @@ public void testTwoRequestsWithValuesForWrongArgs() {
testSubscriber2.assertValues("2:2", "2:4", "2:6");
}

@Test
public void testCollapserUnderConcurrency() throws InterruptedException {
final CollapserTimer timer = new RealCollapserTimer();
final int NUM_THREADS_SUBMITTING_WORK = 4;
final int NUM_REQUESTS_PER_THREAD = 2;

final CountDownLatch latch = new CountDownLatch(NUM_THREADS_SUBMITTING_WORK);

List<Runnable> runnables = new ArrayList<Runnable>();
final ConcurrentLinkedQueue<TestSubscriber<String>> subscribers = new ConcurrentLinkedQueue<TestSubscriber<String>>();

HystrixRequestContext context = HystrixRequestContext.initializeContext();

final AtomicInteger uniqueInt = new AtomicInteger(0);

for (int i = 0; i < NUM_THREADS_SUBMITTING_WORK; i++) {
runnables.add(new Runnable() {
@Override
public void run() {
//System.out.println("Runnable starting on thread : " + Thread.currentThread().getName());

for (int j = 0; j < NUM_REQUESTS_PER_THREAD; j++) {
HystrixObservableCollapser<String, String, String, String> collapser =
new TestCollapserWithMultipleResponses(timer, uniqueInt.getAndIncrement(), 3, false);
Observable<String> o = collapser.observe();
TestSubscriber<String> subscriber = new TestSubscriber<String>();
o.subscribe(subscriber);
subscribers.offer(subscriber);
}
//System.out.println("Runnable done on thread : " + Thread.currentThread().getName());
latch.countDown();
}
});
}

ExecutorService threadPool = Executors.newFixedThreadPool(NUM_THREADS_SUBMITTING_WORK);
for (Runnable r: runnables) {
threadPool.submit(new HystrixContextRunnable(r));
}

latch.await();

for (TestSubscriber<String> subscriber: subscribers) {
subscriber.awaitTerminalEvent();
if (subscriber.getOnErrorEvents().size() > 0) {
System.out.println("ERROR : " + subscriber.getOnErrorEvents());
for (Throwable ex: subscriber.getOnErrorEvents()) {
ex.printStackTrace();
}
}
subscriber.assertCompleted();
subscriber.assertNoErrors();
//System.out.println("Received : " + subscriber.getOnNextEvents());
subscriber.assertValueCount(3);
}

context.shutdown();
threadPool.shutdown();
}

@Test
public void testConcurrencyInTightLoop() throws InterruptedException {
for (int i = 0; i < 1000; i++) {
System.out.println("TRIAL : " + i);
testCollapserUnderConcurrency();
}
}

private static class TestRequestCollapser extends HystrixObservableCollapser<String, String, String, String> {

private final String value;
Expand Down Expand Up @@ -719,13 +793,13 @@ public void call(Subscriber<? super String> s) {
private static class TestCollapserWithMultipleResponses extends HystrixObservableCollapser<String, String, String, String> {

private final String arg;
private final static Map<String, Integer> emitsPerArg;
private final static ConcurrentMap<String, Integer> emitsPerArg;
private final boolean commandConstructionFails;
private final Func1<String, String> keyMapper;
private final Action1<CollapsedRequest<String, String>> onMissingResponseHandler;

static {
emitsPerArg = new HashMap<String, Integer>();
emitsPerArg = new ConcurrentHashMap<String, Integer>();
}

public TestCollapserWithMultipleResponses(CollapserTimer timer, int arg, int numEmits, boolean commandConstructionFails) {
Expand Down Expand Up @@ -835,7 +909,7 @@ protected Observable<String> construct() {
public void call(Subscriber<? super String> subscriber) {
try {
assertNotNull("Executing the Batch command should have a HystrixRequestContext", HystrixRequestContext.getContextForCurrentThread());
Thread.sleep(100);
Thread.sleep(30);
for (Integer arg: args) {
int numEmits = emitsPerArg.get(arg.toString());
for (int j = 1; j < numEmits + 1; j++) {
Expand Down