Статьи

JDK одновременный пакет

JDK одновременный пакет

Текущая модель памяти Java гарантирует ожидаемый порядок выполнения многопоточного кода, если в этом коде нет гонок. Чтобы защитить ваш код от гонок, придумайте разные способы синхронизации и обмена данными между ними. Пакет  java.util.concurrent, входящий в состав HotSpot JDK, предоставляет следующие инструменты для написания многопоточного кода:

  • атомное
  • Замки
  • Коллекции
  • Точки синхронизации
  • Исполнители
  • Аккумуляторы  JDK 1,8

атомное

В дочернем пакете  java.util.concurrent.atomic есть набор классов для работы с примитивными атомарными типами. Контракт этих классов обеспечивает выполнение операции  compare-and-set за «одну единицу процессорного времени». Когда вы устанавливаете новое значение переменной, вы также передаете ее старое значение (подход оптимистической блокировки). Если после вызова значение переменной отличается от ожидаемого параметра — compare-and-set вернет  false результат.

Например, возьмите два массива  long переменных:  [1,2,3,4,5] и  [-1,-2,-3,-4,-5]. Несколько потоков будут перебираться в массиве и объединять элементы в одну переменную. Код ( заводной ) с пессимистичной блокировкой выглядит так:

class Sum {
    static monitor = new Object()
    static volatile long sum = 0
}
class Summer implements Callable {
    long[] data
    Object call() throws Exception {
        data.each {
            synchronized (Sum.monitor) {
                println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                Sum.sum += it
            }
        }
    }
}

Executors.newFixedThreadPool(2).invokeAll([
        new Summer(data: [1,2,3,4,5]),
        new Summer(data: [-1,-2,-3,-4,-5])
])

print("Sum: ${Sum.sum}")

Ожидаемый результат:
pool-1-thread-1: add 1 to 0
pool-1-thread-2: add -1 to 1
pool-1-thread-1: add 2 to 0
pool-1-thread-2: add -2 to 2
pool-1-thread-1: add 3 to 0
pool-1-thread-2: add -3 to 3
pool-1-thread-1: add 4 to 0
pool-1-thread-1: add 5 to 4
pool-1-thread-2: add -4 to 9
pool-1-thread-2: add -5 to 5
Sum: 0

Однако этот подход имеет существенные недостатки в производительности. В этом случае мы тратим гораздо больше времени на нахождение в состоянии блокировки, чем на выполнение реального кода:

  • попытка заблокировать монитор
  • блокирующая нить
  • разблокировать монитор
  • разблокировать тему

Рассмотрим использование  AtomicLong для реализации оптимистической блокировки при расчете той же суммы:

