Это образец главы, взятой из книги « Практическая Java EE 7» по книге WildFly, под редакцией
В этой главе обсуждается новый API параллелизма Java EE (JSR 236), в котором описывается стандартный способ параллельного выполнения задач на контейнере Java EE с использованием набора управляемых ресурсов. Чтобы описать, как использовать этот API в ваших приложениях, мы будем следовать этой схеме:
- Краткое введение в утилиты параллелизма
- Как использовать асинхронные задачи, используя ManagedExecutorService
- Как планировать задачи в определенное время, используя ManagedScheduledExecutorService
- Как создать динамические прокси-объекты, которые добавляют контекстную информацию, доступную в среде Java EE
- Как использовать ManagedThreadFactory для создания управляемых потоков, которые будут использоваться вашими приложениями
Обзор параллельных утилит
До Java EE 7 выполнение параллельных задач в контейнере Java EE было широко признано опасной практикой, а иногда даже запрещено контейнером:
«Корпоративный компонент не должен пытаться управлять потоками. Корпоративный компонент не должен пытаться запускать, останавливать, приостанавливать или возобновлять поток или изменять приоритет или имя потока. Корпоративный компонент не должен пытаться управлять группами потоков ».
На самом деле, создание собственных неуправляемых потоков в контейнере Java EE с использованием J2SE API не гарантирует, что контекст контейнера будет передан потоку, выполняющему задачу.
Единственным доступным шаблоном было либо использование асинхронного EJB или Message Driven Bean , чтобы выполнить задачу асинхронным способом; Чаще всего этого было достаточно для простых моделей выстрелов и забываний, но контроль над Нитями все еще лежал в руках Контейнера.
 С API параллелизма Java EE (JSR 236) вы можете использовать расширения API java.util.concurrent в качестве управляемых ресурсов , то есть управляемых контейнером.  Единственное отличие от стандартного программирования J2SE состоит в том, что вы извлекаете ваши управляемые ресурсы из дерева JNDI контейнера.  Тем не менее, вы все равно будете использовать свои интерфейсы Runnable или классы, которые являются частью пакета java.util.concurrent , такие как Future или ScheduledFuture . 
  В следующем разделе мы начнем с самого простого примера, который выполняет асинхронную задачу с использованием ManagedExecutorService . 
Использование ManagedExecutorService для отправки задач
  Чтобы создать наше первое асинхронное выполнение, мы покажем, как использовать ManagedExecutorService , который расширяет Java SE ExecutorService для предоставления методов для отправки задач для выполнения в среде Java EE.  С помощью этой управляемой службы контекст контейнера передается потоку, выполняющему задачу: ManagedExecutorService включен как часть конфигурации EE сервера приложений: 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | <subsystemxmlns="urn:jboss:domain:ee:2.0">. . .   <concurrent>. . . .        <managed-executor-services>             <managed-executor-servicename="default"                   jndi-name="java:jboss/ee/concurrency/executor/default"                   context-service="default"hung-task-threshold="60000"                   core-threads="5"max-threads="25"keepalive-time="5000"/>        </managed-executor-services>. . . .   </concurrent></subsystem> | 
Чтобы создать наш первый пример, мы извлекаем ManagedExecutorService из контекста JNDI контейнера следующим образом:
| 1 2 3 | @Resource(name = "DefaultManagedExecutorService")ManagedExecutorService executor; | 
  Используя экземпляр ManagedExecutorService, вы можете отправлять свои задачи, которые могут реализовывать либо интерфейс java.lang.Runnable либо интерфейс java.util.concurrent.Callable . 
  Вместо использования метода run() интерфейс Callable предлагает метод call() , который может возвращать любой универсальный тип. 
Кодирование простой асинхронной задачи
Итак, давайте посмотрим на простой пример сервлета, который запускает асинхронную задачу с использованием ManagedExecutorService:
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 | @WebServlet("/ExecutorServlet")publicclassExecutorServlet extendsHttpServlet {    @Resource(name = "DefaultManagedExecutorService")    ManagedExecutorService executor;       protectedvoiddoGet(HttpServletRequest request, HttpServletResponse response) throwsServletException, IOException {               PrintWriter writer = response.getWriter();                        executor.execute(newSimpleTask());                       writer.write("Task SimpleTask executed! check logs");             }} | 
  Класс SimpleTask в нашем примере реализует интерфейс Runnable , обеспечивая параллельное выполнение. 
