Статьи

A Ленивые Разработчики Введение в исполнителей Параллелизма Java

Я бы дурачил себя, если скажу, что API util.concurrent пинают задницу гепарда, когда классы доступны с 2004 года. Тем не менее, есть некоторые интересные функции, которые я хотел бы посетить. Эксперты по параллелизму, сейчас самое время закрыть это окно. Все остальные, будьте внимательны к веселой поездке.

Ты не забудешь свои корни

Executor — это корневой интерфейс с единственным методом execute. Все, что реализует интерфейс Runnable, может быть передано в качестве параметра. Однако Silly Executor не поддерживает Callable.

Хорошие новости: интерфейс ExecutorService, который расширяет Executor, добавляет поддержку Callable. Его класс реализации — ThreadPoolExecutor.

Я собираюсь притвориться, что интерфейс ScheduledExecutorService и его класс реализации ScheduledThreadPoolExecutor не существует, поскольку они просто добавляют возможности планирования поверх ExecutorService и ThreadPoolExecutor. Но помните этот класс, когда мощного, но скучного java.util.Timer недостаточно, а полноценного внешнего планировщика слишком много.

Если вы новичок в параллелизме или забыли разницу между Callable и Runnable, вы можете прочитать немного, прежде чем читать дальше. Манекен гид здесь

Факты ExecutorService.submit:

Три варианта отправки:

Будущее представить (вызываемое задание)
Future submit (Выполненная задача)
Future submit (Выполненная задача, T результат)

  1. Метод submit ExecutorService перегружен и принимает либо Callable либо Runnable .
  2. Поскольку метод run для Runnable возвращает void, неудивительно, что Future.get всегда возвращает null, когда задача завершена.
    1
    Future<?>   submit(Runnable task)
  3. Другой перегруженный метод submit который принимает Runnable и универсальный, возвращает результат, который вы передали в качестве второго параметра в качестве результата.
    1
    <T> Future<T>   submit(Runnable task, T result)

Фактически, открыв код ( FutureTask ), вы заметите, что вложенный класс Executors класса RunnableAdapter просто хранит результат и возвращает тот же результат после завершения метода run.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
static final class RunnableAdapter<T> implements Callable<T> {
     final Runnable task;
  final T result;
 
  RunnableAdapter(Runnable task, T result) {
         this.task = task;
         this.result = result;
        }
 
        public T  [More ...] call() {
           task.run();
           return result;
        }
}

Источник RunnableAdapter

В обоих случаях, если вы хотите (вы должны!) Завершить программу вместо потока вашего исполнителя, блокирующего программу и входящего в занятый цикл , вы должны вызвать метод shutdown, как в

1
executorService.shutdown()

факты закрытия

Вы можете представить себе shutdown как полузакрытую дверь торгового центра. Новые клиенты не будут впущены, но существующие клиенты могут покинуть торговый центр, как только они будут сделаны.

Чтобы повторить,

  1. shutdown — это вежливый метод. Это фактически не закрывает задачи в пуле немедленно. Это просто говорит о том, что никакие новые задачи не будут приняты.
  2. Если вы не выполняете свои задачи с помощью invokeAll , вам нужно будет дождаться завершения всех выполняемых задач. Это достигается путем вызова метода awaitTermination . (invokeAll и отправьте примеры внизу поста)
  3. Как только все текущие задачи выполнены, служба исполнителя завершает работу.

Если вам нужен невежливый, навязчивый метод, который не заботится о том, выполняются ли текущие потоки с их задачами, тогда shutdownNow — ваш парень Тем не менее, нет никакой гарантии, что метод остановит службу на точке, но это самое близкое к вам немедленное отключение.

На awaitTermination вы можете указать период ожидания, до которого основной поток должен ждать потоков пула для выполнения своих задач.

1
2
3
4
5
6
7
8
9
ExecutorService executorService=Executors.newFixedThreadPool(10);
    
    future = executorService.submit(getInstanceOfCallable(count,sum));
    
    executorService.shutdown();
 
    if (executorService.awaitTermination(10, TimeUnit.SECONDS)){
        System.out.println('All threads done with their jobs');
    }

Исполнители — заводской парень

Классы выше, все классные. Но, скажем, вы хотели создать однопотокового исполнителя, вы бы написали что-то вроде

1
new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

Сравните это с

1
Executors.newSingleThreadExecutor()

Итак, поехали. Executors — это класс с фабричными методами для создания различных форм службы executor с некоторыми часто используемыми значениями по умолчанию. Обратите внимание, что, кроме удивительных фабричных методов, он не приносит никаких новых возможностей.

Рекомендуется быстро взглянуть на реализацию фабричных методов и проверить, соответствует ли она вашим потребностям.

Вызов все и отправка

