Абстракция ExecutorService
существует с Java 5. Мы говорим о 2004 году здесь. Просто быстрое напоминание: Java 5 и 6 больше не поддерживаются, Java 7 не будет через полгода . Причина, по которой я это поднимаю, заключается в том, что многие Java-программисты до сих пор не до конца понимают, как работает ExecutorService
. Есть много мест, где это можно узнать, сегодня я хотел поделиться несколькими менее известными функциями и практиками. Однако эта статья все еще нацелена на программистов среднего уровня, ничего особенно продвинутого.
1. Имя пула потоков
Я не могу это подчеркнуть. При выгрузке потоков работающей JVM или во время отладки используется схема именования пула потоков по умолчанию: pool-N-thread-M
, где N
обозначает порядковый номер пула (каждый раз, когда вы создаете новый пул потоков, глобальный счетчик N
увеличивается) и M
порядковый номер потока в пуле Например, pool-2-thread-3
означает третий поток во втором пуле, созданный в жизненном цикле JVM. Смотрите: Executors.defaultThreadFactory()
. Не очень описательно. JDK немного усложняет правильное именование потоков, потому что стратегия именования скрыта внутри ThreadFactory
. К счастью, у Guava есть класс помощника для этого:
1
2
3
4
5
6
7
|
import com.google.common.util.concurrent.ThreadFactoryBuilder; final ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat( "Orders-%d" ) .setDaemon( true ) .build(); final ExecutorService executorService = Executors.newFixedThreadPool( 10 , threadFactory); |
По умолчанию пулы потоков создают потоки, не являющиеся демонами, решите, подходит ли вам это или нет.
2. Переключайте имена в соответствии с контекстом
Это уловка, которую я узнал из Supercharged jstack: Как отлаживать ваши серверы на скорости 100 миль в час . Как только мы вспомним имена потоков, мы сможем изменить их во время выполнения, когда захотим! Это имеет смысл, поскольку в дампах потоков отображаются классы и имена методов, а не параметры и локальные переменные. Изменяя имя потока, чтобы сохранить некоторый существенный идентификатор транзакции, мы можем легко отслеживать, какое сообщение / запись / запрос / и т.д. медленный или вызванный тупик. Пример:
01
02
03
04
05
06
07
08
09
10
11
12
|
private void process(String messageId) { executorService.submit(() -> { final Thread currentThread = Thread.currentThread(); final String oldName = currentThread.getName(); currentThread.setName( "Processing-" + messageId); try { //real logic here... } finally { currentThread.setName(oldName); } }); } |
Внутри try
— finally
блок текущего потока называется Processing-WHATEVER-MESSAGE-ID-IS
. Это может пригодиться при отслеживании потока сообщений через систему.
3. Явное и безопасное отключение
Между клиентскими потоками и пулом потоков существует очередь задач. Когда ваше приложение закрывается, вы должны позаботиться о двух вещах: что происходит с поставленными в очередь задачами и как ведут себя уже запущенные задачи (подробнее об этом позже). Удивительно, но многие разработчики не закрывают пул потоков должным образом или сознательно. Есть два метода: либо разрешить выполнение всех поставленных в очередь задач ( shutdown()
), либо отбросить их ( shutdownNow()
) — это полностью зависит от вашего варианта использования. Например, если мы отправили несколько задач и хотим вернуться, как только все они будут выполнены, используйте shutdown()
:
1
2
3
4
5
6
7
8
|
private void sendAllEmails(List<String> emails) throws InterruptedException { emails.forEach(email -> executorService.submit(() -> sendEmail(email))); executorService.shutdown(); final boolean done = executorService.awaitTermination( 1 , TimeUnit.MINUTES); log.debug( "All e-mails were sent so far? {}" , done); } |
В этом случае мы отправляем несколько электронных писем, каждое из которых является отдельной задачей в пуле потоков. После отправки этих задач мы закрываем пул, чтобы он больше не принимал новые задачи. Затем мы ждем максимум одну минуту, пока все эти задачи не будут выполнены. Однако, если некоторые задачи еще awaitTermination()
, awaitTermination()
просто вернет false
. Более того, незавершенные задачи продолжат обработку. Я знаю, что хипстеры пойдут на:
1
|
emails.parallelStream().forEach( this ::sendEmail); |
Назовите меня старомодным, но мне нравится контролировать количество параллельных потоков. Неважно, альтернатива изящному shutdown()
— shutdownNow()
:
1
2
|
final List<Runnable> rejected = executorService.shutdownNow(); log.debug( "Rejected tasks: {}" , rejected.size()); |
На этот раз все поставленные в очередь задачи отбрасываются и возвращаются. Уже запущенные задания могут быть продолжены.
4. Обращайтесь с прерыванием осторожно
Менее известной особенностью интерфейса Future
является отмена. Вместо того, чтобы повторяться, посмотрите мою старую статью: InterruptedException и прерывание потоков объяснили
5. Контролируйте длину очереди и держите ее ограниченной
Пулы потоков неправильного размера могут вызвать медлительность, нестабильность и утечку памяти. Если вы сконфигурируете слишком мало потоков, очередь будет накапливаться, занимая много памяти. С другой стороны, слишком большое количество потоков замедлит работу всей системы из-за чрезмерного переключения контекста и приведет к тем же симптомам. Важно смотреть на глубину очереди и ограничивать ее, чтобы перегруженный пул потоков просто временно отклонял новые задачи:
1
2
3
4
|
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>( 100 ); executorService = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, queue); |
LinkedBlockingQueue
выше код эквивалентен Executors.newFixedThreadPool(n)
, однако вместо неограниченного по умолчанию LinkedBlockingQueue
мы используем ArrayBlockingQueue
с фиксированной емкостью 100
. Это означает, что если 100 задач уже поставлены в очередь (и n
выполняется), новая задача будет отклонена с RejectedExecutionException
. Также, поскольку queue
теперь доступна извне, мы можем периодически вызывать size()
и помещать его в logs / JMX / любой механизм мониторинга, который вы используете.
6. Помните об обработке исключений
Каков будет результат следующего фрагмента?
1
2
3
|
executorService.submit(() -> { System.out.println( 1 / 0 ); }); |
Слишком много раз меня это укусило: оно ничего не напечатает. Нет признаков java.lang.ArithmeticException: / by zero
, ничего. Пул потоков просто проглатывает это исключение, как если бы оно никогда не происходило. Если бы это был хороший java.lang.Thread
созданный с нуля, UncaughtExceptionHandler
мог бы работать. Но с пулами потоков вы должны быть более осторожны. Если вы отправляете Runnable
(без какого-либо результата, как описано выше), вы должны окружить все тело try
catch
и, по крайней мере, зарегистрировать его. Если вы отправляете Callable<Integer>
, убедитесь, что вы всегда разыменовываете его, используя блокировку get()
для повторного выброса исключения:
1
2
3
|
final Future<Integer> division = executorService.submit(() -> 1 / 0 ); //below will throw ExecutionException caused by ArithmeticException division.get(); |
Интересно, что даже Spring Framework сделал эту ошибку с @Async
, см .: SPR-8995 и SPR-12090 .
7. Контролировать время ожидания в очереди
Мониторинг глубины очереди работ — одна сторона. Однако при устранении неполадок в отдельной транзакции / задаче стоит посмотреть, сколько времени прошло между отправкой задачи и ее фактическим выполнением. Эта длительность предпочтительно должна быть близка к 0 (когда в пуле был какой-то незанятый поток), однако она будет расти, когда задача должна быть поставлена в очередь. Более того, если в пуле нет фиксированного числа потоков, запуск новой задачи может потребовать порождения потока, что также потребует небольшого количества времени. Чтобы точно отслеживать этот показатель, оберните оригинальный ExecutorService
чем-то похожим на это:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
public class WaitTimeMonitoringExecutorService implements ExecutorService { private final ExecutorService target; public WaitTimeMonitoringExecutorService(ExecutorService target) { this .target = target; } @Override public <T> Future<T> submit(Callable<T> task) { final long startTime = System.currentTimeMillis(); return target.submit(() -> { final long queueDuration = System.currentTimeMillis() - startTime; log.debug( "Task {} spent {}ms in queue" , task, queueDuration); return task.call(); } ); } @Override public <T> Future<T> submit(Runnable task, T result) { return submit(() -> { task.run(); return result; }); } @Override public Future<?> submit(Runnable task) { return submit( new Callable<Void>() { @Override public Void call() throws Exception { task.run(); return null ; } }); } //... } |
Это не полная реализация, но вы получите основную идею. В тот момент, когда мы отправляем задачу в пул потоков, мы сразу начинаем измерять время. Мы останавливаемся, как только задача подобрана и начинается выполнение. Не обманывайтесь близкой близостью startTime
и queueDuration
в исходном коде. Фактически эти две строки оцениваются в разных потоках, вероятно, с интервалом в миллисекунды или даже секунды, например:
1
|
Task com.nurkiewicz.MyTask @7c7f3894 spent 9883ms in queue |
8. Сохраните трассировку стека клиента
Реактивное программирование, кажется, привлекает много внимания в эти дни. Реактивный манифест , реактивные потоки , RxJava (только что выпущенный 1.0!), Агенты Clojure , scala.rx … Все они прекрасно работают, но трассировка стека больше не ваш друг, но в большинстве случаев бесполезна. Возьмем, к примеру, исключение, возникающее в задаче, переданной в пул потоков:
1
2
3
4
5
6
7
|
java.lang.NullPointerException: null at com.nurkiewicz.MyTask.call(Main.java: 76 ) ~[classes/:na] at com.nurkiewicz.MyTask.call(Main.java: 72 ) ~[classes/:na] at java.util.concurrent.FutureTask.run(FutureTask.java: 266 ) ~[na: 1.8 . 0 ] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java: 1142 ) ~[na: 1.8 . 0 ] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java: 617 ) ~[na: 1.8 . 0 ] at java.lang.Thread.run(Thread.java: 744 ) ~[na: 1.8 . 0 ] |
Мы можем легко обнаружить, что MyTask
бросил NPE в строке 76. Но мы понятия не имеем, кто отправил эту задачу, потому что трассировка стека показывает только Thread
и ThreadPoolExecutor
. Мы можем технически перемещаться по исходному коду в надежде найти только одно место, где MyTask
. Но без потоков (не говоря уже о событийном, реактивном, актерском-ниндзя-программировании) мы бы сразу увидели полную картину. Что если бы мы могли сохранить трассировку стека клиентского кода (который отправил задачу) и показать его, например, в случае сбоя? Идея не нова, например, Hazelcast распространяет исключения из узла-владельца в код клиента . Вот как могла бы выглядеть наивная поддержка для отслеживания клиентского стека в случае сбоя:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
public class ExecutorServiceWithClientTrace implements ExecutorService { protected final ExecutorService target; public ExecutorServiceWithClientTrace(ExecutorService target) { this .target = target; } @Override public <T> Future<T> submit(Callable<T> task) { return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName())); } private <T> Callable<T> wrap( final Callable<T> task, final Exception clientStack, String clientThreadName) { return () -> { try { return task.call(); } catch (Exception e) { log.error( "Exception {} in task submitted from thrad {} here:" , e, clientThreadName, clientStack); throw e; } }; } private Exception clientTrace() { return new Exception( "Client stack trace" ); } @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return tasks.stream().map( this ::submit).collect(toList()); } //... } |
На этот раз в случае сбоя мы получим полную трассировку стека и имя потока места, где было отправлено задание. Гораздо ценнее по сравнению со стандартным исключением, замеченным ранее:
01
02
03
04
05
06
07
08
09
10
|
Exception java.lang.NullPointerException in task submitted from thrad main here: java.lang.Exception: Client stack trace at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java: 43 ) ~[classes/:na] at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java: 28 ) ~[classes/:na] at com.nurkiewicz.Main.main(Main.java: 31 ) ~[classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na: 1.8 . 0 ] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java: 62 ) ~[na: 1.8 . 0 ] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java: 43 ) ~[na: 1.8 . 0 ] at java.lang.reflect.Method.invoke(Method.java: 483 ) ~[na: 1.8 . 0 ] at com.intellij.rt.execution.application.AppMain.main(AppMain.java: 134 ) ~[idea_rt.jar:na] |
9. Предпочитают CompletableFuture
В Java 8 был представлен более мощный CompletableFuture
. Пожалуйста, используйте его, когда это возможно. ExecutorService
не был расширен для поддержки этой расширенной абстракции, поэтому вы должны позаботиться об этом самостоятельно. Вместо:
1
2
|
final Future<BigDecimal> future = executorService.submit( this ::calculate); |
делать:
1
2
|
final CompletableFuture<BigDecimal> future = CompletableFuture.supplyAsync( this ::calculate, executorService); |
CompletableFuture
расширяет Future
поэтому все работает, как раньше. Но более продвинутые пользователи вашего API по-настоящему оценят расширенную функциональность, предоставляемую CompletableFuture
.
10. Синхронная очередь
SynchronousQueue
— интересная BlockingQueue
которая на самом деле не является очередью. Это даже не структура данных как таковая . Это лучше всего объяснить как очередь с емкостью 0. Цитируя JavaDoc:
каждая операция
insert
должна ожидать соответствующей операцииremove
другим потоком, и наоборот. Синхронная очередь не имеет внутренней емкости, даже емкости одной. Вы не можете заглянуть в синхронную очередь, потому что элемент присутствует только при попытке удалить его; вы не можете вставить элемент (используя любой метод), если другой поток не пытается удалить его; Вы не можете повторять, так как нечего повторять. […]Синхронные очереди похожи на каналы рандеву, используемые в CSP и Ada.
Как это связано с пулами потоков? Попробуйте использовать SynchronousQueue
с ThreadPoolExecutor
:
1
2
3
4
|
BlockingQueue<Runnable> queue = new SynchronousQueue<>(); ExecutorService executorService = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, queue); |
Мы создали пул потоков с двумя потоками и SynchronousQueue
перед ним. Поскольку SynchronousQueue
по сути, является очередью с нулевой емкостью, такой ExecutorService
будет принимать новые задачи, только если имеется свободный поток. Если все потоки заняты, новая задача будет немедленно отклонена и никогда не будет ждать. Такое поведение может быть желательным, когда обработка в фоновом режиме должна начаться немедленно или отбрасываться.
Вот и все, надеюсь, вы нашли хотя бы одну интересную особенность!
Ссылка: | ExecutorService — 10 советов и рекомендаций от нашего партнера по JCG Томаша Нуркевича в блоге, посвященном Java и соседству . |