В этой статье мы увидим, как создать приложение pub / sub (обмен сообщениями, чат, уведомление), и оно полностью основано на MongoDB (без какого-либо брокера сообщений, такого как RabbitMQ, JMS,…).
Итак, что нужно сделать, чтобы достичь такой вещи:
- Приложение «опубликовать» сообщение. В нашем случае мы просто сохраняем документ в MongoDB
- другое приложение или поток подпишутся на эти события и получат сообщение автоматически. В нашем случае это означает, что приложение должно автоматически получать вновь созданный документ из MongoDB
Все это возможно с некоторыми очень интересными функциями MongoDB: ограниченные коллекции и настраиваемые курсоры .
Закрытые коллекции и настраиваемые курсоры
Как видно из документации, ограниченные коллекции — это коллекции фиксированного размера, которые работают аналогично циклическим буферам: как только коллекция заполняет выделенное пространство, она освобождает место для новых документов, перезаписывая самые старые документы.
Запрошенные коллекции MongoDB можно запрашивать с помощью настраиваемых курсоров, которые аналогичны команде unix tail -f. Ваше приложение продолжает извлекать документы по мере их вставки в коллекцию. Мне также нравится называть это «непрерывным запросом». Теперь, когда мы увидели основы, давайте реализуем это.
Создание очень простого приложения
Создать коллекцию
Первое, что нужно сделать, это создать новую ограниченную коллекцию:
1
2
3
4
5
6
7
8
9
|
$> mongo use chat db.messages.drop() db.createCollection( 'messages' , { capped: true , size: 10000 }) db.messages.insert({ "type" : "init" }); |
Для простоты я использую оболочку MongoDB для создания коллекции сообщений в базе данных чата.
Вы можете увидеть в строке № 7, как создать ограниченную коллекцию, с двумя вариантами:
- ограничено: правда: это очевидно
- размер: 10000: это обязательный параметр при создании ограниченной коллекции. Это максимальный размер в байтах. (будет повышено до кратного 256)
Наконец, в строке № 9 я вставляю фиктивный документ, это также обязательно, чтобы можно было настроить настраиваемый курсор.
Написать заявку
Теперь, когда у нас есть коллекция, давайте напишем некоторый код. Сначала в node.js :
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
var mongo = require( "mongodb" ); var mongodbUri = "mongodb://127.0.0.1/chat" ; mongo.MongoClient.connect (mongodbUri, function (err, db) { db.collection( 'messages' , function(err, collection) { // open a tailable cursor console.log( "== open tailable cursor" ); collection.find({}, {tailable: true , awaitdata: true , numberOfRetries:- 1 }) .sort({ $natural: 1 }) .each(function(err, doc) { console.log(doc); }) }); }); |
Из строк с 1 по 5 я просто подключаюсь к своему локальному экземпляру MongoDB.
Затем в строке № 7 я получаю коллекцию сообщений.
И в строке # 10 я выполняю поиск, используя настраиваемый курсор, используя определенные опции:
- {}: без фильтра, поэтому все документы будут возвращены
- tailable: true: ясно, что мы хотим создать настраиваемый курсор
- awaitdata: true: сказать, что мы ждем данных, прежде чем возвращать данные клиенту
- numberOfRetries: -1: количество повторных попыток по тайм-ауту, -1 бесконечно, поэтому приложение будет продолжать попытки
Строка # 11 просто принудительно производит сортировку в естественном порядке, затем на строке # 12 курсор возвращает данные, и документ печатается в консоли каждый раз, когда он вставляется.
Протестируйте приложение
Запустите приложение:
1
|
node app.js |
Вставьте документы в коллекцию сообщений, из оболочки или любого другого инструмента. Ниже вы можете увидеть скринкаст, демонстрирующий работу этого базового приложения:
Исходный код этого примера приложения в этом репозитории Github, выполните ветку step-01; клонировать эту ветку, используя:
1
|
git clone -b step-01 https: //github .com /tgrall/mongodb-realtime-pubsub .git |
Я также создал суть, показывающую то же поведение в Java :
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package org.mongodb.demos.tailable; import com.mongodb.*; public class MyApp { public static void main(String[] args) throws Exception { MongoClient mongoClient = new MongoClient(); DBCollection coll = mongoClient.getDB( "chat" ).getCollection( "messages" ); DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start( "$natural" , 1 ).get()) .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA); System.out.println( "== open cursor ==" ); Runnable task = () -> { System.out.println( "\tWaiting for events" ); while (cur.hasNext()) { DBObject obj = cur.next(); System.out.println( obj ); } }; new Thread(task).start(); } } |
Матье Анселен написал это в Scala :
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
package org.mongodb.demos.tailable import reactivemongo.api._ import reactivemongo.bson._ import play.api.libs.iteratee.Iteratee import scala.concurrent.ExecutionContext.Implicits.global import reactivemongo.api.collections. default .BSONCollection object Capped extends App { val driver = new MongoDriver val connection = driver.connection(List( "localhost" )) val db = connection( "chat" ) val collection = db.collection[BSONCollection]( "messages" ) val cursor = collection .find(BSONDocument()) .options(QueryOpts().tailable.awaitData) .cursor[BSONDocument] println( "== open tailable cursor" ) cursor.enumerate().apply(Iteratee.foreach { doc => println(s "Document inserted: ${BSONDocument.pretty(doc)}" ) }) } |
Добавьте некоторый пользовательский интерфейс
У нас есть основы приложения на основе подписки:
- опубликовать, вставив документ в MongoDB
- подписаться, читая документ с помощью настраиваемого курсора
Теперь давайте отправим сообщения пользователю, используя, например, socket.io. Для этого нам необходимо:
- добавить зависимость socket.io в наш проект узла
- добавить страницу HTML, чтобы показать сообщения
В следующих списках показана обновленная версия app.js и index.html, давайте посмотрим:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
"use strict" ; var mongo = require( "mongodb" ), fs = require( "fs" ), // to read static files io = require( "socket.io" ), // socket io server http = require( "http" ); var mongodbUri = "mongodb://127.0.0.1/chat" ; var app = http.createServer(handler); io = io.listen(app); app.listen( 3000 ); console.log( "http server on port 3000" ); function handler(req, res){ fs.readFile(__dirname + "/index.html" , function (err, data) { res.writeHead( 200 ); res.end(data); }); } mongo.MongoClient.connect (mongodbUri, function (err, db) { db.collection( 'messages' , function(err, collection) { // open socket io.sockets.on( "connection" , function (socket) { // open a tailable cursor console.log( "== open tailable cursor" ); collection.find({}, {tailable: true , awaitdata: true , numberOfRetries:- 1 }).sort({ $natural: 1 }).each(function(err, doc) { console.log(doc); // send message to client if (doc.type == "message" ) { socket.emit( "message" ,doc); } }) }); }); }); |
Приложение узла обновлено следующими функциями:
- строки № 4-7: импорт http, файловой системы и socket.io
- Строки # 10-21: настроить и запустить http-сервер. Вы можете видеть, что я создал простой обработчик для обслуживания статического файла HTML
- Строки # 28-39: я добавил поддержку веб-сокета, используя socket.io, где я открываю настраиваемый курсор и нажимаю / выпускаю сообщения в сокете.
Как видите, код, который я добавил, прост. Я не использую какой-либо продвинутый фреймворк и не управляю исключениями, это для простоты и удобочитаемости.
Давайте теперь посмотрим на клиента (html page).
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
<! doctype html> < html > < head > < title >MongoDB pub/sub</ title > < style > * { margin: 0; padding: 10px; box-sizing: border-box; } body { font: 13px Helvetica, Arial; } #messages { list-style-type: none; margin: 0; padding: 0; } #messages li { padding: 5px 10px; } #messages li:nth-child(odd) { background: #eee; } </ style > </ head > < body > < h2 >MongoDB/Socket.io demonstration</ h2 > < ul id = "messages" ></ ul > < script > var socket = io(); socket.on('message', function(doc){ $('#messages').append($('< li >').text(doc.text)); }); </ script > </ body > </ html > |
Как и сервер, он действительно прост и не использует никаких расширенных библиотек, кроме socket.io client (строка № 18) и JQuery (строка № 19), и использует:
- в строке № 22 к полученным сообщениям и распечатайте их на странице, используя JQuery в строке № 23
Я создал скринкаст этой версии приложения:
Вы можете найти исходный код в этом репозитории Github, перейдите по ветке step-02; клонировать эту ветку, используя:
git clone -b step-02 https://github.com/tgrall/mongodb-realtime-pubsub.git
Вывод
В этом первом посте мы имеем:
- узнал о настраиваемом курсоре и закрытой коллекции
- посмотрите, как его можно использовать для разработки приложения паб / суб
- выставить это в простое приложение на основе веб-сокета
Ссылка: | Как создать приложение pub / sub с MongoDB? Введение от нашего партнера JCG Tugdual Grall в блоге Tug’s Blog . |