Статьи

Учебник по параллелизму Java – Блокировка очередей

Как обсуждалось в части 3 , пулы потоков, представленные в Java 1.5, обеспечивали поддержку ядра, которая быстро стала любимой для многих разработчиков Java.

Внутренне, реализации делают умное использование другой функции параллелизма, представленной в Java 1.5 — Блокирующие очереди.

Очередь

Сначала краткий обзор стандартной очереди. В информатике очередь — это просто коллекция, которая всегда добавляет элементы в конец и всегда берет элементы с самого начала. Выражение «первым пришел-первым вышел» (FIFO) обычно используется для описания стандартной очереди. В Java 1.6 введена Deque или двусторонняя очередь, и этот интерфейс теперь реализован в LinkedList . Некоторые очереди в java допускают альтернативное упорядочение, такое как использование Comparator или даже написание собственной реализации упорядочения. Несмотря на то, что расширенная функциональность хороша, сегодня мы фокусируемся на том, как BlockingQueues действительно сияет в параллельной разработке.

Очередь блокировки

Очереди блокировки — это очереди, которые также предоставляют функциональность для блокировки запросов на извлечение элемента, когда элемент недоступен с дополнительной опцией, ограничивающей количество времени, затрачиваемого на ожидание. В очереди с ограниченным размером такая же функциональность блокировки доступна при попытке добавления. Давайте подробно рассмотрим пример использования BlockingQueue .

Давайте предположим простой сценарий. У вас есть поток обработки, функция которого просто выполнять команды.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
 
private BlockingQueue<Command> workQueue = new LinkedBlockingQueue<Command>();
 
public void addCommand(Command command) {
    workQueue.offer(command);
}
 
public Object call() throws Exception {
    try {
        Command command = workQueue.take();
        command.execute();
    } catch (InterruptedException e) {
        throw new WorkException(e);
    }
}

Конечно, это очень простой пример, но он показывает основы использования BlockingQueue для нескольких потоков. Давайте попробуем что-нибудь более сложное. В этом примере нам нужно создать пул соединений с лимитом. Он должен создавать соединения только по мере необходимости. Ни один клиент не будет ожидать доступного соединения дольше 5 секунд.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
private BlockingQueue<Connection> pool = new ArrayBlockingQueue<Connection>(10);
private AtomicInteger connCount = new AtomicInteger();
 
public Connection getConnection() {
    Connection conn = pool.poll(5, TimeUnit.SECONDS);
    if (conn == null) {
        synchronized (connCount) {
            if (connCount.get() < 10) {
                conn = getNewConnection();
                pool.offer(conn);
                connCount.incrementAndGet();
            }
        }
        if (conn == null) {
            throw new ConnUnavailException();
        } else {
            return conn;
        }
    }
}

Наконец, давайте рассмотрим пример использования интересной реализации, SynchronousQueue .

В этом примере, аналогично нашему первому, мы хотим выполнить Команду, но нам нужно знать, когда она будет выполнена, и ждать не более 2 минут.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
private BlockingQueue workQueue = new LinkedBlockingQueue();
private Map commandQueueMap = new ConcurrentHashMap();
 
public SynchronousQueue addCommand(Command command) {
    SynchronousQueue queue = new SynchronousQueue();
    commandQueueMap.put(command, queue);
    workQueue.offer(command);
    return queue;
}
 
public Object call() throws Exception {
    try {
        Command command = workQueue.take();
        Result result = command.execute();
        SynchronousQueue queue = commandQueueMap.get(command);
        queue.offer(result);
        return null;
    } catch (InterruptedException e) {
        throw new WorkException(e);
    }
}

Теперь потребитель может безопасно запросить тайм-аут по своему запросу на выполнение своей команды.

1
2
3
4
5
6
7
8
Command command;
SynchronousQueue queue = commandRunner.addCommand(command);
Result result = queue.poll(2, TimeUnit.MINUTES);
if (result == null) {
    throw new CommandTooLongException(command);
} else {
    return result;
}

Как вы начинаете видеть, BlockingQueues в java обеспечивает большую гибкость и предоставляет вам относительно простые структуры для удовлетворения многих, если не всех, ваших потребностей в многопоточном приложении. Есть некоторые действительно интересные BlockingQueues, которые мы даже не рассмотрели, такие как PriorityBlockingQueue и DelayQueue . Посмотрите на них и свяжитесь с нами. Мы любим общаться с другими разработчиками.

Ссылка: Java Concurrency Part 5 — Блокировка очередей от наших партнеров JCG в блоге Carfey Software .

Статьи по Теме :