Внутренне, реализации делают умное использование другой функции параллелизма, представленной в Java 1.5 — Блокирующие очереди.
Очередь
Сначала краткий обзор стандартной очереди. В информатике очередь — это просто коллекция, которая всегда добавляет элементы в конец и всегда берет элементы с самого начала. Выражение «первым пришел-первым вышел» (FIFO) обычно используется для описания стандартной очереди. В Java 1.6 введена Deque или двусторонняя очередь, и этот интерфейс теперь реализован в LinkedList . Некоторые очереди в java допускают альтернативное упорядочение, такое как использование Comparator или даже написание собственной реализации упорядочения. Несмотря на то, что расширенная функциональность хороша, сегодня мы фокусируемся на том, как BlockingQueues действительно сияет в параллельной разработке.
Очередь блокировки
Очереди блокировки — это очереди, которые также предоставляют функциональность для блокировки запросов на извлечение элемента, когда элемент недоступен с дополнительной опцией, ограничивающей количество времени, затрачиваемого на ожидание. В очереди с ограниченным размером такая же функциональность блокировки доступна при попытке добавления. Давайте подробно рассмотрим пример использования BlockingQueue .
Давайте предположим простой сценарий. У вас есть поток обработки, функция которого просто выполнять команды.
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
15 
16 
17 
 | 
import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;private BlockingQueue<Command> workQueue = new LinkedBlockingQueue<Command>();public void addCommand(Command command) {    workQueue.offer(command);}public Object call() throws Exception {    try {        Command command = workQueue.take();        command.execute();    } catch (InterruptedException e) {        throw new WorkException(e);    }} | 
Конечно, это очень простой пример, но он показывает основы использования BlockingQueue для нескольких потоков. Давайте попробуем что-нибудь более сложное. В этом примере нам нужно создать пул соединений с лимитом. Он должен создавать соединения только по мере необходимости. Ни один клиент не будет ожидать доступного соединения дольше 5 секунд.
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
 | 
private BlockingQueue<Connection> pool = new ArrayBlockingQueue<Connection>(10);private AtomicInteger connCount = new AtomicInteger();public Connection getConnection() {    Connection conn = pool.poll(5, TimeUnit.SECONDS);    if (conn == null) {        synchronized (connCount) {            if (connCount.get() < 10) {                conn = getNewConnection();                pool.offer(conn);                connCount.incrementAndGet();            }        }        if (conn == null) {            throw new ConnUnavailException();        } else {            return conn;        }    }} | 
Наконец, давайте рассмотрим пример использования интересной реализации, SynchronousQueue .
В этом примере, аналогично нашему первому, мы хотим выполнить Команду, но нам нужно знать, когда она будет выполнена, и ждать не более 2 минут.
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
 | 
private BlockingQueue workQueue = new LinkedBlockingQueue();private Map commandQueueMap = new ConcurrentHashMap(); public SynchronousQueue addCommand(Command command) {    SynchronousQueue queue = new SynchronousQueue();    commandQueueMap.put(command, queue);    workQueue.offer(command);    return queue;}public Object call() throws Exception {    try {        Command command = workQueue.take();        Result result = command.execute();        SynchronousQueue queue = commandQueueMap.get(command);        queue.offer(result);        return null;    } catch (InterruptedException e) {        throw new WorkException(e);    }} | 
Теперь потребитель может безопасно запросить тайм-аут по своему запросу на выполнение своей команды.
| 
 1 
2 
3 
4 
5 
6 
7 
8 
 | 
Command command;SynchronousQueue queue = commandRunner.addCommand(command);Result result = queue.poll(2, TimeUnit.MINUTES);if (result == null) {    throw new CommandTooLongException(command);} else {    return result;} | 
Как вы начинаете видеть, BlockingQueues в java обеспечивает большую гибкость и предоставляет вам относительно простые структуры для удовлетворения многих, если не всех, ваших потребностей в многопоточном приложении. Есть некоторые действительно интересные BlockingQueues, которые мы даже не рассмотрели, такие как PriorityBlockingQueue и DelayQueue . Посмотрите на них и свяжитесь с нами. Мы любим общаться с другими разработчиками.
Ссылка: Java Concurrency Part 5 — Блокировка очередей от наших партнеров JCG в блоге Carfey Software .
- Учебник по параллелизму Java — Семафоры
 - Учебник по параллелизму Java — повторяющиеся блокировки
 - Учебник по параллелизму Java — Пулы потоков
 - Учебник по параллелизму Java — Callable, будущее
 - Учебник по параллелизму Java — CountDownLatch
 - Exchanger и Java без GC
 - Java Fork / Join для параллельного программирования
 - Как избежать ConcurrentModificationException при использовании итератора
 - Советы по повышению производительности приложений Java