Статьи

Потоковые обновления в реальном времени из реактивного репозитория Spring Data

В этом посте подробно описывается наивная реализация потоковой передачи обновлений из базы данных в любые другие компоненты, которые заинтересованы в этих данных. Точнее, как изменить репозиторий Spring Data R2DBC для отправки событий соответствующим подписчикам.

В этом посте будут полезны некоторые базовые знания R2DBC и Spring. Мои предыдущие работы, Асинхронный доступ к СУБД с использованием Spring Data R2DBC и Spring Data R2DBC для Microsoft SQL Server, должны помочь в этом отношении.

Как я уже говорил, это будет наивная реализация. Поэтому в коде не будет ничего причудливого.

Для этого я SimpleR2dbcRepository чтобы создать реализацию репозитория, которая генерирует событие каждый раз, когда сохраняется новая запись. Новые события добавляются в DirectProcessor и отправляются любому Publisher подписанному на него. Это выглядит так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
class PersonRepository(
  entity: RelationalEntityInformation<Person, Int>,
  databaseClient: DatabaseClient,
  converter: R2dbcConverter,
  accessStrategy: ReactiveDataAccessStrategy
) : SimpleR2dbcRepository<Person, Int>(entity, databaseClient, converter, accessStrategy) {
 
  private val source: DirectProcessor<Person> = DirectProcessor.create<Person>()
  val events: Flux<Person> = source
 
  override fun <S : Person> save(objectToSave: S): Mono<S> {
    return super.save(objectToSave).doOnNext(source::onNext)
  }
}

Единственная функция из SimpleR2dbcRepository которую необходимо переопределить, — это save ( saveAll делегаты для save ). doOnNext добавляется к исходному вызову сохранения, который отправляет новое событие в source ( DirectorProcessor ), вызывая onNext .

source приведен к Flux чтобы классы из-за пределов хранилища не могли добавлять новые события. Технически они все еще могут добавлять события, но им нужно будет разыграть их самостоятельно.

Как вы могли заметить, хранилище загружает параметры и передает их в SimpleR2dbcRepository . Экземпляр хранилища необходимо создать вручную, поскольку некоторые его зависимости не могут быть введены автоматически:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
class RepositoryConfiguration {
 
  @Bean
  fun personRepository(
    databaseClient: DatabaseClient,
    dataAccessStrategy: ReactiveDataAccessStrategy
  ): PersonRepository {
    val entity: RelationalPersistentEntity<Person> = dataAccessStrategy
      .converter
      .mappingContext
      .getRequiredPersistentEntity(Person::class.java) as RelationalPersistentEntity<Person>
    val relationEntityInformation: MappingRelationalEntityInformation<Person, Int> =
      MappingRelationalEntityInformation(entity, Int::class.java)
    return PersonRepository(
      relationEntityInformation,
      databaseClient,
      dataAccessStrategy.converter,
      dataAccessStrategy
    )
  }
}

На данный момент все настроено и готово к использованию. Ниже приведен пример его работы:

1
2
3
4
5
6
7
8
9
personRepository.events
  .doOnComplete { log.info("Events flux has closed") }
  .subscribe { log.info("From events stream - $it") }
// insert people records over time
MARVEL_CHARACTERS
  .toFlux()
  .delayElements(Duration.of(1, SECONDS))
  .concatMap { personRepository.save(it) }
  .subscribe()

Какие выводы:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
29-08-2019 09:08:27.674 [reactor-tcp-nio-1]  From events stream - Person(id=481, name=Spiderman, age=18)
29-08-2019 09:08:28.550 [reactor-tcp-nio-2]  From events stream - Person(id=482, name=Ironman, age=48)
29-08-2019 09:08:29.555 [reactor-tcp-nio-3]  From events stream - Person(id=483, name=Thor, age=1000)
29-08-2019 09:08:30.561 [reactor-tcp-nio-4]  From events stream - Person(id=484, name=Hulk, age=49)
29-08-2019 09:08:31.568 [reactor-tcp-nio-5]  From events stream - Person(id=485, name=Antman, age=49)
29-08-2019 09:08:32.571 [reactor-tcp-nio-6]  From events stream - Person(id=486, name=Blackwidow, age=34)
29-08-2019 09:08:33.576 [reactor-tcp-nio-7]  From events stream - Person(id=487, name=Starlord, age=38)
29-08-2019 09:08:34.581 [reactor-tcp-nio-8]  From events stream - Person(id=488, name=Captain America, age=100)
29-08-2019 09:08:35.585 [reactor-tcp-nio-9]  From events stream - Person(id=489, name=Warmachine, age=50)
29-08-2019 09:08:36.589 [reactor-tcp-nio-10] From events stream - Person(id=490, name=Wasp, age=26)
29-08-2019 09:08:37.596 [reactor-tcp-nio-11] From events stream - Person(id=491, name=Winter Soldier, age=101)
29-08-2019 09:08:38.597 [reactor-tcp-nio-12] From events stream - Person(id=492, name=Black Panther, age=42)
29-08-2019 09:08:39.604 [reactor-tcp-nio-1]  From events stream - Person(id=493, name=Doctor Strange, age=42)
29-08-2019 09:08:40.609 [reactor-tcp-nio-2]  From events stream - Person(id=494, name=Gamora, age=29)
29-08-2019 09:08:41.611 [reactor-tcp-nio-3]  From events stream - Person(id=495, name=Groot, age=4)
29-08-2019 09:08:42.618 [reactor-tcp-nio-4]  From events stream - Person(id=496, name=Hawkeye, age=47)
29-08-2019 09:08:43.620 [reactor-tcp-nio-5]  From events stream - Person(id=497, name=Pepper Potts, age=44)
29-08-2019 09:08:44.627 [reactor-tcp-nio-6]  From events stream - Person(id=498, name=Captain Marvel, age=59)
29-08-2019 09:08:45.631 [reactor-tcp-nio-7]  From events stream - Person(id=499, name=Rocket Raccoon, age=30)
29-08-2019 09:08:46.637 [reactor-tcp-nio-8]  From events stream - Person(id=500, name=Drax, age=49)
29-08-2019 09:08:47.639 [reactor-tcp-nio-9]  From events stream - Person(id=501, name=Nebula, age=30)

Запись сохраняется каждую секунду, что соответствует событиям, исходящим из хранилища.

Обратите внимание, что событие doOnComplete никогда не вызывается. Источник никогда не закрывается и, следовательно, никогда не отправляет событие завершения ни одному из своих подписчиков.

Это все, что нужно, по крайней мере, для этой базовой реализации. Я уверен, что можно сделать гораздо больше, но мне нужно сначала выяснить, как это сделать … Подводя итог, можно сделать несколько добавлений, которые вы можете передавать потоковым данным, вставленным в вашу базу данных, в компоненты, которые заинтересованы в записях добавляется.

Опубликовано на Java Code Geeks с разрешения Дэна Ньютона, партнера нашей программы JCG . Смотрите оригинальную статью здесь: Потоковые обновления в реальном времени из реактивного репозитория Spring Data

Мнения, высказанные участниками Java Code Geeks, являются их собственными.