diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java index 2f1d9e8cfa..0f78eac573 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java @@ -150,7 +150,7 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean { IllegalStateException isx = new IllegalStateException("Failed to determine TestUtils.boundPort() method; client version: " + AppInfoParser.getVersion(), e); isx.addSuppressed(e1); - throw isx; + throw isx; // NOSONAR } } BOUND_PORT_METHOD = method; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index a814393b41..eb71c2f6be 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -144,6 +144,8 @@ public class KafkaListenerAnnotationBeanPostProcessor implements BeanPostProcessor, Ordered, ApplicationContextAware, InitializingBean, SmartInitializingSingleton { + private static final String UNCHECKED = "unchecked"; + private static final String THE_LEFT = "The ["; private static final String RESOLVED_TO_LEFT = "Resolved to ["; @@ -643,7 +645,7 @@ private void resolveContentTypeConverter(MethodKafkaListenerEndpoint endpo } } - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({ "rawtypes", UNCHECKED }) private void resolveFilter(MethodKafkaListenerEndpoint endpoint, KafkaListener kafkaListener) { Object filter = resolveExpression(kafkaListener.filter()); if (filter instanceof RecordFilterStrategy) { @@ -701,7 +703,7 @@ protected String noBeanFoundMessage(Object target, String listenerBeanName, Stri + requestedBeanName + "' was found in the application context"; } - @SuppressWarnings("unchecked") + @SuppressWarnings(UNCHECKED) private void resolveKafkaProperties(MethodKafkaListenerEndpoint endpoint, String[] propertyStrings) { if (propertyStrings.length > 0) { Properties properties = new Properties(); @@ -870,7 +872,7 @@ else if (relativeToCurrentValue instanceof Boolean) { return relativeToCurrent; } - @SuppressWarnings("unchecked") + @SuppressWarnings(UNCHECKED) private void resolveAsString(Object resolvedValue, List result) { if (resolvedValue instanceof String[]) { for (Object object : (String[]) resolvedValue) { @@ -891,7 +893,7 @@ else if (resolvedValue instanceof Iterable) { } } - @SuppressWarnings("unchecked") + @SuppressWarnings(UNCHECKED) private void resolvePartitionAsInteger(String topic, Object resolvedValue, List result, @Nullable Long offset, boolean isRelative, boolean checkDups) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java index f80cda4251..dae1f61870 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java @@ -66,6 +66,8 @@ */ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implements ConsumerAwareRecordRecoverer { + private static final String DEPRECATION = "deprecation"; + protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR private static final BiFunction, Exception, TopicPartition> @@ -339,7 +341,7 @@ public void setSkipSameTopicFatalExceptions(boolean skipSameTopicFatalExceptions this.skipSameTopicFatalExceptions = skipSameTopicFatalExceptions; } - @SuppressWarnings({ "unchecked", "deprecation" }) + @SuppressWarnings({ "unchecked", DEPRECATION }) @Override public void accept(ConsumerRecord record, @Nullable Consumer consumer, Exception exception) { TopicPartition tp = this.destinationResolver.apply(record, exception); @@ -406,7 +408,7 @@ private void sendOrThrow(ProducerRecord outRecord, } private void maybeThrow(ConsumerRecord record, Exception exception) { - @SuppressWarnings("deprecation") + @SuppressWarnings(DEPRECATION) String message = String.format("No destination returned for record %s and exception %s. " + "failIfNoDestinationReturned: %s", ListenerUtils.recordToString(record), exception, this.throwIfNoDestinationReturned); @@ -519,7 +521,7 @@ protected ProducerRecord createProducerRecord(ConsumerRecord outRecord, KafkaOperations kafkaTemplate, ConsumerRecord inRecord) { @@ -561,7 +563,7 @@ private void verifySendResult(KafkaOperations kafkaTemplate, } } - @SuppressWarnings("deprecation") + @SuppressWarnings(DEPRECATION) private String pubFailMessage(ProducerRecord outRecord, ConsumerRecord inRecord) { return "Dead-letter publication to " + outRecord.topic() + "failed for: " + ListenerUtils.recordToString(inRecord, true); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index aac07916d6..7ba487437e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -152,6 +152,12 @@ public class KafkaMessageListenerContainer // NOSONAR line count private static final String UNUSED = "unused"; + private static final String DEPRECATION = "deprecation"; + + private static final String UNCHECKED = "unchecked"; + + private static final String RAWTYPES = "rawtypes"; + private static final int DEFAULT_ACK_TIME = 5000; private static final Map CONSUMER_CONFIG_DEFAULTS = ConsumerConfig.configDef().defaultValues(); @@ -547,12 +553,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private static final String ERROR_HANDLER_THREW_AN_EXCEPTION = "Error handler threw an exception"; - private static final String UNCHECKED = "unchecked"; - - private static final String RAWTYPES = "rawtypes"; - - private static final String RAW_TYPES = RAWTYPES; - private final LogAccessor logger = KafkaMessageListenerContainer.this.logger; // NOSONAR hide private final ContainerProperties containerProperties = getContainerProperties(); @@ -611,7 +611,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final PlatformTransactionManager transactionManager = this.containerProperties.getTransactionManager(); - @SuppressWarnings(RAW_TYPES) + @SuppressWarnings(RAWTYPES) private final KafkaAwareTransactionManager kafkaTxManager = this.transactionManager instanceof KafkaAwareTransactionManager ? ((KafkaAwareTransactionManager) this.transactionManager) : null; @@ -858,7 +858,7 @@ else if (this.commonRecordInterceptor != null) { @Nullable private CommonErrorHandler determineCommonErrorHandler() { CommonErrorHandler common = getCommonErrorHandler(); - @SuppressWarnings("deprecation") + @SuppressWarnings(DEPRECATION) GenericErrorHandler errHandler = getGenericErrorHandler(); if (common != null) { if (errHandler != null) { @@ -935,7 +935,7 @@ private void checkGroupInstance(Properties properties, ConsumerFactory con } } - @SuppressWarnings("deprecation") + @SuppressWarnings(DEPRECATION) private boolean setupSubBatchPerPartition() { Boolean subBatching = this.containerProperties.getSubBatchPerPartition(); if (subBatching != null) { @@ -1229,7 +1229,7 @@ public boolean isLongLived() { return true; } - @SuppressWarnings("deprecation") + @SuppressWarnings(DEPRECATION) @Override // NOSONAR complexity public void run() { ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata()); @@ -1671,16 +1671,20 @@ private void pausePartitionsIfNecessary() { } private void resumePartitionsIfNecessary() { - List partitionsToResume = getAssignedPartitions() - .stream() - .filter(tp -> !isPartitionPauseRequested(tp) - && this.pausedPartitions.contains(tp)) - .collect(Collectors.toList()); - if (partitionsToResume.size() > 0) { - this.consumer.resume(partitionsToResume); - this.pausedPartitions.removeAll(partitionsToResume); - this.logger.debug(() -> "Resumed consumption from " + partitionsToResume); - partitionsToResume.forEach(KafkaMessageListenerContainer.this::publishConsumerPartitionResumedEvent); + Collection assigned = getAssignedPartitions(); + if (assigned != null) { + List partitionsToResume = assigned + .stream() + .filter(tp -> !isPartitionPauseRequested(tp) + && this.pausedPartitions.contains(tp)) + .collect(Collectors.toList()); + if (partitionsToResume.size() > 0) { + this.consumer.resume(partitionsToResume); + this.pausedPartitions.removeAll(partitionsToResume); + this.logger.debug(() -> "Resumed consumption from " + partitionsToResume); + partitionsToResume + .forEach(KafkaMessageListenerContainer.this::publishConsumerPartitionResumedEvent); + } } } @@ -1812,7 +1816,7 @@ record = this.acks.poll(); } } - @SuppressWarnings("deprecation") + @SuppressWarnings(DEPRECATION) private void traceAck(ConsumerRecord record) { this.logger.trace(() -> "Ack: " + ListenerUtils.recordToString(record, true)); } @@ -1887,7 +1891,7 @@ private void processAcks(ConsumerRecords records) { } } - @SuppressWarnings("deprecation") + @SuppressWarnings(DEPRECATION) private synchronized void ackInOrder(ConsumerRecord record) { TopicPartition part = new TopicPartition(record.topic(), record.partition()); List offs = this.offsetsInThisBatch.get(part); @@ -1995,7 +1999,7 @@ private void invokeBatchListener(final ConsumerRecords recordsArg) { } } - @SuppressWarnings(RAW_TYPES) + @SuppressWarnings(RAWTYPES) private void invokeBatchListenerInTx(final ConsumerRecords records, @Nullable final List> recordList) { @@ -2306,7 +2310,7 @@ private void invokeRecordListener(final ConsumerRecords records) { * Invoke the listener with each record in a separate transaction. * @param records the records. */ - @SuppressWarnings("deprecation") // NOSONAR complexity + @SuppressWarnings(DEPRECATION) // NOSONAR complexity private void invokeRecordListenerInTx(final ConsumerRecords records) { Iterator> iterator = records.iterator(); while (iterator.hasNext()) { @@ -2408,7 +2412,7 @@ protected void doInTransactionWithoutResult(TransactionStatus status) { } } - @SuppressWarnings("deprecation") + @SuppressWarnings(DEPRECATION) private void doInvokeWithRecords(final ConsumerRecords records) { Iterator> iterator = records.iterator(); while (iterator.hasNext()) { @@ -2444,7 +2448,7 @@ private ConsumerRecords checkEarlyIntercept(ConsumerRecords nextArg) return next; } - @SuppressWarnings("deprecation") + @SuppressWarnings(DEPRECATION) @Nullable private ConsumerRecord checkEarlyIntercept(ConsumerRecord recordArg) { internalHeaders(recordArg); @@ -2494,7 +2498,10 @@ private void pauseForNackSleep() { this.nackWake = System.currentTimeMillis() + this.nackSleep; this.nackSleep = -1; Set alreadyPaused = this.consumer.paused(); - this.pausedForNack.addAll(getAssignedPartitions()); + Collection assigned = getAssignedPartitions(); + if (assigned != null) { + this.pausedForNack.addAll(assigned); + } this.pausedForNack.removeAll(alreadyPaused); this.logger.debug(() -> "Pausing for nack sleep: " + ListenerConsumer.this.pausedForNack); try { @@ -2605,7 +2612,7 @@ private void invokeOnMessage(final ConsumerRecord record) { } } - @SuppressWarnings("deprecation") + @SuppressWarnings(DEPRECATION) private void doInvokeOnMessage(final ConsumerRecord recordArg) { ConsumerRecord record = recordArg; if (this.recordInterceptor != null) { @@ -2759,7 +2766,7 @@ private void sendOffsetsToTransaction() { doSendOffsets(this.producer, commits); } - @SuppressWarnings("deprecation") + @SuppressWarnings(DEPRECATION) private void doSendOffsets(Producer prod, Map commits) { if (this.eosMode.getMode().equals(EOSMode.V1)) { prod.sendOffsetsToTransaction(commits, this.consumerGroupId); @@ -3171,7 +3178,7 @@ public void nack(long sleep) { } @Override - @SuppressWarnings("deprecation") + @SuppressWarnings(DEPRECATION) public String toString() { return "Acknowledgment for " + ListenerUtils.recordToString(this.record, true); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java index e00243e68a..2abf171470 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java @@ -358,6 +358,7 @@ void shouldInstantiateIfNotInContainer() { } @Test + @SuppressWarnings("deprecation") void shouldLogConsumerRecordMessage() { ListenerUtils.setLogOnlyMetadata(false); RetryTopicConfigurer.LoggingDltListenerHandlerMethod method = diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerHolderTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerHolderTests.java index d0b9a5d236..b3b3e5bed4 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerHolderTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerHolderTests.java @@ -90,6 +90,7 @@ void primary() { AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(Config3.class); MicrometerHolder micrometerHolder = new MicrometerHolder(ctx, "holderName", "timerName", "timerDesc", Collections.emptyMap()); + @SuppressWarnings("unchecked") Map meters = (Map) ReflectionTestUtils.getField(micrometerHolder, "meters"); assertThat(meters).hasSize(1); }