Глава содержит много деталей о кодировании и сжатии целых чисел. Поскольку эти темы не имеют прямого отношения к MapReduce, я не задавал вопросов о них.
4.4 Инвертирование индексации: пересмотренная реализация
Объясните инвертирующий алгоритм поиска индекса. Вы можете предположить, что каждый документ помещается в память. Предположим также, что существует огромное количество документов. Какая часть должна быть оптимизирована с помощью целочисленного кодирования и сжатия?
Input: text documents
key: document id
value: text document Output: key/value pairs where
key: word
value: list[documentId, numberOfOccurences] list elements must be sorted by numberOfOccurences
Input: text documents
key: document id
value: text document Output: key/value pairs where
key: word
value: list[documentId, numberOfOccurences] list elements must be sorted by numberOfOccurences
Mapper подсчитывает количество вхождений в документе для каждого слова. Поскольку весь документ помещается в память, мы можем хранить частичные результаты на карте.
Intermediate key/values:
key: word, numberOfOccurences
value: documentId
Пользовательский разделитель группирует промежуточный ключ / значения по слову. Пользовательская сортировка сортирует их первично по слову и вторично по количеству вхождений.
Редуктор использует метод инициализации для инициализации списка всех сообщений. Метод Reduce обрабатывает два случая:
- текущее слово, равное предыдущему слову — добавить documentId и numberOfOccurences в список публикаций.
- текущее слово, равное предыдущему слову — вывести предыдущее слово и список рассылки; инициализировать список рассылки.
Список проводок в редукторе должен быть сжат.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
class MAPPER method INITIALIZE H = new hash map method MAP(docid, doc d) H = new hash map for all term w in doc d do H(w) = H(w) + 1 for all term w in H do emit(pair(u, w), count 1) method CLOSE for all term w in H emit(pair(w, H(w)), docid) class REDUCER variable previous_word = 0 variable PL = new list of postings method REDUCE(pair (w, #occurrences), docid) if w <> previous_word && previous_word <> 0 do emit(w, PL) PL = new list of postings PL.add(pair(#occurrences, docid)) previous_word = w method compare(key (w1, o1), key (w2, o2)) if w1 = w2 return keys are equal return keys are differentclass SORTING_COMPARATOR method compare(key (w1, o1), key (w2, o2)) if w1 = w2 do return compare(o1, o2) return compare(w1, w2) |
5 графовых алгоритмов
Глава содержит два алгоритма: кратчайший путь на графике и алгоритм ранжирования страниц. Вопросы просты.
5.2 Параллельный поиск в ширину
Найти кратчайший путь от одного узла до всех других узлов. С каждым ребром связан вес. Пары ввода / значения уже обработаны компонентом в удобную форму.
Input: graph
key: node id
value: distance to origin, list[adjacent node, edge length]
Вывод: пары ключ / значение, где
ключ: идентификатор узла
значение: расстояние до источника, список [соседний узел, длина ребра]
Алгоритм требует нескольких итераций. Он останавливает итерацию, не меняет никакого «расстояния до источника». В худшем случае, будет O (n) итераций, где n — количество узлов в графе.
Mapper передает исходный граф на следующую итерацию как есть. Кроме того, он генерирует пару ключ / значение для каждого соседнего узла. Значение содержит минимальное известное расстояние от источника, если маршрут пройдет через узел.
|
1
2
3
4
5
|
class MAPPER method MAP(node, pair(dist, adjacencylist)) emit(node, pair(dist, adjacencylist)) for all (closenode, nodedist) in adjacencylist do emit(closenode, pair(dist + nodedist, empty)) |
Редуктор находит минимальное известное расстояние от каждого узла. Он передает расстояние вместе с исходным графиком до следующей итерации. Он также увеличивает глобальный счетчик всякий раз, когда изменяется минимальное известное расстояние до любого узла.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
class REDUCER method REDUCE(node, list(dist, adjacencylist)) minimum = infinity previous_iteration_solution = infinity original_graph = empty for all (dist, adjacencylist) in list do if adjacencylist not empty do original_graph = adjacencylist previous_iteration_solution = dist if minimum > dist minimum = dist if previous_iteration_solution <> minimum increment global counter emit(node, pair(minimum, original_graph)) |
Если глобальный счетчик равен 0, алгоритм останавливается. В противном случае требуется еще одна итерация.
Объясните алгоритм ранга страницы, предположим, что alpha = 0.
Рейтинг страницы P(n) страницы n рассчитывается по формам страниц всех страниц, ссылающихся на нее.
P(n) = sum_m (P(m)/C(m))
Сумма проходит через все страницы m ссылающиеся на страницу n . C(m) — количество исходящих ссылок на странице m .
Алгоритм ранга страницы выполняется в итерациях. Mapper передает вклад ранга каждой страницы на соседние страницы. Редуктор обновляет рейтинг страницы каждого узла. Алгоритм останавливается, когда рейтинг страниц больше не перемещается.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
class MAPPER method MAP(page, (page_rank, adjacency_list)) emit(page, (0, adjacency_list)) contribution = page_rank/adjacency_list.length for all node in adjacency_list do emit(node, (contribution, empty))class REDUCER method REDUCE(page, contributions[c1, c2, ..., cn]) rank = 0 adjacency_list = new list for all c in contributions do adjacency_list.addAll(c.adjacency_list) rank = rank + c.contribution emit(page, (rank, adjacency_list)) |
6 EM алгоритмы для обработки текста
Я не сделал никаких вопросов из этой главы.
упражнения
Эта глава содержит практические упражнения для MapReduce. Некоторые из них требуют нескольких итераций.
Разогреть
Подсчитать количество вхождений каждого слова в текстовой коллекции.
Input:
key: document id,
value: text document.
Выход:
ключ: слово,
значение: количество вхождений.
Intermediate pairs:
key: word
value: integer - how many times was the word seen in the input.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
class MAPPER method MAP(docid a, doc d) for all term w in doc d do emit(w, 1)class COMBINER method COMBINE(word w, counts[c1, c2, ..., cn]) s = 0 for all c in counts[c1, c2, ..., cn] do s = s + c emit(word w, s)class REDUCER variable total_occurrences = 0 method REDUCE(word w, counts[c1, c2, ..., cn]) s = 0 for all c in counts[c1, c2, ..., cn] do s = s + c emit(word w, s) |
Альтернативное решение будет использовать комбинирование в картографе.
Интернет-магазин
Журнал пользователя сайта содержит идентификаторы пользователей и продолжительность каждого сеанса. На сайте скромное количество зарегистрированных пользователей. Вычислите среднюю продолжительность сеанса для каждого пользователя.
Input:
key: user id,
value: session length.
Выход:
ключ: идентификатор пользователя,
значение: средняя продолжительность сеанса.
Поскольку количество зарегистрированных пользователей невелико, мы можем использовать комбинирование в картографе.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
class MAPPER variable total_time = new hash map variable sessions_number = new hash map method MAP(user_id, session_length) total_time(user_id) = total_time(user_id) + session_length sessions_number(user_id) = sessions_number(user_id) + 1 method CLOSE for all user_id in total_logged_in_time tt = total_time(user_id) sn = sessions_number(user_id) emit(user_id, pair(tt, sn)) class REDUCER method REDUCE(user_id, [pairs(time, sessions_number)]) total_time = 0 total_sessions = 0 for all pairs in [pairs(time, sessions_number)] do total_time = total_time + time total_sessions = total_sessions + sessions_number emit(user_id, total_time/total_sessions) |
Журнал интернет-магазина содержит идентификатор пользователя и купленный товар для каждой продажи. Вам необходимо реализовать функцию «покупатели товара также купили». Каждый раз, когда товар отображается, магазин предлагает пять товаров, которые чаще всего покупают покупатели.
Input:
key: user id,
value: brought item.
Выход:
ключ: элемент,
значение: список пяти наиболее распространенных предметов, которые покупатели также покупают.
Наше решение имеет две итерации. Первая итерация генерирует списки всех предметов, принесенных одним и тем же пользователем. Группировка выполняется структурой, и преобразователь, и преобразователь выполняют функцию идентификации.
Input:
key: user id,
value: brought item.
Выход:
ключ: идентификатор пользователя,
значение: список всех принесенных предметов.
|
1
2
3
4
5
6
7
|
class MAPPER method MAP(user_id, item) emit(user_id, item)class REDUCER method REDUCE(user_id, items[i1, i2, ..., in]) emit(user_id, items) |
Вторая итерация решает проблему одновременного появления элементов списка. Это использует подход полос. Единственное отличие от стандартного решения состоит в том, что мы испустили только пять наиболее распространенных случаев.
Input:
key: user id,
value: list of all brought items.
Выход:
ключ: элемент,
значение: список из пяти наиболее распространенных совпадений.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
class MAPPER method MAP(user_id, items[i1, i2, ..., in]) for all item in items do H = new hash map for all item j in items do H(j) = H(j) + 1 emit(item, H)class REDUCER method REDUCE(item, stripes[H1, H2, ..., Hn]) T = new hash map for all H in stripes do for all (key/value) in H do T(key) = T(key) + value emit(user_id, max_five(T)) |
Журнал интернет-магазина содержит идентификатор пользователя, метку времени, предмет и количество принесенных штук для каждой продажи. Магазин ищет товары, продажи которых растут или падают одновременно. Найдите 20 пар предметов с максимумом таких месяцев.
Input:
key: user id,
value: timestamp, brought item, count.
Выход:
ключ: предмет, предмет
значение: количество месяцев, в течение которых продажи обоих товаров увеличивались или уменьшались.
#: вывод содержит 20 пар ключ / значение с максимальным значением
Наше решение требует нескольких итераций MapReduce. Мы должны:
- рассчитать, выросли или упали продажи товаров за месяц,
- создавать списки товаров с одинаковыми изменениями продаж в течение одного месяца,
- найти количество совпадений в этих списках,
- выбирайте предметы с максимальным количеством совпадений.
Первая итерация рассчитывает изменения продаж за любой месяц. Мы должны поставить маппер, разделитель, сортировку и редуктор. Mapper генерирует одну промежуточную пару ключ / значение для каждого входного ключа / значения. Ключ состоит из проданного товара и месяца продаж. Значение содержит количество проданных штук.
Partitioner отправляет все пары ключ / значение с одним и тем же элементом в один и тот же редуктор. Пользовательская сортировка сортирует их по месяцам. Наконец, редуктор рассчитывает изменения продаж.
Input:
key: user id,
value: timestamp, item, count.
Промежуточный ключ / значения:
ключ: пункт, месяц
значение: кол.
Выход:
ключ: месяц, вверх / вниз / равно
значение: элемент.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
class MAPPER method MAP(user_id, (timestamp, item, count)) month = get_month(timestamp) emit((item, month), count)class PARTITIONING_COMPARATOR method compare(key (item1, month1), key (item2, month2)) if item1 = item2 return keys are equal return keys are differentclass SORTING_COMPARATOR method compare(key (item1, month1), key (item2, month2)) if item1 = item2 do return compare(month1, month2) return compare(item1, item2)class REDUCER method REDUCE((item, month), counts[c1, c2, ..., cn]) c = sum([c1, c2, ..., cn]) if last_item = item if last_month + 1 = month //emit correct up/down/equal flags if last_count < count emit((item, month), up) if last_count > count emit((item, month), down) if last_count = count emit((item, month), equal) else //no sales during some months emit((item, last_month + 1), down) emit((item, month), up) else // new item emit((last_item, last_month + 1), down) emit((item, month), up) last_item = item last_count = count last_month = month |
Вторая итерация группирует результаты первой итерации по ключам. Он генерирует списки товаров с одинаковыми изменениями продаж в течение одного месяца. Framework делает всю работу. Как картограф, так и редуктор выполняют функцию идентификации.
Input:
key: month, up/down/equal
value: item.
Выход:
ключ: месяц, вверх / вниз / равно
значение: [предметы].
Третья итерация выполняет стандартный алгоритм совместного использования пар.
Input:
key: month, up/down/equal
value: [items].
Промежуточный ключ / значения:
ключ: предмет, предмет
значение: частичное количество совпадений.
Выход:
ключ: предмет, предмет
значение: количество месяцев, в течение которых продажи обоих товаров увеличивались или уменьшались.
#: вывод содержит все пары элементов
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
class MAPPER method MAP((month, change), items[i1, i2, ..., in]) for each i in items do for each j in items do if i != j emit((i, j), 1) class COMBINER method COMBINE((item1, item2), co-occurrences[c1, c2, ..., cn]) s = 0 for all c in co-occurrences[c1, c2, ..., cn] do s = s + c emit((item1, item2), s)class REDUCER method REDUCE((item, item), co-occurrences[c1, c2, ..., cn]) s = 0 for all c in co-occurrences[c1, c2, ..., cn] do s = s + c emit((item1, item2), s) |
Наконец, мы должны выбрать 20 пар ключ / значение с максимальным значением. Каждый картограф выбирает 20 пар ключ / значение с максимальным значением и испускает их одним и тем же ключом. Будет только один редуктор, который выберет последние 20 пар ключ / значение.
Input:
key: item, item
value: number of months when both items sales rose or decline.
#: the output contains all items couples
Промежуточный ключ / значения:
ключ: 1
value: item, item, количество месяцев, в течение которых продажи обоих товаров росли или снижались.
#: вывод содержит 20 пар ключ / значение с максимальным значением для каждого преобразователя
Выход:
ключ: предмет, предмет
значение: количество месяцев, в течение которых продажи обоих товаров увеличивались или уменьшались.
#: вывод содержит 20 пар ключ / значение с максимальным значением
|
1
|
the code is very simple but long |
Криминальное агентство
Во всех упражнениях этой главы используется одна и та же структура данных.
Криминальное агентство украло базу данных о дружбе Facebook и хочет проанализировать новые данные. Дружеские отношения хранятся в виде пар ключ / значение, каждая дружба соответствует двум парам ключ / значение:
Friends:
key: first friend name
value: second friend name
ключ: имя второго друга
значение: имя первого друга
Агентству принадлежат также судимости всех граждан:
Criminal record:
key: citizen name
value: when, where, accomplices, description
Найти в опасности молодых людей. Человек считается подверженным риску молодежью, если более половины его друзей имеют судимость.
Наше решение имеет две итерации. Первая итерация объединяет два набора и помечает каждый «значение друга» флагами has_record / law_abiding.
Output:
key: first friend
value: second friend, has_record/law_abiding
Картограф отмечает каждый ключ с именем набора данных. Разделитель группирует данные по именам в ключах, а сортировщик ставит судимости перед дружбой. Мы могли бы использовать локальную агрегацию для удаления нескольких криминальных записей одного и того же лица
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
class MAPPER method MAP(name, value) if value is name do emit(name, friendship, item) else emit(name, criminal, item)class PARTITIONING_COMPARATOR method compare(key (name1, dataset1), key (name2, dataset2)) if name1 = name2 return keys are equal return keys are different class SORTING_COMPARATOR method compare(key (name1, dataset1), key (name2, dataset2)) if name1 = name2 AND dataset1 is criminal return key1 is lower if name1 = name2 AND dataset2 is criminal return key2 is lower return compare(name1, name2)class REDUCER variable previous_name method REDUCE(pair(name, flag), items[i1, i2, ..., in]) if flag is criminal do previous_name = name has_record = criminal return if previous_name <> name do has_record = law_abiding else has_record = criminal previous_name = name for all i in items do emit(i.name, pair(name, has_record)) |
Вторая итерация учитывает как общее количество друзей, так и количество друзей с криминальным прошлым. Редуктор испускает пары ключ / значение только для молодых людей из группы риска. Также эта итерация может использовать некоторую локальную агрегацию.
Intermediate key/value:
key: name
value: total friends, total friend criminals
# totals are relative only to in data sets subsets
Выход:
ключ: имя
значение: пусто
# только молодые люди в группе риска
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
class MAPPER method MAP(name, pair(name, has_record)) if has_record is law_abiding do emit(name, pair(0, 1)) else emit(name, pair(1, 1))class REDUCER method REDUCE(name, items[pair(total, criminals)]) total = 0 criminals = 0 for all i in items do total = total + i.total criminals = criminals + i.criminals if criminals / total > 0.5 do emit(name, empty) |
Найти банды. Банда — это группа людей, которая:
- имеет ровно 5 членов,
- каждый участник дружит со всеми остальными участниками,
- каждый из двух членов совершил как минимум 3 преступления вместе.
Опять же, нам нужно три итерации. Идея состоит в том, чтобы сначала очистить график от всех бесполезных ребер, чтобы остались только криминальные контакты. Затем мы разбиваем граф на более мелкие управляемые подграфы. Мы прикрепляем все криминальные контакты и грани между ними каждому человеку:
Last iteration reducers input:
key: person
values: all his criminal contacts and relationships between them.
Конечный редуктор берет меньшие графики, представленные значением в каждой паре ключ / значение, и находит полные подграфы с 4 вершинами. Добавьте в него человека из ключа, и вы нашли полный подграф с 5 вершинами. Редуктор может использовать любой полиномиальный алгоритм.
Первая итерация использует парный подход для очистки графика. Мы опускаем как локальную агрегацию, так и удаление дубликатов. И то, и другое сделает алгоритм более эффективным.
Intermediate key/values:
key: first friend, second friend, friendship/accomplice
value: 1
Выход:
ключ: первый друг, второй друг
значение: пусто
# только друзья с общей судимостью
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
class MAPPER method MAP(name, value) if value is name do emit(triple(name, value, friendship), empty) else for all crime_accomplice in value.accomplices do emit(triple(name, crime_accomplice, accomplice), 1)class PARTITIONING_COMPARATOR method compare(key (name1, accomplice1, flag1), key (name2, accomplice2, flag2)) if name1 = name2 AND accomplice1 = accomplice2 return keys are equal return keys are different class SORTING_COMPARATOR method compare(key (name1, accomplice1, flag1), key (name2, accomplice2, flag2)) if name1 = name2 AND accomplice1 AND flag1 is friendship return key1 is lower if name1 = name2 AND accomplice1 AND flag2 is friendship return key2 is lower return compare(pair(name1, accomplice1), pair(name2, accomplice2))class REDUCER variable previous_name variable previous_accomplice method sameAsPrevious(name, accomplice) if previous_name <> name return false if previous_accomplice <> accomplice return false return true method REDUCE(triple(name, accomplice, flag), items[i1, i2, ..., in]) if sameAsPrevious(name, accomplice) do if items.length > 2 do emit(name, accomplice) return if flag is friendship do previous_name = name previous_accomplice = accomplice |
Вторая итерация прикрепляет списки всех друзей «второй степени» к ребрам:
Input
key: first friend, second friend
value: empty
Intermediate key/values:
key: first friend
value: first friend, second friend
ключ: второй друг
значение: первый друг, второй друг
Выход:
ключ: первый друг, второй друг
значение: все друзья второго друга
ключ: второй друг, первый друг
значение: все друзья первого друга
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
class MAPPER method MAP((first friend, second friend), empty) emit(first friend, (first friend, second friend)) emit(second friend, (first friend, second friend))class REDUCER method REDUCE(name, edges[e1, e2, ..., en]) friends = new Set friends.add(name) for all edge in edges do friends.add(edge.v1, edge.v2) for all edge in edges do emit(edge, friends) |
Наконец, фазы mapper, shuffle и sort вместе генерируют списки всех друзей любого конкретного человека и отношений между ними.
Input
key: friend 1, friend 2
value: all friends of friend 2
Промежуточный ключ / значения:
ключ: друг 1
значение: друг 2, все друзья друга 2
Ввод редукторов (после перемешивания и сортировки):
ключевая фигура
ценности: все его друзья и отношения между ними.
Выход:
ключ: первый друг, второй друг, третий друг, четвертый друг, пятый друг
значение: банда
|
1
2
3
4
5
6
7
|
class MAPPER method MAP((friend , friend 2), all friends of second friend) emit(friend 1, (friend 2, all friends of friend 2))class REDUCER method REDUCE(name, graph attached to it) any polynomial algorithm will work |
Ссылка: MapReduce Вопросы и ответы от нашего партнера по JCG Марии Юрковичовой в блоге This Is Stuff .