diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageRecordUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageRecordUtils.java index ebf9ea38c..4404253d3 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageRecordUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageRecordUtils.java @@ -54,6 +54,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Commands.ChecksumType; @@ -352,15 +353,11 @@ public static MemoryRecords entriesToRecords(List { // each entry is a batched message ByteBuf metadataAndPayload = entry.getDataBuffer(); - totalBytes.getAndAdd(metadataAndPayload.readableBytes()); + consumerStats.bytesOutCounter = metadataAndPayload.readableBytes(); // Uncompress the payload if necessary MessageMetadata msgMetadata = Commands.parseMessageMetadata(metadataAndPayload); @@ -374,7 +371,7 @@ public static MemoryRecords entriesToRecords(List