Статьи

Пересмотр глобальной согласованности данных в распределенных (микросервисных) архитектурах

Еще в 2015 году я написал несколько статей о том, как можно использовать стандартный диспетчер транзакций Java EE для обеспечения согласованности данных в распределенных сервисах ( вот оригинальная статья и
Вот статья о том, как сделать это с помощью Spring Boot, Tomcat или Jetty ).

В прошлом году мне посчастливилось поработать над небольшим проектом, в котором мы ставили под сомнение согласованность данных с нуля. Мы пришли к выводу, что существует другой способ получить гарантии согласованности данных, который я не рассматривал в другой статье, которую я написал о шаблонах для привязки ресурсов к транзакциям . Другое решение — изменить архитектуру с синхронной на асинхронную. Основная идея заключается в сохранении бизнес-данных вместе с «командами» в рамках одной транзакции базы данных. Команды — это просто факты, которые еще нужно вызывать другим системам. Сокращая количество одновременных транзакций до одной, можно гарантировать, что данные никогда не будут потеряны. Команды, которые были зафиксированы, затем выполняются как можно скорее, и именно выполнение команды (в новой транзакции) затем делает вызовы удаленным системам. По сути, это реализация модели согласованности BASE, потому что с глобальной точки зрения данные только в конечном итоге непротиворечивы.

Представьте себе ситуацию, когда обновление страхового случая должно привести к созданию задачи в системе документооборота, чтобы человек получал напоминание о том, чтобы что-то сделать, например, написать клиенту. Код для обработки запроса на обновление страхового случая может выглядеть следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
@Inject
    EntityManager em;
 
    @PUT
    @Path("case")
    @Produces("application/json")
    public void updateCase(Case case) {
        case = em.merge(case);
 
        if(anEmployeeShouldWriteToTheCustomer(case)){
            long taskId = taskService
                .createTask(case.getNr(),
                            "Write to customer...");
            case.addTask(taskId);
        }
    }

Вызов службы задач приводит к удаленному вызову приложения задачи, представляющего собой микросервис, отвечающий за рабочий процесс и неавтоматизированные задачи (работа, которую должен выполнять человек).

Есть две проблемы с нашим сервисом, как описано выше. Прежде всего, представьте, что приложение задачи находится в автономном режиме во время вызова. Это снижает доступность нашего приложения. Для каждого дополнительного удаленного приложения, к которому подключается наше приложение, существует снижение доступности нашей системы. Представьте, что одно из этих приложений имеет допустимое время простоя 4 часа в месяц, а второе приложение имеет одно из 8 часов. Это может привести к тому, что наше приложение будет в автономном режиме в течение 12 часов в месяц, в дополнение к нашим собственным простоям, поскольку нет никакой гарантии, что простои будут происходить в одно и то же время.

Вторая проблема с дизайном сервиса, описанная выше, возникает, когда возникает проблема с передачей данных в базу данных после вызова приложения задачи. Приведенный выше код использует JPA, который может выбрать сброс операторов SQL, сгенерированных при вызове метода merge или обновлений объекта, через некоторое время после этих вызовов и, самое позднее, во время принятия. Это означает, что ошибка базы данных может возникнуть после вызова приложения задачи. Вызов базы данных может даже не работать по другим причинам, например, из-за недоступности сети. Концептуально у нас есть проблема, что мы могли бы создать задачу с просьбой сотрудника отправить письмо клиенту, но не было возможности обновить дело, поэтому у сотрудника может даже не быть информации, необходимой для написания письма.

Если бы приложение задачи было осведомлено о транзакции, т.е. могло быть связано с транзакцией, чтобы менеджер транзакций в нашем приложении мог справиться с удаленной фиксацией / откатом, это, безусловно, помогло бы избежать второй проблемы, описанной выше (согласованность данных). Но увеличение времени простоя не будет обработано.

Изменение архитектуры таким образом, что вызов приложения задачи происходит асинхронно, решит обе эти проблемы. Обратите внимание, что я не говорю о простом асинхронном вызове метода, но вместо этого я говорю о вызове приложения задачи после того, как наше приложение завершит транзакцию базы данных. Только в этот момент мы гарантируем, что никакие данные не будут потеряны. Затем мы можем попытаться выполнить удаленный вызов так часто, как это необходимо, пока задача не будет успешно создана. На этом этапе глобальные данные согласованы. Возможность повторения неудачных попыток означает, что система в целом становится более надежной и сокращается время простоя. Обратите внимание, что я также не говорю о неблокирующих методах, которые часто называют асинхронными .

