Статьи

MongoDB Connector для Hadoop 1.4

Автор Люка Ловетта.

Прошел почти год с момента последнего выпуска функции MongoDB Connector для Hadoop. Мы очень рады объявить о выпуске 1.4, который содержит несколько отличных улучшений и множество исправлений. В этой статье я хотел бы остановиться на одном конкретном улучшении, которое действительно улучшает производительность коннектора: поддержку спекулятивного исполнения Hadoop .

Что такое спекулятивное исполнение?

При обработке данных в распределенной системе, такой как Hadoop, некоторые из этих рабочих нагрузок могут оказаться узким местом одного или двух узлов, которые работают особенно медленно. Причин медленной работы узлов может быть много, например, неправильная настройка программного обеспечения или аппаратный сбой. Возможно, сеть вокруг этих узлов особенно насыщена. Какова бы ни была причина, задание Hadoop не может быть выполнено, пока эти медленные узлы не начнут свою работу.

Спекулятивное выполнение позволяет Hadoop перепланировать некоторые задачи в конце задания с избыточностью. Это означает, что несколько узлов могут в конечном итоге обрабатывать один и тот же сегмент данных, даже если только один из этих узлов должен успешно завершить эту работу, чтобы завершить задание. Это становится гонкой до финиша: если один или два из узлов в кластере Hadoop медленнее остальных, то любой другой более быстрый узел, получивший ту же задачу, может завершить его первым. В этом случае задачи, запланированные на более медленных узлах, отменяются, и задание завершается раньше, чем при ожидании на более медленных узлах.

Пока это кажется разумной оптимизацией, но она вызвала некоторые серьезные проблемы при записи в MongoDB из коннектора Hadoop. Давайте рассмотрим, что стало причиной этой проблемы, немного глубже …

Эта проблема

MongoOutputFormat создает MongoRecordWriter , который отправляет данные, как только получает их прямо в MongoDB с помощью драйвера Java MongoDB. Напомним, что разрешение спекулятивного выполнения в кластере Hadoop позволяет ResourceManager планировать задачи с избыточностью. Если все эти избыточные задачи выполняются одновременно, и вполне вероятно, что они выполняются, то каждая из них выполняет запись в MongoDB независимо от того, будет ли она завершена.

В тех случаях, когда задание отправляет документы, которые уже имеют поле _id, это может привести к возникновению DuplicateKeyErrors. Одна из лишних задач завершила гонку первой, но проигравшие все равно попытаются вставить документы с уже существующими идентификаторами, потому что они были добавлены победителем! Если задание отправляет документы, которые не имеют _id, драйвер Java добавляет их автоматически. Если драйвер не генерирует дублирующиеся идентификаторы, мы уклоняемся от DuplicateKeyErrors, но теперь в нашей базе данных есть дублирующиеся документы с разными идентификаторами! В любом случае, это не желательно.

Ранее мы рекомендовали пользователям отключать спекулятивное выполнение. Это позволяет избежать этого неприятного поведения, но отключает полезную функцию. Я посмотрел на другие разъемы Hadoop , и все они сталкиваются с похожими проблемами и дают одинаковые рекомендации. Проблема казалась эндемичной для записи в работающую систему.

Есть ли случай, когда умозрительное выполнение Hadoop может избежать дублирования записей в выводе? Ответ — да, и при записи вывода в файл решение довольно простое. Каждое задание создает рабочий каталог, в который можно записывать временные файлы. Когда каждая задача начинает генерировать вывод, этот вывод записывается во временный файл. Еще одна часть головоломки — это класс, известный как OutputCommitter . OutputCommitter определяет методы, которые вызываются, когда задание или задание собирается начать, было прервано или завершено. Обычно каждый OutputFormat определяет тип OutputCommitter для использования. Например, если вы использовали FileOutputFormat , вы также используете FileOutputCommitter .

