Статьи

Потоковая передача данных из узла Corda с помощью Spring WebFlux

Прошло много времени с моего последнего поста, но я наконец вернулся! Поскольку я все еще работаю над своим проектом, я буду снова писать об использовании Corda. На этот раз, вместо того, чтобы сосредоточиться на Corda, мы рассмотрим использование Spring с Corda. Точнее, Spring WebFlux. Зачем это делать? Один, потому что мы можем. Два, потому что это позволяет нам передавать события, выходящие из узла Corda. Это дает нам возможность отслеживать ход потоков или получать обновления в хранилище и отправлять их любым клиентам, зарегистрированным на соответствующих конечных точках. Использование WebFlux с Corda создало несколько проблем. Некоторые происходят из Корды, а некоторые из весны. Хотя проблемы Spring были связаны со мной, я ожидал, что комбо Spring Boot + WebFlux по умолчанию сделает для меня больше.

Corda

В этом посте я собираюсь предположить, что у вас есть некоторый опыт работы с Corda, но если вам нужна дополнительная информация по этому вопросу, я рекомендую прочитать мои предыдущие посты: Что такое Corda и как развиваться с помощью Corda . Кроме того, я также предлагаю взглянуть на « Делать вещи с Spring WebFlux» как введение в WebFlux.

Версия Corda с открытым исходным кодом 3.2 будет использоваться для содержания этого руководства. На самом деле я начал писать этот пост на основе 3.1 но за это время была выпущена более новая версия. В связи с этим есть несколько комментариев, основанных на перемещении между этими версиями.

Мы также будем реализовывать все на Kotlin, но содержание этого поста может быть реализовано и на Java.

Введение в пример приложения

Мы будем моделировать действительно простое приложение, которое не дает много пользы, и это то, что я испортил вместе ради этого поста. Приложение будет состоять из одной стороны, отправляющей сообщение (представленное MessageState ) другой стороне. Для этого будет запущен SendMessageFlow , и после этого обе стороны получат копию сообщения, и все. Коротко и просто, но должно предоставить нам достаточно, чтобы продемонстрировать, как WebFlux может работать с Corda.

Состав

Обычно я начинаю с рассмотрения зависимостей. Хотя, поскольку я разделил код на отдельные модули, было бы лучше сначала просмотреть структуру небольшого примера приложения.

01
02
03
04
05
06
07
08
09
10
+-- app
|   +-- {spring code}
|   +-- build.gradle
+-- cordapp
|   +-- {flow code}
|   +-- build.gradle
+-- contracts-and-states
|   +-- {contracts and states code}
|   +-- build.gradle
+-- build.gradle

Это быстрый взгляд на структуру приложения. app будет содержать весь код Spring и делегировать его узлу Corda через RPC. Модуль cordapp содержит логику потока, а contracts-and-states выполняют то, что предлагает название, и содержат контракт и код состояния. cordapp и contracts-and-states упаковываются в Jar Cordapp и выгружаются в узел Corda.

Каждый из этих модулей содержит файл build.gradle содержащий соответствующую информацию о сборке и зависимости. Поскольку этот пост не посвящен непосредственно написанию кода Corda, мы не будем подробно останавливаться на каждом модуле и его файлах сборки. Вместо этого мы будем только перелистывать код потока в конце публикации, чтобы сосредоточиться на реализации Spring.

Зависимости для модуля Spring

Ниже build.gradle файл build.gradle модуля app (содержащий код Spring):

Я не эксперт в Gradle, поэтому в этом фрагменте, возможно, есть некоторые вещи, которые можно было бы сделать лучше, но он делает то, что ему нужно.

Итак, я хочу выделить несколько вещей. Spring Boot 2.0.3.RELEASE используется, и для этого плагин kotlin-spring используется для добавления open ко всем классам Kotlin, помеченным определенными аннотациями Spring. Это необходимо для многих ситуаций, поскольку Spring требует, чтобы некоторые классы не были финальными. Это не проблема в Java, но проблематична для Kotlin, так как все классы являются окончательными по умолчанию. Более подробную информацию о плагине можно найти на kotlinlang.org .

