Статьи

Асинхронный повтор

Если у вас есть фрагмент кода, который часто дает сбой и должен быть повторен, эта библиотека Java 7/8 предоставляет богатый и ненавязчивый API с быстрым и масштабируемым решением этой проблемы:

1
2
3
4
5
6
7
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
RetryExecutor executor = new AsyncRetryExecutor(scheduler).
    retryOn(SocketException.class).
    withExponentialBackoff(500, 2).     //500ms times 2 after each retry
    withMaxDelay(10_000).               //10 seconds
    withUniformJitter().                //add between +/- 100 ms randomly
    withMaxRetries(20);

Теперь вы можете запустить произвольный блок кода, и библиотека будет повторять его для вас в случае, если выдает SocketException :

1
2
3
4
5
6
7
final CompletableFuture<Socket> future = executor.getWithRetry(() ->
        new Socket("localhost", 8080)
);
 
future.thenAccept(socket ->
        System.out.println("Connected! " + socket)
);

Пожалуйста, посмотрите внимательно! getWithRetry() не блокирует. Он немедленно возвращает CompletableFuture и асинхронно вызывает данную функцию. Вы можете слушать это Future или даже несколько фьючерсов одновременно и в то же время выполнять другую работу. Итак, этот код делает следующее: пытаясь подключиться к localhost:8080 и в случае сбоя с SocketException он будет повторяться через 500 миллисекунд (с некоторым случайным дрожанием), удваивая задержку после каждой попытки, но не более 10 секунд.

Эквивалентный, но более лаконичный синтаксис:

1
2
3
executor.
        getWithRetry(() -> new Socket("localhost", 8080)).
        thenAccept(socket -> System.out.println("Connected! " + socket));

Это пример выходных данных, которые вы можете ожидать:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
TRACE | Retry 0 failed after 3ms, scheduled next retry in 508ms (Sun Jul 21 21:01:12 CEST 2013)
java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:1.8.0-ea]
    //...
 
TRACE | Retry 1 failed after 0ms, scheduled next retry in 934ms (Sun Jul 21 21:01:13 CEST 2013)
java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:1.8.0-ea]
    //...
 
TRACE | Retry 2 failed after 0ms, scheduled next retry in 1919ms (Sun Jul 21 21:01:15 CEST 2013)
java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:1.8.0-ea]
    //...
 
TRACE | Successful after 2 retries, took 0ms and returned: Socket[addr=localhost/127.0.0.1,port=8080,localport=46332]
 
Connected! Socket[addr=localhost/127.0.0.1,port=8080,localport=46332]

Представьте, что вы подключаетесь к двум различным системам: одна работает медленно , вторая ненадежна и часто дает сбой:

1
2
3
4
5
6
CompletableFuture<String> stringFuture = executor.getWithRetry(ctx -> unreliable());
CompletableFuture<Integer> intFuture = executor.getWithRetry(ctx -> slow());
 
stringFuture.thenAcceptBoth(intFuture, (String s, Integer i) -> {
    //both done after some retries
});

thenAcceptBoth() вызов thenAcceptBoth() выполняется асинхронно, когда как медленные, так и ненадежные системы наконец отвечают без каких-либо сбоев. Аналогичным образом (используя CompletableFuture.acceptEither() ) вы можете одновременно асинхронно вызывать два или более ненадежных сервера и получать уведомление, когда первый из них завершается успешно после некоторого числа повторных попыток.

Я не могу подчеркнуть это достаточно — повторные попытки выполняются асинхронно и эффективно используют пул потоков, а не спят вслепую.

обоснование

Часто мы вынуждены повторять данный фрагмент кода, потому что он вышел из строя, и мы должны повторить попытку, обычно с небольшой задержкой, чтобы освободить процессор. Это требование довольно распространено, и существует несколько готовых универсальных реализаций с поддержкой повторов в Spring Batch через класс RetryTemplate который известен лучше всего. Но есть несколько других, очень похожих подходов ( [1] , [2] ). Все эти попытки (и я уверен, что многие из вас сами реализовали подобный инструмент!) Страдают одной и той же проблемой — они блокируют, тратя впустую много ресурсов и плохо масштабируя.

Это само по себе неплохо, поскольку делает модель программирования намного проще — библиотека заботится о повторных попытках, и вам просто нужно ждать возвращаемого значения дольше, чем обычно. Но он не только создает негерметичную абстракцию (метод, который обычно очень быстр, внезапно становится медленным из-за повторов и задержек), но также тратит впустую ценные потоки, так как такое средство будет проводить большую часть времени, спя между повторными попытками. Следовательно
Была создана утилита Async-Retry , предназначенная для Java 8 (с существующим обратным портом Java 7 ) и решающая проблемы, описанные выше.