The FileOutputCommitter just deletes all the temporary directories immediately for tasks that were aborted. In the case of our slow nodes, their tasks were rescheduled on other faster nodes and finished before the slow nodes did, so now the slow nodes are cleaned up. The tasks that finished on the fast nodes have their temporary files collected together into a single directory that represents the output for the entire job. Since the output comes only from tasks that completed successfully, there are no duplicate records from the output.

Мы использовали аналогичный подход, поддерживая спекулятивное исполнение для записи в MongoDB. MongoRecordWriter вместо прямой записи в MongoDB пишет вместо этого во временный каталог. Каждая операция вставки или обновления имеет специальный сериализованный формат, который записывается. Когда задача отменяется, эти файлы удаляются. Когда задача завершается, MongoOutputCommitter читает файл и выполняет каждую операцию.

Этого достаточно для того, чтобы разъем Hadoop хорошо играл в умозрительное исполнение. На данный момент, однако, мы можем пойти еще дальше, чтобы провести еще одну оптимизацию.

Еще одна оптимизация

Уже почти год драйверы MongoDB поддерживают API массовых операций. Сервер MongoDB версий 2.6 и более поздних поддерживает массовые операции, которые, как правило, выполняются намного быстрее, чем те же операции, посылаемые последовательно. Соединитель Hadoop никогда не пользовался преимуществами массового API. Однако теперь, когда каждая задача создает замороженный пакет операций, уже в виде временного файла, довольно просто использовать массовый API для отправки этих операций в MongoDB.

Использование массового API в задании Hadoop, которое может обрабатывать и производить терабайты или даже петабайты документов, оказывает огромное положительное влияние на производительность. Мы сделали все возможное, чтобы точно определить, какую пользу это дает. Мы написали «идентичное» задание MapReduce (т. Е. Задание, выход которого идентичен вводу и не имеет обработки в середине). Входными данными для задания был большой файл BSON, похожий на то, что может быть создано программой « mongodump ».

Мы сравнили производительность задания «идентификация» до и после MongoOutputCommitter и массовых изменений записи в 5-узловом кластере Hadoop с CDH4. Входными данными для работы был набор данных «enron emails» , который включает 501 513 документов, каждый размером около 4 тыс. Перед изменением MongoOutputCommitter и массовой записи задание Hadoop заняло 147 минут . Конечно, некоторые из этих измерений представляют собой время, затрачиваемое на перемещение разбиений между узлами в кластере Hadoop, но большую часть этого времени составляли издержки на подключение Hadoop, поскольку в этом задании не требовалась обработка. После массовых изменений записи та же работа заняла 6 минут! Если предположить, что большая часть оставшихся 6 минут выполнения также накладные расходы соединителя (перемещение данных в MongoDB все еще занимает некоторое время), тогда это почти 96% улучшение!

Мы не только исправили ошибку, но и значительно улучшили производительность коннектора в довольно распространенном случае (например, используя MongoDB в качестве приемника для заданий Hadoop). Мы надеемся, что это улучшение и другие улучшения в 1.4 сделают наших пользователей очень счастливыми, и что наши пользователи по-прежнему будут большим поддерживающим сообществом в этом проекте. Чтобы воспользоваться преимуществами обсуждаемого здесь и многих других улучшений, загрузите версию 1.4.0 MongoDB Hadoop Connector, добавив в файл pom.xml следующее :

<dependency>
    <groupId>org.mongodb.mongo-hadoop</groupId>
    <artifactId>mongo-hadoop-core</artifactId>
    <version>1.4.0</version>
</dependency>

Или, если вы используете Gradle , попробуйте это:

compile 'org.mongodb.mongo-hadoop:mongo-hadoop-core:1.4.0'

Вы также можете посетить дом проекта на Github и скачать банки прямо со страницы «релизы» .

Наконец, вы можете прочитать все заметки о выпуске здесь .

Спасибо.
Команда водителей JVM

Хотите больше MongoDB? Узнайте, как Apache Spark и MongoDB работают вместе, чтобы превратить аналитику в реальном времени.