Асинхронный код сложен. Все это знают. Написание асинхронных тестов еще сложнее. Недавно я исправил нестабильный тест и хочу поделиться некоторыми мыслями о написании асинхронных тестов.
В этом посте мы рассмотрим общую проблему с асинхронными тестами — как заставить тест выполнить определенный порядок между потоками и заставить некоторые операции некоторых потоков завершать их перед другими операциями других потоков.
Обычно мы не хотим навязывать порядок между выполнением разных потоков, потому что он устраняет причину использования потоков, которая заключается в том, чтобы обеспечить параллелизм и позволить ЦП выбирать наилучший порядок выполнения с учетом текущих ресурсов и состояния приложения. Но в случае тестирования иногда требуется детерминированный порядок для обеспечения стабильности теста.
Тестирование дросселя
Удушитель — это программный шаблон, который отвечает за ограничение числа одновременных операций для сохранения некоторой квоты ресурсов, например, пула соединений, сетевого буфера или операции, интенсивно использующей процессор. В отличие от других инструментов синхронизации, роль троттлера заключается в том, чтобы включить «fast-fast», позволяя запросам превышения квоты немедленно завершаться сбоем без ожидания. Быстрый отказ важен, потому что альтернатива, ожидающая, потребляет ресурсы — порты, потоки и память.
Вот простая реализация троттлера (в основном это обертка вокруг семафора ; в реальном мире могут быть ожидание, повторные попытки и т. Д.):
01
02
03
04
05
06
07
08
09
10
11
12
|
class ThrottledException extends RuntimeException( "Throttled!" ) class Throttler(count : Int) { private val semaphore = new Semaphore(count) def apply(f : = > Unit) : Unit = { if (!semaphore.tryAcquire()) throw new ThrottledException try { f } finally { semaphore.release() } } } |
Давайте начнем с базового модульного теста: тестирования дросселя для одного потока (для тестирования мы используем specs2 ). В этом тесте мы проверяем, что мы можем сделать больше вызовов последовательно, чем максимальное количество одновременных вызовов для регулятора (переменная maxCount ниже). Обратите внимание, что поскольку мы используем один поток, мы не тестируем функцию «быстрого-отказа» регулятора, поскольку мы не насыщаем регулятор. Фактически, мы только проверяем, что, пока дроссель не насыщен, он не прерывает операции.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
class ThrottlerTest extends Specification { "Throttler" should { "execute sequential" in new ctx { var invocationCount = 0 for (i <- 0 to maxCount) { throttler { invocationCount + = 1 } } invocationCount must be _== (maxCount + 1 ) } } trait ctx { val maxCount = 3 val throttler = new Throttler(maxCount) } } |
Тестирование дросселя асинхронно
В предыдущем тесте мы не насыщали регулятор просто потому, что это невозможно с одним потоком. Итак, следующий шаг — проверить, хорошо ли работает дроссель в многопоточной среде.
Настройка:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
val e = Executors.newCachedThreadPool() implicit val ec : ExecutionContext = ExecutionContext.fromExecutor(e) private val waitForeverLatch = new CountDownLatch( 1 ) override def after : Any = { waitForeverLatch.countDown() e.shutdownNow() } def waitForever() : Unit = try { waitForeverLatch.await() } catch { case _: InterruptedException = > case ex : Throwable = > throw ex } |
ExecutionContext используется для будущего строительства; метод waitForever удерживает поток до тех пор, пока защелка не будет освобождена — до конца теста. В функции after мы закрываем службу исполнителя.
Упрощенный способ проверки многопоточного поведения регулятора заключается в следующем:
1
2
3
4
5
6
7
8
|
"throw exception once reached the limit [naive,flaky]" in new ctx { for (i <- 1 to maxCount) { Future { throttler(waitForever()) } } throttler {} must throwA[ThrottledException] } |
Здесь мы создаем потоки maxCount (вызовы Future {}), которые вызывают функцию waitForever, которая ожидает до конца теста. Затем мы пытаемся выполнить еще одну операцию, чтобы обойти регулятор — maxCount + 1. По замыслу, на этом этапе мы должны получить исключение ThrottledException. Однако, пока мы ждем исключения, этого может не произойти. Последний вызов регулятора (с ожиданием) может произойти до того, как будет запущен один из фьючерсов (что вызовет исключение в этом будущем, но не в ожидании).
Проблема с вышеупомянутым тестом состоит в том, что мы не гарантируем, что все потоки были запущены и ожидают в функции waitForever, прежде чем мы попытаемся нарушить дроссель с ожидаемым результатом выброса дросселя. Чтобы это исправить, нам нужно как-то подождать, пока все фьючерсы не запустятся. Вот подход, который знаком многим из нас: просто добавьте вызов метода сна с некоторой разумной продолжительностью.
1
2
3
4
5
6
7
8
9
|
"throw exception once reached the limit [naive, bad]" in new ctx { for (i <- 1 to maxCount) { Future { throttler(waitForever()) } } Thread.sleep( 1000 ) throttler {} must throwA[ThrottledException] } |
Хорошо, теперь наш тест почти всегда будет проходить, но этот подход неверен по крайней мере по двум причинам:
Продолжительность теста продлится только столько времени, сколько «разумная продолжительность», которую мы установили.
В очень редких ситуациях, например, когда машина находится под высокой нагрузкой, этой разумной продолжительности будет недостаточно.
Если вы все еще сомневаетесь, поищите в Google другие причины.
Лучший подход — синхронизировать начало наших потоков (фьючерсов) и наших ожиданий. Давайте использовать класс CountDownLatch из java.util.concurrent:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
"throw exception once reached the limit [working]" in new ctx { val barrier = new CountDownLatch(maxCount) for (i <- 1 to maxCount) { Future { throttler { barrier.countDown() waitForever() } } } barrier.await( 5 , TimeUnit.SECONDS) must beTrue throttler {} must throwA[ThrottledException] } |
Мы используем CountDownLatch для барьерной синхронизации . Метод await блокирует основной поток, пока счетчик защелок не станет равным нулю. Когда другие потоки запускаются (давайте обозначим эти другие потоки как фьючерсы), каждый из этих фьючерсов вызывает метод барьера countDown, чтобы уменьшить количество защелок на единицу. Как только количество защелок становится равным нулю, все фьючерсы находятся внутри метода waitForever.
К этому моменту мы гарантируем, что регулятор насыщен, с потоками maxCount внутри. Попытка другого потока войти в регулятор приведет к исключению. У нас есть детерминистический способ настройки нашего теста, который заключается в том, чтобы основной поток входил в троттлер. На этом этапе основной поток может возобновиться и возобновится (счетчик защелок достигает нуля, а CountDownLatch освобождает ожидающий поток).
Мы используем немного более высокий тайм-аут в качестве гарантии, чтобы избежать бесконечной блокировки в случае непредвиденных ситуаций. Если что-то случится, мы не пройдем тест. Этот тайм-аут не повлияет на продолжительность теста, потому что, если не произойдет что-то непредвиденное, мы не должны его ждать.
Вывод
При тестировании асинхронного кода довольно часто требуется определенный порядок операций между потоками для конкретного теста. Отсутствие какой-либо синхронизации приводит к нестабильным тестам, которые иногда работают, а иногда дают сбой. Использование Thread.sleep замедляет работу и снижает вероятность ошибок, но не решает проблему.
В большинстве случаев, когда нам нужно обеспечить порядок между потоками в тесте, мы можем использовать CountDownLatch вместо Thread.sleep. Преимущество CountDownLatch заключается в том, что мы можем сообщить ему, когда следует освободить ожидающий (удерживающий) поток, получив два важных преимущества: детерминированный порядок и, следовательно, более надежные тесты и более быстрое выполнение тестов. Даже для тривиального ожидания — например, функции waitForever — мы могли бы использовать что-то вроде Thread.sleep (Long.MAX_VALUE), но всегда лучше не использовать хрупкие подходы.
- Вы можете найти полный код на GitHub .
Ссылка: | Тестирование асинхронного кода от нашего партнера JCG Дмитрия Команова в блоге Wix IO . |