Статьи

ReactiveMongo с Akka, Scala и Websockets

Я искал простой сервер веб-сокетов для одного из моих проектов, чтобы проверить некоторые вещи с  реактивным монго . Однако, оглядываясь по сторонам, я не мог найти простую базовую реализацию, не включив полную структуру. Наконец я наткнулся на один из проектов активаторов Typesage:  http://typesafe.com/activator/template/akka-spray-websocket . Несмотря на то, что название подразумевает, что спрей необходим, на самом деле он использует материал websocket отсюда: https://github.com/TooTallNate/Java-WebSocket , который обеспечивает очень простую в использовании базовую реализацию websocket.

Поэтому в этой статье я покажу вам, как вы можете настроить очень простой сервер веб-сокетов (не требуя дополнительных фреймворков) вместе с Akka и ReactiveMongo. На следующих снимках экрана показано, к чему мы стремимся:
Снимок экрана 2014-11-22 в 13.58.40.png
На этом снимке экрана вы видите простой клиент веб-сокета, который общается с нашим сервером. Наш сервер имеет следующие функциональные возможности:

  1. Все, что отправляет клиент, возвращается назад.
  2. Любые входные данные, добавленные в  конкретную (ограниченную) коллекцию в mongoDB  , автоматически передаются всем слушателям.

Вы можете вырезать и вставить весь код из этой статьи, но, вероятно, проще получить код из git. Вы можете найти его в github здесь:  https://github.com/josdirksen/smartjava/tree/master/ws-akka

Начиная

Первое, что нам нужно сделать, это настроить наше рабочее пространство, поэтому давайте начнем с рассмотрения конфигурации sbt:

organization  := "org.smartjava"
 
version       := "0.1"
 
scalaVersion  := "2.11.2"
 
scalacOptions := Seq("-unchecked", "-deprecation", "-encoding", "utf8")
 
libraryDependencies ++= {
  val akkaV = "2.3.6"
  Seq(
    "com.typesafe.akka"   %%  "akka-actor"    % akkaV,
    "org.java-websocket" % "Java-WebSocket" % "1.3.1-SNAPSHOT",
    "org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23"
  )
}
 
resolvers ++= Seq("Code Envy" at "http://codenvycorp.com/repository/"
  ,"Typesafe" at "http://repo.typesafe.com/typesafe/releases/")

Здесь нет ничего особенного, мы просто указываем наши зависимости и добавляем некоторые средства распознавания, чтобы sbt знал, откуда извлекать зависимости. Прежде чем мы рассмотрим код, давайте сначала посмотрим на структуру каталогов и файл нашего проекта:

├── build.sbt
└── src
   └── main
      ├── resources
      │  ├── application.conf
      │  └── log4j2.xml
      └── scala
          ├── Boot.scala
          ├── DB.scala
          ├── WSActor.scala
          └── WSServer.scala

В каталоге src / main / resources мы храним наши файлы конфигурации, а в src / main / scala мы храним все наши файлы scala. Давайте начнем с просмотра файлов конфигурации. Для этого проекта мы используем два:

Файл Application.conf содержит конфигурацию нашего проекта и выглядит следующим образом:

akka {
  loglevel = "DEBUG"
}
 
mongo {
  db = "scala"
  collection = "rmongo"
  location = "localhost"
}
 
ws-server {
  port = 9999
}

Как видите, мы просто определяем уровень журнала, как использовать mongo и на каком порту мы хотим, чтобы наш сервер веб-сокетов слушал. И нам также нужен файл log4j2.xml, так как библиотека responsetivemongo использует его для ведения журнала:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>

Итак, со скучными вещами в стороне, давайте посмотрим на файлы scala.

Запуск сервера websocket и регистрация путей

Файл Boot.scala выглядит так:

package org.smartjava
 
import akka.actor.{Props, ActorSystem}
 
/**
 * This class launches the system.
 */
object Boot extends App {
  // create the actor system
  implicit lazy val system = ActorSystem("ws-system")
  // setup the mongoreactive connection
  implicit lazy val db = new DB(Configuration.location, Configuration.dbname);
 
