Несколько недель назад мы говорили о алгоритмах передачи сообщений без блокировок с Groovy ++ и предоставили реализацию акторов, похожих на Scala или Erlang, но быстрее. Сегодня мы будем использовать ту же технику для реализации похожей, но отличной от Clojure концепции, называемой агентами. Вы можете найти исходный код и другие примеры в дистрибутиве Groovy ++
Так что же такое агент? Вероятно, лучшим источником является документация Clojure , которая очень вдохновляет читателей, даже если вы ненавидите Lisp-подобный синтаксис, используемый Clojure, так же, как и я. Короче говоря, я приведу очень ненаучное определение
Агент — это ссылка на неизменный объект, а изменение самой ссылки допускается только асинхронными действиями, которые выполняются не более чем для каждого агента в любой момент времени.
Агенты могут иметь дополнительные валидаторы для предлагаемого нового значения и (также необязательные) прослушиватели для успешных обновлений
Для экспертов Clojure важно отметить, что агенты Groovy ++ еще не интегрированы ни с одной STM (программной транзакционной памятью)
Давайте начнем с развития нашего класса
@Typed final class Agent<T> extends ExecutingChannel {
private volatile T ref
final T get () { ref }
.........
Мы извлекаем наш класс из ExecutingChannel (вариация того, что мы разработали в предыдущей статье), что даст нам семантику «максимум одно действие в любой момент времени» бесплатно. И определить энергозависимое поле для ссылки на внутреннее состояние и геттер для этой ссылки.
Несколько важных вещей, на которые стоит обратить внимание
- Доступ к указанному объекту доступен немедленно без какой-либо дополнительной синхронизации.
- За это мы платим, сохраняя ссылку в волатильном поле
- Именно из-за 1) объект, на который делается ссылка, должен быть неизменным или рассматриваться как неизменяемый
- Groovy ++ предоставляет несколько постоянных структур данных, идеально подходящих для таких ситуаций — FList, FQueue, FHashMap & FVector (некоторые другие будут добавлены)
Теперь давайте позаботимся о валидаторах и слушателях. Для обоих мы определяем
- абстрактный статический внутренний класс (пока не исправлена ошибка в ядре Groovy, которая не позволяет использовать внутренние интерфейсы)
- изменяемое поле для постоянного списка этого типа (мы хотим иметь возможность асинхронно добавлять приемники удаления и валидаторы)
- добавить / удалить метод для обработки членства в списке
abstract static class Validator<T> {
abstract boolean validate (Agent<T> agent, T newValue)
}
private volatile FList<Listener<T>> listeners = FList.emptyList
abstract static class Listener<T> {
abstract void onUpdate (Agent<T> agent)
}
private volatile FList<Validator<T>> validators = FList.emptyList
Listener addListener (Listener listener) {
listeners.apply{ it + listener }
listener
}
void removeListener (Listener listener) {
listeners.apply{ it - listener }
}
Validator addValidator (Validator validator) {
validators.apply{ it + validator }
validator
}
void removeValidator (Validator validator) {
validators.apply{ it - validator }
}
Обратите внимание, как элегантно мы можем применить новое значение к изменчивому полю. Вместо for (;;) {..compareAndSet …} мы просто используем apply {…}
Внимательный читатель может спросить, зачем нам нужны специальные типы для валидатора / слушателя, и вместо этого использовать стандартные Function1 <Agent <T>,?> Для слушателя и Function2 <Agent <T>, T, Boolean> для валидатора. По правде говоря, моя первоначальная версия использовала именно это, но потом я обнаружил, что упаковка результатов валидатора очень неестественна, и решил внедрить интерфейс с одним методом (у слушателя есть специальный класс для симметрии).
Другой интересный вопрос — зачем нам возвращаемое значение addListener / addValidator. Это интересная история. Давайте рассмотрим следующий фрагмент кода
def validator = agent.addValidator {...<validator body>... }
Здесь происходит неявное создание анонимного внутреннего класса, расширяющего Validator. Затем этот экземпляр передается в качестве аргумента методу addValidator. Если нам нужно сохранить ссылку для этого валидатора (например, для последующего удаления), нам нужен доступ к нему. Это причина для возврата значения метода addValidator.
Теперь мы готовы реализовать основной метод класса Agent. Для удобства мы будем называть это «зов» (извините за тавтологию)
В Groovy вы почти всегда можете опустить имя метода, если метод называется ‘call’
Вот код
void call (Mutation<T> mutation, Runnable whenDone = null) {
def that = this
schedule {
def oldRef = ref
def newRef = mutation(oldRef)
if (newRef !== oldRef) {
if(!validators.any{ !it.validate(that, newRef) }) {
ref = newRef
listeners*.onUpdate(that)
}
}
whenDone?.run ()
}
}
Вероятно, это требует немного объяснений
Прежде всего Mutation <T> — это то же самое, что Function1 <T, T>, который является функцией, принимающей аргумент типа T и возвращающей значение того же типа.
Во-вторых, ExecutorChannel.schedule — это метод, который отправляет специальное сообщение на канал. Вы можете думать об этом сообщении так, как будто оно Runnable для выполнения. Посмотрите на Groovy ++ источники для деталей.
И последнее, но не менее важное: у нас есть дополнительный необязательный параметр, представляющий операцию, выполняемую при завершении действий мутации / проверки / прослушивателя. Чтобы привести один пример, почему это может быть полезно, мы разработаем еще один метод.
Представьте, что по каким-то причинам мы хотим инициировать мутацию значения агента и ждать, пока он не завершится. Вот простой (и более важный, свободный от тупиков) способ сделать это.
void await (Mutation<T> mutation) {
CountDownLatch cdl = [1]
call(mutation) {
cdl.countDown ()
}
cdl.await()
}
В приведенном выше коде мы вводим внутренний CountDownLatch и освобождаем его после завершения действия обновления.
Теперь, когда мы закончили с дизайном агента, давайте попробуем его использовать.
Для создания агента мы делаем следующее
Agent<FVector<Integer>> sharedVector = [FVector.emptyVector]
sharedVector.executor = pool
В приведенном выше коде мы явно устанавливаем свойство executor (java.util.concurrent.Executor). Во многих случаях это не требуется, поскольку стандартная библиотека Groovy ++ имеет обозначение текущего исполнителя, которое доступно в потоках, управляемых библиотекой.
Чтобы обновить / заполнить внутренний вектор, мы можем сделать что-то подобное
for(i in 0..<100) {
Mutation<FVector<Integer>> action = {
it.length < 100*100 ? it + it.length : it
}
sharedVector(action) {
if (sharedVector.get().length < 100*100)
sharedVector(action, this)
}
}
Обратите внимание на хитрость, которую мы используем, чтобы сделать наше обновление «рекурсивным». Это еще одно интересное использование обратного вызова завершения whenDone. Когда обновление завершено, мы проверяем, хотим ли мы продолжить и, если да, планируем то же действие с тем же обратным вызовом завершения.
Прежде чем читать следующий абзац, я предлагаю читателю подумать, что не так с кодом выше.
К сожалению, приведенный выше код не совсем корректен. Может случиться, что один поток решил и запланировал обновление для нашего агента, потому что внутренний вектор содержит меньше элементов, чем мы хотим, но несколько обновлений уже запланировано, и в целом мы превышаем желаемое количество элементов. Валидатор — полезный инструмент в этой ситуации
def validator = sharedVector.addValidator { agent, newValue ->
newValue.length <= 100*100
}
..............
sharedVector.removeValidator(validator)
Мы также можем работать с агентом квазисинхронно. Допустим, мы хотим запустить 10 потоков, чтобы перемешать внутренний вектор 1000 раз. Мы можем сделать это как в коде ниже.
CountDownLatch shuffleCdl = [10]
for(i in 0..<10) {
pool.execute {
def r = new Random ()
for(j in 0..<1000) {
def i1 = r.nextInt (100*100)
def i2 = r.nextInt (100*100)
sharedVector.await {
def v1 = it[i1]
def v2 = it[i2]
it.set(i2, v1).set(i1, v2)
}
}
shuffleCdl.countDown()
}
}
shuffleCdl.await ()
Агент представляет собой простую, но удобную общую структуру данных, которая не требует специальной синхронизации. Я надеюсь, что эта статья прояснила для вас. Спасибо за чтение и до следующего раза.