Skip to content

Commit

Permalink
[ISSUE apache#3288]Refactor ConsumerGroupManager
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Feb 27, 2023
1 parent 8a014d4 commit be99e0f
Showing 1 changed file with 20 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,36 @@

public class ConsumerGroupManager {

protected AtomicBoolean started = new AtomicBoolean(Boolean.FALSE);
private final AtomicBoolean started = new AtomicBoolean(false);

protected AtomicBoolean inited = new AtomicBoolean(Boolean.FALSE);
private final AtomicBoolean inited = new AtomicBoolean(false);

private EventMeshHTTPServer eventMeshHTTPServer;
private final EventMeshHTTPServer eventMeshHTTPServer;

private EventMeshConsumer eventMeshConsumer;
private final EventMeshConsumer eventMeshConsumer;

private ConsumerGroupConf consumerGroupConfig;

public ConsumerGroupManager(EventMeshHTTPServer eventMeshHTTPServer, ConsumerGroupConf consumerGroupConfig) {
public ConsumerGroupManager(final EventMeshHTTPServer eventMeshHTTPServer, final ConsumerGroupConf consumerGroupConfig) {
this.eventMeshHTTPServer = eventMeshHTTPServer;
this.consumerGroupConfig = consumerGroupConfig;
eventMeshConsumer = new EventMeshConsumer(this.eventMeshHTTPServer, this.consumerGroupConfig);
this.eventMeshConsumer = new EventMeshConsumer(this.eventMeshHTTPServer, this.consumerGroupConfig);
}

public synchronized void init() throws Exception {
public void init() throws Exception {
if (!inited.compareAndSet(false, true)) {
return;
}
eventMeshConsumer.init();
inited.compareAndSet(false, true);

}

public synchronized void start() throws Exception {
public void start() throws Exception {
if (!started.compareAndSet(false, true)) {
return;
}
setupEventMeshConsumer(consumerGroupConfig);
eventMeshConsumer.start();
started.compareAndSet(false, true);
}

private synchronized void setupEventMeshConsumer(ConsumerGroupConf consumerGroupConfig) throws Exception {
Expand All @@ -62,12 +67,14 @@ private synchronized void setupEventMeshConsumer(ConsumerGroupConf consumerGroup
}
}

public synchronized void shutdown() throws Exception {
public void shutdown() throws Exception {
if (!started.compareAndSet(true, false)) {
return;
}
eventMeshConsumer.shutdown();
started.compareAndSet(true, false);
}

public synchronized void refresh(ConsumerGroupConf consumerGroupConfig) throws Exception {
public synchronized void refresh(final ConsumerGroupConf consumerGroupConfig) throws Exception {

if (consumerGroupConfig == null || this.consumerGroupConfig.equals(consumerGroupConfig)) {
return;
Expand Down

0 comments on commit be99e0f

Please sign in to comment.