Еще один блог по Java 7, на этот раз он посвящен новым утилитам параллелизма. Речь идет о простой вилке и, в частности, о присоединении. Все объясняется с помощью простого примера кода. По сравнению с Project Coin-параллелизм — это сложная тема. Пример кода немного сложнее. Давайте начнем.
Служба Fork and Join Executor Service
Fork and join использует эффективный алгоритм планирования задач, который обеспечивает оптимизированное использование ресурсов (памяти и процессора) на многоядерных машинах. Этот алгоритм известен как « кража работы«. Свободные потоки в пуле объединения вил пытаются найти и выполнить подзадачи, созданные другими активными потоками. Это очень эффективно, потому что большие блоки делятся на более мелкие единицы работы, которые распределяются по всем активным потокам (и процессорам). Вот аналогия, объясняющая силу алгоритмов вилки и соединения: если у вас есть баночка с вареньем и вы наполняете ее шариками для пинг-понга, в стекле остается много воздуха. Думайте о воздухе как о неиспользованном ресурсе ЦП. Ваша банка для варенья с горошком (или песком) содержит меньше воздуха в стакане. Вилка и соединение похожи на заполнение банки для варенья горохом. В вашем стакане больше объема с использованием гороха, потому что меньше воздуха (меньше отходов) Алгоритмы разветвления и объединения всегда обеспечивают оптимальное (меньшее) количество активных потоков, чем алгоритмы совместного использования. Это по той же «причине гороха».Представьте, что баночка варенья — это ваш пул ниток, а горох — это ваши задачи. С помощью fork и join вы можете размещать больше задач (и полный объем) с одинаковым количеством потоков (в одной и той же банке с вареньем).
Изображение 1: вилка и присоединиться к банке с вареньем |
Вот простой пример ветвления и кода соединения:
import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class ForkJoinTaskExample_Plain extends RecursiveTask<List<Map<String, Double>>> { private List<Proposal> proposals; private static final long serialVersionUID = -2703342063482619328L; public ForkJoinTaskExample_Plain(List<Proposal> proposals) { super(); this.proposals = proposals; } @Override protected List<Map<String, Double>> compute() { if (proposals.size() == 1) { // task is small enough to compute linear in this thread return Arrays.asList(computeDirectly(proposals.get(0))); } // task is to large for one thread to execute efficiently, split the task // make sure splitting of tasks makes sense! tasks must not be too small ... int split = proposals.size() / 2; ForkJoinTaskExample_Plain f1 = new ForkJoinTaskExample_Plain(proposals.subList(0, split)); f1.fork(); // generate task for some other thread that can execute on some other CPU ForkJoinTaskExample_Plain f2 = new ForkJoinTaskExample_Plain(proposals.subList(split, proposals.size())); List<Map<String, Double>> newList = new ArrayList<>(); newList.addAll(f2.compute()); // compute this sub task in the current thread newList.addAll(f1.join()); // join the results of the other sub task return newList; } private Map<String, Double> computeDirectly(Proposal proposal) { return new PricingEngine().calculatePrices(proposal); } public static void main(String[] args) { // Calculate four proposals ForkJoinTaskExample_Plain task = new ForkJoinTaskExample_Plain(Arrays.asList(new Proposal("Niklas", "Schlimm", "7909", "AAL", true, true, true), new Proposal("Andreas", "Fritz", "0005", "432", true, true, true), new Proposal("Christian", "Toennessen", "0583", "442", true, true, true), new Proposal("Frank", "Hinkel", "4026", "AAA", true, true, true))); ForkJoinPool pool = new ForkJoinPool(); System.out.println(new Date()); System.out.println(pool.invoke(task)); System.out.println(new Date()); } }
Задачи Fork и Join всегда имеют похожий типичный поток управления fork и join. В моем примере я хочу рассчитать цены на список предложений по страхованию автомобилей. Давайте рассмотрим пример.
Строка 10 : задачи Fork и join расширяют RecursiveTask или RecursiveAction. Задачи возвращают результат, а действия — нет. Результатом моего примера является список карт, которые содержат цены на страховые полисы. Одна карта цен на каждое предложение.
Строка 12 : задание рассчитает цены для предложений.
Строка 22 : Задачи Fork и Join реализуют метод вычисления. Опять же, метод compute возвращает список карт, которые содержат цены. Если во входном списке четыре предложения, то будет четыре карты цен.
Линия 24-26: Достаточно ли стек задач (список предложений) для непосредственного вычисления? Если да, то вычислите в этой теме, что означает вызов механизма оценки для расчета предложения. Если нет, продолжайте: разделите работу и рекурсивно вызовите задачу.
Строка 31 : определить, где разделить список.
Строка 33 : создайте новую задачу для первой части разделенного списка.
Строка 34 : выполнить эту задачу: разрешить другому потоку выполнить эту меньшую подзадачу. Этот поток будет рекурсивно вызывать вычисления для этого экземпляра подзадачи.
Строка 35 : создайте новое задание для второй части разделенного списка.
Линия 36: Подготовить составленный список результатов двух разделенных подзадач (необходимо объединить результаты двух подзадач в один результат родительской задачи)
Строка 37 : вычислить вторую подзадачу в этом текущем потоке и добавить результат в список результатов ,
Строка 38 : Тем временем первая подзадача f1 была вычислена другим потоком. Объедините результат первой подзадачи в составленный список результатов.
Строка 39 : вернуть составной результат.
Вам нужно запустить форк и присоединиться к задаче.
Строка 49 : создайте основной форк и объедините задачу с начальным списком предложений.
Строка 53 : создайте ветвь и объедините пул потоков.
Строка 55 : отправьте основную задачу в форк и объедините пул.
Вот и все. Вы можете посмотреть полный код
здесь . Вам понадобятся
PricingEngine.java и
Proposal.java .
От http://niklasschlimm.blogspot.com/2011/12/java-7-fork-and-join-and-jar-jam.html