Чтобы сделать это, я создал простую библиотеку, которая требует от разработчика двух вещей. Более подробная информация об элементарной реализации, используемой в демонстрационном приложении, доступна здесь . Прежде всего, разработчик должен сделать вызов CommandService , передавая данные, которые требуются при выполнении фактической команды. Во-вторых, разработчик должен предоставить реализацию команды, которую будет выполнять фреймворк. Первая часть выглядит так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
public class TaskService {
 
    @Inject
    CommandService commandService;
 
    /** will create a command which causes a task to be
     *  created in the task app, asynchronously, but robustly. */
    public void createTask(long caseNr, String textForTask) {
        String context = createContext(caseNr, textForTask);
 
        Command command = new Command(CreateTaskCommand.NAME, context);
 
        commandService.persistCommand(command);
    }
 
    private String createContext(long nr, String textForTask) {
        //TODO use object mapper rather than build string ourselves...
        return "{\"caseNr\": " + nr + ", \"textForTask\": \"" + textForTask + "\"}";
    }

Показанная здесь служба команд принимает объект команды, который содержит две части информации: имя команды и строку JSON, содержащую данные, которые понадобятся команде. Более зрелая реализация, которую я написал для своего клиента, принимает объект в качестве входных данных, а не строку JSON, а API использует обобщенные элементы.

Реализация команды, предоставленная разработчиком, выглядит следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
public class CreateTaskCommand implements ExecutableCommand {
 
    public static final String NAME = "CreateTask";
 
    @Override
    public void execute(String idempotencyId, JsonNode context) {
        long caseNr = context.get("caseNr").longValue();
 
        CALL THE TASK MICROSERVICE HERE
    }
 
    @Override
    public String getName() { return NAME; }
}

Метод execute команды — это то, где разработчик реализует то, что нужно сделать. Я не показал код, используемый для вызова приложения задачи, поскольку он здесь не очень актуален, это просто HTTP-вызов.

Интересная часть такого асинхронного дизайна не в двух перечисленных выше списках, а в коде инфраструктуры, который обеспечивает выполнение команды. Алгоритм намного сложнее, чем вы можете подумать, потому что он должен быть в состоянии справляться со сбоями, что также приводит к блокировке. Когда выполняется вызов службы команд, происходит следующее:

  • Команда сохраняется в базе данных
  • Событие CDI запущено
  • Когда приложение фиксирует транзакцию, вызывается инфраструктура, поскольку она наблюдает за успешностью транзакции.
  • Платформа «резервирует» команду в базе данных, чтобы несколько экземпляров приложения не пытались выполнить одну и ту же команду одновременно
  • Фреймворк использует асинхронный вызов EJB для выполнения команды
  • Выполнение команды работает, используя контейнер для поиска реализаций интерфейса ExecutableCommand и используя любой, имя которого сохранено в команде
  • Все соответствующие команды выполняются путем вызова их метода execute , передавая им входные данные, которые были сохранены в базе данных.
  • Успешно выполненные команды удаляются из базы данных
  • Команды, которые потерпели неудачу, обновляются в базе данных, так что количество попыток выполнения увеличивается

Наряду с этим довольно сложным алгоритмом, фреймворк также нуждается в некотором домашнем обслуживании:

  • Периодически проверяйте, есть ли команды, которые нужно выполнить. Критерии:
    • Команда не выполнена, но не была предпринята попытка более 5 раз
    • Команда в данный момент не выполняется
    • Команда не висит
    • (более сложная реализация может также ограничить частоту попыток повторной попытки, например, через минуту, две минуты, затем четыре и т. д.)
  • Периодически проверяйте, есть ли команды, которые висят, и разблокируйте их, чтобы они были предприняты повторно.

Команды могут зависнуть, если, например, происходит сбой приложения во время выполнения. Итак, как вы можете видеть, решение не тривиально и, как таковое, относится к коду фреймворка, так что колесо не продолжает изобретаться. К сожалению, реализация во многом зависит от среды, в которой она должна работать, и поэтому очень затрудняет написание переносимой библиотеки (поэтому я не сделал ничего, кроме публикации классов в пакете commands демонстрационного приложения ). Интересно, что это даже зависит от используемой базы данных, потому что, например,
select for update не поддерживается должным образом Hibernate при использовании с Oracle . Для завершения, команды, которые терпят неудачу 5 раз, должны контролироваться, чтобы администратор мог решить проблему и обновить команды, чтобы они предпринимались повторно.

Правильный вопрос на этом этапе — является ли изменение архитектуры на асинхронную лучшим решением? На первый взгляд это выглядит так, как будто решает все проблемы с согласованностью данных. Но на самом деле есть несколько вещей, которые необходимо рассмотреть подробно. Вот несколько примеров.

А) Представьте себе, что после обновления страхового случая пользователь хочет закрыть его, и часть бизнес-правил, определяющих, может ли дело быть закрыто, включает проверку того, являются ли какие-либо задачи незавершенными. Лучшее место, чтобы проверить, являются ли какие-либо задачи незавершенными, — это приложение задач! Поэтому разработчик добавляет несколько строк кода для его вызова. На этом этапе это уже становится сложным, потому что разработчик должен сделать синхронный вызов приложения задачи или использовать команду? Совет дан ниже, и для простоты, давайте предположим, что вызов сделан синхронно. Но что, если три секунды назад приложение задачи не работало, и поэтому в нашей базе данных все еще есть незавершенная команда, которая при выполнении создаст задачу. Если бы мы просто полагались на приложение задачи, мы бы закрыли кейс и при следующей попытке выполнить неполную команду сохранили бы задачу, даже если кейс уже закрыт. Это становится грязно, потому что нам нужно создать дополнительную логику, чтобы заново открыть случай, когда пользователь щелкает задачу, чтобы справиться с ней. Более правильным решением было бы сначала задать задачу приложения, а затем проверить команды в нашей базе данных. Даже тогда, поскольку команды выполняются асинхронно, у нас могут возникнуть проблемы с синхронизацией, когда мы что-то упускаем. Общая проблема, которую мы имеем здесь, это проблема упорядочения. Хорошо известно, что в конечном итоге непротиворечивые системы страдают от проблем упорядочения и могут требовать дополнительных компенсаторных механизмов, таких как описанный выше, когда дело вновь открывается. Такие вещи могут иметь довольно сложные последствия для общего дизайна, поэтому будьте осторожны!

Б) Представьте, что в системном ландшафте происходит событие, в результате которого вызывается приложение случая для создания страхового случая. Представьте себе, что происходит второе событие, которое должно привести к обновлению этого случая. Представьте, что приложение, желающее создать и обновить дело, было реализовано асинхронно с использованием инфраструктуры команд. Наконец, представьте, что приложение case было недоступно во время первого события, поэтому команда для создания дела осталась в базе данных в незавершенном состоянии. Что произойдет, если вторая команда будет выполнена до первой, т. Е. Регистр будет обновлен еще до того, как он появится? Конечно, мы могли бы разработать приложение кейса, чтобы оно было умным, и если кейс не существует, он просто создает его в обновленном состоянии. Но что мы тогда делаем, когда выполняется команда создания дела? Обновляем ли мы его до исходного состояния? Это было бы плохо. Мы игнорируем вторую команду? Это может быть плохо, если какая-то бизнес-логика зависит от дельты, то есть изменения в кейсе. Я слышал, что такие системы, как Elastic Search, используют временные метки в запросах, чтобы решить, были ли они отправлены до текущего состояния, и он игнорирует такие вызовы. Мы создаем второй случай? Это может произойти, если мы не контролируем идемпотентность, и это тоже будет плохо. Можно реализовать некий сложный конечный автомат для команд отслеживания и, например, разрешить выполнение команды обновления только после команды создания. Но для этого требуется дополнительное место для сохранения команды обновления до тех пор, пока команда создания не будет выполнена. Итак, как вы можете видеть, проблемы с заказом снова возникают!

C) Когда нам нужно использовать команды, и когда мы можем избежать синхронных вызовов удаленных приложений? Похоже, что общее правило состоит в том, что, как только нам нужно получить доступ к более чем одному ресурсу для записи в него, мы должны использовать команды, если глобальная согласованность данных важна для нас. Поэтому, если для определенного вызова требуется много данных для чтения из нескольких удаленных приложений, чтобы мы могли обновить нашу базу данных, нет необходимости использовать команды, хотя может потребоваться реализовать идемпотентность или для вызывающей стороны реализовать какую-то механизма повторных попыток или действительно используйте команду для вызова нашей системы. Если, с другой стороны, мы хотим согласованно записывать данные в удаленное приложение и нашу базу данных, то нам нужно использовать команду для вызова удаленного приложения.

