Статьи

Правильно получать уведомления о событиях Java

Реализация шаблона наблюдателя для предоставления уведомлений о событиях в Java кажется простой задачей. Однако есть некоторые подводные камни, с которыми можно легко столкнуться. Здесь приводится объяснение распространенных ошибок, которые я неосторожно совершал сам в разных случаях …

Уведомление о событиях Java

Давайте начнем с простого bean-компонента StateHolder который инкапсулирует состояние приватного поля int с соответствующими средствами доступа:

01
02
03
04
05
06
07
08
09
10
11
12
public class StateHolder {
 
  private int state;
 
  public int getState() {
    return state;
  }
 
  public void setState( int state ) {
    this.state = state;
  }
}

Учтите, что мы решили, что наш компонент должен транслировать новости об state changes зарегистрированным наблюдателям. Совершенно никаких проблем! Удобное определение события и слушателя легко создать …

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
// change event to broadcast
public class StateEvent {
 
  public final int oldState;
  public final int newState;
 
  StateEvent( int oldState, int newState ) {
    this.oldState = oldState;
    this.newState = newState;
  }
}
 
// observer interface
public interface StateListener {
  void stateChanged( StateEvent event );
}

… далее нам нужно иметь возможность зарегистрировать StatListeners в экземплярах StateHolder

01
02
03
04
05
06
07
08
09
10
11
12
13
14
public class StateHolder {
 
  private final Set<StateListener> listeners = new HashSet<>();
 
  [...]
      
  public void addStateListener( StateListener listener ) {
    listeners.add( listener );
  }
 
  public void removeStateListener( StateListener listener ) {
    listeners.remove( listener );
  }
}

… И последнее, но не менее StateHolder#setState необходимо настроить, чтобы вызвать фактическое уведомление об изменениях состояния:

01
02
03
04
05
06
07
08
09
10
11
12
13
public void setState( int state ) {
  int oldState = this.state;
  this.state = state;
  if( oldState != state ) {
    broadcast( new StateEvent( oldState, state ) );
  }
}
 
private void broadcast( StateEvent stateEvent ) {
  for( StateListener listener : listeners ) {
    listener.stateChanged( stateEvent );
  }
}

Бинго! Это все, что есть. Будучи профессионалами, мы могли бы даже реализовать этот тест и чувствовать себя комфортно с нашим полным охватом кода и зеленой полосой. И вообще, разве это не то, что мы узнали из уроков в Интернете?

Итак, вот плохие новости: решение ошибочно …

Параллельная модификация

Принимая во внимание StateHolder выше, можно легко столкнуться с ConcurrentModificationException , даже если он используется только в пределах одного потока. Но кто это вызывает и почему это происходит?

1
2
3
4
5
6
java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
    at java.util.HashMap$KeyIterator.next(HashMap.java:1453)
    at com.codeaffine.events.StateProvider.broadcast(StateProvider.java:60)
    at com.codeaffine.events.StateProvider.setState(StateProvider.java:55)
    at com.codeaffine.events.StateProvider.main(StateProvider.java:122)

Просмотр трассировки стека обнаруживает, что исключение выдается Iterator HashMap мы используем. Только то, что мы не использовали итераторы в нашем коде, или мы? Ну, мы сделали. Для for each конструкции в broadcast основан на Iterable и, следовательно, преобразуется в цикл итератора во время компиляции.

Из-за этого прослушиватель, удаляющий себя из экземпляра StateHolder во время уведомления о событии, может вызвать StateHolder ConcurrentModificationException . Поэтому вместо работы с исходной структурой данных одним из решений может быть перебор снимка слушателей.

Таким образом, удаление слушателя не может больше мешать механизму широковещания (но обратите внимание, что семантика уведомлений также немного меняется, поскольку такое удаление не отражается снимком, пока выполняется broadcast ):

1
2
3
4
5
6
private void broadcast( StateEvent stateEvent ) {
  Set<StateListener> snapshot = new HashSet<>( listeners );
  for( StateListener listener : snapshot ) {
    listener.stateChanged( stateEvent );
  }
}

Но что, если StateHolder предназначен для использования в многопоточном контексте?

синхронизация

Чтобы иметь возможность использовать StateHolder в многопоточной среде, он должен быть потокобезопасным. Это может быть достигнуто довольно легко. Добавление синхронизированных с каждым методом нашего класса должно помочь, не так ли?

