Статьи

Производители и потребители. Часть 2. Прерывание рабочих потоков

Этот блог на самом деле не о производителях и потребителях , я рассказал обо всем этом в своем последнем блоге . Если вы еще этого не видели, подведем итоги, чтобы продемонстрировать шаблон Producer Consumer с использованием сценария комментаторов, сообщающих о футбольных (футбольных) играх путем помещения обновлений в очередь. Эти обновления затем считываются в телестудии с помощью телетайпа и отображаются на экране телевизора.

В конце блога я сказал: «Несмотря на то, что код Teletype работает, в нем есть несколько мелких недостатков, которые нельзя отключить и которые не особенно тестируемы». Этот блог посвящен решению этих двух проблем.

Teletype использует простой рабочий поток для чтения сообщений из очереди и их отображения на экране. При создании рабочего потока вполне обычно включать цикл while в метод run () и выполнять работу потока в этом цикле. Если вы создаете бесконечный цикл while (true), тогда у вас возникают дополнительные проблемы с закрытием потока (разрывом бесконечного цикла) и написанием чего-либо, что будет проверять бесконечный цикл. Итак, первое, что нужно сделать, это никогда не использовать бесконечные циклы, как я делал в своем сломанном коде Teletype …

  @Override
  public void run() {

    while (true) {

      try {
        Message message = queue.take();
        printHead.print(message.toString());
      } catch (InterruptedException e) {
        // TODO add some real error handling here
        printHead.print("Teletype error - try switching it off and on.");
      }
    }

  }

Цель здесь состоит в том, чтобы найти способ закрытия телетайпа, а это означает создание способа выхода из бесконечного цикла while (true) и простой способ сделать это, заменяя while (true) на while (run), как продемонстрировано ниже:

public class Teletype implements Runnable {

  private final BlockingQueue<Message> queue;

  private final PrintHead printHead;

  private volatile boolean run = true;

  private Thread thread;

  private volatile int messageCount;

  public Teletype(PrintHead printHead, BlockingQueue<Message> queue) {
    this.queue = queue;
    this.printHead = printHead;
  }

  public void start() {

    thread = new Thread(this, "Studio Teletype");
    thread.start();
    printHead.print("Teletype Online.");
  }

  @Override
  public void run() {

    while (run) {

      try {
        Message message = queue.take();
        printHead.print(message.toString());
        messageCount++;
      } catch (InterruptedException e) {
        printHead.print("Teletype closing down...");
      }
    }
    printHead.print("Teletype Off.");
  }

  public void destroy() {
    run = false;
    thread.interrupt();
  }

  public int getMessageCount() {
    return messageCount;
  }
}

Если вы посмотрите на этот код, то увидите, что в нем есть новая переменная экземпляра run, которая используется в коде while (run). Когда вызывается start () и выполняется рабочий поток, метод run () берет сообщение из очереди и печатает его обычным способом, прежде чем искать другое сообщение в очереди. Следующим большим отличием в коде является добавление нового метода destroy (). Этот метод просто переворачивает переменную run в false и затем прерывает рабочий поток. Этот вызов interrupt () заставляет блокирующий вызов queue.take () генерировать исключение InterruptedException и возвращаться без сообщения. Исключение перехватывается в блоке catch, и выполнение возвращается к while (run). Поскольку переменная run теперь ложна, метод run () завершается.

Следующий шаг — сделать код более тестируемым. Очевидно, что лучший способ добиться этого — сделать его тестируемым, в первую очередь, с помощью Test Driven Development, а не тестируемой ретроспективно; однако, если требуется ретро-монтаж, то мы делаем ретро-монтаж.

Последнее изменение в коде Teletype — это функция messageCount. Каждый раз, когда он выбирает сообщение из очереди, счетчик увеличивается. Это полезная статистика, к которой может обращаться getMessageCount (), и она используется в следующем коде модульного теста.

public class TeletypeTest {

  private BlockingQueue<Message> queue;
  private Teletype instance;
  private PrintHead printhead;

  @Before
  public void setUp() throws Exception {

    printhead = mock(PrintHead.class);
    queue = new LinkedBlockingQueue<Message>();
    instance = new Teletype(printhead, queue);
  }

