Эта статья была первоначально опубликована на MongoDB . Спасибо за поддержку партнеров, которые делают возможным использование SitePoint.
Вы можете создавать свои оперативные рабочие нагрузки поверх MongoDB и по-прежнему реагировать на события в режиме реального времени, запуская действия по обработке потоков Amazon Kinesis , используя триггеры стежков MongoDB.
Давайте рассмотрим пример сценария, в котором поток данных генерируется в результате действий, которые пользователи выполняют на веб-сайте. Мы будем надежно хранить данные и одновременно подпитывать процесс Kinesis, чтобы выполнять потоковую аналитику по таким вопросам, как отказ от корзины, рекомендации по продукту или даже обнаружение мошенничества с кредитными картами.
Мы сделаем это, настроив триггер строчки. Когда соответствующие обновления данных выполняются в MongoDB, триггер будет использовать функцию Stitch для вызова AWS Kinesis, как вы можете видеть на этой диаграмме архитектуры:
То, что вам нужно следовать
- Экземпляр Атласа
Если у вас еще нет приложения, работающего в Atlas, вы можете ознакомиться с нашим руководством по началу работы с Atlas здесь . В этом примере мы будем использовать базу данных под названием streamdata с коллекцией под названием clickdata, в которую мы записываем данные из нашего веб-приложения электронной коммерции. - Аккаунт AWS и поток Kinesis
В этом примере мы будем использовать поток Kinesis для отправки данных в другие приложения, такие как Kinesis Analytics. Это поток, в который мы хотим направлять наши обновления. - Применение стежка
Если у вас еще нет приложения Stitch, войдите в Atlas и нажмите « Stitch Apps» в левой части навигации, затем нажмите « Создать новое приложение» .
Создать коллекцию
Первым шагом является создание базы данных и коллекции из консоли приложения Stitch. Нажмите « Правила» в левом меню навигации и нажмите кнопку « Добавить коллекцию» . Введите streamdata для базы данных и clickdata для имени коллекции. Выберите шаблон с надписью «Пользователи могут только читать и записывать свои собственные данные» и указывать имя поля, в котором мы будем указывать идентификатор пользователя.
Настройка Stitch для общения с AWS
Stitch позволяет настраивать службы для взаимодействия с внешними службами, такими как AWS Kinesis . Выберите « Сервисы» в навигационной панели слева и нажмите кнопку « Добавить сервис» , выберите сервис AWS, задайте идентификатор ключа доступа AWS и секретный ключ доступа .
Сервисы используют Правила, чтобы указать, какой аспект сервиса может использовать Stitch и как. Добавьте правило, которое позволит этой службе взаимодействовать с Kinesis, нажав кнопку с надписью NEW RULE. Назовите правило «kinesis», так как мы будем использовать это специальное правило для обеспечения связи с AWS Kinesis. В разделе «Действие» выберите API с меткой «Кинезис» и выберите «Все действия».
Напишите функцию, которая направляет документы в Kinesis
Теперь, когда у нас есть работающий сервис AWS, мы можем использовать его для записи записей в поток Kinesis. В Stitch мы делаем это с помощью функций. Давайте настроим функцию putKinesisRecord .
Выберите «Функции» в левом меню и нажмите «Создать новую функцию». Укажите имя для функции и вставьте следующее в тело функции.
exports = function(event){
const awsService = context.services.get('aws');
try{
awsService.kinesis().PutRecord({
Data: JSON.stringify(event.fullDocument),
StreamName: "stitchStream",
PartitionKey: "1"
}).then(function(response) {
return response;
});
}
catch(error){
console.log(JSON.parse(error));
}
};
Проверьте функцию
Давайте убедимся, что все работает, вызвав эту функцию вручную. В редакторе функций нажмите « Консоль», чтобы просмотреть интерактивную консоль javascript для Stitch.
Функции, вызываемые из триггеров, требуют события. Чтобы проверить выполнение нашей функции, нам нужно передать фиктивное событие в функцию. Создать переменные из консоли в Stitch просто. Просто установите значение переменной для документа JSON. Для нашего простого примера используйте следующее:
event = {
"operationType": "replace",
"fullDocument": {
"color": "black",
"inventory": {
"$numberInt": "1"
},
"overview": "test document",
"price": {
"$numberDecimal": "123"
},
"type": "backpack"
},
"ns": {
"db": "streamdata",
"coll": "clickdata"
}
}
exports(event);
Вставьте вышеперечисленное в консоль и нажмите кнопку с надписью Run Function As . Выберите пользователя, и функция будет выполнена.
Та-да!
Соединяя это с триггерами стежка
У нас есть коллекция MongoDB, живущая в Атласе, которая получает события из нашего веб-приложения. У нас есть готовый поток Kinesis для данных. У нас есть функция Stitch, которая может помещать данные в поток Kinesis.
Настроить триггеры стежков так просто, что это почти антиклиматично. Нажмите Триггеры на левой панели навигации, назовите свой триггер, укажите базу данных и контекст коллекции и выберите события базы данных, на которые Stitch будет реагировать при выполнении функции.
Для базы данных и коллекции используйте имена из первого шага. Теперь мы установим операции, которые мы хотим наблюдать с помощью нашего триггера. (Некоторые триггеры могут заботиться обо всех из них — вставки, обновления, удаления и замены — в то время как другие могут быть более эффективными, поскольку они логически могут иметь значение только для некоторых из них.) В нашем случае мы будем следить за вставкой, обновить и заменить операции.
Теперь мы указываем нашу функцию putKinesisRecord как связанную функцию, и все готово.
Как часть выполнения триггера, Stitch перешлет детали, связанные с событием триггера, включая полный документ, связанный с событием (т. Е. Недавно вставленный, обновленный или удаленный документ из коллекции.) Здесь мы можем оценить некоторое условие или атрибут входящего документа и решить, следует ли поместить запись в поток.
Проверьте триггер!
Amazon предоставляет панель мониторинга, которая позволит вам просматривать детали, связанные с данными, поступающими в ваш поток.
Когда вы выполните функцию из Stitch, вы начнете видеть данные, поступающие в поток Kinesis.
Строим больше функциональности
Пока наш триггер довольно простой — он просматривает коллекцию, а когда происходят какие-либо обновления или вставки, он подает весь документ в наш поток Kinesis. Отсюда мы можем создать более интеллектуальную функциональность. Чтобы завершить этот пост, давайте посмотрим, что мы можем сделать с данными, если они надежно хранятся в MongoDB и помещаются в поток.
Как только запись поступит в поток Kinesis, вы можете настроить дополнительные сервисы для обработки данных. Распространенный вариант использования включает в себя Amazon Kinesis Data Analytics для анализа потоковых данных. Amazon Kinesis Data Analytics предлагает предварительно настроенные шаблоны для выполнения таких задач, как обнаружение аномалий, простые оповещения, агрегирование и многое другое.
Например, наш поток данных будет содержать заказы, полученные в результате покупок. Эти заказы могут поступать из систем торговых точек, а также из нашего приложения для электронной коммерции через Интернет. Kinesis Analytics можно использовать для создания приложений, которые обрабатывают входящий поток данных. В нашем примере мы могли бы построить алгоритм машинного обучения для обнаружения аномалий в данных или создать таблицу лидеров производительности продукта из скользящего или падающего окна данных из нашего потока.
Завершение
Теперь вы можете подключить MongoDB к Kinesis. Отсюда вы можете использовать любую из множества услуг, предлагаемых Amazon Web Services, для создания своего приложения. В нашей следующей статье из этой серии мы сосредоточимся на получении данных из Kinesis в MongoDB. А пока дайте нам знать, что вы строите с Atlas, Stitch и Kinesis!
Ресурсы
MongoDB Atlas
- Начало работы — учебный плейлист
- Подпишитесь бесплатно
- Вопросы-Ответы
MongoDB Стич
- Начало работы с документацией
- MongoDB Учебники по стежку
- MongoDB Стич Белая книга
- Вебинар — 8 августа 2018 г.
Amazon Kinesis