Skip to content

Commit

Permalink
Fix Sonar Issues
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell committed Mar 9, 2022
1 parent f96edc0 commit a1ca194
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
IllegalStateException isx = new IllegalStateException("Failed to determine TestUtils.boundPort() method; client version: "
+ AppInfoParser.getVersion(), e);
isx.addSuppressed(e1);
throw isx;
throw isx; // NOSONAR
}
}
BOUND_PORT_METHOD = method;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@
public class KafkaListenerAnnotationBeanPostProcessor<K, V>
implements BeanPostProcessor, Ordered, ApplicationContextAware, InitializingBean, SmartInitializingSingleton {

private static final String UNCHECKED = "unchecked";

private static final String THE_LEFT = "The [";

private static final String RESOLVED_TO_LEFT = "Resolved to [";
Expand Down Expand Up @@ -643,7 +645,7 @@ private void resolveContentTypeConverter(MethodKafkaListenerEndpoint<?, ?> endpo
}
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings({ "rawtypes", UNCHECKED })
private void resolveFilter(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener) {
Object filter = resolveExpression(kafkaListener.filter());
if (filter instanceof RecordFilterStrategy) {
Expand Down Expand Up @@ -701,7 +703,7 @@ protected String noBeanFoundMessage(Object target, String listenerBeanName, Stri
+ requestedBeanName + "' was found in the application context";
}

@SuppressWarnings("unchecked")
@SuppressWarnings(UNCHECKED)
private void resolveKafkaProperties(MethodKafkaListenerEndpoint<?, ?> endpoint, String[] propertyStrings) {
if (propertyStrings.length > 0) {
Properties properties = new Properties();
Expand Down Expand Up @@ -870,7 +872,7 @@ else if (relativeToCurrentValue instanceof Boolean) {
return relativeToCurrent;
}

@SuppressWarnings("unchecked")
@SuppressWarnings(UNCHECKED)
private void resolveAsString(Object resolvedValue, List<String> result) {
if (resolvedValue instanceof String[]) {
for (Object object : (String[]) resolvedValue) {
Expand All @@ -891,7 +893,7 @@ else if (resolvedValue instanceof Iterable) {
}
}

@SuppressWarnings("unchecked")
@SuppressWarnings(UNCHECKED)
private void resolvePartitionAsInteger(String topic, Object resolvedValue,
List<TopicPartitionOffset> result, @Nullable Long offset, boolean isRelative, boolean checkDups) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
*/
public class DeadLetterPublishingRecoverer extends ExceptionClassifier implements ConsumerAwareRecordRecoverer {

private static final String DEPRECATION = "deprecation";

protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR

private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
Expand Down Expand Up @@ -339,7 +341,7 @@ public void setSkipSameTopicFatalExceptions(boolean skipSameTopicFatalExceptions
this.skipSameTopicFatalExceptions = skipSameTopicFatalExceptions;
}

@SuppressWarnings({ "unchecked", "deprecation" })
@SuppressWarnings({ "unchecked", DEPRECATION })
@Override
public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consumer, Exception exception) {
TopicPartition tp = this.destinationResolver.apply(record, exception);
Expand Down Expand Up @@ -406,7 +408,7 @@ private void sendOrThrow(ProducerRecord<Object, Object> outRecord,
}

private void maybeThrow(ConsumerRecord<?, ?> record, Exception exception) {
@SuppressWarnings("deprecation")
@SuppressWarnings(DEPRECATION)
String message = String.format("No destination returned for record %s and exception %s. " +
"failIfNoDestinationReturned: %s", ListenerUtils.recordToString(record), exception,
this.throwIfNoDestinationReturned);
Expand Down Expand Up @@ -519,7 +521,7 @@ protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?,
* @param inRecord the consumer record.
* @since 2.2.5
*/
@SuppressWarnings("deprecation")
@SuppressWarnings(DEPRECATION)
protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> kafkaTemplate,
ConsumerRecord<?, ?> inRecord) {

Expand Down Expand Up @@ -561,7 +563,7 @@ private void verifySendResult(KafkaOperations<Object, Object> kafkaTemplate,
}
}

@SuppressWarnings("deprecation")
@SuppressWarnings(DEPRECATION)
private String pubFailMessage(ProducerRecord<Object, Object> outRecord, ConsumerRecord<?, ?> inRecord) {
return "Dead-letter publication to "
+ outRecord.topic() + "failed for: " + ListenerUtils.recordToString(inRecord, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count

private static final String UNUSED = "unused";

private static final String DEPRECATION = "deprecation";

private static final String UNCHECKED = "unchecked";

private static final String RAWTYPES = "rawtypes";

private static final int DEFAULT_ACK_TIME = 5000;

private static final Map<String, Object> CONSUMER_CONFIG_DEFAULTS = ConsumerConfig.configDef().defaultValues();
Expand Down Expand Up @@ -547,12 +553,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private static final String ERROR_HANDLER_THREW_AN_EXCEPTION = "Error handler threw an exception";

private static final String UNCHECKED = "unchecked";

private static final String RAWTYPES = "rawtypes";

private static final String RAW_TYPES = RAWTYPES;

private final LogAccessor logger = KafkaMessageListenerContainer.this.logger; // NOSONAR hide

private final ContainerProperties containerProperties = getContainerProperties();
Expand Down Expand Up @@ -611,7 +611,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final PlatformTransactionManager transactionManager = this.containerProperties.getTransactionManager();

@SuppressWarnings(RAW_TYPES)
@SuppressWarnings(RAWTYPES)
private final KafkaAwareTransactionManager kafkaTxManager =
this.transactionManager instanceof KafkaAwareTransactionManager
? ((KafkaAwareTransactionManager) this.transactionManager) : null;
Expand Down Expand Up @@ -858,7 +858,7 @@ else if (this.commonRecordInterceptor != null) {
@Nullable
private CommonErrorHandler determineCommonErrorHandler() {
CommonErrorHandler common = getCommonErrorHandler();
@SuppressWarnings("deprecation")
@SuppressWarnings(DEPRECATION)
GenericErrorHandler<?> errHandler = getGenericErrorHandler();
if (common != null) {
if (errHandler != null) {
Expand Down Expand Up @@ -935,7 +935,7 @@ private void checkGroupInstance(Properties properties, ConsumerFactory<K, V> con
}
}

@SuppressWarnings("deprecation")
@SuppressWarnings(DEPRECATION)
private boolean setupSubBatchPerPartition() {
Boolean subBatching = this.containerProperties.getSubBatchPerPartition();
if (subBatching != null) {
Expand Down Expand Up @@ -1229,7 +1229,7 @@ public boolean isLongLived() {
return true;
}

@SuppressWarnings("deprecation")
@SuppressWarnings(DEPRECATION)
@Override // NOSONAR complexity
public void run() {
ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
Expand Down Expand Up @@ -1671,16 +1671,20 @@ private void pausePartitionsIfNecessary() {
}

private void resumePartitionsIfNecessary() {
List<TopicPartition> partitionsToResume = getAssignedPartitions()
.stream()
.filter(tp -> !isPartitionPauseRequested(tp)
&& this.pausedPartitions.contains(tp))
.collect(Collectors.toList());
if (partitionsToResume.size() > 0) {
this.consumer.resume(partitionsToResume);
this.pausedPartitions.removeAll(partitionsToResume);
this.logger.debug(() -> "Resumed consumption from " + partitionsToResume);
partitionsToResume.forEach(KafkaMessageListenerContainer.this::publishConsumerPartitionResumedEvent);
Collection<TopicPartition> assigned = getAssignedPartitions();
if (assigned != null) {
List<TopicPartition> partitionsToResume = assigned
.stream()
.filter(tp -> !isPartitionPauseRequested(tp)
&& this.pausedPartitions.contains(tp))
.collect(Collectors.toList());
if (partitionsToResume.size() > 0) {
this.consumer.resume(partitionsToResume);
this.pausedPartitions.removeAll(partitionsToResume);
this.logger.debug(() -> "Resumed consumption from " + partitionsToResume);
partitionsToResume
.forEach(KafkaMessageListenerContainer.this::publishConsumerPartitionResumedEvent);
}
}
}

Expand Down Expand Up @@ -1812,7 +1816,7 @@ record = this.acks.poll();
}
}

@SuppressWarnings("deprecation")
@SuppressWarnings(DEPRECATION)
private void traceAck(ConsumerRecord<K, V> record) {
this.logger.trace(() -> "Ack: " + ListenerUtils.recordToString(record, true));
}
Expand Down Expand Up @@ -1887,7 +1891,7 @@ private void processAcks(ConsumerRecords<K, V> records) {
}
}

@SuppressWarnings("deprecation")
@SuppressWarnings(DEPRECATION)
private synchronized void ackInOrder(ConsumerRecord<K, V> record) {
TopicPartition part = new TopicPartition(record.topic(), record.partition());
List<Long> offs = this.offsetsInThisBatch.get(part);
Expand Down Expand Up @@ -1995,7 +1999,7 @@ private void invokeBatchListener(final ConsumerRecords<K, V> recordsArg) {
}
}

@SuppressWarnings(RAW_TYPES)
@SuppressWarnings(RAWTYPES)
private void invokeBatchListenerInTx(final ConsumerRecords<K, V> records,
@Nullable final List<ConsumerRecord<K, V>> recordList) {

Expand Down Expand Up @@ -2306,7 +2310,7 @@ private void invokeRecordListener(final ConsumerRecords<K, V> records) {
* Invoke the listener with each record in a separate transaction.
* @param records the records.
*/
@SuppressWarnings("deprecation") // NOSONAR complexity
@SuppressWarnings(DEPRECATION) // NOSONAR complexity
private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
while (iterator.hasNext()) {
Expand Down Expand Up @@ -2408,7 +2412,7 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
}
}

@SuppressWarnings("deprecation")
@SuppressWarnings(DEPRECATION)
private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
while (iterator.hasNext()) {
Expand Down Expand Up @@ -2444,7 +2448,7 @@ private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg)
return next;
}

@SuppressWarnings("deprecation")
@SuppressWarnings(DEPRECATION)
@Nullable
private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> recordArg) {
internalHeaders(recordArg);
Expand Down Expand Up @@ -2494,7 +2498,10 @@ private void pauseForNackSleep() {
this.nackWake = System.currentTimeMillis() + this.nackSleep;
this.nackSleep = -1;
Set<TopicPartition> alreadyPaused = this.consumer.paused();
this.pausedForNack.addAll(getAssignedPartitions());
Collection<TopicPartition> assigned = getAssignedPartitions();
if (assigned != null) {
this.pausedForNack.addAll(assigned);
}
this.pausedForNack.removeAll(alreadyPaused);
this.logger.debug(() -> "Pausing for nack sleep: " + ListenerConsumer.this.pausedForNack);
try {
Expand Down Expand Up @@ -2605,7 +2612,7 @@ private void invokeOnMessage(final ConsumerRecord<K, V> record) {
}
}

@SuppressWarnings("deprecation")
@SuppressWarnings(DEPRECATION)
private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
ConsumerRecord<K, V> record = recordArg;
if (this.recordInterceptor != null) {
Expand Down Expand Up @@ -2759,7 +2766,7 @@ private void sendOffsetsToTransaction() {
doSendOffsets(this.producer, commits);
}

@SuppressWarnings("deprecation")
@SuppressWarnings(DEPRECATION)
private void doSendOffsets(Producer<?, ?> prod, Map<TopicPartition, OffsetAndMetadata> commits) {
if (this.eosMode.getMode().equals(EOSMode.V1)) {
prod.sendOffsetsToTransaction(commits, this.consumerGroupId);
Expand Down Expand Up @@ -3171,7 +3178,7 @@ public void nack(long sleep) {
}

@Override
@SuppressWarnings("deprecation")
@SuppressWarnings(DEPRECATION)
public String toString() {
return "Acknowledgment for " + ListenerUtils.recordToString(this.record, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ void shouldInstantiateIfNotInContainer() {
}

@Test
@SuppressWarnings("deprecation")
void shouldLogConsumerRecordMessage() {
ListenerUtils.setLogOnlyMetadata(false);
RetryTopicConfigurer.LoggingDltListenerHandlerMethod method =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ void primary() {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(Config3.class);
MicrometerHolder micrometerHolder = new MicrometerHolder(ctx, "holderName",
"timerName", "timerDesc", Collections.emptyMap());
@SuppressWarnings("unchecked")
Map<String, Timer> meters = (Map<String, Timer>) ReflectionTestUtils.getField(micrometerHolder, "meters");
assertThat(meters).hasSize(1);
}
Expand Down

0 comments on commit a1ca194

Please sign in to comment.