Skip to content

Commit

Permalink
Merge pull request #318 from benjchristensen/async-observable
Browse files Browse the repository at this point in the history
HystrixAsyncCommand and HystrixObservableCommand
  • Loading branch information
benjchristensen committed Sep 25, 2014
2 parents f793f44 + 93c8240 commit 62ac93f
Show file tree
Hide file tree
Showing 41 changed files with 9,385 additions and 1,627 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.*;
import rx.Observable.OnSubscribe;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;
Expand Down Expand Up @@ -59,7 +59,7 @@
* @param <RequestArgumentType>
* The type of the request argument. If multiple arguments are needed, wrap them in another object or a Tuple.
*/
public abstract class HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType> implements HystrixExecutable<ResponseType> {
public abstract class HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType> implements HystrixExecutable<ResponseType>, HystrixObservable<ResponseType> {

static final Logger logger = LoggerFactory.getLogger(HystrixCollapser.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.netflix.hystrix;

import static com.netflix.hystrix.strategy.properties.HystrixProperty.Factory.*;
import static com.netflix.hystrix.strategy.properties.HystrixProperty.Factory.asProperty;

import com.netflix.hystrix.strategy.properties.HystrixPropertiesChainedArchaiusProperty;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
Expand Down
491 changes: 250 additions & 241 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.netflix.hystrix;

import static com.netflix.hystrix.strategy.properties.HystrixProperty.Factory.*;
import static com.netflix.hystrix.strategy.properties.HystrixProperty.Factory.asProperty;

import java.util.concurrent.Future;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*
* @param <R>
*/
public interface HystrixExecutable<R> {
public interface HystrixExecutable<R> extends HystrixInvokable<R> {

/**
* Used for synchronous execution of command.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix;

import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.netflix.hystrix;

/**
* Marker interface for Hystrix commands that can be invoked.
*/
public interface HystrixInvokable<R> {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix;

import rx.Observable;
import rx.schedulers.Schedulers;

import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import com.netflix.hystrix.exception.HystrixBadRequestException;
import com.netflix.hystrix.exception.HystrixRuntimeException;

/**
* Common interface for executables that implement the Observable methods {@link #observe()} and {@link #toObservable()} so client code can treat them the same and combine in typed collections if desired.
*
* @param <R>
*/
public interface HystrixObservable<R> extends HystrixInvokable<R> {

/**
* Used for asynchronous execution of command with a callback by subscribing to the {@link Observable}.
* <p>
* This eagerly starts execution of the command the same as {@link #queue()} and {@link #execute()}.
* <p>
* A lazy {@link Observable} can be obtained from {@link #toObservable()}.
* <p>
* <b>Callback Scheduling</b>
* <p>
* <ul>
* <li>When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#threadPoolForComputation()} for callbacks.</li>
* <li>When using {@link ExecutionIsolationStrategy#SEMAPHORE} this defaults to using {@link Schedulers#immediate()} for callbacks.</li>
* </ul>
* Use {@link HystrixCommand#toObservable(rx.Scheduler)} or {@link HystrixCollapser#toObservable(rx.Scheduler)} to schedule the callback differently.
* <p>
* See https://github.com/ReactiveX/RxJava/wiki for more information.
*
* @return {@code Observable<R>} that executes and calls back with the result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason.
* @throws HystrixRuntimeException
* if a fallback does not exist
* <p>
* <ul>
* <li>via {@code Observer#onError} if a failure occurs</li>
* <li>or immediately if the command can not be queued (such as short-circuited, thread-pool/semaphore rejected)</li>
* </ul>
* @throws HystrixBadRequestException
* via {@code Observer#onError} if invalid arguments or state were used representing a user failure, not a system failure
* @throws IllegalStateException
* if invoked more than once
*/
public Observable<R> observe();

/**
* Used for asynchronous execution of command with a callback by subscribing to the {@link Observable}.
* <p>
* This lazily starts execution of the command only once the {@link Observable} is subscribed to.
* <p>
* An eager {@link Observable} can be obtained from {@link #observe()}
* <p>
* <b>Callback Scheduling</b>
* <p>
* <ul>
* <li>When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#threadPoolForComputation()} for callbacks.</li>
* <li>When using {@link ExecutionIsolationStrategy#SEMAPHORE} this defaults to using {@link Schedulers#immediate()} for callbacks.</li>
* </ul>
* Use {@link HystrixCommand#toObservable(rx.Scheduler)} or {@link HystrixCollapser#toObservable(rx.Scheduler)} to schedule the callback differently.
* <p>
* See https://github.com/ReactiveX/RxJava/wiki for more information.
*
* @return {@code Observable<R>} that executes and calls back with the result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason.
* @throws HystrixRuntimeException
* if a fallback does not exist
* <p>
* <ul>
* <li>via {@code Observer#onError} if a failure occurs</li>
* <li>or immediately if the command can not be queued (such as short-circuited, thread-pool/semaphore rejected)</li>
* </ul>
* @throws HystrixBadRequestException
* via {@code Observer#onError} if invalid arguments or state were used representing a user failure, not a system failure
* @throws IllegalStateException
* if invoked more than once
*/
public Observable<R> toObservable();

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,29 @@
*/
package com.netflix.hystrix;

import java.util.*;
import java.util.concurrent.*;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.*;
import rx.Observable;
import rx.functions.*;
import rx.Scheduler;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;

import com.netflix.hystrix.HystrixCollapser.CollapsedRequest;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import com.netflix.hystrix.collapser.*;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import com.netflix.hystrix.collapser.CollapserTimer;
import com.netflix.hystrix.collapser.HystrixCollapserBridge;
import com.netflix.hystrix.collapser.RealCollapserTimer;
import com.netflix.hystrix.collapser.RequestCollapser;
import com.netflix.hystrix.collapser.RequestCollapserFactory;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
Expand All @@ -55,7 +63,7 @@
* @param <RequestArgumentType>
* The type of the request argument. If multiple arguments are needed, wrap them in another object or a Tuple.
*/
public abstract class HystrixObservableCollapser<K, BatchReturnType, ResponseType, RequestArgumentType> implements HystrixExecutable<ResponseType> {
public abstract class HystrixObservableCollapser<K, BatchReturnType, ResponseType, RequestArgumentType> implements HystrixObservable<ResponseType> {

static final Logger logger = LoggerFactory.getLogger(HystrixObservableCollapser.class);

Expand Down Expand Up @@ -397,52 +405,6 @@ public Observable<ResponseType> toObservable(Scheduler observeOn) {
return response;
}

/**
* Used for synchronous execution.
* <p>
* If {@link Scope#REQUEST} is being used then synchronous execution will only result in collapsing if other threads are running within the same scope.
*
* @return ResponseType
* Result of {@link HystrixCommand}{@code <BatchReturnType>} execution after passing through {@link #mapResponseToRequests} to transform the {@code <BatchReturnType>} into
* {@code <ResponseType>}
* @throws HystrixRuntimeException
* if an error occurs and a fallback cannot be retrieved
*/
public ResponseType execute() {
try {
return queue().get();
} catch (Throwable e) {
if (e instanceof HystrixRuntimeException) {
throw (HystrixRuntimeException) e;
}
// if we have an exception we know about we'll throw it directly without the threading wrapper exception
if (e.getCause() instanceof HystrixRuntimeException) {
throw (HystrixRuntimeException) e.getCause();
}
// we don't know what kind of exception this is so create a generic message and throw a new HystrixRuntimeException
String message = getClass().getSimpleName() + " HystrixCollapser failed while executing.";
logger.debug(message, e); // debug only since we're throwing the exception and someone higher will do something with it
//TODO should this be made a HystrixRuntimeException?
throw new RuntimeException(message, e);
}
}

/**
* Used for asynchronous execution.
* <p>
* This will queue up the command and return a Future to get the result once it completes.
*
* @return ResponseType
* Result of {@link HystrixCommand}{@code <BatchReturnType>} execution after passing through {@link #mapResponseToRequests} to transform the {@code <BatchReturnType>} into
* {@code <ResponseType>}
* @throws HystrixRuntimeException
* within an <code>ExecutionException.getCause()</code> (thrown by {@link Future#get}) if an error occurs and a fallback cannot be retrieved
*/
public Future<ResponseType> queue() {
final Observable<ResponseType> o = toObservable();
return o.toBlocking().toFuture();
}

/**
* Key to be used for request caching.
* <p>
Expand Down Expand Up @@ -550,4 +512,4 @@ public Setter andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter p
@SuppressWarnings("rawtypes")
private static ConcurrentHashMap<Class<? extends HystrixObservableCollapser>, String> defaultNameCache = new ConcurrentHashMap<Class<? extends HystrixObservableCollapser>, String>();

}
}
Loading

0 comments on commit 62ac93f

Please sign in to comment.