Основная абстракция — это RetryExecutor который предоставляет простой API:

01
02
03
04
05
06
07
08
09
10
public interface RetryExecutor {
 
    CompletableFuture<Void> doWithRetry(RetryRunnable action);
 
    <V> CompletableFuture<V> getWithRetry(Callable<V> task);
 
    <V> CompletableFuture<V> getWithRetry(RetryCallable<V> task);
 
    <V> CompletableFuture<V> getFutureWithRetry(RetryCallable<CompletableFuture<V>> task);
}

Не беспокойтесь о RetryRunnable и RetryCallable — они позволяют проверять исключения для вашего удобства, и в большинстве случаев мы будем использовать лямбда-выражения в любом случае.

Обратите внимание, что он возвращает CompletableFuture . Мы больше не делаем вид, что вызов неисправного метода быстрый. Если библиотека обнаружит исключение, она повторит наш блок кода с предварительно настроенными задержками отката. Время вызова будет стремительно расти от миллисекунд до нескольких секунд. CompletableFuture ясно указывает на это. Более того, это не тупой java.util.concurrent.Future мы все знаем — CompletableFuture в Java 8 очень мощный и, самое главное, неблокирующее по умолчанию.

Если вам все- .get() нужен результат блокировки, просто вызовите .get() для объекта Future .

Базовый API

API очень прост. Вы предоставляете блок кода, и библиотека будет запускать его несколько раз, пока он не вернется нормально, вместо того, чтобы выдавать исключение. Это также может привести к настраиваемым задержкам между повторными попытками:

1
2
3
RetryExecutor executor = //...
 
executor.getWithRetry(() -> new Socket("localhost", 8080));

Возвращенное CompletableFuture<Socket> будет разрешено после подключения к localhost:8080 успешно. При желании мы можем использовать RetryContext чтобы получить дополнительный контекст, например, какая RetryContext попытка выполняется в данный момент:

1
2
3
executor.
    getWithRetry(ctx -> new Socket("localhost", 8080 + ctx.getRetryCount())).
    thenAccept(System.out::println);

Этот код умнее, чем кажется. Во время первого выполнения ctx.getRetryCount() возвращает 0 , поэтому мы пытаемся подключиться к localhost:8080 . Если это не удастся, при следующей повторной попытке будет пытаться использовать localhost:8081 ( 8080 + 1 ) и так далее. И если вы понимаете, что все это происходит асинхронно, вы можете сканировать порты нескольких машин и получать уведомления о первом порте ответа на каждом хосте:

1
2
3
4
5
6
7
Arrays.asList("host-one", "host-two", "host-three").
    stream().
    forEach(host ->
        executor.
            getWithRetry(ctx -> new Socket(host, 8080 + ctx.getRetryCount())).
            thenAccept(System.out::println)
    );

Для каждого хоста RetryExecutor попытается подключиться к порту 8080 и повторить попытку с более высокими портами.

getFutureWithRetry() требует особого внимания. Если вы хотите повторить метод, который уже возвращает CompletableFuture<V> : например, результат асинхронного HTTP-вызова:

1
2
3
4
5
6
7
private CompletableFuture<String> asyncHttp(URL url) { /*...*/}
 
//...
 
final CompletableFuture<CompletableFuture<String>> response =
    executor.getWithRetry(ctx ->
        asyncHttp(new URL("http://example.com")));

Передача asyncHttp() в getWithRetry() приведет к CompletableFuture<CompletableFuture<V>> . С ним не только неудобно работать, но и ломаться. Библиотека будет едва вызывать asyncHttp() и повторять asyncHttp() только в случае сбоя, но не в случае возврата
CompletableFuture<String> завершается ошибкой. Решение простое:

1
2
3
final CompletableFuture<String> response =
    executor.getFutureWithRetry(ctx ->
        asyncHttp(new URL("http://example.com")));

В этом случае RetryExecutor поймет, что все, что было возвращено из asyncHttp() , на самом деле является просто Future и будет (асинхронно) ожидать результата или неудачи. Эта библиотека намного мощнее, поэтому давайте углубимся в:

Варианты конфигурации

В общем, есть два важных фактора, которые вы можете настроить: RetryPolicy который контролирует, должна ли быть сделана следующая попытка повторной попытки, и Backoff — который дополнительно добавляет задержку между последующими попытками повторной попытки.

По умолчанию RetryExecutor повторяет пользовательскую задачу бесконечно для каждого Throwable и добавляет 1 секунду задержки между попытками повторения.

