This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit 1860bf9: create consumer sync

dockerzhang committed Dec 23, 2020
1 parent 773a773
Showing 3 changed files with 23 additions and 30 deletions.
File 1 of 3:

@@ -347,7 +347,7 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
             .pulsar().getNamespaceService().getBundle(topicName);
         // make sure internal consumer existed
         requestHandler.getGroupCoordinator()
-            .getoffsetAcker().getConsumer(groupId, kafkaPartition).get();
+            .getoffsetAcker().getConsumer(groupId, kafkaPartition);

         PersistentTopic persistentTopic = (PersistentTopic) requestHandler.pulsarService
             .getBrokerService().getMultiLayerTopicsMap()
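Why the trailing .get() disappears: getConsumer now returns the consumer itself rather than a CompletableFuture, so the caller no longer joins a future. A minimal compilable sketch of that shape change, with AsyncAcker/SyncAcker as hypothetical stand-ins for the real coordinator types:

import java.util.concurrent.CompletableFuture;

public class CallSiteShapes {
    // Hypothetical stand-ins; Object substitutes for Consumer<byte[]>.
    interface AsyncAcker {
        CompletableFuture<Object> getConsumer(String groupId, int partition);
    }

    interface SyncAcker {
        Object getConsumer(String groupId, int partition);
    }

    static void before(AsyncAcker acker) throws Exception {
        Object consumer = acker.getConsumer("group", 0).get(); // explicit blocking join
    }

    static void after(SyncAcker acker) {
        Object consumer = acker.getConsumer("group", 0); // returned directly
    }
}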
File 2 of 3:

@@ -20,7 +20,6 @@
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;

@@ -29,6 +28,7 @@
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.PulsarClientImpl;

@@ -49,15 +49,14 @@ public OffsetAcker(PulsarClientImpl pulsarClient) {
     }

     // map off consumser: <groupId, consumers>
-    Map<String, Map<TopicPartition, CompletableFuture<Consumer<byte[]>>>> consumers = new ConcurrentHashMap<>();
+    Map<String, Map<TopicPartition, Consumer<byte[]>>> consumers = new ConcurrentHashMap<>();

     public void addOffsetsTracker(String groupId, byte[] assignment) {
         ByteBuffer assignBuffer = ByteBuffer.wrap(assignment);
         Assignment assign = ConsumerProtocol.deserializeAssignment(assignBuffer);
         if (log.isDebugEnabled()) {
             log.debug(" Add offsets after sync group: {}", assign.toString());
         }
-        assign.partitions().forEach(topicPartition -> getConsumer(groupId, topicPartition));
     }

     public void ackOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsetMetadata) {

@@ -68,34 +67,20 @@ public void ackOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> of
                 partition, MessageIdUtils.getPosition(metadata.offset())));
         }
         offsetMetadata.forEach(((topicPartition, offsetAndMetadata) -> {
-            // 1. get consumer, then do ackCumulative
-            CompletableFuture<Consumer<byte[]>> consumerFuture = getConsumer(groupId, topicPartition);
-
-            consumerFuture.whenComplete((consumer, throwable) -> {
-                if (throwable != null) {
-                    log.warn("Error when get consumer for offset ack:", throwable);
-                    return;
-                }
-                MessageId messageId = MessageIdUtils.getMessageId(offsetAndMetadata.offset());
-                consumer.acknowledgeCumulativeAsync(messageId);
-            });
+            Consumer<byte[]> consumer = getConsumer(groupId, topicPartition);
+            MessageId messageId = MessageIdUtils.getMessageId(offsetAndMetadata.offset());
+            consumer.acknowledgeCumulativeAsync(messageId);
         }));
     }

     public void close(Set<String> groupIds) {
-        groupIds.forEach(groupId -> consumers.get(groupId).values().forEach(consumerFuture -> {
-            consumerFuture.whenComplete((consumer, throwable) -> {
-                if (throwable != null) {
-                    log.warn("Error when get consumer for consumer group close:", throwable);
-                    return;
-                }
+        groupIds.forEach(groupId -> consumers.get(groupId).values().forEach(consumer -> {
             try {
                 consumer.close();
             } catch (Exception e) {
                 log.warn("Error when close consumer topic: {}, sub: {}.",
                     consumer.getTopic(), consumer.getSubscription(), e);
             }
-            });
         }));
     }

@@ -105,20 +90,26 @@ public void close() {
         close(consumers.keySet());
     }

-    public CompletableFuture<Consumer<byte[]>> getConsumer(String groupId, TopicPartition topicPartition) {
-        Map<TopicPartition, CompletableFuture<Consumer<byte[]>>> group = consumers
+    public Consumer<byte[]> getConsumer(String groupId, TopicPartition topicPartition) {
+        Map<TopicPartition, Consumer<byte[]>> group = consumers
             .computeIfAbsent(groupId, gid -> new ConcurrentHashMap<>());
         return group.computeIfAbsent(
             topicPartition,
             partition -> createConsumer(groupId, partition));
     }

-    private CompletableFuture<Consumer<byte[]>> createConsumer(String groupId, TopicPartition topicPartition) {
+    private Consumer<byte[]> createConsumer(String groupId, TopicPartition topicPartition) {
         KopTopic kopTopic = new KopTopic(topicPartition.topic());
-        return consumerBuilder.clone()
-            .topic(kopTopic.getPartitionName(topicPartition.partition()))
-            .subscriptionName(groupId)
-            .subscribeAsync();
+        Consumer<byte[]> consumer = null;
+        try {
+            consumer = consumerBuilder.clone()
+                .topic(kopTopic.getPartitionName(topicPartition.partition()))
+                .subscriptionName(groupId)
+                .subscribe();
+        } catch (PulsarClientException e) {
+            log.error("create consumer error", e);
+        }
+        return consumer;
     }

 }
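The core of the commit is in createConsumer: subscribeAsync() is replaced by a blocking subscribe() wrapped in try/catch. A self-contained sketch of the two Pulsar client styles, assuming a local broker and placeholder topic/subscription names:

import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class SyncVsAsyncSubscribe {
    public static void main(String[] args) throws Exception {
        // Placeholder service URL, topic, and subscription names for illustration.
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Pre-commit style: subscribeAsync() returns a CompletableFuture, so
        // failures surface later, in a callback or when the future is joined.
        CompletableFuture<Consumer<byte[]>> future = client.newConsumer()
                .topic("demo-topic")
                .subscriptionName("demo-group-async")
                .subscribeAsync();
        Consumer<byte[]> asyncConsumer = future.get(); // join; throws on failure
        asyncConsumer.close();

        // Post-commit style: subscribe() blocks until the subscription exists
        // and throws a checked PulsarClientException immediately on failure.
        try (Consumer<byte[]> syncConsumer = client.newConsumer()
                .topic("demo-topic")
                .subscriptionName("demo-group-sync")
                .subscribe()) {
            // syncConsumer is fully initialized here
        } catch (PulsarClientException e) {
            System.err.println("create consumer error: " + e);
        }

        client.close();
    }
}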
File 3 of 3:

@@ -425,7 +425,9 @@ public static MemoryRecords entriesToRecords(List<org.apache.bookkeeper.mledger.
             payload.release();
             entry.release();
         });
-        consumer.updateStats(consumerStats);
+        if (consumer != null) {
+            consumer.updateStats(consumerStats);
+        }
         return builder.build();
     } catch (IOException ioe){
         log.error("Meet IOException: {}", ioe);
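This null check matters because createConsumer now swallows PulsarClientException and returns null, and ConcurrentHashMap.computeIfAbsent stores nothing when its mapping function returns null, so getConsumer can hand callers a null consumer (and will retry creation on the next lookup). A small runnable demonstration of that map behavior:

import java.util.concurrent.ConcurrentHashMap;

public class ComputeIfAbsentNullDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();

        // A mapping function that returns null (as createConsumer does when
        // subscribe() fails) stores no entry and yields null to the caller.
        String value = cache.computeIfAbsent("group-0", key -> null);
        System.out.println(value);        // null
        System.out.println(cache.size()); // 0 -- the next lookup retries creation
    }
}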
