Статьи

Потоки могут делать что угодно

В Corda Flow может сделать гораздо больше, чем предлагать новые транзакции для записи между организациями. Хотя, сказать, что они могут сделать что-нибудь , вероятно, немного далеко идущее (хотя это бросается в глаза). Что я действительно хочу сказать, так это то, что потоки являются точками входа в узел. Corda предоставляет ряд функций для взаимодействия с узлом через RPC. В настоящее время они охватывают более простые варианты использования, такие как запросы к хранилищу, но есть ограничение на то, что предоставляется. Потоки покрывают любую нестандартную логику, которая должна быть запущена. Итак, если вы хотите предоставить API из узла Corda, который клиент может запустить или использовать, тогда этот пост для вас.

В этом посте я буду изучать использование потоков в качестве точек входа в узел. Потоки, которые предлагают новые транзакции, показаны во многих других руководствах и были исключены из этого поста. Центр внимания будет направлен исключительно на потоки, которые обеспечивают различную функциональность.

Теория

Как упоминалось во введении, и я хочу повторить это, единственный способ представить конечную точку или API из узла Corda — это поток. Существующие конечные точки RPC будут обрабатывать стандартные функции. В будущем это может измениться, добавив поддержку пользовательских конечных точек RPC. Но на данный момент (Corda 4) потоки — это единственный способ запустить нестандартную логику внутри узла.

На мой взгляд, демонстрация функциональности узла через потоки очень похожа на запись конечных точек HTTP для веб-сервера. Вы должны определить, что доступно. Клиенты не могут запрашивать доступ к какому-либо внутреннему коду внутри веб-сервера. Если конечной точки нет, ошибка отправляется обратно. То же самое относится и к Корде. Тогда более просто рассуждать о том, что клиент может или не может делать при взаимодействии с узлом.

Что касается реализации, самый простой способ начать работу — показать вам пример:

01
02
03
04
05
06
07
08
09
10
11
@StartableByRPC
class StupidSimpleQueryFlow(private val externalId: String) : FlowLogic<MessageState>() {
    // does not need to be suspendable?
    override fun call(): MessageState {
        return serviceHub.vaultService.queryBy<MessageState>(
            QueryCriteria.LinearStateQueryCriteria(
                externalId = listOf(externalId)
            )
        ).states.single().state.data
    }
}

Если вы знаете свои вещи, то вы поймете, что вы можете сделать это через RPC. Я выбрал этот пример, чтобы начать с чего-то знакомого. Вы, вероятно, делали подобные запросы в своих собственных потоках. Хотя есть большая вероятность, что они были частью более обширного процесса.

Здесь следует отметить несколько вещей, которые отличаются от большинства потоков:

  • Нет аннотации @InitiatingFlow
  • Нет аннотации @Suspendable
  • Нет соответствующего потока ответчика

Почему они отсутствуют? Ну, они не нужны. Думаю об этом. Каждая точка связана с коммуникацией с другим узлом, чего не происходит. Поток запускается, выполняет запрос и возвращает полученные данные. Позвольте мне остановиться на этом:

  • @InitiatingFlow позволяет потоку создавать новые сеансы для связи с другими узлами. Поскольку взаимодействия не требуются, сеансы создавать не нужно, и поэтому аннотацию можно удалить.
  • @Suspendable требуется для функций, которые приостанавливаются. Разрешение потоку создавать контрольные точки , которые загружаются, когда потоку нужно снова проснуться. Наиболее распространенное место для приостановки потока — во время связи с другим узлом. В этом случае аннотацию можно удалить, поскольку поток никогда не нужно приостанавливать.
  • Потоки респондентов работают на узлах контрагентов и взаимодействуют с вашими потоками, когда вы send данные. Опять-таки, нет связи, поэтому нет необходимости в том, чтобы на других узлах был установлен поток, который соединяется с потоком выше.