All часть метода invokeAll в ExecutorService не удивляет. Это просто говорит о том, что вам нужно передать в коллекцию Callable s. Опять же, как и ожидалось, метод не вернется, пока все потоки не выполнят свои задачи. Таким образом, для случаев, когда вы заинтересованы в результате только после завершения всех заданий, invokeAll — ваш парень.

С другой стороны, метод submit возвращается сразу после того, как вызываемый объект передается в службу executor. Если вы ничего не делаете в своем методе call вашего Callable , ваши рабочие потоки в идеале должны работать, когда возвращается метод submit .

Следующие примеры могут быть полезны для вашей справки. Программа просто пытается найти сумму всех натуральных чисел до 100 (грубая сила, конечно)

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package me.rerun.incubator;
 
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
 
public class ExecutorInvokeAll {
 
 public void runApp() throws InterruptedException, ExecutionException{
 
                //variable to store the sum
  AtomicInteger sum=new AtomicInteger();
 
                //Use our friendly neighbourhood factory method of the Executors.
  ExecutorService executorService=Executors.newFixedThreadPool(10);
 
  List<Callable<AtomicInteger>> callableList=new ArrayList<Callable<AtomicInteger>>();
 
  for (int count = 0; count <= 100;count++) {
   callableList.add(getInstanceOfCallable(count,sum));
 
  }
 
                //returns only after all tasks are complete
  List<Future<AtomicInteger>> resultFuture = executorService.invokeAll(callableList);
 
  //Prints 5050 all through
  for (Future<AtomicInteger> future : resultFuture) {
   //Didn't deliberately put a timeout here for the get method. Remember, the invoke All does not return until the task is done.
   System.out.println("Status of future : " + future.isDone() +". Result of future : "+future.get().get());
  }
 
 
  executorService.shutdown();
 
  // You might as well call a resultFuture.get(0).get().get() and that would give you the same
  //result since all your worker threads hold reference to the same atomicinteger sum.
  System.out.println("Final Sum : "+sum);
 
 }
 //Adds count to the sum and returns the reference of the sum as the result
 private Callable<AtomicInteger> getInstanceOfCallable(final int count, final AtomicInteger sum) {
 
  Callable<AtomicInteger> clientPlanCall=new Callable<AtomicInteger>(){
   public AtomicInteger call() {
    sum.addAndGet(count);
    System.out.println("Intermediate sum :"+sum);
    return sum;
   }
  };
 
  return clientPlanCall;
 }
 
 public static void main(String[] args) throws ExecutionException {
 
  try {
   new ExecutorInvokeAll().runApp();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }
 
}
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package me.rerun.incubator;
 
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
 
public class ExecutorSubmit {
 
 public void runApp() throws InterruptedException, ExecutionException{
 
                //holder for the total sum
  AtomicInteger sum=new AtomicInteger();
 
                //Use the factory method of Executors
  ExecutorService executorService=Executors.newFixedThreadPool(10);
 
  Future<AtomicInteger> future = null;
 
  for (int count = 0; count <= 100; count++) {
   future = executorService.submit(getInstanceOfCallable(count,sum));
   //prints intermediate sum
   try {
    System.out.println("Status of future : " + future.isDone() +". Result of future : "+future.get(1000, TimeUnit.MILLISECONDS).get());
   } catch (TimeoutException e) {
    System.out.println("<IGNORE> Timeout exception for count : "+count);
    //e.printStackTrace();
   }
   //System.out.println("Result of future : "+future.get().get() +".Status of future : " + future.isDone());
  }
 
 
  executorService.shutdown();
 
  if (executorService.awaitTermination(10, TimeUnit.SECONDS)){
   System.out.println("All threads done with their jobs");
  }
  //exec
  System.out.println("Final Sum : "+sum);
 
 }
 
        //Adds count to the sum and returns the reference of the sum as the result
 private Callable<AtomicInteger> getInstanceOfCallable(final int count, final AtomicInteger sum) {
 
 
 
  Callable<AtomicInteger> clientPlanCall=new Callable<AtomicInteger>(){
   public AtomicInteger call() {
    sum.addAndGet(count);
    //System.out.println("Intermediate sum :"+sum);
    return sum;
   }
  };
 
  return clientPlanCall;
 }
 
 public static void main(String[] args) throws ExecutionException {
 
  try {
   new ExecutorSubmit().runApp();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }
 
}

Дальнейшее чтение :

Потрясающий блог Алекса Миллера

Совпадения Алекса Миллера

Статья Фогеллы о сравнении с оригинальным API

Хорошее введение в параллелизм в целом

Настоятельно рекомендуется книга по параллелизму Java

Приятного кодирования и не забудьте поделиться!

Ссылка: Ленивые разработчики Введение в Java Параллельные исполнители от нашего партнера JCG Аруна Маниваннана в блоге Rerun.me .