Статьи

Тестирование параллельных приложений

Эта статья является частью нашего курса Академии под названием Основы параллелизма Java .

В этом курсе вы погрузитесь в магию параллелизма. Вы познакомитесь с основами параллелизма и параллельного кода и узнаете о таких понятиях, как атомарность, синхронизация и безопасность потоков. Проверьте это здесь !

1. Введение

Эта статья представляет собой введение в тестирование многопоточных приложений. Мы внедряем простую очередь блокировки и тестируем ее поведение блокировки, а также ее поведение и производительность в условиях стресс-теста. Наконец, мы пролили свет на доступные фреймворки для модульного тестирования многопоточных классов.

2. SimpleBlockingQueue

В этом разделе мы собираемся реализовать очень простую и простую очередь блокировки. Эта очередь не должна делать ничего, кроме хранения элементов, которые мы поместили в нее, и возврата их при вызове get() . Функция get() должна блокироваться до появления нового элемента.

Ясно, что пакет java.util.concurrent уже предоставляет такую ​​функциональность и что нет необходимости реализовывать это снова, но в демонстрационных целях мы делаем это здесь, чтобы показать, как тестировать такой класс.

В качестве вспомогательной структуры данных для нашей очереди мы выбираем стандартный LinkedList из пакета java.util . Этот список не синхронизирован и вызов его метода get() не блокируется. Следовательно, мы должны синхронизировать доступ к списку, и мы должны добавить функциональность блокировки. Последнее может быть реализовано с помощью простого цикла while() который вызывает метод wait() в списке, когда очередь пуста. Если очередь не пуста, возвращается первый элемент:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class SimpleBlockingQueue<T> {
    private List<T> queue = new LinkedList<T>();
 
    public int getSize() {
        synchronized(queue) {
            return queue.size();
        }
    }
     
    public void put(T obj) {
        synchronized(queue) {
            queue.add(obj);
            queue.notify();
        }
    }
     
    public T get() throws InterruptedException  {
        while(true) {
            synchronized(queue) {
                if(queue.isEmpty()) {
                    queue.wait();
                } else {
                    return queue.remove(0);
                }
            }
        }
    }
}

2.1. Тест блокировки операций

Хотя эта реализация очень проста, проверить все функциональные возможности, особенно функцию блокировки, не так просто. Когда мы просто вызываем get() в пустой очереди, текущий поток блокируется, пока другие потоки не вставят новый элемент в очередь. Это означает, что нам нужно как минимум два разных потока в нашем модульном тесте. В то время как один поток блокируется, другой поток ожидает некоторое определенное время. Если в течение этого времени другой поток не выполнит дополнительный код, мы можем предположить, что функция блокировки работает. Одним из способов проверки того, что блокирующий поток не выполняет никакого дополнительного кода, является добавление некоторых установленных логических флагов, когда было сгенерировано исключение или строка после выполнения вызова get() :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private static class BlockingThread extends Thread {
    private SimpleBlockingQueue queue;
    private boolean wasInterrupted = false;
    private boolean reachedAfterGet = false;
    private boolean throwableThrown;
 
    public BlockingThread(SimpleBlockingQueue queue) {
        this.queue = queue;
    }
     
    public void run() {
        try {
            try {
                queue.get();
            } catch (InterruptedException e) {
                wasInterrupted = true;
            }
            reachedAfterGet = true;
        } catch (Throwable t) {
            throwableThrown = true;
        }
    }
 
    public boolean isWasInterrupted() {
        return wasInterrupted;
    }
 
    public boolean isReachedAfterGet() {
        return reachedAfterGet;
    }
     
    public boolean isThrowableThrown() {
        return throwableThrown;
    }
}