Создание экземпляра RetryExecutor

Реализация RetryExecutor по RetryExecutorAsyncRetryExecutor которую вы можете создать напрямую:

1
2
3
4
5
6
7
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
 
RetryExecutor executor = new AsyncRetryExecutor(scheduler);
 
//...
 
scheduler.shutdownNow();

Единственная необходимая зависимость — это стандартный ScheduledExecutorService из JDK . Во многих случаях достаточно одного потока, но если вы хотите одновременно обрабатывать повторы сотен или более задач, рассмотрите возможность увеличения размера пула.

Обратите внимание, что AsyncRetryExecutor не заботится о закрытии ScheduledExecutorService . Это сознательное дизайнерское решение, которое будет объяснено позже.

AsyncRetryExecutor есть немного других конструкторов, но большую часть времени изменение поведения этого класса наиболее удобно при вызове методов, связанных with*() . Вы увидите множество примеров, написанных таким образом. Позже мы просто будем использовать ссылку на executor без определения. Предположим, это тип RetryExecutor .

Повторная политика

Исключения

По умолчанию каждый Throwable (кроме специального AbortRetryException ), AbortRetryException из пользовательской задачи, вызывает повторную попытку. Очевидно, это настраивается. Например, в JPA вы можете повторить попытку транзакции, которая не удалась из-за OptimisticLockException но каждое другое исключение должно немедленно завершиться неудачей:

1
2
3
4
executor.
    retryOn(OptimisticLockException.class).
    withNoDelay().
    getWithRetry(ctx -> dao.optimistic());

Где dao.optimistic() может dao.optimistic() OptimisticLockException . В этом случае вы, вероятно, не хотите никакой задержки между повторными попытками, об этом позже. Если вам не нравится Throwable по умолчанию для каждого Throwable , просто retryOn() это с помощью retryOn() :

1
executor.retryOn(Exception.class)

Конечно, может потребоваться и обратное — прервать повторную попытку и немедленно прекратить работу в случае возникновения определенного исключения, а не повторной попытки. Это так просто:

1
2
3
4
executor.
        abortOn(NullPointerException.class).
        abortOn(IllegalArgumentException.class).
        getWithRetry(ctx -> dao.optimistic());

Очевидно, что вы не хотите повторять NullPointerException или IllegalArgumentException поскольку они указывают на программную ошибку, а не на временную ошибку. И, наконец, вы можете комбинировать политики повторов и прерываний. Пользовательский код будет повторяться в случае любого исключения (или подкласса) abortOn() если только оно не abortOn() указанное исключение. Например, мы хотим повторить каждую IOException или SQLException но прервать, если встречается java.sql.DataTruncation или java.sql.DataTruncation (порядок не имеет значения):

1
2
3
4
5
6
executor.
    retryOn(IOException.class).
    abortIf(FileNotFoundException.class).
    retryOn(SQLException.class).
    abortIf(DataTruncation.class).
    getWithRetry(ctx -> dao.load(42));

Если этого недостаточно, вы можете предоставить пользовательский предикат, который будет вызываться при каждой ошибке:

1
2
3
4
5
executor.
    abortIf(throwable ->
        throwable instanceof SQLException &&
                throwable.getMessage().contains("ORA-00911")
    );

Максимальное количество повторов

Другой способ прервать повторную «петлю» (помните, что этот процесс асинхронный, блокирующий цикл отсутствует ) — указать максимальное количество повторов:

1
executor.withMaxRetries(5)

В редких случаях вы можете отключить повторные попытки и практически не использовать преимущества асинхронного выполнения. В этом случае попробуйте:

1
executor.dontRetry()

Задержки между попытками (откат)

Иногда требуется повторная попытка сразу после сбоя (см. Пример OptimisticLockException ), но в большинстве случаев это плохая идея. Если вы не можете подключиться к внешней системе, подождите немного, прежде чем следующая попытка звучит разумно. Вы экономите процессор, пропускную способность и другие ресурсы сервера. Но есть немало вариантов для рассмотрения:

  • мы должны повторить с постоянными интервалами или увеличить задержку после каждого сбоя ?
  • должен ли быть нижний и верхний предел времени ожидания?
  • Должны ли мы добавить случайный «джиттер» к временам задержки, чтобы распределить попытки многих задач во времени?

Эта библиотека отвечает на все эти вопросы.

Фиксированный интервал между попытками

По умолчанию каждой повторной попытке предшествует 1 секунда ожидания. Таким образом, если первоначальная попытка не удалась, первая попытка будет выполнена через 1 секунду. Конечно, мы можем изменить это значение по умолчанию, например, до 200 миллисекунд:

1
executor.withFixedBackoff(200)

Если мы уже здесь, по умолчанию откат применяется после выполнения пользовательской задачи. Если само задание пользователя занимает некоторое время, повторные попытки будут происходить реже. Например, при задержке повторения 200 мс и среднем времени, которое требуется до сбоя пользовательской задачи со скоростью около 50 мс, RetryExecutor будет повторять примерно 4 раза в секунду (50 мс + 200 мс). Однако, если вы хотите сохранить частоту повторений на более предсказуемом уровне, вы можете использовать флаг fixedRate :

1
2
3
executor.
        withFixedBackoff(200).
        withFixedRate()

Это похоже на подходы «фиксированная скорость» и «фиксированная задержка» в ScheduledExecutorService . Кстати, не ожидайте, что RetryExecutor будет очень точным, он делает это лучше, но это сильно зависит от вышеупомянутой точности ScheduledExecutorService .

Экспоненциально растущие интервалы между попытками

Вероятно, это активный предмет исследования, но в целом вы можете увеличить задержку повторных попыток с течением времени, предполагая, что если пользовательская задача несколько раз завершается неудачей, мы должны пытаться реже. Например, допустим, что мы начинаем с задержкой 100 мс, пока не будет предпринята первая попытка повторной попытки, но если эта попытка также не удалась, нам следует подождать в два раза больше (200 мс). А потом 400 мс, 800 мс … Вы поняли:

1
executor.withExponentialBackoff(100, 2)

Это экспоненциальная функция, которая может расти очень быстро. Таким образом, полезно установить максимальное время задержки на некотором разумном уровне, например, 10 секунд:

1
2
3
executor.
    withExponentialBackoff(100, 2).
    withMaxDelay(10_000)      //10 seconds

Случайный джиттер

Одно явление, часто наблюдаемое во время больших простоев, состоит в том, что системы имеют тенденцию к синхронизации Представьте себе занятую систему, которая внезапно перестает отвечать. Сотни или тысячи запросов терпят неудачу и повторяются. Это зависит от вашего отката, но по умолчанию все эти запросы будут повторяться ровно через одну секунду, создавая огромную волну трафика в один момент времени. Наконец, такие сбои распространяются на другие системы, которые, в свою очередь, также синхронизируются.

Чтобы избежать этой проблемы, полезно распределять повторы во времени, выравнивая нагрузку. Простое решение состоит в том, чтобы добавить случайный джиттер к времени задержки, чтобы не все запросы были запланированы для повторения в одно и то же время. У вас есть выбор между равномерным джиттером (случайное значение от -100 мс до 100 мс):

1
executor.withUniformJitter(100)     //ms

… И пропорциональный джиттер, умножающий время задержки на случайный коэффициент, по умолчанию между 0,9 и 1,1 (10%):

1
executor.withProportionalJitter(0.1)        //10%

Вы также можете установить жесткий нижний предел времени задержки, чтобы избежать запланированных сокращений:

1
executor.withMinDelay(50)   //ms

Детали реализации

Эта библиотека была построена с учетом Java 8, чтобы воспользоваться лямбдами и новой абстракцией CompletableFuture (но порт Java 7 с зависимостью Guava существует ). Он использует ScheduledExecutorService внизу для запуска задач и планирования повторов в будущем — что позволяет лучше использовать потоки.

Но что действительно интересно, так это то, что вся библиотека полностью неизменна, нет ни одного изменяемого поля вообще. Поначалу это может показаться нелогичным, возьмем, к примеру, этот тривиальный пример кода:

1
2
3
4
5
6
7
8
9
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
 
AsyncRetryExecutor first = new AsyncRetryExecutor(scheduler).
    retryOn(Exception.class).
    withExponentialBackoff(500, 2);
 
AsyncRetryExecutor second = first.abortOn(FileNotFoundException.class);
 
AsyncRetryExecutor third = second.withMaxRetries(10);

Может показаться, что все методы with*() или retryOn() / abortOn() видоизменяют существующего исполнителя. Но это не так, каждое изменение конфигурации создает новый экземпляр , оставляя старый нетронутым. Так, например, пока first исполнитель будет повторять попытку FileNotFoundException , second и third не будут. Однако все они используют один и тот же scheduler . По этой причине AsyncRetryExecutor не закрывает ScheduledExecutorService (у него даже нет метода close() ). Поскольку мы не знаем, сколько существует копий AsyncRetryExecutor указывающих на один и тот же планировщик, мы даже не пытаемся управлять его жизненным циклом. Однако, как правило, это не проблема (см. Интеграцию Spring ниже).

