Параллельный обмен сообщениями — очень удобный инструмент для создания высокопроизводительных приложений. Грубо говоря, идея состоит в том, что мы стараемся избегать блокировки потоков при ожидании ресурсов и вместо этого отправляем сообщения реактивным объектам, когда есть над чем работать. Это решает множество проблем, включая минимизацию потенциальных взаимоблокировок и значительное повышение производительности приложений для тяжелых многопоточных приложений.
К сожалению, помимо блокировки потоков у нас также есть проблема блокировки операций с общим пулом ресурсов. Позвольте мне привести конкретный пример. Несколько дней назад я играл с Кассандрой. Вот самый простой код доступа к Кассандре
def transport = new TSocket("localhost", 9160)
def client = new Cassandra.Client(new TBinaryProtocol(transport))
transport.open()
// do whatever you need with Cassandra
transport.close ()
Здесь следует отметить два момента:
- мы явно не хотим открывать / закрывать соединение для каждого запроса к кластеру Cassandra, поэтому нам нужно иметь общий (то есть требующий синхронизации) пул соединений
- даже когда у нас есть доступное для нашего использования соединение, мы будем заблокированы до завершения операции, во многих ситуациях (например, в потоке пользовательского интерфейса) это неприемлемо
Наше первое желание состояло в том, чтобы переписать клиентский API Cassandra в стиле продолжения, основанном на неблокирующем вводе-выводе сокетов, но это, кажется, не такая простая задача (она основана на автоматически сгенерированном коде Thrift)
Вот наше общее решение для такого рода проблем. Это относительно короткий (74 LOC), и мы опишем ключевые идеи ниже
@Typed abstract class ResourcePool<R> {
Executor executor
boolean runFair
private volatile Pair<FQueue<Action<R>>,FList<R>> state = [FQueue.emptyQueue,null]
abstract FList<R> initResources ()
abstract static class Action<R> extends Function1<R,?> {
Runnable whenDone
}
final void execute (Action action, Runnable whenDone = null) {
action.whenDone = whenDone
if (state.second == null) {
initPool ()
}
for (;;) {
def s = state
if (s.second.empty) {
// no resource available, so put action in to the queue
if(state.compareAndSet(s, [s.first.addLast(action), FList.emptyList]))
break
}
else {
// queue is guaranteed to be empty
if(state.compareAndSet(s, [FQueue.emptyQueue, s.second.tail])) {
// schedule action
executor.execute {
scheduledAction(action,s.second.head)
}
break
}
}
}
}
private final def scheduledAction(Action<R> action, R resource) {
action(resource)
for (;;) {
def s = state
if (s.first.empty) {
// no more actions => we return resource to the pool
if(state.compareAndSet(s, [FQueue.emptyQueue, s.second + resource])) {
return action.whenDone?.run ()
}
}
else {
def removed = s.first.removeFirst()
if(state.compareAndSet(s, [removed.second, s.second])) {
if (runFair || action.whenDone) {
// schedule action
executor.execute {
scheduledAction(removed.first,resource)
}
return action.whenDone?.run ()
}
else {
// tail recursion
return scheduledAction(removed.first, resource)
}
}
}
}
}
private synchronized void initPool () {
if (state.second == null) {
state.second = initResources ()
}
}
}
Позвольте мне объяснить, что происходит в коде выше
- Общий параметр R — это тип ресурса, которым мы управляем
- Есть абстрактный метод initResources, который отвечает за создание и инициализацию объединенных ресурсов
- ResourcePool должен быть поддержан некоторым Исполнителем для управления параллелизмом
- Есть логическое свойство runFair, которое, грубо говоря, отвечает за стратегию совместного использования ресурсов Executor с другими задачами. Я объясню это в деталях ниже
Наш пример с Сassandra может выглядеть примерно так
ResourcePool<Cassandra.Client> cassandraPool = {
FList<Cassandra.Client> cp = FList.emptyList
for (i in 0..<3) {
def transport = new TSocket("localhost", 9160)
def client = new Cassandra.Client(new TBinaryProtocol(transport))
transport.open()
cp = cp + client
}
cp
}
cassandraPool.executor = pool
Единственным интерфейсным методом для ResourcePool является метод execute, который принимает одно или два замыкания. Первое закрытие определяет операцию, которая будет выполнена, когда общий ресурс стал доступным. Второе закрытие является необязательным и определяет продолжение, которое должно быть запланировано после завершения основной операции. Например, мы можем использовать наш пул, как это
cassandraPool.execute { client ->
def start = System.currentTimeMillis()
for (i in integers) {
client.insert(keyspace, i, colPathName, "Chris Goffinet$i".toString().getBytes(UTF8), start, ConsistencyLevel.ONE)
client.insert(keyspace, i, colPathAge, "$j".toString().getBytes(UTF8), start, ConsistencyLevel.ONE)
}
def elapsed = System.currentTimeMillis() - start
println "Thread$j: ${integers.size()} inserts in $elapsed ms ${(1.0d*elapsed)/integers.size()}"
}{
cdl.countDown()
}
Сама реализация довольно проста. Мы используем функциональные списки наших старых друзей и функциональные очереди и сохраняем состояние в виде пары очередей действий для выполнения и списка доступных ресурсов.
Существует очень важный инвариант, который мы всегда сохраняем в нашем алгоритме: если в очереди есть какое-либо действие, то доступного ресурса нет, а если есть доступный ресурс, очередь действий пуста.
Учитывая вышеизложенное, относительно легко понять, как работает пара методов методы execute / scheduleAction .
- Если сообщение приходит, когда нет доступного ресурса, мы просто помещаем его в очередь.
- Когда ресурс становится доступным (путем завершения предыдущей обработки), он заботится об удалении элемента из очереди и планировании выполнения (или непосредственном выполнении).
- Если сообщение появляется при наличии доступного ресурса, то ресурс удаляется из доступного списка и выполнение запланировано
runFair — это специальное свойство, которое говорит, нужно ли нам планировать новое выполнение после обработки предыдущего действия или можно повторно использовать тот же поток. Обычно это нормально с точки зрения приложения и быстрее с точки зрения производительности для повторного использования. Если этого не происходит, это обычно означает, что вы можете рассмотреть возможность использования двух разных исполнителей вместо разных частей вашей логики.
Я надеюсь, что это было интересно, и вы найдете этот инструмент полезным в своем развитии. Спасибо за чтение и до следующего раза.