副标题#e#
本系列的第三篇文章将以实现一个极简朴的查找最大数的任务为例,别离给出了四个版本:1.顺序执行;2.基于传统的Thread.join();3.基于并发东西包的Future;4.基于JDK 7引入的Fork/Join框架。(2013.10.25最后更新)
分而治之(Divide-and-Conquer)是办理巨大问题的常用要领。在并发应用中,可将一个巨大算法解析为若干子问题,然后利用独立的线程去执行办理子问题的措施,之后再将各个子问题的功效逐级举办归并以获得最终功效。在特定情况中,这种方法大概较好地提高措施的执行效率。
方案1:顺序执行
找出对给定整形数组中的最大值,利用的要领很简朴,就是逐一遍历每个元素,将当前元素与当前最大值举办较量,若当前元素的值更大,则将该值作为新的当前最大值,再去与下一个元素举办较量,如此重复。在编写并发措施来实现这个算法之前,本文将先给出一个顺序执行的实现版本,但依然操作了分而治之的思想。即,先将给定命列支解成若干较小的子数列,找出各个子数列的最大值,将这些最大值构成一个新的数列,然后再利用同样的要领对这个新数罗列办支解与最大值归并,…依次类推,直至找到最大值。
代码清单1中的MaxNumberFinder是本文的基本类:1.getMaxNumber()要领展示了如何查找一个数列中的最大值;2.利用东西要领getNumberArray()/createNumberArray()可以建设指定长度的随机整数数列;3.要领findMaxNumber()展示了将一个数列支解为子数列(子数列的最大长度不高出指定值THRESHOLD),并查找子数列的最大值,以及将子数列的最大值构成各级新的中间数列去查找其最大值。
清单1 public class MaxNumberFinder { public static final int THRESHOLD = 50; private static final Random random = new Random(); public static int[] getNumberArray() { return createNumberArray(3000); } private static int[] createNumberArray(int capacity) { int[] numbers = new int[capacity]; for (int i = 0; i < numbers.length; i++) { numbers[i] = random.nextInt(capacity); } return numbers; } public static int getMaxNumber(int[] numbers) { if (numbers.length == 0) { return Integer.MIN_VALUE; } int max = numbers[0]; for (int i = 1; i < numbers.length; i++) { if (numbers[i] > max) { max = numbers[i]; } } return max; } private static int findMaxNumber(int[] numbers) { // interim max number array int[] maxNumbers = new int[numbers.length / THRESHOLD + (numbers.length % THRESHOLD == 0 ? 0 : 1)]; for (int i = 0; i <= numbers.length - THRESHOLD; i += THRESHOLD) { final int[] subNumbers = new int[THRESHOLD]; System.arraycopy(numbers, i, subNumbers, 0, subNumbers.length); maxNumbers[i / THRESHOLD] = getMaxNumber(subNumbers); } if (numbers.length % THRESHOLD != 0) { int[] lastSubNumbers = new int[numbers.length % THRESHOLD]; System.arraycopy(numbers, numbers.length - lastSubNumbers.length, lastSubNumbers, 0, lastSubNumbers.length); maxNumbers[maxNumbers.length - 1] = getMaxNumber(lastSubNumbers); } // if the length of interim max number array is greater than threshold, // it must divide-and-search recursively. if (maxNumbers.length > THRESHOLD) { return findMaxNumber(maxNumbers); } else { return getMaxNumber(maxNumbers); } } }
#p#副标题#e#
方案2:基于Thread.join()
基于要领MaxNumberFinder.findMaxNumber()的实现,在每次支解之后获得的数列,以及归并获得的中间最大值的数列,都可以利用独立的线程去别离查找它们的最大值,如代码清单2所示。
清单2 public class MaxNumberFinderOnThread { private static int findMaxNumber(int[] numbers) { // interim max number array final int[] maxNumbers = new int[numbers.length / THRESHOLD + (numbers.length % THRESHOLD == 0 ? 0 : 1)]; // the threads for searching max value in sub number array List<Thread> threads = new ArrayList<Thread>(); for (int i = 0; i <= numbers.length - THRESHOLD; i += THRESHOLD) { final int[] subNumbers = new int[THRESHOLD]; System.arraycopy(numbers, i, subNumbers, 0, subNumbers.length); final int bufIndex = i / THRESHOLD; Thread bufThread = new Thread(new Runnable() { @Override public void run() { maxNumbers[bufIndex] = MaxNumberFinder.getMaxNumber(subNumbers); } }); bufThread.start(); threads.add(bufThread); } if (numbers.length % THRESHOLD != 0) { final int[] lastSubNumbers = new int[numbers.length % THRESHOLD]; System.arraycopy(numbers, numbers.length - lastSubNumbers.length, lastSubNumbers, 0, lastSubNumbers.length); final int lastIndex = (numbers.length - 1) / THRESHOLD; Thread lastThread = new Thread(new Runnable() { @Override public void run() { maxNumbers[lastIndex] = MaxNumberFinder.getMaxNumber(lastSubNumbers); } }); threads.add(lastThread); lastThread.start(); } // waiting for all of jobs are finished for (int i = 0, size = threads.size(); i < size; i++) { try { threads.get(i).join(); } catch (InterruptedException e) { e.printStackTrace(); } } // if the length of interim max number array is greater than threshold, // it must divide-and-search recursively. if (maxNumbers.length > THRESHOLD) { return findMaxNumber(maxNumbers); } else { return MaxNumberFinder.getMaxNumber(maxNumbers); } } }
#p#分页标题#e#
为了可以或许将同一级线程所找的最大值归并成一个新的中间最大值数列,必需要期待这一组线程全部执行完毕。而通过别离挪用每个线程实例中join()要领即可满意这一要求。
必需留意的是,若数列的很长,而每次最多处理惩罚的数列较短(即,THRESHOLD值较小),该方案将会发生较多的线程,耗损大量内存。别的,还会有较多的高层线程在期待低层线程的执行功效,这大概会大大影响整个任务的执行效率。
方案3:基于Future
方案2利用的是旧有API,按照本系列上一篇中所提及的并发东西包中的Future,同样可以实现这一成果。只需要将利用Thread/Runnable的处所,相应地替换成利用Future/Callable即可,如代码清单3所示。
清单3 public class MaxNumberFinderOnFuture { private static ExecutorService executor = Executors.newCachedThreadPool(); private static int findMaxNumber(int[] numbers) { final int[] maxNumbers = new int[numbers.length / THRESHOLD + (numbers.length % THRESHOLD == 0 ? 0 : 1)]; List<Future<Integer>> futures = new ArrayList<Future<Integer>>(); for (int i = 0; i <= numbers.length - THRESHOLD; i += THRESHOLD) { final int[] subNumbers = new int[THRESHOLD]; System.arraycopy(numbers, i, subNumbers, 0, subNumbers.length); Future<Integer> bufFuture = executor.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { return MaxNumberFinder.getMaxNumber(subNumbers); } }); futures.add(bufFuture); } if (numbers.length % THRESHOLD != 0) { final int[] lastSubNumbers = new int[numbers.length % THRESHOLD]; System.arraycopy(numbers, numbers.length - lastSubNumbers.length, lastSubNumbers, 0, lastSubNumbers.length); Future<Integer> lastFuture = executor.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { return MaxNumberFinder.getMaxNumber(lastSubNumbers); } }); futures.add(lastFuture); } // retrieve results from Futures one by one, // get() method will be blocked if the searching isn't finished for (int i = 0, size = futures.size(); i < size; i++) { try { maxNumbers[i] = futures.get(i).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // if the length of interim max number array is greater than threshold, // it must divide-and-search recursively. if (maxNumbers.length > THRESHOLD) { return findMaxNumber(maxNumbers); } else { return MaxNumberFinder.getMaxNumber(maxNumbers); } } }
查察本栏目
这一版的findMaxNumber()实现中,没有显示地挪用Thread.join()要领去期待线程的执行功效。但在任务执行完成之前,挪用Future.get()时会被阻塞,在此处的结果就与Thread.join()沟通。
方案4:基于Fork/Join
方案2与方案3不必然可以或许提高执行效率。假如该应用措施运行在单核处理惩罚器上,可能它没有操作上多核处理惩罚器中的多个内核,那么它的执行时间很有大概要长于方案1所利用的顺序执行方案,因为线程的建设、调治、上下文切换城市发生特另外开销。
#p#分页标题#e#
为了更好地适应已经十分普遍的多核处理惩罚器场景,JDK 7引入了Fork/Join框架。如代码清单4所示,基于该框架提供的RecursiveTask,我们就可以直接地对任务举办支解与归并,措施自己也更为清晰简捷。
清单4 public class NumberFinderOnForkJoin extends RecursiveTask<Integer> { private static final long serialVersionUID = -5871813408961649666L; private int[] numbers = null; private int maxNumber = Integer.MIN_VALUE; public NumberFinderOnForkJoin(int[] numbers) { this.numbers = numbers; } @Override public Integer compute() { if (numbers.length <= THRESHOLD) { maxNumber = NumberFinder.getMaxNumber(numbers); } else { int[] leftNumbers = new int[numbers.length / 2]; System.arraycopy(numbers, 0, leftNumbers, 0, leftNumbers.length); int[] rightNumbers = new int[numbers.length - numbers.length / 2]; System.arraycopy(numbers, leftNumbers.length, rightNumbers, 0, rightNumbers.length); // divide the task into two sub-tasks. NumberFinderOnForkJoin leftTask = new NumberFinderOnForkJoin(leftNumbers); NumberFinderOnForkJoin rightTask = new NumberFinderOnForkJoin(rightNumbers); invokeAll(leftTask, rightTask); maxNumber = Math.max(leftTask.maxNumber, rightTask.maxNumber); } return maxNumber; } public static void main(String[] args) throws InterruptedException, ExecutionException { NumberFinderOnForkJoin task = new NumberFinderOnForkJoin(NumberFinder.getNumberArray()); ForkJoinPool pool = new ForkJoinPool(); pool.submit(task); System.out.println(task.get()); pool.shutdown(); } }
稍稍说明一下,方案4对数罗列办支解的要领与前三个方案都差异。方案4是利用的二分法,而前三个方案都是按从前往后的顺序依次取出不长于THRESHOLD的子数列。但这两种要领没有本质上的区别,最多只是支解的次数略有差异而已。譬喻,一个长度为9的数列,将THRESHOLD的值设为3。若用二分法,第一轮会支解出4个子数列,其长度别离为3,2,3和1,后头还需要举办一轮支解;但若用顺序法,支解将获得的3个子数列,其长度均为3,之后不需要再举办支解了。
小结
解析任务,各个击破,是应对巨大问题的习用技巧。在资源富裕的环境下,应该尽大概地操作空闲的计较资源。Java并发东西包提供了适应多核情况的运行框架,使应用措施能更高效地操作多核处理惩罚器。
但对执行方案的选定,包罗THRESHOLD的值,依然要基于机能测试。对付本文的例子,在我的测试情况中,方案1其实是最高的。在并发执行方案中,方案2会明明慢于方案3和方案4,而方案3与方案4之间则难分伯仲。