From c36610fa68951370ab6b8aec8d2baf82bccb670c Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Wed, 4 May 2022 13:02:03 +0200 Subject: [PATCH] fix(kafka-runner): wrong logger for KafkaStreamService --- .../io/kestra/runner/kafka/services/KafkaStreamService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java index 0332b746a60..62c688027a2 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java @@ -154,7 +154,7 @@ private Stream(Topology topology, Properties props, MetricRegistry meterRegistry public synchronized void start(final KafkaStreams.StateListener listener) throws IllegalStateException, StreamsException { this.setUncaughtExceptionHandler(e -> { - log.error("Uncaught exception in Kafka Stream, closing !", e); + this.logger.error("Uncaught exception in Kafka Stream, closing !", e); return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION; }); @@ -182,12 +182,12 @@ public synchronized void start(final KafkaStreams.StateListener listener) throws newState == State.NOT_RUNNING || newState == State.PENDING_SHUTDOWN ) { - log.warn("Switching stream state from {} to {}", oldState, newState); + this.logger.warn("Switching stream state from {} to {}", oldState, newState); } else if ( newState == State.PENDING_ERROR || newState == State.ERROR ) { - log.error("Switching stream state from {} to {}", oldState, newState); + this.logger.error("Switching stream state from {} to {}", oldState, newState); } else { logger.info("Switching stream state from {} to {}", oldState, newState); }