Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat[consumer]: support custom thread pool, providing the possibility for different consumers to reuse thread pools #8994

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.rocketmq.client.consumer;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
Expand Down Expand Up @@ -170,6 +173,22 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private long adjustThreadPoolNumsThreshold = 100000;

/**
* Concurrently max span offset.it has no effect on sequential consumption
* Lifecycle managed by the provider
*/
private ScheduledExecutorService consumeMessageScheduledExecutor;
/**
* Thread pool for handling expired messages
* Lifecycle managed by the provider
*/
private ScheduledExecutorService cleanExpireMsgScheduledExecutor;
/**
* Thread pool for handling normal messages
* Lifecycle managed by the provider
*/
private ExecutorService consumeExecutor;

/**
* Concurrently max span offset.it has no effect on sequential consumption
*/
Expand Down Expand Up @@ -927,6 +946,30 @@ public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold)
this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold;
}

public ScheduledExecutorService getConsumeMessageScheduledExecutor() {
return consumeMessageScheduledExecutor;
}

public void setConsumeMessageScheduledExecutor(ScheduledExecutorService consumeMessageScheduledExecutor) {
this.consumeMessageScheduledExecutor = consumeMessageScheduledExecutor;
}

public ScheduledExecutorService getCleanExpireMsgScheduledExecutor() {
return cleanExpireMsgScheduledExecutor;
}

public void setCleanExpireMsgScheduledExecutor(ScheduledExecutorService cleanExpireMsgScheduledExecutor) {
this.cleanExpireMsgScheduledExecutor = cleanExpireMsgScheduledExecutor;
}

public ExecutorService getConsumeExecutor() {
return consumeExecutor;
}

public void setConsumeExecutor(ExecutorService consumeExecutor) {
this.consumeExecutor = consumeExecutor;
}

public int getMaxReconsumeTimes() {
return maxReconsumeTimes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -54,11 +55,14 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
private final DefaultMQPushConsumer defaultMQPushConsumer;
private final MessageListenerConcurrently messageListener;
private final BlockingQueue<Runnable> consumeRequestQueue;
private final ThreadPoolExecutor consumeExecutor;
private final ExecutorService consumeExecutor;
private final boolean isConsumeExecutorFromUser;
private final String consumerGroup;

private final ScheduledExecutorService scheduledExecutorService;
private final boolean isScheduledExecutorServiceFromUser;
private final ScheduledExecutorService cleanExpireMsgExecutors;
private final boolean isCleanExpireMsgExecutorsFromUser;

public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerConcurrently messageListener) {
Expand All @@ -68,18 +72,36 @@ public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPush
this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<>();

String consumerGroupTag = (consumerGroup.length() > 100 ? consumerGroup.substring(0, 100) : consumerGroup) + "_";
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag));

this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag));
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_" + consumerGroupTag));
if (this.defaultMQPushConsumer.getConsumeExecutor() != null) {
this.consumeExecutor = this.defaultMQPushConsumer.getConsumeExecutor();
this.isConsumeExecutorFromUser = true;
} else {
this.isConsumeExecutorFromUser = false;
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag));
}

if (this.defaultMQPushConsumer.getConsumeMessageScheduledExecutor() != null) {
this.isScheduledExecutorServiceFromUser = true;
this.scheduledExecutorService = this.defaultMQPushConsumer.getConsumeMessageScheduledExecutor();
} else {
this.isScheduledExecutorServiceFromUser = false;
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag));
}

if (this.defaultMQPushConsumer.getCleanExpireMsgScheduledExecutor() != null) {
this.isCleanExpireMsgExecutorsFromUser = true;
this.cleanExpireMsgExecutors = this.defaultMQPushConsumer.getCleanExpireMsgScheduledExecutor();
} else {
this.isCleanExpireMsgExecutorsFromUser = false;
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_" + consumerGroupTag));
}
}