  @Test
  public void testTeletype_with_two_messages_in_queue() throws InterruptedException {

    int numMessages = initializeQueueWithMessages();

    instance.start();

    synchWithTestInstanceThread(numMessages);

    instance.destroy();

    // assert that we didn't time out.
    assertEquals(numMessages, instance.getMessageCount());
    verify(printhead, times(5)).print(anyString());
  }

  private int initializeQueueWithMessages() {
    List<Message> messages = getTestMessages();
    queue.addAll(messages);
    int numMessages = messages.size();
    return numMessages;
  }

  private List<Message> getTestMessages() {

    List<Message> messages = new ArrayList<Message>();
    Message message = new Message("name", 1L, "String messageText", "String matchTime");
    messages.add(message);
    message = new Message("name", 2L, "String messageText", "String matchTime");
    messages.add(message);

    return messages;
  }

  private void synchWithTestInstanceThread(int numMessages) throws InterruptedException {

    // Synchronize on the number of messages
    // This will wait for 1/2 a second at most and then timeout
    for (int i = 0; (i < 5) && (instance.getMessageCount() < numMessages); i++) {

      Thread.sleep(100);
    }
  }
}

Код модульного теста для Teletype сначала вызывает initializeQueueWithMessages (…), чтобы добавить пару сообщений в очередь перед вызовом метода start () Teletype. Метод start () запускает рабочий поток, метод run () которого будет читать и отображать любые сообщения, найденные в очереди. Тем временем основной поток ожидает, пока рабочий поток завершит выполнение своих задач, вызвав synchWithTestInstanceThread (..). Как только этот метод возвращает Teletype, он отключается с помощью вызова destroy (). Все, что осталось сделать, это проверить, что synchWithTestInstanceThread (..) не прервал тайм-аут с вызовом, чтобы утверждать, что ожидаемое количество сообщений было прочитано из очереди, используя:

    assertEquals(numMessages, instance.getMessageCount());

… и чтобы убедиться, что сообщения отображались на экране с помощью
Mockito’s

    verify(printhead, times(5)).print(anyString())


В этом случае я добавил статистику messageCount исключительно для упрощения тестирования класса Teletype.
В разговоре с различными коллегами я чувство , что некоторые идеи добавления кода к классу для того , чтобы сделать его проверяемым является довольно спорным вопросом. У меня нет проблем с этим, так как одна из главных идей о
Spring и внедрении зависимостейв том, что вы можете написать код, который легко тестируется, и если добавление небольшого дополнительного кода в класс делает его тестируемым, пусть будет так. Я предпочел бы иметь класс, который немного тяжелее, который полностью протестирован и работает, чем суперлегкий класс, который терпит неудачу. Кроме того, если бы это была ситуация в реальном мире, и я где-то запускал Teletype вживую на машине, то я бы хотел включить целую кучу полезной статистики, чтобы я и / или ребята из Ops могли проверить его статус в любое время.

Последний фрагмент кода для рассмотрения — это основной (…) метод, как показано ниже.

 public static void main(String[] args) throws InterruptedException {

    System.out.println("Producer Consumer Demo Code...");
    ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("context2.xml");

    // Wait until all matches are over.
    Thread.sleep(98000);

    ctx.close();
    System.out.println("Games Over");
  }

Это было немного изменено для оригинальной версии в моем последнем блоге. В этом случае после загрузки контекста Spring основной поток ожидает завершения игр. Как только они закончены, контекст закрывается, гарантируя, что любые
методы обратного вызова по умолчанию вызваны. В этом случае важным методом обратного вызова по умолчанию является метод destroy () класса Teletype, который, как показано выше, отключает Teletype.

Наконец, основной целью этого блога было адаптировать класс Teletype, предоставляя ему возможность корректно закрываться при запросе, устанавливая флаг выполнения в false и прерывая рабочий поток. Проблема в том, что завершение рабочего потока зависит от взаимодействия с основным потоком; то, что может быть нежелательным. Существует, однако, еще один способ прекращения действия Телетайпа, известный как
паттерн «Ядовитая пилюля» , но об этом в следующий раз.


Код для этого образца доступен на
GitHub .