Внутренне, реализации делают умное использование другой функции параллелизма, представленной в Java 1.5 — Блокирующие очереди.
Очередь
Сначала краткий обзор стандартной очереди. В информатике очередь — это просто коллекция, которая всегда добавляет элементы в конец и всегда берет элементы с самого начала. Выражение «первым пришел-первым вышел» (FIFO) обычно используется для описания стандартной очереди. В Java 1.6 введена Deque или двусторонняя очередь, и этот интерфейс теперь реализован в LinkedList . Некоторые очереди в java допускают альтернативное упорядочение, такое как использование Comparator или даже написание собственной реализации упорядочения. Несмотря на то, что расширенная функциональность хороша, сегодня мы фокусируемся на том, как BlockingQueues действительно сияет в параллельной разработке.
Очередь блокировки
Очереди блокировки — это очереди, которые также предоставляют функциональность для блокировки запросов на извлечение элемента, когда элемент недоступен с дополнительной опцией, ограничивающей количество времени, затрачиваемого на ожидание. В очереди с ограниченным размером такая же функциональность блокировки доступна при попытке добавления. Давайте подробно рассмотрим пример использования BlockingQueue .
Давайте предположим простой сценарий. У вас есть поток обработки, функция которого просто выполнять команды.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; private BlockingQueue<Command> workQueue = new LinkedBlockingQueue<Command>(); public void addCommand(Command command) { workQueue.offer(command); } public Object call() throws Exception { try { Command command = workQueue.take(); command.execute(); } catch (InterruptedException e) { throw new WorkException(e); } } |
Конечно, это очень простой пример, но он показывает основы использования BlockingQueue для нескольких потоков. Давайте попробуем что-нибудь более сложное. В этом примере нам нужно создать пул соединений с лимитом. Он должен создавать соединения только по мере необходимости. Ни один клиент не будет ожидать доступного соединения дольше 5 секунд.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
private BlockingQueue<Connection> pool = new ArrayBlockingQueue<Connection>( 10 ); private AtomicInteger connCount = new AtomicInteger(); public Connection getConnection() { Connection conn = pool.poll( 5 , TimeUnit.SECONDS); if (conn == null ) { synchronized (connCount) { if (connCount.get() < 10 ) { conn = getNewConnection(); pool.offer(conn); connCount.incrementAndGet(); } } if (conn == null ) { throw new ConnUnavailException(); } else { return conn; } } } |
Наконец, давайте рассмотрим пример использования интересной реализации, SynchronousQueue .
В этом примере, аналогично нашему первому, мы хотим выполнить Команду, но нам нужно знать, когда она будет выполнена, и ждать не более 2 минут.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
private BlockingQueue workQueue = new LinkedBlockingQueue(); private Map commandQueueMap = new ConcurrentHashMap(); public SynchronousQueue addCommand(Command command) { SynchronousQueue queue = new SynchronousQueue(); commandQueueMap.put(command, queue); workQueue.offer(command); return queue; } public Object call() throws Exception { try { Command command = workQueue.take(); Result result = command.execute(); SynchronousQueue queue = commandQueueMap.get(command); queue.offer(result); return null ; } catch (InterruptedException e) { throw new WorkException(e); } } |
Теперь потребитель может безопасно запросить тайм-аут по своему запросу на выполнение своей команды.
1
2
3
4
5
6
7
8
|
Command command; SynchronousQueue queue = commandRunner.addCommand(command); Result result = queue.poll( 2 , TimeUnit.MINUTES); if (result == null ) { throw new CommandTooLongException(command); } else { return result; } |
Как вы начинаете видеть, BlockingQueues в java обеспечивает большую гибкость и предоставляет вам относительно простые структуры для удовлетворения многих, если не всех, ваших потребностей в многопоточном приложении. Есть некоторые действительно интересные BlockingQueues, которые мы даже не рассмотрели, такие как PriorityBlockingQueue и DelayQueue . Посмотрите на них и свяжитесь с нами. Мы любим общаться с другими разработчиками.
Ссылка: Java Concurrency Part 5 — Блокировка очередей от наших партнеров JCG в блоге Carfey Software .
- Учебник по параллелизму Java — Семафоры
- Учебник по параллелизму Java — повторяющиеся блокировки
- Учебник по параллелизму Java — Пулы потоков
- Учебник по параллелизму Java — Callable, будущее
- Учебник по параллелизму Java — CountDownLatch
- Exchanger и Java без GC
- Java Fork / Join для параллельного программирования
- Как избежать ConcurrentModificationException при использовании итератора
- Советы по повышению производительности приложений Java