public void start() {
Expand All @@ -98,17 +120,25 @@ public void run() {
}

public void shutdown(long awaitTerminateMillis) {
this.scheduledExecutorService.shutdown();
ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
this.cleanExpireMsgExecutors.shutdown();
if (!isScheduledExecutorServiceFromUser) {
this.scheduledExecutorService.shutdown();
}
if (!isConsumeExecutorFromUser) {
ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
}
if (!isCleanExpireMsgExecutorsFromUser) {
this.cleanExpireMsgExecutors.shutdown();
}
}

@Override
public void updateCorePoolSize(int corePoolSize) {
if (corePoolSize > 0
&& corePoolSize <= Short.MAX_VALUE
&& corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
this.consumeExecutor.setCorePoolSize(corePoolSize);
if (consumeExecutor instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor)this.consumeExecutor).setCorePoolSize(corePoolSize);
}
}
}

Expand All @@ -124,7 +154,10 @@ public void decCorePoolSize() {

@Override
public int getCorePoolSize() {
return this.consumeExecutor.getCorePoolSize();
if (consumeExecutor instanceof ThreadPoolExecutor) {
return ((ThreadPoolExecutor)this.consumeExecutor).getCorePoolSize();
}
return -1;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -58,10 +59,12 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
private final DefaultMQPushConsumer defaultMQPushConsumer;
private final MessageListenerOrderly messageListener;
private final BlockingQueue<Runnable> consumeRequestQueue;
private final ThreadPoolExecutor consumeExecutor;
private final ExecutorService consumeExecutor;
private final boolean isConsumeExecutorFromUser;
private final String consumerGroup;
private final MessageQueueLock messageQueueLock = new MessageQueueLock();
private final ScheduledExecutorService scheduledExecutorService;
private final boolean isScheduledExecutorServiceFromUser;
private volatile boolean stopped = false;

public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
Expand All @@ -72,17 +75,28 @@ public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsu
this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<>();

String consumerGroupTag = (consumerGroup.length() > 100 ? consumerGroup.substring(0, 100) : consumerGroup) + "_";
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag));

this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag));
if (this.defaultMQPushConsumer.getConsumeExecutor() != null) {
this.consumeExecutor = this.defaultMQPushConsumer.getConsumeExecutor();
this.isConsumeExecutorFromUser = true;
} else {
this.isConsumeExecutorFromUser = false;
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag));
}

if (this.defaultMQPushConsumer.getConsumeMessageScheduledExecutor() != null) {
this.scheduledExecutorService = this.defaultMQPushConsumer.getConsumeMessageScheduledExecutor();
isScheduledExecutorServiceFromUser = true;
} else {
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag));
isScheduledExecutorServiceFromUser = false;
}
}

@Override
Expand All @@ -104,8 +118,12 @@ public void run() {
@Override
public void shutdown(long awaitTerminateMillis) {
this.stopped = true;
this.scheduledExecutorService.shutdown();
ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
if (!isScheduledExecutorServiceFromUser) {
this.scheduledExecutorService.shutdown();
}
if (!isConsumeExecutorFromUser) {
ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
}
if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
this.unlockAllMQ();
}
Expand All @@ -120,7 +138,9 @@ public void updateCorePoolSize(int corePoolSize) {
if (corePoolSize > 0
&& corePoolSize <= Short.MAX_VALUE
&& corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
this.consumeExecutor.setCorePoolSize(corePoolSize);
if (consumeExecutor instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor)this.consumeExecutor).setCorePoolSize(corePoolSize);
}
}
}

Expand All @@ -134,7 +154,10 @@ public void decCorePoolSize() {

@Override
public int getCorePoolSize() {
return this.consumeExecutor.getCorePoolSize();
if (consumeExecutor instanceof ThreadPoolExecutor) {
return ((ThreadPoolExecutor)this.consumeExecutor).getCorePoolSize();
}
return -1;
}

@Override
Expand Down
Loading