  // we'll use a simple actor which echo's everything it finds back to the client.
  val echo = system.actorOf(EchoActor.props(db, Configuration.collection), "echo")
 
  // define the websocket routing and start a websocket listener
  private val wsServer = new WSServer(Configuration.port)
  wsServer.forResource("/echo", Some(echo))
  wsServer.start
 
  // make sure the actor system and the websocket server are shutdown when the client is
  // shutdown
  sys.addShutdownHook({system.shutdown;wsServer.stop})
}
 
// load configuration from external file
object Configuration {
  import com.typesafe.config.ConfigFactory
 
  private val config = ConfigFactory.load
  config.checkValid(ConfigFactory.defaultReference)
 
  val port = config.getInt("ws-server.port")
  val dbname = config.getString("mongo.db")
  val collection = config.getString("mongo.collection")
  val location = config.getString("mongo.location")
}

В этом исходном файле мы видим два объекта. Объект Configuration позволяет нам легко получить доступ к элементам конфигурации из файла application.conf, и объект Boot запустит наш сервер. Комментарии в коде должны в значительной степени объяснить, что происходит, но позвольте мне указать основные моменты:

  1. Мы создаем систему актеров Akka и соединение с нашим экземпляром mongoDB.
  2. Мы определяем актера, которого мы можем зарегистрировать по определенному пути веб-сокета.
  3. Затем мы создаем и запускаем websocketserver и регистрируем путь к только что созданному действующему субъекту.
  4. Наконец мы регистрируем крюк отключения, чтобы очистить все.

Вот и все. Теперь давайте посмотрим на интересную часть кода. Далее следует файл WSServer.scala.

Настройка сервера веб-сокетов

В файле WSServer.scala мы определяем сервер веб-сокетов.

package org.smartjava
 
import akka.actor.{ActorSystem, ActorRef}
import java.net.InetSocketAddress
import org.java_websocket.WebSocket
import org.java_websocket.framing.CloseFrame
import org.java_websocket.handshake.ClientHandshake
import org.java_websocket.server.WebSocketServer
import scala.collection.mutable.Map
import akka.event.Logging
 
/**
 * The WSserver companion objects defines a number of distinct messages sendable by this component
 */
object WSServer {
  sealed trait WSMessage
  case class Message(ws : WebSocket, msg : String) extends WSMessage
  case class Open(ws : WebSocket, hs : ClientHandshake) extends WSMessage
  case class Close(ws : WebSocket, code : Int, reason : String, external : Boolean) 
                                                                                                         extends WSMessage
  case class Error(ws : WebSocket, ex : Exception) extends WSMessage
}
 
/**
 * Create a websocket server that listens on a specific address.
 *
 * @param port
 */
class WSServer(val port : Int)(implicit system : ActorSystem, db: DB ) 
                             extends WebSocketServer(new InetSocketAddress(port)) {
 
  // maps the path to a specific actor.
  private val reactors = Map[String, ActorRef]()
  // setup some logging based on the implicit passed in actorsystem
  private val log = Logging.getLogger(system, this);
 
  // Call this function to bind an actor to a specific path. All incoming
  // connections to a specific path will be routed to that specific actor.
  final def forResource(descriptor : String, reactor : Option
  
   ) {
    log.debug("Registring actor:" + reactor + " to " + descriptor);
    reactor match {
      case Some(actor) => reactors += ((descriptor, actor))
      case None => reactors -= descriptor
    }
  }
 
  // onMessage is called when a websocket message is recieved.
  // in this method we check whether we can find a listening
  // actor and forward the call to that.
  final override def onMessage(ws : WebSocket, msg : String) {
 
    if (null != ws) {
      reactors.get(ws.getResourceDescriptor) match {
        case Some(actor) => actor ! WSServer.Message(ws, msg)
        case None => ws.close(CloseFrame.REFUSE)
      }
    }
  }
 
  final override def onOpen(ws : WebSocket, hs : ClientHandshake) {
    log.debug("OnOpen called {} :: {}", ws, hs);
    if (null != ws) {
      reactors.get(ws.getResourceDescriptor) match {
        case Some(actor) => actor ! WSServer.Open(ws, hs)
        case None => ws.close(CloseFrame.REFUSE)
      }
    }
  }
 
  final override def onClose(ws : WebSocket, code : Int, reason : String, external : Boolean) {
    log.debug("Close called {} :: {} :: {} :: {}", ws, code, reason, external);
    if (null != ws) {
      reactors.get(ws.getResourceDescriptor) match {
        case Some(actor) => actor ! WSServer.Close(ws, code, reason, external)
        case None => ws.close(CloseFrame.REFUSE)
      }
    }
  }
  final override def onError(ws : WebSocket, ex : Exception) {
    log.debug("onError called {} :: {}", ws, ex);
    if (null != ws) {
      reactors.get(ws.getResourceDescriptor) match {
        case Some(actor) => actor ! WSServer.Error(ws, ex)
        case None => ws.close(CloseFrame.REFUSE)
      }
    }
  }
}
  

