Статьи

Пакет java.util.concurrent

Эта статья является частью нашего курса Академии под названием Основы параллелизма Java .


В этом курсе вы погрузитесь в магию параллелизма. Вы познакомитесь с основами параллелизма и параллельного кода и узнаете о таких понятиях, как атомарность, синхронизация и безопасность потоков. Проверьте это здесь !

1. Введение

В следующей главе представлен пакет java.util.concurrent . В этом пакете находится множество интересных классов, которые предоставляют необходимую и полезную функциональность, необходимую для реализации многопоточных приложений. После обсуждения того, как использовать интерфейс Executor и его реализацию, в главе рассматриваются атомарные типы данных и параллельные структуры данных. Последний раздел проливает свет на семафоры и обратные отсчеты.

2. java.util.concurrent

После прочтения предыдущих статей о параллелизме и многопоточности у вас может возникнуть ощущение, что не всегда легко написать надежный код, который хорошо работает в многопоточной среде. Есть пословица, которая иллюстрирует это (источник неизвестен):

  • Младшие программисты считают параллелизм трудным.
  • Опытные программисты считают параллелизм легким.
  • Старшие программисты считают параллелизм трудным.

Поэтому надежная библиотека структур данных и классов, которые обеспечивают хорошо протестированную безопасность потоков, очень полезна для всех, кто пишет программы, использующие параллелизм. К счастью, JDK предоставляет набор готовых структур данных и функциональных возможностей именно для этой цели. Все эти классы находятся в пакете java.util.concurrent.

2.1. душеприказчик

Пакет java.util.concurrent определяет набор интерфейсов, реализации которых выполняют задачи. Самым простым из них является интерфейс Executor:

1
2
3
public interface Executor {
    void execute(Runnable command);
}

Следовательно, реализация Executor берет данный экземпляр Runnable и выполняет его. Интерфейс не делает никаких предположений о способе выполнения, Javadoc только заявляет: «Выполняет данную команду в будущем». Поэтому простая реализация может быть:

1
2
3
4
5
6
public class MyExecutor implements Executor {
 
    public void execute(Runnable r) {
        (new Thread(r)).start();
    }
}

Наряду с простым интерфейсом JDK также поставляет полноценную и расширяемую реализацию с именем ThreadPoolExecutor . Под капотом ThreadPoolExecutor поддерживает пул потоков и отправляет экземпляры Runnable помощью метода execute() в пул. Аргументы, передаваемые в конструктор, управляют поведением пула потоков. Конструктор с наибольшим количеством аргументов следующий:

ThreadPoolExecutor (int corePoolSize, int MaximumPoolSize, long keepAliveTime, блок TimeUnit, BlockingQueue <Runnable> workQueue, ThreadFactory threadFactory, обработчик RejectedExecutionHandler)

Давайте рассмотрим различные аргументы шаг за шагом:

  • corePoolSize : ThreadPoolExecutor имеет атрибут corePoolSize который определяет, сколько потоков он будет запускать до тех пор, пока новые потоки не будут запущены, только когда очередь заполнена.
  • maximumPoolSize : Этот атрибут определяет, сколько потоков запущено в максимуме. Вы можете установить это в Integer.MAX_VALUE , чтобы не иметь верхней границы.
  • keepAliveTime : Когда ThreadPoolExecutor создал больше потоков corePoolSize , поток будет удален из пула, если он простаивает в течение заданного промежутка времени.
  • unit : это просто TimeUnit для keepAliveTime .
  • workQueue : в этой очереди хранятся экземпляры Runnable, переданные с помощью метода execute() до тех пор, пока они не будут запущены.
  • threadFactory : реализация этого интерфейса дает вам контроль над созданием потоков, используемых ThreadPoolExecutor .
  • обработчик : когда вы указываете фиксированный размер для workQueue и предоставляете MaximumPoolSize, тогда может случиться, что ThreadPoolExecutor не сможет выполнить ваш экземпляр Runnable из-за насыщения. В этом случае вызывается предоставленный обработчик, который дает вам контроль над тем, что должно произойти в этом случае.

