2026/2/15 9:23:06
网站建设
项目流程
网站推广方法技巧,代售网站建设,app营销型网站的特点,如何在网站上做标记圈信息简介
Java7引入的线程池实现#xff0c;适合于计算可以被递归执行的任务#xff0c;并且这些任务都是是计算密集型的#xff0c;不会有IO阻塞。
ForkJoinPool中有两个关键点#xff1a;
分治法#xff1a;将一个规模为N的问题分解为K个规模较小的子问题#xff0c;这些子…简介Java7引入的线程池实现适合于计算可以被递归执行的任务并且这些任务都是是计算密集型的不会有IO阻塞。ForkJoinPool中有两个关键点分治法将一个规模为N的问题分解为K个规模较小的子问题这些子问题相互独立、并且与原问题性质相同求出子问题的解后将这些解合并就可以得出原问题的解例如二分法。工作窃取当某个线程的任务队列中没有任务时从其他线程的任务队列中获取任务来执行以充分利用工作线程的窃取能力减少由于线程获取不到任务而造成的空闲浪费。在工作窃取机制中每个线程都有自己的双端队列存储任务线程从自身队列的尾部获取任务此时是后进先出如果当前线程空闲会从其他线程的任务队列的头部获取任务此时是先进先出获取任务均是通过CAS操作实现的。ForkJoinPool基于工作窃取算法能高效利用多核处理器提升并发性能。入门案例案例1求start到end之间的和需求/** * 求x到y之间的和 */publicclassSumTaskextendsRecursiveTaskBigInteger{privatestaticfinalLongTHRESHOLD102400000L;privatefinalIntegerstart;privatefinalIntegerend;publicSumTask(Integerstart,Integerend){this.startstart;this.endend;}OverrideprotectedBigIntegercompute(){if(end-startTHRESHOLD){BigIntegerresultBigInteger.ZERO;longlenend-start1;longsstart;for(longi0;ilen;i){resultresult.add(BigInteger.valueOf(s));s;}returnresult;}else{intmidstart(end-start)/2;SumTaskt1newSumTask(start,mid);SumTaskt2newSumTask(mid1,end);invokeAll(t1,t2);BigIntegerjoint1.join();BigIntegerjoin1t2.join();returnjoin.add(join1);}}}publicclassSumTaskTest{publicstaticvoidmain(String[]args){longstartTimeSystem.currentTimeMillis();ForkJoinPoolcommonPoolForkJoinPool.commonPool();BigIntegerresultcommonPool.invoke(newSumTask(1,Integer.MAX_VALUE));longexecuteTimeSystem.currentTimeMillis()-startTime;System.out.println(结果 result.toString() 耗时 executeTimems);}}案例2斐波那契数列斐波那契数列第一项是0还是1 第一项为0和第一项为1都符合定义都可以正确计算。最初斐波那契在《计算之书》1202年中以兔子繁殖为例提出数列时起始项为 1、1。 随着数学体系的发展第一项为0被引入进来。通常如果没有特殊说明第一项通常是0importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.RecursiveTask;publicclassFibonacciTaskextendsRecursiveTaskLong{privatestaticfinalIntegerTHRESHOLD10;privatefinallongn;publicFibonacciTask(Longn){if(nnull){thrownewIllegalArgumentException(参数 n 不可为null);}if(n0){thrownewIllegalArgumentException(参数 n 不可小于0);}this.nn;}OverrideprotectedLongcompute(){if(nTHRESHOLD){returncomputeSequentially();}ListFibonacciTasklistnewArrayList();list.add(newFibonacciTask(n-1));list.add(newFibonacciTask(n-2));invokeAll(list);returnlist.get(0).join()list.get(1).join();}/** * 计算斐波那契数列 * return 成员变量n对应的数列值 */privateLongcomputeSequentially(){if(n1){returnn;}longa0,b1;for(longi2;in1;i){longtmpb;bab;atmp;}returnb;}}importjava.util.concurrent.ForkJoinPool;publicclassFibonacciTaskTest{publicstaticvoidmain(String[]args){ForkJoinPoolcommonPoolForkJoinPool.commonPool();longstartTimeSystem.currentTimeMillis();FibonacciTaskfibonacciTasknewFibonacciTask(100L);LongresultcommonPool.invoke(fibonacciTask);longexecuteTimeSystem.currentTimeMillis()-startTime;System.out.println(executeTimems, result result);}}案例3无返回值的异步任务归并排序publicclassMergeSortTask2TextendsComparableTextendsRecursiveAction{privatestaticfinalIntegerTHRESHOLD100000;privatefinalT[]arr;privatefinalintstart;privatefinalintend;publicMergeSortTask2(T[]arr){this.arrarr;start0;endarr.length-1;}privateMergeSortTask2(T[]arr,intstart,intend){this.arrarr;this.startstart;this.endend;}Overrideprotectedvoidcompute(){if(end-startTHRESHOLD){mergeSort(arr,start,end);}else{intmidcalMid(start,end);MergeSortTask2Ttask1newMergeSortTask2(arr,start,mid);MergeSortTask2Ttask2newMergeSortTask2(arr,mid1,end);// 先提交task1到线程池再在当前线程计算task2然后再阻塞地等待task1的结果task1.fork();task2.compute();task1.join();merge(arr,start,mid,end);}}/** * 计算start和end的中间值 */privateintcalMid(intstart,intend){returnstart(end-start)/2;}// 归并排序privatevoidmergeSort(T[]arr,intleft,intright){if(leftright){return;}intmidcalMid(left,right);mergeSort(arr,left,mid);mergeSort(arr,mid1,right);merge(arr,left,mid,right);}SuppressWarnings(unchecked)privatevoidmerge(T[]arr,intstart,intmid,intend){intlenend-start1;T[]tmpArr(T[])newComparable[len];intistart;intjmid1;intk0;while(imidjend){if(arr[i].compareTo(arr[j])0){tmpArr[k]arr[i];}else{tmpArr[k]arr[j];}}while(imid){tmpArr[k]arr[i];}while(jend){tmpArr[k]arr[j];}System.arraycopy(tmpArr,0,arr,start,len);}}测试Testpublicvoidtest3(){Integer[]arrbuildIntegerArr(n);// 100000000longstartTimeSystem.currentTimeMillis();MergeSortTask2IntegerintegerMergeSortTasknewMergeSortTask2(arr);integerMergeSortTask.invoke();// 1亿大小的数组使用ForkJoinPool排序花费 98438ms// System.out.println(arr Arrays.toString(arr));System.out.println(result (System.currentTimeMillis()-startTime)ms);}/** * 构建待排序的数组 */privatestaticInteger[]buildIntegerArr(intlimit){longstartTimeSystem.currentTimeMillis();Integer[]arrnewInteger[limit];RandomrandomnewRandom();for(inti0;ilimit;i){arr[i]random.nextInt(limit*10);}System.out.println(构建测试limit大小的数组花费 (System.currentTimeMillis()-startTime)ms);returnarr;}案例4ForkJoinPool 状态监控/** * 打印ForkJoinPool的状态 */publicstaticvoidprintCommonPoolStatus(ForkJoinPoolcommonPool){if(commonPoolnull){return;}System.out.println(----------------------);System.out.println(并行度 commonPool.getParallelism()\n工作线程数 commonPool.getPoolSize()\n活跃的线程数 commonPool.getActiveThreadCount()\n运行中的线程数 commonPool.getRunningThreadCount()\n排队提交的数量 commonPool.getQueuedSubmissionCount()\n排队任务的数量 commonPool.getQueuedTaskCount()\n当前所有线程是否空闲中 commonPool.isQuiescent()\n偷窃任务数 commonPool.getStealCount()\n是否异步模式 commonPool.getAsyncMode()\n是否还有未执行的任务 commonPool.hasQueuedSubmissions()\n线程工厂 commonPool.getFactory()\n异常处理器 commonPool.getUncaughtExceptionHandler());System.out.println();}提交任务后另起一个线程循环调用这个方法打印线程池的状态基本使用ForkJoinPool自带一个公共线程池线程数是CPU核数减1如果用户向ForkJoinPool中提交任务时没有指定线程池就是案例中的那种写法就使用默认线程池。向ForkJoinPool中提交任务时指定线程池ForkJoinPoolforkJoinPoolnewForkJoinPool(4);forkJoinPool.execute(integerMergeSortTask);ForkJoinPool中执行的任务需要继承RecursiveTask或RecursiveAction一个是有返回值的任务一个是无返回值的任务任务本身负责自身的拆分直到拆分到一定阈值再执行。这里就是把单线程执行的计算任务拆分为多线程可以执行的最后再合并它的结果。同时基于工作窃取机制确保执行任务时所有线程都不会空闲除非数据倾斜导致某些任务的计算量比较大否则所有线程都可以保证一个很好的效率好的一点是如何拆分数据取决于用户他可以在拆分时避免数据倾斜。向线程池中提交task或者调用task本身的invoke方法都可以把任务提交到线程池。