spring-boot-starter-webflux извлекает зависимости WebFlux вместе с общим кодом веб-сервера Spring для запуска и запуска.

rxjava-reactive-streams , это интересный, который мы увидим позже. Поскольку Corda использует RxJava 1.xx а не более новый RxJava2, его Observable не реализуют интерфейс Java 8 Publisher который Spring WebFlux использует для возврата реактивных потоков. Эта зависимость преобразует эти старые Observable в Publisher чтобы они были совместимы с WebFlux. Мы еще вернемся к этому позже, когда посмотрим на код для этого преобразования.

Наконец, версия netty-all вынуждена использовать версию 4.1.25.Final для решения проблемы зависимости.

Функции маршрутизации

WebFlux представляет функциональный подход для маршрутизации запросов к функциям, которые их обрабатывают. Более подробную информацию об этом можно найти в разделе Работа с Spring WebFlux . Я не хочу углубляться в то, как работает WebFlux, но мы кратко рассмотрим определение функций маршрутизации. Основная причина этого заключается в использовании Kotlin вместо Java. Kotlin предоставляет другой способ определения функций с помощью DSL.

Ниже приведен код для определения маршрутизации для этого урока:

MessageHandler компонент routes принимает бин MessageHandler (который мы рассмотрим позже) и отображает два URI на функции, найденные в этом MessageHandler . DSL допускает немного более короткую версию по сравнению с реализацией Java. В этом фрагменте есть несколько моментов.

("/messages") определяет базовый путь запроса двух функций маршрутизации. DSL позволяет функциям вкладывать себя в этот базовый путь и помогает в передаче структуры маршрутов.

Одна функция принимает TEXT_EVENT_STREAM ( text/event-stream ) в ответе, возвращенном при отправке запроса, а также указывает APPLICATION_JSON ( application/stream+json ) в качестве содержимого тела. Поскольку мы определили Content-Type , в большинстве случаев мы можем предположить, что мы будем отправлять POST запрос (которым мы являемся). POST дополнительно вложен в предыдущую конфигурацию и добавляет еще одну функцию MessageHandler для приема запросов.

Вторая функция получает обновления от узла Corda. Для этого он возвращает APPLICATION_STREAM_JSON и ожидает, что GET запрос будет отправлен в /messages/updates .

Функции обработчика

В этом разделе мы рассмотрим MessageHandler который несколько раз упоминался в предыдущем разделе. Этот класс содержит все функции, которые выполняют актуальную бизнес-логику. Маршрутизация была просто средством для достижения этой точки.

В моем предыдущем посте, посвященном работе с Spring WebFlux, будет более подробно объяснено больше специфических частей WebFlux в этих примерах, чем в этом посте.

Ниже приведен код обработчика:

Во-первых, мы должны выделить класс NodeRPCConnection и его proxy свойства типа CordaRPCOps . Я украл NodeRPCConnection из примера приложения Corda и Spring (написанного сотрудником R3). Короче говоря, NodeRPCConnection создает RPC-соединение с узлом Corda, а proxy возвращает CordaRPCOps . CordaRPCOps содержит все операции RPC, которые доступны для использования. Именно так Spring будет взаимодействовать с узлом Corda.

Давайте внимательнее посмотрим на функцию updates :

Эта функция возвращает новые сообщения по мере их сохранения в хранилище. Такой вид конечной точки был бы полезен, если бы у вас было приложение, которое отслеживало обновления, поступающие с вашего узла Corda.

