Прошло много времени с моего последнего поста, но я наконец вернулся! Поскольку я все еще работаю над своим проектом, я буду снова писать об использовании Corda. На этот раз, вместо того, чтобы сосредоточиться на Corda, мы рассмотрим использование Spring с Corda. Точнее, Spring WebFlux. Зачем это делать? Один, потому что мы можем. Два, потому что это позволяет нам передавать события, выходящие из узла Corda. Это дает нам возможность отслеживать ход потоков или получать обновления в хранилище и отправлять их любым клиентам, зарегистрированным на соответствующих конечных точках. Использование WebFlux с Corda создало несколько проблем. Некоторые происходят из Корды, а некоторые из весны. Хотя проблемы Spring были связаны со мной, я ожидал, что комбо Spring Boot + WebFlux по умолчанию сделает для меня больше.
В этом посте я собираюсь предположить, что у вас есть некоторый опыт работы с Corda, но если вам нужна дополнительная информация по этому вопросу, я рекомендую прочитать мои предыдущие посты: Что такое Corda и как развиваться с помощью Corda . Кроме того, я также предлагаю взглянуть на « Делать вещи с Spring WebFlux» как введение в WebFlux.
Версия Corda с открытым исходным кодом 3.2
будет использоваться для содержания этого руководства. На самом деле я начал писать этот пост на основе 3.1
но за это время была выпущена более новая версия. В связи с этим есть несколько комментариев, основанных на перемещении между этими версиями.
Мы также будем реализовывать все на Kotlin, но содержание этого поста может быть реализовано и на Java.
Введение в пример приложения
Мы будем моделировать действительно простое приложение, которое не дает много пользы, и это то, что я испортил вместе ради этого поста. Приложение будет состоять из одной стороны, отправляющей сообщение (представленное MessageState
) другой стороне. Для этого будет запущен SendMessageFlow
, и после этого обе стороны получат копию сообщения, и все. Коротко и просто, но должно предоставить нам достаточно, чтобы продемонстрировать, как WebFlux может работать с Corda.
Состав
Обычно я начинаю с рассмотрения зависимостей. Хотя, поскольку я разделил код на отдельные модули, было бы лучше сначала просмотреть структуру небольшого примера приложения.
01
02
03
04
05
06
07
08
09
10
|
+-- app | +-- {spring code} | +-- build.gradle +-- cordapp | +-- {flow code} | +-- build.gradle +-- contracts-and-states | +-- {contracts and states code} | +-- build.gradle +-- build.gradle |
Это быстрый взгляд на структуру приложения. app
будет содержать весь код Spring и делегировать его узлу Corda через RPC. Модуль cordapp
содержит логику потока, а contracts-and-states
выполняют то, что предлагает название, и содержат контракт и код состояния. cordapp
и contracts-and-states
упаковываются в Jar Cordapp и выгружаются в узел Corda.
Каждый из этих модулей содержит файл build.gradle
содержащий соответствующую информацию о сборке и зависимости. Поскольку этот пост не посвящен непосредственно написанию кода Corda, мы не будем подробно останавливаться на каждом модуле и его файлах сборки. Вместо этого мы будем только перелистывать код потока в конце публикации, чтобы сосредоточиться на реализации Spring.
Зависимости для модуля Spring
Ниже build.gradle
файл build.gradle
модуля app
(содержащий код Spring):
Я не эксперт в Gradle, поэтому в этом фрагменте, возможно, есть некоторые вещи, которые можно было бы сделать лучше, но он делает то, что ему нужно.
Итак, я хочу выделить несколько вещей. Spring Boot 2.0.3.RELEASE
используется, и для этого плагин kotlin-spring
используется для добавления open
ко всем классам Kotlin, помеченным определенными аннотациями Spring. Это необходимо для многих ситуаций, поскольку Spring требует, чтобы некоторые классы не были финальными. Это не проблема в Java, но проблематична для Kotlin, так как все классы являются окончательными по умолчанию. Более подробную информацию о плагине можно найти на kotlinlang.org .
spring-boot-starter-webflux
извлекает зависимости WebFlux вместе с общим кодом веб-сервера Spring для запуска и запуска.
rxjava-reactive-streams
, это интересный, который мы увидим позже. Поскольку Corda использует RxJava 1.xx
а не более новый RxJava2, его Observable
не реализуют интерфейс Java 8 Publisher
который Spring WebFlux использует для возврата реактивных потоков. Эта зависимость преобразует эти старые Observable
в Publisher
чтобы они были совместимы с WebFlux. Мы еще вернемся к этому позже, когда посмотрим на код для этого преобразования.
Наконец, версия netty-all
вынуждена использовать версию 4.1.25.Final
для решения проблемы зависимости.
Функции маршрутизации
WebFlux представляет функциональный подход для маршрутизации запросов к функциям, которые их обрабатывают. Более подробную информацию об этом можно найти в разделе Работа с Spring WebFlux . Я не хочу углубляться в то, как работает WebFlux, но мы кратко рассмотрим определение функций маршрутизации. Основная причина этого заключается в использовании Kotlin вместо Java. Kotlin предоставляет другой способ определения функций с помощью DSL.
Ниже приведен код для определения маршрутизации для этого урока:
MessageHandler
компонент routes
принимает бин MessageHandler
(который мы рассмотрим позже) и отображает два URI на функции, найденные в этом MessageHandler
. DSL допускает немного более короткую версию по сравнению с реализацией Java. В этом фрагменте есть несколько моментов.
("/messages")
определяет базовый путь запроса двух функций маршрутизации. DSL позволяет функциям вкладывать себя в этот базовый путь и помогает в передаче структуры маршрутов.
Одна функция принимает TEXT_EVENT_STREAM
( text/event-stream
) в ответе, возвращенном при отправке запроса, а также указывает APPLICATION_JSON
( application/stream+json
) в качестве содержимого тела. Поскольку мы определили Content-Type
, в большинстве случаев мы можем предположить, что мы будем отправлять POST
запрос (которым мы являемся). POST
дополнительно вложен в предыдущую конфигурацию и добавляет еще одну функцию MessageHandler
для приема запросов.
Вторая функция получает обновления от узла Corda. Для этого он возвращает APPLICATION_STREAM_JSON
и ожидает, что GET
запрос будет отправлен в /messages/updates
.
Функции обработчика
В этом разделе мы рассмотрим MessageHandler
который несколько раз упоминался в предыдущем разделе. Этот класс содержит все функции, которые выполняют актуальную бизнес-логику. Маршрутизация была просто средством для достижения этой точки.
В моем предыдущем посте, посвященном работе с Spring WebFlux, будет более подробно объяснено больше специфических частей WebFlux в этих примерах, чем в этом посте.
Ниже приведен код обработчика:
Во-первых, мы должны выделить класс NodeRPCConnection
и его proxy
свойства типа CordaRPCOps
. Я украл NodeRPCConnection
из примера приложения Corda и Spring (написанного сотрудником R3). Короче говоря, NodeRPCConnection
создает RPC-соединение с узлом Corda, а proxy
возвращает CordaRPCOps
. CordaRPCOps
содержит все операции RPC, которые доступны для использования. Именно так Spring будет взаимодействовать с узлом Corda.
Давайте внимательнее посмотрим на функцию updates
:
Эта функция возвращает новые сообщения по мере их сохранения в хранилище. Такой вид конечной точки был бы полезен, если бы у вас было приложение, которое отслеживало обновления, поступающие с вашего узла Corda.
Код, связанный с Corda в этом фрагменте, содержится в функции trackNewMessages
. Он использует CordaRPCOps
от vaultTrackBy
для доступа к службе хранилища и начинает отслеживать обновления любых сообщений MessageState
. Поскольку мы не передали в функцию никаких аргументов, она будет отслеживать только UNCONSUMED
состояния. vaultTrackBy
возвращает объект DataFeed
который можно использовать для извлечения моментального снимка хранилища через свойство snapshot
или путем обращения к свойству updates
будет возвращено Observable
, позволяющее подписаться на его события обновления. Этот RxJava Observable
— это то, что мы будем использовать для потоковой передачи данных обратно вызывающей стороне.
Это первый случай, когда нам нужно использовать rxjava-reactive-streams
которые я упоминал ранее. Метод toPublisher
принимает Observable
и преобразует его в Publisher
. Помните, что для WebFlux требуются Java 8-совместимые библиотеки реактивной потоковой передачи, которые должны реализовывать Publisher
. Например, Spring имеет тенденцию использовать Reactor, который предоставляет классы Mono
и Flux
.
После создания Publisher
его необходимо ввести в ServerResponse
. Поскольку в этот момент все прошло хорошо, мы вернем ответ 200
помощью метода ok
. Тип Content-Type
затем устанавливается на APPLICATION_STREAM_JSON
поскольку он содержит потоковые данные. Наконец, тело ответа принимает Publisher
из trackNewMessages
. Теперь конечная точка готова для подписки запрашивающим клиентом.
Функциональность для потоковой передачи обновлений от узла к клиенту завершена. Как насчет сохранения нового сообщения? Кроме того, есть ли какая-либо информация, которую мы можем передать отправителю о выполнении потока? Итак, давайте ответим на эти два вопроса. Да, мы можем сохранить новое сообщение, используя WebFlux. И да, поток может вернуть свой текущий прогресс.
Ниже приведен код функции post
которая сохраняет новое сообщение для узлов отправителя и получателя при потоковой передаче прогресса потока:
proxy.startTrackedFlow
запускает поток, ход которого может быть отслежен любым ProgressTracker
добавленным в поток. startTrackedFlow
определенный в этом классе, делегирует вышеупомянутую функцию и возвращает ее свойство progress
; Observable<String>
чьи события состоят из ProgressTracker
.
MessageState
который передается в поток, создается из объекта Message
переданного из запроса. Это должно облегчить ввод данных сообщения в конечную точку, поскольку они содержат меньше информации, чем сам MessageState
. parse
преобразует имя строки X500, переданное в Message
в имя CordaX500Name
а затем в сторону внутри сети, если она существует.
Затем он упаковывается в ответ через created
метод. Content-Type
указывается, чтобы сообщить клиенту, что он содержит text/event-stream
. Путь к сообщению использует UUID
который был создан до выполнения потока. Это может быть использовано, например, для получения определенного сообщения, но вам нужно реализовать это самостоятельно, так как мне лень это делать для этого поста.
Создание клиента
Теперь, когда конечные точки настроены, мы должны создать клиента, который может отправлять запросы и использовать потоки, отправленные обратно. Позже мы кратко рассмотрим код потока, чтобы получить более полное представление о том, что происходит.
Для отправки запросов в реактивный бэкэнд Spring WebFlux предоставляет класс WebClient
. После отправки запроса WebClient
может реагировать на каждое событие, отправленное в ответе. MessageClient
ниже делает именно это:
MessageClient
и использует WebClient
для отправки запросов на адрес, указанный в конструкторе WebClient
. В этом классе происходит некоторая дополнительная конфигурация, связанная с десериализацией, но сейчас я хочу рассказать об этом, поскольку есть раздел, посвященный этой теме.
Как и прежде Работа с Spring WebFlux дает подробные пояснения к конкретным методам WebFlux.
Итак, давайте рассмотрим каждый запрос отдельно, сначала запрос POST
до конечной точки /messages
:
Метод post
создает конструктор, который определяет содержимое запроса. Это должно соответствовать конечной точке, которую мы определили ранее. Как только запрос будет создан, вызовите метод exchange
чтобы отправить его на сервер. Тело ответа затем сопоставляется с Flux<String>
позволяющим подписаться на него. В этом суть использования реактивных потоков. После подписки на ответ клиент должен выполнить любую обработку, которую он хочет выполнить для каждого события. В этом случае он просто распечатывает текущий шаг ProgressTracker
.
Если мы отправим запрос через этот фрагмент кода, мы получим следующее:
01
02
03
04
05
06
07
08
09
10
11
|
STEP: Verifying STEP: Signing STEP: Sending to Counterparty STEP: Collecting signatures from counterparties. STEP: Verifying collected signatures. STEP: Done STEP: Finalising STEP: Requesting signature by notary service STEP: Broadcasting transaction to participants STEP: Done STEP: Done |
Это шаги, которые определяет ProgressTracker
от SendMessageFlow
. Да, я знаю, что еще не показывал вам этот код, но просто поверьте мне в этом. Не намного больше этого. Как видите, каждое строковое значение, возвращаемое потоком, присоединяет «STEP» к себе
Теперь перейдем к GET
запросу к конечной точке /messages/update
:
Опять же, пока что особо нечего показать. Но за кулисами на самом деле требуется немало работы, чтобы заставить это работать. Все проблемы, с которыми я столкнулся, чтобы заставить этот вызов работать, вращались вокруг сериализации и десериализации. Мы углубимся в это в следующем разделе.
Ответ на этот запрос следующий:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
UPDATE: 0 consumed, 1 produced Consumed: Produced: 56781DF3CEBF2CDAFACE1C5BF04D4962B5483FBCD2C2E428352AD82BC951C686( 0 ) : TransactionState(data=MessageState(sender=O=PartyA, L=London, C=GB, recipient=O=PartyB, L=London, C=GB, contents=hello there, linearId=1afc6144-32b1- 4265 -a06e-73b6bb81aef3_b0fa8491-c9b9-418c-ba6e-8b7840faaf30, participants=[O=PartyA, L=London, C=GB, O=PartyB, L=London, C=GB]), contract=com.lankydanblog.tutorial.contracts.MessageContract, notary=O=Notary, L=London, C=GB, encumbrance= null , constraint=net.corda.core.contracts.WhitelistedByZoneAttachmentConstraint @4a1febb5 ) |
Приятной особенностью этой конечной точки является то, что она теперь поддерживает соединение с узлом, который будет продолжать отправлять любые связанные обновления обратно этому клиенту. Вышеупомянутый запрос был обновлением для исходного сообщения POST
. Любые новые события, полученные клиентом, будут выводить обновление на клиенте. Это то, что делает этот вид конечной точки идеальным для запуска процесса или просто отображения актуальных данных на внешнем интерфейсе, отдельном от самого узла Corda.
Сериализация и десериализация
В этом разделе я хотел сосредоточиться на правильной настройке сериализации и десериализации. Данные, извлеченные из конечной точки /messages/updates
должны правильно сериализовать свои данные, чтобы передать их клиенту, который также должен иметь возможность десериализации данных ответа.
Обычно Spring делает много за вас, и он все еще делает, но кажется, что с WebFlux есть некоторые дополнительные шаги, необходимые для его правильной настройки. Отказ от ответственности, это из моего опыта, и если вы знаете лучшие способы сделать это, мне было бы интересно услышать от вас.
Корда ДжексонПоддержка
Spring имеет тенденцию использовать Джексона по умолчанию, и, очень удобно, Corda предоставляет множество настроек Джексона. JacksonSupport.cordaModule
обеспечивает некоторую сериализацию и десериализацию для таких классов, как Party
и CordaX500Name
. Если у вас есть некоторые базовые ситуации, когда вам нужно сериализовать или десериализовать класс Corda, это, вероятно, подойдет вам. В Spring вы можете создать бин, который по умолчанию ObjectMapper
будет извлекать и добавлять к себе.
Но у этого маршрута есть несколько предостережений. Некоторые классы не могут быть десериализованы, поскольку модуль полагается на ObjectMapper
имеющий доступ к информации об узле, например, через RPC-клиент CordaRPCOps
. Без этого десериализация Party
, AbstractParty
или AnonymousParty
не удастся. Мало того, но теперь это не рекомендуется для Corda 3.2
из-за того, что он не безопасен для потоков. JacksonSupport.cordaModule
также был перемещен в свой собственный класс ( CordaModule
).
Решение, которое я даю ниже, также является решением, которое Corda рекомендует принимать с этого момента.
Ниже приведено исключение, которое MessageClient
когда MessageClient
получает обновления от конечной точки /messages/updates
(для остальной части этого раздела будет использоваться та же самая конечная точка):
1
|
com.fasterxml.jackson.databind.ObjectMapper cannot be cast to net.corda.client.jackson.JacksonSupport$PartyObjectMapper |
Исходя из этого, мы можем определить, что наш ObjectMapper
имеет неправильный тип и на самом деле должен быть подтипом PartyObjectMapper
. Пройдя немного дальше, мы увидим, что этот преобразователь также находится в классе JacksonSupport
. Теперь все, что осталось сделать, это создать этот картограф и использовать его вместо стандартного ObjectMapper
.
Итак, давайте посмотрим, как это сделать:
Это создаст RpcObjectMapper
который реализует PartyObjectMapper
и использует RPC для извлечения информации об узле, чтобы сделать возможным десериализацию различных классов сторон. Внутри createDefaultMapper,
и, благодаря Spring, теперь он будет CordaModule
отображения объектов по умолчанию для большинства (заметьте больше для последующих) случаев, когда требуется сериализация или десериализация.
Еще несколько конфигураций сериализации и десериализации
Сейчас … Я на самом деле в довольно странном положении. Я хотел пройти через все остальные шаги, чтобы заставить конечную точку работать. Но, независимо от того, что я делаю, я не могу воссоздать все ошибки, с которыми я сталкивался, прежде чем заставить его работать. Я не знаю, что сказать … Где-то мои исключения проглатываются и мешают мне видеть, что происходит. Во всяком случае, мы должны продолжать. К счастью, я знаю, почему я добавил остальную часть кода, но больше не могу предоставить вам исключение, что каждое изменение исправлено …
Так, давайте посмотрим на конечный продукт rpcObjectMapper
, rpcObjectMapper
которым мы начали работать ранее:
Здесь есть несколько дополнений. JsonComponentModule
добавляется в виде bean-компонента, так что он выбирает определенные пользовательские компоненты @JsonSerializer
и @JsonDeserializer
(в других классах). Кажется, что даже если он добавлен в модуль отображения как модуль, он все равно требует создания самого компонента, если он собирается найти и зарегистрировать пользовательские компоненты JSON.
Далее идет MixinModule
. Этот класс решает проблемы, возникающие при десериализации Vault.Update
и SecureHash
. Давайте внимательнее посмотрим.
Mixin позволяет нам добавлять аннотации Джексона в класс, фактически не имея доступа к самому классу, который мы явно не контролируем, поскольку это объект из кодовой базы Corda. Другой вариант заключается в том, что это добавляется в CordaModule
мы обсуждали ранее, но это другой разговор.
Vault.Update
нуждается в этом из-за наличия метода isEmpty
, который плохо работает с Джексоном, который запутывается и думает, что isEmpty
соответствует логическому полю, называемому empty
. Поэтому при десериализации JSON обратно в объект он пытается передать значение для поля.
Сам MixinModule
— это просто класс, конструктор которого добавляет VaultUpdateMixin
и SecureHashMixin
к себе. Затем картограф добавляет модуль, как и любой другой модуль. Работа выполнена.
Добавленная в VaultUpdateMixin
аннотация Джексона была @JsonIgnore
, что говорит само за себя. При сериализации или десериализации функция isEmpty
будет игнорироваться.
Далее идет SecureHashMixin
:
Я добавил это после перехода с 3.1
на 3.2
. Мне кажется, что добавление SecureHash
для SecureHash
было забыто. CordaModule
включает сериализацию и десериализацию для SecureHash.SHA256
но не SecureHash
. Приведенный выше код является копией и CordaModule
из CordaModule
с другим классом, связанным с Mixin.
Как только это будет включено, различия между 3.1
и 3.2
будут устранены.
Я думаю, я подниму вопрос для этого!
Пользовательские сериализаторы и десериализаторы
Для сериализации Vault.Update
только интерфейсу AttachmentConstraint
необходим собственный настраиваемый сериализатор:
Не о чем говорить, поскольку только HashAttachmentConstraint
имеет поля. Это соответствует десериализатору, который позже считывает поле type
JSON, чтобы определить, какой объект создан.
Последние два класса, которым требуются пользовательские десериализаторы, — это ContractState
и AttachmentContract
(совпадающие с сериализатором из ранее):
ContractStateDeserialiser
— довольно ленивая реализация, поскольку в этом руководстве используется только одно состояние. AttachmentConstraintDeserialiser
использует поле type
определенное в сериализаторе, чтобы определить, в какую реализацию AttachmentConstraint
следует преобразовать его.
Конкретная конфигурация WebFlux
В этом подразделе рассматриваются дополнительные необходимые настройки, связанные с использованием WebFlux. Вы уже видели некоторую конфигурацию в MessageClient
но нужно сделать немного больше:
Клиенту нужен этот компонент, чтобы иметь возможность десериализовать application/stream+json
вместе с объектами, возвращаемыми в ответе.
Чтобы использовать Jackson2JsonDecoder
определенный в конфигурации, необходимо указать ExchangeStrategies
WebClient
. К сожалению, класс ExchangeStrategies
не написан для того, чтобы подобрать уже созданный нами Jackson2JsonDecoder
. Я надеялся, что такая конфигурация будет работать по умолчанию, ну да ладно. Чтобы добавить ExchangeStrategies
необходимо использовать конструктор WebClient
. Как только это будет сделано, мы наконец там. Вся сериализация для упаковки ответа и десериализация для использования его от клиента завершена.
Это подводит итог всего кода, связанного со Spring, который я хочу обсудить в этом посте.
Быстрый взгляд на код потока
Прежде чем закончить, я кратко покажу последовательность действий, которую я собрал для целей этого урока:
Это довольно простой поток с добавлением ProgressTracker
который используется в запросе /messages
для отслеживания текущего состояния потока. Короче говоря, этот поток принимает MessageState
переданный в него, и отправляет его контрагенту. При движении по потоку ProgressTracker
обновляется до соответствующего шага. Дополнительную документацию по использованию ProgressTracker
можно найти в документации Corda .
Время закрытия
Честно говоря, это было намного дольше, чем я думал, и заняло у меня гораздо больше времени, чем я надеялся.
В заключение, Spring WebFlux предоставляет возможность использовать реактивные потоки для обработки событий ответа, когда бы они не поступили. При использовании с Corda можно отслеживать ход потока, а постоянный поток обновлений хранилища можно поддерживать готовым к действию по мере их поступления. Чтобы полностью использовать WebFlux с Corda, мы также должны были убедиться в том, что объекты были правильно сериализованы сервером, а затем десериализованы клиентом, чтобы их можно было использовать. Lucky Corda действительно предоставляет некоторые из них, но один или два класса или функции отсутствуют, и мы должны убедиться, что мы используем их при условии отображения объектов. К сожалению, WebFlux требует немного больше конфигурации, чем я обычно привык при использовании модулей Spring, но ничего такого, что не может быть исправлено.
Остальной код для этого поста можно найти на моем GitHub
Если вам понравился этот пост, вы можете подписаться на меня в твиттере на @LankyDanDev, где я публикую обновления своих новых сообщений (хотя в последнее время они немного замедлились).
Опубликовано на Java Code Geeks с разрешения Дэна Ньютона, партнера нашей программы JCG . См. Оригинальную статью здесь: потоковая передача данных из узла Corda с помощью Spring WebFlux
Мнения, высказанные участниками Java Code Geeks, являются их собственными. |