Статьи

Учебник по API параллелизма Java EE

Это образец главы, взятой из книги « Практическая Java EE 7» по книге WildFly, под редакцией

В этой главе обсуждается новый API параллелизма Java EE (JSR 236), в котором описывается стандартный способ параллельного выполнения задач на контейнере Java EE с использованием набора управляемых ресурсов. Чтобы описать, как использовать этот API в ваших приложениях, мы будем следовать этой схеме:

  • Краткое введение в утилиты параллелизма
  • Как использовать асинхронные задачи, используя ManagedExecutorService
  • Как планировать задачи в определенное время, используя ManagedScheduledExecutorService
  • Как создать динамические прокси-объекты, которые добавляют контекстную информацию, доступную в среде Java EE
  • Как использовать ManagedThreadFactory для создания управляемых потоков, которые будут использоваться вашими приложениями

Обзор параллельных утилит

До Java EE 7 выполнение параллельных задач в контейнере Java EE было широко признано опасной практикой, а иногда даже запрещено контейнером:

«Корпоративный компонент не должен пытаться управлять потоками. Корпоративный компонент не должен пытаться запускать, останавливать, приостанавливать или возобновлять поток или изменять приоритет или имя потока. Корпоративный компонент не должен пытаться управлять группами потоков ».

На самом деле, создание собственных неуправляемых потоков в контейнере Java EE с использованием J2SE API не гарантирует, что контекст контейнера будет передан потоку, выполняющему задачу.

Единственным доступным шаблоном было либо использование асинхронного EJB или Message Driven Bean , чтобы выполнить задачу асинхронным способом; Чаще всего этого было достаточно для простых моделей выстрелов и забываний, но контроль над Нитями все еще лежал в руках Контейнера.

С API параллелизма Java EE (JSR 236) вы можете использовать расширения API java.util.concurrent в качестве управляемых ресурсов , то есть управляемых контейнером. Единственное отличие от стандартного программирования J2SE состоит в том, что вы извлекаете ваши управляемые ресурсы из дерева JNDI контейнера. Тем не менее, вы все равно будете использовать свои интерфейсы Runnable или классы, которые являются частью пакета java.util.concurrent , такие как Future или ScheduledFuture .

В следующем разделе мы начнем с самого простого примера, который выполняет асинхронную задачу с использованием ManagedExecutorService .

Использование ManagedExecutorService для отправки задач

Чтобы создать наше первое асинхронное выполнение, мы покажем, как использовать ManagedExecutorService , который расширяет Java SE ExecutorService для предоставления методов для отправки задач для выполнения в среде Java EE. С помощью этой управляемой службы контекст контейнера передается потоку, выполняющему задачу: ManagedExecutorService включен как часть конфигурации EE сервера приложений:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<subsystem xmlns="urn:jboss:domain:ee:2.0">
 
. . .
 
   <concurrent>
 
. . . .
 
        <managed-executor-services>
 
             <managed-executor-service name="default"
 
                   jndi-name="java:jboss/ee/concurrency/executor/default"
 
                   context-service="default" hung-task-threshold="60000"
 
                   core-threads="5" max-threads="25" keepalive-time="5000"/>
 
        </managed-executor-services>
 
. . . .
 
   </concurrent>
 
</subsystem>

Чтобы создать наш первый пример, мы извлекаем ManagedExecutorService из контекста JNDI контейнера следующим образом:

1
2
3
@Resource(name = "DefaultManagedExecutorService")
 
ManagedExecutorService executor;

Используя экземпляр ManagedExecutorService, вы можете отправлять свои задачи, которые могут реализовывать либо интерфейс java.lang.Runnable либо интерфейс java.util.concurrent.Callable .

Вместо использования метода run() интерфейс Callable предлагает метод call() , который может возвращать любой универсальный тип.

Кодирование простой асинхронной задачи

Итак, давайте посмотрим на простой пример сервлета, который запускает асинхронную задачу с использованием ManagedExecutorService:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
@WebServlet("/ExecutorServlet")
 
public class ExecutorServlet extends HttpServlet {
 
    @Resource(name = "DefaultManagedExecutorService")
 
    ManagedExecutorService executor;
 
       protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 
 
             PrintWriter writer = response.getWriter();          
 
             executor.execute(new SimpleTask());         
 
             writer.write("Task SimpleTask executed! check logs");     
 
       }
 
}

Класс SimpleTask в нашем примере реализует интерфейс Runnable , обеспечивая параллельное выполнение.

01
02
03
04
05
06
07
08
09
10
public class SimpleTask implements Runnable {
 
