Статьи

Интеграция MongoDB и Amazon Kinesis для интеллектуальных и надежных потоков

Эта статья была первоначально опубликована на MongoDB . Спасибо за поддержку партнеров, которые делают возможным использование SitePoint.

Вы можете создавать свои оперативные рабочие нагрузки поверх MongoDB и по-прежнему реагировать на события в режиме реального времени, запуская действия по обработке потоков Amazon Kinesis , используя триггеры стежков MongoDB.

Давайте рассмотрим пример сценария, в котором поток данных генерируется в результате действий, которые пользователи выполняют на веб-сайте. Мы будем надежно хранить данные и одновременно подпитывать процесс Kinesis, чтобы выполнять потоковую аналитику по таким вопросам, как отказ от корзины, рекомендации по продукту или даже обнаружение мошенничества с кредитными картами.

Мы сделаем это, настроив триггер строчки. Когда соответствующие обновления данных выполняются в MongoDB, триггер будет использовать функцию Stitch для вызова AWS Kinesis, как вы можете видеть на этой диаграмме архитектуры:

То, что вам нужно следовать

  1. Экземпляр Атласа
    Если у вас еще нет приложения, работающего в Atlas, вы можете ознакомиться с нашим руководством по началу работы с Atlas здесь . В этом примере мы будем использовать базу данных под названием streamdata с коллекцией под названием clickdata, в которую мы записываем данные из нашего веб-приложения электронной коммерции.
  2. Аккаунт AWS и поток Kinesis
    В этом примере мы будем использовать поток Kinesis для отправки данных в другие приложения, такие как Kinesis Analytics. Это поток, в который мы хотим направлять наши обновления.
  3. Применение стежка
    Если у вас еще нет приложения Stitch, войдите в Atlas и нажмите « Stitch Apps» в левой части навигации, затем нажмите « Создать новое приложение» .

Создать коллекцию

Первым шагом является создание базы данных и коллекции из консоли приложения Stitch. Нажмите « Правила» в левом меню навигации и нажмите кнопку « Добавить коллекцию» . Введите streamdata для базы данных и clickdata для имени коллекции. Выберите шаблон с надписью «Пользователи могут только читать и записывать свои собственные данные» и указывать имя поля, в котором мы будем указывать идентификатор пользователя.

Рисунок 2. Создание коллекции

Настройка Stitch для общения с AWS

Stitch позволяет настраивать службы для взаимодействия с внешними службами, такими как AWS Kinesis . Выберите « Сервисы» в навигационной панели слева и нажмите кнопку « Добавить сервис» , выберите сервис AWS, задайте идентификатор ключа доступа AWS и секретный ключ доступа .

Рисунок 3. Конфигурация сервиса в Stitch

Сервисы используют Правила, чтобы указать, какой аспект сервиса может использовать Stitch и как. Добавьте правило, которое позволит этой службе взаимодействовать с Kinesis, нажав кнопку с надписью NEW RULE. Назовите правило «kinesis», так как мы будем использовать это специальное правило для обеспечения связи с AWS Kinesis. В разделе «Действие» выберите API с меткой «Кинезис» и выберите «Все действия».

Рисунок 4. Добавление правила для включения интеграции с Kinesis

Напишите функцию, которая направляет документы в Kinesis

Теперь, когда у нас есть работающий сервис AWS, мы можем использовать его для записи записей в поток Kinesis. В Stitch мы делаем это с помощью функций. Давайте настроим функцию putKinesisRecord .

Выберите «Функции» в левом меню и нажмите «Создать новую функцию». Укажите имя для функции и вставьте следующее в тело функции.

Рисунок 5. Пример функции - putKinesisRecord

exports = function(event){
 const awsService = context.services.get('aws');
try{
   awsService.kinesis().PutRecord({
     Data: JSON.stringify(event.fullDocument),
     StreamName: "stitchStream",
     PartitionKey: "1"
      }).then(function(response) {
        return response;
      });
}
catch(error){
  console.log(JSON.parse(error));
}
};

Проверьте функцию

Давайте убедимся, что все работает, вызвав эту функцию вручную. В редакторе функций нажмите « Консоль», чтобы просмотреть интерактивную консоль javascript для Stitch.

