В этой статье мы увидим, как создать приложение pub / sub (обмен сообщениями, чат, уведомление), и оно полностью основано на MongoDB (без какого-либо брокера сообщений, такого как RabbitMQ, JMS,…).
Итак, что нужно сделать, чтобы достичь такой вещи:
- Приложение «опубликовать» сообщение. В нашем случае мы просто сохраняем документ в MongoDB
- другое приложение или поток подпишутся на эти события и получат сообщение автоматически. В нашем случае это означает, что приложение должно автоматически получать вновь созданный документ из MongoDB
Все это возможно с некоторыми очень интересными функциями MongoDB: ограниченные коллекции и настраиваемые курсоры .
Закрытые коллекции и настраиваемые курсоры
Как видно из документации, ограниченные коллекции — это коллекции фиксированного размера, которые работают аналогично циклическим буферам: как только коллекция заполняет выделенное пространство, она освобождает место для новых документов, перезаписывая самые старые документы.
Запрошенные коллекции MongoDB можно запрашивать с помощью настраиваемых курсоров, которые аналогичны команде unix tail -f. Ваше приложение продолжает извлекать документы по мере их вставки в коллекцию. Мне также нравится называть это «непрерывным запросом». Теперь, когда мы увидели основы, давайте реализуем это.
Создание очень простого приложения
Создать коллекцию
Первое, что нужно сделать, это создать новую ограниченную коллекцию:
|
1
2
3
4
5
6
7
8
9
|
$> mongouse chatdb.messages.drop()db.createCollection('messages', { capped: true, size: 10000 })db.messages.insert({"type":"init"}); |
Для простоты я использую оболочку MongoDB для создания коллекции сообщений в базе данных чата.
Вы можете увидеть в строке № 7, как создать ограниченную коллекцию, с двумя вариантами:
- ограничено: правда: это очевидно
- размер: 10000: это обязательный параметр при создании ограниченной коллекции. Это максимальный размер в байтах. (будет повышено до кратного 256)
Наконец, в строке № 9 я вставляю фиктивный документ, это также обязательно, чтобы можно было настроить настраиваемый курсор.
Написать заявку
Теперь, когда у нас есть коллекция, давайте напишем некоторый код. Сначала в node.js :
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
var mongo = require("mongodb");var mongodbUri = "mongodb://127.0.0.1/chat";mongo.MongoClient.connect (mongodbUri, function (err, db) { db.collection('messages', function(err, collection) { // open a tailable cursor console.log("== open tailable cursor"); collection.find({}, {tailable:true, awaitdata:true, numberOfRetries:-1}) .sort({ $natural: 1 }) .each(function(err, doc) { console.log(doc); }) });}); |
Из строк с 1 по 5 я просто подключаюсь к своему локальному экземпляру MongoDB.
Затем в строке № 7 я получаю коллекцию сообщений.
И в строке # 10 я выполняю поиск, используя настраиваемый курсор, используя определенные опции:
- {}: без фильтра, поэтому все документы будут возвращены
- tailable: true: ясно, что мы хотим создать настраиваемый курсор
- awaitdata: true: сказать, что мы ждем данных, прежде чем возвращать данные клиенту
- numberOfRetries: -1: количество повторных попыток по тайм-ауту, -1 бесконечно, поэтому приложение будет продолжать попытки
Строка # 11 просто принудительно производит сортировку в естественном порядке, затем на строке # 12 курсор возвращает данные, и документ печатается в консоли каждый раз, когда он вставляется.
Протестируйте приложение
Запустите приложение:
|
1
|
node app.js |
Вставьте документы в коллекцию сообщений, из оболочки или любого другого инструмента. Ниже вы можете увидеть скринкаст, демонстрирующий работу этого базового приложения:
Исходный код этого примера приложения в этом репозитории Github, выполните ветку step-01; клонировать эту ветку, используя:
|
1
|
git clone -b step-01 https://github.com/tgrall/mongodb-realtime-pubsub.git |
Я также создал суть, показывающую то же поведение в Java :
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package org.mongodb.demos.tailable;import com.mongodb.*;public class MyApp { public static void main(String[] args) throws Exception { MongoClient mongoClient = new MongoClient(); DBCollection coll = mongoClient.getDB("chat").getCollection("messages"); DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get()) .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA); System.out.println("== open cursor =="); Runnable task = () -> { System.out.println("\tWaiting for events"); while (cur.hasNext()) { DBObject obj = cur.next(); System.out.println( obj ); } }; new Thread(task).start(); } } |
Матье Анселен написал это в Scala :
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
package org.mongodb.demos.tailableimport reactivemongo.api._import reactivemongo.bson._import play.api.libs.iteratee.Iterateeimport scala.concurrent.ExecutionContext.Implicits.globalimport reactivemongo.api.collections.default.BSONCollectionobject Capped extends App { val driver = new MongoDriver val connection = driver.connection(List("localhost")) val db = connection("chat") val collection = db.collection[BSONCollection]("messages") val cursor = collection .find(BSONDocument()) .options(QueryOpts().tailable.awaitData) .cursor[BSONDocument] println("== open tailable cursor") cursor.enumerate().apply(Iteratee.foreach { doc => println(s"Document inserted: ${BSONDocument.pretty(doc)}") })} |
Добавьте некоторый пользовательский интерфейс
У нас есть основы приложения на основе подписки:
- опубликовать, вставив документ в MongoDB
- подписаться, читая документ с помощью настраиваемого курсора
Теперь давайте отправим сообщения пользователю, используя, например, socket.io. Для этого нам необходимо:
- добавить зависимость socket.io в наш проект узла
- добавить страницу HTML, чтобы показать сообщения
В следующих списках показана обновленная версия app.js и index.html, давайте посмотрим:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
"use strict";var mongo = require("mongodb"), fs = require("fs"), // to read static files io = require("socket.io"), // socket io server http = require("http");var mongodbUri = "mongodb://127.0.0.1/chat";var app = http.createServer(handler);io = io.listen(app);app.listen(3000);console.log("http server on port 3000");function handler(req, res){ fs.readFile(__dirname + "/index.html", function (err, data) { res.writeHead(200); res.end(data); });}mongo.MongoClient.connect (mongodbUri, function (err, db) { db.collection('messages', function(err, collection) { // open socket io.sockets.on("connection", function (socket) { // open a tailable cursor console.log("== open tailable cursor"); collection.find({}, {tailable:true, awaitdata:true, numberOfRetries:-1}).sort({ $natural: 1 }).each(function(err, doc) { console.log(doc); // send message to client if (doc.type == "message") { socket.emit("message",doc); } }) }); });}); |
Приложение узла обновлено следующими функциями:
- строки № 4-7: импорт http, файловой системы и socket.io
- Строки # 10-21: настроить и запустить http-сервер. Вы можете видеть, что я создал простой обработчик для обслуживания статического файла HTML
- Строки # 28-39: я добавил поддержку веб-сокета, используя socket.io, где я открываю настраиваемый курсор и нажимаю / выпускаю сообщения в сокете.
Как видите, код, который я добавил, прост. Я не использую какой-либо продвинутый фреймворк и не управляю исключениями, это для простоты и удобочитаемости.
Давайте теперь посмотрим на клиента (html page).
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
<!doctype html><html><head> <title>MongoDB pub/sub</title> <style> * { margin: 0; padding: 10px; box-sizing: border-box; } body { font: 13px Helvetica, Arial; } #messages { list-style-type: none; margin: 0; padding: 0; } #messages li { padding: 5px 10px; } #messages li:nth-child(odd) { background: #eee; } </style></head><body> <h2>MongoDB/Socket.io demonstration</h2> <ul id="messages"></ul> <script> var socket = io(); socket.on('message', function(doc){ $('#messages').append($('<li>').text(doc.text)); }); </script></body></html> |
Как и сервер, он действительно прост и не использует никаких расширенных библиотек, кроме socket.io client (строка № 18) и JQuery (строка № 19), и использует:
- в строке № 22 к полученным сообщениям и распечатайте их на странице, используя JQuery в строке № 23
Я создал скринкаст этой версии приложения:
Вы можете найти исходный код в этом репозитории Github, перейдите по ветке step-02; клонировать эту ветку, используя:
git clone -b step-02 https://github.com/tgrall/mongodb-realtime-pubsub.git
Вывод
В этом первом посте мы имеем:
- узнал о настраиваемом курсоре и закрытой коллекции
- посмотрите, как его можно использовать для разработки приложения паб / суб
- выставить это в простое приложение на основе веб-сокета
| Ссылка: | Как создать приложение pub / sub с MongoDB? Введение от нашего партнера JCG Tugdual Grall в блоге Tug’s Blog . |