Статьи

Производители и потребители — часть 3 таблетки отравы

Пару недель назад я написал вторую часть короткой серии блогов по шаблону Producer Consumer. Этот блог сосредоточено на необходимость закрыть мой рабочий поток TeleType, в исправлении ошибки в исходном коде из части 1 серии.

Идея в том, что рабочий поток Teletype может управляться командой из основного потока приложения. Эта команда сообщает рабочему потоку о завершении работы, что позволяет приложению корректно завершить работу, как показано в приведенном ниже коде:

@Override
  public void run() {

    while (run) {

      try {
        Message message = queue.take();
        printHead.print(message.toString());
        messageCount++;
      } catch (InterruptedException e) {
        printHead.print("Teletype closing down...");
      }
    }
    printHead.print("Teletype Off.");
  }

  public void destroy() {
    run = false;
    thread.interrupt();
  }

В этом примере основной поток вызывает метод destroy (), который устанавливает для переменной run значение false и прерывает блокирующий вызов работника в queue.take ().

Тем не менее, есть проблема с этой идеей в определенных обстоятельствах. Например, может ли внезапное завершение рабочего потока потребителя вызвать проблемы в других частях системы? Будет ли потеря данных, поскольку важные сообщения в очереди не будут обработаны? Если ответ на эти вопросы «да», то есть другой подход, который вы можете использовать: использовать таблетку отравы .

«Ядовитая таблетка» — это довольно мелодраматичное имя для простого помещения определенного известного элемента данных в очередь, и когда потребитель читает этот элемент, он закрывается. Очевидно, таблетка с ядомдолжен быть последним элементом, помещенным в очередь, иначе потребитель преждевременно завершит работу.

Эта идея хороша в простых системах с одним производителем и потребителем, как показано ниже:

… но нужно немного подумать, когда есть несколько производителей с одним потребителем, как в моем сценарии обновлений футбольного матча:

… и может полностью развалиться в случае нескольких производителей и потребителей:

… так как обеспечение того, что каждый потребитель получает отравленную таблетку в нужное время и все данные в очереди обрабатываются, может быть довольно сложным.

В этом блоге я обновляю свой код Teletype, чтобы отключиться, как только два MatcherReporter отправят все свои данные. Первое, что нужно сделать, это выбрать сообщение, которое будет действовать как отравленная таблетка. В приведенном ниже фрагменте кода вы видите, что я вставил сообщение с текстом «END OF FILE» в конце потока обновлений совпадений.

<value>95:30 END OF FILE</value>
  <value>95:00 Final Score  Fulham 0 - 1 Man Utd</value>
  <value>94:59 Full time The referee signals the end of the game.</value> 

Я вставил одно из этих сообщений в каждый набор игровых данных.

Следующее, что нужно сделать, это изменить код Teletype, добавив проверку сообщения о ядовитой таблетке:

public class Teletype implements Runnable {

  private static final String POISON_PILL_MESSAGE = "END OF FILE";

  private final BlockingQueue<Message> queue;

  private final PrintHead printHead;

  private final int matchesPlayed;

  private volatile boolean run = true;

  private int pillsRecieved;

  public Teletype(PrintHead printHead, BlockingQueue<Message> queue, int matchesPlayed) {
    this.queue = queue;
    this.printHead = printHead;
    this.matchesPlayed = matchesPlayed;
  }

  public void start() {

    Thread thread = new Thread(this, "Studio Teletype");
    thread.start();
    printHead.print("Teletype Online.");
  }

  @Override
  public void run() {

    while (run) {

      try {
        Message message = queue.take();
        handleMessage(message);
      } catch (InterruptedException e) {
        printHead.print("Teletype closing down...");
      }
    }
    printHead.print("Teletype Off.");
  }

  private void handleMessage(Message message) {
    if (allGamesAreOver(message.getMessageText())) {
      run = false;
    } else {
      printHead.print(message.toString());
    }
  }

  private boolean allGamesAreOver(String messageText) {

    if (POISON_PILL_MESSAGE.equals(messageText)) {
      pillsRecieved++;
    }

    return pillsRecieved == matchesPlayed ? true : false;
  }

  @VisibleForTesting
  boolean isRunning() {
    return run;
  }
}

Одним из наиболее значительных изменений здесь является добавление переменной экземпляра matchPlayed. Эта переменная сообщает Teletype, сколько MatchReporter предоставляет ему данные. В конечном итоге это нарушает модель «производитель-потребитель», поскольку потребитель теперь знает об остальной части системы; однако это необходимо, потому что мы должны убедиться, что Teletype завершает работу в конце
всех данных. В системе один производитель / потребитель один к одному это не обязательно.

Другое большое изменение в коде Teletype — это цикл run (). После получения сообщения из очереди оно передается новому методу handleMessage (…). Метод handleMessage (…) проверяет, все ли игры, из которых он получает данные, завершен, вызывая allGamesAreOver (…), который проверяет текст сообщения на соответствие строке ядовитой таблетки. Если тест сообщения является строкой ядовитой пилюли, то счетчик pillsRectained обновляется. Если pillsRectained равняется переменной matchPlayed, тогда все все игры завершены, и allGamesAreOver (…) возвращает true. Это устанавливает для переменной экземпляра run значение false, и метод run () рабочего потока завершается.

Вот и все, мелодраматический
узор «
Ядовитая таблетка» в двух словах, в следующий раз, «
Убийство в Красном сарае» .



Код для этого образца доступен на
GitHub .