Skip to content

Commit

Permalink
dont shutdown thread pool from user
Browse files Browse the repository at this point in the history
  • Loading branch information
duanlinlin committed Nov 28, 2024
1 parent 9b6f373 commit b5a3d9b
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,17 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume

/**
* Concurrently max span offset.it has no effect on sequential consumption
* Lifecycle managed by the provider
*/
private ScheduledExecutorService consumeMessageScheduledExecutor;
/**
* Thread pool for handling expired messages
* Lifecycle managed by the provider
*/
private ScheduledExecutorService cleanExpireMsgScheduledExecutor;
/**
* Thread pool for handling normal messages
* Lifecycle managed by the provider
*/
private ExecutorService consumeExecutor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,13 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
private final MessageListenerConcurrently messageListener;
private final BlockingQueue<Runnable> consumeRequestQueue;
private final ExecutorService consumeExecutor;
private final boolean isConsumeExecutorFromUser;
private final String consumerGroup;

private final ScheduledExecutorService scheduledExecutorService;
private final boolean isScheduledExecutorServiceFromUser;
private final ScheduledExecutorService cleanExpireMsgExecutors;
private final boolean isCleanExpireMsgExecutorsFromUser;

public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerConcurrently messageListener) {
Expand All @@ -72,7 +75,9 @@ public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPush
String consumerGroupTag = (consumerGroup.length() > 100 ? consumerGroup.substring(0, 100) : consumerGroup) + "_";
if (this.defaultMQPushConsumer.getConsumeExecutor() != null) {
this.consumeExecutor = this.defaultMQPushConsumer.getConsumeExecutor();
this.isConsumeExecutorFromUser = true;
} else {
this.isConsumeExecutorFromUser = false;
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
Expand All @@ -83,14 +88,18 @@ public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPush
}

if (this.defaultMQPushConsumer.getConsumeMessageScheduledExecutor() != null) {
this.isScheduledExecutorServiceFromUser = true;
this.scheduledExecutorService = this.defaultMQPushConsumer.getConsumeMessageScheduledExecutor();
} else {
this.isScheduledExecutorServiceFromUser = false;
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag));
}

if (this.defaultMQPushConsumer.getCleanExpireMsgScheduledExecutor() != null) {
this.isCleanExpireMsgExecutorsFromUser = true;
this.cleanExpireMsgExecutors = this.defaultMQPushConsumer.getCleanExpireMsgScheduledExecutor();
} else {
this.isCleanExpireMsgExecutorsFromUser = false;
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_" + consumerGroupTag));
}
}
Expand All @@ -111,9 +120,15 @@ public void run() {
}

public void shutdown(long awaitTerminateMillis) {
this.scheduledExecutorService.shutdown();
ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
this.cleanExpireMsgExecutors.shutdown();
if (!isScheduledExecutorServiceFromUser) {
this.scheduledExecutorService.shutdown();
}
if (!isConsumeExecutorFromUser) {
ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
}
if (!isCleanExpireMsgExecutorsFromUser) {
this.cleanExpireMsgExecutors.shutdown();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
private final MessageListenerOrderly messageListener;
private final BlockingQueue<Runnable> consumeRequestQueue;
private final ExecutorService consumeExecutor;
private final boolean isConsumeExecutorFromUser;
private final String consumerGroup;
private final MessageQueueLock messageQueueLock = new MessageQueueLock();
private final ScheduledExecutorService scheduledExecutorService;
private final boolean isScheduledExecutorServiceFromUser;
private volatile boolean stopped = false;

public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
Expand All @@ -76,7 +78,9 @@ public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsu
String consumerGroupTag = (consumerGroup.length() > 100 ? consumerGroup.substring(0, 100) : consumerGroup) + "_";
if (this.defaultMQPushConsumer.getConsumeExecutor() != null) {
this.consumeExecutor = this.defaultMQPushConsumer.getConsumeExecutor();
this.isConsumeExecutorFromUser = true;
} else {
this.isConsumeExecutorFromUser = false;
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
Expand All @@ -88,8 +92,10 @@ public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsu

if (this.defaultMQPushConsumer.getConsumeMessageScheduledExecutor() != null) {
this.scheduledExecutorService = this.defaultMQPushConsumer.getConsumeMessageScheduledExecutor();
isScheduledExecutorServiceFromUser = true;
} else {
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag));
isScheduledExecutorServiceFromUser = false;
}
}

Expand All @@ -112,8 +118,12 @@ public void run() {
@Override
public void shutdown(long awaitTerminateMillis) {
this.stopped = true;
this.scheduledExecutorService.shutdown();
ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
if (!isScheduledExecutorServiceFromUser) {
this.scheduledExecutorService.shutdown();
}
if (!isConsumeExecutorFromUser) {
ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
}
if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
this.unlockAllMQ();
}
Expand Down

0 comments on commit b5a3d9b

Please sign in to comment.