JDK одновременный пакет
Текущая модель памяти Java гарантирует ожидаемый порядок выполнения многопоточного кода, если в этом коде нет гонок. Чтобы защитить ваш код от гонок, придумайте разные способы синхронизации и обмена данными между ними. Пакет java.util.concurrent
, входящий в состав HotSpot JDK, предоставляет следующие инструменты для написания многопоточного кода:
- атомное
- Замки
- Коллекции
- Точки синхронизации
- Исполнители
- Аккумуляторы JDK 1,8
атомное
В дочернем пакете java.util.concurrent.atomic
есть набор классов для работы с примитивными атомарными типами. Контракт этих классов обеспечивает выполнение операции compare-and-set
за «одну единицу процессорного времени». Когда вы устанавливаете новое значение переменной, вы также передаете ее старое значение (подход оптимистической блокировки). Если после вызова значение переменной отличается от ожидаемого параметра — compare-and-set
вернет false
результат.
Например, возьмите два массива long
переменных: [1,2,3,4,5]
и [-1,-2,-3,-4,-5]
. Несколько потоков будут перебираться в массиве и объединять элементы в одну переменную. Код ( заводной ) с пессимистичной блокировкой выглядит так:
class Sum { static monitor = new Object() static volatile long sum = 0 } class Summer implements Callable { long[] data Object call() throws Exception { data.each { synchronized (Sum.monitor) { println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}") Sum.sum += it } } } } Executors.newFixedThreadPool(2).invokeAll([ new Summer(data: [1,2,3,4,5]), new Summer(data: [-1,-2,-3,-4,-5]) ]) print("Sum: ${Sum.sum}")
Ожидаемый результат:
pool-1-thread-1: add 1 to 0 pool-1-thread-2: add -1 to 1 pool-1-thread-1: add 2 to 0 pool-1-thread-2: add -2 to 2 pool-1-thread-1: add 3 to 0 pool-1-thread-2: add -3 to 3 pool-1-thread-1: add 4 to 0 pool-1-thread-1: add 5 to 4 pool-1-thread-2: add -4 to 9 pool-1-thread-2: add -5 to 5 Sum: 0
Однако этот подход имеет существенные недостатки в производительности. В этом случае мы тратим гораздо больше времени на нахождение в состоянии блокировки, чем на выполнение реального кода:
- попытка заблокировать монитор
- блокирующая нить
- разблокировать монитор
- разблокировать тему
Рассмотрим использование AtomicLong
для реализации оптимистической блокировки при расчете той же суммы:
class Sum { static volatile AtomicLong sum = new AtomicLong(0) } class Summer implements Callable { long[] data Object call() throws Exception { data.each { while(true) { long localSum = Sum.sum.get() if (Sum.sum.compareAndSet(localSum, localSum + it)) { println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}") break; } else { println("[MISS!] ${Thread.currentThread().name}: add ${it} to ${Sum.sum}") } } } } } Executors.newFixedThreadPool(2).invokeAll([ new Summer(data: [1,2,3,4,5]), new Summer(data: [-1,-2,-3,-4,-5]) ]) print("Sum: ${Sum.sum}")
Как видно из результатов, попыток ошибок было не так много:
[MISS!] pool-1-thread-1: add 1 to -1 pool-1-thread-2: add -1 to -1 pool-1-thread-2: add -2 to -3 [MISS!] pool-1-thread-1: add 1 to -3 pool-1-thread-2: add -3 to -6 pool-1-thread-1: add 1 to -5 [MISS!] pool-1-thread-2: add -4 to -5 pool-1-thread-1: add 2 to -7 pool-1-thread-2: add -4 to -7 pool-1-thread-1: add 3 to -9 pool-1-thread-2: add -5 to -9 pool-1-thread-1: add 4 to -5 pool-1-thread-1: add 5 to 0 Sum: 0
При принятии решения об использовании оптимистической блокировки важно действовать с изменяемой переменной не долго. Чем дольше это действие — тем чаще будет ошибочно compare-and-set
, и чаще придется выполнять это действие снова.
Используя compare-and-set
метод, мы также можем реализовать неблокирующую блокировку чтения. В этом случае мы сохраняем текущую версию объекта в атомарной переменной. После завершения обработки объекта — сравните текущую версию объекта с сохраненной. Если они не равны — это снова прочитать свойства объекта, используя обычную read-write
блокировку.
class Transaction { long debit } class Account { AtomicLong version = new AtomicLong() ReadWriteLock readWriteLock = new ReentrantReadWriteLock() List<Transaction> transactions = new ArrayList<Transaction>() } long balance(Account account) { ReentrantReadWriteLock.ReadLock locked while(true) { long balance = 0 long version = account.version.get() account.transactions.each {balance += it.debit} //volatile write for JMM if (account.version.compareAndSet(version, version)) { if (locked) {locked.unlock()} return balance } else { locked = account.readWriteLock.readLock() } } } void modifyTransaction(Account account, int position, long newDebit) { def writeLock = account.readWriteLock.writeLock() account.version.incrementAndGet() account.transactions[position].debit = newDebit writeLock.unlock() }
Замки
ReentrantLock
В отличие от синхронизированного блока, ReentrantLock
позволяет более гибко выбирать время блокировки и разблокировки, поскольку он использует обычные вызовы Java.
ReentrantLock
также предоставляет информацию о текущем состоянии блокировки и позволяет «ожидать» блокировки в течение определенного времени. Поддерживает правильную блокировку и освобождение рекурсивных блокировок для одного потока. Если вам нужна справедливая блокировка (соответствует порядку захвата монитора) — ReentrantLock
также предоставьте этот механизм.
Несмотря на то, что блокировка syncronized
and ReentrantLock
очень похожа на уровне интерфейса, реализация их совершенно иная. Не вдаваясь в детали JMM: используйте ReentrantLock
вместо этого предоставленный синхронизированный блок JVM, только если вы очень часто сражаетесь за монитор. В случае, когда только один поток ( как правило ) пытается попасть в синхронизированный метод — производительность с простым syncronized
лучше, чем ReentrantLock
.
ReentrantReadWriteLock
ReentrantReadWriteLock
Дополняет ReentrantLock
возможность получить множество блокировок чтения и блокировок записи. При необходимости блокировка при входе может быть «понижена» до блока чтения.
StampedLock JDK 1,8
Реализует оптимистическую и пессимистическую блокировки чтения-записи с возможностью дальнейшего увеличения или уменьшения состояния блокировки. Оптимистическая блокировка реализуется через текущую блокировку «штамп» ( javadoc ):
double distanceFromOriginV1() { // A read-only method long stamp; if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic double currentX = x; double currentY = y; if (sl.validate(stamp)) return Math.sqrt(currentX * currentX + currentY * currentY); } stamp = sl.readLock(); // fall back to read lock try { double currentX = x; double currentY = y; return Math.sqrt(currentX * currentX + currentY * currentY); } finally { sl.unlockRead(stamp); } }
Коллекции
Учебный класс | Описание |
---|---|
|
Справедливая очередь для передачи сообщений из одного потока в другой. Поддержка методов block ( |
|
Структура ключ-значение, основанная на |
|
Сбалансированная многопоточная структура ключ-значение (O (log n)). Поиск основан на скип-листе. Карта должна быть в состоянии сравнить ключи. |
|
|
|
Блокировка для записи, нет блокировки в списке чтения. Любая модификация создает новый экземпляр массива в памяти. |
|
|
|
|
|
Двунаправленный |
|
Однонаправленный |
|
Однонаправленный |
|
Однонаправленный |
|
Направленный |
Точки синхронизации
Учебный класс | Описание |
---|---|
|
Барьер ( |
|
Barrier ( |
|
Барьер ( |
|
Расширение |
|
Барьер, позволяющий захватывать только указанное количество потоков монитора. Фактически расширяет функциональность |
Исполнители
ExecutorService
заменяет new Thread(runnable)
для упрощения работы с потоками. ExecutorService
помогает повторно использовать свободные потоки, организовать очередь задач в пул потоков, разрешить подписку (или ожидание) результата задач. Вместо Runnable
пула интерфейса используется интерфейс Callable
(позволяет вернуть результат и выдать ошибку).
ExecutorService pool = Executors.newFixedThreadPool(4) Future future = pool.submit(new Callable() { Object call() throws Exception { println("In thread") return "From thread" } }) println("From main") println(future.get()) try { pool.submit(new Callable() { Object call() throws Exception { throw new IllegalStateException() } }).get() } catch (ExecutionException e) {println("Got it: ${e.cause}")} pool.shutdown()
InvokeAll
Метод возвращает управление вызывающему потоку только после завершения всех задач. InvokeAny
Метод возвращает результат первого успешно выполненного задания, отменяя все в дальнейшем.
Учебный класс | Описание |
---|---|
|
Пул потоков с возможностью указания рабочего и максимального количества потоков в пуле, очереди для задач. |
|
Расширяет функциональность |
|
Облегченный пул потоков для «самовоспроизводящихся» задач. Пул ожидает вызова |
class LNode { List<LNode> childs = [] def object } class Finder extends RecursiveTask<LNode> { LNode node Object expect protected LNode compute() { if (node?.object?.equals(expect)) { return node } node?.childs?.collect { new Finder(node: it, expect: expect).fork() }?.collect { it.join() }?.find { it != null } } } ForkJoinPool es = new ForkJoinPool() def invoke = es.invoke(new Finder( node: new LNode( childs: [ new LNode(object: "ivalid"), new LNode( object: "ivalid", childs: [new LNode(object: "test")] ) ] ), expect: "test" )) print("${invoke?.object}")
Аккумуляторы JDK 1,8
Аккумуляторы позволяют выполнять примитивные операции (например, суммирование или поиск максимального значения) с числовыми элементами в многопоточной среде без использования CAS.