Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #483] Virtual thread compatible #508

Closed
wants to merge 45 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
f5f25f5
virtual thread兼容,通过VirtualThreadExecutorAdapter进行声明
TheFatRatre Oct 14, 2024
cc83c79
bean instanceof ThreadContainer && bean instanceof ExecutorService 的…
TheFatRatre Oct 14, 2024
0d0c160
使用Executors.newVirtualThreadPerTaskExecutor()进行声明
TheFatRatre Oct 15, 2024
74f64c6
format
TheFatRatre Oct 15, 2024
17a913b
以注释的方式来声明executor的实现
TheFatRatre Oct 16, 2024
706d887
动态配置的实现思路
TheFatRatre Oct 16, 2024
f05dd26
proxy的重构,这样写Bean无法注册,耦合性也高
TheFatRatre Oct 16, 2024
a85826c
移除jdk21特性使之兼容老版本
TheFatRatre Oct 21, 2024
4a93554
Merge remote-tracking branch 'origin/virtual-thread' into virtual-thread
TheFatRatre Oct 21, 2024
4495be0
fix
TheFatRatre Oct 21, 2024
a729e11
配置中心进行配置,可以获取到executor并执行但spring没有感知到此bean,即postProcessAfterInitializ…
TheFatRatre Oct 21, 2024
6a1e1a9
解决配置中心配置VT未被登记的问题
TheFatRatre Nov 4, 2024
3755c8b
doc
TheFatRatre Nov 4, 2024
db44cd8
Merge branch 'springboot3' into virtual-thread
TheFatRatre Nov 4, 2024
0a38447
监控实现思路
TheFatRatre Nov 4, 2024
f4b5804
Merge remote-tracking branch 'origin/virtual-thread' into virtual-thread
TheFatRatre Nov 4, 2024
c791fd2
监控模块部分实现,VTExecutorStats为期望能获取到的数据
TheFatRatre Nov 5, 2024
ebc5e0e
监控基本实现,留了一些保留字段
TheFatRatre Nov 7, 2024
e29ebd4
complete EsCollector,format
TheFatRatre Nov 13, 2024
554f3f1
Merge remote-tracking branch 'origin/virtual-thread' into virtual-thread
TheFatRatre Nov 13, 2024
3dc7ee6
fix conflict and adapt
TheFatRatre Nov 13, 2024
0b18056
Merge branch 'springboot3' into virtual-thread
TheFatRatre Nov 13, 2024
4154092
fix and format
TheFatRatre Nov 13, 2024
d9b5d4d
fix dependency
TheFatRatre Nov 13, 2024
119d2e2
update mvn jdk
TheFatRatre Nov 13, 2024
7944b9b
try fix gitgub err
TheFatRatre Nov 13, 2024
33a4214
del
TheFatRatre Nov 13, 2024
fe40c63
format
TheFatRatre Nov 13, 2024
f449863
core rebuild and JDK version check
TheFatRatre Nov 15, 2024
c40e854
重构优化
TheFatRatre Nov 18, 2024
4027731
Merge branch 'springboot3' into virtual-thread
TheFatRatre Nov 18, 2024
2f7a99d
format
TheFatRatre Nov 18, 2024
a812aff
Merge remote-tracking branch 'origin/virtual-thread' into virtual-thread
TheFatRatre Nov 18, 2024
a96658e
Merge branch 'virtual-thread' into virtual-thread-spring-boot3
TheFatRatre Nov 18, 2024
b4e5dcb
fix
TheFatRatre Nov 18, 2024
7eaf4ca
rebuild
TheFatRatre Nov 20, 2024
15929e1
format
TheFatRatre Nov 20, 2024
8457c52
format
TheFatRatre Nov 20, 2024
69ec089
rebuild and fix
TheFatRatre Nov 21, 2024
84d4d30
Merge branch 'dromara:master' into virtual-thread
TheFatRatre Nov 22, 2024
21e3b41
rebuild
TheFatRatre Nov 22, 2024
d063f36
optimize
TheFatRatre Nov 26, 2024
36dc4c4
rebuild
TheFatRatre Nov 27, 2024
36cd88a
optimize
TheFatRatre Dec 2, 2024
64afeae
remane
TheFatRatre Dec 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions common/src/main/java/org/dromara/dynamictp/common/em/JreEnum.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public enum JreEnum {

public static final String DEFAULT_JAVA_VERSION = "1.8";

private static final int JRE_VERSION_OFFSET = 8;

/**
* get current JRE version
*
Expand All @@ -72,6 +74,14 @@ public static JreEnum currentVersion() {
return VERSION;
}

/**
* get current JRE integer version
* @return JRE integer version
*/
public static int currentIntVersion() {
return JreEnum.currentVersion().ordinal() + JRE_VERSION_OFFSET;
}

/**
* is current version
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,6 @@ public class ExecutorStats extends Metrics {
*/
private String executorAliasName;

/**
* 线程池名字
*/
@Deprecated
private String poolName;

/**
* 线程池别名
*/
@Deprecated
private String poolAliasName;

/**
* 核心线程数
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private static void refresh(ExecutorWrapper executorWrapper, DtpExecutorProps pr
TpMainFields oldFields = ExecutorConverter.toMainFields(executorWrapper);
doRefresh(executorWrapper, props);
TpMainFields newFields = ExecutorConverter.toMainFields(executorWrapper);
if (oldFields.equals(newFields) && !executorWrapper.isVirtualThreadExecutor()) {
if (oldFields.equals(newFields)) {
log.debug("DynamicTp refresh, main properties of [{}] have not changed.",
executorWrapper.getThreadPoolName());
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ public static ExecutorStats toMetrics(ExecutorWrapper wrapper) {
ExecutorStats executorStats = convertCommon(executor);
executorStats.setExecutorName(wrapper.getThreadPoolName());
executorStats.setExecutorAliasName(wrapper.getThreadPoolAliasName());
executorStats.setPoolName(wrapper.getThreadPoolName());
executorStats.setPoolAliasName(wrapper.getThreadPoolAliasName());
executorStats.setRunTimeoutCount(provider.getRunTimeoutCount());
executorStats.setQueueTimeoutCount(provider.getQueueTimeoutCount());
executorStats.setRejectCount(provider.getRejectedTaskCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,19 @@ public class JMXCollector extends AbstractCollector {

@Override
public void collect(ExecutorStats threadPoolStats) {
if (GAUGE_CACHE.containsKey(threadPoolStats.getPoolName())) {
ExecutorStats poolStats = (ExecutorStats) GAUGE_CACHE.get(threadPoolStats.getPoolName());
if (GAUGE_CACHE.containsKey(threadPoolStats.getExecutorName())) {
ExecutorStats poolStats = (ExecutorStats) GAUGE_CACHE.get(threadPoolStats.getExecutorName());
BeanCopierUtil.copyProperties(threadPoolStats, poolStats);
} else {
try {
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName(DTP_METRIC_NAME_PREFIX + ":name=" + threadPoolStats.getPoolName());
ObjectName name = new ObjectName(DTP_METRIC_NAME_PREFIX + ":name=" + threadPoolStats.getExecutorName());
ThreadPoolStatsJMX stats = new ThreadPoolStatsJMX(threadPoolStats);
server.registerMBean(stats, name);
} catch (JMException e) {
log.error("collect thread pool stats error", e);
}
GAUGE_CACHE.put(threadPoolStats.getPoolName(), threadPoolStats);
GAUGE_CACHE.put(threadPoolStats.getExecutorName(), threadPoolStats);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private Object registerAndReturnCommon(Object bean, String beanName) {
} else {
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanName);
if (!(beanDefinition instanceof AnnotatedBeanDefinition)) {
if (beanDefinition.getBeanClassName().equals("org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy")) {
if (VirtualThreadExecutorProxy.class.getName().equals("org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy")) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是"org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy"换成VirtualThreadExecutorProxy.class.getName()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是"org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy"换成VirtualThreadExecutorProxy.class.getName()

是指反过来equal对不

return doRegisterAndReturnCommon(bean, beanName);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

使用VirtualThreadExecutorProxy.class.getName(),后面维护也简单

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里doRegisterAndReturnCommon调用类似原来的代码最后再调用?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里doRegisterAndReturnCommon调用类似原来的代码最后再调用?

这个我是想着因为现在配置中心里没有需要动态配置的参数所以就直接以common的登记了。想着到时新增参数后再做重构

}
return bean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadFactory;

import static org.dromara.dynamictp.common.constant.DynamicTpConst.ALLOW_CORE_THREAD_TIMEOUT;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.AWAIT_TERMINATION_SECONDS;
Expand Down Expand Up @@ -74,7 +75,9 @@
@Slf4j
public class DtpBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {

private static final Integer JDK_VERSION_21_OFFSET = 21 - 8;
private static final Integer JRE_VERSION_21 = 21;

private static final String VIRTUAL_THREAD_EXECUTOR_TYPE = "virtual";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

放在common.constant下比较好


private Environment environment;

Expand Down Expand Up @@ -104,6 +107,7 @@ public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, B
try {
args = buildConstructorArgs(executorTypeClass, e);
} catch (UnsupportedOperationException exception) {
log.warn("DynamicTp virtual thread executor {} register warn: update your JDK version or don't use virtual thread executor!", e.getThreadPoolName());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

日志描述有问题,不一定是虚拟线程的异常

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个是我自己抛的异常,应该没有其他的情况了吧
image

return;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里加下异常日志

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里加下异常日志

加了,在调用的方法里面
image

}
BeanRegistrationUtil.register(registry, e.getThreadPoolName(), executorTypeClass, propertyValues, args);
Expand All @@ -115,15 +119,17 @@ private Map<String, Object> buildPropertyValues(DtpExecutorProps props) {
propertyValues.put(THREAD_POOL_NAME, props.getThreadPoolName());
propertyValues.put(THREAD_POOL_ALIAS_NAME, props.getThreadPoolAliasName());

propertyValues.put(ALLOW_CORE_THREAD_TIMEOUT, props.isAllowCoreThreadTimeOut());
propertyValues.put(WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN, props.isWaitForTasksToCompleteOnShutdown());
propertyValues.put(AWAIT_TERMINATION_SECONDS, props.getAwaitTerminationSeconds());
propertyValues.put(PRE_START_ALL_CORE_THREADS, props.isPreStartAllCoreThreads());
propertyValues.put(REJECT_HANDLER_TYPE, props.getRejectedHandlerType());
propertyValues.put(REJECT_ENHANCED, props.isRejectEnhanced());
propertyValues.put(RUN_TIMEOUT, props.getRunTimeout());
propertyValues.put(TRY_INTERRUPT_WHEN_TIMEOUT, props.isTryInterrupt());
propertyValues.put(QUEUE_TIMEOUT, props.getQueueTimeout());
if (!props.getExecutorType().equals(VIRTUAL_THREAD_EXECUTOR_TYPE)) {
propertyValues.put(ALLOW_CORE_THREAD_TIMEOUT, props.isAllowCoreThreadTimeOut());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

判断不用加

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个目前虚拟线程的proxy没有这些属性,不加的话会报错

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

采用提前返回的形式吧,看着舒服点代码

propertyValues.put(WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN, props.isWaitForTasksToCompleteOnShutdown());
propertyValues.put(AWAIT_TERMINATION_SECONDS, props.getAwaitTerminationSeconds());
propertyValues.put(PRE_START_ALL_CORE_THREADS, props.isPreStartAllCoreThreads());
propertyValues.put(REJECT_HANDLER_TYPE, props.getRejectedHandlerType());
propertyValues.put(REJECT_ENHANCED, props.isRejectEnhanced());
propertyValues.put(RUN_TIMEOUT, props.getRunTimeout());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这些字段虚拟线程也会用到,超时相关是在AwareManager扩展中用到

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这些字段虚拟线程也会用到,超时相关是在AwareManager扩展中用到

这些字段虚拟线程的proxy目前没有这些属性,如果暴露的话会无法生成实例。要是之后有需要的话再进行添加重构吧

propertyValues.put(TRY_INTERRUPT_WHEN_TIMEOUT, props.isTryInterrupt());
propertyValues.put(QUEUE_TIMEOUT, props.getQueueTimeout());
}

val notifyItems = mergeAllNotifyItems(props.getNotifyItems());
propertyValues.put(NOTIFY_ITEMS, notifyItems);
Expand All @@ -144,13 +150,13 @@ private Object[] buildConstructorArgs(Class<?> clazz, DtpExecutorProps props) th
} else if (clazz.equals(PriorityDtpExecutor.class)) {
taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), PriorityDtpExecutor.getRunnableComparator());
} else if (clazz.equals(VirtualThreadExecutorProxy.class)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

虚拟线程判断可以提前判断、返回

int jdkVersion = JreEnum.currentVersion().ordinal();
if (jdkVersion < JDK_VERSION_21_OFFSET) {
log.warn("DynamicTp virtual thread executor {} register warn: update your JDK version or don't use virtual thread executor!", props.getThreadPoolName());
int jreVersion = JreEnum.currentIntVersion();
if (jreVersion < JRE_VERSION_21) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我在springboot3分支加了判断版本大小方法,merge过来后可以使用

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我在springboot3分支加了判断版本大小方法,merge过来后可以使用

throw new UnsupportedOperationException();
}
ThreadFactory factory = Thread.ofVirtual().name(props.getThreadPoolName()).factory();
return new Object[]{
Executors.newVirtualThreadPerTaskExecutor()
Executors.newThreadPerTaskExecutor(factory)
};
} else {
taskQueue = buildLbq(props.getQueueType(),
Expand Down
Loading