Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle hook errors #772

Merged
merged 2 commits into from
Apr 28, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 73 additions & 25 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,11 @@ public Observable<R> toObservable() {
/* mark that we received this response from cache */
metrics.markResponseFromCache();
isExecutionComplete.set(true);
executionHook.onCacheHit(this);
try {
executionHook.onCacheHit(this);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onCacheHit", hookEx);
}
return new CachedObservableResponse<R>((CachedObservableOriginal<R>) fromCache, this);
}
}
Expand All @@ -379,7 +383,11 @@ public void call(Subscriber<? super R> observer) {
metrics.incrementConcurrentExecutionCount();

// mark that we're starting execution on the ExecutionHook
executionHook.onStart(_this);
try {
executionHook.onStart(_this);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onStart", hookEx);
}

/* determine if we're allowed to execute */
if (circuitBreaker.allowRequest()) {
Expand Down Expand Up @@ -511,9 +519,21 @@ public void call(Subscriber<? super R> s) {
s.onError(new RuntimeException("timed out before executing run()"));
} else {
// not timed out so execute
executionHook.onThreadStart(_self);
executionHook.onRunStart(_self);
executionHook.onExecutionStart(_self);
try {
executionHook.onThreadStart(_self);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onThreadStart", hookEx);
}
try {
executionHook.onRunStart(_self);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onRunStart", hookEx);
}
try {
executionHook.onExecutionStart(_self);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onExecutionStart", hookEx);
}
threadPool.markThreadExecution();
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
Expand All @@ -530,8 +550,16 @@ public Boolean call() {
}));
} else {
// semaphore isolated
executionHook.onRunStart(_self);
executionHook.onExecutionStart(_self);
try {
executionHook.onRunStart(_self);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onRunStart", hookEx);
}
try {
executionHook.onExecutionStart(_self);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onExecutionStart", hookEx);
}
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
run = getExecutionObservableWithLifecycle(); //the getExecutionObservableWithLifecycle method already wraps sync exceptions, so no need to catch here
Expand Down Expand Up @@ -603,8 +631,8 @@ public Observable<R> call(Throwable t) {
} else {
logger.warn("ExecutionHook.onError returned an exception that was not an instance of HystrixBadRequestException so will be ignored.", decorated);
}
} catch (Exception hookException) {
logger.warn("Error calling ExecutionHook.onError", hookException);
} catch (Exception hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onError", hookEx);
}
/*
* HystrixBadRequestException is treated differently and allowed to propagate without any stats tracking or fallback logic
Expand Down Expand Up @@ -665,7 +693,7 @@ private Observable<R> getExecutionObservableWithLifecycle() {
// so we catch it here and turn it into Observable.error
userObservable = Observable.error(ex);
}
return userObservable .lift(new ExecutionHookApplication(_self))
return userObservable.lift(new ExecutionHookApplication(_self))
.lift(new DeprecatedOnRunHookApplication(_self))
.doOnTerminate(new Action0() {
@Override
Expand Down Expand Up @@ -725,7 +753,11 @@ private Observable<R> getFallbackOrThrowException(final HystrixEventType eventTy
// acquire a permit
if (fallbackSemaphore.tryAcquire()) {
if (isFallbackUserSupplied(this)) {
executionHook.onFallbackStart(this);
try {
executionHook.onFallbackStart(this);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onFallbackStart", hookEx);
}
}

try {
Expand Down Expand Up @@ -883,7 +915,11 @@ protected void handleThreadEnd() {
}
if (isExecutedInThread.get()) {
threadPool.markThreadCompletion();
executionHook.onThreadComplete(this);
try {
executionHook.onThreadComplete(this);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onThreadComplete", hookEx);
}
}
}

Expand Down Expand Up @@ -1297,7 +1333,11 @@ public Subscriber<? super R> call(final Subscriber<? super R> subscriber) {
return new Subscriber<R>(subscriber) {
@Override
public void onCompleted() {
executionHook.onSuccess(cmd);
try {
executionHook.onSuccess(cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx);
}
subscriber.onCompleted();
}

Expand Down Expand Up @@ -1328,7 +1368,11 @@ public Subscriber<? super R> call(final Subscriber<? super R> subscriber) {
return new Subscriber<R>(subscriber) {
@Override
public void onCompleted() {
executionHook.onExecutionSuccess(cmd);
try {
executionHook.onExecutionSuccess(cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onExecutionSuccess", hookEx);
}
subscriber.onCompleted();
}

Expand Down Expand Up @@ -1359,7 +1403,11 @@ public Subscriber<? super R> call(final Subscriber<? super R> subscriber) {
return new Subscriber<R>(subscriber) {
@Override
public void onCompleted() {
executionHook.onFallbackSuccess(cmd);
try {
executionHook.onFallbackSuccess(cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onFallbackSuccess", hookEx);
}
subscriber.onCompleted();
}

Expand Down Expand Up @@ -1405,7 +1453,7 @@ public void onNext(R r) {
R wrappedValue = executionHook.onComplete(cmd, r);
subscriber.onNext(wrappedValue);
} catch (Throwable hookEx) {
logger.warn("Error calling ExecutionHook.onComplete", hookEx);
logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
subscriber.onNext(r);
}
}
Expand Down Expand Up @@ -1437,7 +1485,7 @@ public void onError(Throwable t) {
Exception wrappedEx = executionHook.onRunError(cmd, e);
subscriber.onError(wrappedEx);
} catch (Throwable hookEx) {
logger.warn("Error calling ExecutionHook.onRunError", hookEx);
logger.warn("Error calling HystrixCommandExecutionHook.onRunError", hookEx);
subscriber.onError(e);
}
}
Expand All @@ -1448,7 +1496,7 @@ public void onNext(R r) {
R wrappedValue = executionHook.onRunSuccess(cmd, r);
subscriber.onNext(wrappedValue);
} catch (Throwable hookEx) {
logger.warn("Error calling ExecutionHook.onRunSuccess", hookEx);
logger.warn("Error calling HystrixCommandExecutionHook.onRunSuccess", hookEx);
subscriber.onNext(r);
}
}
Expand Down Expand Up @@ -1485,7 +1533,7 @@ public void onNext(R r) {
R wrappedValue = executionHook.onFallbackSuccess(cmd, r);
subscriber.onNext(wrappedValue);
} catch (Throwable hookEx) {
logger.warn("Error calling ExecutionHook.onFallbackSuccess", hookEx);
logger.warn("Error calling HystrixCommandExecutionHook.onFallbackSuccess", hookEx);
subscriber.onNext(r);
}
}
Expand All @@ -1498,7 +1546,7 @@ private Exception wrapWithOnExecutionErrorHook(Throwable t) {
try {
return executionHook.onExecutionError(this, e);
} catch (Throwable hookEx) {
logger.warn("Error calling ExecutionHook.onExecutionError", hookEx);
logger.warn("Error calling HystrixCommandExecutionHook.onExecutionError", hookEx);
return e;
}
}
Expand All @@ -1512,7 +1560,7 @@ private Exception wrapWithOnFallbackErrorHook(Throwable t) {
return e;
}
} catch (Throwable hookEx) {
logger.warn("Error calling ExecutionHook.onFallbackError", hookEx);
logger.warn("Error calling HystrixCommandExecutionHook.onFallbackError", hookEx);
return e;
}
}
Expand All @@ -1522,7 +1570,7 @@ private Exception wrapWithOnErrorHook(FailureType failureType, Throwable t) {
try {
return executionHook.onError(this, failureType, e);
} catch (Throwable hookEx) {
logger.warn("Error calling ExecutionHook.onError", hookEx);
logger.warn("Error calling HystrixCommandExecutionHook.onError", hookEx);
return e;
}
}
Expand All @@ -1531,7 +1579,7 @@ private R wrapWithOnExecutionEmitHook(R r) {
try {
return executionHook.onExecutionEmit(this, r);
} catch (Throwable hookEx) {
logger.warn("Error calling ExecutionHook.onExecutionEmit", hookEx);
logger.warn("Error calling HystrixCommandExecutionHook.onExecutionEmit", hookEx);
return r;
}
}
Expand All @@ -1540,7 +1588,7 @@ private R wrapWithOnFallbackEmitHook(R r) {
try {
return executionHook.onFallbackEmit(this, r);
} catch (Throwable hookEx) {
logger.warn("Error calling ExecutionHook.onFallbackEmit", hookEx);
logger.warn("Error calling HystrixCommandExecutionHook.onFallbackEmit", hookEx);
return r;
}
}
Expand All @@ -1549,7 +1597,7 @@ private R wrapWithOnEmitHook(R r) {
try {
return executionHook.onEmit(this, r);
} catch (Throwable hookEx) {
logger.warn("Error calling ExecutionHook.onEmit", hookEx);
logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);
return r;
}
}
Expand Down