       @Override
       public void run() {
 
             System.out.println("Thread started.");
 
       }
 
}

Получение результата из асинхронной задачи

Вышеуказанная задача является хорошим вариантом для практического сценария; как вы могли заметить, нет способа перехватить возвращаемое значение из Задачи. Кроме того, при использовании Runnable вы вынуждены использовать непроверенные исключения (если run ( ) сгенерировал проверенное исключение, кто его перехватит? Нет способа для вас включить этот вызов run () в обработчик, так как вы не написать код, который вызывает его).

Если вы хотите преодолеть эти ограничения, вы можете вместо этого реализовать интерфейс java.util.concurrent.Callable , передать его в ExecutorService и ожидать результата с помощью FutureTask.isDone() возвращенного ExecutorService.submit() .

Давайте посмотрим на новую версию нашего сервлета, которая фиксирует результат задачи с именем CallableTask :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@WebServlet("/CallableExecutorServlet")
 
public class CallableExecutorServlet extends HttpServlet {
 
    @Resource(name = "DefaultManagedExecutorService")
    ManagedExecutorService executor;
 
       protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
 
             PrintWriter writer = response.getWriter();
            
             Future<Long> futureResult = executor.submit(new CallableTask(5));                   
 
             while (!futureResult.isDone()) {
 
                    // Wait
                    try {
                           Thread.sleep(100);
                    } catch (InterruptedException e) {
                           e.printStackTrace();
                    }
 
             }
 
             try {
 
                    writer.write("Callable Task returned " +futureResult.get());
 
             } catch ( Exception e) {
                    e.printStackTrace();
             }
 
       }
 
}

Как видно из кода, мы запрашиваем завершение задачи с помощью метода isDone ( ) . Когда задача выполнена, мы можем вызвать метод get ( ) из FutureTask и получить возвращаемое значение.

Теперь давайте посмотрим нашу реализацию CallableTask которая в нашем примере возвращает значение суммирования числа:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class CallableTask implements Callable<Long> {
 
       private int id;
 
       public CallableTask(int id) {
 
             this.id = id;
 
       }
 
       public Long call() {
 
             long summation = 0;
 
             for (int i = 1; i <= id; i++) {
 
                    summation += i;
 
             }
 
             return new Long(summation);
 
       }
 
}

В нашем примере все, что нам нужно было сделать, — это реализовать метод call , который возвращает Integer, который в конечном итоге будет собран через метод get интерфейса Future.

Если ваша задача Callable вызвала исключение, то FutureTask.get() также вызовет исключение, и к исходному исключению можно получить доступ с помощью Exception.getCause()

Мониторинг состояния будущей задачи

В приведенном выше примере мы проверяем состояние Future Task, используя метод FutureTask.isDone() . Если вам нужен более точный контроль над жизненным циклом Future Task, вы можете реализовать экземпляр javax.enterprise.concurrent.ManagedTaskListener , чтобы получать уведомления о событиях жизненного цикла.

Вот наша расширенная задача, которая реализует taskSubmitting , taskStarting , taskDone и taskAborted :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class CallableListenerTask implements Callable<Long>,ManagedTaskListener {
 
       private int id;
 
       public CallableListenerTask(int id) {
 
             this.id = id;
 
       }
 
       public Long call() {
 
             long summation = 0;
 
             for (int i = 1; i <= id; i++) {
 
                    summation += i;
 
             }
 
             return new Long(summation);
 
       }
 
       public void taskSubmitted(Future<?> f, ManagedExecutorService es,
                    Object obj) {
 
             System.out.println("Task Submitted! "+f);
 
       }
 
       public void taskDone(Future<?> f, ManagedExecutorService es, Object obj,
                    Throwable exc) {
 
             System.out.println("Task DONE! "+f);
 
       }
 
       public void taskStarting(Future<?> f, ManagedExecutorService es,
                    Object obj) {
 
             System.out.println("Task Starting! "+f);
 
       }
 
       public void taskAborted(Future<?> f, ManagedExecutorService es,
                    Object obj, Throwable exc) {
 
             System.out.println("Task Aborted! "+f);
 
       }
 
}

Уведомления о жизненном цикле вызываются в следующем порядке:

  • taskSubmitting : по представлению taskSubmitting Исполнителю
  • taskStarting : до фактического запуска Task
  • taskDone : taskDone при завершении задачи
  • taskAborted : срабатывает, когда пользователь вызывает futureResult.cancel ()

Использование транзакции в асинхронных задачах

В распределенной среде Java EE это сложная задача — гарантировать правильное выполнение транзакций и для параллельных выполнений задач. API параллелизма Java EE использует Java Transaction API (JTA) для поддержки транзакций поверх своих компонентов через javax.transaction.UserTransaction который используется для явного разграничения границ транзакций.

