Skip to content

Commit

Permalink
[ISSUE apache#9115] Optimize the broker's reverse notification for co…
Browse files Browse the repository at this point in the history
…nsumerId change
  • Loading branch information
yx9o committed Jan 9, 2025
1 parent e0db654 commit 6a47b12
Showing 1 changed file with 44 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.constant.LoggerName;
Expand All @@ -41,6 +43,8 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen

private ConcurrentHashMap<String,List<Channel>> consumerChannelMap = new ConcurrentHashMap<>(cacheSize);

private final ConcurrentHashMap<String, NotifyTaskControl> activeGroupNotifyMap = new ConcurrentHashMap<>();

public DefaultConsumerIdsChangeListener(BrokerController brokerController) {
this.brokerController = brokerController;

Expand Down Expand Up @@ -70,9 +74,25 @@ public void handle(ConsumerGroupEvent event, String group, Object... args) {
List<Channel> channels = (List<Channel>) args[0];
if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
if (this.brokerController.getBrokerConfig().isRealTimeNotifyConsumerChange()) {
for (Channel chl : channels) {
NotifyTaskControl currentNotifyTaskControl = new NotifyTaskControl(channels);
activeGroupNotifyMap.compute(group, (k, oldVal) -> {
if (null != oldVal) {
oldVal.interrupt();
}
return currentNotifyTaskControl;
});

boolean isNormalCompletion = true;
for (Channel chl : currentNotifyTaskControl.getChannels()) {
if (currentNotifyTaskControl.isInterrupted()) {
isNormalCompletion = false;
break;
}
this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
}
if (isNormalCompletion) {
activeGroupNotifyMap.remove(group);
}
} else {
consumerChannelMap.put(group, channels);
}
Expand Down Expand Up @@ -125,4 +145,27 @@ private void notifyConsumerChange() {
public void shutdown() {
this.scheduledExecutorService.shutdown();
}

private static class NotifyTaskControl {

private final AtomicBoolean interrupted = new AtomicBoolean(false);

private final List<Channel> channels;

public NotifyTaskControl(List<Channel> channels) {
this.channels = channels;
}

public boolean isInterrupted() {
return interrupted.get();
}

public void interrupt() {
interrupted.set(true);
}

public List<Channel> getChannels() {
return channels;
}
}
}

0 comments on commit 6a47b12

Please sign in to comment.