Статьи

MapReduce Вопросы и ответы, часть 2

4 Инвертирование индексации для поиска текста

Глава содержит много деталей о кодировании и сжатии целых чисел. Поскольку эти темы не имеют прямого отношения к MapReduce, я не задавал вопросов о них.

4.4 Инвертирование индексации: пересмотренная реализация

Объясните инвертирующий алгоритм поиска индекса. Вы можете предположить, что каждый документ помещается в память. Предположим также, что существует огромное количество документов. Какая часть должна быть оптимизирована с помощью целочисленного кодирования и сжатия?

Input: text documents
key: document id
value: text document Output: key/value pairs where
key: word
value: list[documentId, numberOfOccurences] list elements must be sorted by numberOfOccurences

Input: text documents
key: document id
value: text document Output: key/value pairs where
key: word
value: list[documentId, numberOfOccurences] list elements must be sorted by numberOfOccurences

Ответ:

Mapper подсчитывает количество вхождений в документе для каждого слова. Поскольку весь документ помещается в память, мы можем хранить частичные результаты на карте.

Intermediate key/values:
key: word, numberOfOccurences
value: documentId

Пользовательский разделитель группирует промежуточный ключ / значения по слову. Пользовательская сортировка сортирует их первично по слову и вторично по количеству вхождений.

Редуктор использует метод инициализации для инициализации списка всех сообщений. Метод Reduce обрабатывает два случая:

  • текущее слово, равное предыдущему слову — добавить documentId и numberOfOccurences в список публикаций.
  • текущее слово, равное предыдущему слову — вывести предыдущее слово и список рассылки; инициализировать список рассылки.

Список проводок в редукторе должен быть сжат.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class MAPPER
  method INITIALIZE
    H = new hash map   
 
  method MAP(docid, doc d)
    H = new hash map
    for all term w in doc d do
        H(w) = H(w) + 1
 
    for all term w in H do
        emit(pair(u, w), count 1)
 
  method CLOSE
    for all term w in H
      emit(pair(w, H(w)), docid)   
 