Следующий код показывает, как вызываемая задача получает UserTransaction из дерева JNDI, а затем запускает и фиксирует транзакцию с внешним компонентом (EJB):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class TxCallableTask implements Callable<Long> {
 
       long id;
 
       public TxCallableTask(long i) {
 
             this.id = i;
 
       }
 
       public Long call() {
 
             long value = 0;
 
             UserTransaction tx = lookupUserTransaction();
 
             SimpleEJB ejb = lookupEJB();
 
             try {
 
                    tx.begin();
 
                    value = ejb.calculate(id); // Do Transactions here
 
                    tx.commit();
 
             } catch (Exception e) {
 
                    e.printStackTrace();
 
                    try {  tx.rollback(); } catch (Exception e1) {        e1.printStackTrace(); }
 
             }
 
             return value;
 
       }
 
// Lookup EJB and UserTransaction here ..
 
}

Основным ограничением этого подхода является то, что, хотя объекты контекста могут начинать, фиксировать или откатывать транзакции, эти объекты не могут быть включены в транзакции родительского компонента.

Планирование задач с помощью ManagedScheduledExecutorService

ManagedScheduledExecutorService расширяет Java SE ScheduledExecutorService предоставляя методы для отправки отложенных или периодических задач для выполнения в среде Java EE. Что касается других управляемых объектов, вы можете получить экземпляр ExecutorService через поиск JNDI:

1
2
@Resource(name ="DefaultManagedScheduledExecutorService")
ManagedScheduledExecutorService scheduledExecutor;

Получив ссылку на ExecutorService, вы можете вызвать метод schedule для отправки отложенных или периодических задач. ScheduledExecutors, как и ManagedExecutors, могут быть связаны либо с интерфейсом Runnable, либо с интерфейсом Callable . Следующий раздел показывает оба подхода.

Отправка простой ScheduledTask

В простейшей форме для отправки запланированной задачи требуется настроить выражение расписания и передать его в службу ManagedSchedulerExecutor. В этом примере мы создаем отложенную задачу, которая будет запускаться только один раз за 10 секунд, так как вызывается метод schedule ( ) :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
@WebServlet("/ScheduledExecutor")
public class ScheduledExecutor extends HttpServlet {
 
       @Resource(name ="DefaultManagedScheduledExecutorService")
       ManagedScheduledExecutorService scheduledExecutor;
 
       protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
 
             PrintWriter writer = response.getWriter();          
 
             ScheduledFuture<?> futureResult = scheduledExecutor.schedule(new SimpleTask(),       10,TimeUnit.SECONDS);
 
             writer.write("Waiting 10 seconds before firing the task");
 
       }
 
}

Если вам нужно планировать вашу задачу несколько раз , то вы можете использовать метод scheduleAtFixedRate , который принимает в качестве входных данных время до запуска задачи, время перед каждым повторным выполнением и TimeUnit. Посмотрите следующий пример, который планирует Задачу каждые 10 секунд секунд после начальной задержки в 1 секунду:

1
ScheduledFuture<?> futureResult = scheduledExecutor. scheduleAtFixedRate (new SimpleTask(),1, 10,TimeUnit.SECONDS);

Захват результата запланированного выполнения

Если вам нужно получить возвращаемое значение из задачи, которую планируется выполнить, то вы можете использовать интерфейс ScheduledFuture который возвращается методом schedule . Вот пример, который фиксирует результат из нашего факториального примера Task, который мы ранее кодировали:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
ScheduledFuture<Long> futureResult =
 
                    scheduledExecutor.schedule(new CallableTask(5), 5, TimeUnit.SECONDS);                
 
while (!futureResult.isDone()) {      
 
       try {
 
             Thread.sleep(100); // Wait
 
       } catch (InterruptedException e) {                  
 
             e.printStackTrace();
 
       }
 
}    
 
try {
 
       writer.write("Callable Task returned " +futureResult.get());
 
} catch ( Exception e) {
 
       e.printStackTrace();
 
}

Создание управляемых потоков с использованием ManagedThreadFactory

javax.enterprise.concurrent.ManagedThreadFactory является эквивалентом J2SE ThreadFactory, который можно использовать для создания собственных потоков. Чтобы использовать ManagedThreadFactory, вам нужно добавить его из JNDI как обычно:

1
2
3
@Resource(name ="DefaultManagedThreadFactory")
 
ManagedThreadFactory factory;