Поскольку есть много параметров для настройки, давайте рассмотрим код, который их использует:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ThreadPoolExecutorExample implements Runnable {
    private static AtomicInteger counter = new AtomicInteger();
    private final int taskId;
 
    public int getTaskId() {
        return taskId;
    }
 
    public ThreadPoolExecutorExample(int taskId) {
        this.taskId = taskId;
    }
     
    public void run() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 
    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(10);
        ThreadFactory threadFactory = new ThreadFactory() {
            public Thread newThread(Runnable r) {
                int currentCount = counter.getAndIncrement();
                System.out.println("Creating new thread: " + currentCount);
                return new Thread(r, "mythread" + currentCount);
            }
        };
        RejectedExecutionHandler rejectedHandler = new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                if (r instanceof ThreadPoolExecutorExample) {
                    ThreadPoolExecutorExample example = (ThreadPoolExecutorExample) r;
                    System.out.println("Rejecting task with id " + example.getTaskId());
                }
            }
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, queue, threadFactory, rejectedHandler);
        for (int i = 0; i < 100; i++) {
            executor.execute(new ThreadPoolExecutorExample(i));
        }
        executor.shutdown();
    }
}

Наша реализация run() засыпает только на 5 секунд, но это не главная задача этого кода. ThreadPoolExecutor начинается с 5 основных потоков и позволяет пулу увеличивать до 10 потоков максимум. В демонстрационных целях мы позволяем неиспользованному потоку простаивать только около 1 секунды. Реализация очереди здесь — это LinkedBlockingQueue емкостью 10 Runnable экземпляров. Мы также реализуем простой ThreadFactory для отслеживания создания потока. То же самое верно для RejectedExecutionHandler .

Цикл в методе main() теперь выдает 100 Runnable экземпляра в пул за короткое время. Выходные данные примера показывают, что мы должны создать 10 потоков (до максимума) для обработки всех ожидающих Runnables :

1
2
3
4
5
6
Creating new thread: 0
...
Creating new thread: 9
Rejecting task with id 20
...
Rejecting task with id 99

Но это также показывает, что все задачи с параметром taskId больше 19 перенаправляются в RejectedExecutionHandler . Это связано с тем, что наша реализация Runnable спит 5 секунд. После того, как первые 10 потоков были запущены, очередь может содержать только еще 10 Runnable экземпляров. Все последующие случаи должны быть отклонены.

Наконец, метод shutdown() позволяет ThreadPoolExecutor отклонять все дальнейшие задачи и ожидает, пока уже выполненные задачи будут выполнены. Вы можете заменить вызов shutdown() на вызов shutdownNow() . Последний пытается прервать все запущенные потоки и завершает работу пула потоков, не дожидаясь завершения всех потоков. В приведенном выше примере вы увидите десять исключений InterruptedException, поскольку наши десять спящих потоков сразу же проснутся.

2.2. ExecutorService

Интерфейс Executor очень прост, он только заставляет базовую реализацию реализовать метод execute() . ExecutorService идет дальше, Executor интерфейс Executor и добавляя ряд служебных методов (например, для добавления полного набора задач), методы для закрытия пула потоков, а также возможность запрашивать реализацию для результата выполнение одной задачи. Мы видели, что интерфейс Runnable определяет только метод run() как void в качестве возвращаемого значения. Следовательно, было необходимо введение нового интерфейса с именем Callable который определяет аналогично Runnable также только один метод, но этот метод возвращает значение:

1
V call();

Но как JDK обрабатывает тот факт, что задача возвращает значение, но передается в пул потоков для выполнения?

Отправитель задачи не может знать заранее, когда задача будет выполнена и как долго длится ее выполнение. Задержка текущего потока в ожидании результата, очевидно, не является решением. Работа по проверке того, доступен ли уже результат с функцией блокировки или ожидания определенного количества времени, реализована в другом классе: java.util.concurrent.Future<V> . Этот класс имеет только несколько методов для проверки выполнения задачи, отмены задачи и получения ее результата.

И последнее, но не менее важное: у нас есть другой интерфейс, который расширяет интерфейс Executor а также интерфейс ExecutorService помощью некоторых методов для планирования задачи на данный момент времени. Имя интерфейса — ScheduledExecutorService и он в основном предоставляет метод schedule() который принимает аргумент, как долго ждать, пока задача будет выполнена:

1
2
schedule(Callable<V> callable, long delay, TimeUnit unit);
schedule(Runnable command, long delay, TimeUnit unit);

Как и для ExecutorService метод schedule() доступен в двух вариантах: один для интерфейса Runnable и один для задач, которые возвращают значение с Callable интерфейса Callable . ScheduledExecutorService также предоставляет метод для периодического выполнения задач:

1
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

Рядом с начальной задержкой мы можем указать период, в который должно выполняться задание.

