Статьи

Java 7: понимание Phaser

Java 7 представляет гибкий механизм синхронизации потоков, называемый Phaser. Если вам нужно дождаться прибытия потоков, прежде чем вы сможете продолжить или запустить другой набор задач, тогда Phaser — хороший выбор. Вот список, все объясняется пошагово.

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Phaser;

public class PhaserExample {

 public static void main(String[] args) throws InterruptedException {

  List<runnable> tasks = new ArrayList<>();

  for (int i = 0; i < 2; i++) {

   Runnable runnable = new Runnable() {
    @Override
    public void run() {
     int a = 0, b = 1;
     for (int i = 0; i < 2000000000; i++) {
      a = a + b;
      b = a - b;
     }
    }
   };

   tasks.add(runnable);

  }

  new PhaserExample().runTasks(tasks);

 }

 void runTasks(List<runnable> tasks) throws InterruptedException {

  final Phaser phaser = new Phaser(1) {
   protected boolean onAdvance(int phase, int registeredParties) {
    return phase >= 1 || registeredParties == 0;
   }
  };

  for (final Runnable task : tasks) {
   phaser.register();
   new Thread() {
    public void run() {
     do {
      phaser.arriveAndAwaitAdvance();
      task.run();
     } while (!phaser.isTerminated());
    }
   }.start();
   Thread.sleep(500);
  }

  phaser.arriveAndDeregister();
 }

}

Этот пример позволяет многое узнать о внутренностях Phaser. Давайте рассмотрим код:

строка 8 : метод main, который создает две задачи Runnable
строка 29 : список задач передается методу runTasks. Метод

runTasks-Method фактически использует Phaser для синхронизации задач так, что каждая задача в список должен прийти к барьеру, прежде чем они будут выполнены параллельно. Список задач выполняется дважды. Первый цикл начинается, когда обе нити достигают барьера (см. Изображение метки 1). Второй цикл начинается, когда обе нити достигают барьера (см. Изображение метки 2).


Обратите внимание: «сторона» — это термин в контексте Phaser, который эквивалентен тому, что мы подразумеваем под потоком.
Когда одна сторона прибывает, тогда один поток достиг барьера синхронизации.

Строка 35 : создать Phaser, у которого есть одна зарегистрированная сторона (это означает, что phaser ожидает, что один поток = сторона прибудет, прежде чем он сможет начать цикл выполнения)
Строка 36 : реализовать метод onAdvance-Method, чтобы объяснить, что этот список задач выполняется дважды (выполнено by: Строка 37 говорит, что она возвращает true, если фаза равна или больше, чем 1)
Строка 41 : перебрать список задач
Строка 42 : зарегистрировать этот поток в Phaser. Обратите внимание, что экземпляр Phaser не знает экземпляры задачи. Это простой счетчик зарегистрированных, неподтвержденных и прибывших сторон, разделенных между участвующими потоками. Если две стороны зарегистрированы, то две стороны должны прибыть в фазер, чтобы иметь возможность начать первый цикл.
Линия 46: сообщить нити, чтобы она ждала у барьера, пока прибывающие стороны не сравнятся с зарегистрированными сторонами
Строка 52 : две задачи зарегистрированы, всего три стороны зарегистрированы.
Строка 54 : отменить регистрацию одной партии. Это приводит к двум зарегистрированным сторонам и двум прибывшим сторонам. Это заставляет потоки, ожидающие (строка 46), выполнить первый цикл. (на самом деле третья сторона прибыла, пока три были зарегистрированы — но это не имеет значения)

Исходный фрагмент кода, хранящийся в моем репозитории Git, создает следующий вывод:

After phaser init -> Registered: 1 - Unarrived: 1 - Arrived: 0 - Phase: 0
After register -> Registered: 2 - Unarrived: 2 - Arrived: 0 - Phase: 0
After arrival -> Registered: 2 - Unarrived: 1 - Arrived: 1 - Phase: 0
After register -> Registered: 3 - Unarrived: 2 - Arrived: 1 - Phase: 0
After arrival -> Registered: 3 - Unarrived: 1 - Arrived: 2 - Phase: 0
Before main thread arrives and deregisters -> Registered: 3 - Unarrived: 1 - Arrived: 2 - Phase: 0
On advance -> Registered: 2 - Unarrived: 0 - Arrived: 2 - Phase: 0
After main thread arrived and deregistered -> Registered: 2 - Unarrived: 2 - Arrived: 0 - Phase: 1
Main thread will terminate ...
Thread-0:go  :Wed Dec 28 16:09:16 CET 2011
Thread-1:go  :Wed Dec 28 16:09:16 CET 2011
Thread-0:done:Wed Dec 28 16:09:20 CET 2011
Thread-1:done:Wed Dec 28 16:09:20 CET 2011
On advance -> Registered: 2 - Unarrived: 0 - Arrived: 2 - Phase: 1
Thread-0:go  :Wed Dec 28 16:09:20 CET 2011
Thread-1:go  :Wed Dec 28 16:09:20 CET 2011
Thread-1:done:Wed Dec 28 16:09:23 CET 2011
Thread-0:done:Wed Dec 28 16:09:23 CET 2011

Строка 1 : когда Phaser инициализируется в строке 35 фрагмента кода, тогда регистрируется одна сторона, и никто не прибыл.
Строка 2 : после того, как первый поток зарегистрирован в строке 42 в примере кода, есть две зарегистрированные стороны и две неподтвержденные стороны. Поскольку ни одна нить еще не достигла барьера, никакая партия не прибыла.
Строка 3 : первый поток прибывает и ожидает на барьере (строка 46 в фрагменте кода)
Строка 4 : регистрирует второй поток, три зарегистрированы, два не получены, один прибыл
Строка 5 : второй поток достиг барьера, следовательно два прибыли сейчас
линия 7: одна сторона отменена в строке кода 54 примера кода, поэтому onAdvance-Method вызывается и возвращает false. Это запускает первый цикл, так как зарегистрированные стороны равны прибывшим сторонам (т.е. двум) Фаза 1 начинается -> один цикл (см знак изображения 1)
Строка 8 : поскольку все нити уведомлены и начать свою работу, две партии unarrived снова, не прибыла
Line 14 : После того, как нити выполнены свои задачи , когда они приходят снова (код строка 46) вызывается метод onAdvance, теперь 2-й цикл выполнен

ОК, пройдите его и посмотрите мои комментарии в исходном фрагменте кода, чтобы узнать больше. 

От http://niklasschlimm.blogspot.com/2011/12/java-7-understanding-phaser.html