Статьи

Создание RxJava Observable от актера Akka

Недавно я задумался о способах объединения двух замечательных библиотек: Akka  и  RxJava . Самым простым вариантом использования для меня было преобразование потока сообщений, полученных  Actor в  Observable.

Я быстро напишу, как это работает, и, возможно, это станет началом новой небольшой серии блогов. И, может быть, я узнаю кое-что о том, как лучше всего создать модуль Akka для RxJava, что я бы хотел сделать…

Но я начну очень очень мало. Это не будет касаться какой-либо обработки ошибок или, например, изучения шины событий Akka  . — Все это придется ждать позже.

Во-первых, мне понадобится небольшой,  build.sbt который в основном там, чтобы импортировать две зависимости для меня:

organization := "de.johoop",
name := "akka-rxjava",
version := "1.0.0-SNAPSHOT",
scalaVersion := "2.10.3",
scalacOptions ++= Seq("-deprecation", "-language:_"))
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.2.1",
  "com.netflix.rxjava" % "rxjava-core" % "0.14.5",
  "com.netflix.rxjava" % "rxjava-scala" % "0.14.5"))

Далее мне нужно определить простое сообщение …

sealed trait Message
case class Hello(text: String) extends Message

… и основной актер, который получает их:

import akka.actor._

class ObservableActor extends Actor with ActorLogging {
  def receive = {
    case message: Message =>
      log debug s"incoming: $message"
  }
}

Теперь давайте добавим простое  Main и попробуем это:

object Main extends App {
  val system = ActorSystem("client")

  val receiver = system actorOf (Props[ObservableActor], "rcv")
  Seq("hello", "world", "again", "not anymore") foreach { msg => 
    receiver ! Hello(msg)
  }

  system.shutdown
}

До сих пор это чистый Акка, и, конечно, он работает как положено. Теперь, чтобы сделать Observable из этого, мне понадобится механизм для подписки и отписки. Для подписки я хочу иметь возможность указать обратный вызов,  onNext который будет вызываться при каждом получении сообщения.

Это означает, что мне нужно еще несколько сообщений:

sealed trait SubUnsub extends Message
case class Subscribe(onNext: Message => Unit) extends SubUnsub
case object Unsubscribe extends SubUnsub

Конечно, мой актер должен обработать эти новые сообщения. Я буду использовать  Become механизм для этого, чтобы избежать любого изменчивого состояния. И очень важно: я хочу вызывать новый обратный вызов при получении сообщений, а не просто регистрировать их. Все это означает, что новая версия актера выглядит так:

class ObservableActor extends Actor with ActorLogging {
  def receive = {
    case Subscribe(onNext) =>
      log debug "subscribe"
      context become subscribed(onNext)
  }

  def subscribed(onNext: Message => Unit): Actor.Receive = {
    case Unsubscribe =>
      log debug "unsubscribe"
      context become receive

    case message: Message =>
      log debug s"incoming: $message"
      onNext(message)
  }
}

At last, I’m prepared to create an Observable. This is the step where the “magic” happens. I’ll add a small utility function for this:

import rx.lang.scala._ // the additional import for RxJava

def observableFromActor(actor: ActorRef): Observable[Message] = 
  Observable { observer =>
    actor ! Subscribe(observer onNext)
    new Subscription { 
      override def unsubscribe: Unit = actor ! Unsubscribe 
    }
  }

What happens here is that in order to create an Observable, I need a function that takes an Observer (which provides the required callback(s) and wants to get notified about messages) and returns a Subscription (for unsubscribing).

On subscription, we send the Subscribe message to the actor, telling it about the callback. And on unsubscription, we send the Unsubscribe message. That’s all there is to it.

Let’s extend our Main object to try it out:

val subscription = 
  observableFromActor(receiver) 
    .take(3)
    .subscribe(msg => println(s"received: $msg"))

Seq("hello", "world", "again", "not anymore") foreach { msg => 
  receiver ! Hello(msg)
}
 subscription.unsubscribe

Aaaand it still does it’s thing. I’m good. For now.

In conclusion, I have to say that this was surprisingly easy. Of course, maybe that’s just because I left out all the more interesting things…

Thanks for reading! :-)

For reference, here’s the complete code of the example:

import akka.actor._
import rx.lang.scala._

sealed trait Message
case class Hello(text: String) extends Message

sealed trait SubUnsub extends Message
case class Subscribe(onNext: Message => Unit) extends SubUnsub
case object Unsubscribe extends SubUnsub

object Main extends App {
  val system = ActorSystem("client")
  val receiver = system actorOf (Props[ObservableActor], "rcv")

  val subscription =
    observableFromActor(receiver)
      .take(3)
      .subscribe(msg => println(s"received: $msg"))

  Seq("hello", "world", "again", "not anymore") foreach { msg =>
    receiver ! Hello(msg)
  }
  subscription.unsubscribe
  system.shutdown

  def observableFromActor(actor: ActorRef): Observable[Message] =
    Observable { observer =>
      actor ! Subscribe(observer onNext)
      new Subscription {
        override def unsubscribe: Unit = actor ! Unsubscribe
      }
    }
}

class ObservableActor extends Actor with ActorLogging {
  def receive = {
    case Subscribe(onNext) =>
      log debug "subscribe"
      context become subscribed(onNext)
  }

  def subscribed(onNext: Message => Unit): Actor.Receive = {
    case Unsubscribe =>
      log debug "unsubscribe"
      context become receive

    case message: Message =>
      log debug s"incoming: $message"
      onNext(message)
  }
}