Вам может быть интересно, почему такое неуклюжее дизайнерское решение? Есть три причины:

  • при написании параллельного кода неизменяемость может значительно снизить риск многопоточных ошибок. Например, RetryContext содержит количество попыток. Но вместо того, чтобы изменить его, мы просто создаем новый экземпляр (копию) с увеличенным, но final счетчиком. Никакое состояние гонки или видимость никогда не могут возникнуть.
  • если вам предоставлен существующий RetryExecutor который почти соответствует RetryExecutor но вам нужен один незначительный твик, вы просто вызываете executor.with...() и получаете свежую копию. Вам не нужно беспокоиться о других местах, где использовался тот же исполнитель (дополнительные примеры см. В разделе Spring )
  • функциональное программирование и неизменяемые структуры данных в наши дни сексуальны .

NB: AsyncRetryExecutor не помечен как final , вы можете нарушить неизменность, наследуя его и добавляя изменяемое состояние. Пожалуйста, не делайте этого, подклассы разрешены только для изменения поведения.

зависимости

Эта библиотека требует Java 8 и SLF4J для ведения журнала. Порт Java 7 дополнительно зависит от гуавы .

Весенняя интеграция

Если вы только собираетесь использовать RetryExecutor в Spring — не стесняйтесь, но API конфигурации может не работать для вас. Spring продвигает (или использовал для продвижения) соглашение об изменчивых сервисах с множеством сеттеров. В XML вы определяете bean и вызываете сеттеры (через <property name="..."/> ) для него. Это соглашение предполагает существование мутирующих сеттеров. Но я нашел такой подход склонным к ошибкам и противоречивым при некоторых обстоятельствах.

Допустим, мы глобально определили org.springframework.transaction.support.TransactionTemplate компонент org.springframework.transaction.support.TransactionTemplate и org.springframework.transaction.support.TransactionTemplate его в нескольких местах. Отлично. Теперь есть один единственный запрос, который требует немного другого времени ожидания:

1
2
@Autowired
private TransactionTemplate template;

и позже в том же классе:

1
2
3
4
final int oldTimeout = template.getTimeout();
template.setTimeout(10_000);
//do the work
template.setTimeout(oldTimeout);

Этот код неверен на многих уровнях! Прежде всего, если что-то не получается, мы никогда не восстанавливаем oldTimeout . ОК, finally на помощь. Но также обратите внимание, как мы изменили глобальный общий экземпляр TransactionTemplate . Кто знает, сколько других бинов и потоков собираются его использовать, не подозревая об измененной конфигурации?

И даже если вы действительно хотите глобально изменить время ожидания транзакции, достаточно справедливо, но это все равно неправильный способ сделать это. Поле private timeout не является volatile поэтому внесенные в него изменения могут быть или не быть видны другим потокам. Какой беспорядок! Та же проблема возникает со многими другими классами, такими как JmsTemplate .

Вы видите, куда я иду? Просто создайте один неизменный класс обслуживания и безопасно отрегулируйте его, создавая копии в любое время. И пользоваться такими услугами в наши дни одинаково просто:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
@Configuration
class Beans {
 
    @Bean
    public RetryExecutor retryExecutor() {
        return new AsyncRetryExecutor(scheduler()).
            retryOn(SocketException.class).
            withExponentialBackoff(500, 2);
    }
 
    @Bean(destroyMethod = "shutdownNow")
    public ScheduledExecutorService scheduler() {
        return Executors.newSingleThreadScheduledExecutor();
    }
 
}

Привет! Это 21 век, нам больше не нужен XML весной. Bootstrap также прост:

1
2
3
4
final ApplicationContext context = new AnnotationConfigApplicationContext(Beans.class);
final RetryExecutor executor = context.getBean(RetryExecutor.class);
//...
context.close();

Как видите, интеграция современных неизменяемых сервисов с Spring так же проста. Кстати, если вы не готовы к таким большим изменениям при разработке собственных служб, по крайней мере, подумайте о внедрении конструктора .

зрелость

Эта библиотека покрыта сильной батареей юнит-тестов. Однако он еще не использовался ни в одном рабочем коде, и API может быть изменен. Конечно, вам предлагается отправлять сообщения об ошибках, запросы функций и запросы на извлечение . Он был разработан с учетом Java 8, но существует бэкпорт Java 7 с немного более подробным API и обязательной зависимостью от Guava ( ListenableFuture вместо
CompletableFuture от Java 8 ).

Полный исходный код на GitHub.

Ссылка: Асинхронный шаблон повторов от нашего партнера JCG Томаша Нуркевича из блога Java и соседей .