Статьи

Прощание с асинхронным кодом

Quasar — это библиотека, которая добавляет в JVM настоящие легкие потоки (волокна). Они очень дешевые и очень быстрые — на самом деле волокна ведут себя так же, как процессы Erlang или Go-процедуры — и позволяют писать простой код блокировки, в то же время обладая теми же преимуществами в производительности, что и сложный асинхронный код.

В этом посте мы узнаем, как преобразовать любой асинхронный API, основанный на обратном вызове, в красивый (оптоволоконный) API блокировки. Он предназначен для людей, которые хотят интегрировать свои собственные или сторонние библиотеки с волокнами Quasar . Вам не нужно знать об этом, если вы просто используете волокна Quasar с каналами или актерами или используете много интеграций, уже доступных в проекте Comsat (код, представленный ниже, является кодом, который разработчик приложения никогда не видит). Но даже если вы этого не сделаете, вы можете найти этот пост полезным для понимания того, как Quasar делает свое волшебство.

Почему Async?

Во-первых, причина того, что многие библиотеки предоставляют асинхронные API-интерфейсы, заключается в том, что количество запущенных потоков 1, которые может обрабатывать ОС, намного меньше, чем, скажем, число открытых TCP-соединений, которые ОС может поддерживать. А именно, ваш компьютер может поддерживать гораздо более высокий уровень параллелизма, чем предлагаемый потоками, поэтому библиотеки — и разработчики, использующие их — отказываются от потока в качестве абстракции, используемой для блока параллелизма программного обеспечения 2 . Асинхронные API не блокируют потоки и могут привести к значительному увеличению производительности (обычно в пропускной способности и емкости сервера — не так много в задержке).

Но использование асинхронных API также создает код, который по праву заслужил название «адский обратный вызов». Ад обратного вызова достаточно плох в средах, где отсутствует многоядерная обработка, таких как Javascript; это может быть намного хуже в тех, как JVM, где вам нужно заботиться о видимости памяти и синхронизации.

Написание блокирующего кода, работающего на волокнах, дает вам те же преимущества, что и асинхронный код, без недостатков: вы используете хорошие блокирующие API (вы даже можете продолжать использовать существующие), но вы получаете все преимущества производительности от неблокирующего кода.

Безусловно, у асинхронных API есть еще одно преимущество: они позволяют одновременно отправлять несколько операций ввода-вывода (например, HTTP-запросы). Поскольку выполнение этих операций обычно занимает много времени, и они часто независимы, мы можем одновременно ожидать завершения нескольких из них. Эти полезные функции, однако, также возможны с Java-фьючерсами, не требуя обратных вызовов. Позже мы увидим, как сделать фьючерсы, блокирующие волокна.

FiberAsync

Многие современные Java IO / библиотеки / драйверы баз данных поставляются с двумя разновидностями API: синхронным (потоковым) и асинхронным на основе обратных вызовов (это верно для NIO, клиента JAX-RS, клиента Apache HTTP и многих других ). Синхронный API намного приятнее.

В Quasar есть программный инструмент, который преобразует любой асинхронный API на основе обратного вызова в хороший волоконный блок: FiberAsync . По сути, FiberASync блокирует текущее волокно, устанавливает асинхронный обратный вызов, и когда обратный вызов срабатывает, он снова FiberASync волокно и возвращает результат операции (или выдает исключение, если оно не удалось).

Чтобы понять, как использовать FiberAsync , мы рассмотрим пример API: FooClient . FooClient — это современный IO API, поэтому он имеет два варианта: синхронный, блокирующий поток и асинхронный. Они здесь:

01
02
03
04
05
06
07
08
09
10
11
12
interface FooClient {
  String op(String arg) throws FooException, InterruptedException;
}
 
interface AsyncFooClient {
  Future<String> asyncOp(String arg, FooCompletion<String> callback);
}
 
interface FooCompletion<T> {
  void success(T result);
  void failure(FooException exception);
}

