Эта статья является частью нашего академического курса под названием « Разработка современных приложений с помощью Scala» .
В этом курсе мы предоставляем среду и набор инструментов, чтобы вы могли разрабатывать современные приложения Scala. Мы охватываем широкий спектр тем: от сборки SBT и реактивных приложений до тестирования и доступа к базе данных. С нашими простыми учебными пособиями вы сможете запустить и запустить собственные проекты за минимальное время. Проверьте это здесь !
1. Введение
За последние пару лет многие программные системы, которыми ежедневно пользуются миллионы и даже миллиарды людей, столкнулись с беспрецедентными требованиями к масштабируемости. Во многих отношениях традиционные программные архитектуры были раздвинуты до предела, что обуславливает насущную необходимость придумывать другие архитектурные стили, которые лучше отвечают требованиям современного мира.
Это был момент, когда другая парадигма, реактивное программирование , начала появляться и широко распространяться очень быстро. Наряду с функциональным программированием реактивное программирование потрясло индустрию и открыло целый новый класс приложений, которые мы называем реактивными приложениями в наши дни.
Содержание
2. Быть реактивным
Создание приложений в соответствии с парадигмой реактивного программирования подразумевает следование другому архитектурному стилю и принципам проектирования, которые лучше всего описаны в «Реактивном манифесте» , опубликованном Джонасом Бонером в 2013 году.
- Отзывчивый: система реагирует своевременно, если это вообще возможно.
- Устойчивость: система остается отзывчивой перед лицом отказа.
- Эластичный: система остается отзывчивой при различных нагрузках.
- Управляемый сообщениями: реактивные системы полагаются на асинхронную передачу сообщений для установления границы между компонентами, что обеспечивает слабую связь, изоляцию и прозрачность местоположения.
Нет необходимости говорить, что каждый из этих принципов имеет большое значение, и все вместе они представляют собой идеальный рецепт для создания современных программных приложений. Но, конечно же, там нет серебряной пули. Нет магии, которая внезапно сделает любое приложение или систему реактивной. Это комбинация асинхронного и неблокирующего программирования, параллелизма передачи сообщений, дополненного неизменяемостью и обратным давлением, чтобы назвать несколько ключевых.
В этом уроке мы узнаем обо всех необходимых строительных блоках для разработки действительно реактивных приложений с использованием языка программирования и экосистемы Scala , сосредоточив внимание в этом разделе на первом из них — реактивных потоках .
3. Спецификация реактивных потоков
Цель реактивных потоков — служить прочной основой для асинхронной обработки потоков с неблокирующим противодавлением. В этом отношении спецификация реактивных потоков выделяется как постоянное усилие по обеспечению совместимого стандарта, которому будут следовать соответствующие реализации.
Спецификация реактивных потоков написана строго в соответствии с принципами, изложенными в The Reactive Manifesto, и первая официальная версия 1.0.0 для платформы JVM уже выпущена .
4. Реактивные потоки в дикой природе
Как только официальный API реактивных потоков для платформы JVM стал общедоступным, ряд очень популярных проектов с открытым исходным кодом объявили о немедленной доступности совместимых реализаций. Хотя полный список включает в себя дюжину из них, вот лишь несколько самых известных:
- Project Reactor , участник портфеля проектов Spring
- RxJava , реализация более широкой библиотеки Reactive Extensions
- Akka Streams , часть дистрибутива Akka Toolkit
Стоит отметить, что RxJava — одна из первых и наиболее продвинутых библиотек JVM, которая представила разработчикам Java мощные принципы парадигмы реактивного программирования . Хотя он также имеет порт на язык Scala , называемый RxScala , мы собираемся сосредоточиться на Akka Streams , чисто основанной на Scala реализации спецификации реактивных потоков .
Akka Streams: Реализация реактивных потоков
Как мы уже кратко упомянули, Akka Streams является лишь частью более полного дистрибутива Akka Toolkit . Последняя выпущенная версия Akka Toolkit на момент написания этой статьи — 2.4.8, и мы будем использовать ее в оставшейся части раздела.
Однако не паникуйте, если ваша версия Akka Toolkit не самая последняя, реализация спецификации реактивных потоков обеспечивается Akka Streams уже довольно давно.
5.1. Базовые концепты
Akka Streams построен на основе нескольких базовых элементов, которые взаимодействуют друг с другом и позволяют описывать очень сложные конвейеры обработки потоков.
- Источник : что-то с одним выходным потоком (концептуально представляет издателя )
- Sink : что-то с одним входным потоком (концептуально представляет подписчика )
- Поток : что-то с ровно одним входным и одним выходным потоком (концептуально представляет процессор )
- BidiFlow : что-то с ровно двумя входными и двумя выходными потоками
- График : топология потоковой обработки, которая принимает определенные входные данные и предоставляет определенные выходные данные.
Для любопытных читателей Akka Streams полностью реализует спецификацию реактивных потоков, но скрывает этот факт за более лаконичными абстракциями API, ориентированными на пользователя, представляя собственные базовые примитивы. Вот почему, если вы посмотрите на API реактивных потоков для JVM, вы можете не найти прямых совпадений с потоками Akka , хотя и поддерживаются соответствующие преобразования.
Одной из ключевых целей дизайна Akka Streams является возможность многократного использования. Все строительные блоки, описанные выше, могут быть совместно использованы и / или объединены в более сложные топологии потоковой обработки, как мы скоро увидим.
5.2. материализация
Akka Streams делает очень четкое разделение между определением потока и фактическим исполнением потока. Механизм определения произвольного потока и предоставления всех необходимых ресурсов для запуска называется материализацией в терминологии Akka Streams .
По сути, реализация материализатора может быть любой, но по умолчанию Akka Streams предоставляет ActorMaterializer, который в основном отображает различные этапы обработки с использованием Actor . Это также подразумевает, что в общем случае обработка потока будет полностью асинхронной и управляемой сообщениями.
В следующем разделе учебника «Параллелизм и параллелизм: Akka» мы подробно поговорим об актерах и актерских моделях . К счастью, Akka Streams отлично справляется со своей задачей, скрывая от нас ненужные абстракции, чтобы актеры не появлялись нигде, кроме ActorMaterializer .
5.3. Источники и Раковины
Входные данные являются отправной точкой любой потоковой обработки. Это может быть что угодно: файл, коллекция, сетевой сокет, поток, будущее, вы называете это. В API Akka Streams этот вход представлен параметризованным классом Source [+ Out, + Mat] , где:
- Out — это тип элементов, выход которых источника
- Mat — это тип некоторого дополнительного значения, которое может генерировать источник (часто устанавливается NotUsed, но об этом позже)
Для удобства объект Source имеет множество фабричных методов, которые упрощают упаковку типичных входных данных в соответствующие экземпляры класса Source , например:
1
2
3
4
5
6
7
8
9
|
val source : Source[Int, NotUsed] = Source( 1 to 10 ) val source : Source[Int, NotUsed] = Source(Set( 1 , 2 , 3 , 4 , 5 )) val source : Source[String, NotUsed] = Source.single( "Reactive Streams" ) val source : Source[ByteString, _ ] = FileIO.fromPath(file) val source : Source[Int, _ ] = Source.tick( 1 second, 10 seconds, Random.nextInt()) |
Уже достаточно ввода, чтобы начать простую потоковую обработку. Но, как мы уже знаем, определение Источника на самом деле ничего не сделает до момента материализации. Класс Source (и некоторые другие, такие как Flow fe) имеет семейство так называемых терминальных функций: run()
и runWith()
. Вызов любой из этих функций запускает процесс материализации, требуя предоставления материализатора неявно или явно. Например:
1
2
3
4
5
6
7
8
9
|
implicit val system = ActorSystem( "reactive-streams" ) implicit val materializer = ActorMaterializer() val numbers = List( 100 , 200 , 300 , 400 , 500 ) val source : Source[Int, NotUsed] = Source(numbers) source .runForeach { println _ } .onComplete { _ = > system.terminate() } |
Как только выполнение потоковой обработки заканчивается, каждое число будет напечатано на консоли:
1
2
3
4
5
|
100 200 300 400 500 |
Интересно, что во фрагменте кода выше при вызове runForeach
используется другая абстракция Akka Streams , Sink , которая по сути является потребителем входных данных потока на разных этапах. Таким образом, наш пример можно переписать так:
1
2
3
|
source .runWith { Sink.foreach { println _ } } .onComplete { _ = > system.terminate() } |
И, как и следовало ожидать, класс Source поддерживает широкий спектр функций для преобразования или обработки элементов потока, которые называются этапами обработки в потоках Akka , например:
1
2
3
4
|
source .map { _ * 2 } .runForeach { println _ } .onComplete { _ = > system.terminate() } |
Обратите внимание, что этапы обработки никогда не изменяют текущее определение потока, а скорее возвращают новый этап обработки. И последнее, но не менее важное: Akka Streams не позволяет null
проходить через поток как элемент, о чем люди, пришедшие из Java-среды, должны быть крайне осторожны.
5.4. Потоки
Если Source и Sink — это просто абстракции над выходами и входами, Flow — это тот тип клея, который по сути объединяет их вместе. Давайте вернемся к примеру с числами, но на этот раз мы будем читать их из файла.
1
2
|
val file : Path = Paths.get(getClass.getResource( "/numbers.txt" ).toURI()) val source : Source[ByteString, _ ] = FileIO.fromPath(file) |
Numbers.txt — это просто старый текстовый файл, где каждая строка содержит произвольное число, например:
1
2
3
4
5
|
100 200 300 400 500 |
Это может звучать тривиально, но давайте взглянем на тип Out источника : на самом деле это ByteString (точнее, файл будет читаться в чанках ByteString ). Это не совсем то, что мы хотим, мы хотели бы читать файл строка за строкой, число за номером, но как мы можем это сделать? К счастью, Akka Streams имеет встроенную поддержку для этого в форме Framing, и нам нужно только определить преобразование из потока ByteString в поток регулярных целых чисел:
1
2
3
|
val flow : Flow[ByteString, Int, _ ] = Flow[ByteString] .via(Framing.delimiter(ByteString( "\r\n" ), 10 , true )) .map { _ .utf 8 String.toInt } |
Здесь мы только что определили поток ! Он не привязан ни к входу, ни к выходу. Тем не менее, он может быть использован любым определением обработки потока, например:
1
2
3
4
|
source .via(flow) .filter { _ > 200 } .runForeach { println _ } |
Или мы можем определить еще один конвейер потоковой обработки, чтобы подсчитать, сколько строк мы обработали в целом. Вот где тип Mat будет полезен, поскольку мы будем использовать значение одного из Sink в качестве конечного результата, например:
1
2
3
4
5
6
7
8
|
val future : Future[Int] = source .via(flow) .toMat(Sink.fold[Int, Int]( 0 ){(acc, _ ) = > acc + 1 })(Keep.right) .run future.onSuccess { case count = > println(s "Lines processed: $count" ) } |
Довольно просто, не правда ли? Стоит сделать одно важное замечание относительно гарантий упорядочения элементов: Akka Streams сохраняет порядок ввода элементов в большинстве случаев (но некоторые этапы обработки могут этого не делать).
5.5. Графики и BidiFlow
Хотя Flow действительно мощная абстракция, он ограничен только одним входом и только одним выходом. Этого может быть достаточно для многих случаев использования, но сложные сценарии обработки потока требуют большей гибкости. Давайте познакомимся с Graph s и BidiFlow s.
Graph может использовать произвольное количество входов и выходов и формировать действительно сложные топологии, но в двух словах они состоят из простых потоков . Чтобы проиллюстрировать графики в действии, рассмотрим этот пример. Организация готова выплачивать своим сотрудникам ежегодный бонус, но все руководящие должности получают бонус в 10 тысяч к базовой зарплате, в то время как другие получают только 5 тысяч. Полная обработка потока может быть проиллюстрирована изображением ниже.
Благодаря выразительной Graph DSL, Akka Streams позволяет очень просто создавать действительно сложные сценарии, хотя у нас это довольно наивно.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
val employees = List( Employee( "Tom" , "manager" , 50000 ), Employee( "Bob" , "employee" , 20000 ), Employee( "Mark" , "employee" , 20000 ), Employee( "John" , "manager" , 55000 ), Employee( "Dilan" , "employee" , 35000 ) ) val graph = GraphDSL.create() { implicit builder = > import GraphDSL.Implicits. _ val in = Source(employees) val out = Sink.foreach { println _ } val broadcast = builder.add(Broadcast[Employee]( 2 )) val merge = builder.add(Merge[Employee]( 2 )) val manager = Flow[Employee] .filter { _ .position == "manager" } .map { e = > e.copy(salary = e.salary + 10000 ) } val employee = Flow[Employee] .filter { _ .position ! = "manager" } .map { e = > e.copy(salary = e.salary + 5000 ) } in ~> broadcast ~> manager ~> merge ~> out broadcast ~> employee ~> merge ClosedShape } |
ClosedShape в конце графика означает, что мы определили полностью связанный граф, где все входы и выходы подключены. Полностью связанный граф можно преобразовать в RunnableGraph и фактически выполнить, например:
1
2
3
|
RunnableGraph .fromGraph(graph) .run |
Мы увидим ожидаемый результат в консоли после завершения выполнения графика. Каждый человек имеет соответствующий бонус, добавленный к его / ее зарплате:
1
2
3
4
5
|
Employee(Tom,manager,60000) Employee(Bob,employee,25000) Employee(Mark,employee,25000) Employee(John,manager,65000) Employee(Dilan,employee,40000) |
Как и большинство других базовых частей, Graph и RunnableGraph свободно распространяются. Одним из последствий этого является возможность строить частичные графы и комбинировать различные источники , приемники и потоки .
В примерах, которые мы видели, данные проходят через поток только в одном направлении. BidiFlow является частным случаем графа, где есть два потока, которые идут в противоположных направлениях. Лучшей иллюстрацией BidiFlow является типичная передача запросов / ответов, например:
01
02
03
04
05
06
07
08
09
10
11
|
case class Request(payload : ByteString) case class Response(payload : ByteString) val server = GraphDSL.create() { implicit builder = > import GraphDSL.Implicits. _ val out = builder.add(Flow[Request].map { _ .payload.utf 8 String }) val in = builder.add(Flow[String].map {s = > Response(ByteString(s.reverse))}) BidiShape.fromFlows(out, in) } |
В этом упрощенном примере полезная нагрузка запроса просто переворачивается и упаковывается как полезная нагрузка ответа. server
на самом деле представляет собой граф, и для создания экземпляра BidiFlow мы должны использовать фабричный метод fromGraph
, например:
1
|
val bidiFlow = BidiFlow.fromGraph(server) |
Теперь мы готовы материализовать и запустить экземпляр BidiFlow , предоставив простой запрос и напрямую bidiFlow
оба bidiFlow
.
1
2
3
4
5
6
|
Source .single(Request(ByteString( "BidiFlow Example" ))) .via(bidiFlow.join(Flow[String])) .map( _ .payload.utf 8 String) .runWith(Sink.foreach { println _ }) .onComplete { _ = > system.terminate() } |
Не удивительно, но в консоли будет выведена обратная версия строки «Пример BidiFlow» :
1
|
elpmaxE wolFidiB |
5.6. Обратное давление
Концепция противодавления является одной из основ философии реактивных потоков . Akka Streams очень старается поддерживать потоковую обработку, гарантируя, что издатель никогда не выпустит больше элементов, чем может обработать подписчик. В дополнение к контролю спроса и распространению противодавления от нисходящих потоков к восходящим потокам, Akka Streams поддерживает использование буферизации и дросселирования для более детального и расширенного управления противодавлением.
5,7. Обработка ошибок
В любом более или менее реальном сценарии конвейеры потоковой обработки, созданные с помощью Akka Streams, будут ссылаться на логику конкретного приложения, включая доступ к базе данных или вызовы внешних служб. Ошибки могут и произойдут, что приведет к преждевременному прекращению обработки потока.
К счастью, Akka Streams предоставляет как минимум три стратегии для обработки исключений, возникающих при выполнении кода приложения:
- Стоп: поток завершается с ошибкой (стратегия по умолчанию)
- Resume: элемент отбрасывается и поток продолжается
- Перезапуск: элемент удаляется, и поток продолжается после перезапуска этапа.
Они находятся под сильным влиянием модели акторов, которую Akka Streams использует для материализации потоков обработки потоков, а также называются стратегиями надзора. Чтобы проиллюстрировать, как это работает, давайте рассмотрим простой поток, который использует числа в качестве источника и вызывает исключение каждый раз, когда встречается с четным числом.
1
2
3
4
5
6
7
|
val source = Source .unfold( 0 ) { e = > Some(e + 1 , e + 1 ) } .map { e = > if (e % 2 ! = 0 ) e else throw new IllegalArgumentException( "Only odd numbers are allowed" ) } .withAttributes(ActorAttributes.supervisionStrategy( _ = > Supervision.Resume)) .take( 10 ) .runForeach { println _ } .onComplete { _ = > system.terminate() } |
Без наблюдения поток завершится, как только номер 2 будет выпущен. Но при возобновлении стратегии поток продолжает выполнение с того места, где он ушел, пропуская проблемный элемент. Как и ожидалось, мы должны видеть только нечетные числа в консоли:
01
02
03
04
05
06
07
08
09
10
|
1 3 5 7 9 11 13 15 17 19 |
5,8. тестирование
Как мы увидим, большинство фреймворков и библиотек, разработанных сообществом Scala , были созданы с превосходной поддержкой тестирования. Akka Streams , безусловно, не является исключением и предоставляет специальный набор тестов для разработки комплексных сценариев тестирования. Чтобы продемонстрировать это, давайте вернемся к потоку образцов, который мы определили в разделе « Потоки ».
1
2
3
|
val flow : Flow[ByteString, Int, _ ] = Flow[ByteString] .via(Framing.delimiter(ByteString( "\r\n" ), 10 , true )) .map { _ .utf 8 String.toInt } |
Звучит неплохо иметь тестовый пример, который проверяет, что кадрирование работает точно так, как мы ожидали, поэтому давайте создадим его.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
class FlowsSpec extends TestKit(ActorSystem( "reactive-streams-test" )) with SpecificationLike with FutureMatchers with AfterAll { implicit val materializer = ActorMaterializer() def afterAll = system.terminate() "Stream" >> { "should return an expected value" >> { val (publisher, subscriber) = TestSource.probe[ByteString] .via(flow) .toMat(TestSink.probe[Int])(Keep.both) .run() subscriber.request( 2 ) publisher.sendNext(ByteString( "20" )) publisher.sendNext(ByteString( "0\r\n" )) publisher.sendComplete() subscriber.expectNext() must be _== ( 200 ) } } } |
Такие классы, как TestSource и TestSink (и многие другие) дают полный контроль над обработкой потока, поэтому можно тестировать очень сложные конвейеры. Кроме того, Akka Streams не обязывает использовать инфраструктуру тестирования, поэтому в качестве примера мы привели типичную спецификацию Specs2 .
6. Выводы
Этот раздел был просто поверхностным введением в мир реактивного программирования и реактивных потоков в частности. Несмотря на то, что мы довольно много говорили об Akka Streams , мы затронули лишь крошечную его часть, просто царапая верхушку айсберга: длинный список особенностей и внутренних деталей остался не раскрытым. Официальная документация Akka Streams является отличным источником всесторонних знаний о предмете, полным примеров. Пожалуйста, не стесняйтесь пройти через это.
7. Что дальше
В следующем разделе руководства мы поговорим о доступе к базам данных отношений из ваших приложений Scala . Темы этого раздела будут очень полезны, поскольку мы увидим важность реактивных потоков в поддержке определенных шаблонов доступа к базе данных.
Полные проекты доступны для скачивания .