Skip to content

Commit

Permalink
Changes per PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
m50d committed Feb 22, 2021
1 parent f26b7ec commit cd84d5e
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,9 @@ public void run() {
metrics.commitOffsetTime.record(timer.duration());
}
updateState(SubscriptionStateListener.State.SHUTTING_DOWN);
waitForRemainingTasksCompletion(
scope.props().get(ProcessorProperties.CONFIG_SHUTDOWN_TIMEOUT_MS).value());
final long timeoutMillis =
scope.props().get(ProcessorProperties.CONFIG_SHUTDOWN_TIMEOUT_MS).value();
if (timeoutMillis > 0) { waitForRemainingTasksCompletion(timeoutMillis); }
} catch (RuntimeException e) {
log.error("Unknown exception thrown at subscription loop, thread will be terminated: {}", scope, e);
updateState(SubscriptionStateListener.State.SHUTTING_DOWN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,7 @@ public ProcessorUnit(ThreadScope scope, ProcessPipeline<?> pipeline) {
"partition", String.valueOf(tp.partition()),
"subpartition", String.valueOf(scope.threadId()))
.new ResourceUtilizationMetrics();
shutdownFuture = executorShutdownFuture.thenAccept(v -> {
pipeline.close();
metrics.close();
});
shutdownFuture = executorShutdownFuture.thenAccept(v -> metrics.close());
}

public void putTask(TaskRequest request) {
Expand Down Expand Up @@ -95,6 +92,7 @@ public void initiateShutdown() {
// finish
executor.submit(() -> executorShutdownFuture.complete(null));
executor.shutdown();
pipeline.close();
}

@Override
Expand Down

0 comments on commit cd84d5e

Please sign in to comment.