| 01 02 03 04 05 06 07 08 09 10 | publicclassSimpleTask implementsRunnable {       @Override       publicvoidrun() {             System.out.println("Thread started.");       }} | 
Получение результата из асинхронной задачи
Вышеуказанная задача является хорошим вариантом для практического сценария; как вы могли заметить, нет способа перехватить возвращаемое значение из Задачи. Кроме того, при использовании Runnable вы вынуждены использовать непроверенные исключения (если run ( ) сгенерировал проверенное исключение, кто его перехватит? Нет способа для вас включить этот вызов run () в обработчик, так как вы не написать код, который вызывает его).
  Если вы хотите преодолеть эти ограничения, вы можете вместо этого реализовать интерфейс java.util.concurrent.Callable , передать его в ExecutorService и ожидать результата с помощью FutureTask.isDone() возвращенного ExecutorService.submit() . 
  Давайте посмотрим на новую версию нашего сервлета, которая фиксирует результат задачи с именем CallableTask : 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | @WebServlet("/CallableExecutorServlet")publicclassCallableExecutorServlet extendsHttpServlet {    @Resource(name = "DefaultManagedExecutorService")    ManagedExecutorService executor;       protectedvoiddoGet(HttpServletRequest request, HttpServletResponse response) throwsServletException, IOException {             PrintWriter writer = response.getWriter();                        Future<Long> futureResult = executor.submit(newCallableTask(5));                                 while(!futureResult.isDone()) {                    // Wait                    try{                           Thread.sleep(100);                    } catch(InterruptedException e) {                           e.printStackTrace();                    }             }             try{                    writer.write("Callable Task returned "+futureResult.get());             } catch( Exception e) {                    e.printStackTrace();             }        }} | 
Как видно из кода, мы запрашиваем завершение задачи с помощью метода isDone ( ) . Когда задача выполнена, мы можем вызвать метод get ( ) из FutureTask и получить возвращаемое значение.
  Теперь давайте посмотрим нашу реализацию CallableTask которая в нашем примере возвращает значение суммирования числа: 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | publicclassCallableTask implementsCallable<Long> {       privateintid;       publicCallableTask(intid) {             this.id = id;       }       publicLong call() {             longsummation = 0;             for(inti = 1; i <= id; i++) {                    summation += i;             }             returnnewLong(summation);       }} | 
  В нашем примере все, что нам нужно было сделать, — это реализовать метод call , который возвращает Integer, который в конечном итоге будет собран через метод get интерфейса Future. 
  Если ваша задача Callable вызвала исключение, то FutureTask.get() также вызовет исключение, и к исходному исключению можно получить доступ с помощью Exception.getCause() 
Мониторинг состояния будущей задачи
  В приведенном выше примере мы проверяем состояние Future Task, используя метод FutureTask.isDone() .  Если вам нужен более точный контроль над жизненным циклом Future Task, вы можете реализовать экземпляр javax.enterprise.concurrent.ManagedTaskListener , чтобы получать уведомления о событиях жизненного цикла. 
  Вот наша расширенная задача, которая реализует taskSubmitting , taskStarting , taskDone и taskAborted : 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | publicclassCallableListenerTask implementsCallable<Long>,ManagedTaskListener {       privateintid;       publicCallableListenerTask(intid) {             this.id = id;       }       publicLong call() {             longsummation = 0;             for(inti = 1; i <= id; i++) {                    summation += i;             }             returnnewLong(summation);       }       publicvoidtaskSubmitted(Future<?> f, ManagedExecutorService es,                    Object obj) {             System.out.println("Task Submitted! "+f);       }       publicvoidtaskDone(Future<?> f, ManagedExecutorService es, Object obj,                    Throwable exc) {             System.out.println("Task DONE! "+f);       }       publicvoidtaskStarting(Future<?> f, ManagedExecutorService es,                    Object obj) {             System.out.println("Task Starting! "+f);       }       publicvoidtaskAborted(Future<?> f, ManagedExecutorService es,                    Object obj, Throwable exc) {             System.out.println("Task Aborted! "+f);       }} | 
Уведомления о жизненном цикле вызываются в следующем порядке:
-   taskSubmitting: по представлениюtaskSubmittingИсполнителю
-   taskStarting: до фактического запуска Task
-   taskDone:taskDoneпри завершении задачи
-   taskAborted: срабатывает, когда пользователь вызывает futureResult.cancel ()
Использование транзакции в асинхронных задачах
  В распределенной среде Java EE это сложная задача — гарантировать правильное выполнение транзакций и для параллельных выполнений задач.  API параллелизма Java EE использует Java Transaction API (JTA) для поддержки транзакций поверх своих компонентов через javax.transaction.UserTransaction который используется для явного разграничения границ транзакций. 
Следующий код показывает, как вызываемая задача получает UserTransaction из дерева JNDI, а затем запускает и фиксирует транзакцию с внешним компонентом (EJB):
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | publicclassTxCallableTask implementsCallable<Long> {       longid;       publicTxCallableTask(longi) {             this.id = i;       }       publicLong call() {             longvalue = 0;             UserTransaction tx = lookupUserTransaction();             SimpleEJB ejb = lookupEJB();             try{                    tx.begin();                    value = ejb.calculate(id); // Do Transactions here                    tx.commit();             } catch(Exception e) {                    e.printStackTrace();                    try{  tx.rollback(); } catch(Exception e1) {        e1.printStackTrace(); }             }             returnvalue;       }// Lookup EJB and UserTransaction here ..} | 
Основным ограничением этого подхода является то, что, хотя объекты контекста могут начинать, фиксировать или откатывать транзакции, эти объекты не могут быть включены в транзакции родительского компонента.
Планирование задач с помощью ManagedScheduledExecutorService
  ManagedScheduledExecutorService расширяет Java SE ScheduledExecutorService предоставляя методы для отправки отложенных или периодических задач для выполнения в среде Java EE.  Что касается других управляемых объектов, вы можете получить экземпляр ExecutorService через поиск JNDI: 
| 1 2 | @Resource(name ="DefaultManagedScheduledExecutorService")ManagedScheduledExecutorService scheduledExecutor; | 
  Получив ссылку на ExecutorService, вы можете вызвать метод schedule для отправки отложенных или периодических задач.  ScheduledExecutors, как и ManagedExecutors, могут быть связаны либо с интерфейсом Runnable, либо с интерфейсом Callable .  Следующий раздел показывает оба подхода. 
Отправка простой ScheduledTask
В простейшей форме для отправки запланированной задачи требуется настроить выражение расписания и передать его в службу ManagedSchedulerExecutor. В этом примере мы создаем отложенную задачу, которая будет запускаться только один раз за 10 секунд, так как вызывается метод schedule ( ) :
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 | @WebServlet("/ScheduledExecutor")publicclassScheduledExecutor extendsHttpServlet {       @Resource(name ="DefaultManagedScheduledExecutorService")       ManagedScheduledExecutorService scheduledExecutor;       protectedvoiddoGet(HttpServletRequest request, HttpServletResponse response) throwsServletException, IOException {             PrintWriter writer = response.getWriter();                        ScheduledFuture<?> futureResult = scheduledExecutor.schedule(newSimpleTask(),       10,TimeUnit.SECONDS);             writer.write("Waiting 10 seconds before firing the task");       }} | 
  Если вам нужно планировать вашу задачу несколько раз , то вы можете использовать метод scheduleAtFixedRate , который принимает в качестве входных данных время до запуска задачи, время перед каждым повторным выполнением и TimeUnit.  Посмотрите следующий пример, который планирует Задачу каждые 10 секунд секунд после начальной задержки в 1 секунду: 
| 1 | ScheduledFuture<?> futureResult = scheduledExecutor. scheduleAtFixedRate (newSimpleTask(),1, 10,TimeUnit.SECONDS); | 
Захват результата запланированного выполнения
  Если вам нужно получить возвращаемое значение из задачи, которую планируется выполнить, то вы можете использовать интерфейс ScheduledFuture который возвращается методом schedule .  Вот пример, который фиксирует результат из нашего факториального примера Task, который мы ранее кодировали: 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | ScheduledFuture<Long> futureResult =                    scheduledExecutor.schedule(newCallableTask(5), 5, TimeUnit.SECONDS);                 while(!futureResult.isDone()) {              try{             Thread.sleep(100); // Wait       } catch(InterruptedException e) {                                e.printStackTrace();       }}     try{       writer.write("Callable Task returned "+futureResult.get());} catch( Exception e) {       e.printStackTrace();} | 
Создание управляемых потоков с использованием ManagedThreadFactory
  javax.enterprise.concurrent.ManagedThreadFactory является эквивалентом J2SE ThreadFactory, который можно использовать для создания собственных потоков.  Чтобы использовать ManagedThreadFactory, вам нужно добавить его из JNDI как обычно: 
| 1 2 3 | @Resource(name ="DefaultManagedThreadFactory")ManagedThreadFactory factory; | 
Основное преимущество создания собственных управляемых потоков из фабрики (по сравнению с теми, которые создаются ManagedExecutorService) заключается в том, что вы можете установить некоторые типичные свойства потоков (например, имя или приоритет) и создать управляемую версию службы исполнителя J2SE. , Следующие примеры покажут вам как.
Создание управляемых потоков с фабрики
  В этом примере мы создадим и запустим новый DefaultManagedThreadFactory используя DefaultManagedThreadFactory .  Как видно из кода, после того, как мы создали экземпляр класса Thread, мы можем установить для него значимое имя и связать его с приоритетом.  Затем мы свяжем Thread с нашей SimpleTask, которая регистрирует некоторые данные на консоли: 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | @WebServlet("/FactoryExecutorServlet")publicclassFactoryExecutorServlet extendsHttpServlet {       @Resource(name ="DefaultManagedThreadFactory")       ManagedThreadFactory factory;       protectedvoiddoGet(HttpServletRequest request, HttpServletResponse response) throwsServletException, IOException {             PrintWriter writer = response.getWriter();                         Thread thread = factory.newThread(newSimpleTask());             thread.setName("My Managed Thread");             thread.setPriority(Thread.MAX_PRIORITY);             thread.start();             writer.write("Thread started. Check logs");       }} | 
Теперь проверьте журналы вашего сервера: не сомневайтесь, что легче обнаружить выходные данные ваших самостоятельно созданных потоков:
| 1 | 14:44:31,838 INFO [stdout] (My Managed Thread) Simple Task started | 
Сбор информации об имени потока особенно полезен при анализе дампа потока, а имя потока — единственный способ отследить путь выполнения потока.
Использование службы Managed Executor
  Интерфейс java.util.concurrent.ExecutorService представляет собой стандартный механизм J2SE, который значительно заменил использование прямых потоков для выполнения асинхронных выполнений.  Одним из основных преимуществ ExecutorService перед стандартным механизмом Thread является то, что вы можете определить пул экземпляров для выполнения ваших заданий и что у вас есть более безопасный способ прерывать ваши задания. 
  Использовать ExecutorService в ваших корпоративных приложениях просто: все, что вам нужно сделать, это передать экземпляр вашего Managed ThreadFactory в конструктор вашего ExecutorService .  В следующем примере мы используем SingletonEJB для предоставления ExecutorService в качестве службы в своем методе getThreadPoolExecutor : 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | @SingletonpublicclassPoolExecutorEJB {       privateExecutorService threadPoolExecutor = null;       intcorePoolSize  =    5;       intmaxPoolSize   =   10;       longkeepAliveTime = 5000;       @Resource(name = "DefaultManagedThreadFactory")       ManagedThreadFactory factory;       publicExecutorService getThreadPoolExecutor() {             returnthreadPoolExecutor;       }       @PostConstruct       publicvoidinit() {                          threadPoolExecutor =  newThreadPoolExecutor(corePoolSize, maxPoolSize,                                       keepAliveTime, TimeUnit.SECONDS,                                  newArrayBlockingQueue<Runnable>(10), factory);             }       @PreDestroy       publicvoidreleaseResources() {             threadPoolExecutor.shutdown();          }} | 
  ThreadPoolExecutor содержит два основных параметра в своем конструкторе: corePoolSize и maximumPoolSize .  Когда в методе передается новая задача и выполняется меньше потоков corePoolSize, создается новый поток для обработки запроса, даже если другие рабочие потоки простаивают.  Если запущено больше потоков corePoolSize, но меньше MaximumPoolSize, новый поток будет создан, только если очередь заполнена. 
  Затем ExecutorService используется для запуска новой асинхронной задачи, как в следующем примере, где в сервлете предоставляется анонимная реализация Runnable: 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | @WebServlet("/FactoryExecutorServiceServlet")publicclassFactoryExecutorServiceServlet extendsHttpServlet {       @EJBPoolExecutorEJB ejb;       protectedvoiddoGet(HttpServletRequest request, HttpServletResponse response) throwsServletException, IOException {             finalPrintWriter writer = response.getWriter();             writer.write("Invoking ExecutorService. Check Logs.");             ExecutorService executorService = ejb.getThreadPoolExecutor();              executorService.execute(newRunnable() {                    publicvoidrun() {                           System.out.println("Message from your Executor!");                    }             });}} | 
  Как только PoolExecutorEJB завершается, ExecutorService будет также завершен в методе @PreDestroy объекта Singleton, который вызовет метод shutdown() объекта ThreadPoolExecutor.  ExecutorService не будет немедленно закрываться, но он больше не будет принимать новые задачи, и как только все потоки завершат текущие задачи, ExecutorService завершит работу. 
Использование динамических контекстных объектов
  Динамический прокси — это полезная настройка Java, которую можно использовать для создания динамических реализаций интерфейсов с использованием API java.lang.reflect.Proxy .  Вы можете использовать динамические прокси для различных целей, таких как подключение к базе данных и управление транзакциями, динамические фиктивные объекты для модульного тестирования и другие методы перехвата AOP-подобных методов. 
В среде Java EE вы можете использовать специальный тип динамических прокси, называемых динамическими контекстными прокси .
Наиболее интересной особенностью динамических контекстных объектов является то, что контекст именования JNDI, загрузчик классов и контекст безопасности распространяются на проксируемые объекты . Это может быть полезно в контексте, когда вы вносите реализации J2SE в ваши корпоративные приложения и хотите запускать их в контексте контейнера.
В следующем фрагменте показано, как вставить контекстные объекты в контейнер. Так как контекстным объектам также нужен ExecutorService, в который вы можете отправить задачу, ThreadFactory также внедряется:
| 1 2 3 4 5 6 7 | @Resource(name ="DefaultContextService")ContextService cs;@Resource(name ="DefaultManagedThreadFactory")ManagedThreadFactory factory; | 
В следующем разделе мы покажем, как создавать динамические контекстные объекты, используя пересмотренную версию нашего Singleton EJB.
Выполнение контекстных задач
  В следующем примере показано, как запустить контекстный прокси для задачи Callable .  Для этого нам понадобится как ManagedThreadfactory, так и ContextService.  Наш EJB ContextExecutor первоначально создаст ThreadPoolExecutor в своем методе init .  Затем в методе submit создаются новые контекстные прокси для задач Callable, которые передаются исполнителю ThreadPool. 
  Вот код для нашего ContextExecutorEJB : 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | @SingletonpublicclassContextExecutorEJB {       privateExecutorService threadPoolExecutor = null;       @Resource(name = "DefaultManagedThreadFactory")       ManagedThreadFactory factory;       @Resource(name = "DefaultContextService")       ContextService cs;       publicExecutorService getThreadPoolExecutor() {             returnthreadPoolExecutor;       }       @PostConstruct       publicvoidinit() {             threadPoolExecutor = newThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,                           newArrayBlockingQueue>Runnable>(10), factory);       }       publicFuture>Long> submitJob(Callable>Long> task) {             Callable>Long> proxy = cs.createContextualProxy(task, Callable.class);             returngetThreadPoolExecutor().submit(proxy);       }} | 
  Класс CallableTask немного сложнее, чем в нашем первом примере, так как он собирается регистрировать информацию о javax.security.auth.Subject , которая содержится в потоке вызывающей стороны: 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | publicclassCallableTask implementsCallable<Long> {       privateintid;       publicCallableTask(intid) {             this.id = id;       }       publicLong call() {             longsummation = 0;             // Do calculation             Subject subject = Subject.getSubject(AccessController.getContext());             logInfo(subject, summation); // Log Traces Subject identity             returnnewLong(summation);       }       privatevoidlogInfo(Subject subject, longsummation) { . .  }} | 
Ниже приведен простой способ отправки новых контекстных задач в наш SingletonEJB:
| 1 2 3 4 5 6 7 8 9 |        @EJBContextExecutorEJB ejb;          protectedvoiddoGet(HttpServletRequest request, HttpServletResponse response) throwsServletException, IOException {                                   CallableTask task = newCallableTask(5);             ejb.submitJob(task);} | 
Строим ваши примеры
Чтобы использовать утилиты Concurrency для Java EE API, в вашем приложении требуется следующая зависимость Maven:
| 1 2 3 4 5 6 7 8 9 | <dependency>       <groupId>org.jboss.spec.javax.enterprise.concurrent</groupId>       <artifactId>jboss-concurrency-api_1.0_spec</artifactId>       <version>1.0.0.Final</version></dependency> | 
 