В последнем примере уже показано, как создать ThreadPoolExecutor . Реализация для ScheduledExecutorService называется ScheduledThreadPoolExecutor и должна обрабатываться очень похоже на ThreadPoolExecutor использованный выше. Но часто полный контроль над всеми функциями ExecutorService не требуется. Представьте себе простой тестовый клиент, который должен вызывать некоторые методы сервера, используя простой ThreadPool .

Поэтому создатели JDK создали простой фабричный класс с именем Executors (пожалуйста, обратите внимание на конечные значения). Этот класс предоставляет несколько статических методов для создания ThreadPoolExecutor к использованию ThreadPoolExecutor . Все это вместе позволяет нам реализовать простой пул потоков, который выполняет кучу задач, которые вычисляют некоторое число (операция демонстрации чисел здесь для демонстрационных целей заменена простым Thread.sleep() ):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
public class ExecutorsExample implements Callable<Integer> {
    private static Random random = new Random(System.currentTimeMillis());
 
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return random.nextInt(100);
    }
 
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Future<Integer>[] futures = new Future[5];
        for (int i = 0; i < futures.length; i++) {
            futures[i] = executorService.submit(new ExecutorsExample());
        }
        for (int i = 0; i < futures.length; i++) {
            Integer retVal = futures[i].get();
            System.out.println(retVal);
        }
        executorService.shutdown();
    }
}

Создание ExecutorService является однострочным. Для выполнения некоторых задач нам просто необходим цикл for, который создает несколько новых экземпляров ExecutorsExample и сохраняет возвращенное Future в массиве. После того, как мы отправили задачи в сервис, мы просто ждем результата. Метод get() of Future блокируется, то есть текущие потоки спят, пока не станет доступен результат. Переопределенная версия этого метода требует спецификации тайм-аута, чтобы позволить ожидающему потоку продолжаться, если задача не завершается в течение определенного периода времени.

2,3. Параллельные коллекции

Инфраструктура коллекций Java охватывает широкий спектр структур данных, которые каждый Java-программист использует в своей повседневной работе. Эта коллекция расширяется структурами данных в пакете java.util.concurrent . Эти реализации предоставили поточно-ориентированные коллекции для использования в многопоточной среде.

Многие Java-программисты даже время от времени используют поточно-ориентированные структуры данных, даже не подозревая об этом. «Старые» классы Hashtable и Vector являются примерами для таких классов. Будучи частью JDK начиная с версии 1.0, эти базовые структуры данных были разработаны с учетом безопасности потоков. Хотя безопасность потоков здесь означает только то, что все методы синхронизируются на уровне экземпляра. Следующий код взят из реализации Oracle JDK:

1
2
3
4
5
6
7
public synchronized void clear() {
    Entry tab[] = table;
    modCount++;
    for (int index = tab.length; --index >= 0; )
        tab[index] = null;
    count = 0;
}

Это принципиальное отличие от «новых» классов коллекций, таких как HashMap или ArrayList (оба доступны начиная с JDK 1.2), которые сами по себе не являются поточно-ориентированными. Но есть удобный способ получить потокобезопасный экземпляр такого «более нового» класса коллекции:

1
2
HashMap<Long,String> map = new HashMap<Long, String>();
Map<Long, String> synchronizedMap = Collections.synchronizedMap(map);

Как мы видим из приведенного выше кода, класс Collections позволяет нам создавать во время выполнения синхронизированную версию ранее несинхронизированного класса коллекций.

Как мы узнали ранее, добавление ключевого слова, синхронизированного с методом, приводит к тому, что в каждый момент времени только один поток выполняет метод исследуемого объекта. Это, конечно, самый простой способ сделать простой класс коллекции потокобезопасным. Более продвинутые методы включают в себя специальные алгоритмы, разработанные для одновременного доступа. Эти алгоритмы реализованы в классах коллекции пакета java.util.concurrent .

Примером такого класса является ConcurrentHashMap :

1
2
3
ConcurrentHashMap<Long,String> map = new ConcurrentHashMap<Long,String>();
map.put(key, value);
String value2 = map.get(key);

Приведенный выше код выглядит почти так же, как и для обычного HashMap , но базовая реализация совершенно иная. Вместо использования только одной блокировки для всей таблицы ConcurrentHashMap подразделяет всю таблицу на множество небольших разделов. Каждый раздел имеет свою собственную блокировку. Следовательно, операции записи в эту карту из разных потоков, при условии, что они пишут в разных разделах таблицы, не конкурируют и могут использовать свою собственную блокировку.

