Статьи

Spark PairRDDFunctions — AggregateByKey

Одна из замечательных особенностей  Spark Framework  — это функциональность, предоставляемая из коробки. Существует класс, предназначенный исключительно для работы с парами ключ-значение,   класс PairRDDFunctions . При работе с данными в формате ключ-значение одной из наиболее распространенных операций является группировка значений по ключу. Класс PairRDDFunctions предоставляет  groupByKey функцию, которая делает группировку по ключу тривиальной. Однако  groupByKey это очень дорого и, в зависимости от варианта использования, доступны лучшие альтернативы. При  groupByKey вызове все пары ключ-значение будут перетасовываться по сети в редуктор, где значения собираются вместе. В некоторых случаях groupByKey это просто отправная точка для выполнения дополнительных операций (сумма, среднее) по ключу. В других случаях нам нужно собрать значения вместе, чтобы вернуть другой тип значения. Spark предоставляет несколько альтернатив для группировки, которые могут либо повысить производительность, либо упростить возможность объединения значений в другой тип. Смысл этого поста заключается в рассмотрении одной из этих альтернативных группирующих функций.

Альтернативные функции группировки

Хотя в PairRDDFunctions классе много функций  , сегодня мы собираемся сосредоточиться  aggregateByKeyaggregateByKey Функция используется для агрегирования значений для каждого ключа и добавляет потенциал для возврата различного типа значения.

AggregateByKey

Функция aggregateByKey требует 3 параметра:

  1. Начальное нулевое значение, которое не повлияет на общие значения, которые нужно собрать. Например, если бы мы добавляли числа, начальное значение было бы равно 0. Или в случае сбора уникальных элементов для каждого ключа начальное значение было бы пустым набором.
  2. Комбинирующая функция, принимающая два параметра. Второй параметр объединяется с первым параметром. Эта функция объединяет / объединяет значения внутри раздела.
  3. Функция сливающейся функции, принимающая два параметра. В этом случае паремтеры объединяются в одно. Этот шаг объединяет значения между разделами.

В качестве примера давайте соберем уникальные значения для каждого ключа. Думайте об этом как об альтернативе вызова.  someRDD.groupByKey().distinct() Вот код:

val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()

val initialSet = mutable.HashSet.empty[String]
val addToSet = (s: mutable.HashSet[String], v: String) => s += v
val mergePartitionSets = (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2

val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)

Вы заметите, что  в нашем примере мы используем  изменяемые хэш-наборы. Причина использования изменяемых коллекций состоит в том, чтобы избежать дополнительных затрат памяти, связанных с возвратом новых коллекций каждый раз, когда мы добавляем значения или объединяем коллекции. (Это простота, изложенная в документации PairRDDFunctions). Хотя использование  aggregateByKey более многословно, если ваши данные имеют много значений, но только некоторые из них уникальны, такой подход может привести к повышению производительности.

Для нашего второго примера мы сделаем сумму значений по ключу, что должно помочь с производительностью, так как меньшее количество данных будет перетасовываться по сети. Мы предоставляем 3 различных параметра для нашей  aggregateByKey функции. На этот раз мы хотим посчитать, сколько значений мы имеем по ключу, независимо от дубликатов.

val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()

val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2

val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)

Для тех, кто работал с hadoop, эта функциональность аналогична использованию комбайнеров.

Полученные результаты

Запуск наших примеров дает следующие результаты:

Aggregate By Key unique Results
bar -> C,D
foo -> B,A
------------------
Aggregate By Key sum Results
bar -> 3
foo -> 5

Вывод

На этом мы завершаем наш краткий обзор  aggregateByKey функции. Хотя использование 3-х функций может быть немного громоздким, это, безусловно, хороший инструмент в вашем распоряжении. В последующих постах мы продолжим освещение методов в PairRDDFunctions классе Спарка 

Ресурсы