Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
using new pulsar update state interface
Browse files Browse the repository at this point in the history
  • Loading branch information
dockerzhang committed Dec 14, 2020
1 parent a8f9c5d commit 1e7fed4
Showing 1 changed file with 5 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;

Expand Down Expand Up @@ -352,15 +353,11 @@ public static MemoryRecords entriesToRecords(List<org.apache.bookkeeper.mledger.
false, false,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
MAX_RECORDS_BUFFER_SIZE);
int batchSizes = entries.size();
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entries.size());
AtomicLong totalBytes = new AtomicLong(0);
AtomicInteger totalMessages = new AtomicInteger(0);

ConsumerStats consumerStats = new ConsumerStats();
entries.parallelStream().forEachOrdered(entry -> {
// each entry is a batched message
ByteBuf metadataAndPayload = entry.getDataBuffer();
totalBytes.getAndAdd(metadataAndPayload.readableBytes());
consumerStats.bytesOutCounter = metadataAndPayload.readableBytes();

// Uncompress the payload if necessary
MessageMetadata msgMetadata = Commands.parseMessageMetadata(metadataAndPayload);
Expand All @@ -374,7 +371,7 @@ public static MemoryRecords entriesToRecords(List<org.apache.bookkeeper.mledger.
throw new UncheckedIOException(ioe);
}
int numMessages = msgMetadata.getNumMessagesInBatch();
totalMessages.getAndAdd(numMessages);
consumerStats.msgOutCounter = numMessages;

boolean notBatchMessage = (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch());

Expand Down Expand Up @@ -431,8 +428,7 @@ public static MemoryRecords entriesToRecords(List<org.apache.bookkeeper.mledger.
payload.release();
entry.release();
});
consumer.updateStats(batchSizes, batchIndexesAcks, totalMessages.get(),
totalBytes.get(), 0);
consumer.updateStats(consumerStats);
return builder.build();
} catch (IOException ioe){
log.error("Meet IOException: {}", ioe);
Expand Down

0 comments on commit 1e7fed4

Please sign in to comment.