This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit a2ffa5a: fix checkstyle
gaoran10 committed Jan 19, 2021 (1 parent: e93c6a4)
Showing 5 changed files with 5 additions and 15 deletions.
File 1 of 5:
@@ -79,7 +79,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
         ch.pipeline().addLast("frameDecoder",
             new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
         ch.pipeline().addLast("handler",
-            new KafkaRequestHandler(pulsarService, kafkaConfig, groupCoordinator, transactionCoordinator, enableTls, advertisedEndPoint));
+            new KafkaRequestHandler(pulsarService, kafkaConfig,
+                groupCoordinator, transactionCoordinator, enableTls, advertisedEndPoint));
     }
 
 }
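For reference, the five arguments to the LengthFieldBasedFrameDecoder constructor map directly onto Kafka's wire format: every request is prefixed with a 4-byte big-endian size field at offset 0, no length adjustment is needed, and the 4-byte prefix is stripped before the frame reaches the handler. A minimal, self-contained sketch of that framing stage; the class name, the MAX_FRAME_LENGTH value, and the placeholder handler are illustrative, not KoP's actual code:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public class FramingInitializer extends ChannelInitializer<SocketChannel> {
    // Illustrative cap on a single request; KoP defines its own MAX_FRAME_LENGTH.
    private static final int MAX_FRAME_LENGTH = 100 * 1024 * 1024;

    @Override
    protected void initChannel(SocketChannel ch) {
        // maxFrameLength, lengthFieldOffset = 0, lengthFieldLength = 4,
        // lengthAdjustment = 0, initialBytesToStrip = 4
        ch.pipeline().addLast("frameDecoder",
                new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
        ch.pipeline().addLast("handler", new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                // msg is one complete Kafka request frame, size prefix already removed
                ((ByteBuf) msg).release();
            }
        });
    }
}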
File 2 of 5:
@@ -102,7 +102,6 @@ protected KafkaHeaderAndRequest byteBufToRequest(ByteBuf msg) {
 
     protected KafkaHeaderAndRequest byteBufToRequest(ByteBuf msg,
                                                      SocketAddress remoteAddress) {
-        log.info("byteBufToRequest msg {}", msg);
         checkArgument(msg.readableBytes() > 0);
         ByteBuffer nio = msg.nioBuffer();
         RequestHeader header = RequestHeader.parse(nio);
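With the per-message log line gone, the decode step reads cleanly: the readable bytes of the framed ByteBuf are exposed as an NIO buffer without copying, and Kafka's own RequestHeader.parse consumes the api key, api version, correlation id, and client id from the front of it. A hedged sketch of just that step, assuming Guava's checkArgument as in the original (the wrapper class and method name are illustrative):

import static com.google.common.base.Preconditions.checkArgument;

import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import org.apache.kafka.common.requests.RequestHeader;

final class HeaderDecoder {
    static RequestHeader parseHeader(ByteBuf msg) {
        checkArgument(msg.readableBytes() > 0);
        // nioBuffer() wraps the readable region; no bytes are copied
        ByteBuffer nio = msg.nioBuffer();
        // parse() advances the buffer past the header, leaving the request body behind it
        return RequestHeader.parse(nio);
    }
}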
File 3 of 5:
@@ -247,7 +247,6 @@ public void start(BrokerService service) {
         try {
             initGroupCoordinator(brokerService);
             startGroupCoordinator();
-            initTransactionCoordinator();
             // and listener for Offset topics load/unload
             brokerService.pulsar()
                 .getNamespaceService()
File 4 of 5:
@@ -591,9 +591,7 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
                                         CompletableFuture<AbstractResponse> resultFuture) {
         checkArgument(produceHar.getRequest() instanceof ProduceRequest);
         ProduceRequest produceRequest = (ProduceRequest) produceHar.getRequest();
-        log.info("Command [handleProduceRequest] request: transactionalId {}", produceRequest.transactionalId());
         if (produceRequest.transactionalId() != null) {
-            log.warn("[{}] Transactions not supported", ctx.channel());
             // TODO auth check
         }
 
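The transaction handlers in the hunks below correspond one-to-one to steps a transactional Kafka producer performs, which is also why a produce request can carry a transactionalId at all. A client-side sketch that exercises them; the broker address, topic, and transactional id are illustrative:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TxnProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-txn-id"); // makes every ProduceRequest carry a transactionalId
        props.put("key.serializer", IntegerSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();  // InitProducerId -> handleInitProducerId
            producer.beginTransaction();
            // the first send to a partition triggers AddPartitionsToTxn -> handleAddPartitionsToTxn
            producer.send(new ProducerRecord<>("my-topic", 1, "hello"));
            producer.commitTransaction(); // EndTxn -> handleEndTxn; the coordinator then writes markers
        }
    }
}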
@@ -1314,7 +1312,6 @@ protected void handleDescribeConfigs(KafkaHeaderAndRequest describeConfigs,
     protected void handleInitProducerId(KafkaHeaderAndRequest kafkaHeaderAndRequest,
                                         CompletableFuture<AbstractResponse> response) {
         InitProducerIdRequest request = (InitProducerIdRequest) kafkaHeaderAndRequest.getRequest();
-        log.info("Command [handleInitProducerId] request: {}", request);
         transactionCoordinator.handleInitProducerId(
             request.transactionalId(), request.transactionTimeoutMs(), response);
     }
@@ -1323,7 +1320,6 @@ protected void handleInitProducerId(KafkaHeaderAndRequest kafkaHeaderAndRequest,
     protected void handleAddPartitionsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest,
                                             CompletableFuture<AbstractResponse> response) {
         AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest) kafkaHeaderAndRequest.getRequest();
-        log.info("Command [handleAddPartitionsToTxn] request: {}", request);
         transactionCoordinator.handleAddPartitionsToTransaction(request.transactionalId(),
             request.producerId(), request.producerEpoch(), request.partitions(), response);
     }
@@ -1332,7 +1328,6 @@ protected void handleAddPartitionsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest,
     protected void handleAddOffsetsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest,
                                          CompletableFuture<AbstractResponse> response) {
         AddOffsetsToTxnRequest request = (AddOffsetsToTxnRequest) kafkaHeaderAndRequest.getRequest();
-        log.info("Command [handleAddOffsetsToTxn] request: {}", request);
         int partition = groupCoordinator.partitionFor(request.consumerGroupId());
         String offsetTopicName = groupCoordinator.getGroupManager().getOffsetConfig().offsetsTopicName();
         transactionCoordinator.handleAddPartitionsToTransaction(
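In the hunk above, groupCoordinator.partitionFor maps the consumer group id onto one partition of the internal offsets topic, so a group's offsets and its transaction markers land in the same place. Kafka's usual scheme is a sign-masked hash modulo the offsets-topic partition count; a sketch under the assumption that KoP's coordinator follows it (50 is Kafka's default offsets.topic.num.partitions):

final class OffsetsTopicPartitioner {
    static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
        // mask the sign bit rather than calling Math.abs, which stays negative for Integer.MIN_VALUE
        return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitionCount;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("txn-test-group", 50)); // partition that would own this group
    }
}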
@@ -1346,15 +1341,14 @@ protected void handleAddOffsetsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest,
     protected void handleTxnOffsetCommit(KafkaHeaderAndRequest kafkaHeaderAndRequest,
                                          CompletableFuture<AbstractResponse> response) {
         TxnOffsetCommitRequest request = (TxnOffsetCommitRequest) kafkaHeaderAndRequest.getRequest();
-        log.info("Command [handleTxnOffsetCommit] request: {}", request);
+        log.info("handleTxnOffsetCommit transactionalId: {}", request.transactionalId());
         response.complete(new TxnOffsetCommitResponse(0, Collections.EMPTY_MAP));
     }
 
     @Override
     protected void handleEndTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest,
                                 CompletableFuture<AbstractResponse> response) {
         EndTxnRequest request = (EndTxnRequest) kafkaHeaderAndRequest.getRequest();
-        log.info("Command [handleEndTxn] request: {}", request);
         transactionCoordinator.handleEndTransaction(
             request.transactionalId(),
             request.producerId(),
@@ -1368,7 +1362,6 @@ protected void handleEndTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest,
     protected void handleWriteTxnMarkers(KafkaHeaderAndRequest kafkaHeaderAndRequest,
                                          CompletableFuture<AbstractResponse> response) {
         WriteTxnMarkersRequest request = (WriteTxnMarkersRequest) kafkaHeaderAndRequest.getRequest();
-        log.info("Command [handleWriteTxnMarkers] request: {}", request);
         Map<Long, Map<TopicPartition, Errors>> resultMap = new HashMap<>();
         List<CompletableFuture<Long>> offsetFutureList = new ArrayList<>();
         for (WriteTxnMarkersRequest.TxnMarkerEntry txnMarkerEntry : request.markers()) {
@@ -1434,9 +1427,6 @@ private CompletableFuture<Long> writeTxnMarker(TopicPartition topicPartition,
                 1,
                 SystemTime.SYSTEM.milliseconds()));
         });
-        offsetFuture.thenAccept(offset -> {
-            log.info("writeTxnMarker offset: {}", offset);
-        });
         return offsetFuture;
     }
 
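The deleted callback was side-effect-only: thenAccept returns a new CompletableFuture<Void> and leaves the source future untouched, so removing it cannot change what callers of writeTxnMarker observe. A minimal demonstration:

import java.util.concurrent.CompletableFuture;

final class ThenAcceptDemo {
    public static void main(String[] args) {
        CompletableFuture<Long> offsetFuture = CompletableFuture.completedFuture(42L);
        // thenAccept registers a consumer on a NEW future; offsetFuture itself is unchanged
        offsetFuture.thenAccept(offset -> System.out.println("offset: " + offset));
        System.out.println(offsetFuture.join()); // still 42
    }
}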
File 5 of 5:
@@ -177,7 +177,8 @@ private void consumeTxnMessage(String topicName,
 
         boolean readFinish = false;
         for (ConsumerRecord<Integer, String> record : consumerRecords) {
-            log.info("Fetch for receive record offset: {}, key: {}, value: {}", record.offset(), record.key(), record.value());
+            log.info("Fetch for receive record offset: {}, key: {}, value: {}",
+                record.offset(), record.key(), record.value());
             receiveCount.incrementAndGet();
             if (lastMessage.equalsIgnoreCase(record.value())) {
                 log.info("receive the last message");
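consumeTxnMessage is the verification side of these transaction changes. For context, a sketch of the consumer such a test typically builds; the read_committed isolation level is what makes the check meaningful, since it hides records from aborted transactions (bootstrap address, group id, topic, and poll timeout are illustrative):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TxnConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "txn-test-group");
        props.put("isolation.level", "read_committed"); // skip records from aborted transactions
        props.put("key.deserializer", IntegerDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            ConsumerRecords<Integer, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<Integer, String> record : consumerRecords) {
                System.out.printf("offset: %d, key: %d, value: %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}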