Основное преимущество создания собственных управляемых потоков из фабрики (по сравнению с теми, которые создаются ManagedExecutorService) заключается в том, что вы можете установить некоторые типичные свойства потоков (например, имя или приоритет) и создать управляемую версию службы исполнителя J2SE. , Следующие примеры покажут вам как.

Создание управляемых потоков с фабрики

В этом примере мы создадим и запустим новый DefaultManagedThreadFactory используя DefaultManagedThreadFactory . Как видно из кода, после того, как мы создали экземпляр класса Thread, мы можем установить для него значимое имя и связать его с приоритетом. Затем мы свяжем Thread с нашей SimpleTask, которая регистрирует некоторые данные на консоли:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@WebServlet("/FactoryExecutorServlet")
 
public class FactoryExecutorServlet extends HttpServlet {
 
       @Resource(name ="DefaultManagedThreadFactory")
       ManagedThreadFactory factory;
 
       protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
 
             PrintWriter writer = response.getWriter();
             
             Thread thread = factory.newThread(new SimpleTask());
 
             thread.setName("My Managed Thread");
 
             thread.setPriority(Thread.MAX_PRIORITY);
 
             thread.start();
 
             writer.write("Thread started. Check logs");
 
       }
 
}

Теперь проверьте журналы вашего сервера: не сомневайтесь, что легче обнаружить выходные данные ваших самостоятельно созданных потоков:

1
14:44:31,838 INFO [stdout] (My Managed Thread) Simple Task started

Сбор информации об имени потока особенно полезен при анализе дампа потока, а имя потока — единственный способ отследить путь выполнения потока.

Использование службы Managed Executor

Интерфейс java.util.concurrent.ExecutorService представляет собой стандартный механизм J2SE, который значительно заменил использование прямых потоков для выполнения асинхронных выполнений. Одним из основных преимуществ ExecutorService перед стандартным механизмом Thread является то, что вы можете определить пул экземпляров для выполнения ваших заданий и что у вас есть более безопасный способ прерывать ваши задания.

Использовать ExecutorService в ваших корпоративных приложениях просто: все, что вам нужно сделать, это передать экземпляр вашего Managed ThreadFactory в конструктор вашего ExecutorService . В следующем примере мы используем SingletonEJB для предоставления ExecutorService в качестве службы в своем методе getThreadPoolExecutor :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Singleton
 
public class PoolExecutorEJB {
 
       private ExecutorService threadPoolExecutor = null;
 
       int  corePoolSize  =    5;
 
       int  maxPoolSize   =   10;
 
       long keepAliveTime = 5000;
 
       @Resource(name = "DefaultManagedThreadFactory")
       ManagedThreadFactory factory;
 
       public ExecutorService getThreadPoolExecutor() {
 
             return threadPoolExecutor;
 
       }
 
       @PostConstruct
       public void init() {            
 
             threadPoolExecutor =  new ThreadPoolExecutor(corePoolSize, maxPoolSize,    
 
                                  keepAliveTime, TimeUnit.SECONDS,
 
                                  new ArrayBlockingQueue<Runnable>(10), factory);     
 
       }
 
       @PreDestroy
       public void releaseResources() {
 
             threadPoolExecutor.shutdown();  
 
       }
 
}

ThreadPoolExecutor содержит два основных параметра в своем конструкторе: corePoolSize и maximumPoolSize . Когда в методе передается новая задача и выполняется меньше потоков corePoolSize, создается новый поток для обработки запроса, даже если другие рабочие потоки простаивают. Если запущено больше потоков corePoolSize, но меньше MaximumPoolSize, новый поток будет создан, только если очередь заполнена.

Затем ExecutorService используется для запуска новой асинхронной задачи, как в следующем примере, где в сервлете предоставляется анонимная реализация Runnable:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@WebServlet("/FactoryExecutorServiceServlet")
public class FactoryExecutorServiceServlet extends HttpServlet {
 
       @EJB PoolExecutorEJB ejb;
 
       protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
 
             final PrintWriter writer = response.getWriter();
 
             writer.write("Invoking ExecutorService. Check Logs.");
 
             ExecutorService executorService = ejb.getThreadPoolExecutor();
 
              executorService.execute(new Runnable() {
 
                    public void run() {
 
                           System.out.println("Message from your Executor!");
 
                    }
 
             });
 
}
 
}

Как только PoolExecutorEJB завершается, ExecutorService будет также завершен в методе @PreDestroy объекта Singleton, который вызовет метод shutdown() объекта ThreadPoolExecutor. ExecutorService не будет немедленно закрываться, но он больше не будет принимать новые задачи, и как только все потоки завершат текущие задачи, ExecutorService завершит работу.

