diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java index b3a4c762..aedbd389 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java @@ -235,8 +235,9 @@ public void run() { metrics.commitOffsetTime.record(timer.duration()); } updateState(SubscriptionStateListener.State.SHUTTING_DOWN); - waitForRemainingTasksCompletion( - scope.props().get(ProcessorProperties.CONFIG_SHUTDOWN_TIMEOUT_MS).value()); + final long timeoutMillis = + scope.props().get(ProcessorProperties.CONFIG_SHUTDOWN_TIMEOUT_MS).value(); + if (timeoutMillis > 0) { waitForRemainingTasksCompletion(timeoutMillis); } } catch (RuntimeException e) { log.error("Unknown exception thrown at subscription loop, thread will be terminated: {}", scope, e); updateState(SubscriptionStateListener.State.SHUTTING_DOWN); diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessorUnit.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessorUnit.java index 9987babf..0f16ac06 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessorUnit.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessorUnit.java @@ -52,10 +52,7 @@ public ProcessorUnit(ThreadScope scope, ProcessPipeline pipeline) { "partition", String.valueOf(tp.partition()), "subpartition", String.valueOf(scope.threadId())) .new ResourceUtilizationMetrics(); - shutdownFuture = executorShutdownFuture.thenAccept(v -> { - pipeline.close(); - metrics.close(); - }); + shutdownFuture = executorShutdownFuture.thenAccept(v -> metrics.close()); } public void putTask(TaskRequest request) { @@ -95,6 +92,7 @@ public void initiateShutdown() { // finish executor.submit(() -> executorShutdownFuture.complete(null)); executor.shutdown(); + pipeline.close(); } @Override