Skip to content

线程助手 ThreadUtil

feilong edited this page May 22, 2020 · 1 revision

曾几何时,多线程是JAVA里面很少触碰的技术,而随着技术的发展,对于多线程使用越来越多,在合适的时候合理的使用多线程,对于性能有着质的提高, 但是如果自己从头写,会比较繁琐,那么我们就封装一个吧(这也是feilong的精髓思想之所在)

我们来看看 线程助手 - ThreadUtil

1.执行.

方法 Description
void execute(List list,int eachSize,Map<String, ?> paramsMap,PartitionRunnableBuilder partitionRunnableBuilder) 给定一个待解析的 list,设定每个线程执行多少条 eachSize,传入一些额外的参数 paramsMap,使用自定义的 partitionRunnableBuilder,自动构造多条线程并运行..

1.1 void execute(List list,int eachSize,Map<String, ?> paramsMap,PartitionRunnableBuilder partitionRunnableBuilder)

给定一个待解析的 list,设定每个线程执行多少条 eachSize,传入一些额外的参数 paramsMap,使用自定义的 partitionRunnableBuilder,自动构造多条线程并运行.

适用场景:

比如同步库存,一次从MQ或者其他接口中得到了5000条数据,如果使用单线程做5000次循环,势必会比较慢,并且影响性能; 如果调用这个方法,传入eachSize=100, 那么自动会开启5000/100=50 个线程来跑功能,大大提高同步库存的速度

其他的适用场景还有诸如同步商品主档数据,同步订单等等这类每个独立对象之间没有相关联关系的数据,能提高执行速度和效率

重构:

对于以下代码:模拟10个对象/数字,循环执行任务(可能是操作数据库等)

 public void testExecuteTest() throws InterruptedException{
     Date beginDate = new Date();
 
     List<Integer> list = toList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
     for (Integer integer : list){
         //------
 
         //模拟 do something
 
         //---------
         Thread.sleep(1 * MILLISECOND_PER_SECONDS);
     }
     LOGGER.info("use time:{}", formatDuration(beginDate));
 }

统计总耗时时间 需要 use time:10秒28毫秒

此时你可以调用此方法,改成多线程执行:

 public void testExecuteTestUsePartitionRunnableBuilder() throws InterruptedException{
     Date beginDate = new Date();
     List<Integer> list = toList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
 
     //每个线程执行2条数据, 没有自定义 paramsMap
 
     //将会自动创建 list.size()/2 =5个 线程执行
 
     //每个线程执行的,将会是 PartitionRunnableBuilder build 返回的 Runnable
     ThreadUtil.execute(list, 1, null, new PartitionRunnableBuilder<Integer>(){
 
         @Override
         public Runnable build(final List<Integer> perBatchList,PartitionThreadEntity partitionThreadEntity,Map<String, ?> paramsMap){
 
             return new Runnable(){
 
                 @Override
                 public void run(){
                     for (Integer integer : perBatchList){
                         //------
 
                         //模拟 do something
 
                         //---------
                         try{
                             Thread.sleep(1 * MILLISECOND_PER_SECONDS);
                         }catch (InterruptedException e){
                             LOGGER.error("", e);
                         }
                     }
                 }
             };
         }
 
     });
 
     LOGGER.info("use time:{}", formatDuration(beginDate));
 }

统计总耗时时间 需要 use time:2秒36毫秒

对于上述的case,如果将 eachSize 参数由2 改成1, 统计总耗时时间 需要 use time:1秒36毫秒

可见 调用该方法,使用多线程能节省执行时间,提高效率; 但是也需要酌情考虑eachSize大小,合理的开启线程数量

说明:

  • 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性;

  • 需要注意合理的评估list 的大小和eachSize 比率; 不建议list size很大,比如 20W,而eachSize值很小,比如2 ,那么会开启20W/2=10W个线程;此时建议考虑 线程池的实现方案

  • 对于参数 paramsMap 的使用: 比如你需要拿到最终每条数据执行的结果,以便后续进行处理(比如对失败的操作再次执行或者发送汇报邮件等)

 public void testExecuteTestUsePartitionRunnableBuilderParamsMap() throws InterruptedException{
     Date beginDate = new Date();
     List<Integer> list = toList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
 
     final Map<Integer, Boolean> indexAndResultMap = Collections.synchronizedSortedMap(new TreeMap());
 
     ThreadUtil.execute(list, 2, null, new PartitionRunnableBuilder<Integer>(){
 
         @Override
         public Runnable build(
                         final List<Integer> perBatchList,
                         final PartitionThreadEntity partitionThreadEntity,
                         Map<String, ?> paramsMap){
 
             return new Runnable(){
 
                 @Override
                 public void run(){
 
                     int i = 0;
                     for (Integer integer : perBatchList){
                         //------
 
                         //模拟 do something
 
                         //---------
                         try{
                             Thread.sleep(1 * MILLISECOND_PER_SECONDS);
                         }catch (InterruptedException e){
                             LOGGER.error("", e);
                         }
 
                         int indexInTotalList = getIndex(partitionThreadEntity, i);
 
                         //模拟 当值是 5 或者8 的时候 操作结果是false
                         boolean result = (integer == 5 || integer == 8) ? false : true;
 
                         indexAndResultMap.put(indexInTotalList, result);
 
                         i++;
                     }
                 }
 
                 private Integer getIndex(PartitionThreadEntity partitionThreadEntity,int i){
                     int batchNumber = partitionThreadEntity.getBatchNumber();
                     return batchNumber * partitionThreadEntity.getEachSize() + i;
                 }
             };
         }
 
     });
 
     LOGGER.debug(JsonUtil.format(indexAndResultMap));
 
     LOGGER.info("use time:{}", formatDuration(beginDate));
 }

输出结果:

    29:21 DEBUG (ThreadUtilExample.java:161) [testExecuteTestUsePartitionRunnableBuilderParamsMap()]     {
        "0": true,
        "1": true,
        "2": true,
        "3": true,
        "4": true,
        "5": false,
        "6": true,
        "7": true,
        "8": false,
        "9": true
    }
    29:21 INFO  (ThreadUtilExample.java:164) [testExecuteTestUsePartitionRunnableBuilderParamsMap()] use time:2秒181毫秒

2.创建指定数量 threadCount 的线程,并执行

方法 Description
void execute(Runnable runnable,int threadCount) 创建指定数量 threadCount 的线程,并执行.

2.1 void execute(Runnable runnable,int threadCount)

创建指定数量 threadCount 的线程,并执行.

core

Clone this wiki locally