JDK одновременный пакет
Текущая модель памяти Java гарантирует ожидаемый порядок выполнения многопоточного кода, если в этом коде нет гонок. Чтобы защитить ваш код от гонок, придумайте разные способы синхронизации и обмена данными между ними. Пакет java.util.concurrent, входящий в состав HotSpot JDK, предоставляет следующие инструменты для написания многопоточного кода:
- атомное
- Замки
- Коллекции
- Точки синхронизации
- Исполнители
- Аккумуляторы JDK 1,8
атомное
В дочернем пакете java.util.concurrent.atomic есть набор классов для работы с примитивными атомарными типами. Контракт этих классов обеспечивает выполнение операции compare-and-set за «одну единицу процессорного времени». Когда вы устанавливаете новое значение переменной, вы также передаете ее старое значение (подход оптимистической блокировки). Если после вызова значение переменной отличается от ожидаемого параметра — compare-and-set вернет false результат.
Например, возьмите два массива long переменных: [1,2,3,4,5] и [-1,-2,-3,-4,-5]. Несколько потоков будут перебираться в массиве и объединять элементы в одну переменную. Код ( заводной ) с пессимистичной блокировкой выглядит так:
class Sum {
static monitor = new Object()
static volatile long sum = 0
}
class Summer implements Callable {
long[] data
Object call() throws Exception {
data.each {
synchronized (Sum.monitor) {
println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
Sum.sum += it
}
}
}
}
Executors.newFixedThreadPool(2).invokeAll([
new Summer(data: [1,2,3,4,5]),
new Summer(data: [-1,-2,-3,-4,-5])
])
print("Sum: ${Sum.sum}")
Ожидаемый результат:
pool-1-thread-1: add 1 to 0 pool-1-thread-2: add -1 to 1 pool-1-thread-1: add 2 to 0 pool-1-thread-2: add -2 to 2 pool-1-thread-1: add 3 to 0 pool-1-thread-2: add -3 to 3 pool-1-thread-1: add 4 to 0 pool-1-thread-1: add 5 to 4 pool-1-thread-2: add -4 to 9 pool-1-thread-2: add -5 to 5 Sum: 0
Однако этот подход имеет существенные недостатки в производительности. В этом случае мы тратим гораздо больше времени на нахождение в состоянии блокировки, чем на выполнение реального кода:
- попытка заблокировать монитор
- блокирующая нить
- разблокировать монитор
- разблокировать тему
Рассмотрим использование AtomicLong для реализации оптимистической блокировки при расчете той же суммы:
class Sum {
static volatile AtomicLong sum = new AtomicLong(0)
}
class Summer implements Callable {
long[] data
Object call() throws Exception {
data.each {
while(true) {
long localSum = Sum.sum.get()
if (Sum.sum.compareAndSet(localSum, localSum + it)) {
println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
break;
} else {
println("[MISS!] ${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
}
}
}
}
}
Executors.newFixedThreadPool(2).invokeAll([
new Summer(data: [1,2,3,4,5]),
new Summer(data: [-1,-2,-3,-4,-5])
])
print("Sum: ${Sum.sum}")
Как видно из результатов, попыток ошибок было не так много:
[MISS!] pool-1-thread-1: add 1 to -1 pool-1-thread-2: add -1 to -1 pool-1-thread-2: add -2 to -3 [MISS!] pool-1-thread-1: add 1 to -3 pool-1-thread-2: add -3 to -6 pool-1-thread-1: add 1 to -5 [MISS!] pool-1-thread-2: add -4 to -5 pool-1-thread-1: add 2 to -7 pool-1-thread-2: add -4 to -7 pool-1-thread-1: add 3 to -9 pool-1-thread-2: add -5 to -9 pool-1-thread-1: add 4 to -5 pool-1-thread-1: add 5 to 0 Sum: 0
При принятии решения об использовании оптимистической блокировки важно действовать с изменяемой переменной не долго. Чем дольше это действие — тем чаще будет ошибочно compare-and-set, и чаще придется выполнять это действие снова.
Используя compare-and-set метод, мы также можем реализовать неблокирующую блокировку чтения. В этом случае мы сохраняем текущую версию объекта в атомарной переменной. После завершения обработки объекта — сравните текущую версию объекта с сохраненной. Если они не равны — это снова прочитать свойства объекта, используя обычную read-write блокировку.
class Transaction {
long debit
}
class Account {
AtomicLong version = new AtomicLong()
ReadWriteLock readWriteLock = new ReentrantReadWriteLock()
List<Transaction> transactions = new ArrayList<Transaction>()
}
long balance(Account account) {
ReentrantReadWriteLock.ReadLock locked
while(true) {
long balance = 0
long version = account.version.get()
account.transactions.each {balance += it.debit}
//volatile write for JMM
if (account.version.compareAndSet(version, version)) {
if (locked) {locked.unlock()}
return balance
} else {
locked = account.readWriteLock.readLock()
}
}
}
void modifyTransaction(Account account, int position, long newDebit) {
def writeLock = account.readWriteLock.writeLock()
account.version.incrementAndGet()
account.transactions[position].debit = newDebit
writeLock.unlock()
}
Замки
ReentrantLock
В отличие от синхронизированного блока, ReentrantLock позволяет более гибко выбирать время блокировки и разблокировки, поскольку он использует обычные вызовы Java.
ReentrantLock также предоставляет информацию о текущем состоянии блокировки и позволяет «ожидать» блокировки в течение определенного времени. Поддерживает правильную блокировку и освобождение рекурсивных блокировок для одного потока. Если вам нужна справедливая блокировка (соответствует порядку захвата монитора) — ReentrantLock также предоставьте этот механизм.
Несмотря на то, что блокировка syncronized and ReentrantLockочень похожа на уровне интерфейса, реализация их совершенно иная. Не вдаваясь в детали JMM: используйте ReentrantLock вместо этого предоставленный синхронизированный блок JVM, только если вы очень часто сражаетесь за монитор. В случае, когда только один поток ( как правило ) пытается попасть в синхронизированный метод — производительность с простым syncronized лучше, чем ReentrantLock .
ReentrantReadWriteLock
ReentrantReadWriteLock Дополняет ReentrantLock возможность получить множество блокировок чтения и блокировок записи. При необходимости блокировка при входе может быть «понижена» до блока чтения.
StampedLock JDK 1,8
Реализует оптимистическую и пессимистическую блокировки чтения-записи с возможностью дальнейшего увеличения или уменьшения состояния блокировки. Оптимистическая блокировка реализуется через текущую блокировку «штамп» ( javadoc ):
double distanceFromOriginV1() { // A read-only method
long stamp;
if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic
double currentX = x;
double currentY = y;
if (sl.validate(stamp))
return Math.sqrt(currentX * currentX + currentY * currentY);
}
stamp = sl.readLock(); // fall back to read lock
try {
double currentX = x;
double currentY = y;
return Math.sqrt(currentX * currentX + currentY * currentY);
} finally {
sl.unlockRead(stamp);
}
}
Коллекции
| Учебный класс | Описание |
|---|---|
|
|
Справедливая очередь для передачи сообщений из одного потока в другой. Поддержка методов block ( |
|
|
Структура ключ-значение, основанная на |
|
|
Сбалансированная многопоточная структура ключ-значение (O (log n)). Поиск основан на скип-листе. Карта должна быть в состоянии сравнить ключи. |
|
|
|
|
|
Блокировка для записи, нет блокировки в списке чтения. Любая модификация создает новый экземпляр массива в памяти. |
|
|
|
|
|
|
|
|
Двунаправленный |
|
|
Однонаправленный |
|
|
Однонаправленный |
|
|
Однонаправленный |
|
|
Направленный |
Точки синхронизации
| Учебный класс | Описание |
|---|---|
|
|
Барьер ( |
|
|
Barrier ( |
|
|
Барьер ( |
|
|
Расширение |
|
|
Барьер, позволяющий захватывать только указанное количество потоков монитора. Фактически расширяет функциональность |
Исполнители
ExecutorService заменяет new Thread(runnable) для упрощения работы с потоками. ExecutorService помогает повторно использовать свободные потоки, организовать очередь задач в пул потоков, разрешить подписку (или ожидание) результата задач. Вместо Runnableпула интерфейса используется интерфейс Callable (позволяет вернуть результат и выдать ошибку).
ExecutorService pool = Executors.newFixedThreadPool(4)
Future future = pool.submit(new Callable() {
Object call() throws Exception {
println("In thread")
return "From thread"
}
})
println("From main")
println(future.get())
try {
pool.submit(new Callable() {
Object call() throws Exception {
throw new IllegalStateException()
}
}).get()
} catch (ExecutionException e) {println("Got it: ${e.cause}")}
pool.shutdown()
InvokeAll Метод возвращает управление вызывающему потоку только после завершения всех задач. InvokeAny Метод возвращает результат первого успешно выполненного задания, отменяя все в дальнейшем.
| Учебный класс | Описание |
|---|---|
|
|
Пул потоков с возможностью указания рабочего и максимального количества потоков в пуле, очереди для задач. |
|
|
Расширяет функциональность |
|
|
Облегченный пул потоков для «самовоспроизводящихся» задач. Пул ожидает вызова |
class LNode {
List<LNode> childs = []
def object
}
class Finder extends RecursiveTask<LNode> {
LNode node
Object expect
protected LNode compute() {
if (node?.object?.equals(expect)) {
return node
}
node?.childs?.collect {
new Finder(node: it, expect: expect).fork()
}?.collect {
it.join()
}?.find {
it != null
}
}
}
ForkJoinPool es = new ForkJoinPool()
def invoke = es.invoke(new Finder(
node: new LNode(
childs: [
new LNode(object: "ivalid"),
new LNode(
object: "ivalid",
childs: [new LNode(object: "test")]
)
]
),
expect: "test"
))
print("${invoke?.object}")
Аккумуляторы JDK 1,8
Аккумуляторы позволяют выполнять примитивные операции (например, суммирование или поиск максимального значения) с числовыми элементами в многопоточной среде без использования CAS.