Реализация также вводит идею фиксации операций записи для сокращения времени ожидания операций чтения. Это немного меняет семантику операции чтения, так как возвращает результат последней операции записи, которая завершилась. Это означает, что число записей может не совпадать непосредственно до и после выполнения метода чтения, как это было бы при использовании синхронизированного метода, но для параллельных приложений это не всегда важно. То же самое верно для реализации итератора ConcurrentHashMap .

Чтобы лучше понять разницу в производительности Hashtable , синхронизированных HashMap и ConcurrentHashMap , давайте реализуем простой тест производительности. Следующий код запускает несколько потоков и позволяет каждому потоку получать значение из карты в произвольной позиции, а затем обновляет значение в другой случайной позиции:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class MapComparison implements Runnable {
    private static Map<Integer, String> map;
    private Random random = new Random(System.currentTimeMillis());
 
    public static void main(String[] args) throws InterruptedException {
        runPerfTest(new Hashtable<Integer, String>());
        runPerfTest(Collections.synchronizedMap(new HashMap<Integer,String>()));
        runPerfTest(new ConcurrentHashMap<Integer, String>());
        runPerfTest(new ConcurrentSkipListMap<Integer, String>());
    }
 
    private static void runPerfTest(Map<Integer, String> map) throws InterruptedException {
        MapComparison.map = map;
        fillMap(map);
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        long startMillis = System.currentTimeMillis();
        for (int i = 0; i < 10; i++) {
            executorService.execute(new MapComparison());
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println(map.getClass().getSimpleName() + " took " + (System.currentTimeMillis() - startMillis) + " ms");
    }
 
    private static void fillMap(Map<Integer, String> map) {
        for (int i = 0; i < 100; i++) {
            map.put(i, String.valueOf(i));
        }
    }
 
    public void run() {
        for (int i = 0; i < 100000; i++) {
            int randomInt = random.nextInt(100);
            map.get(randomInt);
            randomInt = random.nextInt(100);
            map.put(randomInt, String.valueOf(randomInt));
        }
    }
}

Вывод этой программы следующий:

1
2
3
4
Hashtable took 436 ms
SynchronizedMap took 433 ms
ConcurrentHashMap took 75 ms
ConcurrentSkipListMap took 89 ms

Как мы и ожидали, реализации Hashtable и синхронизированного HashMap сильно отстают от параллельных пакетов. В этом примере также представлена ​​реализация списка пропуска в HashMap, где связанные элементы в одном сегменте образуют список пропуска, что означает, что список отсортирован и существуют разные уровни связывания элементов в списке. Указатель наивысшего уровня указывает непосредственно на какой-либо элемент в середине списка. Если этот элемент уже больше текущего элемента, итератор должен перейти на следующий более низкий уровень связи, чтобы пропустить меньше элементов, чем на самом высоком уровне. Подробное описание списков пропусков можно найти здесь . Интересным моментом в списке пропусков является то, что весь доступ для чтения занимает около log (n) времени, даже если все элементы хранятся в одном сегменте.

2,4. Атомные переменные

Когда несколько потоков совместно используют одну переменную, у нас есть задача синхронизировать доступ к этой переменной. Причиной этого является тот факт, что даже простая инструкция, такая как i ++, не является атомарной. В основном он состоит из следующих инструкций байт-кода:

1
2
3
iload_1      
iinc 1, 1
istore_1

Не зная слишком много о байт-коде Java, мы видим, что текущее значение локальной переменной 1 помещается в стек операнда, что оно увеличивается на постоянное значение 1, а затем извлекается из стека и сохраняется в локальной переменной номер один. , Это означает, что нам нужно три атомарных операции, чтобы увеличить локальную переменную на единицу. В многопоточной среде это также означает, что планировщик может остановить выполнение текущего потока между каждой из этих инструкций и запустить новый поток, который в свою очередь может работать с той же переменной.

Чтобы справиться с подобными ситуациями, вы, конечно, можете синхронизировать доступ к этой конкретной переменной:

1
2
3
synchronized(i) {
    i++;
}

Но это также означает, что текущий поток должен получить блокировку i, которая нуждается во внутренней синхронизации и вычислениях в JVM. Этот подход также называется пессимистической блокировкой, поскольку мы предполагаем, что весьма вероятно, что другой поток в настоящее время удерживает блокировку, которую мы хотим получить. Другой подход, называемый оптимистической блокировкой, предполагает, что за ресурс конкурирует не так много потоков, и поэтому мы просто пытаемся обновить значение и посмотреть, сработало ли это. Одной из реализаций этого подхода является метод сравнения и обмена (CAS). Эта операция реализована на многих современных процессорах как атомарная операция. Он сравнивает содержимое данной области памяти с заданным значением («ожидаемое значение») и обновляет его до нового значения, если текущее значение равно ожидаемому значению. В псевдокоде это выглядит так:

1
2
3
4
int currentValue = getValueAtMemoryPosition(pos);
if(currentValue == expectedValue) {
    setValueAtMemoryPosition(pos, newValue);
}

Операция CAS реализует приведенный выше код как одну атомарную операцию. Следовательно, его можно использовать, чтобы увидеть, имеет ли значение какой-либо переменной значение, сохраняемое текущим потоком, и обновить его до приращенного значения в этом случае. Поскольку использование операции CAS требует аппаратной поддержки, JDK предоставляет специальные классы для поддержки этих операций. Все они находятся в пакете java.util.concurrent.atomic.

Одним из представителей этих классов является java.util.concurrent.atomic.AtomicInteger . Операция CAS, описанная выше, реализуется методом

1
boolean compareAndSet(int expect, int update)

Возвращаемое логическое значение указывает, была ли операция обновления успешной или нет. На основе этой функциональности может быть реализована дополнительная операция, такая как операция атомарного приращения (здесь взято из реализации JDK Oracle):

1
2
3
4
5
6
7
8
public final int getAndIncrement() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return current;
    }
}