Код, связанный с Corda в этом фрагменте, содержится в функции trackNewMessages . Он использует CordaRPCOps от vaultTrackBy для доступа к службе хранилища и начинает отслеживать обновления любых сообщений MessageState . Поскольку мы не передали в функцию никаких аргументов, она будет отслеживать только UNCONSUMED состояния. vaultTrackBy возвращает объект DataFeed который можно использовать для извлечения моментального снимка хранилища через свойство snapshot или путем обращения к свойству updates будет возвращено Observable , позволяющее подписаться на его события обновления. Этот RxJava Observable — это то, что мы будем использовать для потоковой передачи данных обратно вызывающей стороне.

Это первый случай, когда нам нужно использовать rxjava-reactive-streams которые я упоминал ранее. Метод toPublisher принимает Observable и преобразует его в Publisher . Помните, что для WebFlux требуются Java 8-совместимые библиотеки реактивной потоковой передачи, которые должны реализовывать Publisher . Например, Spring имеет тенденцию использовать Reactor, который предоставляет классы Mono и Flux .

После создания Publisher его необходимо ввести в ServerResponse . Поскольку в этот момент все прошло хорошо, мы вернем ответ 200 помощью метода ok . Тип Content-Type затем устанавливается на APPLICATION_STREAM_JSON поскольку он содержит потоковые данные. Наконец, тело ответа принимает Publisher из trackNewMessages . Теперь конечная точка готова для подписки запрашивающим клиентом.

Функциональность для потоковой передачи обновлений от узла к клиенту завершена. Как насчет сохранения нового сообщения? Кроме того, есть ли какая-либо информация, которую мы можем передать отправителю о выполнении потока? Итак, давайте ответим на эти два вопроса. Да, мы можем сохранить новое сообщение, используя WebFlux. И да, поток может вернуть свой текущий прогресс.

Ниже приведен код функции post которая сохраняет новое сообщение для узлов отправителя и получателя при потоковой передаче прогресса потока:

proxy.startTrackedFlow запускает поток, ход которого может быть отслежен любым ProgressTracker добавленным в поток. startTrackedFlow определенный в этом классе, делегирует вышеупомянутую функцию и возвращает ее свойство progress ; Observable<String> чьи события состоят из ProgressTracker .

MessageState который передается в поток, создается из объекта Message переданного из запроса. Это должно облегчить ввод данных сообщения в конечную точку, поскольку они содержат меньше информации, чем сам MessageState . parse преобразует имя строки X500, переданное в Message в имя CordaX500Name а затем в сторону внутри сети, если она существует.

Затем он упаковывается в ответ через created метод. Content-Type указывается, чтобы сообщить клиенту, что он содержит text/event-stream . Путь к сообщению использует UUID который был создан до выполнения потока. Это может быть использовано, например, для получения определенного сообщения, но вам нужно реализовать это самостоятельно, так как мне лень это делать для этого поста.

Создание клиента

Теперь, когда конечные точки настроены, мы должны создать клиента, который может отправлять запросы и использовать потоки, отправленные обратно. Позже мы кратко рассмотрим код потока, чтобы получить более полное представление о том, что происходит.

Для отправки запросов в реактивный бэкэнд Spring WebFlux предоставляет класс WebClient . После отправки запроса WebClient может реагировать на каждое событие, отправленное в ответе. MessageClient ниже делает именно это:

MessageClient и использует WebClient для отправки запросов на адрес, указанный в конструкторе WebClient . В этом классе происходит некоторая дополнительная конфигурация, связанная с десериализацией, но сейчас я хочу рассказать об этом, поскольку есть раздел, посвященный этой теме.

Как и прежде Работа с Spring WebFlux дает подробные пояснения к конкретным методам WebFlux.

Итак, давайте рассмотрим каждый запрос отдельно, сначала запрос POST до конечной точки /messages :

Метод post создает конструктор, который определяет содержимое запроса. Это должно соответствовать конечной точке, которую мы определили ранее. Как только запрос будет создан, вызовите метод exchange чтобы отправить его на сервер. Тело ответа затем сопоставляется с Flux<String> позволяющим подписаться на него. В этом суть использования реактивных потоков. После подписки на ответ клиент должен выполнить любую обработку, которую он хочет выполнить для каждого события. В этом случае он просто распечатывает текущий шаг ProgressTracker .

