Java.util.concurrent.ThreadPoolExecutor JDK позволяет отправлять задачи в пул потоков и использует BlockingQueue
для хранения отправленных задач. Если у вас есть тысячи задач для отправки, вы указываете «ограниченную» очередь (то есть одну с максимальной емкостью), иначе вашей JVM может не хватить памяти. Вы можете установить RejectedExecutionHandler
для обработки того, что происходит, когда очередь заполнена, но есть еще невыполненные задачи для отправки.
Вот простой пример, показывающий, как вы будете использовать ThreadPoolExecutor
с BlockingQueue
емкостью 1000. CallerRunsPolicy
гарантирует, что при заполнении очереди дополнительные задачи будут обрабатываться отправляющим потоком.
1
2
3
4
|
int numThreads = 5 ; ExecutorService exec = new ThreadPoolExecutor( 5 , 5 , 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>( 1000 ), new ThreadPoolExecutor.CallerRunsPolicy()); |
Проблема с этим подходом состоит в том, что, когда очередь заполнена, поток, отправляющий задачи в пул, становится занятым выполнением самой задачи, и в течение этого времени очередь может стать пустой, а потоки в пуле могут стать незанятыми. Это не очень эффективно. Мы хотим, чтобы пул потоков был занят и рабочая очередь всегда была насыщенной.
Существуют различные решения этой проблемы. Одним из них является использование специального Executor
который блокирует (и, таким образом, предотвращает отправку дальнейших задач в пул), когда очередь заполнена. Код для BlockingExecutor
показан ниже. Он основан на примере BoundedExecutor
от Brian Goetz, 2006. Параллелизм Java на практике. 1 издание. Аддисон-Уэсли Профессионал. (Раздел 8.3.3) .
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * An executor which blocks and prevents further tasks from * being submitted to the pool when the queue is full. * <p> * Based on the BoundedExecutor example in: * Brian Goetz, 2006. Java Concurrency in Practice. (Listing 8.4) */ public class BlockingExecutor extends ThreadPoolExecutor { private static final Logger LOGGER = LoggerFactory. getLogger(BlockingExecutor. class ); private final Semaphore semaphore; /** * Creates a BlockingExecutor which will block and prevent further * submission to the pool when the specified queue size has been reached. * * @param poolSize the number of the threads in the pool * @param queueSize the size of the queue */ public BlockingExecutor( final int poolSize, final int queueSize) { super (poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // the semaphore is bounding both the number of tasks currently executing // and those queued up semaphore = new Semaphore(poolSize + queueSize); } /** * Executes the given task. * This method will block when the semaphore has no permits * i.e. when the queue has reached its capacity. */ @Override public void execute( final Runnable task) { boolean acquired = false ; do { try { semaphore.acquire(); acquired = true ; } catch ( final InterruptedException e) { LOGGER.warn( "InterruptedException whilst aquiring semaphore" , e); } } while (!acquired); try { super .execute(task); } catch ( final RejectedExecutionException e) { semaphore.release(); throw e; } } /** * Method invoked upon completion of execution of the given Runnable, * by the thread that executed the task. * Releases a semaphore permit. */ @Override protected void afterExecute( final Runnable r, final Throwable t) { super .afterExecute(r, t); semaphore.release(); } } |