D) Что нам делать, если мы хотим вызвать несколько удаленных приложений? Если все они предлагают идемпотентные API-интерфейсы, то, по-видимому, нет проблем с вызовом их всех из одной команды. В противном случае может потребоваться использовать одну команду для вызова удаленного приложения. Если их необходимо вызывать в определенном порядке, необходимо, чтобы одна реализация команды создала команду, которая должна быть вызвана следующей в цепочке. Цепочка команд напоминает мне о хореографии. Возможно, будет проще или более легко поддерживать бизнес-процесс в качестве оркестровки. Смотрите здесь для более подробной информации.

E) Потоковое локальное хранилище (TLS) может вызвать головную боль, потому что команды не выполняются в том же потоке, который создает команду. Таким образом, такие механизмы, как внедрение CDI-компонентов @RequestScoped также больше не работают, как вы могли ожидать. @Asynchronous правила Java EE, которые применяются к @Asynchronous EJB, также применимы и здесь, именно потому, что код структуры использует этот механизм в своей реализации. Если вам нужен TLS или bean-компоненты с определенной областью действия, вам следует рассмотреть возможность добавления данных из таких мест во входные данные, которые сохраняются с помощью команды в базе данных, и, как только команда будет выполнена, восстановить состояние перед вызовом любой локальной службы / компонента, который опирается на это.

F) Что нам делать, если требуется ответ от удаленного приложения? Большую часть времени мы вызываем удаленные системы и нуждаемся в данных ответа от них, чтобы продолжить обработку. Иногда можно разделить чтение и запись, например, с помощью CQRS . Одно из решений состоит в том, чтобы разбить процесс на более мелкие этапы, чтобы каждый раз, когда требуется вызвать удаленную систему, она обрабатывается новой командой, и эта команда не только выполняет удаленный вызов, но и обновляет локальные данные при ответе. прибывает. Однако мы заметили, что при наличии оптимистической стратегии блокировки это может привести к ошибкам, когда пользователь хочет сохранить изменения, внесенные в его данные, которые теперь «устарели» по сравнению с версией в базе данных, даже если они может потребоваться изменить только некоторые атрибуты, которые команда не изменила. Одним из решений этой проблемы является передача событий из серверной части через веб-сокет клиенту, чтобы он мог выполнить частичное обновление атрибутов, затронутых командой, чтобы пользователь мог сохранить свои данные позже. Другим решением является вопрос, зачем вам нужны данные ответа. В приведенном выше примере я поместил идентификатор задачи в дело. Это может быть одним из способов отслеживать задачи, связанные с делом. Лучший способ — передать идентификатор дела в приложение задачи и заставить его сохранить идентификатор дела в задаче. Если вам нужен список задач, связанных с делом, вы запрашиваете их, используя * ваш * идентификатор, а не отслеживаете их идентификатор. Делая это, вы устраняете зависимость от данных ответа (кроме проверки того, что задача создана без ошибки), и поэтому нет необходимости обновлять ваши данные на основе ответа от удаленного приложения.

Надеюсь, мне удалось продемонстрировать, что асинхронная архитектура с использованием команд, описанных выше, предлагает подходящую альтернативу шаблонам для обеспечения согласованности глобальных данных, о которых я писал несколько лет назад.

Обратите внимание, что после внедрения фреймворка и применения его к нескольким нашим приложениям мы узнали, что мы не единственные, у кого есть такие идеи. Хотя я не читал о
Eventuate Tram и его транзакционные команды , похоже, очень похожи. Было бы интересно сравнить реализации.

Наконец, помимо команд, мы добавили «события» поверх команд. События в этом случае — это сообщения, отправленные через JMS, Kafka, выберите вашу любимую систему обмена сообщениями согласованным и гарантированным образом. Обе стороны, а именно публикация и использование события, реализованы в виде команды, которая обеспечивает очень хорошие хотя бы раз гарантии доставки. События сообщают 1..n приложениям в ландшафте, что что-то произошло, тогда как команды сообщают одному удаленному приложению что-то делать. Они вместе с технологией веб-сокетов и возможностью информировать клиентов об асинхронных изменениях в бэкэнде дополняют архитектуру, необходимую для обеспечения согласованности глобальных данных. О том, является ли такая асинхронная архитектура лучше, чем, скажем, копирование с помощью диспетчера транзакций, чтобы гарантировать глобальную согласованность данных, я еще изучаю. Оба имеют свои проблемы, преимущества и недостатки. Вероятно, лучшее решение основано на миксе, как это обычно бывает со сложной программной системой 🙂

См. Оригинальную статью здесь: Пересмотр глобальной согласованности данных в распределенных (микросервисных) архитектурах.

Мнения, высказанные участниками Java Code Geeks, являются их собственными.