class REDUCER
  variable previous_word = 0
  variable PL = new list of postings
 
  method REDUCE(pair (w, #occurrences), docid)
    if w <> previous_word && previous_word <> 0 do
      emit(w, PL)
      PL = new list of postings
     
    PL.add(pair(#occurrences, docid))
    previous_word = w
 
  method compare(key (w1, o1), key (w2, o2))
    if w1 = w2
      return keys are equal
 
    return keys are different
 
class SORTING_COMPARATOR
  method compare(key (w1, o1), key (w2, o2))
    if w1 = w2 do
      return compare(o1, o2)
          
    return compare(w1, w2)

5 графовых алгоритмов

Глава содержит два алгоритма: кратчайший путь на графике и алгоритм ранжирования страниц. Вопросы просты.

5.2 Параллельный поиск в ширину

Найти кратчайший путь от одного узла до всех других узлов. С каждым ребром связан вес. Пары ввода / значения уже обработаны компонентом в удобную форму.

Input: graph
key: node id
value: distance to origin, list[adjacent node, edge length]

Вывод: пары ключ / значение, где
ключ: идентификатор узла
значение: расстояние до источника, список [соседний узел, длина ребра]

Ответ:

Алгоритм требует нескольких итераций. Он останавливает итерацию, не меняет никакого «расстояния до источника». В худшем случае, будет O (n) итераций, где n — количество узлов в графе.

Mapper передает исходный граф на следующую итерацию как есть. Кроме того, он генерирует пару ключ / значение для каждого соседнего узла. Значение содержит минимальное известное расстояние от источника, если маршрут пройдет через узел.

1
2
3
4
5
class MAPPER
  method MAP(node, pair(dist, adjacencylist))
    emit(node, pair(dist, adjacencylist))
    for all (closenode, nodedist) in adjacencylist do
      emit(closenode, pair(dist + nodedist, empty))

Редуктор находит минимальное известное расстояние от каждого узла. Он передает расстояние вместе с исходным графиком до следующей итерации. Он также увеличивает глобальный счетчик всякий раз, когда изменяется минимальное известное расстояние до любого узла.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
class REDUCER
  method REDUCE(node, list(dist, adjacencylist))
    minimum = infinity
    previous_iteration_solution = infinity
    original_graph = empty
    for all (dist, adjacencylist) in list do
      if adjacencylist not empty do
        original_graph = adjacencylist
        previous_iteration_solution = dist
      if minimum > dist
        minimum = dist
     
    if previous_iteration_solution <> minimum
      increment global counter
    emit(node, pair(minimum, original_graph))

Если глобальный счетчик равен 0, алгоритм останавливается. В противном случае требуется еще одна итерация.

Объясните алгоритм ранга страницы, предположим, что alpha = 0.

Ответ:

Рейтинг страницы P(n) страницы n рассчитывается по формам страниц всех страниц, ссылающихся на нее.

P(n) = sum_m (P(m)/C(m))

Сумма проходит через все страницы m ссылающиеся на страницу n . C(m) — количество исходящих ссылок на странице m .

Алгоритм ранга страницы выполняется в итерациях. Mapper передает вклад ранга каждой страницы на соседние страницы. Редуктор обновляет рейтинг страницы каждого узла. Алгоритм останавливается, когда рейтинг страниц больше не перемещается.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
class MAPPER
  method MAP(page, (page_rank, adjacency_list))
    emit(page, (0, adjacency_list))
    contribution = page_rank/adjacency_list.length
    for all node in adjacency_list do
      emit(node, (contribution, empty))
 
class REDUCER
  method REDUCE(page, contributions[c1, c2, ..., cn])
    rank = 0
    adjacency_list = new list
    for all c in contributions do
      adjacency_list.addAll(c.adjacency_list)
      rank = rank + c.contribution
 
    emit(page, (rank, adjacency_list))

6 EM алгоритмы для обработки текста

Я не сделал никаких вопросов из этой главы.

упражнения
Эта глава содержит практические упражнения для MapReduce. Некоторые из них требуют нескольких итераций.

Разогреть

Подсчитать количество вхождений каждого слова в текстовой коллекции.

Input:
key: document id,
value: text document.

Выход:
ключ: слово,
значение: количество вхождений.

Ответ:

Intermediate pairs:
key: word
value: integer - how many times was the word seen in the input.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
class MAPPER
  method MAP(docid a, doc d)
    for all term w in doc d do
      emit(w, 1)
 
class COMBINER
  method COMBINE(word w, counts[c1, c2, ..., cn])
    s = 0
    for all c in counts[c1, c2, ..., cn] do
      s = s + c
 
    emit(word w, s)
 
class REDUCER
  variable total_occurrences = 0
 
  method REDUCE(word w, counts[c1, c2, ..., cn])
    s = 0
    for all c in counts[c1, c2, ..., cn] do
      s = s + c
 
    emit(word w, s)

Альтернативное решение будет использовать комбинирование в картографе.  

Интернет-магазин

Журнал пользователя сайта содержит идентификаторы пользователей и продолжительность каждого сеанса. На сайте скромное количество зарегистрированных пользователей. Вычислите среднюю продолжительность сеанса для каждого пользователя.

Input:
key: user id,
value: session length.

Выход:
ключ: идентификатор пользователя,
значение: средняя продолжительность сеанса.

Ответ:

Поскольку количество зарегистрированных пользователей невелико, мы можем использовать комбинирование в картографе.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class MAPPER
  variable total_time = new hash map
  variable sessions_number = new hash map
 
  method MAP(user_id, session_length)
    total_time(user_id) = total_time(user_id) + session_length
    sessions_number(user_id) = sessions_number(user_id) + 1
 
  method CLOSE
    for all user_id in total_logged_in_time
      tt = total_time(user_id)
      sn = sessions_number(user_id)
      emit(user_id, pair(tt, sn))   
 
class REDUCER
  method REDUCE(user_id, [pairs(time, sessions_number)])
    total_time = 0
    total_sessions = 0
    for all pairs in [pairs(time, sessions_number)] do
      total_time = total_time + time
      total_sessions = total_sessions + sessions_number
 
    emit(user_id, total_time/total_sessions)

Журнал интернет-магазина содержит идентификатор пользователя и купленный товар для каждой продажи. Вам необходимо реализовать функцию «покупатели товара также купили». Каждый раз, когда товар отображается, магазин предлагает пять товаров, которые чаще всего покупают покупатели.

Input:
key: user id,
value: brought item.

Выход:
ключ: элемент,
значение: список пяти наиболее распространенных предметов, которые покупатели также покупают.

Ответ:

Наше решение имеет две итерации. Первая итерация генерирует списки всех предметов, принесенных одним и тем же пользователем. Группировка выполняется структурой, и преобразователь, и преобразователь выполняют функцию идентификации.

Input:
key: user id,
value: brought item.

Выход:
ключ: идентификатор пользователя,
значение: список всех принесенных предметов.

1
2
3
4
5
6
7
class MAPPER
  method MAP(user_id, item)
    emit(user_id, item)
 
class REDUCER
  method REDUCE(user_id, items[i1, i2, ..., in])
    emit(user_id, items)

Вторая итерация решает проблему одновременного появления элементов списка. Это использует подход полос. Единственное отличие от стандартного решения состоит в том, что мы испустили только пять наиболее распространенных случаев.

Input:
key: user id,
value: list of all brought items.

Выход:
ключ: элемент,
значение: список из пяти наиболее распространенных совпадений.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
class MAPPER
  method MAP(user_id, items[i1, i2, ..., in])
    for all item in items do
      H = new hash map
      for all item j in items do
        H(j) = H(j) + 1
      emit(item, H)
 
class REDUCER
  method REDUCE(item, stripes[H1, H2, ..., Hn])
    T = new hash map
    for all H in stripes do
      for all (key/value) in H do
        T(key) = T(key) + value
    emit(user_id, max_five(T))

Журнал интернет-магазина содержит идентификатор пользователя, метку времени, предмет и количество принесенных штук для каждой продажи. Магазин ищет товары, продажи которых растут или падают одновременно. Найдите 20 пар предметов с максимумом таких месяцев.

Input:
key: user id,
value: timestamp, brought item, count.

Выход:
ключ: предмет, предмет
значение: количество месяцев, в течение которых продажи обоих товаров увеличивались или уменьшались.
#: вывод содержит 20 пар ключ / значение с максимальным значением

Ответ:

Наше решение требует нескольких итераций MapReduce. Мы должны:

  • рассчитать, выросли или упали продажи товаров за месяц,
  • создавать списки товаров с одинаковыми изменениями продаж в течение одного месяца,
  • найти количество совпадений в этих списках,
  • выбирайте предметы с максимальным количеством совпадений.

Первая итерация рассчитывает изменения продаж за любой месяц. Мы должны поставить маппер, разделитель, сортировку и редуктор. Mapper генерирует одну промежуточную пару ключ / значение для каждого входного ключа / значения. Ключ состоит из проданного товара и месяца продаж. Значение содержит количество проданных штук.

Partitioner отправляет все пары ключ / значение с одним и тем же элементом в один и тот же редуктор. Пользовательская сортировка сортирует их по месяцам. Наконец, редуктор рассчитывает изменения продаж.

Input:
key: user id,
value: timestamp, item, count.

Промежуточный ключ / значения:
ключ: пункт, месяц
значение: кол.

Выход:
ключ: месяц, вверх / вниз / равно
значение: элемент.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class MAPPER
  method MAP(user_id, (timestamp, item, count))
    month = get_month(timestamp)
    emit((item, month), count)
 
class PARTITIONING_COMPARATOR
  method compare(key (item1, month1), key (item2, month2))
    if item1 = item2
      return keys are equal
 
    return keys are different
 
class SORTING_COMPARATOR
  method compare(key (item1, month1), key (item2, month2))
    if item1 = item2 do
      return compare(month1, month2)
          
    return compare(item1, item2)
 
class REDUCER
  method REDUCE((item, month), counts[c1, c2, ..., cn])
    c = sum([c1, c2, ..., cn])
    if last_item = item
      if last_month + 1 = month
        //emit correct up/down/equal flags
        if last_count < count
          emit((item, month), up)
        if last_count > count
          emit((item, month), down)
        if last_count = count
          emit((item, month), equal)
      else
        //no sales during some months
        emit((item, last_month + 1), down)
        emit((item, month), up)
    else
      // new item
      emit((last_item, last_month + 1), down)
      emit((item, month), up)
 
    last_item = item
    last_count = count
    last_month = month

Вторая итерация группирует результаты первой итерации по ключам. Он генерирует списки товаров с одинаковыми изменениями продаж в течение одного месяца. Framework делает всю работу. Как картограф, так и редуктор выполняют функцию идентификации.

Input:
key: month, up/down/equal
value: item.

Выход:
ключ: месяц, вверх / вниз / равно
значение: [предметы].

Третья итерация выполняет стандартный алгоритм совместного использования пар.

Input:
key: month, up/down/equal
value: [items].

Промежуточный ключ / значения:
ключ: предмет, предмет
значение: частичное количество совпадений.

Выход:
ключ: предмет, предмет
значение: количество месяцев, в течение которых продажи обоих товаров увеличивались или уменьшались.
#: вывод содержит все пары элементов

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
class MAPPER
  method MAP((month, change), items[i1, i2, ..., in])
    for each i in items do
      for each j in items do
        if i != j
          emit((i, j), 1)
 
class COMBINER
  method COMBINE((item1, item2), co-occurrences[c1, c2, ..., cn])
    s = 0
    for all c in co-occurrences[c1, c2, ..., cn] do
      s = s + c
 
    emit((item1, item2), s)
 
class REDUCER
  method REDUCE((item, item), co-occurrences[c1, c2, ..., cn])
    s = 0
    for all c in co-occurrences[c1, c2, ..., cn] do
      s = s + c
 
    emit((item1, item2), s)

Наконец, мы должны выбрать 20 пар ключ / значение с максимальным значением. Каждый картограф выбирает 20 пар ключ / значение с максимальным значением и испускает их одним и тем же ключом. Будет только один редуктор, который выберет последние 20 пар ключ / значение.

Input:
key: item, item
value: number of months when both items sales rose or decline.
#: the output contains all items couples

Промежуточный ключ / значения:
ключ: 1
value: item, item, количество месяцев, в течение которых продажи обоих товаров росли или снижались.
#: вывод содержит 20 пар ключ / значение с максимальным значением для каждого преобразователя

Выход:
ключ: предмет, предмет
значение: количество месяцев, в течение которых продажи обоих товаров увеличивались или уменьшались.
#: вывод содержит 20 пар ключ / значение с максимальным значением

1
the code is very simple but long

Криминальное агентство

Во всех упражнениях этой главы используется одна и та же структура данных.

Криминальное агентство украло базу данных о дружбе Facebook и хочет проанализировать новые данные. Дружеские отношения хранятся в виде пар ключ / значение, каждая дружба соответствует двум парам ключ / значение:

Friends:
key: first friend name
value: second friend name

ключ: имя второго друга
значение: имя первого друга

Агентству принадлежат также судимости всех граждан:

Criminal record:
key: citizen name
value: when, where, accomplices, description

Найти в опасности молодых людей. Человек считается подверженным риску молодежью, если более половины его друзей имеют судимость.

Ответ:

Наше решение имеет две итерации. Первая итерация объединяет два набора и помечает каждый «значение друга» флагами has_record / law_abiding.

Output:
key: first friend
value: second friend, has_record/law_abiding

Картограф отмечает каждый ключ с именем набора данных. Разделитель группирует данные по именам в ключах, а сортировщик ставит судимости перед дружбой. Мы могли бы использовать локальную агрегацию для удаления нескольких криминальных записей одного и того же лица

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class MAPPER
  method MAP(name, value)
    if value is name do
      emit(name, friendship, item)
    else
      emit(name, criminal, item)
 
class PARTITIONING_COMPARATOR
  method compare(key (name1, dataset1), key (name2, dataset2))
    if name1 = name2
      return keys are equal
  
    return keys are different
  
class SORTING_COMPARATOR
  method compare(key (name1, dataset1), key (name2, dataset2))
    if name1 = name2 AND dataset1 is criminal
      return key1 is lower
 
    if name1 = name2 AND dataset2 is criminal
      return key2 is lower
 
    return compare(name1, name2)
 
class REDUCER
  variable previous_name
 
  method REDUCE(pair(name, flag), items[i1, i2, ..., in])
    if flag is criminal do
      previous_name = name
      has_record = criminal
      return
 
    if previous_name <> name do
      has_record = law_abiding
    else
      has_record = criminal
 
    previous_name = name
    for all i in items do
      emit(i.name, pair(name, has_record))

Вторая итерация учитывает как общее количество друзей, так и количество друзей с криминальным прошлым. Редуктор испускает пары ключ / значение только для молодых людей из группы риска. Также эта итерация может использовать некоторую локальную агрегацию.

Intermediate key/value:
key: name
value: total friends, total friend criminals
# totals are relative only to in data sets subsets

Выход:
ключ: имя
значение: пусто
# только молодые люди в группе риска

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
class MAPPER
  method MAP(name, pair(name, has_record))
    if has_record is law_abiding do
      emit(name, pair(0, 1))
    else
      emit(name, pair(1, 1))
 
class REDUCER
  method REDUCE(name, items[pair(total, criminals)])
    total = 0
    criminals = 0
    for all i in items do
      total = total + i.total
      criminals = criminals + i.criminals
 
    if criminals / total > 0.5 do
      emit(name, empty)

Найти банды. Банда — это группа людей, которая:

  • имеет ровно 5 членов,
  • каждый участник дружит со всеми остальными участниками,
  • каждый из двух членов совершил как минимум 3 преступления вместе.
Ответ:

Опять же, нам нужно три итерации. Идея состоит в том, чтобы сначала очистить график от всех бесполезных ребер, чтобы остались только криминальные контакты. Затем мы разбиваем граф на более мелкие управляемые подграфы. Мы прикрепляем все криминальные контакты и грани между ними каждому человеку:

Last iteration reducers input:
key: person
values: all his criminal contacts and relationships between them.

Конечный редуктор берет меньшие графики, представленные значением в каждой паре ключ / значение, и находит полные подграфы с 4 вершинами. Добавьте в него человека из ключа, и вы нашли полный подграф с 5 вершинами. Редуктор может использовать любой полиномиальный алгоритм.

Первая итерация использует парный подход для очистки графика. Мы опускаем как локальную агрегацию, так и удаление дубликатов. И то, и другое сделает алгоритм более эффективным.

Intermediate key/values:
key: first friend, second friend, friendship/accomplice
value: 1

Выход:
ключ: первый друг, второй друг
значение: пусто
# только друзья с общей судимостью

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class MAPPER
  method MAP(name, value)
    if value is name do
      emit(triple(name, value, friendship), empty)
    else
      for all crime_accomplice in value.accomplices do
        emit(triple(name, crime_accomplice, accomplice), 1)
 
class PARTITIONING_COMPARATOR
  method compare(key (name1, accomplice1, flag1), key (name2, accomplice2, flag2))
    if name1 = name2 AND accomplice1 = accomplice2
      return keys are equal
  
    return keys are different
  
class SORTING_COMPARATOR
  method compare(key (name1, accomplice1, flag1), key (name2, accomplice2, flag2))
    if name1 = name2 AND accomplice1 AND flag1 is friendship
      return key1 is lower
 
    if name1 = name2 AND accomplice1 AND flag2 is friendship
      return key2 is lower
 
    return compare(pair(name1, accomplice1), pair(name2, accomplice2))
 
class REDUCER
  variable previous_name
  variable previous_accomplice
 
  method sameAsPrevious(name, accomplice)
    if previous_name <> name
      return false
 
    if previous_accomplice <> accomplice
      return false
 
    return true
 
  method REDUCE(triple(name, accomplice, flag), items[i1, i2, ..., in])
    if sameAsPrevious(name, accomplice) do
      if items.length > 2 do
        emit(name, accomplice)
      return
 
    if flag is friendship do
      previous_name = name
      previous_accomplice = accomplice

Вторая итерация прикрепляет списки всех друзей «второй степени» к ребрам:

Input
key: first friend, second friend
value: empty
Intermediate key/values:
key: first friend
value: first friend, second friend

ключ: второй друг
значение: первый друг, второй друг

Выход:
ключ: первый друг, второй друг
значение: все друзья второго друга

ключ: второй друг, первый друг
значение: все друзья первого друга

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
class MAPPER
  method MAP((first friend, second friend), empty)
    emit(first friend, (first friend, second friend))
    emit(second friend, (first friend, second friend))
 
class REDUCER
  method REDUCE(name, edges[e1, e2, ..., en])
    friends = new Set
    friends.add(name)
 
    for all edge in edges do
      friends.add(edge.v1, edge.v2)
 
    for all edge in edges do
       emit(edge, friends)     

Наконец, фазы mapper, shuffle и sort вместе генерируют списки всех друзей любого конкретного человека и отношений между ними.

Input
key: friend 1, friend 2
value: all friends of friend 2

Промежуточный ключ / значения:
ключ: друг 1
значение: друг 2, все друзья друга 2

Ввод редукторов (после перемешивания и сортировки):
ключевая фигура
ценности: все его друзья и отношения между ними.

Выход:
ключ: первый друг, второй друг, третий друг, четвертый друг, пятый друг
значение: банда

1
2
3
4
5
6
7
class MAPPER
  method MAP((friend , friend 2), all friends of second friend)
    emit(friend 1, (friend 2, all friends of friend 2))
 
class REDUCER
  method REDUCE(name, graph attached to it)
    any polynomial algorithm will work

Ссылка: MapReduce Вопросы и ответы от нашего партнера по JCG Марии Юрковичовой в блоге This Is Stuff .