Если мы отправим запрос через этот фрагмент кода, мы получим следующее:

01
02
03
04
05
06
07
08
09
10
11
STEP: Verifying
STEP: Signing
STEP: Sending to Counterparty
STEP: Collecting signatures from counterparties.
STEP: Verifying collected signatures.
STEP: Done
STEP: Finalising
STEP: Requesting signature by notary service
STEP: Broadcasting transaction to participants
STEP: Done
STEP: Done

Это шаги, которые определяет ProgressTracker от SendMessageFlow . Да, я знаю, что еще не показывал вам этот код, но просто поверьте мне в этом. Не намного больше этого. Как видите, каждое строковое значение, возвращаемое потоком, присоединяет «STEP» к себе

Теперь перейдем к GET запросу к конечной точке /messages/update :

Опять же, пока что особо нечего показать. Но за кулисами на самом деле требуется немало работы, чтобы заставить это работать. Все проблемы, с которыми я столкнулся, чтобы заставить этот вызов работать, вращались вокруг сериализации и десериализации. Мы углубимся в это в следующем разделе.

Ответ на этот запрос следующий:

01
02
03
04
05
06
07
08
09
10
11
12
13
UPDATE: 0 consumed, 1 produced
 
Consumed:
 
Produced:
56781DF3CEBF2CDAFACE1C5BF04D4962B5483FBCD2C2E428352AD82BC951C686(0)
: TransactionState(data=MessageState(sender=O=PartyA, L=London, C=GB,
recipient=O=PartyB, L=London, C=GB, contents=hello there,
linearId=1afc6144-32b1-4265-a06e-73b6bb81aef3_b0fa8491-c9b9-418c-ba6e-8b7840faaf30,
participants=[O=PartyA, L=London, C=GB, O=PartyB, L=London, C=GB]),
contract=com.lankydanblog.tutorial.contracts.MessageContract,
notary=O=Notary, L=London, C=GB, encumbrance=null,
constraint=net.corda.core.contracts.WhitelistedByZoneAttachmentConstraint@4a1febb5)

Приятной особенностью этой конечной точки является то, что она теперь поддерживает соединение с узлом, который будет продолжать отправлять любые связанные обновления обратно этому клиенту. Вышеупомянутый запрос был обновлением для исходного сообщения POST . Любые новые события, полученные клиентом, будут выводить обновление на клиенте. Это то, что делает этот вид конечной точки идеальным для запуска процесса или просто отображения актуальных данных на внешнем интерфейсе, отдельном от самого узла Corda.

Сериализация и десериализация

В этом разделе я хотел сосредоточиться на правильной настройке сериализации и десериализации. Данные, извлеченные из конечной точки /messages/updates должны правильно сериализовать свои данные, чтобы передать их клиенту, который также должен иметь возможность десериализации данных ответа.

Обычно Spring делает много за вас, и он все еще делает, но кажется, что с WebFlux есть некоторые дополнительные шаги, необходимые для его правильной настройки. Отказ от ответственности, это из моего опыта, и если вы знаете лучшие способы сделать это, мне было бы интересно услышать от вас.

Корда ДжексонПоддержка

Spring имеет тенденцию использовать Джексона по умолчанию, и, очень удобно, Corda предоставляет множество настроек Джексона. JacksonSupport.cordaModule обеспечивает некоторую сериализацию и десериализацию для таких классов, как Party и CordaX500Name . Если у вас есть некоторые базовые ситуации, когда вам нужно сериализовать или десериализовать класс Corda, это, вероятно, подойдет вам. В Spring вы можете создать бин, который по умолчанию ObjectMapper будет извлекать и добавлять к себе.