Большинство потоков, которые не взаимодействуют с другими узлами, будут следовать такой структуре (некоторые потоки могут все еще приостановиться в зависимости от того, что вы делаете).

Определенный выше поток может затем быть доступен из внешнего клиента:

1
proxy.startFlow(::StupidSimpleQueryFlow, "asdas").returnValue.get()

Примеры

Ниже приведено несколько примеров потоков, которые вы можете написать, и почему вы можете их использовать.

Получение значения из службы

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
@StartableByRPC
class GetMeSomeValueFromAService : FlowLogic<Long>() {
  override fun call(): Long {
    return serviceHub.cordaService(IncrementingService::class.java).counter
  }
}
 
@CordaService
class IncrementingService(serviceHub: AppServiceHub) : SingletonSerializeAsToken() {
 
  var counter: Long = 0
 
  init {
    timer(period = TimeUnit.SECONDS.toMillis(1)) {
      counter += 1
    }
  }
}

Если у вас есть служба, которая хранит некоторую информацию, полезную для внешнего клиента, вам потребуется способ ее получения.

Вызов кода внутри узла для уменьшения дублирования кода

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
@StartableByRPC
class ExecuteSomeInternalNodeLogicYouDontWantToDuplicateFlow(private val recipient: Party) :
  FlowLogic<List<MessageState>>() {
  override fun call(): List<MessageState> {
    return serviceHub.cordaService(MessageRepository::class.java)
      .findAllMessagesByRecipient(recipient)
  }
}
 
@CordaService
class MessageRepository(private val serviceHub: AppServiceHub) : SingletonSerializeAsToken() {
 
  fun findAllMessagesByRecipient(recipient: Party): List<MessageState> {
    return serviceHub.vaultService.queryBy<MessageState>(
      QueryCriteria.VaultCustomQueryCriteria(
        builder { MessageSchema.PersistentMessage::recipient.equal(recipient.name.toString()) }
      )
    ).states.map { it.state.data }
  }
}

Полезные части кода для повторного использования могут быть выделены в отдельные классы. В частности, для Corda вы можете поместить логику в поток или службу.

Чтобы выполнить код внутри потока или службы с клиента:

  • Внутри потока — просто вызовите поток от клиента
  • Внутри службы — добавьте новый поток, который делегирует службе

Вы можете реализовать запрос выше в клиенте, но у вас будет две версии, если вы также будете использовать тот же запрос внутри узла.

Выполнение логики, которую нельзя выполнить через RPC

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
@StartableByRPC
class DoSomethingComplicatedThatYouCantDoViaRpc(private val recipient: Party) :
    FlowLogic<List<MessageSchema.PersistentMessage>>() {
    override fun call(): List<MessageSchema.PersistentMessage> {
        return serviceHub.withEntityManager {
            createQuery(
                "SELECT m FROM $TABLE_NAME m WHERE m.recipient = ?1",
                MessageSchema.PersistentMessage::class.java
            ).setParameter(
                1,
                recipient.name.toString()
            ).resultList
        }
    }
 
    private companion object {
        val TABLE_NAME = MessageSchema.PersistentMessage::class.jvmName
    }
}

Как упоминалось ранее, существует ограничение на то, что вы можете делать через RPC. Код выше является примером этого. Он обращается к EntityManager который может выполнять запросы более низкого уровня, чем это позволяет хранилище.

Вывод

Краткое заключение для этого поста. Потоки являются единственными точками входа в узел, которые могут выполнять полностью настраиваемую логику. Corda предоставляет несколько API для взаимодействия с узлом, но они предоставляют ограниченную функциональность. Чтобы выполнить любую логику (которую может запустить узел), вам необходим поток, которому разрешено входить в узел и нажимать кнопку для вас.

Опубликовано на Java Code Geeks с разрешения Дэна Ньютона, партнера нашей программы JCG . Смотрите оригинальную статью здесь: потоки могут делать все что угодно

Мнения, высказанные участниками Java Code Geeks, являются их собственными.