Большой исходный файл, но не сложный для понимания. Позвольте мне объяснить основные понятия:

  1. Сначала мы определим количество сообщений как классы дел. Это сообщения, которые мы отправили нашим актерам. Они отражают сообщения, которые наш сервер веб-сокетов может получать от клиента.
  2. Сам WSServer происходит от WebSocketServer, предоставляемого библиотекой org.java_websocket.
  3. WSServer определяет одну дополнительную функцию, которая называется forResource. С помощью этой функции мы определяем, какого актера вызывать при получении сообщения на нашем сервере веб-сокетов.
  4. и, наконец, мы переопределяем различные методы *, которые вызываются, когда на нашем сервере веб-сокетов происходит определенное событие.

Теперь давайте посмотрим на функциональность эха

Акко эхо актер

Актер-эхо имеет две роли в этом сценарии. Во-первых, он предоставляет возможность отвечать на входящие сообщения, отвечая тем же сообщением. Кроме того, он также создает дочерний актер (с именем ListenActor), который обрабатывает документы, полученные от mongoDB.

object EchoActor {
 
  // Messages send specifically by this actor to another instance of this actor.
  sealed trait EchoMessage
 
  case class Unregister(ws : WebSocket) extends EchoMessage
  case class Listen() extends EchoMessage;
  case class StopListening() extends EchoMessage
 
  def props(db: DB): Props = Props(new EchoActor(db))
}
 
/**
 * Actor that handles the websocket request
 */
class EchoActor(db: DB) extends Actor with ActorLogging {
  import EchoActor._
 
  val clients = mutable.ListBuffer[WebSocket]()
  val socketActorMapping = mutable.Map[WebSocket, ActorRef]()
 
  override def receive = {
 
    // receive the open request
    case Open(ws, hs) => {
      log.debug("Received open request. Start listening for ", ws)
      clients += ws
 
      // create the child actor that handles the db listening
      val targetActor = context.actorOf(ListenActor.props(ws, db));
 
      socketActorMapping(ws) = targetActor;
      targetActor ! Listen
    }
 
    // recieve the close request
    case Close(ws, code, reason, ext) => {
      log.debug("Received close request. Unregisting actor for url {}", ws.getResourceDescriptor)
 
      // send a message to self to unregister
      self ! Unregister(ws)
      socketActorMapping(ws) ! StopListening
      socketActorMapping remove ws;
    }
 
    // recieves an error message
    case Error(ws, ex) => self ! Unregister(ws)
 
    // receives a text message
    case Message(ws, msg) => {
      log.debug("url {} received msg '{}'", ws.getResourceDescriptor, msg)
      ws.send("You send:" + msg);
    }
 
    // unregister the websocket listener
    case Unregister(ws) => {
      if (null != ws) {
        log.debug("unregister monitor")
        clients -= ws
      }
    }
  }
}

