Статьи

Используйте API реактивных потоков для объединения akka-потоков с rxJava

Просто небольшая статья на этот раз, так как я все еще экспериментирую с этим материалом. Есть много разговоров о реактивном программировании. В Java 8 у нас есть Stream API, у нас есть rxJava, у нас есть ratpack, а у Akka есть akka-streams .

Основная проблема с этими реализациями заключается в том, что они несовместимы. Вы не можете подключить подписчика одной реализации к издателю другой. К счастью, инициатива начала обеспечивать способ совместной работы этих различных реализаций:

«Целью данной спецификации является создание множества соответствующих реализаций, которые благодаря соблюдению правил смогут беспрепятственно взаимодействовать, сохраняя вышеупомянутые преимущества и характеристики по всему графу обработки потокового приложения».

От — http://www.reactive-streams.org/

Как это работает

Теперь, как мы это делаем? Давайте посмотрим на быстрый пример, основанный на примерах, предоставленных akka-stream ( отсюда ). В следующем листинге:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package sample.stream
  
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{SubscriberSink, PublisherSource, Source}
import com.google.common.collect.{DiscreteDomain, ContiguousSet}
import rx.RxReactiveStreams
import rx.Observable;
import scala.collection.JavaConverters._
  
object BasicTransformation {
  
  def main(args: Array[String]): Unit = {
  
    // define an implicit actorsystem and import the implicit dispatcher
    implicit val system = ActorSystem("Sys")
    import system.dispatcher
  
    // flow materializer determines how the stream is realized.
    // this time as a flow between actors.
    implicit val materializer = FlowMaterializer()
  
    // input text for the stream.
    val text =
      """|Lorem Ipsum is simply dummy text of the printing and typesetting industry.
         |Lorem Ipsum has been the industry's standard dummy text ever since the 1500s,
         |when an unknown printer took a galley of type and scrambled it to make a type
         |specimen book.""".stripMargin
  
    // create an observable from a simple list (this is in rxjava style)
    val first = Observable.from(text.split("\\s").toList.asJava);
    // convert the rxJava observable to a publisher
    val publisher = RxReactiveStreams.toPublisher(first);
    // based on the publisher create an akka source
    val source = PublisherSource(publisher);
  
    // now use the akka style syntax to stream the data from the source
    // to the sink (in this case this is println)
    source.
      map(_.toUpperCase).                 // executed as actors
      filter(_.length > 3).
      foreach { el =>                     // the sink/consumer
        println(el)
      }.
      onComplete(_ => system.shutdown())  // lifecycle event
  }
}

Комментарии к коду в этом примере в значительной степени объясняют, что происходит. Здесь мы создаем Observable на основе rxJava. Преобразуйте этот Observable в издателя «реактивных потоков» и используйте этого издателя для создания источника akka-streams. Для остальной части кода мы можем использовать API потока стиля akka-stream для моделирования потока. В этом случае мы просто делаем некоторую фильтрацию и распечатываем результат.