Статьи

Объединяет с Map Reduce

Последние несколько дней я читал о реализациях Join, доступных для Hadoop. В этом посте я расскажу о некоторых приемах, которые я изучил в процессе. Объединения могут быть выполнены как на стороне карты, так и на стороне соединения, в зависимости от характера наборов данных, подлежащих объединению.

Уменьшить боковое соединение

Давайте возьмем следующие таблицы, содержащие данные о сотрудниках и отделах.

Давайте посмотрим, как можно выполнить приведенный ниже запрос на соединение, используя соединение на стороне сокращения.

1
SELECT Employees.Name, Employees.Age, Department.Name FROM Employees INNER JOIN Department ON Employees.Dept_Id=Department.Dept_Id

Сторона карты отвечает за выдачу значений предикатов соединения вместе с соответствующей записью из каждой таблицы, так что записи, имеющие одинаковый идентификатор отдела в обеих таблицах, будут иметь одинаковый редуктор, который затем будет выполнять объединение записей, имеющих одинаковый идентификатор отдела. Однако также необходимо пометить каждую запись, чтобы указать, из какой таблицы произошла запись, так что соединение происходит между записями двух таблиц. Следующая диаграмма иллюстрирует процесс соединения со стороны сокращения.

Вот псевдокод для функции карты для этого сценария.

01
02
03
04
05
06
07
08
09
10
11
map (K table, V rec) {
 
   dept_id = rec.Dept_Id
 
   tagged_rec.tag = table
 
   tagged_rec.rec = rec
 
   emit(dept_id, tagged_rec)
 
}

При сокращении соединение происходит внутри записей, имеющих разные теги.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
reduce (K dept_id, list<tagged_rec> tagged_recs) {
 
   for (tagged_rec : tagged_recs) {
 
      for (tagged_rec1 : taagged_recs) {
 
          if (tagged_rec.tag != tagged_rec1.tag) {
 
              joined_rec = join(tagged_rec, tagged_rec1)
 
          }
       emit (tagged_rec.rec.Dept_Id, joined_rec)
 
    }
 
}

Присоединение к карте (повторное соединение)

Использование распределенного кэша на меньшей таблице

Чтобы эта реализация работала, одно отношение должно вписываться в память. Меньшая таблица реплицируется на каждый узел и загружается в память. Объединение происходит на стороне карты без участия редуктора, что значительно ускоряет процесс, поскольку это позволяет избежать перетасовки всех данных по сети, даже если большинство несоответствующих записей впоследствии удаляется. Меньшую таблицу можно заполнить хеш-таблицей, чтобы можно было выполнить поиск по Dept_Id. Псевдокод описан ниже.

01
02
03
04
05
06
07
08
09
10
11
12
13
map (K table, V rec) {
 
list recs = lookup(rec.Dept_Id) // Get smaller table records having this Dept_Id
 
for (small_table_rec : recs) {
 
joined_rec = join (small_table_rec, rec)
 
}
 
emit (rec.Dept_id, joined_rec)
 
}

Использование распределенного кэша в отфильтрованной таблице

Если таблица меньшего размера не умещается в памяти, может быть возможно удалить ее содержимое, если в запросе было указано выражение фильтрации. Рассмотрим следующий запрос.

1
SELECT Employees.Name, Employees.Age, Department.Name FROM Employees INNER JOIN Department ON Employees.Dept_Id=Department.Dept_Id WHERE Department.Name="Eng"

Здесь меньший набор данных можно получить из таблицы Department, отфильтровывая записи с названиями отделов, отличными от «Eng». Теперь возможно сделать реплицированное соединение на стороне карты с этим меньшим набором данных.

Тиражированный Полу-Присоединение

Уменьшить соединение сторон с помощью фильтрации сторон карты

Даже отфильтрованные данные небольшой таблицы не помещаются в память, возможно, можно включить только Dept_Id отфильтрованных записей в реплицируемый набор данных. Затем на стороне карты этот кэш может быть использован для фильтрации записей, которые будут отправлены для уменьшения, таким образом, уменьшая объем данных, перемещаемых между преобразователями и преобразователями.

Логика на стороне карты будет выглядеть следующим образом.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
map (K table, V rec) {
 
   // Check if this record needs to be sent to reducer
   boolean sendToReducer = check_cache(rec.Dept_Id)
   if (sendToReducer) {
      dept_id = rec.Dept_Id
 
      tagged_rec.tag = table
 
      tagged_rec.rec = rec
 
      emit(dept_id, tagged_rec)
   }
}

Логика на стороне редуктора будет такой же, как и в случае уменьшения бокового соединения.

Использование фильтра Блума

Фильтр Блума — это конструкция, которая может быть использована для проверки содержания данного элемента в наборе. Меньшее представление отфильтрованных Dept_ids может быть получено, если значения Dept_Id могут быть увеличены до фильтра Блума. Затем этот фильтр Блума может быть реплицирован на каждый узел. На стороне карты для каждой записи, выбранной из таблицы меньшего размера, фильтр Блума можно использовать для проверки наличия Dept_Id в записи в фильтре Блума и только в этом случае для отправки этой конкретной записи на сторону уменьшения. Поскольку фильтр Блума гарантированно не дает ложных негативов, результат будет точным.

Ссылка: Присоединяется к Map Reduce от нашего партнера JCG Буддики Чамит в блоге Source Open .