Использование динамических контекстных объектов

Динамический прокси — это полезная настройка Java, которую можно использовать для создания динамических реализаций интерфейсов с использованием API java.lang.reflect.Proxy . Вы можете использовать динамические прокси для различных целей, таких как подключение к базе данных и управление транзакциями, динамические фиктивные объекты для модульного тестирования и другие методы перехвата AOP-подобных методов.

В среде Java EE вы можете использовать специальный тип динамических прокси, называемых динамическими контекстными прокси .

Наиболее интересной особенностью динамических контекстных объектов является то, что контекст именования JNDI, загрузчик классов и контекст безопасности распространяются на проксируемые объекты . Это может быть полезно в контексте, когда вы вносите реализации J2SE в ваши корпоративные приложения и хотите запускать их в контексте контейнера.

В следующем фрагменте показано, как вставить контекстные объекты в контейнер. Так как контекстным объектам также нужен ExecutorService, в который вы можете отправить задачу, ThreadFactory также внедряется:

1
2
3
4
5
6
7
@Resource(name ="DefaultContextService")
 
ContextService cs;
 
@Resource(name ="DefaultManagedThreadFactory")
 
ManagedThreadFactory factory;

В следующем разделе мы покажем, как создавать динамические контекстные объекты, используя пересмотренную версию нашего Singleton EJB.

Выполнение контекстных задач

В следующем примере показано, как запустить контекстный прокси для задачи Callable . Для этого нам понадобится как ManagedThreadfactory, так и ContextService. Наш EJB ContextExecutor первоначально создаст ThreadPoolExecutor в своем методе init . Затем в методе submit создаются новые контекстные прокси для задач Callable, которые передаются исполнителю ThreadPool.

Вот код для нашего ContextExecutorEJB :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Singleton
 
public class ContextExecutorEJB {
 
       private ExecutorService threadPoolExecutor = null;
 
       @Resource(name = "DefaultManagedThreadFactory")
       ManagedThreadFactory factory;
 
       @Resource(name = "DefaultContextService")
       ContextService cs;
 
       public ExecutorService getThreadPoolExecutor() {
 
             return threadPoolExecutor;
 
       }
 
       @PostConstruct
       public void init() {
             threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
 
                           new ArrayBlockingQueue>Runnable>(10), factory);
       }
 
       public Future>Long> submitJob(Callable>Long> task) {
 
             Callable>Long> proxy = cs.createContextualProxy(task, Callable.class);
 
             return getThreadPoolExecutor().submit(proxy);
 
       }
 
}

Класс CallableTask немного сложнее, чем в нашем первом примере, так как он собирается регистрировать информацию о javax.security.auth.Subject , которая содержится в потоке вызывающей стороны:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class CallableTask implements Callable<Long> {
 
       private int id;
 
       public CallableTask(int id) {
 
             this.id = id;
 
       }
 
       public Long call() {
 
             long summation = 0;
 
             // Do calculation
 
             Subject subject = Subject.getSubject(AccessController.getContext());
 
             logInfo(subject, summation); // Log Traces Subject identity
 
             return new Long(summation);
 
       }
 
       private void logInfo(Subject subject, long summation) { . .  }
 
}

Ниже приведен простой способ отправки новых контекстных задач в наш SingletonEJB:

1
2
3
4
5
6
7
8
9
       @EJB ContextExecutorEJB ejb;  
 
       protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {                     
 
             CallableTask task = new CallableTask(5);
 
             ejb.submitJob(task);
 
}

Строим ваши примеры

Чтобы использовать утилиты Concurrency для Java EE API, в вашем приложении требуется следующая зависимость Maven:

1
2
3
4
5
6
7
8
9
<dependency>
 
       <groupId>org.jboss.spec.javax.enterprise.concurrent</groupId>
 
       <artifactId>jboss-concurrency-api_1.0_spec</artifactId>
 
       <version>1.0.0.Final</version>
 
</dependency>

http://www.itbuzzpress.com/ Этот отрывок взят из книги « Практическая разработка Java EE 7 на WildFly », которая представляет собой практическое практическое руководство, раскрывающее все области разработки Java EE 7 на новейшем сервере приложений WildFly. Охватывает все — от базовых компонентов (EJB, Servlets, CDI, JPA) до нового технологического стека, определенного в Java Enterprise Edition 7, и, следовательно, включает в себя новый пакетный API, JSON-P Api, API параллелизма, веб-сокеты, API JMS 2.0, основной стек Web-сервисов (JAX-WS, JAX-RS). Область тестирования с Arquillian Framework и API безопасности завершает список тем, обсуждаемых в книге.