Но у этого маршрута есть несколько предостережений. Некоторые классы не могут быть десериализованы, поскольку модуль полагается на ObjectMapper имеющий доступ к информации об узле, например, через RPC-клиент CordaRPCOps . Без этого десериализация Party , AbstractParty или AnonymousParty не удастся. Мало того, но теперь это не рекомендуется для Corda 3.2 из-за того, что он не безопасен для потоков. JacksonSupport.cordaModule также был перемещен в свой собственный класс ( CordaModule ).

Решение, которое я даю ниже, также является решением, которое Corda рекомендует принимать с этого момента.

Ниже приведено исключение, которое MessageClient когда MessageClient получает обновления от конечной точки /messages/updates (для остальной части этого раздела будет использоваться та же самая конечная точка):

1
com.fasterxml.jackson.databind.ObjectMapper cannot be cast to net.corda.client.jackson.JacksonSupport$PartyObjectMapper

Исходя из этого, мы можем определить, что наш ObjectMapper имеет неправильный тип и на самом деле должен быть подтипом PartyObjectMapper . Пройдя немного дальше, мы увидим, что этот преобразователь также находится в классе JacksonSupport . Теперь все, что осталось сделать, это создать этот картограф и использовать его вместо стандартного ObjectMapper .

Итак, давайте посмотрим, как это сделать:

Это создаст RpcObjectMapper который реализует PartyObjectMapper и использует RPC для извлечения информации об узле, чтобы сделать возможным десериализацию различных классов сторон. Внутри createDefaultMapper, и, благодаря Spring, теперь он будет CordaModule отображения объектов по умолчанию для большинства (заметьте больше для последующих) случаев, когда требуется сериализация или десериализация.

Еще несколько конфигураций сериализации и десериализации

Сейчас … Я на самом деле в довольно странном положении. Я хотел пройти через все остальные шаги, чтобы заставить конечную точку работать. Но, независимо от того, что я делаю, я не могу воссоздать все ошибки, с которыми я сталкивался, прежде чем заставить его работать. Я не знаю, что сказать … Где-то мои исключения проглатываются и мешают мне видеть, что происходит. Во всяком случае, мы должны продолжать. К счастью, я знаю, почему я добавил остальную часть кода, но больше не могу предоставить вам исключение, что каждое изменение исправлено …

Так, давайте посмотрим на конечный продукт rpcObjectMapper , rpcObjectMapper которым мы начали работать ранее:

Здесь есть несколько дополнений. JsonComponentModule добавляется в виде bean-компонента, так что он выбирает определенные пользовательские компоненты @JsonSerializer и @JsonDeserializer (в других классах). Кажется, что даже если он добавлен в модуль отображения как модуль, он все равно требует создания самого компонента, если он собирается найти и зарегистрировать пользовательские компоненты JSON.

Далее идет MixinModule . Этот класс решает проблемы, возникающие при десериализации Vault.Update и SecureHash . Давайте внимательнее посмотрим.

Mixin позволяет нам добавлять аннотации Джексона в класс, фактически не имея доступа к самому классу, который мы явно не контролируем, поскольку это объект из кодовой базы Corda. Другой вариант заключается в том, что это добавляется в CordaModule мы обсуждали ранее, но это другой разговор.

Vault.Update нуждается в этом из-за наличия метода isEmpty , который плохо работает с Джексоном, который запутывается и думает, что isEmpty соответствует логическому полю, называемому empty . Поэтому при десериализации JSON обратно в объект он пытается передать значение для поля.

Сам MixinModule — это просто класс, конструктор которого добавляет VaultUpdateMixin и SecureHashMixin к себе. Затем картограф добавляет модуль, как и любой другой модуль. Работа выполнена.

Добавленная в VaultUpdateMixin аннотация Джексона была @JsonIgnore , что говорит само за себя. При сериализации или десериализации функция isEmpty будет игнорироваться.

Далее идет SecureHashMixin :

Я добавил это после перехода с 3.1 на 3.2 . Мне кажется, что добавление SecureHash для SecureHash было забыто. CordaModule включает сериализацию и десериализацию для SecureHash.SHA256 но не SecureHash . Приведенный выше код является копией и CordaModule из CordaModule с другим классом, связанным с Mixin.

