Статьи

Потоковое событие с MongoDB

MongoDB — это действительно отличная база данных «NoSQL» с очень широким спектром приложений. В одном проекте, который мы разрабатываем на SoftwareMill , мы использовали его как хранилище реплицированных событий, из которого мы передаем события другим компонентам.

Вступление

Основная идея довольно проста (см. Также статью Мартина Фаулера о Event Sourcing ). Наша система генерирует серию событий. Эти события сохраняются в хранилище событий. Другие компоненты в системе следуют за потоком событий и делают с ними «что-то»; например, они могут быть агрегированы и записаны в базу данных отчетов (с другой стороны, это напоминает CQRS ). Такой подход имеет много преимуществ:

  • чтение и запись событий не связаны (асинхронно)
  • любой следующий компонент может умереть, а затем «догнать», учитывая, что он не был мертв слишком долго
  • может быть несколько последователей. Последователи могут читать данные из подчиненных реплик для лучшей масштабируемости.
  • всплески событийной активности оказывают пониженное влияние на приемники событий; в худшем случае отчеты будут генерироваться медленнее

Ключевым компонентом здесь, конечно, является быстрое и надежное хранение событий. Три основные функции MongoDB, которые мы использовали для реализации одной из них:

  • закрытые коллекции и настраиваемые курсоры
  • быстрая коллекция добавляет
  • наборы реплик

Коллекция

В качестве основы мы используем ограниченную коллекцию , которая по определению ограничена по размеру. Если запись нового события приведет к превышению предела размера коллекции, самые старые события будут перезаписаны. Это дает нам нечто похожее на круговой буфер для событий. (Кроме того, мы также совершенно защищены от ошибок нехватки дискового пространства.)

До версии 2.2 у закрытой коллекции не было поля _id по умолчанию (и, следовательно, нет индекса). Однако, поскольку мы хотели, чтобы события надежно записывались в набор реплик, поле _id и его индекс являются обязательными.

Написание событий

Запись событий — это простая операция вставки Монго; вставки также можно делать партиями. В зависимости от того, насколько мы терпимы к потере события, мы можем использовать различные проблемы записи Монго (например, ожидание подтверждения записи от одного узла или от нескольких узлов).

Все события неизменны. Помимо более приятного, поточно-ориентированного Java-кода, это необходимо для потоковой передачи событий; если бы события были изменяемыми, как бы получатель событий узнал, что было обновлено? Кроме того, это имеет хорошие значения производительности Монго. Поскольку данные никогда не изменяются, документы, которые записываются на диск, никогда не сжимаются и не расширяются, поэтому нет необходимости перемещать блоки на диске. Фактически, в закрытой коллекции Mongo не позволяет вырастить документ, который когда-то был написан.

Чтение событий

Чтение потока событий немного сложнее. Прежде всего, может быть несколько читателей, каждый с разным уровнем продвижения в потоке. Во-вторых, если в потоке нет событий, мы бы хотели, чтобы читатель дождался появления некоторых событий и избегал активного опроса. Наконец, мы хотели бы обрабатывать события в пакетах, чтобы улучшить производительность.

Настраиваемые курсоры решают эти проблемы. Чтобы создать такой курсор, мы должны предоставить начальную точку — идентификатор события, с которого мы начнем читать; если идентификатор не указан, курсор вернет события из самого старого из доступных. Таким образом, каждый читатель должен хранить последнее событие, которое он прочитал и обработал.

Что еще более важно, настраиваемые курсоры могут дополнительно блокироваться на некоторое время, если нет новых данных, что решает проблему активного опроса.

(Между прочим, коллекция оплогов, которую mongo использует для репликации данных через набор реплик, также является ограниченной коллекцией. Подчиненные экземпляры Mongo следят за этой коллекцией, транслируя «события», которые являются операциями базы данных, и применяя их локально по порядку. )

Чтение событий на Java

