Я искал простой сервер веб-сокетов для одного из моих проектов, чтобы проверить некоторые вещи с реактивным монго . Однако, оглядываясь по сторонам, я не мог найти простую базовую реализацию, не включив полную структуру. Наконец я наткнулся на один из проектов активаторов Typesage: http://typesafe.com/activator/template/akka-spray-websocket . Несмотря на то, что название подразумевает, что спрей необходим, на самом деле он использует материал websocket отсюда: https://github.com/TooTallNate/Java-WebSocket , который обеспечивает очень простую в использовании базовую реализацию websocket.
Поэтому в этой статье я покажу вам, как вы можете настроить очень простой сервер веб-сокетов (не требуя дополнительных фреймворков) вместе с Akka и ReactiveMongo. На следующих снимках экрана показано, к чему мы стремимся:
На этом снимке экрана вы видите простой клиент веб-сокета, который общается с нашим сервером. Наш сервер имеет следующие функциональные возможности:
- Все, что отправляет клиент, возвращается назад.
- Любые входные данные, добавленные в конкретную (ограниченную) коллекцию в mongoDB , автоматически передаются всем слушателям.
Вы можете вырезать и вставить весь код из этой статьи, но, вероятно, проще получить код из git. Вы можете найти его в github здесь: https://github.com/josdirksen/smartjava/tree/master/ws-akka
Начиная
Первое, что нам нужно сделать, это настроить наше рабочее пространство, поэтому давайте начнем с рассмотрения конфигурации sbt:
organization := "org.smartjava" version := "0.1" scalaVersion := "2.11.2" scalacOptions := Seq("-unchecked", "-deprecation", "-encoding", "utf8") libraryDependencies ++= { val akkaV = "2.3.6" Seq( "com.typesafe.akka" %% "akka-actor" % akkaV, "org.java-websocket" % "Java-WebSocket" % "1.3.1-SNAPSHOT", "org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23" ) } resolvers ++= Seq("Code Envy" at "http://codenvycorp.com/repository/" ,"Typesafe" at "http://repo.typesafe.com/typesafe/releases/")
Здесь нет ничего особенного, мы просто указываем наши зависимости и добавляем некоторые средства распознавания, чтобы sbt знал, откуда извлекать зависимости. Прежде чем мы рассмотрим код, давайте сначала посмотрим на структуру каталогов и файл нашего проекта:
├── build.sbt └── src └── main ├── resources │ ├── application.conf │ └── log4j2.xml └── scala ├── Boot.scala ├── DB.scala ├── WSActor.scala └── WSServer.scala
В каталоге src / main / resources мы храним наши файлы конфигурации, а в src / main / scala мы храним все наши файлы scala. Давайте начнем с просмотра файлов конфигурации. Для этого проекта мы используем два:
Файл Application.conf содержит конфигурацию нашего проекта и выглядит следующим образом:
akka { loglevel = "DEBUG" } mongo { db = "scala" collection = "rmongo" location = "localhost" } ws-server { port = 9999 }
Как видите, мы просто определяем уровень журнала, как использовать mongo и на каком порту мы хотим, чтобы наш сервер веб-сокетов слушал. И нам также нужен файл log4j2.xml, так как библиотека responsetivemongo использует его для ведения журнала:
<?xml version="1.0" encoding="UTF-8"?> <Configuration status="INFO"> <Appenders> <Console name="Console" target="SYSTEM_OUT"> <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> </Console> </Appenders> <Loggers> <Root level="INFO"> <AppenderRef ref="Console"/> </Root> </Loggers> </Configuration>
Итак, со скучными вещами в стороне, давайте посмотрим на файлы scala.
Запуск сервера websocket и регистрация путей
Файл Boot.scala выглядит так:
package org.smartjava import akka.actor.{Props, ActorSystem} /** * This class launches the system. */ object Boot extends App { // create the actor system implicit lazy val system = ActorSystem("ws-system") // setup the mongoreactive connection implicit lazy val db = new DB(Configuration.location, Configuration.dbname); // we'll use a simple actor which echo's everything it finds back to the client. val echo = system.actorOf(EchoActor.props(db, Configuration.collection), "echo") // define the websocket routing and start a websocket listener private val wsServer = new WSServer(Configuration.port) wsServer.forResource("/echo", Some(echo)) wsServer.start // make sure the actor system and the websocket server are shutdown when the client is // shutdown sys.addShutdownHook({system.shutdown;wsServer.stop}) } // load configuration from external file object Configuration { import com.typesafe.config.ConfigFactory private val config = ConfigFactory.load config.checkValid(ConfigFactory.defaultReference) val port = config.getInt("ws-server.port") val dbname = config.getString("mongo.db") val collection = config.getString("mongo.collection") val location = config.getString("mongo.location") }
В этом исходном файле мы видим два объекта. Объект Configuration позволяет нам легко получить доступ к элементам конфигурации из файла application.conf, и объект Boot запустит наш сервер. Комментарии в коде должны в значительной степени объяснить, что происходит, но позвольте мне указать основные моменты:
- Мы создаем систему актеров Akka и соединение с нашим экземпляром mongoDB.
- Мы определяем актера, которого мы можем зарегистрировать по определенному пути веб-сокета.
- Затем мы создаем и запускаем websocketserver и регистрируем путь к только что созданному действующему субъекту.
- Наконец мы регистрируем крюк отключения, чтобы очистить все.
Вот и все. Теперь давайте посмотрим на интересную часть кода. Далее следует файл WSServer.scala.
Настройка сервера веб-сокетов
В файле WSServer.scala мы определяем сервер веб-сокетов.
package org.smartjava import akka.actor.{ActorSystem, ActorRef} import java.net.InetSocketAddress import org.java_websocket.WebSocket import org.java_websocket.framing.CloseFrame import org.java_websocket.handshake.ClientHandshake import org.java_websocket.server.WebSocketServer import scala.collection.mutable.Map import akka.event.Logging /** * The WSserver companion objects defines a number of distinct messages sendable by this component */ object WSServer { sealed trait WSMessage case class Message(ws : WebSocket, msg : String) extends WSMessage case class Open(ws : WebSocket, hs : ClientHandshake) extends WSMessage case class Close(ws : WebSocket, code : Int, reason : String, external : Boolean) extends WSMessage case class Error(ws : WebSocket, ex : Exception) extends WSMessage } /** * Create a websocket server that listens on a specific address. * * @param port */ class WSServer(val port : Int)(implicit system : ActorSystem, db: DB ) extends WebSocketServer(new InetSocketAddress(port)) { // maps the path to a specific actor. private val reactors = Map[String, ActorRef]() // setup some logging based on the implicit passed in actorsystem private val log = Logging.getLogger(system, this); // Call this function to bind an actor to a specific path. All incoming // connections to a specific path will be routed to that specific actor. final def forResource(descriptor : String, reactor : Option) { log.debug("Registring actor:" + reactor + " to " + descriptor); reactor match { case Some(actor) => reactors += ((descriptor, actor)) case None => reactors -= descriptor } } // onMessage is called when a websocket message is recieved. // in this method we check whether we can find a listening // actor and forward the call to that. final override def onMessage(ws : WebSocket, msg : String) { if (null != ws) { reactors.get(ws.getResourceDescriptor) match { case Some(actor) => actor ! WSServer.Message(ws, msg) case None => ws.close(CloseFrame.REFUSE) } } } final override def onOpen(ws : WebSocket, hs : ClientHandshake) { log.debug("OnOpen called {} :: {}", ws, hs); if (null != ws) { reactors.get(ws.getResourceDescriptor) match { case Some(actor) => actor ! WSServer.Open(ws, hs) case None => ws.close(CloseFrame.REFUSE) } } } final override def onClose(ws : WebSocket, code : Int, reason : String, external : Boolean) { log.debug("Close called {} :: {} :: {} :: {}", ws, code, reason, external); if (null != ws) { reactors.get(ws.getResourceDescriptor) match { case Some(actor) => actor ! WSServer.Close(ws, code, reason, external) case None => ws.close(CloseFrame.REFUSE) } } } final override def onError(ws : WebSocket, ex : Exception) { log.debug("onError called {} :: {}", ws, ex); if (null != ws) { reactors.get(ws.getResourceDescriptor) match { case Some(actor) => actor ! WSServer.Error(ws, ex) case None => ws.close(CloseFrame.REFUSE) } } } }
Большой исходный файл, но не сложный для понимания. Позвольте мне объяснить основные понятия:
- Сначала мы определим количество сообщений как классы дел. Это сообщения, которые мы отправили нашим актерам. Они отражают сообщения, которые наш сервер веб-сокетов может получать от клиента.
- Сам WSServer происходит от WebSocketServer, предоставляемого библиотекой org.java_websocket.
- WSServer определяет одну дополнительную функцию, которая называется forResource. С помощью этой функции мы определяем, какого актера вызывать при получении сообщения на нашем сервере веб-сокетов.
- и, наконец, мы переопределяем различные методы *, которые вызываются, когда на нашем сервере веб-сокетов происходит определенное событие.
Теперь давайте посмотрим на функциональность эха
Акко эхо актер
Актер-эхо имеет две роли в этом сценарии. Во-первых, он предоставляет возможность отвечать на входящие сообщения, отвечая тем же сообщением. Кроме того, он также создает дочерний актер (с именем ListenActor), который обрабатывает документы, полученные от mongoDB.
object EchoActor { // Messages send specifically by this actor to another instance of this actor. sealed trait EchoMessage case class Unregister(ws : WebSocket) extends EchoMessage case class Listen() extends EchoMessage; case class StopListening() extends EchoMessage def props(db: DB): Props = Props(new EchoActor(db)) } /** * Actor that handles the websocket request */ class EchoActor(db: DB) extends Actor with ActorLogging { import EchoActor._ val clients = mutable.ListBuffer[WebSocket]() val socketActorMapping = mutable.Map[WebSocket, ActorRef]() override def receive = { // receive the open request case Open(ws, hs) => { log.debug("Received open request. Start listening for ", ws) clients += ws // create the child actor that handles the db listening val targetActor = context.actorOf(ListenActor.props(ws, db)); socketActorMapping(ws) = targetActor; targetActor ! Listen } // recieve the close request case Close(ws, code, reason, ext) => { log.debug("Received close request. Unregisting actor for url {}", ws.getResourceDescriptor) // send a message to self to unregister self ! Unregister(ws) socketActorMapping(ws) ! StopListening socketActorMapping remove ws; } // recieves an error message case Error(ws, ex) => self ! Unregister(ws) // receives a text message case Message(ws, msg) => { log.debug("url {} received msg '{}'", ws.getResourceDescriptor, msg) ws.send("You send:" + msg); } // unregister the websocket listener case Unregister(ws) => { if (null != ws) { log.debug("unregister monitor") clients -= ws } } } }
Код этого актера в значительной степени должен объяснить сам. Благодаря этому действующему субъекту и коду мы получили простой сервер веб-сокетов, который использует субъект для обработки сообщений. Прежде чем мы посмотрим на ListenActor, который запускается из сообщения «Open», полученного EchoHandler, давайте посмотрим, как мы подключаемся к mongoDB из нашего объекта DB:
package org.smartjava; import play.api.libs.iteratee.{Concurrent, Enumeratee, Iteratee} import reactivemongo.api.collections.default.BSONCollection import reactivemongo.api._ import reactivemongo.bson.BSONDocument import scala.concurrent.ExecutionContext.Implicits.global /** * Contains DB related functions. */ class DB(location:String, dbname:String) { // get connection to the database val db: DefaultDB = createConnection(location, dbname) // create a enumerator that we use to broadcast received documents val (bcEnumerator, channel) = Concurrent.broadcast[BSONDocument] // assign the channel to the mongodb cursor enumerator val iteratee = createCursor(getCollection(Configuration.collection)) .enumerate() .apply(Iteratee .foreach({doc: BSONDocument => channel.push(doc)})); /** * Return a simple collection */ private def getCollection(collection: String): BSONCollection = { db(collection) } /** * Create the connection */ private def createConnection(location: String, dbname: String) : DefaultDB = { // needed to connect to mongoDB. import scala.concurrent.ExecutionContext // gets an instance of the driver // (creates an actor system) val driver = new MongoDriver val connection = driver.connection(List(location)) // Gets a reference to the database connection(dbname) } /** * Create the cursor */ private def createCursor(collection: BSONCollection): Cursor[BSONDocument] = { import reactivemongo.api._ import reactivemongo.bson._ import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global val query = BSONDocument( "currentDate" -> BSONDocument( "$gte" -> BSONDateTime(System.currentTimeMillis()) )); // we enumerate over a capped collection val cursor = collection.find(query) .options(QueryOpts().tailable.awaitData) .cursor[BSONDocument] return cursor } /** * Simple function that registers a callback and a predicate on the * broadcasting enumerator */ def listenToCollection(f: BSONDocument => Unit, p: BSONDocument => Boolean ) = { val it = Iteratee.foreach(f) val itTransformed = Enumeratee.takeWhile[BSONDocument](p).transform(it); bcEnumerator.apply(itTransformed); } }
Большая часть этого кода довольно стандартна, но я бы хотел отметить пару вещей. В начале этого класса мы настроили итератора следующим образом:
val db: DefaultDB = createConnection(location, dbname) val (bcEnumerator, channel) = Concurrent.broadcast[BSONDocument] val iteratee = createCursor(getCollection(Configuration.collection)) .enumerate() .apply(Iteratee .foreach({doc: BSONDocument => channel.push(doc)}));
Что мы делаем здесь, так это то, что мы сначала создаем широковещательный перечислитель, используя функцию Concurrent.broadcast Этот перечислитель может передавать элементы, предоставленные каналом, нескольким потребителям (итераторам). Затем мы создаем итератор на перечислителе, предоставляемом нашим курсором ReactiveMongo, где мы используем только что созданный канал для передачи документов любому итератору, подключенному к bcEnumerator.
Мы подключаем итераторов к bcEnumerator в функции listenToCollection:
def listenToCollection(f: BSONDocument => Unit, p: BSONDocument => Boolean ) = { val it = Iteratee.foreach(f) val itTransformed = Enumeratee.takeWhile[BSONDocument](p).transform(it); bcEnumerator.apply(itTransformed); }
In this function we pass in a function and a predicate. The function is executed whenever a document is added to mongo and the predicate is used to determine when to stop sending messages to the iteratee.
The only missing part is the ListenActor
ListenActor which responds to messages from Mongo
The following code shows the actor responsible for responding to messages from mongoDB. When it receives a Listen message it registers itself using the listenToCollection function. Whenever a message is passed in from mongo it sends a message to itself, to further propogate it to the websocket.
object ListenActor { case class ReceiveUpdate(msg: String); def props(ws: WebSocket, db: DB): Props = Props(new ListenActor(ws, db)) } class ListenActor(ws: WebSocket, db: DB) extends Actor with ActorLogging { var predicateResult = true; override def receive = { case Listen => { log.info("{} , {} , {}", ws, db) // function to call when we receive a message from the reactive mongo // we pass this to the DB cursor val func = ( doc: BSONDocument) => { self ! ReceiveUpdate(BSONDocument.pretty(doc)); } // the predicate that determines how long we want to retrieve stuff // we do this while the predicateResult is true. val predicate = (d: BSONDocument) => {predicateResult} :Boolean Some(db.listenToCollection(func, predicate)) } // when we recieve an update we just send it over the websocket case ReceiveUpdate(msg) => { ws.send(msg); } case StopListening => { predicateResult = false; // and kill ourselves self ! PoisonPill } } }
Now that we’ve done all that, we can run this example. On startup you’ll see something like this:
[DEBUG] [11/22/2014 15:14:33.856] [main] [EventStream(akka://ws-system)] logger log1-Logging$DefaultLogger started [DEBUG] [11/22/2014 15:14:33.857] [main] [EventStream(akka://ws-system)] Default Loggers started [DEBUG] [11/22/2014 15:14:35.104] [main] [WSServer(akka://ws-system)] Registring actor:Some(Actor[akka://ws-system/user/echo#1509664759]) to /echo 15:14:35.211 [reactivemongo-akka.actor.default-dispatcher-5] INFO reactivemongo.core.actors.MongoDBSystem - The node set is now available 15:14:35.214 [reactivemongo-akka.actor.default-dispatcher-5] INFO reactivemongo.core.actors.MongoDBSystem - The primary is now available
Next when we connect a websocket we see the following:
DEBUG] [11/22/2014 15:15:18.957] [WebSocketWorker-32] [WSServer(akka://ws-system)] OnOpen called org.java_websocket.WebSocketImpl@3161f479 :: org.java_websocket.handshake.HandshakeImpl1Client@6d9a6e19 [DEBUG] [11/22/2014 15:15:18.965] [ws-system-akka.actor.default-dispatcher-2] [akka://ws-system/user/echo] Received open request. Start listening for WARNING arguments left: 1 [INFO] [11/22/2014 15:15:18.973] [ws-system-akka.actor.default-dispatcher-5] [akka://ws-system/user/echo/$a] org.java_websocket.WebSocketImpl@3161f479 , org.smartjava.DB@73fd64
Now lets insert a message into the mongo collection which we created with the following command:
db.createCollection( "rmongo", { capped: true, size: 100000 } )
And lets insert an message:
> db.rmongo.insert({"test": 1234567, "currentDate": new Date()}) WriteResult({ "nInserted" : 1 })
Which results in this in our websocket client:
If you’re interested in the source files look at the following directory in GitHub:[a href=»https://github.com/josdirksen/smartjava/tree/master/ws-akka» style=»color: rgb(34, 98, 164); outline-style: none; text-decoration: none;»]https://github.com/josdirksen/smartjava/tree/master/ws-akka