Флаг wasInterrupted указывает, был ли прерван поток блокировки, флаг reachedAfterGet показывает, что строка после получения была выполнена, и, наконец, throwableThrown сообщит нам, что был брошен любой тип Throwable . С помощью методов getter для этих флагов мы можем теперь написать модульный тест, который сначала создает пустую очередь, запускает наш BlockingThread , ждет некоторое время, а затем вставляет новый элемент в очередь.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
@Test
public void testPutOnEmptyQueueBlocks() throws InterruptedException {
    final SimpleBlockingQueue queue = new SimpleBlockingQueue();
    BlockingThread blockingThread = new BlockingThread(queue);
    blockingThread.start();
    Thread.sleep(5000);
    assertThat(blockingThread.isReachedAfterGet(), is(false));
    assertThat(blockingThread.isWasInterrupted(), is(false));
    assertThat(blockingThread.isThrowableThrown(), is(false));
    queue.put(new Object());
    Thread.sleep(1000);
    assertThat(blockingThread.isReachedAfterGet(), is(true));
    assertThat(blockingThread.isWasInterrupted(), is(false));
    assertThat(blockingThread.isThrowableThrown(), is(false));
    blockingThread.join();
}

Перед вставкой все флаги должны быть ложными. Если это так, мы помещаем новый элемент в очередь и проверяем, установлен ли флаг reachedAfterGet истины. Все остальные флаги должны быть ложными. Наконец, мы можем join() к blockingThread .

2.2. Тестирование на правильность

Предыдущий тест только что показал, как проверить операцию блокировки. Еще более интересным является настоящий многопоточный тестовый пример, где у нас есть несколько потоков, добавляющих элементы в очередь, и группа рабочих потоков, которые извлекают эти значения из очереди. По сути, это означает создание нескольких потоков производителей, которые вставляют новые элементы в очередь, и настройку группы рабочих потоков, которые вызывают get() .

Но как мы узнаем, что рабочие потоки получили точно такие же элементы из очереди, которые ранее были добавлены производственными потоками? Одним из возможных решений будет иметь вторую очередь, где мы добавляем и удаляем элементы на основе некоторого уникального идентификатора (например, UUID). Но поскольку мы находимся в многопоточной среде, мы также должны синхронизировать доступ ко второй очереди, а создание уникального идентификатора также обеспечивает некоторую синхронизацию.

Лучшим решением были бы некоторые математические средства, которые могут быть реализованы без какой-либо дополнительной синхронизации. Здесь проще всего использовать целочисленные значения в качестве элементов очереди, которые извлекаются из локального генератора случайных потоков. Специальный класс ThreadLocalRandom управляет генератором случайных чисел для каждого потока; следовательно, у нас нет никаких накладных расходов на синхронизацию. В то время как потоки производителя вычисляют сумму по элементам, вставленным ими, рабочие потоки также вычисляют свою локальную сумму. В конце сумма по всем потокам производителя сравнивается с суммой всех потоков потребителя. Если это то же самое, мы можем предположить, что с высокой вероятностью мы получили все задачи, которые были вставлены ранее.

Следующий модульный тест реализует эти идеи, отправляя потоки потребителя и производителя как задачи в фиксированный пул потоков:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Test
public void testParallelInsertionAndConsumption() throws InterruptedException, ExecutionException {
    final SimpleBlockingQueue<Integer> queue = new SimpleBlockingQueue<Integer>();
    ExecutorService threadPool = Executors.newFixedThreadPool(NUM_THREADS);
    final CountDownLatch latch = new CountDownLatch(NUM_THREADS);
    List<Future<Integer>> futuresPut = new ArrayList<Future<Integer>>();
    for (int i = 0; i < 3; i++) {
        Future<Integer> submit = threadPool.submit(new Callable<Integer>() {
            public Integer call() {
                int sum = 0;
                for (int i = 0; i < 1000; i++) {
                    int nextInt = ThreadLocalRandom.current().nextInt(100);
                    queue.put(nextInt);
                    sum += nextInt;
                }
                latch.countDown();
                return sum;
            }
        });
        futuresPut.add(submit);
    }
    List<Future<Integer>> futuresGet = new ArrayList<Future<Integer>>();
    for (int i = 0; i < 3; i++) {
        Future<Integer> submit = threadPool.submit(new Callable<Integer>() {
            public Integer call() {
                int count = 0;
                try {
                    for (int i = 0; i < 1000; i++) {
                        Integer got = queue.get();
                        count += got;
                    }
                } catch (InterruptedException e) {
 
                }
                latch.countDown();
                return count;
            }
        });
        futuresGet.add(submit);
    }
    latch.await();
    int sumPut = 0;
    for (Future<Integer> future : futuresPut) {
        sumPut += future.get();
    }
    int sumGet = 0;
    for (Future<Integer> future : futuresGet) {
        sumGet += future.get();
    }
    assertThat(sumPut, is(sumGet));
}

