Недавно я задумался о способах объединения двух замечательных библиотек: Akka и RxJava . Самым простым вариантом использования для меня было преобразование потока сообщений, полученных Actor
в Observable
.
Я быстро напишу, как это работает, и, возможно, это станет началом новой небольшой серии блогов. И, может быть, я узнаю кое-что о том, как лучше всего создать модуль Akka для RxJava, что я бы хотел сделать…
Но я начну очень очень мало. Это не будет касаться какой-либо обработки ошибок или, например, изучения шины событий Akka . — Все это придется ждать позже.
Во-первых, мне понадобится небольшой, build.sbt
который в основном там, чтобы импортировать две зависимости для меня:
organization := "de.johoop", name := "akka-rxjava", version := "1.0.0-SNAPSHOT", scalaVersion := "2.10.3", scalacOptions ++= Seq("-deprecation", "-language:_")) libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor" % "2.2.1", "com.netflix.rxjava" % "rxjava-core" % "0.14.5", "com.netflix.rxjava" % "rxjava-scala" % "0.14.5"))
Далее мне нужно определить простое сообщение …
sealed trait Message case class Hello(text: String) extends Message
… и основной актер, который получает их:
import akka.actor._ class ObservableActor extends Actor with ActorLogging { def receive = { case message: Message => log debug s"incoming: $message" } }
Теперь давайте добавим простое Main
и попробуем это:
object Main extends App { val system = ActorSystem("client") val receiver = system actorOf (Props[ObservableActor], "rcv") Seq("hello", "world", "again", "not anymore") foreach { msg => receiver ! Hello(msg) } system.shutdown }
До сих пор это чистый Акка, и, конечно, он работает как положено. Теперь, чтобы сделать Observable из этого, мне понадобится механизм для подписки и отписки. Для подписки я хочу иметь возможность указать обратный вызов, onNext
который будет вызываться при каждом получении сообщения.
Это означает, что мне нужно еще несколько сообщений:
sealed trait SubUnsub extends Message case class Subscribe(onNext: Message => Unit) extends SubUnsub case object Unsubscribe extends SubUnsub
Конечно, мой актер должен обработать эти новые сообщения. Я буду использовать Become
механизм для этого, чтобы избежать любого изменчивого состояния. И очень важно: я хочу вызывать новый обратный вызов при получении сообщений, а не просто регистрировать их. Все это означает, что новая версия актера выглядит так:
class ObservableActor extends Actor with ActorLogging { def receive = { case Subscribe(onNext) => log debug "subscribe" context become subscribed(onNext) } def subscribed(onNext: Message => Unit): Actor.Receive = { case Unsubscribe => log debug "unsubscribe" context become receive case message: Message => log debug s"incoming: $message" onNext(message) } }
At last, I’m prepared to create an Observable. This is the step where the “magic” happens. I’ll add a small utility function for this:
import rx.lang.scala._ // the additional import for RxJava def observableFromActor(actor: ActorRef): Observable[Message] = Observable { observer => actor ! Subscribe(observer onNext) new Subscription { override def unsubscribe: Unit = actor ! Unsubscribe } }
What happens here is that in order to create an Observable
, I need a function that takes an Observer
(which provides the required callback(s) and wants to get notified about messages) and returns a Subscription
(for unsubscribing).
On subscription, we send the Subscribe
message to the actor, telling it about the callback. And on unsubscription, we send the Unsubscribe
message. That’s all there is to it.
Let’s extend our Main
object to try it out:
val subscription = observableFromActor(receiver) .take(3) .subscribe(msg => println(s"received: $msg")) Seq("hello", "world", "again", "not anymore") foreach { msg => receiver ! Hello(msg) } subscription.unsubscribe
Aaaand it still does it’s thing. I’m good. For now.
In conclusion, I have to say that this was surprisingly easy. Of course, maybe that’s just because I left out all the more interesting things…
Thanks for reading!
For reference, here’s the complete code of the example:
import akka.actor._ import rx.lang.scala._ sealed trait Message case class Hello(text: String) extends Message sealed trait SubUnsub extends Message case class Subscribe(onNext: Message => Unit) extends SubUnsub case object Unsubscribe extends SubUnsub object Main extends App { val system = ActorSystem("client") val receiver = system actorOf (Props[ObservableActor], "rcv") val subscription = observableFromActor(receiver) .take(3) .subscribe(msg => println(s"received: $msg")) Seq("hello", "world", "again", "not anymore") foreach { msg => receiver ! Hello(msg) } subscription.unsubscribe system.shutdown def observableFromActor(actor: ActorRef): Observable[Message] = Observable { observer => actor ! Subscribe(observer onNext) new Subscription { override def unsubscribe: Unit = actor ! Unsubscribe } } } class ObservableActor extends Actor with ActorLogging { def receive = { case Subscribe(onNext) => log debug "subscribe" context become subscribed(onNext) } def subscribed(onNext: Message => Unit): Actor.Receive = { case Unsubscribe => log debug "unsubscribe" context become receive case message: Message => log debug s"incoming: $message" onNext(message) } }