From a2ffa5a458a326049c2f3ddb0bdf206115c56e97 Mon Sep 17 00:00:00 2001 From: gaoran10 Date: Tue, 19 Jan 2021 02:46:42 +0800 Subject: [PATCH] fix checkstyle --- .../pulsar/handlers/kop/KafkaChannelInitializer.java | 3 ++- .../pulsar/handlers/kop/KafkaCommandDecoder.java | 1 - .../pulsar/handlers/kop/KafkaProtocolHandler.java | 1 - .../pulsar/handlers/kop/KafkaRequestHandler.java | 12 +----------- .../pulsar/handlers/kop/TransactionTest.java | 3 ++- 5 files changed, 5 insertions(+), 15 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java index 8180430428..b4f6a49797 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java @@ -79,7 +79,8 @@ protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4)); ch.pipeline().addLast("handler", - new KafkaRequestHandler(pulsarService, kafkaConfig, groupCoordinator, transactionCoordinator, enableTls, advertisedEndPoint)); + new KafkaRequestHandler(pulsarService, kafkaConfig, + groupCoordinator, transactionCoordinator, enableTls, advertisedEndPoint)); } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index 20e9773512..1bdcda495d 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -102,7 +102,6 @@ protected KafkaHeaderAndRequest byteBufToRequest(ByteBuf msg) { protected KafkaHeaderAndRequest byteBufToRequest(ByteBuf msg, SocketAddress remoteAddress) { - log.info("byteBufToRequest msg {}", msg); checkArgument(msg.readableBytes() > 0); ByteBuffer nio = msg.nioBuffer(); RequestHeader header = RequestHeader.parse(nio); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java index 3df28a8a57..500805f41a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java @@ -247,7 +247,6 @@ public void start(BrokerService service) { try { initGroupCoordinator(brokerService); startGroupCoordinator(); - initTransactionCoordinator(); // and listener for Offset topics load/unload brokerService.pulsar() .getNamespaceService() diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 448fc18b0e..1a05f32f85 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -591,9 +591,7 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar, CompletableFuture resultFuture) { checkArgument(produceHar.getRequest() instanceof ProduceRequest); ProduceRequest produceRequest = (ProduceRequest) produceHar.getRequest(); - log.info("Command [handleProduceRequest] request: transactionalId {}", produceRequest.transactionalId()); if (produceRequest.transactionalId() != null) { - log.warn("[{}] Transactions not supported", ctx.channel()); // TODO auth check } @@ -1314,7 +1312,6 @@ protected void handleDescribeConfigs(KafkaHeaderAndRequest describeConfigs, protected void handleInitProducerId(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response) { InitProducerIdRequest request = (InitProducerIdRequest) kafkaHeaderAndRequest.getRequest(); - log.info("Command [handleInitProducerId] request: {}", request); transactionCoordinator.handleInitProducerId( request.transactionalId(), request.transactionTimeoutMs(), response); } @@ -1323,7 +1320,6 @@ protected void handleInitProducerId(KafkaHeaderAndRequest kafkaHeaderAndRequest, protected void handleAddPartitionsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response) { AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest) kafkaHeaderAndRequest.getRequest(); - log.info("Command [handleAddPartitionsToTxn] request: {}", request); transactionCoordinator.handleAddPartitionsToTransaction(request.transactionalId(), request.producerId(), request.producerEpoch(), request.partitions(), response); } @@ -1332,7 +1328,6 @@ protected void handleAddPartitionsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequ protected void handleAddOffsetsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response) { AddOffsetsToTxnRequest request = (AddOffsetsToTxnRequest) kafkaHeaderAndRequest.getRequest(); - log.info("Command [handleAddOffsetsToTxn] request: {}", request); int partition = groupCoordinator.partitionFor(request.consumerGroupId()); String offsetTopicName = groupCoordinator.getGroupManager().getOffsetConfig().offsetsTopicName(); transactionCoordinator.handleAddPartitionsToTransaction( @@ -1346,7 +1341,7 @@ protected void handleAddOffsetsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest protected void handleTxnOffsetCommit(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response) { TxnOffsetCommitRequest request = (TxnOffsetCommitRequest) kafkaHeaderAndRequest.getRequest(); - log.info("Command [handleTxnOffsetCommit] request: {}", request); + log.info("handleTxnOffsetCommit transactionalId: {}", request.transactionalId()); response.complete(new TxnOffsetCommitResponse(0, Collections.EMPTY_MAP)); } @@ -1354,7 +1349,6 @@ protected void handleTxnOffsetCommit(KafkaHeaderAndRequest kafkaHeaderAndRequest protected void handleEndTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response) { EndTxnRequest request = (EndTxnRequest) kafkaHeaderAndRequest.getRequest(); - log.info("Command [handleEndTxn] request: {}", request); transactionCoordinator.handleEndTransaction( request.transactionalId(), request.producerId(), @@ -1368,7 +1362,6 @@ protected void handleEndTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, protected void handleWriteTxnMarkers(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response) { WriteTxnMarkersRequest request = (WriteTxnMarkersRequest) kafkaHeaderAndRequest.getRequest(); - log.info("Command [handleWriteTxnMarkers] request: {}", request); Map> resultMap = new HashMap<>(); List> offsetFutureList = new ArrayList<>(); for (WriteTxnMarkersRequest.TxnMarkerEntry txnMarkerEntry : request.markers()) { @@ -1434,9 +1427,6 @@ private CompletableFuture writeTxnMarker(TopicPartition topicPartition, 1, SystemTime.SYSTEM.milliseconds())); }); - offsetFuture.thenAccept(offset -> { - log.info("writeTxnMarker offset: {}", offset); - }); return offsetFuture; } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java index a9a68c3cef..6989e707e5 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java @@ -177,7 +177,8 @@ private void consumeTxnMessage(String topicName, boolean readFinish = false; for (ConsumerRecord record : consumerRecords) { - log.info("Fetch for receive record offset: {}, key: {}, value: {}", record.offset(), record.key(), record.value()); + log.info("Fetch for receive record offset: {}, key: {}, value: {}", + record.offset(), record.key(), record.value()); receiveCount.incrementAndGet(); if (lastMessage.equalsIgnoreCase(record.value())) { log.info("receive the last message");