Статьи

Координационные темы

Java 5 представила много новых примитивов и коллекций параллелизма, и в этом посте мы рассмотрим два класса, которые можно использовать для координации потоков: CountDownLatch и CyclicBarrier .

CountDownLatch инициализируется со счетчиком. Затем потоки могут либо отсчитывать защелку, либо ждать, пока она достигнет 0. Когда защелка достигнет 0, все ожидающие потоки будут освобождены.

Распространенной идиомой является использование защелки для запуска скоординированного начала или конца между потоками

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownDemo {

public static void main(String[] args) throws Exception {
int threads = 3;
final CountDownLatch startLatch = new CountDownLatch(threads);
final CountDownLatch endLatch = new CountDownLatch(threads);

ExecutorService svc = Executors.newFixedThreadPool(threads);
for (int i = 0; i < threads; i++) {
svc.execute(new Runnable() {
public void run() {
try {
log("At run()");
startLatch.countDown();
startLatch.await();

log("Do work");
Thread.sleep((int) (Math.random() * 1000));

log("Wait for end");
endLatch.countDown();
endLatch.await();

log("Done");
} catch (Exception e) {
e.printStackTrace();
}
}
});
Thread.sleep(100);
}
}

private static void log(String msg) {
System.out.println(System.currentTimeMillis() + ": "
+ Thread.currentThread().getId() + " " + msg);
}
}

В этом коде вы увидите, что две защелки инициализируются. Каждый поток, который запускается, ведет обратный отсчет на защелке и ожидает защелки с обратным отсчетом до 0 (когда все потоки были инициализированы). Точно так же каждый поток ожидает завершения всех потоков одновременно.

Запуск этой программы дает:


1194812267416: 7 при выполнении ()

1194812267517: 8 при выполнении ()

1194812267618: 9 при выполнении ()

1194812267618: 9
выполнить работу
1194812267618: 7

выполнить работу
1194812267619: 8

выполнить работу
1194812267673: 7 дождаться завершения

1194812267688: 8 дождаться завершения

119 9 Ожидание завершения

1194812268023: 9 Готово

1194812268023: 7 Готово

1194812268023: 8 Готово

Вы можете видеть, что каждый поток выполняет run () в разное время, но одновременно проходит через барьер. Каждый из них затем выполняет случайное количество работы и ждет защелки, а затем проходит мимо нее вместе.

В приведенном выше примере каждый поток всегда ждет срабатывания защелки. Вы также можете подождать определенный период времени, прежде чем сдаться. И вы можете проверить защелку, чтобы увидеть, сколько потоков прибыло и теперь ждет. Каждый экземпляр CountDownLatch можно использовать только один раз, а затем он мертв.

Если вы хотите, чтобы набор потоков неоднократно встречался в общей точке, вам лучше использовать CyclicBarrier. Обычно это используется в многопоточном тестировании, где обычно запускают несколько потоков, встречаются, делают что-то, встречаются, проверяют некоторые утверждения, многократно.

Предыдущая программа может быть упрощена путем замены двух защелок одним барьером:

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierDemo {

public static void main(String[] args) throws Exception {
int threads = 3;
final CyclicBarrier barrier = new CyclicBarrier(threads);

ExecutorService svc = Executors.newFixedThreadPool(threads);
for (int i = 0; i < threads; i++) {
svc.execute(new Runnable() {
public void run() {
try {
log("At run()");
barrier.await();

log("Do work");
Thread.sleep((int) (Math.random() * 1000));

log("Wait for end");
barrier.await();

log("Done");
} catch (Exception e) {
e.printStackTrace();
}
}
});
Thread.sleep(100);
}
}

private static void log(String msg) {
System.out.println(System.currentTimeMillis() + ": "
+ Thread.currentThread().getId() + " " + msg);
}
}

Здесь мы видим, что потоки могут многократно ждать у барьера, который неявным образом ведет обратный отсчет до тех пор, пока не достигнут все потоки, а затем освобождает все потоки.

Еще один приятный трюк с CyclicBarrier заключается в том, что действие Runnable может быть связано с барьером, который будет выполняться последним потоком, достигающим барьера. Вы можете очень просто создать таймер начала / окончания для тестирования с этой функциональностью:

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TimerBarrierDemo {

public static void main(String[] args) throws Exception {
int threads = 3;
final CyclicBarrier barrier = new CyclicBarrier(threads, new BarrierTimer());

ExecutorService svc = Executors.newFixedThreadPool(threads);
for (int i = 0; i < threads; i++) {
svc.execute(new Runnable() {
public void run() {
try {
barrier.await();
long sleepTime = (int) (Math.random() * 1000);
System.out.println(Thread.currentThread().getId() + " working for " + sleepTime);
Thread.sleep(sleepTime);
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}

private static class BarrierTimer implements Runnable {
private long start;

public void run() {
if (start == 0) {
start = System.currentTimeMillis();
} else {
long end = System.currentTimeMillis();
long elapsed = (end - start);
System.out.println("Completed in " + elapsed + " ms");
}
}
}
}

Здесь мы полагаемся, что знаем, что барьер будет достигнут ровно дважды — один раз в начале и один раз в конце. При первом достижении мы записываем метку времени, а во втором случае мы распечатываем время. Когда мы создаем наш барьер, мы даем ему экземпляр этого класса таймера. Затем каждая нить ожидает запуска на барьере, работает в течение случайного промежутка времени и ожидает окончания барьера.

Пробег выглядит так:


9 работает на 35

7 работает на 341

8 работает на 371

Завершено за 372 мс

Как правило, следует ожидать, что записанное истекшее время будет максимумом рабочего времени любого из потоков.

CyclicBarrier также имеет несколько дополнительных приемов — потоки могут ждать период времени, а не вечно, проверять, был ли сломан барьер (прерыванием или принудительно с помощью метода reset ()), и определять количество сторон и количество в настоящее время ожидание.