-
Notifications
You must be signed in to change notification settings - Fork 783
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #483] Virtual thread compatible #508
Conversation
# Conflicts: # adapter/adapter-common/src/main/java/org/dromara/dynamictp/adapter/common/DtpAdapterListener.java # core/src/main/java/org/dromara/dynamictp/core/monitor/collector/MicroMeterCollector.java # core/src/main/java/org/dromara/dynamictp/core/monitor/collector/jmx/JMXCollector.java # example/example-nacos-cloud/src/main/resources/dynamic-tp-nacos-cloud-demo-dtp-dev.yml # extension/extension-limiter-redis/src/main/java/org/dromara/dynamictp/extension/limiter/redis/ratelimiter/NotifyRedisRateLimiterFilter.java # spring/src/main/java/org/dromara/dynamictp/spring/DtpPostProcessor.java # spring/src/main/java/org/dromara/dynamictp/spring/annotation/DtpBeanDefinitionRegistrar.java # test/test-core/src/test/resources/postprocessor-dtp-dev.yml
@@ -109,13 +109,13 @@ public Map<String, ExecutorWrapper> getExecutorWrappers() { | |||
* @return thead pools stats | |||
*/ | |||
@Override | |||
public List<ThreadPoolStats> getMultiPoolStats() { | |||
public List<ExecutorStats> getMultiPoolStats() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
改完全点,方法、变量名
if (!Objects.equals(currentRejectHandlerType, props.getRejectedHandlerType())) { | ||
val rejectHandler = RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType()); | ||
executorWrapper.setRejectHandler(rejectHandler); | ||
if (!executorWrapper.isVirtualThreadExecutor()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个判断去掉吧,判断太多可读性会变差
poolStats.setDynamic(executor instanceof DtpExecutor); | ||
ExecutorStats executorStats = convertCommon(executor); | ||
executorStats.setPoolName(wrapper.getThreadPoolName()); | ||
executorStats.setPoolAliasName(wrapper.getThreadPoolAliasName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
新加的两个字段没赋值?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我还是用的poolname那两个打通的逻辑,想着到时一起改可能会好些。要是只改虚拟线程这边的话我怕会出现问题
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我还是用的poolname那两个打通的逻辑,想着到时一起改可能会好些。要是只改虚拟线程这边的话我怕会出现问题
字段赋值不影响啊,就是监控指标多了两个字段,可用可不用
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我还是用的poolname那两个打通的逻辑,想着到时一起改可能会好些。要是只改虚拟线程这边的话我怕会出现问题
字段赋值不影响啊,就是监控指标多了两个字段,可用可不用
好,那我把那两个加上去
poolStats.setTp99(performanceSnapshot.getTp99()); | ||
poolStats.setTp999(performanceSnapshot.getTp999()); | ||
return poolStats; | ||
if (!wrapper.isVirtualThreadExecutor()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
为啥加这个判断,虚拟线程也会有超时计数
|
||
Iterable<Tag> tags = getTags(executorStats); | ||
|
||
if (!executorStats.isVirtualExecutor()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
判断不用加,不存在的用空值
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
好
public static final String APP_NAME_TAG = "app.name"; | ||
|
||
private static final Map<String, ThreadPoolStats> GAUGE_CACHE = new ConcurrentHashMap<>(); | ||
private static final Map<String, org.dromara.dynamictp.common.entity.Metrics> GAUGE_CACHE = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
去掉包路径
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
现在我换了一个类
@Override | ||
public <T> Future<T> submit(Runnable runnable, T t) { | ||
runnable = getEnhancedTask(runnable); | ||
EnhancedRunnable.of(runnable, this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
返回值
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pom.xml
Outdated
<maven.compiler.target>17</maven.compiler.target> | ||
|
||
<maven.compiler.source>21</maven.compiler.source> | ||
<maven.compiler.target>21</maven.compiler.target> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
用17,前后兼容
propertyValues.put(QUEUE_TIMEOUT, props.getQueueTimeout()); | ||
|
||
if (!ExecutorType.getClass(props.getExecutorType()).equals(VirtualThreadExecutorProxy.class)) { | ||
propertyValues.put(ALLOW_CORE_THREAD_TIMEOUT, props.isAllowCoreThreadTimeOut()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
判断不用加
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个目前虚拟线程的proxy没有这些属性,不加的话会报错
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
采用提前返回的形式吧,看着舒服点代码
propertyValues.put(PRE_START_ALL_CORE_THREADS, props.isPreStartAllCoreThreads()); | ||
propertyValues.put(REJECT_HANDLER_TYPE, props.getRejectedHandlerType()); | ||
propertyValues.put(REJECT_ENHANCED, props.isRejectEnhanced()); | ||
propertyValues.put(RUN_TIMEOUT, props.getRunTimeout()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这些字段虚拟线程也会用到,超时相关是在AwareManager扩展中用到
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这些字段虚拟线程也会用到,超时相关是在AwareManager扩展中用到
这些字段虚拟线程的proxy目前没有这些属性,如果暴露的话会无法生成实例。要是之后有需要的话再进行添加重构吧
/** | ||
* 执行器名字 | ||
*/ | ||
private String executorName; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JMXCollector没有用到此名字
private String executorName; | ||
|
||
/** | ||
* 执行器别名 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这两个名字字段如果标注@deprecated的话需要兼容,不然直接改名就好?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这两个名字字段如果标注@deprecated的话需要兼容,不然直接改名就好?
行,我看看要是没有其他地方用到这两个字段我就去掉算了
@@ -196,7 +196,7 @@ private static void refresh(ExecutorWrapper executorWrapper, DtpExecutorProps pr | |||
TpMainFields oldFields = ExecutorConverter.toMainFields(executorWrapper); | |||
doRefresh(executorWrapper, props); | |||
TpMainFields newFields = ExecutorConverter.toMainFields(executorWrapper); | |||
if (oldFields.equals(newFields)) { | |||
if (oldFields.equals(newFields) && !executorWrapper.isVirtualThreadExecutor()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里使用&&有问题
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里使用&&有问题
那我是直接删掉还是?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里使用&&有问题
那我是直接删掉还是?
感觉删掉较好
@@ -121,6 +125,9 @@ private Object registerAndReturnCommon(Object bean, String beanName) { | |||
} else { | |||
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanName); | |||
if (!(beanDefinition instanceof AnnotatedBeanDefinition)) { | |||
if (beanDefinition.getBeanClassName().equals("org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy")) { | |||
return doRegisterAndReturnCommon(bean, beanName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
使用VirtualThreadExecutorProxy.class.getName(),后面维护也简单
@@ -121,6 +125,9 @@ private Object registerAndReturnCommon(Object bean, String beanName) { | |||
} else { | |||
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanName); | |||
if (!(beanDefinition instanceof AnnotatedBeanDefinition)) { | |||
if (beanDefinition.getBeanClassName().equals("org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy")) { | |||
return doRegisterAndReturnCommon(bean, beanName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里doRegisterAndReturnCommon调用类似原来的代码最后再调用?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里doRegisterAndReturnCommon调用类似原来的代码最后再调用?
这个我是想着因为现在配置中心里没有需要动态配置的参数所以就直接以common的登记了。想着到时新增参数后再做重构
DtpRegistry.registerExecutor(new ExecutorWrapper(poolName, proxy), REGISTER_SOURCE); | ||
return bean; | ||
} | ||
Executor proxy; | ||
if (bean instanceof ScheduledThreadPoolExecutor) { | ||
proxy = newScheduledTpProxy(poolName, (ScheduledThreadPoolExecutor) bean); | ||
} else if (bean.getClass().getName().equals("java.util.concurrent.ThreadPerTaskExecutor")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里一样使用getClass().name(),使用JreEnum做下版本限制
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里一样使用getClass().name(),使用JreEnum做下版本限制
这个地方改不了,ThreadPerTaskExecutor这个类JDK没有暴露出来,所以只能用这种方式来写。这也是虚拟线程的proxy有些不太一样的原因
try { | ||
args = buildConstructorArgs(executorTypeClass, e); | ||
} catch (UnsupportedOperationException exception) { | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里加下异常日志
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} else if (clazz.equals(VirtualThreadExecutorProxy.class)) { | ||
int jdkVersion = JreEnum.currentVersion().ordinal(); | ||
if (jdkVersion < JDK_VERSION_21_OFFSET) { | ||
log.warn("DynamicTp virtual thread executor {} register warn: update your JDK version or don't use virtual thread executor!", props.getThreadPoolName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里日志可以去掉,抛异常带上信息,外层捕获的时候打印
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
那我把这个日志移到外层去吧
BlockingQueue<Runnable> taskQueue; | ||
if (clazz.equals(EagerDtpExecutor.class)) { | ||
taskQueue = new TaskQueue(props.getQueueCapacity()); | ||
} else if (clazz.equals(PriorityDtpExecutor.class)) { | ||
taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), PriorityDtpExecutor.getRunnableComparator()); | ||
} else if (clazz.equals(VirtualThreadExecutorProxy.class)) { | ||
int jdkVersion = JreEnum.currentVersion().ordinal(); | ||
if (jdkVersion < JDK_VERSION_21_OFFSET) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
版本判断加个方法到JreEnum里面,以后也会用到
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
好
throw new UnsupportedOperationException(); | ||
} | ||
return new Object[]{ | ||
Executors.newVirtualThreadPerTaskExecutor() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ThreadFactory factory = Thread.ofVirtual().name("xx").factory();
Executors.newThreadPerTaskExecutor(factory);
这里使用factory设置下thread name
@@ -125,7 +125,7 @@ private Object registerAndReturnCommon(Object bean, String beanName) { | |||
} else { | |||
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanName); | |||
if (!(beanDefinition instanceof AnnotatedBeanDefinition)) { | |||
if (beanDefinition.getBeanClassName().equals("org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy")) { | |||
if (VirtualThreadExecutorProxy.class.getName().equals("org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里是"org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy"换成VirtualThreadExecutorProxy.class.getName()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里是"org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy"换成VirtualThreadExecutorProxy.class.getName()
是指反过来equal对不
BlockingQueue<Runnable> taskQueue; | ||
if (clazz.equals(EagerDtpExecutor.class)) { | ||
taskQueue = new TaskQueue(props.getQueueCapacity()); | ||
} else if (clazz.equals(PriorityDtpExecutor.class)) { | ||
taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), PriorityDtpExecutor.getRunnableComparator()); | ||
} else if (clazz.equals(VirtualThreadExecutorProxy.class)) { | ||
int jreVersion = JreEnum.currentIntVersion(); | ||
if (jreVersion < JRE_VERSION_21) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我在springboot3分支加了判断版本大小方法,merge过来后可以使用
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我在springboot3分支加了判断版本大小方法,merge过来后可以使用
好
@@ -71,6 +75,10 @@ | |||
@Slf4j | |||
public class DtpBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware { | |||
|
|||
private static final Integer JRE_VERSION_21 = 21; | |||
|
|||
private static final String VIRTUAL_THREAD_EXECUTOR_TYPE = "virtual"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
放在common.constant下比较好
GitHub的那个检查报错是不是该解决一下,好像是maven的编译版本低了,我去把它版本调高可以吗,或者有什么其他的解决方案 |
GitHub的那个检查报错是不是该解决一下,好像是maven的编译版本低了,我去把它版本调高可以吗 |
不是maven版本问题吧,JDK版本 |
|
@@ -121,6 +125,9 @@ private Object registerAndReturnCommon(Object bean, String beanName) { | |||
} else { | |||
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanName); | |||
if (!(beanDefinition instanceof AnnotatedBeanDefinition)) { | |||
if ("org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy".equals(VirtualThreadExecutorProxy.class.getName())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里判断应该改为:beanDefinition.getBeanClassName().equals(VirtualThreadExecutorProxy.class.getName())
/** | ||
* jre | ||
*/ | ||
private static final Integer JRE_VERSION_21 = 21; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
如果没用到可以删除
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
} | ||
gauge(GAUGE_CACHE.get(threadPoolStats.getPoolName())); | ||
gauge((ExecutorStats) GAUGE_CACHE.get(executorStats.getExecutorName())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
不用强转了
private static ThreadPoolStats convertCommon(ExecutorAdapter<?> executor) { | ||
ThreadPoolStats poolStats = new ThreadPoolStats(); | ||
private static ExecutorStats convertCommon(ExecutorAdapter<?> executor) { | ||
ExecutorStats poolStats = new ExecutorStats(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
变量名也改了
@@ -88,13 +90,15 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro | |||
|
|||
@Override | |||
public Object postProcessAfterInitialization(@NonNull Object bean, @NonNull String beanName) throws BeansException { | |||
if (!(bean instanceof ThreadPoolExecutor) && !(bean instanceof ThreadPoolTaskExecutor)) { | |||
if (!(bean instanceof ThreadPoolExecutor) && !(bean instanceof ThreadPoolTaskExecutor) && | |||
!(bean.getClass().getName().equals("java.util.concurrent.ThreadPerTaskExecutor")) && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
java.util.concurrent.ThreadPerTaskExecutor 提取成常量吧
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
好
Map<String, Object> propertyValues = buildPropertyValues(e); | ||
Object[] args = buildConstructorArgs(executorTypeClass, e); | ||
Map<String, Object> propertyValues; | ||
propertyValues = buildPropertyValues(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
为啥不合一行?
try { | ||
args = buildConstructorArgs(executorTypeClass, e); | ||
} catch (UnsupportedOperationException exception) { | ||
log.warn("DynamicTp virtual thread executor {} register warn: update your JDK version or don't use virtual thread executor!", e.getThreadPoolName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
日志描述有问题,不一定是虚拟线程的异常
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BlockingQueue<Runnable> taskQueue; | ||
if (clazz.equals(EagerDtpExecutor.class)) { | ||
taskQueue = new TaskQueue(props.getQueueCapacity()); | ||
} else if (clazz.equals(PriorityDtpExecutor.class)) { | ||
taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), PriorityDtpExecutor.getRunnableComparator()); | ||
} else if (clazz.equals(VirtualThreadExecutorProxy.class)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
虚拟线程判断可以提前判断、返回
propertyValues.put(QUEUE_TIMEOUT, props.getQueueTimeout()); | ||
|
||
if (!ExecutorType.getClass(props.getExecutorType()).equals(VirtualThreadExecutorProxy.class)) { | ||
propertyValues.put(ALLOW_CORE_THREAD_TIMEOUT, props.isAllowCoreThreadTimeOut()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
采用提前返回的形式吧,看着舒服点代码
Virtual thread compatible