Skip to content

Commit

Permalink
GH-3589: Refactor toMessage batch method and fix logging/header issues
Browse files Browse the repository at this point in the history
Remove unnecessary check

Fix checkstyle violation

Change log level from WARN to DEBUG

Refactor headers conversion method

Add natives condition for logging when no header mapper
  • Loading branch information
bky373 authored and sobychacko committed Oct 28, 2024
1 parent 4a45905 commit 5fb1562
Showing 1 changed file with 48 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
Expand Down Expand Up @@ -63,6 +62,7 @@
* @author Biju Kunjummen
* @author Sanghyeok An
* @author Hope Kim
* @author Borahm Lee
* @since 1.1
*/
public class BatchMessagingMessageConverter implements BatchMessageConverter {
Expand Down Expand Up @@ -93,7 +93,7 @@ public BatchMessagingMessageConverter() {
* @param recordConverter the converter.
* @since 1.3.2
*/
public BatchMessagingMessageConverter(RecordMessageConverter recordConverter) {
public BatchMessagingMessageConverter(@Nullable RecordMessageConverter recordConverter) {
this.recordConverter = recordConverter;
if (JacksonPresent.isJackson2Present()) {
this.headerMapper = new DefaultKafkaHeaderMapper();
Expand Down Expand Up @@ -144,7 +144,8 @@ public void setRawRecordHeader(boolean rawRecordHeader) {
}

@Override // NOSONAR
public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer, Type type) {
public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type type) {

KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId,
this.generateTimestamp);
Expand All @@ -165,65 +166,38 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
addToRawHeaders(rawHeaders, convertedHeaders, natives, raws, conversionFailures);
commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes,
timestamps);
records.forEach(record -> processRecord(record, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps,
convertedHeaders, natives, raws, conversionFailures, rawHeaders, type));
return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
}

private void processRecord(ConsumerRecord<?, ?> record, List<Object> payloads, List<Object> keys,
List<String> topics, List<Integer> partitions, List<Long> offsets,
List<String> timestampTypes, List<Long> timestamps, List<Map<String, Object>> convertedHeaders,
List<Headers> natives, List<ConsumerRecord<?, ?>> raws, List<ConversionException> conversionFailures,
Map<String, Object> rawHeaders, Type type) {
payloads.add(obtainPayload(type, record, conversionFailures));
keys.add(record.key());
topics.add(record.topic());
partitions.add(record.partition());
offsets.add(record.offset());

if (record.timestampType() != null) {
timestampTypes.add(record.timestampType().name());
}
timestamps.add(record.timestamp());

boolean logged = false;
String info = null;

if (this.headerMapper != null && record.headers() != null) {
Map<String, Object> converted = new HashMap<>();
this.headerMapper.toHeaders(record.headers(), converted);
convertedHeaders.add(converted);
Object object = converted.get(KafkaHeaders.LISTENER_INFO);
info = Optional.ofNullable(object)
.filter(String.class::isInstance)
.map(String.class::cast)
.orElse(null);
}
else {
if (!logged) {
logHeaderWarningOnce();
logged = true;
String listenerInfo = null;
for (ConsumerRecord<?, ?> record : records) {
addRecordInfo(record, type, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, conversionFailures);
if (this.headerMapper != null && record.headers() != null) {
Map<String, Object> converted = convertHeaders(record.headers(), convertedHeaders);
Object obj = converted.get(KafkaHeaders.LISTENER_INFO);
if (obj instanceof String) {
listenerInfo = (String) obj;
}
}
else {
natives.add(record.headers());
}
if (this.rawRecordHeader) {
raws.add(record);
}
natives.add(record.headers());
}
if (this.rawRecordHeader) {
raws.add(record);
if (this.headerMapper == null && !natives.isEmpty()) {
this.logger.debug(() ->
"No header mapper is available; Jackson is required for the default mapper; "
+ "headers (if present) are not mapped but provided raw in "
+ KafkaHeaders.NATIVE_HEADERS);
}
if (info != null) {
rawHeaders.put(KafkaHeaders.LISTENER_INFO, info);
if (listenerInfo != null) {
rawHeaders.put(KafkaHeaders.LISTENER_INFO, listenerInfo);
}
}

private void logHeaderWarningOnce() {
this.logger.debug(() ->
"No header mapper is available; Jackson is required for the default mapper; "
+ "headers (if present) are not mapped but provided raw in "
+ KafkaHeaders.NATIVE_HEADERS);
return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
}

private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Object>> convertedHeaders,
List<Headers> natives, List<ConsumerRecord<?, ?>> raws, List<ConversionException> conversionFailures) {

if (this.headerMapper != null) {
rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders);
}
Expand All @@ -236,12 +210,33 @@ private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Ob
rawHeaders.put(KafkaHeaders.CONVERSION_FAILURES, conversionFailures);
}

/**
 * Append the converted payload and the per-record metadata (key, topic, partition,
 * offset, timestamp and timestamp type) for one consumer record to the parallel
 * batch lists that back the batch message headers.
 */
private void addRecordInfo(ConsumerRecord<?, ?> consumerRecord, Type type, List<Object> payloads, List<Object> keys,
		List<String> topics, List<Integer> partitions, List<Long> offsets, List<String> timestampTypes,
		List<Long> timestamps, List<ConversionException> conversionFailures) {
	payloads.add(obtainPayload(type, consumerRecord, conversionFailures));
	keys.add(consumerRecord.key());
	topics.add(consumerRecord.topic());
	partitions.add(consumerRecord.partition());
	offsets.add(consumerRecord.offset());
	// The timestamp type may be absent; only record its name when present.
	if (consumerRecord.timestampType() != null) {
		timestampTypes.add(consumerRecord.timestampType().name());
	}
	timestamps.add(consumerRecord.timestamp());
}

/**
 * Produce the payload for a single record: use the configured record converter
 * when one is present and the target type is a container type; otherwise fall
 * back to plain value extraction and conversion.
 */
private Object obtainPayload(Type type, ConsumerRecord<?, ?> record, List<ConversionException> conversionFailures) {
	if (this.recordConverter != null && containerType(type)) {
		return convert(record, type, conversionFailures);
	}
	return extractAndConvertValue(record, type);
}

/**
 * Map the native Kafka headers into a new mutable map via the configured header
 * mapper, register that map in the batch-level converted-headers list, and return
 * it so the caller can inspect individual entries (e.g. the listener info header).
 */
private Map<String, Object> convertHeaders(Headers headers, List<Map<String, Object>> convertedHeaders) {
	Map<String, Object> mapped = new HashMap<>();
	this.headerMapper.toHeaders(headers, mapped);
	convertedHeaders.add(mapped);
	return mapped;
}

@Override
public List<ProducerRecord<?, ?>> fromMessage(Message<?> message, String defaultTopic) {
throw new UnsupportedOperationException();
Expand Down

0 comments on commit 5fb1562

Please sign in to comment.