При использовании Mongo Java Driver есть несколько «ловушек». Прежде всего вам нужно инициализировать курсор. Для этого нам нужно предоставить (1) идентификатор последнего события, если оно есть; (2) порядок, в котором мы хотим прочитать события (здесь: естественный, то есть порядок вставки); и (3) два важных параметра курсора, которые мы хотим, чтобы курсор был настраиваемым, и которые мы хотим заблокировать, если нет новых данных:

DBObject query = lastReceivedEventId.isPresent()
   ? BasicDBObjectBuilder.start("_id", BasicDBObjectBuilder
         .start("$gte", lastReceivedEventId.get()).get())
         .get()
   : null;
 
DBObject sortBy = BasicDBObjectBuilder.start("$natural", 1).get();
 
DBCollection collection = ... // must be a capped collection
DBCursor cursor = collection
   .find(query)
   .sort(sortBy)
   .addOption(Bytes.QUERYOPTION_TAILABLE)
   .addOption(Bytes.QUERYOPTION_AWAITDATA);

Вы можете удивиться, почему мы использовали >= last_idвместо >. Это необходимо из-за способа генерации Mongo ObjectIds. С помощью простого > last_idмы можем пропустить некоторые события, которые были сгенерированы в ту же секунду, что и last_idсобытие, но после него. Это также означает, что наш Java-код должен позаботиться об этом факте и отбросить первое полученное событие.

Класс курсора расширяет базовый Iteratorинтерфейс Java , поэтому его довольно легко использовать. Так что теперь мы можем позаботиться о дозировании. При переборе курсора драйвер получает данные с сервера Mongo в пакетном режиме; поэтому мы можем просто вызвать hasNext()и next(), как и любой другой итератор, получить последующие элементы, и только некоторые вызовы фактически вызовут сетевое взаимодействие с сервером.

В драйвере Mongo Java вызов, который на самом деле потенциально блокируется hasNext(). Если мы хотим обрабатывать события в пакетах, нам нужно (а) читать элементы, пока они доступны, и (б) иметь некоторый способ узнать перед блокировкой, что больше нет событий, и что мы можем обработать события уже сгруппированы. И, как hasNext()может заблокировать, мы не можем сделать это напрямую.

Вот почему мы ввели промежуточную очередь ( LinkedBlockingQueue). В отдельном потоке события, считанные с курсора, помещаются в очередь по мере их поступления. Если нет событий, поток будет заблокирован cursor.hasNext(). Очередь блокировки имеет необязательный предел размера, поэтому, если он заполнен, помещение элемента также будет блокироваться до тех пор, пока не освободится место. В потоке-получателе событий мы сначала пытаемся прочитать один элемент из очереди блокирующим способом (используя .poll, поэтому здесь мы ждем, пока какое-либо событие не станет доступным). Затем мы пытаемся слить весь контент очереди во временную коллекцию (используя .drainToсборку пакета и, возможно, получение 0 элементов, но у нас всегда есть первый).

Важно отметить, что если коллекция пуста, Mongo не будет блокироваться, поэтому мы должны вернуться к активному опросу. Мы также должны позаботиться о том, чтобы курсор мог умереть во время этого ожидания; чтобы проверить это, мы должны проверить это cursor.getCursorId() != 0, где 0 — идентификатор «мертвого курсора». В таком случае нам просто нужно повторно создать экземпляр курсора.

Подводя итоги

Подводя итог, мы получили очень быстрое решение для поиска и трансляции событий. Он «саморегулируемый», в том смысле, что если есть пик активности событий, они будут считываться приемниками событий с задержкой большими партиями. Если активность событий низкая, они будут быстро обрабатываться небольшими партиями.

Мы также используем тот же экземпляр Mongo для других целей; наличие одной системы БД для кластеризации и поддержки как обычных данных, так и событий, безусловно, прекрасно с точки зрения оперативной деятельности.