Теперь мы можем увеличивать целочисленную переменную различными потоками без использования пессимистических блокировок:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
public class AtomicIntegerExample implements Runnable {
    private static final AtomicInteger atomicInteger = new AtomicInteger();
 
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            executorService.execute(new AtomicIntegerExample());
        }
        executorService.shutdown();
    }
 
    public void run() {
        for (int i = 0; i < 10; i++) {
            int newValue = atomicInteger.getAndIncrement();
            if (newValue == 42) {
                System.out.println("[" + Thread.currentThread().getName() + "]: " + newValue);
            }
        }
    }
}

Приведенный выше код запускает пять потоков и позволяет каждому из них увеличивать переменную AtomicInteger . Счастливая нить, которая получает ответ 42, печатает это на консоли. При выполнении этого примера кода в повторении вывод будет создан только одним потоком.

Помимо AtomicInteger JDK также предлагает классы для атомарных операций над длинными значениями, целочисленными и длинными массивами и ссылками.

2.5. семафор

Семафоры используются для управления доступом к общему ресурсу. В отличие от простых синхронизированных блоков, семафор имеет внутренний счетчик, который увеличивается каждый раз, когда поток получает блокировку, и уменьшается каждый раз, когда поток снимает блокировку, полученную ранее. Операции увеличения и уменьшения, конечно, синхронизированы, следовательно, семафор может использоваться для управления количеством потоков, проходящих одновременно через критическую секцию. Две основные операции потока:

1
2
void acquire();
void release();

Конструктор принимает рядом с количеством одновременно блокируемых параметр справедливости. Параметр fairness определяет, установлены ли новые потоки, которые пытаются получить блокировку, в начале или в конце списка ожидающих потоков. Помещение нового потока в конец потоков гарантирует, что все потоки получат блокировку через некоторое время и, следовательно, ни один поток не будет голодать.

1
Semaphore(int permits, boolean fair)

Чтобы проиллюстрировать описанное поведение, давайте настроим простой пул потоков с пятью потоками, но с помощью семафора будем контролировать, что в каждый момент времени работает не более трех из них:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class SemaphoreExample implements Runnable {
    private static final Semaphore semaphore = new Semaphore(3, true);
    private static final AtomicInteger counter = new AtomicInteger();
    private static final long endMillis = System.currentTimeMillis() + 10000;
 
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            executorService.execute(new SemaphoreExample());
        }
        executorService.shutdown();
    }
 
    public void run() {
        while(System.currentTimeMillis() < endMillis) {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                System.out.println("["+Thread.currentThread().getName()+"] Interrupted in acquire().");
            }
            int counterValue = counter.incrementAndGet();
            System.out.println("["+Thread.currentThread().getName()+"] semaphore acquired: "+counterValue);
            if(counterValue > 3) {
                throw new IllegalStateException("More than three threads acquired the lock.");
            }
            counter.decrementAndGet();
            semaphore.release();
        }
    }
}