Код этого актера в значительной степени должен объяснить сам. Благодаря этому действующему субъекту и коду мы получили простой сервер веб-сокетов, который использует субъект для обработки сообщений. Прежде чем мы посмотрим на ListenActor, который запускается из сообщения «Open», полученного EchoHandler, давайте посмотрим, как мы подключаемся к mongoDB из нашего объекта DB:

package org.smartjava;
 
import play.api.libs.iteratee.{Concurrent, Enumeratee, Iteratee}
import reactivemongo.api.collections.default.BSONCollection
import reactivemongo.api._
import reactivemongo.bson.BSONDocument
import scala.concurrent.ExecutionContext.Implicits.global
 
/**
 * Contains DB related functions.
 */
class DB(location:String, dbname:String)  {
 
  // get connection to the database
  val db: DefaultDB = createConnection(location, dbname)
  // create a enumerator that we use to broadcast received documents
  val (bcEnumerator, channel) = Concurrent.broadcast[BSONDocument]
  // assign the channel to the mongodb cursor enumerator
  val iteratee = createCursor(getCollection(Configuration.collection))
                    .enumerate()
                    .apply(Iteratee
                        .foreach({doc: BSONDocument => channel.push(doc)}));
 
  /**
   * Return a simple collection
   */
  private def getCollection(collection: String): BSONCollection = {
    db(collection)
  }
 
  /**
   * Create the connection
   */
  private def createConnection(location: String, dbname: String)  : DefaultDB = {
    // needed to connect to mongoDB.
    import scala.concurrent.ExecutionContext
 
    // gets an instance of the driver
    // (creates an actor system)
    val driver = new MongoDriver
    val connection = driver.connection(List(location))
 
    // Gets a reference to the database
    connection(dbname)
  }
 
  /**
   * Create the cursor
   */
  private def createCursor(collection: BSONCollection): Cursor[BSONDocument] = {
    import reactivemongo.api._
    import reactivemongo.bson._
    import scala.concurrent.Future
 
    import scala.concurrent.ExecutionContext.Implicits.global
 
    val query = BSONDocument(
      "currentDate" -> BSONDocument(
        "$gte" -> BSONDateTime(System.currentTimeMillis())
      ));
 
    // we enumerate over a capped collection
    val cursor  = collection.find(query)
      .options(QueryOpts().tailable.awaitData)
      .cursor[BSONDocument]
 
    return cursor
  }
 
  /**
   * Simple function that registers a callback and a predicate on the
   * broadcasting enumerator
   */
  def listenToCollection(f: BSONDocument => Unit,
                         p: BSONDocument => Boolean ) = {
 
    val it = Iteratee.foreach(f)
    val itTransformed = Enumeratee.takeWhile[BSONDocument](p).transform(it);
    bcEnumerator.apply(itTransformed);
  }
}

Большая часть этого кода довольно стандартна, но я бы хотел отметить пару вещей. В начале этого класса мы настроили итератора следующим образом:

  val db: DefaultDB = createConnection(location, dbname)
  val (bcEnumerator, channel) = Concurrent.broadcast[BSONDocument]
  val iteratee = createCursor(getCollection(Configuration.collection))
                    .enumerate()
                    .apply(Iteratee
                        .foreach({doc: BSONDocument => channel.push(doc)}));

Что мы делаем здесь, так это то, что мы сначала создаем широковещательный перечислитель, используя функцию Concurrent.broadcast Этот перечислитель может передавать элементы, предоставленные каналом, нескольким потребителям (итераторам). Затем мы создаем итератор на перечислителе, предоставляемом нашим курсором ReactiveMongo, где мы используем только что созданный канал для передачи документов любому итератору, подключенному к bcEnumerator.
Мы подключаем итераторов к bcEnumerator в функции listenToCollection:

  def listenToCollection(f: BSONDocument => Unit,
                         p: BSONDocument => Boolean ) = {
 
    val it = Iteratee.foreach(f)
    val itTransformed = Enumeratee.takeWhile[BSONDocument](p).transform(it);
    bcEnumerator.apply(itTransformed);
  }

