В этой статье мы увидим, как создать приложение pub / sub (обмен сообщениями, чат, уведомление), и оно полностью основано на MongoDB (без какого-либо брокера сообщений, такого как RabbitMQ, JMS,…).
Итак, что нужно сделать, чтобы достичь такой вещи:
- Приложение «опубликовать» сообщение. В нашем случае мы просто сохраняем документ в MongoDB
- другое приложение или поток подпишутся на эти события и получат сообщение автоматически. В нашем случае это означает, что приложение должно автоматически получать вновь созданный документ из MongoDB
Все это возможно с некоторыми очень интересными функциями MongoDB: ограниченные коллекции и настраиваемые курсоры .
Закрытые коллекции и настраиваемые курсоры
Как видно из документации, ограниченные коллекции — это коллекции фиксированного размера, которые работают аналогично циклическим буферам: как только коллекция заполняет выделенное пространство, она освобождает место для новых документов, перезаписывая самые старые документы.
Запрошенные коллекции MongoDB можно запрашивать с помощью настраиваемых курсоров, которые аналогичны команде unix tail -f. Ваше приложение продолжает извлекать документы по мере их вставки в коллекцию. Мне также нравится называть это «непрерывным запросом». Теперь, когда мы увидели основы, давайте реализуем это.
Создание очень простого приложения
Создать коллекцию
Первое, что нужно сделать, это создать новую ограниченную коллекцию:
| 1 2 3 4 5 6 7 8 9 | $> mongouse chatdb.messages.drop()db.createCollection('messages', { capped: true, size: 10000})db.messages.insert({"type":"init"}); | 
Для простоты я использую оболочку MongoDB для создания коллекции сообщений в базе данных чата.
Вы можете увидеть в строке № 7, как создать ограниченную коллекцию, с двумя вариантами:
- ограничено: правда: это очевидно
- размер: 10000: это обязательный параметр при создании ограниченной коллекции. Это максимальный размер в байтах. (будет повышено до кратного 256)
Наконец, в строке № 9 я вставляю фиктивный документ, это также обязательно, чтобы можно было настроить настраиваемый курсор.
Написать заявку
Теперь, когда у нас есть коллекция, давайте напишем некоторый код. Сначала в node.js :
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 | var mongo = require("mongodb");var mongodbUri = "mongodb://127.0.0.1/chat";mongo.MongoClient.connect (mongodbUri, function (err, db) {  db.collection('messages', function(err, collection) {    // open a tailable cursor    console.log("== open tailable cursor");    collection.find({}, {tailable:true, awaitdata:true, numberOfRetries:-1})                      .sort({ $natural: 1})                      .each(function(err, doc) {      console.log(doc);    })  });}); | 
Из строк с 1 по 5 я просто подключаюсь к своему локальному экземпляру MongoDB.
Затем в строке № 7 я получаю коллекцию сообщений.
И в строке # 10 я выполняю поиск, используя настраиваемый курсор, используя определенные опции:
- {}: без фильтра, поэтому все документы будут возвращены
- tailable: true: ясно, что мы хотим создать настраиваемый курсор
- awaitdata: true: сказать, что мы ждем данных, прежде чем возвращать данные клиенту
- numberOfRetries: -1: количество повторных попыток по тайм-ауту, -1 бесконечно, поэтому приложение будет продолжать попытки
Строка # 11 просто принудительно производит сортировку в естественном порядке, затем на строке # 12 курсор возвращает данные, и документ печатается в консоли каждый раз, когда он вставляется.
Протестируйте приложение
Запустите приложение:
| 1 | node app.js | 
Вставьте документы в коллекцию сообщений, из оболочки или любого другого инструмента. Ниже вы можете увидеть скринкаст, демонстрирующий работу этого базового приложения:
Исходный код этого примера приложения в этом репозитории Github, выполните ветку step-01; клонировать эту ветку, используя:
| 1 | git clone -b step-01 https://github.com/tgrall/mongodb-realtime-pubsub.git | 
Я также создал суть, показывающую то же поведение в Java :
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | packageorg.mongodb.demos.tailable;importcom.mongodb.*;publicclassMyApp {    publicstaticvoidmain(String[] args) throwsException {        MongoClient mongoClient = newMongoClient();        DBCollection coll = mongoClient.getDB("chat").getCollection("messages");        DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())                .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);        System.out.println("== open cursor ==");        Runnable task = () -> {            System.out.println("\tWaiting for events");            while(cur.hasNext()) {                DBObject obj = cur.next();                System.out.println( obj );            }        };        newThread(task).start();            }    } | 
Матье Анселен написал это в Scala :
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | packageorg.mongodb.demos.tailableimportreactivemongo.api._importreactivemongo.bson._importplay.api.libs.iteratee.Iterateeimportscala.concurrent.ExecutionContext.Implicits.globalimportreactivemongo.api.collections.default.BSONCollectionobject Capped extendsApp {  val driver = newMongoDriver  val connection = driver.connection(List("localhost"))  val db = connection("chat")  val collection = db.collection[BSONCollection]("messages")  val cursor = collection        .find(BSONDocument())          .options(QueryOpts().tailable.awaitData)            .cursor[BSONDocument]  println("== open tailable cursor")    cursor.enumerate().apply(Iteratee.foreach { doc =>    println(s"Document inserted: ${BSONDocument.pretty(doc)}")  })} | 
Добавьте некоторый пользовательский интерфейс
У нас есть основы приложения на основе подписки:
- опубликовать, вставив документ в MongoDB
- подписаться, читая документ с помощью настраиваемого курсора
Теперь давайте отправим сообщения пользователю, используя, например, socket.io. Для этого нам необходимо:
- добавить зависимость socket.io в наш проект узла
- добавить страницу HTML, чтобы показать сообщения
В следующих списках показана обновленная версия app.js и index.html, давайте посмотрим:
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | "use strict";var mongo = require("mongodb"),    fs = require("fs"),         // to read static files    io = require("socket.io"),  // socket io server    http = require("http");var mongodbUri = "mongodb://127.0.0.1/chat";var app = http.createServer(handler);io = io.listen(app);app.listen(3000);console.log("http server on port 3000");function handler(req, res){  fs.readFile(__dirname + "/index.html",  function (err, data) {    res.writeHead(200);    res.end(data);  });}mongo.MongoClient.connect (mongodbUri, function (err, db) {  db.collection('messages', function(err, collection) {    // open socket    io.sockets.on("connection", function (socket) {      // open a tailable cursor      console.log("== open tailable cursor");      collection.find({}, {tailable:true, awaitdata:true, numberOfRetries:-1}).sort({ $natural: 1}).each(function(err, doc) {        console.log(doc);        // send message to client        if(doc.type == "message") {          socket.emit("message",doc);        }      })    });  });}); | 
Приложение узла обновлено следующими функциями:
- строки № 4-7: импорт http, файловой системы и socket.io
- Строки # 10-21: настроить и запустить http-сервер. Вы можете видеть, что я создал простой обработчик для обслуживания статического файла HTML
- Строки # 28-39: я добавил поддержку веб-сокета, используя socket.io, где я открываю настраиваемый курсор и нажимаю / выпускаю сообщения в сокете.
Как видите, код, который я добавил, прост. Я не использую какой-либо продвинутый фреймворк и не управляю исключениями, это для простоты и удобочитаемости.
Давайте теперь посмотрим на клиента (html page).
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | <!doctypehtml><html><head>  <title>MongoDB pub/sub</title>  <style>  * { margin: 0; padding: 10px; box-sizing: border-box; }  body { font: 13px Helvetica, Arial; }  #messages { list-style-type: none; margin: 0; padding: 0; }  #messages li { padding: 5px 10px; }  #messages li:nth-child(odd) { background: #eee; }  </style></head><body>  <h2>MongoDB/Socket.io demonstration</h2>  <ulid="messages"></ul>  <script>  var socket = io();  socket.on('message', function(doc){    $('#messages').append($('<li>').text(doc.text));  });  </script></body></html> | 
Как и сервер, он действительно прост и не использует никаких расширенных библиотек, кроме socket.io client (строка № 18) и JQuery (строка № 19), и использует:
- в строке № 22 к полученным сообщениям и распечатайте их на странице, используя JQuery в строке № 23
Я создал скринкаст этой версии приложения:
Вы можете найти исходный код в этом репозитории Github, перейдите по ветке step-02; клонировать эту ветку, используя:
git clone -b step-02 https://github.com/tgrall/mongodb-realtime-pubsub.git
Вывод
В этом первом посте мы имеем:
- узнал о настраиваемом курсоре и закрытой коллекции
- посмотрите, как его можно использовать для разработки приложения паб / суб
- выставить это в простое приложение на основе веб-сокета
| Ссылка: | Как создать приложение pub / sub с MongoDB? Введение от нашего партнера JCG Tugdual Grall в блоге Tug’s Blog . |