Пулы потоков очень важны для выполнения синхронных и асинхронных процессов. В этой статье показано, как разрабатывать и отслеживать службы пула потоков с помощью Spring. Создание пула потоков было объяснено двумя альтернативными методами.
Используемые технологии:
JDK 1.6.0_21
Spring 3.0.5
Maven 3.0.2
ШАГ 1: СОЗДАТЬ MAVEN ПРОЕКТ
Maven проект создается как показано ниже. (Его можно создать с помощью Maven или IDE Plug-in).
ШАГ 2: БИБЛИОТЕКИ
Spring-зависимости добавляются в pom.xml Maven.
<!-- Spring 3 dependencies --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency>
Для создания runnable-jar, ниже плагин может быть использован.
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.3.1</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.otv.exe.Application</mainClass> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.handlers</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.schemas</resource> </transformer> </transformers> </configuration> </execution> </executions> </plugin>
ШАГ 3: СОЗДАЙТЕ КЛАСС ЗАДАЧИ
Новый класс TestTask создается путем реализации Runnable Interface. Этот класс показывает выполняемые задачи.
package com.otv.task; import org.apache.log4j.Logger; /** * @author onlinetechvision.com * @since 17 Oct 2011 * @version 1.0.0 * */ public class TestTask implements Runnable { private static Logger log = Logger.getLogger(TestTask.class); String taskName; public TestTask() { } public TestTask(String taskName) { this.taskName = taskName; } public void run() { try { log.debug(this.taskName + " : is started."); Thread.sleep(10000); log.debug(this.taskName + " : is completed."); } catch (InterruptedException e) { log.error(this.taskName + " : is not completed!"); e.printStackTrace(); } } @Override public String toString() { return (getTaskName()); } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } }
ШАГ 4: СОЗДАЙТЕ TestRejectedExecutionHandler CLASS
Класс TestRejectedExecutionHandler создается путем реализации интерфейса RejectedExecutionHandler. Если нет свободного потока и переполнения очереди, задачи будут отклонены. Этот класс обрабатывает отклоненные задачи.
package com.otv.handler; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import org.apache.log4j.Logger; /** * @author onlinetechvision.com * @since 17 Oct 2011 * @version 1.0.0 * */ public class TestRejectedExecutionHandler implements RejectedExecutionHandler { private static Logger log = Logger.getLogger(TestRejectedExecutionHandler.class); public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { log.debug(runnable.toString() + " : has been rejected"); } }
ШАГ 5: СОЗДАЙТЕ ITestThreadPoolExecutorService INTERFACE
ITestThreadPoolExecutorService Интерфейс создан.
package com.otv.srv; import java.util.concurrent.ThreadPoolExecutor; import com.otv.handler.TestRejectedExecutionHandler; /** * @author onlinetechvision.com * @since 17 Oct 2011 * @version 1.0.0 * */ public interface ITestThreadPoolExecutorService { public ThreadPoolExecutor createNewThreadPool(); public int getCorePoolSize(); public void setCorePoolSize(int corePoolSize); public int getMaxPoolSize(); public void setMaxPoolSize(int maximumPoolSize); public long getKeepAliveTime(); public void setKeepAliveTime(long keepAliveTime); public int getQueueCapacity(); public void setQueueCapacity(int queueCapacity); public TestRejectedExecutionHandler getTestRejectedExecutionHandler(); public void setTestRejectedExecutionHandler(TestRejectedExecutionHandler testRejectedExecutionHandler); }
ШАГ 6: СОЗДАЙТЕ TestThreadPoolExecutorService CLASS
Класс TestThreadPoolExecutorService создается путем реализации интерфейса ITestThreadPoolExecutorService. Этот класс создает новый пул потоков.
package com.otv.srv; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import com.otv.handler.TestRejectedExecutionHandler; /** * @author onlinetechvision.com * @since 17 Oct 2011 * @version 1.0.0 * */ public class TestThreadPoolExecutorService implements ITestThreadPoolExecutorService { private int corePoolSize; private int maxPoolSize; private long keepAliveTime; private int queueCapacity; TestRejectedExecutionHandler testRejectedExecutionHandler; public ThreadPoolExecutor createNewThreadPool() { ThreadPoolExecutor executor = new ThreadPoolExecutor(getCorePoolSize(), getMaxPoolSize(), getKeepAliveTime(), TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(getQueueCapacity()), getTestRejectedExecutionHandler()); return executor; } public int getCorePoolSize() { return corePoolSize; } public void setCorePoolSize(int corePoolSize) { this.corePoolSize = corePoolSize; } public int getMaxPoolSize() { return maxPoolSize; } public void setMaxPoolSize(int maxPoolSize) { this.maxPoolSize = maxPoolSize; } public long getKeepAliveTime() { return keepAliveTime; } public void setKeepAliveTime(long keepAliveTime) { this.keepAliveTime = keepAliveTime; } public int getQueueCapacity() { return queueCapacity; } public void setQueueCapacity(int queueCapacity) { this.queueCapacity = queueCapacity; } public TestRejectedExecutionHandler getTestRejectedExecutionHandler() { return testRejectedExecutionHandler; } public void setTestRejectedExecutionHandler(TestRejectedExecutionHandler testRejectedExecutionHandler) { this.testRejectedExecutionHandler = testRejectedExecutionHandler; } }
ШАГ 7: СОЗДАЙТЕ ИНТЕРФЕЙС IThreadPoolMonitorService
IThreadPoolMonitorService Интерфейс создан.
package com.otv.monitor.srv; import java.util.concurrent.ThreadPoolExecutor; public interface IThreadPoolMonitorService extends Runnable { public void monitorThreadPool(); public ThreadPoolExecutor getExecutor(); public void setExecutor(ThreadPoolExecutor executor); }
ШАГ 8: СОЗДАНИЕ ThreadPoolMonitorService CLASS
Класс ThreadPoolMonitorService создается путем реализации интерфейса IThreadPoolMonitorService. Этот класс контролирует созданный пул потоков.
package com.otv.monitor.srv; import java.util.concurrent.ThreadPoolExecutor; import org.apache.log4j.Logger; /** * @author onlinetechvision.com * @since 17 Oct 2011 * @version 1.0.0 * */ public class ThreadPoolMonitorService implements IThreadPoolMonitorService { private static Logger log = Logger.getLogger(ThreadPoolMonitorService.class); ThreadPoolExecutor executor; private long monitoringPeriod; public void run() { try { while (true){ monitorThreadPool(); Thread.sleep(monitoringPeriod*1000); } } catch (Exception e) { log.error(e.getMessage()); } } public void monitorThreadPool() { StringBuffer strBuff = new StringBuffer(); strBuff.append("CurrentPoolSize : ").append(executor.getPoolSize()); strBuff.append(" - CorePoolSize : ").append(executor.getCorePoolSize()); strBuff.append(" - MaximumPoolSize : ").append(executor.getMaximumPoolSize()); strBuff.append(" - ActiveTaskCount : ").append(executor.getActiveCount()); strBuff.append(" - CompletedTaskCount : ").append(executor.getCompletedTaskCount()); strBuff.append(" - TotalTaskCount : ").append(executor.getTaskCount()); strBuff.append(" - isTerminated : ").append(executor.isTerminated()); log.debug(strBuff.toString()); } public ThreadPoolExecutor getExecutor() { return executor; } public void setExecutor(ThreadPoolExecutor executor) { this.executor = executor; } public long getMonitoringPeriod() { return monitoringPeriod; } public void setMonitoringPeriod(long monitoringPeriod) { this.monitoringPeriod = monitoringPeriod; } }
ШАГ 9: СОЗДАЙТЕ СТАРТ КЛАСС
Стартовый класс создан.
package com.otv.start; import java.util.concurrent.ThreadPoolExecutor; import org.apache.log4j.Logger; import com.otv.handler.TestRejectedExecutionHandler; import com.otv.monitor.srv.IThreadPoolMonitorService; import com.otv.monitor.srv.ThreadPoolMonitorService; import com.otv.srv.ITestThreadPoolExecutorService; import com.otv.srv.TestThreadPoolExecutorService; import com.otv.task.TestTask; /** * @author onlinetechvision.com * @since 17 Oct 2011 * @version 1.0.0 * */ public class Starter { private static Logger log = Logger.getLogger(TestRejectedExecutionHandler.class); IThreadPoolMonitorService threadPoolMonitorService; ITestThreadPoolExecutorService testThreadPoolExecutorService; public void start() { // A new thread pool is created... ThreadPoolExecutor executor = testThreadPoolExecutorService.createNewThreadPool(); executor.allowCoreThreadTimeOut(true); // Created executor is set to ThreadPoolMonitorService... threadPoolMonitorService.setExecutor(executor); // ThreadPoolMonitorService is started... Thread monitor = new Thread(threadPoolMonitorService); monitor.start(); // New tasks are executed... for(int i=1;i<10;i++) { executor.execute(new TestTask("Task"+i)); } try { Thread.sleep(40000); } catch (Exception e) { log.error(e.getMessage()); } for(int i=10;i<19;i++) { executor.execute(new TestTask("Task"+i)); } // executor is shutdown... executor.shutdown(); } public IThreadPoolMonitorService getThreadPoolMonitorService() { return threadPoolMonitorService; } public void setThreadPoolMonitorService(IThreadPoolMonitorService threadPoolMonitorService) { this.threadPoolMonitorService = threadPoolMonitorService; } public ITestThreadPoolExecutorService getTestThreadPoolExecutorService() { return testThreadPoolExecutorService; } public void setTestThreadPoolExecutorService(ITestThreadPoolExecutorService testThreadPoolExecutorService) { this.testThreadPoolExecutorService = testThreadPoolExecutorService; } }
ШАГ 10: СОЗДАТЬ КЛАСС ПРИЛОЖЕНИЙ
Класс приложения создан. Этот класс запускает приложение.
package com.otv.start; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * @author onlinetechvision.com * @since 17 Oct 2011 * @version 1.0.0 * */ public class Application { public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml"); Starter starter = (Starter) context.getBean("Starter"); starter.start(); } }
ШАГ 11: СОЗДАТЬ applicationContext.xml
applicationContext.xml создан.
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> <!-- Beans Declaration --> <bean id="TestTask" class="com.otv.task.TestTask"></bean> <bean id="ThreadPoolMonitorService" class="com.otv.monitor.srv.ThreadPoolMonitorService"> <property name="monitoringPeriod" value="5" /> </bean> <bean id="TestRejectedExecutionHandler" class="com.otv.handler.TestRejectedExecutionHandler"></bean> <bean id="TestThreadPoolExecutorService" class="com.otv.srv.TestThreadPoolExecutorService"> <property name="corePoolSize" value="1" /> <property name="maxPoolSize" value="3" /> <property name="keepAliveTime" value="10" /> <property name="queueCapacity" value="3" /> <property name="testRejectedExecutionHandler" ref="TestRejectedExecutionHandler" /> </bean> <bean id="Starter" class="com.otv.start.Starter"> <property name="threadPoolMonitorService" ref="ThreadPoolMonitorService" /> <property name="testThreadPoolExecutorService" ref="TestThreadPoolExecutorService" /> </bean> </beans>
ШАГ 12: АЛЬТЕРНАТИВНЫЙ МЕТОД СОЗДАНИЯ РЕЗЬБОВОГО БАССЕЙНА
Класс ThreadPoolTaskExecutor, предоставляемый Spring, также можно использовать для создания пула потоков.
<bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="1" /> <property name="maxPoolSize" value="3" /> <property name="queueCapacity" value="3" /> </bean> <bean id="testTaskExecutor" class="TestTaskExecutor"> <constructor-arg ref="threadPoolTaskExecutor" /> </bean>
STEP 13 : BUILD PROJECT
After OTV_Spring_ThreadPool Project is build, OTV_Spring_ThreadPool-0.0.1-SNAPSHOT.jar will be created.
STEP 14 : RUN PROJECT
After created OTV_Spring_ThreadPool-0.0.1-SNAPSHOT.jar file is run, below output logs will be shown :
18.10.2011 20:08:48 DEBUG (TestRejectedExecutionHandler.java:19) - Task7 : has been rejected 18.10.2011 20:08:48 DEBUG (TestRejectedExecutionHandler.java:19) - Task8 : has been rejected 18.10.2011 20:08:48 DEBUG (TestRejectedExecutionHandler.java:19) - Task9 : has been rejected 18.10.2011 20:08:48 DEBUG (TestTask.java:25) - Task1 : is started. 18.10.2011 20:08:48 DEBUG (TestTask.java:25) - Task6 : is started. 18.10.2011 20:08:48 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 2 - CompletedTaskCount : 0 - TotalTaskCount : 5 - isTerminated : false 18.10.2011 20:08:48 DEBUG (TestTask.java:25) - Task5 : is started. 18.10.2011 20:08:53 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 0 - TotalTaskCount : 6 - isTerminated : false 18.10.2011 20:08:58 DEBUG (TestTask.java:27) - Task6 : is completed. 18.10.2011 20:08:58 DEBUG (TestTask.java:27) - Task1 : is completed. 18.10.2011 20:08:58 DEBUG (TestTask.java:25) - Task3 : is started. 18.10.2011 20:08:58 DEBUG (TestTask.java:25) - Task2 : is started. 18.10.2011 20:08:58 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 2 - TotalTaskCount : 6 - isTerminated : false 18.10.2011 20:08:58 DEBUG (TestTask.java:27) - Task5 : is completed. 18.10.2011 20:08:58 DEBUG (TestTask.java:25) - Task4 : is started. 18.10.2011 20:09:03 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 3 - TotalTaskCount : 6 - isTerminated : false 18.10.2011 20:09:08 DEBUG (TestTask.java:27) - Task2 : is completed. 18.10.2011 20:09:08 DEBUG (TestTask.java:27) - Task3 : is completed. 18.10.2011 20:09:08 DEBUG (TestTask.java:27) - Task4 : is completed. 18.10.2011 20:09:08 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 0 - CompletedTaskCount : 6 - TotalTaskCount : 6 - isTerminated : false 18.10.2011 20:09:13 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 0 - CompletedTaskCount : 6 - TotalTaskCount : 6 - isTerminated : false 18.10.2011 20:09:18 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 0 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 0 - CompletedTaskCount : 6 - TotalTaskCount : 6 - isTerminated : false 18.10.2011 20:09:23 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 0 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 0 - CompletedTaskCount : 6 - TotalTaskCount : 6 - isTerminated : false 18.10.2011 20:09:28 DEBUG (TestTask.java:25) - Task10 : is started. 18.10.2011 20:09:28 DEBUG (TestRejectedExecutionHandler.java:19) - Task16 : has been rejected 18.10.2011 20:09:28 DEBUG (TestRejectedExecutionHandler.java:19) - Task17 : has been rejected 18.10.2011 20:09:28 DEBUG (TestRejectedExecutionHandler.java:19) - Task18 : has been rejected 18.10.2011 20:09:28 DEBUG (TestTask.java:25) - Task14 : is started. 18.10.2011 20:09:28 DEBUG (TestTask.java:25) - Task15 : is started. 18.10.2011 20:09:28 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 6 - TotalTaskCount : 12 - isTerminated : false 18.10.2011 20:09:33 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 6 - TotalTaskCount : 12 - isTerminated : false 18.10.2011 20:09:38 DEBUG (TestTask.java:27) - Task10 : is completed. 18.10.2011 20:09:38 DEBUG (TestTask.java:25) - Task11 : is started. 18.10.2011 20:09:38 DEBUG (TestTask.java:27) - Task14 : is completed. 18.10.2011 20:09:38 DEBUG (TestTask.java:27) - Task15 : is completed. 18.10.2011 20:09:38 DEBUG (TestTask.java:25) - Task12 : is started. 18.10.2011 20:09:38 DEBUG (TestTask.java:25) - Task13 : is started. 18.10.2011 20:09:38 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 9 - TotalTaskCount : 12 - isTerminated : false 18.10.2011 20:09:43 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 9 - TotalTaskCount : 12 - isTerminated : false 18.10.2011 20:09:48 DEBUG (TestTask.java:27) - Task11 : is completed. 18.10.2011 20:09:48 DEBUG (TestTask.java:27) - Task13 : is completed. 18.10.2011 20:09:48 DEBUG (TestTask.java:27) - Task12 : is completed. 18.10.2011 20:09:48 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 0 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 0 - CompletedTaskCount : 12 - TotalTaskCount : 12 - isTerminated : true 18.10.2011 20:09:53 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 0 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 0 - CompletedTaskCount : 12 - TotalTaskCount : 12 - isTerminated : true
STEP 15 : DOWNLOAD