Я давно хотел публиковать сообщения о параллелизме весной. Это интересная тема, которая не привлекает того внимания, которого она заслуживает (если вообще вообще имеет), возможно, потому, что контейнер IoC и, в частности, Spring действительно помогают управлять зависимостями, задача, которая интуитивно способствует последовательной обработке, а также потому, что API-интерфейсы JEE (скажем, сервлет) или EJB) скрыть необходимость в этом. Есть определенная область, где независимо от того, что вы будете искать для параллелизма, это поиск данных. Пока задействованы пара различных ресурсов или даже один, если запрашиваемые данные независимы, эффективность параллельной обработки нескольких соединений возрастает. Обычным случаем в современных средах может быть, например, вызов веб-сервисов.
Стандартная Java на самом деле не предлагает никакого API для управления параллелизмом внутри запроса (хотя сам запрос управляется из пула). Фактически, он открыт для обсуждения, если стандарт запрещает открывать новые потоки в веб-контексте (он специально запрещен для EJB). WebSphere и Weblogic предложили альтернативу, называемую CommonJ или WorkManager API. Это очень хорошая альтернатива при работе под этими серверами приложений. Spring предлагает другой, возможно, даже более мощный вариант с абстракцией TaskExecutor . Иногда это предпочтительнее в среде Spring, поскольку он может использовать CommonJ в качестве базового API, но он также может использовать среду Java5 Executor (среди прочих), что делает переключение лишь вопросом изменения пары строк конфигурации.
Давайте рассмотрим, как использовать фреймворк. Наше единственное предварительное условие — иметь как минимум две службы извлечения данных, уже настроенные как зависимость третьего компонента. Все службы извлечения данных должны иметь общий интерфейс, я могу порекомендовать здесь что-то вроде шаблона Command (учтите, что этот подход не полностью соблюден ниже, чтобы лучше продемонстрировать обработку входящих данных). На этом этапе мы собираемся изменить отдельные зависимости и преобразовать их в коллекцию, добавим методы init и destroy и исполнителя (давайте начнем с реализации JDK5):
public class ParallelService implements InitializingBean, DisposableBean { private List<Command<T>> commands; private ExecutorService executor; }
В нашей текущей реализации, основанной на исполнителях Java 5, нам нужно запустить пул потоков в методе инициализации и завершить все, когда закрывается контекст Spring
public void afterPropertiesSet() throws Exception { executor = Executors.newFixedThreadPool(commands.size()); } public void destroy() throws Exception { executor.shutdownNow(); // Improve this as much as liked }
Нам просто нужно обработать параллельное выполнение сейчас. Это легко сделать с помощью управления асинхронными задачами в будущем . Другой альтернативой является отправка всех задач и ожидание завершения (см. ExecutorService ):
public void execute(Data data) { Set<Future<?>> tasks = new HashSet<Future<?>>(commands.size()); for (Command command : commands) tasks.add(executor.submit(new RunCommand(command, data))); for (Future future : tasks) future.get(); //Other stuff to execute after all data has been retrieved }
Приведенный выше код просто создает коллекцию объектов Future, чтобы проверить, когда задания завершены. Сложной задачей является создание параллельного задания из пользовательского сервиса и передача необходимых данных (при необходимости). Внутренней обертки класса будет достаточно:
private static class RunCommand implements Runnable { private final Data data; private final Command command; public RunCommand(Command command, Data data) { this.data = data; this.command = command; } public void run() { command.execute(data); } }
Ну, это было довольно легко. Прямо сейчас у нас есть совершенно правильный способ параллельного вызова bean-компонентов. У этого подхода есть плюсы и минусы. В первом списке у нас есть независимость от Spring API (конечно, представьте, что интерфейсы Spring заменены их соответствующими XML-атрибутами), но мы также ограничены средой Java5. Если мы не возражаем против введения зависимости с самим Spring, мы можем преобразовать исходный код для использования каркаса TaskExecutor:
public class ParallelService { private TaskExecutor taskExecutor; private List<Command<T>> commands; public void setTaskExecutor(TaskExecutor taskExecutor) { this.taskExecutor = taskExecutor; } }
И теперь методы init и destroy заменены некоторой конфигурацией XML:
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="5" /> <property name="maxPoolSize" value="10" /> <property name="queueCapacity" value="25" /> </bean>
Но обратите внимание, что не все реализации интерфейса TaskExecutor позволяют отслеживать ход выполнения задачи, запланированной для выполнения!
public void execute)( { for (Command command : commands) taskExecutor.execute(new RunCommand(command)); }