Статьи

Как ускорить MongoDB MapReduce в 20 раз

Аналитика становится все более важной темой с MongoDB, поскольку она используется для все более крупных критических проектов. Людям надоело использовать разные программы для аналитики (Hadoop довольно привлекателен), и им обычно требуется массивная передача данных, которая может быть дорогостоящей.

MongoDB предлагает два способа анализа данных на месте: MapReduce и Aggregation Framework. MR чрезвычайно гибок и прост в использовании. Он хорошо работает с шардингом и позволяет получить очень большой результат. MR был значительно улучшен в MongoDB v2.4 благодаря переключению движка JavaScript с Spider Monkey на V8. Основная претензия к нему заключается в том, что он довольно медленный, особенно по сравнению с Agg Framework (который использует C ++). Посмотрим, сможем ли мы выжать из него немного сока.

Упражнение

Давайте вставим 10 миллионов документов, содержащих одно целое число от 0 до 1 миллиона. Это означает, что в среднем 10 документов имеют одинаковую стоимость.

> for (var i = 0; i < 10000000; ++i){ db.uniques.insert({ dim0: Math.floor(Math.random()*1000000) });}
> db.uniques.findOne()
{ "_id" : ObjectId("51d3c386acd412e22c188dec"), "dim0" : 570859 }
> db.uniques.ensureIndex({dim0: 1})
> db.uniques.stats()
{
        "ns" : "test.uniques",
        "count" : 10000000,
        "size" : 360000052,
        "avgObjSize" : 36.0000052,
        "storageSize" : 582864896,
        "numExtents" : 18,
        "nindexes" : 2,
        "lastExtentSize" : 153874432,
        "paddingFactor" : 1,
        "systemFlags" : 1,
        "userFlags" : 0,
        "totalIndexSize" : 576040080,
        "indexSizes" : {
                "_id_" : 324456384,
                "dim0_1" : 251583696
        },
        "ok" : 1
}

Отсюда мы хотим получить количество уникальных значений. Это можно легко сделать с помощью следующей работы MR:

> db.runCommand(
{ mapreduce: "uniques", 
map: function () { emit(this.dim0, 1); }, 
reduce: function (key, values) { return Array.sum(values); }, 
out: "mrout" })
{
        "result" : "mrout",
        "timeMillis" : 1161960,
        "counts" : {
                "input" : 10000000,
                "emit" : 10000000,
                "reduce" : 1059138,
                "output" : 999961
        },
        "ok" : 1
}

Как видите, на выходе это занимает около 1200 секунд (проверено на экземпляре EC2 M3). Выпущено 10 миллионов карт, 1 миллион сокращений, 999961 документ. Результат выглядит так:

> db.mrout.find()
{ "_id" : 1, "value" : 10 }
{ "_id" : 2, "value" : 5 }
{ "_id" : 3, "value" : 6 }
{ "_id" : 4, "value" : 10 }
{ "_id" : 5, "value" : 9 }
{ "_id" : 6, "value" : 12 }
{ "_id" : 7, "value" : 5 }
{ "_id" : 8, "value" : 16 }
{ "_id" : 9, "value" : 10 }
{ "_id" : 10, "value" : 13 }
...

Использование сортировки

В предыдущем посте я описал, насколько полезным может быть использование сортировки для MR. Это очень плохо понятая особенность. В этом случае обработка несортированных входных данных означает, что механизм MR будет получать значения в случайном порядке и вообще не будет иметь возможности уменьшать их в оперативной памяти. Вместо этого он должен будет записать все документы обратно на диск во временную коллекцию, чтобы позже прочитать их по порядку и уменьшить. Посмотрим, поможет ли использование сортировки:

> db.runCommand(
{ mapreduce: "uniques", 
map: function () { emit(this.dim0, 1); }, 
reduce: function (key, values) { return Array.sum(values); }, 
out: "mrout", 
sort: {dim0: 1} })
{
        "result" : "mrout",
        "timeMillis" : 192589,
        "counts" : {
                "input" : 10000000,
                "emit" : 10000000,
                "reduce" : 1000372,
                "output" : 999961
        },
        "ok" : 1
}