Как только это будет включено, различия между 3.1 и 3.2 будут устранены.

Я думаю, я подниму вопрос для этого!

Пользовательские сериализаторы и десериализаторы

Для сериализации Vault.Update только интерфейсу AttachmentConstraint необходим собственный настраиваемый сериализатор:

Не о чем говорить, поскольку только HashAttachmentConstraint имеет поля. Это соответствует десериализатору, который позже считывает поле type JSON, чтобы определить, какой объект создан.

Последние два класса, которым требуются пользовательские десериализаторы, — это ContractState и AttachmentContract (совпадающие с сериализатором из ранее):

ContractStateDeserialiser — довольно ленивая реализация, поскольку в этом руководстве используется только одно состояние. AttachmentConstraintDeserialiser использует поле type определенное в сериализаторе, чтобы определить, в какую реализацию AttachmentConstraint следует преобразовать его.

Конкретная конфигурация WebFlux

В этом подразделе рассматриваются дополнительные необходимые настройки, связанные с использованием WebFlux. Вы уже видели некоторую конфигурацию в MessageClient но нужно сделать немного больше:

Клиенту нужен этот компонент, чтобы иметь возможность десериализовать application/stream+json вместе с объектами, возвращаемыми в ответе.

Чтобы использовать Jackson2JsonDecoder определенный в конфигурации, необходимо указать ExchangeStrategies WebClient . К сожалению, класс ExchangeStrategies не написан для того, чтобы подобрать уже созданный нами Jackson2JsonDecoder . Я надеялся, что такая конфигурация будет работать по умолчанию, ну да ладно. Чтобы добавить ExchangeStrategies необходимо использовать конструктор WebClient . Как только это будет сделано, мы наконец там. Вся сериализация для упаковки ответа и десериализация для использования его от клиента завершена.

Это подводит итог всего кода, связанного со Spring, который я хочу обсудить в этом посте.

Быстрый взгляд на код потока

Прежде чем закончить, я кратко покажу последовательность действий, которую я собрал для целей этого урока:

Это довольно простой поток с добавлением ProgressTracker который используется в запросе /messages для отслеживания текущего состояния потока. Короче говоря, этот поток принимает MessageState переданный в него, и отправляет его контрагенту. При движении по потоку ProgressTracker обновляется до соответствующего шага. Дополнительную документацию по использованию ProgressTracker можно найти в документации Corda .

Время закрытия

Честно говоря, это было намного дольше, чем я думал, и заняло у меня гораздо больше времени, чем я надеялся.

В заключение, Spring WebFlux предоставляет возможность использовать реактивные потоки для обработки событий ответа, когда бы они не поступили. При использовании с Corda можно отслеживать ход потока, а постоянный поток обновлений хранилища можно поддерживать готовым к действию по мере их поступления. Чтобы полностью использовать WebFlux с Corda, мы также должны были убедиться в том, что объекты были правильно сериализованы сервером, а затем десериализованы клиентом, чтобы их можно было использовать. Lucky Corda действительно предоставляет некоторые из них, но один или два класса или функции отсутствуют, и мы должны убедиться, что мы используем их при условии отображения объектов. К сожалению, WebFlux требует немного больше конфигурации, чем я обычно привык при использовании модулей Spring, но ничего такого, что не может быть исправлено.

Остальной код для этого поста можно найти на моем GitHub

Если вам понравился этот пост, вы можете подписаться на меня в твиттере на @LankyDanDev, где я публикую обновления своих новых сообщений (хотя в последнее время они немного замедлились).

Опубликовано на Java Code Geeks с разрешения Дэна Ньютона, партнера нашей программы JCG . См. Оригинальную статью здесь: потоковая передача данных из узла Corda с помощью Spring WebFlux

Мнения, высказанные участниками Java Code Geeks, являются их собственными.