Обратите внимание, как асинхронная операция, как это имеет место во многих современных библиотеках, принимает обратный вызов и возвращает будущее. А пока давайте игнорируем будущее; мы вернемся к этому позже.

FooClient намного приятнее и проще, чем AsyncFooClient , но он блокирует поток и значительно снижает пропускную способность. Мы хотим создать реализацию интерфейса FooClient который может работать внутри волокна и блокировать его, поэтому у нас будет простой код и отличная пропускная способность. Для этого мы будем использовать AsyncFooClient под капотом, но превратим его в FooClient блокирующий FooClient . Вот весь код, который нам нужен (мы немного упростим его):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class FiberFooClient implements FooClient {
    private final AsyncFooClient asyncClient;
 
    public FiberFooClient(AsyncFooClient asyncClient) {
        this.asyncClient = asyncClient;
    }
 
    @Override
    @Suspendable
    String op(final String arg) throws FooException, InterruptedException {
        try {
            return new FiberAsync<String, FooException>() {
                @Override
                protected void requestAsync() {
                    asyncClient.asyncOp(arg, new FooCompletion<String>() {
                        public void success(String result) {
                            FiberAsync.this.asyncCompleted(result);
                        }
                        public void failure(FooException exception) {
                            FiberAsync.this.asyncFailed(exception);
                        }
                    });
                }
            }.run();
        } catch(SuspendExecution e) {
            throw new AssertionError(e);
        }
    }
}

Теперь, что здесь происходит? Мы реализуем интерфейс FooClient , но мы делаем блокировку оптоволокна, а не блокировку потоков. Нам нужно сообщить Quasar, что наш метод — блокировка волокон (или «приостановлен»), поэтому мы @Suspendable его как @Suspendable .

Затем мы создаем подклассы FiberAsync и реализуем метод requestAsync (два аргумента универсального типа, FiberAsync принимает FiberAsync — это тип возвращаемого значения и тип проверяемого исключения, которое может FiberAsync операция, если таковые имеются; для нет проверенных исключений вторым универсальным аргументом должно быть RuntimeException ). requestAsync отвечает за запуск асинхронной операции и регистрацию обратного вызова. Затем обратный вызов должен вызвать asyncCompleted — если операция прошла успешно — и передать ему результат, который мы хотим вернуть, или asyncFailed — если операция не удалась — и передать ему исключение по причине сбоя.

Наконец, мы вызываем FiberAsync.run() . Это блокирует текущее волокно и вызывает requestAsync для установки обратного вызова. Волокно будет оставаться заблокированным до тех пор, пока не сработает обратный вызов, что освободит FiberAsync , вызвав либо asyncCompleted либо asyncFailed . Метод run также имеет версию, которая принимает аргумент тайм-аута, что может быть полезно, если мы хотим ограничить по времени операцию блокировки (в общем, это хорошая идея).

Еще одна вещь, которую нужно объяснить, это блок try/catch . Есть два способа объявить метод suspendable: аннотировать его с помощью @Suspendable или объявить, что он выбрасывает проверенное исключение SuspendExecution . FiberAsync run FiberAsync используется последний, поэтому для компиляции кода нам нужно перехватить SuspendExecution , но поскольку это не является настоящим исключением, мы никогда не сможем его перехватить (ну, по крайней мере, если Quasar работает правильно) — следовательно, AssertionError .

Как только это будет сделано, вы можете использовать op в любом волокне, например так:

1
2
3
4
5
new Fiber<Void>(() ->{
    // ...
    String res = client.op();
    // ...
}).start();

Кстати, все это намного короче с Pulsar (Quasar’s Clojure API), где асинхронная операция:

1
(async-op arg #(println "result:" %))

Преобразуется в следующий синхронный код блокировки волокна с помощью макроса Pulsar await :

1
(println "result:" (await (async-op arg)))

Упрощение и массовое производство

Обычно такой интерфейс, как FooClient , имеет много методов, и, как правило, большинство методов в AsyncFooClient принимают такой же тип обратного вызова ( FooCompletion ). Если это так, мы можем FiberAsync капсулу большую часть кода, который мы видели, в именованный подкласс FiberAsync :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
abstract class FooAsync<T> extends FiberAsync<T, FooException> implements FooCompletion<T> {
    @Override
    public void success(T result) {
        asyncCompleted(result);
    }
 
    @Override
    public void failure(FooException exception) {
        asyncFailed(exception);
    }
 
    @Override
    @Suspendable
    public T run() throws FooException, InterruptedException {
        try {
            return super.run();
        } catch (SuspendExecution e) {
            throw new AssertionError();
        }
    }
 
    @Override
    @Suspendable
    public T run(long timeout, TimeUnit unit) throws FooException, InterruptedException, TimeoutException {
        try {
            return super.run(timeout, unit);
        } catch (SuspendExecution e) {
            throw new AssertionError();
        }
    }
}

Обратите внимание, как мы заставили нашу FiberAsync напрямую реализовывать FooCompletion вызов FooCompletion — это не обязательно, но это полезный шаблон. Теперь наш метод блокировки волокон намного проще, и другие операции в этом интерфейсе могут быть реализованы так же легко:

1
2
3
4
5
6
7
8
9
@Override
@Suspendable
public String op(final String arg) throws FooException, InterruptedException {
    return new FooAsync<String>() {
        protected void requestAsync() {
            asyncClient.asyncOp(arg, this);
        }
    }.run();
}

Иногда нам может потребоваться, чтобы наш метод op вызывался в обычных потоках, а не в оптоволокне. По умолчанию FiberAsync.run() выдает исключение, если вызывается в потоке. Чтобы это исправить, все, что нам нужно сделать, — это реализовать другой метод FiberAsync , requestSync , который вызывает исходный синхронный API, если в волокне вызывается run . Наш окончательный код выглядит следующим образом (мы предполагаем, что FiberFooClass имеет поле FooClient типа FooClient ):

01
02
03
04
05
06
07
08
09
10
11
12
@Override
@Suspendable
public String op(final String arg) throws FooException, InterruptedException {
    return new FooAsync<String>() {
        protected void requestAsync() {
            asyncClient.asyncOp(arg, this);
        }
        public String requestSync() {
            return syncClient.op(arg);
        }
    }.run();
}

И это все!

фьючерсы

Фьючерсы — это удобный способ одновременного запуска нескольких длительных независимых операций ввода-вывода, пока мы ожидаем завершения всех из них. Мы хотим, чтобы наши волокна могли блокироваться на фьючерсах. Многие библиотеки Java возвращают фьючерсы из своих асинхронных операций, так что пользователь может выбирать между полностью асинхронным, основанным на обратном вызове использованием и «полусинхронным» использованием, которое использует фьючерсы; наш интерфейс AsyncFooClient работает именно так.

Вот как мы реализуем версию AsyncFooClient которая возвращает блокирующие волокна фьючерсы:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import co.paralleluniverse.strands.SettableFuture;
 
public class FiberFooAsyncClient implements FooClient {
    private final AsyncFooClient asyncClient;
 
    public FiberFooClient(AsyncFooClient asyncClient) {
        this.asyncClient = asyncClient;
    }
 
    @Override
    public Future<String> asyncOp(String arg, FooCompletion<String> callback) {
        final SettableFuture<T> future = new SettableFuture<>();
        asyncClient.asyncOp(arg, callbackFuture(future, callback))
        return future;
    }
 
    private static <T> FooCompletion<T> callbackFuture(final SettableFuture<T> future, final FooCompletion<T> callback) {
        return new FooCompletion<T>() {
            @Override
            public void success(T result) {
                future.set(result);
                callback.completed(result);
            }
 
            @Override
            public void failure(Exception ex) {
                future.setException(ex);
                callback.failed(ex);
            }
 
            @Override
            public void cancelled() {
                future.cancel(true);
                callback.cancelled();
            }
        };
    }
}

Будущее, которое мы возвращаем, co.paralleluniverse.strands.SettableFuture , работает одинаково хорошо, если мы блокируем его либо на волокнах, либо на простых нитях (т. co.paralleluniverse.strands.SettableFuture На любом типе прядей ).

Завершаемое будущее JDK 8 и прослушиваемое будущее Guava

API-интерфейсы, которые возвращают CompletionStage (или CompletableFuture который его реализует) — добавленный в Java в JDK 8 — могут сделать блокировку волокон намного проще с помощью встроенных FiberAsync . Например,

1
CompletableFuture<String> asyncOp(String arg);

превращается в вызов блокировки волокна с помощью:

1
String res = AsyncCompletionStage.get(asyncOp(arg));

Методы, возвращающие Google Guava, аналогично преобразуются в синхронную блокировку волокна, поэтому:

1
ListenableFuture<String> asyncOp(String arg);

Включается блокировка волокна с:

1
String res = AsyncListenableFuture.get(asyncOp(arg));

Альтернатива фьючерсам

Хотя фьючерсы полезны и знакомы, нам не нужен специальный API, который возвращает их при использовании волокон. Волокна настолько дешевы, что их можно выращивать, а класс Fiber реализует Future так что сами волокна могут заменить фьючерсы, изготовленные вручную. Вот пример:

1
2
3
4
5
6
7
void work() {
    Fiber<String> f1 = new Fiber<>(() -> fiberFooClient.op("first operation"));
    Fiber<String> f2 = new Fiber<>(() -> fiberFooClient.op("second operation"));
 
    String res1 = f1.get();
    String res2 = f2.get();
}

Таким образом, волокна дают нам будущее, даже если API, которые мы используем, этого не делают.

Что делать, если нет асинхронного API?

К сожалению, иногда мы сталкиваемся с библиотекой, которая предоставляет только синхронный API, блокирующий потоки. JDBC является ярким примером такого API. Хотя Quasar не может увеличить пропускную способность работы с подобной библиотекой, сделать API-волокно-совместимым все же целесообразно (и на самом деле очень просто). Почему? Поскольку волокна, выполняющие вызовы синхронному сервису, вероятно, делают и другие вещи. На самом деле, они могут вызывать службу довольно редко (учитывайте данные, считываемые по оптоволокну из СУБД, только при отсутствии кэша).

Чтобы достичь этого, нужно превратить блокирующий API в асинхронный, выполнив фактические вызовы в выделенном пуле потоков, а затем обернув этот фальшивый асинхронный API с помощью FiberAsync . Этот процесс настолько механический, что у FiberAsync есть некоторые статические методы, которые заботятся обо всем за нас. Так что предположим, что наш сервис выставил только блокирующий API FooClient . Чтобы сделать это блокирующим волокно, все, что мы делаем, это:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
public class SadFiberFooClient implements FooClient {
    private final FooClient client;
    private static final ExecutorService FOO_EXECUTOR = Executors.newCachedThreadPool();
 
    public FiberFooClient(FooClient client) {
        this.client = client;
    }
 
    @Override
    @Suspendable
    String op(final String arg) throws FooException, InterruptedException {
        try {
            return FiberAsync.runBlocking(FOO_EXECUTOR, () -> client.op());
        } catch(SuspendExecution e) {
            throw new AssertionError(e);
        }
    }
}

Эта реализация FooClient безопасна для использования как нитями, так и волокнами. Фактически, при вызове в простом потоке метод не будет беспокоить о передаче операции в предоставленный пул потоков, а выполнит ее в текущем потоке — так же, как это было бы, если бы мы использовали исходную реализацию FooClient .

Вывод

Методы, показанные здесь — с FiberAsync и cpstrands.SettableFuture — именно так работают модули интеграции, составляющие проект Comsat . Comsat включает в себя интеграции для сервлетов, JAX-RS (сервер и клиент), JDBC, JDBI, jOOQ, MongoDB, Retrofit и Dropwizard.

Важно увидеть, как — чтобы сделать простые и производительные API-интерфейсы блокировки волокон — мы действительно заново реализовали интерфейсы API, но не их внутреннюю работу: исходный библиотечный код все еще используется, только через его асинхронный API, уродливость которого Теперь скрыто от потребителя библиотеки.

Дополнительный кредит: как насчет монад?

Существуют и другие способы, кроме волокон, для борьбы с адом обратного вызова. Наиболее известные механизмы в мире JVM — это составные фьючерсы Scala, наблюдаемые RxJava и JDK 8 CompletionStage / CompletableFuture . Это все примеры монад и монадной композиции. Монады работают, и некоторым людям нравится их использовать, но я думаю, что они не подходят для большинства языков программирования.

Видите ли, монады заимствованы из языков программирования на основе лямбда-исчисления. Лямбда-исчисление — это теоретическая модель вычислений, полностью отличная от, но полностью аналогичная машине Тьюринга. Но в отличие от модели машины Тьюринга, вычисления лямбда-исчисления не имеют понятия шагов, действий или состояний. Эти вычисления ничего не делают ; они просто есть . Таким образом, монады — это способ для основанных на LC языков, таких как Haskell, описывать действие, состояние, время и т. Д. Как чистые вычисления. Это способ для языка LC сказать компьютеру «сделай это, а потом сделай это».

Дело в том, что в императивных языках уже есть абстракция «сделай это, а потом сделай это», и эта абстракция — это поток. Мало того, но императивные языки обычно имеют очень простую запись «сделай это, а потом сделай это»: за этим следует заявление, за которым следует это . Единственная причина, по которой императивные языки даже рассматривают возможность принятия такой внешней концепции, заключается в том, что реализация потоков (ядром ОС) является менее чем удовлетворительной. Но вместо того, чтобы принимать чужую, незнакомую концепцию — и требующую совершенно разных типов API — лучше исправить реализацию (потоков), чем использовать аналогичную, но слегка различную абстракцию. Волокна сохраняют абстракцию и фиксируют реализацию.

Другая проблема с монадами в таких языках, как Java и Scala, заключается в том, что эти языки не только обязательны, но и допускают неограниченную мутацию общего состояния и побочные эффекты — то, чего нет у Haskell. Комбинация неограниченных мутаций общего состояния и «ниточных» монад может иметь катастрофические последствия. На чистом языке FP — поскольку побочные эффекты контролируются — единица вычислений, а именно функция, также является единицей параллелизма: вы можете безопасно выполнять любую пару функций одновременно. Это не тот случай, когда у вас есть неограниченный побочный эффект. Порядок выполнения функции, могут ли две функции быть выполнены одновременно, и если и когда функция может наблюдать мутации общего состояния, выполненные другой, — все это серьезные проблемы. В результате функции, выполняемые как часть «потоковых» монад, должны быть либо чистыми (без каких-либо побочных эффектов), либо быть очень-очень осторожными в отношении того, как они выполняют эти побочные эффекты. Это именно то, чего мы пытаемся избежать. Таким образом, хотя монадические композиции действительно генерируют гораздо более приятный код, чем callback-ад, они не решают никаких проблем параллелизма, возникающих в асинхронном коде.

PS

Предыдущий раздел не должен читаться как одобрение чистых языков «FP», таких как Haskell, потому что я на самом деле думаю, что они создают слишком много других проблем. Я полагаю, что (ближайшее) будущее — это императивные языки 3 , которые допускают мутации общего состояния, но с некоторой транзакционной семантикой. Я верю, что эти будущие языки будут черпать вдохновение в основном из таких языков, как Clojure и Erlang.

Обсудить на Reddit

  1. Под выполнением я имею в виду потоки, которые достаточно часто запускаются
  2. См . Закон Литтла, Масштабируемость и Отказоустойчивость
  3. Являются ли они «функциональными» или нет, это сложный вопрос, так как никто не придумал хорошего определения, что такое функциональный язык программирования и что отличает его от нефункциональных языков.
Ссылка: Прощание с асинхронным кодом от нашего партнера JCG Дафны Пресслер в блоге Parallel Universe .