Глава содержит много деталей о кодировании и сжатии целых чисел. Поскольку эти темы не имеют прямого отношения к MapReduce, я не задавал вопросов о них.
4.4 Инвертирование индексации: пересмотренная реализация
Объясните инвертирующий алгоритм поиска индекса. Вы можете предположить, что каждый документ помещается в память. Предположим также, что существует огромное количество документов. Какая часть должна быть оптимизирована с помощью целочисленного кодирования и сжатия?
Input: text documents
key: document id
value: text document Output: key/value pairs where
key: word
value: list[documentId, numberOfOccurences] list elements must be sorted by numberOfOccurences
Input: text documents
key: document id
value: text document Output: key/value pairs where
key: word
value: list[documentId, numberOfOccurences] list elements must be sorted by numberOfOccurences
Mapper подсчитывает количество вхождений в документе для каждого слова. Поскольку весь документ помещается в память, мы можем хранить частичные результаты на карте.
Intermediate key/values:
key: word, numberOfOccurences
value: documentId
Пользовательский разделитель группирует промежуточный ключ / значения по слову. Пользовательская сортировка сортирует их первично по слову и вторично по количеству вхождений.
Редуктор использует метод инициализации для инициализации списка всех сообщений. Метод Reduce обрабатывает два случая:
- текущее слово, равное предыдущему слову — добавить documentId и numberOfOccurences в список публикаций.
- текущее слово, равное предыдущему слову — вывести предыдущее слово и список рассылки; инициализировать список рассылки.
Список проводок в редукторе должен быть сжат.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
class MAPPER method INITIALIZE H = new hash map method MAP(docid, doc d) H = new hash map for all term w in doc d do H(w) = H(w) + 1 for all term w in H do emit(pair(u, w), count 1 ) method CLOSE for all term w in H emit(pair(w, H(w)), docid) class REDUCER variable previous_word = 0 variable PL = new list of postings method REDUCE(pair (w, #occurrences), docid) if w <> previous_word && previous_word <> 0 do emit(w, PL) PL = new list of postings PL.add(pair(#occurrences, docid)) previous_word = w method compare(key (w1, o1), key (w2, o2)) if w1 = w2 return keys are equal return keys are different class SORTING_COMPARATOR method compare(key (w1, o1), key (w2, o2)) if w1 = w2 do return compare(o1, o2) return compare(w1, w2) |
5 графовых алгоритмов
Глава содержит два алгоритма: кратчайший путь на графике и алгоритм ранжирования страниц. Вопросы просты.
5.2 Параллельный поиск в ширину
Найти кратчайший путь от одного узла до всех других узлов. С каждым ребром связан вес. Пары ввода / значения уже обработаны компонентом в удобную форму.
Input: graph
key: node id
value: distance to origin, list[adjacent node, edge length]
Вывод: пары ключ / значение, где
ключ: идентификатор узла
значение: расстояние до источника, список [соседний узел, длина ребра]
Алгоритм требует нескольких итераций. Он останавливает итерацию, не меняет никакого «расстояния до источника». В худшем случае, будет O (n) итераций, где n — количество узлов в графе.
Mapper передает исходный граф на следующую итерацию как есть. Кроме того, он генерирует пару ключ / значение для каждого соседнего узла. Значение содержит минимальное известное расстояние от источника, если маршрут пройдет через узел.
1
2
3
4
5
|
class MAPPER method MAP(node, pair(dist, adjacencylist)) emit(node, pair(dist, adjacencylist)) for all (closenode, nodedist) in adjacencylist do emit(closenode, pair(dist + nodedist, empty)) |
Редуктор находит минимальное известное расстояние от каждого узла. Он передает расстояние вместе с исходным графиком до следующей итерации. Он также увеличивает глобальный счетчик всякий раз, когда изменяется минимальное известное расстояние до любого узла.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
class REDUCER method REDUCE(node, list(dist, adjacencylist)) minimum = infinity previous_iteration_solution = infinity original_graph = empty for all (dist, adjacencylist) in list do if adjacencylist not empty do original_graph = adjacencylist previous_iteration_solution = dist if minimum > dist minimum = dist if previous_iteration_solution <> minimum increment global counter emit(node, pair(minimum, original_graph)) |
Если глобальный счетчик равен 0, алгоритм останавливается. В противном случае требуется еще одна итерация.
Объясните алгоритм ранга страницы, предположим, что alpha = 0.
Рейтинг страницы P(n)
страницы n
рассчитывается по формам страниц всех страниц, ссылающихся на нее.
P(n) = sum_m (P(m)/C(m))
Сумма проходит через все страницы m
ссылающиеся на страницу n
. C(m)
— количество исходящих ссылок на странице m
.
Алгоритм ранга страницы выполняется в итерациях. Mapper передает вклад ранга каждой страницы на соседние страницы. Редуктор обновляет рейтинг страницы каждого узла. Алгоритм останавливается, когда рейтинг страниц больше не перемещается.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
class MAPPER method MAP(page, (page_rank, adjacency_list)) emit(page, ( 0 , adjacency_list)) contribution = page_rank/adjacency_list.length for all node in adjacency_list do emit(node, (contribution, empty)) class REDUCER method REDUCE(page, contributions[c1, c2, ..., cn]) rank = 0 adjacency_list = new list for all c in contributions do adjacency_list.addAll(c.adjacency_list) rank = rank + c.contribution emit(page, (rank, adjacency_list)) |
6 EM алгоритмы для обработки текста
Я не сделал никаких вопросов из этой главы.
упражнения
Эта глава содержит практические упражнения для MapReduce. Некоторые из них требуют нескольких итераций.
Разогреть
Подсчитать количество вхождений каждого слова в текстовой коллекции.
Input:
key: document id,
value: text document.
Выход:
ключ: слово,
значение: количество вхождений.
Intermediate pairs:
key: word
value: integer - how many times was the word seen in the input.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
class MAPPER method MAP(docid a, doc d) for all term w in doc d do emit(w, 1 ) class COMBINER method COMBINE(word w, counts[c1, c2, ..., cn]) s = 0 for all c in counts[c1, c2, ..., cn] do s = s + c emit(word w, s) class REDUCER variable total_occurrences = 0 method REDUCE(word w, counts[c1, c2, ..., cn]) s = 0 for all c in counts[c1, c2, ..., cn] do s = s + c emit(word w, s) |
Альтернативное решение будет использовать комбинирование в картографе.
Интернет-магазин
Журнал пользователя сайта содержит идентификаторы пользователей и продолжительность каждого сеанса. На сайте скромное количество зарегистрированных пользователей. Вычислите среднюю продолжительность сеанса для каждого пользователя.
Input:
key: user id,
value: session length.
Выход:
ключ: идентификатор пользователя,
значение: средняя продолжительность сеанса.
Поскольку количество зарегистрированных пользователей невелико, мы можем использовать комбинирование в картографе.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
class MAPPER variable total_time = new hash map variable sessions_number = new hash map method MAP(user_id, session_length) total_time(user_id) = total_time(user_id) + session_length sessions_number(user_id) = sessions_number(user_id) + 1 method CLOSE for all user_id in total_logged_in_time tt = total_time(user_id) sn = sessions_number(user_id) emit(user_id, pair(tt, sn)) class REDUCER method REDUCE(user_id, [pairs(time, sessions_number)]) total_time = 0 total_sessions = 0 for all pairs in [pairs(time, sessions_number)] do total_time = total_time + time total_sessions = total_sessions + sessions_number emit(user_id, total_time/total_sessions) |
Журнал интернет-магазина содержит идентификатор пользователя и купленный товар для каждой продажи. Вам необходимо реализовать функцию «покупатели товара также купили». Каждый раз, когда товар отображается, магазин предлагает пять товаров, которые чаще всего покупают покупатели.
Input:
key: user id,
value: brought item.
Выход:
ключ: элемент,
значение: список пяти наиболее распространенных предметов, которые покупатели также покупают.
Наше решение имеет две итерации. Первая итерация генерирует списки всех предметов, принесенных одним и тем же пользователем. Группировка выполняется структурой, и преобразователь, и преобразователь выполняют функцию идентификации.
Input:
key: user id,
value: brought item.
Выход:
ключ: идентификатор пользователя,
значение: список всех принесенных предметов.
1
2
3
4
5
6
7
|
class MAPPER method MAP(user_id, item) emit(user_id, item) class REDUCER method REDUCE(user_id, items[i1, i2, ..., in]) emit(user_id, items) |
Вторая итерация решает проблему одновременного появления элементов списка. Это использует подход полос. Единственное отличие от стандартного решения состоит в том, что мы испустили только пять наиболее распространенных случаев.
Input:
key: user id,
value: list of all brought items.
Выход:
ключ: элемент,
значение: список из пяти наиболее распространенных совпадений.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
class MAPPER method MAP(user_id, items[i1, i2, ..., in]) for all item in items do H = new hash map for all item j in items do H(j) = H(j) + 1 emit(item, H) class REDUCER method REDUCE(item, stripes[H1, H2, ..., Hn]) T = new hash map for all H in stripes do for all (key/value) in H do T(key) = T(key) + value emit(user_id, max_five(T)) |
Журнал интернет-магазина содержит идентификатор пользователя, метку времени, предмет и количество принесенных штук для каждой продажи. Магазин ищет товары, продажи которых растут или падают одновременно. Найдите 20 пар предметов с максимумом таких месяцев.
Input:
key: user id,
value: timestamp, brought item, count.
Выход:
ключ: предмет, предмет
значение: количество месяцев, в течение которых продажи обоих товаров увеличивались или уменьшались.
#: вывод содержит 20 пар ключ / значение с максимальным значением
Наше решение требует нескольких итераций MapReduce. Мы должны:
- рассчитать, выросли или упали продажи товаров за месяц,
- создавать списки товаров с одинаковыми изменениями продаж в течение одного месяца,
- найти количество совпадений в этих списках,
- выбирайте предметы с максимальным количеством совпадений.
Первая итерация рассчитывает изменения продаж за любой месяц. Мы должны поставить маппер, разделитель, сортировку и редуктор. Mapper генерирует одну промежуточную пару ключ / значение для каждого входного ключа / значения. Ключ состоит из проданного товара и месяца продаж. Значение содержит количество проданных штук.
Partitioner отправляет все пары ключ / значение с одним и тем же элементом в один и тот же редуктор. Пользовательская сортировка сортирует их по месяцам. Наконец, редуктор рассчитывает изменения продаж.
Input:
key: user id,
value: timestamp, item, count.
Промежуточный ключ / значения:
ключ: пункт, месяц
значение: кол.
Выход:
ключ: месяц, вверх / вниз / равно
значение: элемент.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
class MAPPER method MAP(user_id, (timestamp, item, count)) month = get_month(timestamp) emit((item, month), count) class PARTITIONING_COMPARATOR method compare(key (item1, month1), key (item2, month2)) if item1 = item2 return keys are equal return keys are different class SORTING_COMPARATOR method compare(key (item1, month1), key (item2, month2)) if item1 = item2 do return compare(month1, month2) return compare(item1, item2) class REDUCER method REDUCE((item, month), counts[c1, c2, ..., cn]) c = sum([c1, c2, ..., cn]) if last_item = item if last_month + 1 = month //emit correct up/down/equal flags if last_count < count emit((item, month), up) if last_count > count emit((item, month), down) if last_count = count emit((item, month), equal) else //no sales during some months emit((item, last_month + 1 ), down) emit((item, month), up) else // new item emit((last_item, last_month + 1 ), down) emit((item, month), up) last_item = item last_count = count last_month = month |
Вторая итерация группирует результаты первой итерации по ключам. Он генерирует списки товаров с одинаковыми изменениями продаж в течение одного месяца. Framework делает всю работу. Как картограф, так и редуктор выполняют функцию идентификации.
Input:
key: month, up/down/equal
value: item.
Выход:
ключ: месяц, вверх / вниз / равно
значение: [предметы].
Третья итерация выполняет стандартный алгоритм совместного использования пар.
Input:
key: month, up/down/equal
value: [items].
Промежуточный ключ / значения:
ключ: предмет, предмет
значение: частичное количество совпадений.
Выход:
ключ: предмет, предмет
значение: количество месяцев, в течение которых продажи обоих товаров увеличивались или уменьшались.
#: вывод содержит все пары элементов
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
class MAPPER method MAP((month, change), items[i1, i2, ..., in]) for each i in items do for each j in items do if i != j emit((i, j), 1 ) class COMBINER method COMBINE((item1, item2), co-occurrences[c1, c2, ..., cn]) s = 0 for all c in co-occurrences[c1, c2, ..., cn] do s = s + c emit((item1, item2), s) class REDUCER method REDUCE((item, item), co-occurrences[c1, c2, ..., cn]) s = 0 for all c in co-occurrences[c1, c2, ..., cn] do s = s + c emit((item1, item2), s) |
Наконец, мы должны выбрать 20 пар ключ / значение с максимальным значением. Каждый картограф выбирает 20 пар ключ / значение с максимальным значением и испускает их одним и тем же ключом. Будет только один редуктор, который выберет последние 20 пар ключ / значение.
Input:
key: item, item
value: number of months when both items sales rose or decline.
#: the output contains all items couples
Промежуточный ключ / значения:
ключ: 1
value: item, item, количество месяцев, в течение которых продажи обоих товаров росли или снижались.
#: вывод содержит 20 пар ключ / значение с максимальным значением для каждого преобразователя
Выход:
ключ: предмет, предмет
значение: количество месяцев, в течение которых продажи обоих товаров увеличивались или уменьшались.
#: вывод содержит 20 пар ключ / значение с максимальным значением
1
|
the code is very simple but long |
Криминальное агентство
Во всех упражнениях этой главы используется одна и та же структура данных.
Криминальное агентство украло базу данных о дружбе Facebook и хочет проанализировать новые данные. Дружеские отношения хранятся в виде пар ключ / значение, каждая дружба соответствует двум парам ключ / значение:
Friends:
key: first friend name
value: second friend name
ключ: имя второго друга
значение: имя первого друга
Агентству принадлежат также судимости всех граждан:
Criminal record:
key: citizen name
value: when, where, accomplices, description
Найти в опасности молодых людей. Человек считается подверженным риску молодежью, если более половины его друзей имеют судимость.
Наше решение имеет две итерации. Первая итерация объединяет два набора и помечает каждый «значение друга» флагами has_record / law_abiding.
Output:
key: first friend
value: second friend, has_record/law_abiding
Картограф отмечает каждый ключ с именем набора данных. Разделитель группирует данные по именам в ключах, а сортировщик ставит судимости перед дружбой. Мы могли бы использовать локальную агрегацию для удаления нескольких криминальных записей одного и того же лица
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
class MAPPER method MAP(name, value) if value is name do emit(name, friendship, item) else emit(name, criminal, item) class PARTITIONING_COMPARATOR method compare(key (name1, dataset1), key (name2, dataset2)) if name1 = name2 return keys are equal return keys are different class SORTING_COMPARATOR method compare(key (name1, dataset1), key (name2, dataset2)) if name1 = name2 AND dataset1 is criminal return key1 is lower if name1 = name2 AND dataset2 is criminal return key2 is lower return compare(name1, name2) class REDUCER variable previous_name method REDUCE(pair(name, flag), items[i1, i2, ..., in]) if flag is criminal do previous_name = name has_record = criminal return if previous_name <> name do has_record = law_abiding else has_record = criminal previous_name = name for all i in items do emit(i.name, pair(name, has_record)) |
Вторая итерация учитывает как общее количество друзей, так и количество друзей с криминальным прошлым. Редуктор испускает пары ключ / значение только для молодых людей из группы риска. Также эта итерация может использовать некоторую локальную агрегацию.
Intermediate key/value:
key: name
value: total friends, total friend criminals
# totals are relative only to in data sets subsets
Выход:
ключ: имя
значение: пусто
# только молодые люди в группе риска
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
class MAPPER method MAP(name, pair(name, has_record)) if has_record is law_abiding do emit(name, pair( 0 , 1 )) else emit(name, pair( 1 , 1 )) class REDUCER method REDUCE(name, items[pair(total, criminals)]) total = 0 criminals = 0 for all i in items do total = total + i.total criminals = criminals + i.criminals if criminals / total > 0.5 do emit(name, empty) |
Найти банды. Банда — это группа людей, которая:
- имеет ровно 5 членов,
- каждый участник дружит со всеми остальными участниками,
- каждый из двух членов совершил как минимум 3 преступления вместе.
Опять же, нам нужно три итерации. Идея состоит в том, чтобы сначала очистить график от всех бесполезных ребер, чтобы остались только криминальные контакты. Затем мы разбиваем граф на более мелкие управляемые подграфы. Мы прикрепляем все криминальные контакты и грани между ними каждому человеку:
Last iteration reducers input:
key: person
values: all his criminal contacts and relationships between them.
Конечный редуктор берет меньшие графики, представленные значением в каждой паре ключ / значение, и находит полные подграфы с 4 вершинами. Добавьте в него человека из ключа, и вы нашли полный подграф с 5 вершинами. Редуктор может использовать любой полиномиальный алгоритм.
Первая итерация использует парный подход для очистки графика. Мы опускаем как локальную агрегацию, так и удаление дубликатов. И то, и другое сделает алгоритм более эффективным.
Intermediate key/values:
key: first friend, second friend, friendship/accomplice
value: 1
Выход:
ключ: первый друг, второй друг
значение: пусто
# только друзья с общей судимостью
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
class MAPPER method MAP(name, value) if value is name do emit(triple(name, value, friendship), empty) else for all crime_accomplice in value.accomplices do emit(triple(name, crime_accomplice, accomplice), 1 ) class PARTITIONING_COMPARATOR method compare(key (name1, accomplice1, flag1), key (name2, accomplice2, flag2)) if name1 = name2 AND accomplice1 = accomplice2 return keys are equal return keys are different class SORTING_COMPARATOR method compare(key (name1, accomplice1, flag1), key (name2, accomplice2, flag2)) if name1 = name2 AND accomplice1 AND flag1 is friendship return key1 is lower if name1 = name2 AND accomplice1 AND flag2 is friendship return key2 is lower return compare(pair(name1, accomplice1), pair(name2, accomplice2)) class REDUCER variable previous_name variable previous_accomplice method sameAsPrevious(name, accomplice) if previous_name <> name return false if previous_accomplice <> accomplice return false return true method REDUCE(triple(name, accomplice, flag), items[i1, i2, ..., in]) if sameAsPrevious(name, accomplice) do if items.length > 2 do emit(name, accomplice) return if flag is friendship do previous_name = name previous_accomplice = accomplice |
Вторая итерация прикрепляет списки всех друзей «второй степени» к ребрам:
Input
key: first friend, second friend
value: empty
Intermediate key/values:
key: first friend
value: first friend, second friend
ключ: второй друг
значение: первый друг, второй друг
Выход:
ключ: первый друг, второй друг
значение: все друзья второго друга
ключ: второй друг, первый друг
значение: все друзья первого друга
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
class MAPPER method MAP((first friend, second friend), empty) emit(first friend, (first friend, second friend)) emit(second friend, (first friend, second friend)) class REDUCER method REDUCE(name, edges[e1, e2, ..., en]) friends = new Set friends.add(name) for all edge in edges do friends.add(edge.v1, edge.v2) for all edge in edges do emit(edge, friends) |
Наконец, фазы mapper, shuffle и sort вместе генерируют списки всех друзей любого конкретного человека и отношений между ними.
Input
key: friend 1, friend 2
value: all friends of friend 2
Промежуточный ключ / значения:
ключ: друг 1
значение: друг 2, все друзья друга 2
Ввод редукторов (после перемешивания и сортировки):
ключевая фигура
ценности: все его друзья и отношения между ними.
Выход:
ключ: первый друг, второй друг, третий друг, четвертый друг, пятый друг
значение: банда
1
2
3
4
5
6
7
|
class MAPPER method MAP((friend , friend 2 ), all friends of second friend) emit(friend 1 , (friend 2 , all friends of friend 2 )) class REDUCER method REDUCE(name, graph attached to it) any polynomial algorithm will work |
Ссылка: MapReduce Вопросы и ответы от нашего партнера по JCG Марии Юрковичовой в блоге This Is Stuff .