Это действительно большая помощь! Мы до 192-х годов, что уже является улучшением в 6 раз. Количество сокращений примерно одинаково, но теперь они выполняются в оперативной памяти до того, как результаты будут записаны на диск.

Использование нескольких потоков

MongoDB не выполняет многопоточность одного задания MR — она ​​будет выполнять только многопоточность нескольких заданий. Но с многоядерными процессорами было бы очень выгодно распараллелить работу на одном сервере в стиле Hadoop. Что нам действительно нужно, так это разделить входные данные на несколько частей и ускорить одну задачу MR для каждого фрагмента. Возможно, у набора данных есть простой способ получить разделение, но в противном случае команда splitVector (не документированная) позволяет вам очень быстро найти точки разделения:

> db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32000000})
{
    "timeMillis" : 6006,
	"splitKeys" : [
		{
			"dim0" : 18171
		},
		{
			"dim0" : 36378
		},
		{
			"dim0" : 54528
		},
		{
			"dim0" : 72717
		},
…
		{
			"dim0" : 963598
		},
		{
			"dim0" : 981805
		}
	],
	"ok" : 1
}

Эта команда занимает всего 5 секунд, чтобы найти точки разделения на 10 м документов, это быстро! Так что теперь нам просто нужен способ создать несколько рабочих мест MR С сервера приложений было бы довольно легко использовать несколько потоков и запрос с $ gt / $ lt для команды MR. Из оболочки можно использовать объект ScopedThread, который работает следующим образом:

> var t = new ScopedThread(mapred, 963598, 981805)
> t.start()
> t.join()

Итак, теперь мы можем собрать небольшой быстрый код JS, который будет порождать четыре потока (столько же, сколько ядер), ждать и отображать результаты:

> var res = db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32 *1024 * 1024 })
> var keys = res.splitKeys
> keys.length
39
> var mapred = function(min, max) { 
return db.runCommand({ mapreduce: "uniques", 
map: function () { emit(this.dim0, 1); }, 
reduce: function (key, values) { return Array.sum(values); }, 
out: "mrout" + min, 
sort: {dim0: 1}, 
query: { dim0: { $gte: min, $lt: max } } }) }
> var numThreads = 4
> var inc = Math.floor(keys.length / numThreads) + 1
> threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max:{ "$maxKey" : 1 }
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
{ 
        "result" : "mrout0",
        "timeMillis" : 205790,
        "counts" : {
                "input" : 2750002,
                "emit" : 2750002,
                "reduce" : 274828,
                "output" : 274723
        },
        "ok" : 1
}
{ 
        "result" : "mrout274736",
        "timeMillis" : 189868,
        "counts" : {
                "input" : 2500013,
                "emit" : 2500013,
                "reduce" : 250364,
                "output" : 250255
        },
        "ok" : 1
} 
{
        "result" : "mrout524997",
        "timeMillis" : 191449,
        "counts" : {
                "input" : 2500014,
                "emit" : 2500014,
                "reduce" : 250120,
                "output" : 250019
        },
        "ok" : 1
}
{
        "result" : "mrout775025",
        "timeMillis" : 184945,
        "counts" : {
                "input" : 2249971,
                "emit" : 2249971,
                "reduce" : 225057,
                "output" : 224964
        },
        "ok" : 1
}

1-й поток делает немного больше, чем другие, но все равно он составляет около 190 с на поток … что означает, что это не быстрее, чем 1 поток! Это любопытно, поскольку, используя top, вы можете видеть, что все ядра работают в некоторой степени.

Использование нескольких баз данных

Проблема в том, что между потоками слишком много конфликтов блокировки. MR не очень альтруистичен при блокировке (он дает каждые 1000 операций чтения), и поскольку задания MR тоже много пишут, потоки в конечном итоге ждут друг друга. Поскольку MongoDB имеет индивидуальные блокировки для каждой базы данных, давайте попробуем использовать разные выходные данные для каждого потока:

> var mapred = function(min, max) { 
return db.runCommand({ mapreduce: "uniques", 
map: function () { emit(this.dim0, 1); }, 
reduce: function (key, values) { return Array.sum(values); }, 
out: { replace: "mrout" + min, db: "mrdb" + min }, 
sort: {dim0: 1}, 
query: { dim0: { $gte: min, $lt: max } } }) }
> threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max:{ "$maxKey" : 1 }
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
...
{ 
        "result" : {
                "db" : "mrdb274736",
                "collection" : "mrout274736"
        },
        "timeMillis" : 105821,
        "counts" : {
                "input" : 2500013,
                "emit" : 2500013,
                "reduce" : 250364,
                "output" : 250255
        },
        "ok" : 1
}
...

Это больше походит на это! Сейчас мы достигли 100-х, что означает примерно 2-кратное улучшение по сравнению с одним потоком. Не так хорошо, как хотелось бы, но все же хорошо. Здесь у меня четыре ядра, поэтому я получаю только 2x, но 8-ядерный процессор даст вам 4x и т. Д.

Использование режима Pure JavaScript

При разделении входных данных между потоками возникает нечто очень интересное: у каждого потока теперь есть только около 250 000 уникальных ключей для вывода, а не 1 м. Это означает, что мы можем использовать «чистый режим JS», который можно включить с помощью jsMode: true. Когда он включен, MongoDB не будет переводить объекты назад и вперед из JS в BSON во время обработки, а вместо этого сокращает все объекты из внутреннего словаря JS с пределом в 500 000 ключей. Посмотрим, поможет ли это:

> var mapred = function(min, max) { 
return db.runCommand({ mapreduce: "uniques", 
map: function () { emit(this.dim0, 1); }, 
reduce: function (key, values) { return Array.sum(values); }, 
out: { replace: "mrout" + min, db: "mrdb" + min }, 
sort: {dim0: 1}, 
query: { dim0: { $gte: min, $lt: max } }, 
jsMode: true }) }
> threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max:{ "$maxKey" : 1 }
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
...
{ 
        "result" : {
                "db" : "mrdb274736",
                "collection" : "mrout274736"
        },
        "timeMillis" : 70507,
        "counts" : {
                "input" : 2500013,
                "emit" : 2500013,
                "reduce" : 250156,
                "output" : 250255
        },
        "ok" : 1
}
...

Мы сейчас до 70-х годов, добираемся туда! JsMode может действительно помочь, особенно когда объекты имеют много полей. Здесь есть одно числовое поле, и оно все равно помогло на 30%.

Улучшение в MongoDB v2.6

В самом начале разработки v2.6 мы избавились от фрагмента кода, который устанавливает необязательный параметр «args» для любого вызова функции JS. Это не было ни стандартом, ни использованием, но оно было сохранено по наследству (см. SERVER-4654 ). Давайте извлечем MongoDB из основного репозитория Git, скомпилируем его и снова запустим тест:

...
{ 
        "result" : {
                "db" : "mrdb274736",
                "collection" : "mrout274736"
        },
        "timeMillis" : 62785,
        "counts" : {
                "input" : 2500013,
                "emit" : 2500013,
                "reduce" : 250156,
                "output" : 250255
        },
        "ok" : 1
}
...

Там определенно есть улучшение, так как мы до 60-х годов, то есть около 10-15%. Это изменение также улучшило общее потребление кучи двигателя JS.

Вывод

Оглядываясь назад, мы начали в 1200-х и закончили в 60-х для той же работы по МР, что представляет собой 20-кратное улучшение! Это улучшение должно быть доступно для большинства случаев использования, даже если некоторые приемы не идеальны (например, использование нескольких выходных БД / коллекций). Тем не менее, это может дать людям идеи о том, как ускорить их работу по МР, и, надеюсь, некоторые из этих функций станут проще в будущем. Следующий билет сделает команду «splitVector» более доступной, и этот билет улучшит несколько заданий MR в одной базе данных. Ура!