Quasar — это библиотека, которая добавляет в JVM настоящие легкие потоки (волокна). Они очень дешевые и очень быстрые — на самом деле волокна ведут себя так же, как процессы Erlang или Go-процедуры — и позволяют писать простой код блокировки, в то же время обладая теми же преимуществами в производительности, что и сложный асинхронный код.
В этом посте мы узнаем, как преобразовать любой асинхронный API, основанный на обратном вызове, в красивый (оптоволоконный) API блокировки. Он предназначен для людей, которые хотят интегрировать свои собственные или сторонние библиотеки с волокнами Quasar . Вам не нужно знать об этом, если вы просто используете волокна Quasar с каналами или актерами или используете много интеграций, уже доступных в проекте Comsat (код, представленный ниже, является кодом, который разработчик приложения никогда не видит). Но даже если вы этого не сделаете, вы можете найти этот пост полезным для понимания того, как Quasar делает свое волшебство.
Почему Async?
Во-первых, причина того, что многие библиотеки предоставляют асинхронные API-интерфейсы, заключается в том, что количество запущенных потоков 1, которые может обрабатывать ОС, намного меньше, чем, скажем, число открытых TCP-соединений, которые ОС может поддерживать. А именно, ваш компьютер может поддерживать гораздо более высокий уровень параллелизма, чем предлагаемый потоками, поэтому библиотеки — и разработчики, использующие их — отказываются от потока в качестве абстракции, используемой для блока параллелизма программного обеспечения 2 . Асинхронные API не блокируют потоки и могут привести к значительному увеличению производительности (обычно в пропускной способности и емкости сервера — не так много в задержке).
Но использование асинхронных API также создает код, который по праву заслужил название «адский обратный вызов». Ад обратного вызова достаточно плох в средах, где отсутствует многоядерная обработка, таких как Javascript; это может быть намного хуже в тех, как JVM, где вам нужно заботиться о видимости памяти и синхронизации.
Написание блокирующего кода, работающего на волокнах, дает вам те же преимущества, что и асинхронный код, без недостатков: вы используете хорошие блокирующие API (вы даже можете продолжать использовать существующие), но вы получаете все преимущества производительности от неблокирующего кода.
Безусловно, у асинхронных API есть еще одно преимущество: они позволяют одновременно отправлять несколько операций ввода-вывода (например, HTTP-запросы). Поскольку выполнение этих операций обычно занимает много времени, и они часто независимы, мы можем одновременно ожидать завершения нескольких из них. Эти полезные функции, однако, также возможны с Java-фьючерсами, не требуя обратных вызовов. Позже мы увидим, как сделать фьючерсы, блокирующие волокна.
FiberAsync
Многие современные Java IO / библиотеки / драйверы баз данных поставляются с двумя разновидностями API: синхронным (потоковым) и асинхронным на основе обратных вызовов (это верно для NIO, клиента JAX-RS, клиента Apache HTTP и многих других ). Синхронный API намного приятнее.
В Quasar есть программный инструмент, который преобразует любой асинхронный API на основе обратного вызова в хороший волоконный блок: FiberAsync
. По сути, FiberASync
блокирует текущее волокно, устанавливает асинхронный обратный вызов, и когда обратный вызов срабатывает, он снова FiberASync
волокно и возвращает результат операции (или выдает исключение, если оно не удалось).
Чтобы понять, как использовать FiberAsync
, мы рассмотрим пример API: FooClient
. FooClient
— это современный IO API, поэтому он имеет два варианта: синхронный, блокирующий поток и асинхронный. Они здесь:
01
02
03
04
05
06
07
08
09
10
11
12
|
interface FooClient { String op(String arg) throws FooException, InterruptedException; } interface AsyncFooClient { Future<String> asyncOp(String arg, FooCompletion<String> callback); } interface FooCompletion<T> { void success(T result); void failure(FooException exception); } |
Обратите внимание, как асинхронная операция, как это имеет место во многих современных библиотеках, принимает обратный вызов и возвращает будущее. А пока давайте игнорируем будущее; мы вернемся к этому позже.
FooClient
намного приятнее и проще, чем AsyncFooClient
, но он блокирует поток и значительно снижает пропускную способность. Мы хотим создать реализацию интерфейса FooClient
который может работать внутри волокна и блокировать его, поэтому у нас будет простой код и отличная пропускная способность. Для этого мы будем использовать AsyncFooClient
под капотом, но превратим его в FooClient
блокирующий FooClient
. Вот весь код, который нам нужен (мы немного упростим его):
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
public class FiberFooClient implements FooClient { private final AsyncFooClient asyncClient; public FiberFooClient(AsyncFooClient asyncClient) { this .asyncClient = asyncClient; } @Override @Suspendable String op( final String arg) throws FooException, InterruptedException { try { return new FiberAsync<String, FooException>() { @Override protected void requestAsync() { asyncClient.asyncOp(arg, new FooCompletion<String>() { public void success(String result) { FiberAsync. this .asyncCompleted(result); } public void failure(FooException exception) { FiberAsync. this .asyncFailed(exception); } }); } }.run(); } catch (SuspendExecution e) { throw new AssertionError(e); } } } |
Теперь, что здесь происходит? Мы реализуем интерфейс FooClient
, но мы делаем блокировку оптоволокна, а не блокировку потоков. Нам нужно сообщить Quasar, что наш метод — блокировка волокон (или «приостановлен»), поэтому мы @Suspendable
его как @Suspendable
.
Затем мы создаем подклассы FiberAsync
и реализуем метод requestAsync
(два аргумента универсального типа, FiberAsync
принимает FiberAsync
— это тип возвращаемого значения и тип проверяемого исключения, которое может FiberAsync
операция, если таковые имеются; для нет проверенных исключений вторым универсальным аргументом должно быть RuntimeException
). requestAsync
отвечает за запуск асинхронной операции и регистрацию обратного вызова. Затем обратный вызов должен вызвать asyncCompleted
— если операция прошла успешно — и передать ему результат, который мы хотим вернуть, или asyncFailed
— если операция не удалась — и передать ему исключение по причине сбоя.
Наконец, мы вызываем FiberAsync.run()
. Это блокирует текущее волокно и вызывает requestAsync
для установки обратного вызова. Волокно будет оставаться заблокированным до тех пор, пока не сработает обратный вызов, что освободит FiberAsync
, вызвав либо asyncCompleted
либо asyncFailed
. Метод run
также имеет версию, которая принимает аргумент тайм-аута, что может быть полезно, если мы хотим ограничить по времени операцию блокировки (в общем, это хорошая идея).
Еще одна вещь, которую нужно объяснить, это блок try/catch
. Есть два способа объявить метод suspendable: аннотировать его с помощью @Suspendable
или объявить, что он выбрасывает проверенное исключение SuspendExecution
. FiberAsync
run
FiberAsync
используется последний, поэтому для компиляции кода нам нужно перехватить SuspendExecution
, но поскольку это не является настоящим исключением, мы никогда не сможем его перехватить (ну, по крайней мере, если Quasar работает правильно) — следовательно, AssertionError
.
Как только это будет сделано, вы можете использовать op
в любом волокне, например так:
1
2
3
4
5
|
new Fiber<Void>(() ->{ // ... String res = client.op(); // ... }).start(); |
Кстати, все это намного короче с Pulsar (Quasar’s Clojure API), где асинхронная операция:
1
|
(async-op arg #(println "result:" %)) |
Преобразуется в следующий синхронный код блокировки волокна с помощью макроса Pulsar await
:
1
|
(println "result:" (await (async-op arg))) |
Упрощение и массовое производство
Обычно такой интерфейс, как FooClient
, имеет много методов, и, как правило, большинство методов в AsyncFooClient
принимают такой же тип обратного вызова ( FooCompletion
). Если это так, мы можем FiberAsync
капсулу большую часть кода, который мы видели, в именованный подкласс FiberAsync
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
abstract class FooAsync<T> extends FiberAsync<T, FooException> implements FooCompletion<T> { @Override public void success(T result) { asyncCompleted(result); } @Override public void failure(FooException exception) { asyncFailed(exception); } @Override @Suspendable public T run() throws FooException, InterruptedException { try { return super .run(); } catch (SuspendExecution e) { throw new AssertionError(); } } @Override @Suspendable public T run( long timeout, TimeUnit unit) throws FooException, InterruptedException, TimeoutException { try { return super .run(timeout, unit); } catch (SuspendExecution e) { throw new AssertionError(); } } } |
Обратите внимание, как мы заставили нашу FiberAsync
напрямую реализовывать FooCompletion
вызов FooCompletion
— это не обязательно, но это полезный шаблон. Теперь наш метод блокировки волокон намного проще, и другие операции в этом интерфейсе могут быть реализованы так же легко:
1
2
3
4
5
6
7
8
9
|
@Override @Suspendable public String op( final String arg) throws FooException, InterruptedException { return new FooAsync<String>() { protected void requestAsync() { asyncClient.asyncOp(arg, this ); } }.run(); } |
Иногда нам может потребоваться, чтобы наш метод op
вызывался в обычных потоках, а не в оптоволокне. По умолчанию FiberAsync.run()
выдает исключение, если вызывается в потоке. Чтобы это исправить, все, что нам нужно сделать, — это реализовать другой метод FiberAsync
, requestSync
, который вызывает исходный синхронный API, если в волокне вызывается run
. Наш окончательный код выглядит следующим образом (мы предполагаем, что FiberFooClass
имеет поле FooClient
типа FooClient
):
01
02
03
04
05
06
07
08
09
10
11
12
|
@Override @Suspendable public String op( final String arg) throws FooException, InterruptedException { return new FooAsync<String>() { protected void requestAsync() { asyncClient.asyncOp(arg, this ); } public String requestSync() { return syncClient.op(arg); } }.run(); } |
И это все!
фьючерсы
Фьючерсы — это удобный способ одновременного запуска нескольких длительных независимых операций ввода-вывода, пока мы ожидаем завершения всех из них. Мы хотим, чтобы наши волокна могли блокироваться на фьючерсах. Многие библиотеки Java возвращают фьючерсы из своих асинхронных операций, так что пользователь может выбирать между полностью асинхронным, основанным на обратном вызове использованием и «полусинхронным» использованием, которое использует фьючерсы; наш интерфейс AsyncFooClient
работает именно так.
Вот как мы реализуем версию AsyncFooClient
которая возвращает блокирующие волокна фьючерсы:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
import co.paralleluniverse.strands.SettableFuture; public class FiberFooAsyncClient implements FooClient { private final AsyncFooClient asyncClient; public FiberFooClient(AsyncFooClient asyncClient) { this .asyncClient = asyncClient; } @Override public Future<String> asyncOp(String arg, FooCompletion<String> callback) { final SettableFuture<T> future = new SettableFuture<>(); asyncClient.asyncOp(arg, callbackFuture(future, callback)) return future; } private static <T> FooCompletion<T> callbackFuture( final SettableFuture<T> future, final FooCompletion<T> callback) { return new FooCompletion<T>() { @Override public void success(T result) { future.set(result); callback.completed(result); } @Override public void failure(Exception ex) { future.setException(ex); callback.failed(ex); } @Override public void cancelled() { future.cancel( true ); callback.cancelled(); } }; } } |
Будущее, которое мы возвращаем, co.paralleluniverse.strands.SettableFuture
, работает одинаково хорошо, если мы блокируем его либо на волокнах, либо на простых нитях (т. co.paralleluniverse.strands.SettableFuture
На любом типе прядей ).
Завершаемое будущее JDK 8 и прослушиваемое будущее Guava
API-интерфейсы, которые возвращают CompletionStage
(или CompletableFuture
который его реализует) — добавленный в Java в JDK 8 — могут сделать блокировку волокон намного проще с помощью встроенных FiberAsync
. Например,
1
|
CompletableFuture<String> asyncOp(String arg); |
превращается в вызов блокировки волокна с помощью:
1
|
String res = AsyncCompletionStage.get(asyncOp(arg)); |
Методы, возвращающие Google Guava, аналогично преобразуются в синхронную блокировку волокна, поэтому:
1
|
ListenableFuture<String> asyncOp(String arg); |
Включается блокировка волокна с:
1
|
String res = AsyncListenableFuture.get(asyncOp(arg)); |
Альтернатива фьючерсам
Хотя фьючерсы полезны и знакомы, нам не нужен специальный API, который возвращает их при использовании волокон. Волокна настолько дешевы, что их можно выращивать, а класс Fiber
реализует Future
так что сами волокна могут заменить фьючерсы, изготовленные вручную. Вот пример:
1
2
3
4
5
6
7
|
void work() { Fiber<String> f1 = new Fiber<>(() -> fiberFooClient.op( "first operation" )); Fiber<String> f2 = new Fiber<>(() -> fiberFooClient.op( "second operation" )); String res1 = f1.get(); String res2 = f2.get(); } |
Таким образом, волокна дают нам будущее, даже если API, которые мы используем, этого не делают.
Что делать, если нет асинхронного API?
К сожалению, иногда мы сталкиваемся с библиотекой, которая предоставляет только синхронный API, блокирующий потоки. JDBC является ярким примером такого API. Хотя Quasar не может увеличить пропускную способность работы с подобной библиотекой, сделать API-волокно-совместимым все же целесообразно (и на самом деле очень просто). Почему? Поскольку волокна, выполняющие вызовы синхронному сервису, вероятно, делают и другие вещи. На самом деле, они могут вызывать службу довольно редко (учитывайте данные, считываемые по оптоволокну из СУБД, только при отсутствии кэша).
Чтобы достичь этого, нужно превратить блокирующий API в асинхронный, выполнив фактические вызовы в выделенном пуле потоков, а затем обернув этот фальшивый асинхронный API с помощью FiberAsync
. Этот процесс настолько механический, что у FiberAsync
есть некоторые статические методы, которые заботятся обо всем за нас. Так что предположим, что наш сервис выставил только блокирующий API FooClient
. Чтобы сделать это блокирующим волокно, все, что мы делаем, это:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
public class SadFiberFooClient implements FooClient { private final FooClient client; private static final ExecutorService FOO_EXECUTOR = Executors.newCachedThreadPool(); public FiberFooClient(FooClient client) { this .client = client; } @Override @Suspendable String op( final String arg) throws FooException, InterruptedException { try { return FiberAsync.runBlocking(FOO_EXECUTOR, () -> client.op()); } catch (SuspendExecution e) { throw new AssertionError(e); } } } |
Эта реализация FooClient
безопасна для использования как нитями, так и волокнами. Фактически, при вызове в простом потоке метод не будет беспокоить о передаче операции в предоставленный пул потоков, а выполнит ее в текущем потоке — так же, как это было бы, если бы мы использовали исходную реализацию FooClient
.
Вывод
Методы, показанные здесь — с FiberAsync
и cpstrands.SettableFuture
— именно так работают модули интеграции, составляющие проект Comsat . Comsat включает в себя интеграции для сервлетов, JAX-RS (сервер и клиент), JDBC, JDBI, jOOQ, MongoDB, Retrofit и Dropwizard.
Важно увидеть, как — чтобы сделать простые и производительные API-интерфейсы блокировки волокон — мы действительно заново реализовали интерфейсы API, но не их внутреннюю работу: исходный библиотечный код все еще используется, только через его асинхронный API, уродливость которого Теперь скрыто от потребителя библиотеки.
Дополнительный кредит: как насчет монад?
Существуют и другие способы, кроме волокон, для борьбы с адом обратного вызова. Наиболее известные механизмы в мире JVM — это составные фьючерсы Scala, наблюдаемые RxJava и JDK 8 CompletionStage
/ CompletableFuture
. Это все примеры монад и монадной композиции. Монады работают, и некоторым людям нравится их использовать, но я думаю, что они не подходят для большинства языков программирования.
Видите ли, монады заимствованы из языков программирования на основе лямбда-исчисления. Лямбда-исчисление — это теоретическая модель вычислений, полностью отличная от, но полностью аналогичная машине Тьюринга. Но в отличие от модели машины Тьюринга, вычисления лямбда-исчисления не имеют понятия шагов, действий или состояний. Эти вычисления ничего не делают ; они просто есть . Таким образом, монады — это способ для основанных на LC языков, таких как Haskell, описывать действие, состояние, время и т. Д. Как чистые вычисления. Это способ для языка LC сказать компьютеру «сделай это, а потом сделай это».
Дело в том, что в императивных языках уже есть абстракция «сделай это, а потом сделай это», и эта абстракция — это поток. Мало того, но императивные языки обычно имеют очень простую запись «сделай это, а потом сделай это»: за этим следует заявление, за которым следует это . Единственная причина, по которой императивные языки даже рассматривают возможность принятия такой внешней концепции, заключается в том, что реализация потоков (ядром ОС) является менее чем удовлетворительной. Но вместо того, чтобы принимать чужую, незнакомую концепцию — и требующую совершенно разных типов API — лучше исправить реализацию (потоков), чем использовать аналогичную, но слегка различную абстракцию. Волокна сохраняют абстракцию и фиксируют реализацию.
Другая проблема с монадами в таких языках, как Java и Scala, заключается в том, что эти языки не только обязательны, но и допускают неограниченную мутацию общего состояния и побочные эффекты — то, чего нет у Haskell. Комбинация неограниченных мутаций общего состояния и «ниточных» монад может иметь катастрофические последствия. На чистом языке FP — поскольку побочные эффекты контролируются — единица вычислений, а именно функция, также является единицей параллелизма: вы можете безопасно выполнять любую пару функций одновременно. Это не тот случай, когда у вас есть неограниченный побочный эффект. Порядок выполнения функции, могут ли две функции быть выполнены одновременно, и если и когда функция может наблюдать мутации общего состояния, выполненные другой, — все это серьезные проблемы. В результате функции, выполняемые как часть «потоковых» монад, должны быть либо чистыми (без каких-либо побочных эффектов), либо быть очень-очень осторожными в отношении того, как они выполняют эти побочные эффекты. Это именно то, чего мы пытаемся избежать. Таким образом, хотя монадические композиции действительно генерируют гораздо более приятный код, чем callback-ад, они не решают никаких проблем параллелизма, возникающих в асинхронном коде.
PS
Предыдущий раздел не должен читаться как одобрение чистых языков «FP», таких как Haskell, потому что я на самом деле думаю, что они создают слишком много других проблем. Я полагаю, что (ближайшее) будущее — это императивные языки 3 , которые допускают мутации общего состояния, но с некоторой транзакционной семантикой. Я верю, что эти будущие языки будут черпать вдохновение в основном из таких языков, как Clojure и Erlang.
Обсудить на Reddit
- Под выполнением я имею в виду потоки, которые достаточно часто запускаются ↩
- См . Закон Литтла, Масштабируемость и Отказоустойчивость ↩
- Являются ли они «функциональными» или нет, это сложный вопрос, так как никто не придумал хорошего определения, что такое функциональный язык программирования и что отличает его от нефункциональных языков. ↩
Ссылка: | Прощание с асинхронным кодом от нашего партнера JCG Дафны Пресслер в блоге Parallel Universe . |