Я затронул эту проблему на одном из моих недавних вебинаров , теперь пришло время объяснить это в письменном виде. Безопасность потоков — это важное качество классов в языках / платформах, таких как Java, где мы часто делимся объектами между потоками. Проблемы, вызванные отсутствием безопасности потоков, очень трудно отладить, так как они единичны и практически невозможно воспроизвести специально. Как вы проверяете свои объекты, чтобы убедиться, что они потокобезопасны? Вот как я это делаю.
Допустим, есть простая книжная полка в памяти:
01
02
03
04
05
06
07
08
09
10
11
12
|
class Books { final Map<Integer, String> map = new ConcurrentHashMap<>(); int add(String title) { final Integer next = this .map.size() + 1 ; this .map.put(next, title); return next; } String title( int id) { return this .map.get(id); } } |
Сначала мы помещаем туда книгу, и книжная полка возвращает ее идентификатор. Затем мы можем прочитать название книги по ее идентификатору:
1
2
3
4
|
Books books = new Books(); String title = "Elegant Objects" ; int id = books.add(title); assert books.title(id).equals(title); |
Класс кажется поточно-ориентированным, поскольку мы используем поточно-ориентированный ConcurrentHashMap
вместо более примитивного и не HashMap
, верно? Давайте попробуем проверить это:
1
2
3
4
5
6
7
8
9
|
class BooksTest { @Test public void addsAndRetrieves() { Books books = new Books(); String title = "Elegant Objects" ; int id = books.add(title); assert books.title(id).equals(title); } } |
Тест проходит, но это всего лишь однопотоковый тест. Давайте попробуем сделать те же манипуляции с несколькими параллельными потоками (я использую Hamcrest ):
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
class BooksTest { @Test public void addsAndRetrieves() { Books books = new Books(); int threads = 10 ; ExecutorService service = Executors.newFixedThreadPool(threads); Collection<Future<Integer>> futures = new LinkedList<>(); for ( int t = 0 ; t < threads; ++t) { final String title = String.format( "Book #%d" , t); futures.add(service.submit(() -> books.add(title))); } Set<Integer> ids = new HashSet<>(); for (Future<Integer> f : futures) { ids.add(f.get()); } assertThat(ids.size(), equalTo(threads)); } } |
Сначала я создаю пул потоков через Executors
. Затем я отправляю десять объектов типа Callable
через submit()
. Каждый из них добавит новую уникальную книгу на книжную полку. Все они будут выполняться в некотором непредсказуемом порядке некоторыми из этих десяти потоков из пула.
Затем я извлекаю результаты их исполнителей через список объектов типа Future
. Наконец, я вычисляю количество созданных уникальных идентификаторов книг. Если число 10, конфликтов не было. Я использую коллекцию Set
, чтобы убедиться, что список идентификаторов содержит только уникальные элементы.
Тест проходит на моем ноутбуке. Однако, это не достаточно сильно. Проблема в том, что на самом деле он не тестирует Books
из нескольких параллельных потоков. Время, которое проходит между нашими вызовами submit()
достаточно велико, чтобы завершить выполнение books.add()
. Вот почему на самом деле одновременно работает только один поток. Мы можем проверить это, немного изменив код:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
AtomicBoolean running = new AtomicBoolean(); AtomicInteger overlaps = new AtomicInteger(); Collection<Future<Integer>> futures = new LinkedList<>(); for ( int t = 0 ; t < threads; ++t) { final String title = String.format( "Book #%d" , t); futures.add( service.submit( () -> { if (running.get()) { overlaps.incrementAndGet(); } running.set( true ); int id = books.add(title); running.set( false ); return id; } ) ); } assertThat(overlaps.get(), greaterThan( 0 )); |
С помощью этого кода я пытаюсь увидеть, как часто потоки перекрывают друг друга и делают что-то параллельно. Этого никогда не происходит, и overlaps
равны нулю. Таким образом, наш тест еще ничего не тестирует. Это просто добавляет десять книг на книжную полку одну за другой. Если я увеличу количество потоков до 1000, они иногда начинают перекрываться. Но мы хотим, чтобы они перекрывали друг друга, даже если их немного. Чтобы решить это, нам нужно использовать CountDownLatch
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
CountDownLatch latch = new CountDownLatch( 1 ); AtomicBoolean running = new AtomicBoolean(); AtomicInteger overlaps = new AtomicInteger(); Collection<Future<Integer>> futures = new LinkedList<>(); for ( int t = 0 ; t < threads; ++t) { final String title = String.format( "Book #%d" , t); futures.add( service.submit( () -> { latch.await(); if (running.get()) { overlaps.incrementAndGet(); } running.set( true ); int id = books.add(title); running.set( false ); return id; } ) ); } latch.countDown(); Set<Integer> ids = new HashSet<>(); for (Future<Integer> f : futures) { ids.add(f.get()); } assertThat(overlaps.get(), greaterThan( 0 )); |
Теперь каждая нить, прежде чем прикасаться к книгам, ждет разрешения, предоставленного latch
. Когда мы отправляем их все через submit()
они остаются в ожидании. Затем мы отпускаем защелку с помощью countDown()
и все они начинают countDown()
одновременно. Теперь на моем ноутбуке overlaps
равны 3-5, даже когда threads
10.
И этот последний assertThat()
вылетает сейчас! Я не получаю 10 идентификаторов книг, как раньше. Это 7-9, но никогда 10. Класс, по-видимому, не является потокобезопасным!
Но прежде чем мы исправим класс, давайте сделаем наш тест проще. Давайте использовать RunInThreads
от Cactoos , который делает то же самое, что мы сделали выше, но под капотом:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
class BooksTest { @Test public void addsAndRetrieves() { Books books = new Books(); MatcherAssert.assertThat( t -> { String title = String.format( "Book #%d" , t.getAndIncrement() ); int id = books.add(title); return books.title(id).equals(title); }, new RunsInThreads<>( new AtomicInteger(), 10 ) ); } } |
Первый аргумент assertThat()
является экземпляром Func
(функциональный интерфейс), принимающим AtomicInteger
(первый аргумент RunsInThreads
) и возвращающим Boolean
. Эта функция будет выполняться в 10 параллельных потоках, используя тот же подход на основе защелки, как показано выше.
Этот RunInThreads
кажется компактным и удобным, я уже использую его в нескольких проектах.
Кстати, чтобы сделать Books
потокобезопасными, нам просто нужно добавить synchronized
к его методу add()
. Или, может быть, вы можете предложить лучшее решение?
Опубликовано на Java Code Geeks с разрешения Егора Бугаенко, партнера нашей программы JCG . См. Оригинальную статью здесь: Как я тестирую свои Java-классы на безопасность потоков
Мнения, высказанные участниками Java Code Geeks, являются их собственными. |