Семафор создается путем передачи 3 в качестве числа одновременных разрешений. При попытке получить блокировку заблокированный поток может столкнуться с InterruptedException который должен быть перехвачен. В качестве альтернативы можно также вызвать служебный метод acquireUninterruptibly() чтобы обойти конструкцию try-catch.

Чтобы убедиться, что в критической секции у нас не более трех параллельных потоков, мы используем AtomicInteger который увеличивается каждый раз, когда процесс входит в секцию, и уменьшается до того, как покидает секцию. Когда значение счетчика больше четырех, генерируется IllegalStateException . Наконец, мы release() семафор и позволяем другому ожидающему потоку войти в критическую секцию.

2.6. CountDownLatch

Класс CountDownLatch — это еще один полезный класс для синхронизации потоков из JDK. Подобно классу Semaphore он предоставляет счетчик, но счетчик CountDownLatch можно уменьшить только до достижения нуля. Как только счетчик достигнет нуля, все потоки, ожидающие на CountDownLatch могут продолжить работу. Такая функциональность часто необходима, когда все потоки пула должны синхронизироваться в какой-то момент, чтобы продолжить. Простым примером может служить приложение, которое должно собирать данные из разных источников, прежде чем сможет сохранить новый набор данных в базе данных.

Следующий код демонстрирует, как пять потоков спят в течение случайного количества времени. Каждая нить, которая просыпается, ведет отсчет защелки, а затем ожидает защелки, чтобы стать нулевой. Наконец все потоки выводят, что они закончили.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class CountDownLatchExample implements Runnable {
    private static final int NUMBER_OF_THREADS = 5;
    private static final CountDownLatch latch = new CountDownLatch(NUMBER_OF_THREADS);
    private static Random random = new Random(System.currentTimeMillis());
 
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
        for (int i = 0; i < NUMBER_OF_THREADS; i++) {
            executorService.execute(new CountDownLatchExample());
        }
        executorService.shutdown();
    }
 
    public void run() {
        try {
            int randomSleepTime = random.nextInt(20000);
            System.out.println("[" + Thread.currentThread().getName() + "] Sleeping for " + randomSleepTime);
            Thread.sleep(randomSleepTime);
            latch.countDown();
            System.out.println("[" + Thread.currentThread().getName() + "] Waiting for latch.");
            latch.await();
            System.out.println("[" + Thread.currentThread().getName() + "] Finished.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

При запуске этого примера вы увидите, что выводится «В ожидании защелки». приходит в разные моменты времени, но это «Закончено». сообщение каждой темы печатается сразу одно за другим.

2,7. CyclicBarrier

В отличие от CountDownLatch , класс CyclicBarrier реализует счетчик, который может быть сброшен после CountDownLatch CyclicBarrier до нуля. Все потоки должны вызывать свой метод await() пока внутренний счетчик не будет установлен на ноль. Затем ожидающие потоки просыпаются и могут продолжаться. Внутренне счетчик затем сбрасывается до исходного значения, и вся процедура может начаться снова:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class CyclicBarrierExample implements Runnable {
    private static final int NUMBER_OF_THREADS = 5;
    private static AtomicInteger counter = new AtomicInteger();
    private static Random random = new Random(System.currentTimeMillis());
    private static final CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
        public void run() {
            counter.incrementAndGet();
        }
    });
 
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
        for (int i = 0; i < NUMBER_OF_THREADS; i++) {
            executorService.execute(new CyclicBarrierExample());
        }
        executorService.shutdown();
    }
 
    public void run() {
        try {
            while(counter.get() < 3) {
                int randomSleepTime = random.nextInt(10000);
                System.out.println("[" + Thread.currentThread().getName() + "] Sleeping for " + randomSleepTime);
                Thread.sleep(randomSleepTime);
                System.out.println("[" + Thread.currentThread().getName() + "] Waiting for barrier.");
                barrier.await();
                System.out.println("[" + Thread.currentThread().getName() + "] Finished.");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Приведенный выше пример очень похож на CountDownLatch , но в отличие от предыдущего примера я добавил цикл while в метод run() . Эти реализации run() позволяют каждому потоку продолжать процедуру sleep и await() , пока счетчик не станет равным трем. Также обратите внимание на анонимную реализацию Runnable (), предоставленную конструктору CyclicBarrier . Его метод run() выполняется каждый раз при срабатывании барьера. Здесь мы увеличиваем счетчик, который проверяется параллельными потоками.

3. Загрузите исходный код

Вы можете скачать исходный код этого урока: concurrency-4.zip