Статьи

Передача задания регулирования с BlockingExecutor

Java.util.concurrent.ThreadPoolExecutor JDK позволяет отправлять задачи в пул потоков и использует BlockingQueue для хранения отправленных задач. Если у вас есть тысячи задач для отправки, вы указываете «ограниченную» очередь (то есть одну с максимальной емкостью), иначе вашей JVM может не хватить памяти. Вы можете установить RejectedExecutionHandler для обработки того, что происходит, когда очередь заполнена, но есть еще невыполненные задачи для отправки.

Вот простой пример, показывающий, как вы будете использовать ThreadPoolExecutor с BlockingQueue емкостью 1000. CallerRunsPolicy гарантирует, что при заполнении очереди дополнительные задачи будут обрабатываться отправляющим потоком.

1
2
3
4
int numThreads = 5;
ExecutorService exec = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                                 new ArrayBlockingQueue<Runnable>(1000),
                                 new ThreadPoolExecutor.CallerRunsPolicy());

Проблема с этим подходом состоит в том, что, когда очередь заполнена, поток, отправляющий задачи в пул, становится занятым выполнением самой задачи, и в течение этого времени очередь может стать пустой, а потоки в пуле могут стать незанятыми. Это не очень эффективно. Мы хотим, чтобы пул потоков был занят и рабочая очередь всегда была насыщенной.

Существуют различные решения этой проблемы. Одним из них является использование специального Executor который блокирует (и, таким образом, предотвращает отправку дальнейших задач в пул), когда очередь заполнена. Код для BlockingExecutor показан ниже. Он основан на примере BoundedExecutor от Brian Goetz, 2006. Параллелизм Java на практике. 1 издание. Аддисон-Уэсли Профессионал. (Раздел 8.3.3) .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
/**
 * An executor which blocks and prevents further tasks from
 * being submitted to the pool when the queue is full.
 * <p>
 * Based on the BoundedExecutor example in:
 * Brian Goetz, 2006. Java Concurrency in Practice. (Listing 8.4)
 */
public class BlockingExecutor extends ThreadPoolExecutor {
 
  private static final Logger LOGGER = LoggerFactory.
                                          getLogger(BlockingExecutor.class);
  private final Semaphore semaphore;
 
  /**
   * Creates a BlockingExecutor which will block and prevent further
   * submission to the pool when the specified queue size has been reached.
   *
   * @param poolSize the number of the threads in the pool
   * @param queueSize the size of the queue
   */
  public BlockingExecutor(final int poolSize, final int queueSize) {
    super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<Runnable>());
 
    // the semaphore is bounding both the number of tasks currently executing
    // and those queued up
    semaphore = new Semaphore(poolSize + queueSize);
  }
 
  /**
   * Executes the given task.
   * This method will block when the semaphore has no permits
   * i.e. when the queue has reached its capacity.
   */
  @Override
  public void execute(final Runnable task) {
    boolean acquired = false;
    do {
        try {
            semaphore.acquire();
            acquired = true;
        } catch (final InterruptedException e) {
            LOGGER.warn("InterruptedException whilst aquiring semaphore", e);
        }
    } while (!acquired);
 
    try {
        super.execute(task);
    } catch (final RejectedExecutionException e) {
        semaphore.release();
        throw e;
    }
  }
 
  /**
   * Method invoked upon completion of execution of the given Runnable,
   * by the thread that executed the task.
   * Releases a semaphore permit.
   */
  @Override
  protected void afterExecute(final Runnable r, final Throwable t) {
    super.afterExecute(r, t);
    semaphore.release();
  }
}

Ссылка: Передача задания по регулированию с BlockingExecutor от нашего партнера JCG Фахда Шарифа в блоге fahd.blog .