Мы используем CountDownLatch , чтобы дождаться завершения всех потоков. Наконец, мы можем вычислить сумму по всем представленным и полученным целым числам и утверждать, что они равны.

Порядок, в котором выполняются разные потоки, трудно предсказать. Это зависит от многих динамических факторов, таких как прерывания, обрабатываемые операционной системой, и от того, как планировщик выбирает следующий поток для выполнения. Чтобы добиться большего количества переключений контекста, можно вызвать метод Thread.yield() . Это дает планировщику подсказку, что текущий поток готов уступить ЦП в пользу другого потока. Как утверждает Javadoc, это всего лишь подсказка, то есть JVM может полностью игнорировать эту подсказку и дальше выполнять текущий поток. Но для целей тестирования можно использовать этот метод, чтобы ввести больше переключений контекста и, следовательно, вызвать условия гонки и т. Д.

2,3. Тестирование производительности

Другой аспект, следующий за правильным поведением класса, — это его производительность. Во многих реальных приложениях производительность является важным требованием и поэтому должна быть проверена.

Мы можем использовать ExecutorService для настройки различного количества потоков. Предполагается, что каждый поток вставляет элемент в нашу очередь и впоследствии извлекает его из него. Во внешнем цикле мы увеличиваем количество потоков, чтобы увидеть, как количество потоков влияет на пропускную способность.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Test
public void testPerformance() throws InterruptedException {
    for (int numThreads = 1; numThreads < THREADS_MAX; numThreads++) {
        long startMillis = System.currentTimeMillis();
        final SimpleBlockingQueue<Integer> queue = new SimpleBlockingQueue<Integer>();
        ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; i++) {
            threadPool.submit(new Runnable() {
                public void run() {
                    for (long i = 0; i < ITERATIONS; i++) {
                        int nextInt = ThreadLocalRandom.current().nextInt(100);
                        try {
                            queue.put(nextInt);
                            nextInt = queue.get();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        threadPool.shutdown();
        threadPool.awaitTermination(5, TimeUnit.MINUTES);
        long totalMillis = System.currentTimeMillis() - startMillis;
        double throughput = (double)(numThreads * ITERATIONS * 2) / (double) totalMillis;
        System.out.println(String.format("%s with %d threads: %dms (throughput: %.1f ops/s)", LinkedBlockingQueue.class.getSimpleName(), numThreads, totalMillis, throughput));
    }
}

Чтобы понять, как работает наша простая реализация очереди, мы можем сравнить ее с реализацией из JDK. Кандидатом для этого является LinkedBlockingQueue . Его два метода put() и take() работают аналогично нашей реализации, ожидая того обстоятельства, что LinkedBlockingQueue опционально ограничен и поэтому должен отслеживать количество вставленных элементов и позволяет текущему потоку спать, если очередь заполнена. Эта функциональность требует дополнительного учета и проверки операций вставки. С другой стороны, реализация JDK не использует синхронизированные блоки и была реализована с утомительными измерениями производительности.

Когда мы реализуем тот же тестовый пример, что и выше, используя LinkedBlockingQueue , мы получаем следующий вывод для обоих тестовых случаев:

фигура 1

фигура 1

На рисунке ясно видно, что пропускная способность, т. LinkedBlockingQueue Количество операций, выполняемых за единицу времени, почти в два раза выше для реализации LinkedBlockingQueue . Но это также показывает, что с одним потоком обе реализации выполняют примерно одинаковое количество операций в секунду. Добавление большего количества потоков улучшает пропускную способность, хотя кривая очень скоро сходится к своему значению насыщения. Добавление большего количества потоков больше не повышает производительность приложения.

3. Тестирование фреймворков

Вместо того, чтобы писать собственную платформу для реализации многопоточных тестовых случаев для вашего приложения, вы можете взглянуть на доступную тестовую среду. Этот раздел освещает два из них: JMock и Grobo Utils.

3.1. JMock

Для целей стресс-тестирования фреймворк JMock предоставляет класс Blitzer. Этот класс реализует функциональность, аналогичную той, что мы делали в разделе «Тестирование на корректность», поскольку он внутренне настраивает ThreadPool в который отправляются задачи, которые выполняют определенное действие. Вы предоставляете количество задач / действий для выполнения, а также количество потоков для конструктора:

1
Blitzer blitzer = new Blitzer(25000, 6);

Этот экземпляр имеет метод blitz() для которого вы просто предоставляете реализацию интерфейса Runnable :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
@Test
public void stressTest() throws InterruptedException {
    final SimpleBlockingQueue<Integer> queue = new SimpleBlockingQueue<Integer>();
    blitzer.blitz(new Runnable() {
        public void run() {
            try {
                queue.put(42);
                queue.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    assertThat(queue.getSize(), is(0));
}

Поэтому класс Blitzer делает реализацию стресс-тестов еще проще, чем с ExecutorService .

3.2. Grobo Utils

Grobo Utils — это инфраструктура, обеспечивающая поддержку тестирования многопоточных приложений. Идеи, лежащие в основе, описаны в статье .

Как и в предыдущем примере, у нас есть класс MultiThreadedTestRunner который внутренне создает пул потоков и выполняет заданное количество реализаций Runnable как отдельные потоки. Экземпляры Runnable должны реализовывать специальный интерфейс под названием TestRunnable . Стоит отметить, что единственный метод runTest() выдает исключение. Таким образом, исключения, создаваемые в потоках, влияют на результат теста. Это не тот случай, когда мы используем обычный ExecutorService . Здесь задачи должны реализовывать Runnable и его единственный метод run() не выдает никаких исключений. Исключения, возникающие в этих задачах, проглатываются и не нарушают тест.

После создания MultiThreadedTestRunner мы можем вызвать его runTestRunnables() и runTestRunnables() количество миллисекунд, в течение которых он должен ждать, пока тест не завершится неудачей. Наконец, assertThat() проверяет, что очередь снова пуста, так как все тестовые потоки удалили ранее добавленный элемент.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class SimpleBlockingQueueGroboUtilTest {
 
    private static class MyTestRunnable extends TestRunnable {
        private SimpleBlockingQueue<Integer> queue;
 
        public MyTestRunnable(SimpleBlockingQueue<Integer> queue) {
            this.queue = queue;
        }
 
        @Override
        public void runTest() throws Throwable {
            for (int i = 0; i < 1000000; i++) {
                this.queue.put(42);
                this.queue.get();
            }
        }
    }
 
    @Test
    public void stressTest() throws Throwable {
        SimpleBlockingQueue<Integer> queue = new SimpleBlockingQueue<Integer>();
        TestRunnable[] testRunnables = new TestRunnable[6];
        for (int i = 0; i < testRunnables.length; i++) {
            testRunnables[i] = new MyTestRunnable(queue);
        }
        MultiThreadedTestRunner mttr = new MultiThreadedTestRunner(testRunnables);
        mttr.runTestRunnables(2 * 60 * 1000);
        assertThat(queue.getSize(), is(0));
    }
}