# Avoid creating offset consumer before updating consumer metrics (#426)
### Motivation
Sometimes multiple brokers could subscribe to the same topic partition and fail with a `ConsumerBusyException`:

```
WARN  io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetAcker - Error when get consumer for offset ack:
  java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ConsumerBusyException: Exclusive consumer is already connected
```

This problem was mentioned before in #263. However, switching the subscription type to shared or failover is not correct, because a partition should have only one consumer at a time. The real problem is that, before updating the consumer metrics, the code uses `OffsetAcker#getConsumer` to check whether the consumer has been created, and this check is wrong for two reasons.

First, `OffsetAcker#getConsumer` may create a new offset consumer, even though the actual offset consumer could already have been created by another broker.
Second, it only checks whether the consumer's future is done and returns immediately if it is not, so the metrics update could simply be skipped.
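
For context, this `ConsumerBusyException` is what a Pulsar broker returns when a second consumer tries to attach to an exclusive subscription that already has a connected consumer, which is what happens when two brokers each create an offset consumer for the same partition. The following is a minimal sketch (not KoP code) that reproduces the error with the plain Pulsar client; the broker URL, topic, and subscription names are placeholders:

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;

public class ExclusiveSubscriptionConflict {
    public static void main(String[] args) throws Exception {
        // Placeholder broker URL, topic, and subscription; adjust for your environment.
        final String topic = "persistent://public/default/my-topic-partition-0";
        final String subscription = "my-group";

        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build()) {

            // The first exclusive consumer attaches successfully.
            Consumer<byte[]> first = client.newConsumer()
                    .topic(topic)
                    .subscriptionName(subscription)
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscribe();

            try {
                // A second exclusive consumer on the same subscription is rejected by the broker.
                client.newConsumer()
                        .topic(topic)
                        .subscriptionName(subscription)
                        .subscriptionType(SubscriptionType.Exclusive)
                        .subscribe();
            } catch (PulsarClientException e) {
                // e.g. ConsumerBusyException: Exclusive consumer is already connected
                System.out.println("Second subscribe failed: " + e.getMessage());
            } finally {
                first.close();
            }
        }
    }
}
```

The fix therefore keeps the subscription exclusive and instead avoids creating a second consumer from the metrics path.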

### Modifications

- Modify the condition check in `KafkaTopicConsumer#getGroupConsumers`. Because the offset consumer could already have been created by `GroupCoordinator#handleSyncGroup`, only check whether the consumer exists, and update the consumer metrics once its future completes (see the sketch after this list).
- Remove the failed futures of offset consumers.
- Add consumer stats tests to verify that the modification works.
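
As referenced above, here is a minimal, self-contained sketch (not the actual KoP classes) of the lookup pattern the change moves to: the sync-group path creates the offset consumer future via a get-or-create call, the metrics path only looks the future up and waits for it, and a failed future is removed so a later sync group can recreate it. The class name, the `Supplier`-based creator, and the `String` consumer stand-in are illustrative; only `TopicPartition` is the real `kafka-clients` type.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.apache.kafka.common.TopicPartition;

public class OffsetConsumerCacheSketch {

    // group id -> (partition -> future of the offset consumer); String stands in for
    // Pulsar's Consumer<byte[]> so no Pulsar client is needed for this sketch.
    private final Map<String, Map<TopicPartition, CompletableFuture<String>>> consumers =
            new ConcurrentHashMap<>();

    // Sync-group path: create the consumer future if it does not exist yet.
    CompletableFuture<String> getOrCreateConsumer(String groupId, TopicPartition partition,
                                                  Supplier<CompletableFuture<String>> creator) {
        return consumers.computeIfAbsent(groupId, id -> new ConcurrentHashMap<>())
                .computeIfAbsent(partition, p -> creator.get());
    }

    // Metrics path: look up only, never create; null means "nothing subscribed yet, skip".
    CompletableFuture<String> getConsumer(String groupId, TopicPartition partition) {
        Map<TopicPartition, CompletableFuture<String>> group = consumers.get(groupId);
        return group == null ? null : group.get(partition);
    }

    // Drop a failed future so that a later sync group can recreate the consumer.
    void removeConsumer(String groupId, TopicPartition partition) {
        Map<TopicPartition, CompletableFuture<String>> group = consumers.get(groupId);
        if (group != null) {
            group.remove(partition);
        }
    }

    public static void main(String[] args) {
        OffsetConsumerCacheSketch cache = new OffsetConsumerCacheSketch();
        TopicPartition tp = new TopicPartition("my-topic", 0);

        // Before handleSyncGroup has run, the metrics path finds nothing and skips quietly.
        System.out.println("before sync group: " + cache.getConsumer("my-group", tp));

        // The sync-group path creates the offset consumer future.
        cache.getOrCreateConsumer("my-group", tp,
                () -> CompletableFuture.completedFuture("offset-consumer"));

        // Now the metrics path waits on the existing future instead of creating a new consumer.
        cache.getConsumer("my-group", tp)
                .thenAccept(consumer -> System.out.println("update metrics via " + consumer));
    }
}
```

The important design point is that only one call site is allowed to create consumers, so the metrics path can never race another broker into a duplicate exclusive subscription.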
BewareMyPower authored Apr 8, 2021
1 parent 643f573 commit d0053b1
Showing 3 changed files with 130 additions and 19 deletions.
@@ -15,8 +15,10 @@

import static com.google.common.base.Preconditions.checkState;

import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetAcker;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -27,6 +29,7 @@

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -379,14 +382,25 @@ public void deReference(String topicName) {
}

public CompletableFuture<Consumer> getGroupConsumers(String groupId, TopicPartition kafkaPartition) {
// make sure internal consumer existed
CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
if (groupId == null || groupId.isEmpty() || !requestHandler.getGroupCoordinator()
.getOffsetAcker().getConsumer(groupId, kafkaPartition).isDone()) {
log.warn("not get consumer for group {} this time", groupId);
consumerFuture.complete(null);
return consumerFuture;
if (StringUtils.isEmpty(groupId)) {
if (log.isDebugEnabled()) {
log.debug("Try to get group consumers with an empty group id");
}
return CompletableFuture.completedFuture(null);
}

// The future of the offset consumer should be created before in `GroupCoordinator#handleSyncGroup`
final OffsetAcker offsetAcker = requestHandler.getGroupCoordinator().getOffsetAcker();
final CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> offsetConsumerFuture =
offsetAcker.getConsumer(groupId, kafkaPartition);
if (offsetConsumerFuture == null) {
if (log.isDebugEnabled()) {
log.debug("No offset consumer for [group={}] [topic={}]", groupId, kafkaPartition);
}
return CompletableFuture.completedFuture(null);
}

CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
return CONSUMERS_CACHE.computeIfAbsent(groupId, group -> {
try {
TopicName topicName = TopicName.get(KopTopic.toString(kafkaPartition));
@@ -396,10 +410,28 @@ public CompletableFuture<Consumer> getGroupConsumers(String groupId, TopicPartit
.getBrokerService().getMultiLayerTopicsMap()
.get(topicName.getNamespace()).get(namespaceBundle.toString())
.get(topicName.toString());
// only one consumer existed for internal subscription
Consumer consumer = persistentTopic.getSubscriptions()
.get(groupId).getDispatcher().getConsumers().get(0);
consumerFuture.complete(consumer);
// The `Consumer` in broker side won't be created until the `Consumer` in client side subscribes
// successfully, so we should wait until offset consumer's future is completed.
offsetConsumerFuture.whenComplete((ignored, e) -> {
if (e != null) {
log.warn("Failed to create offset consumer for [group={}] [topic={}]: {}",
groupId, kafkaPartition, e.getMessage());
offsetAcker.removeConsumer(groupId, kafkaPartition);
// Here we don't return because the `Consumer` in broker side may be created already
}
// Double check for if the `Consumer` in broker side has been created
final List<Consumer> consumers =
persistentTopic.getSubscriptions().get(groupId).getDispatcher().getConsumers();
if (consumers.isEmpty()) {
log.error("There's no internal consumer for [group={}]", groupId);
consumerFuture.complete(null);
return;
}
// only one consumer existed for internal subscription
final Consumer consumer = persistentTopic.getSubscriptions()
.get(groupId).getDispatcher().getConsumers().get(0);
consumerFuture.complete(consumer);
});
} catch (Exception e) {
log.error("get topic error", e);
consumerFuture.complete(null);
@@ -18,12 +18,14 @@
import io.streamnative.pulsar.handlers.kop.utils.OffsetSearchPredicate;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
@@ -43,9 +45,21 @@
@Slf4j
public class OffsetAcker implements Closeable {

private static final Map<TopicPartition, CompletableFuture<Consumer<byte[]>>> EMPTY_CONSUMERS = new HashMap<>();

private final ConsumerBuilder<byte[]> consumerBuilder;
private final BrokerService brokerService;

// A map whose
// key is group id,
// value is a map whose
// key is the partition,
// value is the created future of consumer.
// The consumer, whose subscription is the group id, is used for acknowledging message id cumulatively.
// This behavior is equivalent to committing offsets in Kafka.
private final Map<String, Map<TopicPartition, CompletableFuture<Consumer<byte[]>>>>
consumers = new ConcurrentHashMap<>();

public OffsetAcker(PulsarClientImpl pulsarClient) {
this.consumerBuilder = pulsarClient.newConsumer()
.receiverQueueSize(0)
@@ -60,16 +74,13 @@ public OffsetAcker(PulsarClientImpl pulsarClient, BrokerService brokerService) {
this.brokerService = brokerService;
}

// map of consumers: <groupId, consumers>
Map<String, Map<TopicPartition, CompletableFuture<Consumer<byte[]>>>> consumers = new ConcurrentHashMap<>();

public void addOffsetsTracker(String groupId, byte[] assignment) {
ByteBuffer assignBuffer = ByteBuffer.wrap(assignment);
Assignment assign = ConsumerProtocol.deserializeAssignment(assignBuffer);
if (log.isDebugEnabled()) {
log.debug(" Add offsets after sync group: {}", assign.toString());
}
assign.partitions().forEach(topicPartition -> getConsumer(groupId, topicPartition));
assign.partitions().forEach(topicPartition -> getOrCreateConsumer(groupId, topicPartition));
}

public void ackOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsetMetadata) {
@@ -81,11 +92,13 @@ public void ackOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> of
}
offsetMetadata.forEach(((topicPartition, offsetAndMetadata) -> {
// 1. get consumer, then do ackCumulative
CompletableFuture<Consumer<byte[]>> consumerFuture = getConsumer(groupId, topicPartition);
CompletableFuture<Consumer<byte[]>> consumerFuture = getOrCreateConsumer(groupId, topicPartition);

consumerFuture.whenComplete((consumer, throwable) -> {
if (throwable != null) {
log.warn("Error when get consumer for offset ack:", throwable);
log.warn("Failed to create offset consumer for [group={}] [topic={}]: {}",
groupId, topicPartition, throwable.getMessage());
removeConsumer(groupId, topicPartition);
return;
}
KopTopic kopTopic = new KopTopic(topicPartition.topic());
@@ -125,7 +138,12 @@ public void ackOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> of

public void close(Set<String> groupIds) {
for (String groupId : groupIds) {
consumers.remove(groupId).forEach((topicPartition, consumerFuture) -> {
final Map<TopicPartition, CompletableFuture<Consumer<byte[]>>>
consumersToRemove = consumers.remove(groupId);
if (consumersToRemove == null) {
continue;
}
consumersToRemove.forEach((topicPartition, consumerFuture) -> {
if (!consumerFuture.isDone()) {
log.warn("Consumer of [group={}] [topic={}] is not done while being closed",
groupId, topicPartition);
@@ -148,14 +166,16 @@ public void close() {
close(consumers.keySet());
}

public CompletableFuture<Consumer<byte[]>> getConsumer(String groupId, TopicPartition topicPartition) {
@NonNull
public CompletableFuture<Consumer<byte[]>> getOrCreateConsumer(String groupId, TopicPartition topicPartition) {
Map<TopicPartition, CompletableFuture<Consumer<byte[]>>> group = consumers
.computeIfAbsent(groupId, gid -> new ConcurrentHashMap<>());
return group.computeIfAbsent(
topicPartition,
partition -> createConsumer(groupId, partition));
}

@NonNull
private CompletableFuture<Consumer<byte[]>> createConsumer(String groupId, TopicPartition topicPartition) {
KopTopic kopTopic = new KopTopic(topicPartition.topic());
return consumerBuilder.clone()
@@ -164,4 +184,22 @@ private CompletableFuture<Consumer<byte[]>> createConsumer(String groupId, Topic
.subscribeAsync();
}

public CompletableFuture<Consumer<byte[]>> getConsumer(String groupId, TopicPartition topicPartition) {
return consumers.getOrDefault(groupId, EMPTY_CONSUMERS).get(topicPartition);
}

public void removeConsumer(String groupId, TopicPartition topicPartition) {
final CompletableFuture<Consumer<byte[]>> consumerFuture =
consumers.getOrDefault(groupId, EMPTY_CONSUMERS).remove(topicPartition);
if (consumerFuture != null) {
consumerFuture.whenComplete((consumer, e) -> {
if (e == null) {
consumer.closeAsync();
} else {
log.error("Failed to create consumer for [group={}] [topic={}]: {}",
groupId, topicPartition, e.getMessage());
}
});
}
}
}
@@ -17,14 +17,21 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.testng.annotations.Test;

/**
@@ -59,6 +66,7 @@ public void testDeleteClosedTopics() throws Exception {
final String topic = "test-delete-closed-topics";
final List<String> expectedMessages = Collections.singletonList("msg");

admin.topics().createPartitionedTopic(topic, 1);
final KafkaProducer<String, String> kafkaProducer = newKafkaProducer();
sendSingleMessages(kafkaProducer, topic, expectedMessages);

@@ -97,6 +105,39 @@ public void testDeleteClosedTopics() throws Exception {
}

kafkaConsumer2.close();
Thread.sleep(500); // Wait for consumers closed
admin.topics().deletePartitionedTopic(topic);
}

@Test(timeOut = 20000)
public void testKafkaConsumerMetrics() throws Exception {
final String topic = "test-kafka-consumer-metrics";
final String group = "group-test-kafka-consumer-metrics";
final List<String> expectedMessages = Arrays.asList("A", "B", "C");

@Cleanup
final KafkaProducer<String, String> kafkaProducer = newKafkaProducer();
sendSingleMessages(kafkaProducer, topic, expectedMessages);

final Properties consumerProps = newKafkaConsumerProperties();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, group);
@Cleanup
final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
kafkaConsumer.subscribe(Collections.singleton(topic));
List<String> kafkaReceives = receiveMessages(kafkaConsumer, expectedMessages.size());
assertEquals(kafkaReceives, expectedMessages);

// Check stats
final TopicName topicName = TopicName.get(KopTopic.toString(new TopicPartition(topic, 0)));
final PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getMultiLayerTopicsMap()
.get(topicName.getNamespace())
.get(pulsar.getNamespaceService().getBundle(topicName).toString())
.get(topicName.toString());
final ConsumerStats stats =
persistentTopic.getSubscriptions().get(group).getDispatcher().getConsumers().get(0).getStats();
log.info("Consumer stats: [msgOutCounter={}] [bytesOutCounter={}]",
stats.msgOutCounter, stats.bytesOutCounter);
assertEquals(stats.msgOutCounter, expectedMessages.size());
assertTrue(stats.bytesOutCounter > 0);
}
}