Функции, вызываемые из триггеров, требуют события. Чтобы проверить выполнение нашей функции, нам нужно передать фиктивное событие в функцию. Создать переменные из консоли в Stitch просто. Просто установите значение переменной для документа JSON. Для нашего простого примера используйте следующее:

 event = {
   "operationType": "replace",
   "fullDocument": {
       "color": "black",
       "inventory": {
           "$numberInt": "1"
       },
       "overview": "test document",
       "price": {
           "$numberDecimal": "123"
       },
       "type": "backpack"
   },
   "ns": {
       "db": "streamdata",
       "coll": "clickdata"
   }
}
exports(event);

Вставьте вышеперечисленное в консоль и нажмите кнопку с надписью Run Function As . Выберите пользователя, и функция будет выполнена.

Та-да!

Соединяя это с триггерами стежка

У нас есть коллекция MongoDB, живущая в Атласе, которая получает события из нашего веб-приложения. У нас есть готовый поток Kinesis для данных. У нас есть функция Stitch, которая может помещать данные в поток Kinesis.

Настроить триггеры стежков так просто, что это почти антиклиматично. Нажмите Триггеры на левой панели навигации, назовите свой триггер, укажите базу данных и контекст коллекции и выберите события базы данных, на которые Stitch будет реагировать при выполнении функции.

Для базы данных и коллекции используйте имена из первого шага. Теперь мы установим операции, которые мы хотим наблюдать с помощью нашего триггера. (Некоторые триггеры могут заботиться обо всех из них — вставки, обновления, удаления и замены — в то время как другие могут быть более эффективными, поскольку они логически могут иметь значение только для некоторых из них.) В нашем случае мы будем следить за вставкой, обновить и заменить операции.

Теперь мы указываем нашу функцию putKinesisRecord как связанную функцию, и все готово.

Рисунок 6. Конфигурация триггера в стежке

Как часть выполнения триггера, Stitch перешлет детали, связанные с событием триггера, включая полный документ, связанный с событием (т. Е. Недавно вставленный, обновленный или удаленный документ из коллекции.) Здесь мы можем оценить некоторое условие или атрибут входящего документа и решить, следует ли поместить запись в поток.

Проверьте триггер!

Amazon предоставляет панель мониторинга, которая позволит вам просматривать детали, связанные с данными, поступающими в ваш поток.

Рисунок 7. Мониторинг потока Kinesis

Когда вы выполните функцию из Stitch, вы начнете видеть данные, поступающие в поток Kinesis.

Строим больше функциональности

Пока наш триггер довольно простой — он просматривает коллекцию, а когда происходят какие-либо обновления или вставки, он подает весь документ в наш поток Kinesis. Отсюда мы можем создать более интеллектуальную функциональность. Чтобы завершить этот пост, давайте посмотрим, что мы можем сделать с данными, если они надежно хранятся в MongoDB и помещаются в поток.

Как только запись поступит в поток Kinesis, вы можете настроить дополнительные сервисы для обработки данных. Распространенный вариант использования включает в себя Amazon Kinesis Data Analytics для анализа потоковых данных. Amazon Kinesis Data Analytics предлагает предварительно настроенные шаблоны для выполнения таких задач, как обнаружение аномалий, простые оповещения, агрегирование и многое другое.

Например, наш поток данных будет содержать заказы, полученные в результате покупок. Эти заказы могут поступать из систем торговых точек, а также из нашего приложения для электронной коммерции через Интернет. Kinesis Analytics можно использовать для создания приложений, которые обрабатывают входящий поток данных. В нашем примере мы могли бы построить алгоритм машинного обучения для обнаружения аномалий в данных или создать таблицу лидеров производительности продукта из скользящего или падающего окна данных из нашего потока.

Рисунок 8. Amazon Data Analytics - пример обнаружения аномалий

Завершение

Теперь вы можете подключить MongoDB к Kinesis. Отсюда вы можете использовать любую из множества услуг, предлагаемых Amazon Web Services, для создания своего приложения. В нашей следующей статье из этой серии мы сосредоточимся на получении данных из Kinesis в MongoDB. А пока дайте нам знать, что вы строите с Atlas, Stitch и Kinesis!

Ресурсы

MongoDB Atlas

MongoDB Стич

Amazon Kinesis