1
2
3
4
5
public class StateHolder {
  public synchronized void addStateListener( StateListener listener ) {  [...]
  public synchronized void removeStateListener( StateListener listener ) {  [...]
  public synchronized int getState() {  [...]
  public synchronized void setState( int state ) {  [...]

Теперь доступ для чтения / записи к экземпляру StateHolder его внутренней блокировкой . Это делает публичные методы атомарными и обеспечивает правильную видимость состояния для разных потоков. Миссия выполнена!

Не совсем … хотя реализация является поточно-ориентированной, она несет в себе риск блокировки приложений, которые ее используют.

Подумайте о следующей ситуации: StateHolder A изменяет состояние StateHolder S. Во время уведомления слушателей S Thread B пытается получить доступ к S и блокируется. Если B удерживает блокировку синхронизации на объекте, который должен быть уведомлен одним из слушателей S, мы сталкиваемся с мертвой блокировкой.

Вот почему нам нужно сузить синхронизацию для доступа к состоянию и транслировать событие за пределы защищенных проходов:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class StateHolder {
 
  private final Set<StateListener> listeners = new HashSet<>();
  private int state;
 
  public void addStateListener( StateListener listener ) {
    synchronized( listeners ) {
      listeners.add( listener );
    }
  }
 
  public void removeStateListener( StateListener listener ) {
    synchronized( listeners ) {
      listeners.remove( listener );
    }
  }
 
  public int getState() {
    synchronized( listeners ) {
      return state;
    }
  }
 
  public void setState( int state ) {
    int oldState = this.state;
    synchronized( listeners ) {
      this.state = state;
    }
    if( oldState != state ) {
      broadcast( new StateEvent( oldState, state ) );
    }
  }
 
  private void broadcast( StateEvent stateEvent ) {
    Set<StateListener> snapshot;
    synchronized( listeners ) {
      snapshot = new HashSet<>( listeners );
    }
    for( StateListener listener : snapshot ) {
      listener.stateChanged( stateEvent );
    }
  }
}

В листинге показана реализация, созданная на основе предыдущих фрагментов, обеспечивающая правильную (но несколько устаревшую) синхронизацию с использованием экземпляра Set качестве внутренней блокировки. Уведомление слушателя происходит за пределами охраняемого блока и, следовательно, позволяет избежать циклического ожидания .

Примечание. Из-за параллельного характера системы решение не гарантирует, что уведомления об изменениях доходят до слушателя в том порядке, в котором они произошли. Если требуется больше точности относительно фактического значения состояния на стороне наблюдателя, рассмотрите возможность предоставления StateHolder качестве источника вашего объекта события .

Если порядок событий имеет решающее значение, можно подумать о поточно- ориентированной структуре FIFO для буферизации событий вместе с соответствующим снимком слушателя в защищенном блоке setState . Отдельный поток может инициировать фактические уведомления о событиях из неохраняемого блока, если структура FIFO не пуста ( Producer-Consumer-Pattern ). Это должно обеспечить хронологический порядок, не рискуя мертвым замком. Я должен сказать, так как я никогда не пробовал это решение самостоятельно ..

Учитывая семантику предыдущей реализации, составление нашего класса с использованием потоковобезопасных классов, таких как CopyOnWriteArraySet и AtomicInteger делает решение менее многословным:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class StateHolder {
 
  private final Set<StateListener> listeners = new CopyOnWriteArraySet<>();
  private final AtomicInteger state = new AtomicInteger();
 
  public void addStateListener( StateListener listener ) {
    listeners.add( listener );
  }
 
  public void removeStateListener( StateListener listener ) {
    listeners.remove( listener );
  }
 
  public int getState() {
    return state.get();
  }
 
  public void setState( int state ) {
    int oldState = this.state.getAndSet( state );
    if( oldState != state ) {
      broadcast( new StateEvent( oldState, state ) );
    }
  }
 
  private void broadcast( StateEvent stateEvent ) {
    for( StateListener listener : listeners ) {
      listener.stateChanged( stateEvent );
    }
  }
}

Поскольку CopyOnWriteArraySet и AtomicInteger являются поточно- AtomicInteger , у нас больше нет необходимости в защищенных блоках. Но подожди минутку! Разве мы не научились использовать моментальный снимок для трансляции вместо того, чтобы зацикливаться на скрытом итераторе исходного набора?

Это может немного сбивать с толку, но Iterator предоставляемый CopyOnWriteArraySet , уже является моментальным снимком. Коллекции CopyOnWriteXXX были изобретены специально для таких случаев использования — эффективные, если они небольшого размера, оптимизированные для частой итерации с редко меняющимся содержимым. Что означает, что наш код безопасен.

В Java 8 метод broadcast может быть сокращен еще больше, используя Iterable#forEach в сочетании с лямбдами. Код, конечно, остается безопасным, так как итерация также выполняется на снимке:

1
2
3
private void broadcast( StateEvent stateEvent ) {
  listeners.forEach( listener -> listener.stateChanged( stateEvent ) );
}

Обработка исключений

В последнем разделе этого поста обсуждается, как обращаться со сломанными слушателями, которые RuntimeException неожиданные RuntimeException . Хотя я обычно выбираю строго отказоустойчивый подход, в этом случае может быть неуместным пропускать такие исключения без обработки. В частности, учитывая, что реализация, вероятно, используется в многопоточной среде.

Сломанный слушатель наносит вред системе двумя способами. Во-первых, это предотвращает уведомление тех наблюдателей, которые разбираются после нашего призрака. Во-вторых, это может повредить вызывающий поток, который может быть не готов к решению проблемы. Подводя итог, это может привести к многочисленным, скрытым сбоям, первоначальную причину которых трудно обнаружить.

Следовательно, может быть полезно скрыть каждое уведомление в блоке try-catch:

01
02
03
04
05
06
07
08
09
10
11
private void broadcast( StateEvent stateEvent ) {
  listeners.forEach( listener -> notifySafely( stateEvent, listener ) );
}
 
private void notifySafely( StateEvent stateEvent, StateListener listener ) {
  try {
    listener.stateChanged( stateEvent );
  } catch( RuntimeException unexpected ) {
    // appropriate exception handling goes here...
  }
}

Вывод

Как показано в разделах выше, в уведомлении о событиях Java есть несколько основных моментов, о которых следует помнить. Убедитесь, что перебираете снимок коллекции слушателей во время уведомления о событиях, держите уведомление о событии вне синхронизированных блоков и безопасно уведомляйте слушателей, если это необходимо.

Надеюсь, я смог разобраться в тонкостях понятным способом и не перепутал, в частности, разделы параллелизма. В случае, если вы обнаружите какие-либо ошибки или у вас есть дополнительные возможности для обмена, не стесняйтесь использовать разделы комментариев ниже.

Ссылка: Получение уведомления о событиях Java прямо от нашего партнера JCG Рудигера Херрманна в блоге Code Affine .