class Sum {
    static volatile AtomicLong sum = new AtomicLong(0)
}
class Summer implements Callable {
    long[] data
    Object call() throws Exception {
        data.each {
                while(true) {
                    long localSum = Sum.sum.get()
                    if (Sum.sum.compareAndSet(localSum, localSum + it)) {
                        println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                        break;
                    } else {
                        println("[MISS!] ${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                    }
                }
        }
    }
}

Executors.newFixedThreadPool(2).invokeAll([
        new Summer(data: [1,2,3,4,5]),
        new Summer(data: [-1,-2,-3,-4,-5])
])

print("Sum: ${Sum.sum}")

Как видно из результатов, попыток ошибок было не так много:

[MISS!] pool-1-thread-1: add 1 to -1
pool-1-thread-2: add -1 to -1
pool-1-thread-2: add -2 to -3
[MISS!] pool-1-thread-1: add 1 to -3
pool-1-thread-2: add -3 to -6
pool-1-thread-1: add 1 to -5
[MISS!] pool-1-thread-2: add -4 to -5
pool-1-thread-1: add 2 to -7
pool-1-thread-2: add -4 to -7
pool-1-thread-1: add 3 to -9
pool-1-thread-2: add -5 to -9
pool-1-thread-1: add 4 to -5
pool-1-thread-1: add 5 to 0
Sum: 0

При принятии решения об использовании оптимистической блокировки важно действовать с изменяемой переменной не долго. Чем дольше это действие — тем чаще будет ошибочно  compare-and-set, и чаще придется выполнять это действие снова.

Используя  compare-and-set метод, мы также можем реализовать неблокирующую блокировку чтения. В этом случае мы сохраняем текущую версию объекта в атомарной переменной. После завершения обработки объекта — сравните текущую версию объекта с сохраненной. Если они не равны — это снова прочитать свойства объекта, используя обычную  read-write блокировку.

class Transaction {
    long debit
}

class Account {
    AtomicLong version = new AtomicLong()
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock()
    List<Transaction> transactions = new ArrayList<Transaction>()
}

long  balance(Account account) {
    ReentrantReadWriteLock.ReadLock locked
    while(true) {
        long balance = 0
        long version = account.version.get()
        account.transactions.each {balance += it.debit}
        //volatile write for JMM
        if (account.version.compareAndSet(version, version)) {
            if (locked) {locked.unlock()}
            return balance
        } else {
            locked = account.readWriteLock.readLock()
        }
    }
}

void modifyTransaction(Account account, int position, long newDebit) {
    def writeLock = account.readWriteLock.writeLock()
    account.version.incrementAndGet()
    account.transactions[position].debit = newDebit
    writeLock.unlock()
}

Замки

ReentrantLock

В отличие от синхронизированного блока,  ReentrantLock позволяет более гибко выбирать время блокировки и разблокировки, поскольку он использует обычные вызовы Java.

ReentrantLock также предоставляет информацию о текущем состоянии блокировки и позволяет «ожидать» блокировки в течение определенного времени. Поддерживает правильную блокировку и освобождение рекурсивных блокировок для одного потока. Если вам нужна справедливая блокировка (соответствует порядку захвата монитора) — ReentrantLock также предоставьте этот механизм.

Несмотря на то, что   блокировка syncronized and  ReentrantLockочень похожа на уровне интерфейса, реализация их совершенно иная. Не вдаваясь в детали JMM: используйте  ReentrantLock вместо этого предоставленный синхронизированный блок JVM, только если вы очень часто сражаетесь за монитор. В случае, когда только один поток ( как правило ) пытается попасть в синхронизированный метод — производительность с простым  syncronized лучше, чем  ReentrantLock .

ReentrantReadWriteLock

ReentrantReadWriteLock Дополняет  ReentrantLock возможность получить множество блокировок чтения и блокировок записи. При необходимости блокировка при входе может быть «понижена» до блока чтения.

StampedLock  JDK 1,8

Реализует оптимистическую и пессимистическую блокировки чтения-записи с возможностью дальнейшего увеличения или уменьшения состояния блокировки. Оптимистическая блокировка реализуется через текущую блокировку «штамп» ( javadoc ):

double distanceFromOriginV1() { // A read-only method
 long stamp;
 if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic
   double currentX = x;
   double currentY = y;
   if (sl.validate(stamp))
     return Math.sqrt(currentX * currentX + currentY * currentY);
 }
 stamp = sl.readLock(); // fall back to read lock
 try {
   double currentX = x;
   double currentY = y;
     return Math.sqrt(currentX * currentX + currentY * currentY);
 } finally {
   sl.unlockRead(stamp);
 }
}

Коллекции

Учебный класс Описание

ArrayBlockingQueue

Справедливая очередь для передачи сообщений из одного потока в другой. Поддержка методов block ( put() take()) и non-blocking ( offer() pool()). Запрещает нулевые значения. Емкость очереди должна присутствовать во время строительства.

ConcurrentHashMap

Структура ключ-значение, основанная на  hash функции. На чтении нет замков. Пишите блоки только часть карты (сегмента). Количество сегментов ограничено ближайшей  concurrencyLevelстепенью 2.

ConcurrentSkipListMap

Сбалансированная многопоточная структура ключ-значение (O (log n)). Поиск основан на скип-листе. Карта должна быть в состоянии сравнить ключи.

ConcurrentSkipListSet

ConcurrentSkipListMap без ценностей.

CopyOnWriteArrayList

Блокировка для записи, нет блокировки в списке чтения. Любая модификация создает новый экземпляр массива в памяти.

CopyOnWriteArraySet

CopyOnWriteArrayList без ценностей.

DelayQueue

PriorityBlockingQueue позволяет получить товар только после определенной задержки (задержка устанавливается через  Delayed интерфейс). DelayQueue может быть полезным для реализации планировщика. Емкость не фиксированная.

LinkedBlockingDeque

Двунаправленный  BlockingQueue, основанный на подключении (пропуск кэша и издержки когерентности кэша). Емкость не фиксированная.

LinkedBlockingQueue

Однонаправленный  BlockingQueue, основанный на подключении (пропуск кэша и издержки когерентности кэша). Емкость не фиксированная.

LinkedTransferQueue

Однонаправленный  BlockingQueue, основанный на подключении (пропуск кэша и издержки когерентности кэша). Емкость не фиксированная. В этой очереди есть методы ожидания потребителя в потоке производителя.

PriorityBlockingQueue

Однонаправленный  BlockingQueue, с приоритетом сообщений (через сравнение элементов).

SynchronousQueue

Направленный  BlockingQueue, реализует  transfer() логику (из очереди передачи) для  put()методов.

Точки синхронизации

Учебный класс Описание

CountDownLatch

Барьер ( await()), ожидание определенного (или более) количества вызовов  countDown(). Состояние барьера не может быть сброшено.

CyclicBarrier

Barrier ( await()), ожидающий определенного количества вызовов  await() других потоков. Когда количество потоков достигнет указанного — будет вызван дополнительный обратный вызов, и блокировка снимется. Барьер сбрасывает свое первоначальное состояние, когда число потоков повторяет указанное число и может быть использовано повторно.

Exchanger

Барьер ( exchange()) для синхронизации двух потоков. В это время возможна синхронизация передачи объектов между потоками.

Phaser

Расширение  CyclicBarrier, которое позволяет регистрировать и удалять элементы на каждом цикле шлагбаума.

Semaphore

Барьер, позволяющий захватывать только указанное количество потоков монитора. Фактически расширяет функциональность Lock с возможностью быть несколькими потоками в блоке синхронизации.

Исполнители

ExecutorService заменяет  new Thread(runnable) для упрощения работы с потоками. ExecutorService помогает повторно использовать свободные потоки, организовать очередь задач в пул потоков, разрешить подписку (или ожидание) результата задач. Вместо Runnableпула интерфейса  используется интерфейс  Callable (позволяет вернуть результат и выдать ошибку).

ExecutorService pool = Executors.newFixedThreadPool(4)
Future future = pool.submit(new Callable() {
    Object call() throws Exception {
        println("In thread")
        return "From thread"
    }
})
println("From main")
println(future.get())

try {
    pool.submit(new Callable() {
        Object call() throws Exception {
            throw new IllegalStateException()
        }
    }).get()
} catch (ExecutionException e) {println("Got it: ${e.cause}")}

pool.shutdown()

InvokeAll Метод возвращает управление вызывающему потоку только после завершения всех задач. InvokeAny Метод возвращает результат первого успешно выполненного задания, отменяя все в дальнейшем.

Учебный класс Описание

ThreadPoolExecutor

Пул потоков с возможностью указания рабочего и максимального количества потоков в пуле, очереди для задач.

ScheduledThreadPoolExecutor

Расширяет функциональность  ThreadPoolExecutor с возможностью выполнять задачи на регулярной основе или с отсрочкой.

ForkJoinPool

Облегченный пул потоков для «самовоспроизводящихся» задач. Пул ожидает вызова  fork() и join() методов с дочерними задачами в родительской.

class LNode {
    List<LNode> childs = []
    def object
}

class Finder extends RecursiveTask<LNode> {
    LNode  node
    Object expect

    protected LNode compute() {
        if (node?.object?.equals(expect)) {
            return node
        }
        node?.childs?.collect {
            new Finder(node: it, expect: expect).fork()
        }?.collect {
            it.join()
        }?.find {
            it != null
        }
    }
}

ForkJoinPool es = new ForkJoinPool()
def invoke = es.invoke(new Finder(
        node: new LNode(
                childs: [
                        new LNode(object: "ivalid"),
                        new LNode(
                                object: "ivalid",
                                childs: [new LNode(object: "test")]
                        )
                ]
        ),
        expect: "test"
))

print("${invoke?.object}")

Аккумуляторы  JDK 1,8

Аккумуляторы позволяют выполнять примитивные операции (например, суммирование или поиск максимального значения) с числовыми элементами в многопоточной среде без использования CAS.