In this function we pass in a function and a predicate. The function is executed whenever a document is added to mongo and the predicate is used to determine when to stop sending messages to the iteratee.

The only missing part is the ListenActor

ListenActor which responds to messages from Mongo

The following code shows the actor responsible for responding to messages from mongoDB. When it receives a Listen message it registers itself using the listenToCollection function. Whenever a message is passed in from mongo it sends a message to itself, to further propogate it to the websocket.

object ListenActor {
  case class ReceiveUpdate(msg: String);
  def props(ws: WebSocket, db: DB): Props = Props(new ListenActor(ws, db))
}
class ListenActor(ws: WebSocket, db: DB) extends Actor with ActorLogging {
 
  var predicateResult = true;
 
  override def receive = {
    case Listen => {
 
      log.info("{} , {} , {}", ws, db)
 
      // function to call when we receive a message from the reactive mongo
      // we pass this to the DB cursor
      val func = ( doc: BSONDocument) => {
        self ! ReceiveUpdate(BSONDocument.pretty(doc));
      }
 
      // the predicate that determines how long we want to retrieve stuff
      // we do this while the predicateResult is true.
      val predicate = (d: BSONDocument) => {predicateResult} :Boolean
      Some(db.listenToCollection(func, predicate))
    }
 
    // when we recieve an update we just send it over the websocket
    case ReceiveUpdate(msg) => {
      ws.send(msg);
    }
 
    case StopListening => {
      predicateResult = false;
 
      // and kill ourselves
      self ! PoisonPill
    }
  }
}

Now that we’ve done all that, we can run this example. On startup you’ll see something like this:

[DEBUG] [11/22/2014 15:14:33.856] [main] [EventStream(akka://ws-system)] logger log1-Logging$DefaultLogger started
[DEBUG] [11/22/2014 15:14:33.857] [main] [EventStream(akka://ws-system)] Default Loggers started
[DEBUG] [11/22/2014 15:14:35.104] [main] [WSServer(akka://ws-system)] Registring actor:Some(Actor[akka://ws-system/user/echo#1509664759]) to /echo
15:14:35.211 [reactivemongo-akka.actor.default-dispatcher-5] INFO  reactivemongo.core.actors.MongoDBSystem - The node set is now available
15:14:35.214 [reactivemongo-akka.actor.default-dispatcher-5] INFO  reactivemongo.core.actors.MongoDBSystem - The primary is now available

Next when we connect a websocket we see the following:
Screen Shot 2014-11-22 at 15.15.26.png

DEBUG] [11/22/2014 15:15:18.957] [WebSocketWorker-32] [WSServer(akka://ws-system)] OnOpen called org.java_websocket.WebSocketImpl@3161f479 :: org.java_websocket.handshake.HandshakeImpl1Client@6d9a6e19
[DEBUG] [11/22/2014 15:15:18.965] [ws-system-akka.actor.default-dispatcher-2] [akka://ws-system/user/echo] Received open request. Start listening for  WARNING arguments left: 1
[INFO] [11/22/2014 15:15:18.973] [ws-system-akka.actor.default-dispatcher-5] [akka://ws-system/user/echo/$a] org.java_websocket.WebSocketImpl@3161f479 , org.smartjava.DB@73fd64

Now lets insert a message into the mongo collection which we created with the following command:

db.createCollection( "rmongo", { capped: true, size: 100000 } )

And lets insert an message:

> db.rmongo.insert({"test": 1234567, "currentDate": new Date()})
WriteResult({ "nInserted" : 1 })

Which results in this in our websocket client:
Screen Shot 2014-11-22 at 15.17.55_0.png

If you’re interested in the source files look at the following directory in GitHub:[a href=»https://github.com/josdirksen/smartjava/tree/master/ws-akka» style=»color: rgb(34, 98, 164); outline-style: none; text-decoration: none;»]https://github.com/josdirksen/smartjava/tree/master/ws-akka