Статьи

Как я тестирую свои Java-классы на безопасность потоков

Я затронул эту проблему на одном из моих недавних вебинаров , теперь пришло время объяснить это в письменном виде. Безопасность потоков — это важное качество классов в языках / платформах, таких как Java, где мы часто делимся объектами между потоками. Проблемы, вызванные отсутствием безопасности потоков, очень трудно отладить, так как они единичны и практически невозможно воспроизвести специально. Как вы проверяете свои объекты, чтобы убедиться, что они потокобезопасны? Вот как я это делаю.

Запах женщины (1992) Мартина Бреста

Допустим, есть простая книжная полка в памяти:

01
02
03
04
05
06
07
08
09
10
11
12
class Books {
  final Map<Integer, String> map =
    new ConcurrentHashMap<>();
  int add(String title) {
    final Integer next = this.map.size() + 1;
    this.map.put(next, title);
    return next;
  }
  String title(int id) {
    return this.map.get(id);
  }
}

Сначала мы помещаем туда книгу, и книжная полка возвращает ее идентификатор. Затем мы можем прочитать название книги по ее идентификатору:

1
2
3
4
Books books = new Books();
String title = "Elegant Objects";
int id = books.add(title);
assert books.title(id).equals(title);

Класс кажется поточно-ориентированным, поскольку мы используем поточно-ориентированный ConcurrentHashMap вместо более примитивного и не HashMap , верно? Давайте попробуем проверить это:

1
2
3
4
5
6
7
8
9
class BooksTest {
  @Test
  public void addsAndRetrieves() {
    Books books = new Books();
    String title = "Elegant Objects";
    int id = books.add(title);
    assert books.title(id).equals(title);
  }
}

Тест проходит, но это всего лишь однопотоковый тест. Давайте попробуем сделать те же манипуляции с несколькими параллельными потоками (я использую Hamcrest ):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
class BooksTest {
  @Test
  public void addsAndRetrieves() {
    Books books = new Books();
    int threads = 10;
    ExecutorService service =
      Executors.newFixedThreadPool(threads);
    Collection<Future<Integer>> futures =
      new LinkedList<>();
    for (int t = 0; t < threads; ++t) {
      final String title = String.format("Book #%d", t);
      futures.add(service.submit(() -> books.add(title)));
    }
    Set<Integer> ids = new HashSet<>();
    for (Future<Integer> f : futures) {
      ids.add(f.get());
    }
    assertThat(ids.size(), equalTo(threads));
  }
}

Сначала я создаю пул потоков через Executors . Затем я отправляю десять объектов типа Callable через submit() . Каждый из них добавит новую уникальную книгу на книжную полку. Все они будут выполняться в некотором непредсказуемом порядке некоторыми из этих десяти потоков из пула.

Затем я извлекаю результаты их исполнителей через список объектов типа Future . Наконец, я вычисляю количество созданных уникальных идентификаторов книг. Если число 10, конфликтов не было. Я использую коллекцию Set , чтобы убедиться, что список идентификаторов содержит только уникальные элементы.

Тест проходит на моем ноутбуке. Однако, это не достаточно сильно. Проблема в том, что на самом деле он не тестирует Books из нескольких параллельных потоков. Время, которое проходит между нашими вызовами submit() достаточно велико, чтобы завершить выполнение books.add() . Вот почему на самом деле одновременно работает только один поток. Мы можем проверить это, немного изменив код:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
AtomicBoolean running = new AtomicBoolean();
AtomicInteger overlaps = new AtomicInteger();
Collection<Future<Integer>> futures = new LinkedList<>();
for (int t = 0; t < threads; ++t) {
  final String title = String.format("Book #%d", t);
  futures.add(
    service.submit(
      () -> {
        if (running.get()) {
          overlaps.incrementAndGet();
        }
        running.set(true);
        int id = books.add(title);
        running.set(false);
        return id;
      }
    )
  );
}
assertThat(overlaps.get(), greaterThan(0));

С помощью этого кода я пытаюсь увидеть, как часто потоки перекрывают друг друга и делают что-то параллельно. Этого никогда не происходит, и overlaps равны нулю. Таким образом, наш тест еще ничего не тестирует. Это просто добавляет десять книг на книжную полку одну за другой. Если я увеличу количество потоков до 1000, они иногда начинают перекрываться. Но мы хотим, чтобы они перекрывали друг друга, даже если их немного. Чтобы решить это, нам нужно использовать CountDownLatch :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean running = new AtomicBoolean();
AtomicInteger overlaps = new AtomicInteger();
Collection<Future<Integer>> futures = new LinkedList<>();
for (int t = 0; t < threads; ++t) {
  final String title = String.format("Book #%d", t);
  futures.add(
    service.submit(
      () -> {
        latch.await();
        if (running.get()) {
          overlaps.incrementAndGet();
        }
        running.set(true);
        int id = books.add(title);
        running.set(false);
        return id;
      }
    )
  );
}
latch.countDown();
Set<Integer> ids = new HashSet<>();
for (Future<Integer> f : futures) {
  ids.add(f.get());
}
assertThat(overlaps.get(), greaterThan(0));

Теперь каждая нить, прежде чем прикасаться к книгам, ждет разрешения, предоставленного latch . Когда мы отправляем их все через submit() они остаются в ожидании. Затем мы отпускаем защелку с помощью countDown() и все они начинают countDown() одновременно. Теперь на моем ноутбуке overlaps равны 3-5, даже когда threads 10.

И этот последний assertThat() вылетает сейчас! Я не получаю 10 идентификаторов книг, как раньше. Это 7-9, но никогда 10. Класс, по-видимому, не является потокобезопасным!

Но прежде чем мы исправим класс, давайте сделаем наш тест проще. Давайте использовать RunInThreads от Cactoos , который делает то же самое, что мы сделали выше, но под капотом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
class BooksTest {
  @Test
  public void addsAndRetrieves() {
    Books books = new Books();
    MatcherAssert.assertThat(
      t -> {
        String title = String.format(
          "Book #%d", t.getAndIncrement()
        );
        int id = books.add(title);
        return books.title(id).equals(title);
      },
      new RunsInThreads<>(new AtomicInteger(), 10)
    );
  }
}

Первый аргумент assertThat() является экземпляром Func (функциональный интерфейс), принимающим AtomicInteger (первый аргумент RunsInThreads ) и возвращающим Boolean . Эта функция будет выполняться в 10 параллельных потоках, используя тот же подход на основе защелки, как показано выше.

Этот RunInThreads кажется компактным и удобным, я уже использую его в нескольких проектах.

Кстати, чтобы сделать Books потокобезопасными, нам просто нужно добавить synchronized к его методу add() . Или, может быть, вы можете предложить лучшее решение?

Опубликовано на Java Code Geeks с разрешения Егора Бугаенко, партнера нашей программы JCG . См. Оригинальную статью здесь: Как я тестирую свои Java-классы на безопасность потоков

Мнения, высказанные участниками Java Code Geeks, являются их собственными.