Статьи

Повторная синхронизация многопоточных интеграционных тестов

Недавно я наткнулся на статью Синхронизация многопоточных интеграционных тестов в блоге Капитана Дебуга . В этом посте подчеркивается проблема разработки интеграционных тестов с участием тестируемого класса в асинхронном режиме. Был выдуман этот надуманный пример (я снял несколько комментариев):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ThreadWrapper {
  
    public void doWork() {
  
        Thread thread = new Thread() {
            @Override
            public void run() {
  
                System.out.println("Start of the thread");
                addDataToDB();
                System.out.println("End of the thread method");
            }
  
            private void addDataToDB() {
                // Dummy Code...
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
  
        thread.start();
        System.out.println("Off and running...");
    }
  
}

Это всего лишь пример общего шаблона, в котором бизнес-логика делегируется некоторому асинхронному пулу заданий, который мы не можем контролировать. Роджер Хьюз (автор) перечисляет несколько методик тестирования такого кода, в том числе:

  • произвольный («достаточно длинный») sleep() в методе test, чтобы убедиться, что фоновая логика завершается
  • рефакторинг doWork() так что он принимает CountDownLatch и соглашается уведомить его, когда работа завершена
  • сделав метод выше, пакет private и только @VisibleForTesting
  • «The» решение — рефакторинг doWork() так что он принимает произвольный Runnable . В тесте мы можем обернуть этот Runnable (шаблон декоратора) и дождаться завершения внутреннего Runnable

Последнее решение неплохое, но оно значительно меняет обязанности ThreadWrapper . Теперь вызывающий ThreadWrapper должен решить, какой тип задания должен выполняться асинхронно, тогда как ранее ThreadWrapper инкапсулировал бизнес-логику. Я не говорю, что это плохой дизайн, но он сильно отличается от оригинального метода.

Awaitility

Можем ли мы написать тест без такого масштабного рефакторинга? Первое решение включает в себя удобную библиотеку под названием Awaitility . Эта библиотека не является «серебряной пулей», она просто периодически оценивает данное условие и следит за его выполнением в течение заданного времени. Это тот код, который вы, вероятно, написали один или два раза — завернутый в библиотеку с хорошо разработанным API. Итак, вот наш первоначальный подход:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
import static com.jayway.awaitility.Awaitility.await;
import static java.util.concurrent.TimeUnit.SECONDS;
  
//...
  
await().atMost(10, SECONDS).until(recordInserted());
  
//...
  
private Callable<Boolean> recordInserted() {
    return new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
            return dataExists();
        }
    };
}

Я думаю, что здесь нечего объяснять. dataExists() — это просто boolean метод, который изначально возвращает false но в конечном итоге возвращает true когда фоновая задача ( addDataToDB() ) выполнена. Другими словами, мы предполагаем, что фоновая задача имеет некоторый побочный эффект, и dataExists() может обнаружить этот побочный эффект. Кстати, у меня получилось установить JDK 8 с поддержкой Lambda, и IntelliJ IDEA дает мне эту замечательную подсказку:



Внезапно мне предложили эту альтернативу, совместимую с Java 8:

1
2
3
private Callable<Boolean> recordInserted() {
    return () -> dataExists();
}

Но есть еще:

Что превращает мой код в:

1
2
3
private Callable<Boolean> recordInserted() {
    return this::dataExists;
}

префикс this:: означает, что recordInsterted является методом текущего объекта. Точно так же мы можем сказать someDao::dataExists . Проще говоря, этот синтаксис превращает метод в функциональный объект, который мы можем передать (этот процесс называется расширением eta в Scala ). Теперь recordInsterted() больше не нужен, поэтому я могу встроить его и полностью удалить:

1
await().atMost(10, SECONDS).until(this::dataExists);

Я не уверен, что мне нравится больше — новый лямбда-синтаксис или то, как IntelliJ IDEA берет код, предшествующий Java 8, и автоматически его модифицирует (ну, это все еще немного экспериментально, только что сообщил IDEA-106670 ). Я могу выполнить это намерение в рамках всего проекта IntelliJ, используя лямбда-поддержку всей моей кодовой базы за считанные секунды. Милая!

Но вернемся к исходной проблеме. Awaitility очень помогает, предоставляя достойный API и некоторые удобные функции. Я широко использую его в сочетании с FluentLenium . Но периодически опрос на предмет изменения состояния выглядит как обходной путь и все еще вводит минимальную задержку. Но обратите внимание, что выполнение и синхронизация в асинхронных задачах довольно распространены, и JDK уже предоставляет необходимые средства: Future абстракция !

java.util.concurrent.Future

Чтобы ограничить область рефакторинга, я пока оставлю оригинальный new Thread() подход new Thread() и буду использовать SettableFuture<V> из Guava . Это реализация Future<V> которая позволяет запускать завершение или сбой в любое время из любого потока (см. DeferredResult — асинхронная обработка в Spring MVC для более расширенного использования). Как видите, изменения довольно малы:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ThreadWrapper {
  
    public ListenableFuture<Void> doWork() {
        final SettableFuture<Void> future = SettableFuture.<Void>create();
  
        Thread thread = new Thread() {
  
            @Override
            public void run() {
                addDataToDB()
                //...
  
                //last instruction
                future.set(null);
            }
  
            private void addDataToDB() {
                // Dummy Code...
                // ...
  
            }
  
        };
  
        thread.start();
        return future;
    }
  
}

doWork() теперь возвращает ListenableFuture<Void> с жизненным циклом, контролируемым внутри асинхронной задачи. Мы используем Void но в действительности вы можете вместо этого вернуть какой-то асинхронный результат. future.set(null) в конце имеет решающее значение. Он сигнализирует, что будущее выполнено, и все потоки, ожидающие этого будущего, будут уведомлены. Еще раз, на практике вы должны использовать, например, Future<Integer> а затем вместо null мы будем говорить future.set(someInteger) . Здесь null это просто заполнитель для типа Void . Как это поможет нам? Тестовый код теперь может зависеть от будущего завершения:

1
2
final ListenableFuture<Void> future = wrapper.doWork();
future.get(10, SECONDS);

future.get() блокируется до тех пор, пока не будет future.get() будущее (с тайм-аутом), т.е. до тех пор, пока мы не вызовем future.set(...) . Кстати, я использую ListenableFuture из Guava, но Java 8 представляет эквивалентный и стандартный CompletableFuture — я скоро напишу об этом.

Итак, мы куда-то добираемся. Future<T> — полезная абстракция для ожидания и оповещения о завершении фоновых заданий. Но есть и одно огромное преимущество Future которое, к сожалению , не пользуется преимуществами — обработка и распространение исключений. Future.get() будет блокироваться до завершения будущего и возвращать асинхронный результат или генерировать исключение, изначально выброшенное из нашей работы. Это действительно полезно для асинхронных тестов. В настоящее время, если Thread.run() генерирует исключение, оно может регистрироваться или не отображаться для нас, и будущее никогда не будет завершено. С Awaitility это немного лучше — он истекает без какой-либо веской причины, которую нужно отследить вручную в консоли / журналах. Но с незначительными изменениями наш тест гораздо более подробный:

1
2
3
4
5
6
7
8
9
public void run() {
    try {
        addDataToDB()
        //...
        future.set(null);
    } catch (Exception e) {
        future.setException(e);
    }
}

Если в асинхронном задании возникает какое-то исключение, оно всплывает и отображается как причина сбоя JUnit / TestNG.

(Listening) ExecutorService

Вот и все. Если addDataToDB() генерирует исключение, оно не будет потеряно. Вместо этого наш future.get() в тесте перезапустит это исключение для нас. Наш тест не будет просто тайм-аут, оставляя нас без понятия, что пошло не так. Отлично, но нужно ли нам создавать этот специальный экземпляр SettableFuture<T> , не можем ли мы просто использовать существующие библиотеки, которые уже дают нам Future<T> с правильной базовой реализацией? Конечно! По этому требуется дальнейший рефакторинг:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
  
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
  
public class ThreadWrapper {
  
    private final ListeningExecutorService executorService =
        MoreExecutors.listeningDecorator(
            Executors.newSingleThreadExecutor()
        );
  
    public ListenableFuture<?> doWork() {
        Runnable job = new Runnable() {
            @Override
            public void run() {
                //...
            }
        };
        return executorService.submit(job);
    }
  
}

Это то, чего вы все ждали. Не запускайте новую Thread все время, используйте пул потоков! На самом деле я пошел еще дальше, используя ListeningExecutorService — расширение ExecutorService которое возвращает экземпляры ListenableFuture ( посмотрите, почему вы этого хотите ). Но решение не требует этого, я просто распространяю хорошие практики. Как видите, экземпляр Future теперь создан и управляется для нас. Тест точно такой же, но производственный код чище и надежнее.

MoreExecutors.sameThreadExecutor ()

Последний трюк, который я хочу показать вам, включает внедрение зависимостей. Сначала давайте ThreadWrapper создание пула потоков из класса ThreadWrapper :

01
02
03
04
05
06
07
08
09
10
private final ListeningExecutorService executorService;
  
public ThreadWrapper() {
    this(Executors.newSingleThreadExecutor());
}
  
public ThreadWrapper(ExecutorService executorService) {
    this.executorService =
        MoreExecutors.listeningDecorator(executorService);
}

Теперь мы можем дополнительно предоставить пользовательский ExecutorService . Это хорошо по ряду других причин, но для нас это открывает новую возможность тестирования: MoreExecutors.sameThreadExecutor() . На этот раз мы немного изменим наш тест:

1
2
final ThreadWrapper wrapper = new ThreadWrapper(MoreExecutors.sameThreadExecutor());
wrapper.doWork().get();

Посмотрите, как мы передаем пользовательские ExecutorService ? Это очень специальная реализация, которая не поддерживает пул потоков любого вида. Каждый раз, когда вы submit() некоторую задачу в этот «пул», она будет выполняться в том же потоке блокирующим образом. Это означает, что у нас больше нет асинхронного теста, хотя производственный код не сильно изменился! wrapper.doWork() будет блокироваться до тех пор, пока не завершится «фоновая» работа. Дополнительный вызов get() все еще необходим, чтобы убедиться, что исключения распространяются, но он гарантированно никогда не блокируется (потому что работа уже выполнена).

Использование одного и того же потока для выполнения асинхронной задачи вместо пула потоков может привести к неожиданным результатам, если вы каким-то образом зависите от свойств, основанных на потоках, например, транзакции, безопасность, ThreadLocal . Однако если вы используете стандартный ThreadPoolExecutor с CallerRunsPolicy , JDK уже ведет себя таким образом, если пул потоков переполнен. Так что это не так уж необычно.

Резюме

Тестировать асинхронный код сложно, но у вас есть варианты. Несколько вариантов. Но один вывод, который меня поражает, — это побочный эффект наших усилий. Мы реорганизовали оригинальный код, чтобы сделать его тестируемым. Но окончательный производственный код не только тестируемый, но и гораздо лучше структурированный и надежный. Удивительно, но даже исходный код совместим с предыдущей версией, поскольку мы едва изменили тип возвращаемого значения с void на Future<Void> .

Кажется, это правило — тестируемый код часто лучше разрабатывается и реализуется. Модульный тест — это первый клиентский код, использующий нашу библиотеку. Это естественно заставляет нас больше думать о потребителях, а не о реализации.