Одна из замечательных особенностей Spark Framework — это функциональность, предоставляемая из коробки. Существует класс, предназначенный исключительно для работы с парами ключ-значение, класс PairRDDFunctions . При работе с данными в формате ключ-значение одной из наиболее распространенных операций является группировка значений по ключу. Класс PairRDDFunctions предоставляет groupByKey
функцию, которая делает группировку по ключу тривиальной. Однако groupByKey
это очень дорого и, в зависимости от варианта использования, доступны лучшие альтернативы. При groupByKey
вызове все пары ключ-значение будут перетасовываться по сети в редуктор, где значения собираются вместе. В некоторых случаях groupByKey
это просто отправная точка для выполнения дополнительных операций (сумма, среднее) по ключу. В других случаях нам нужно собрать значения вместе, чтобы вернуть другой тип значения. Spark предоставляет несколько альтернатив для группировки, которые могут либо повысить производительность, либо упростить возможность объединения значений в другой тип. Смысл этого поста заключается в рассмотрении одной из этих альтернативных группирующих функций.
Альтернативные функции группировки
Хотя в PairRDDFunctions
классе много функций , сегодня мы собираемся сосредоточиться aggregateByKey
. aggregateByKey
Функция используется для агрегирования значений для каждого ключа и добавляет потенциал для возврата различного типа значения.
AggregateByKey
Функция aggregateByKey требует 3 параметра:
- Начальное нулевое значение, которое не повлияет на общие значения, которые нужно собрать. Например, если бы мы добавляли числа, начальное значение было бы равно 0. Или в случае сбора уникальных элементов для каждого ключа начальное значение было бы пустым набором.
- Комбинирующая функция, принимающая два параметра. Второй параметр объединяется с первым параметром. Эта функция объединяет / объединяет значения внутри раздела.
- Функция сливающейся функции, принимающая два параметра. В этом случае паремтеры объединяются в одно. Этот шаг объединяет значения между разделами.
В качестве примера давайте соберем уникальные значения для каждого ключа. Думайте об этом как об альтернативе вызова. someRDD.groupByKey().distinct()
Вот код:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialSet = mutable.HashSet.empty[String]
val addToSet = (s: mutable.HashSet[String], v: String) => s += v
val mergePartitionSets = (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2
val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)
Вы заметите, что в нашем примере мы используем изменяемые хэш-наборы. Причина использования изменяемых коллекций состоит в том, чтобы избежать дополнительных затрат памяти, связанных с возвратом новых коллекций каждый раз, когда мы добавляем значения или объединяем коллекции. (Это простота, изложенная в документации PairRDDFunctions). Хотя использование aggregateByKey
более многословно, если ваши данные имеют много значений, но только некоторые из них уникальны, такой подход может привести к повышению производительности.
Для нашего второго примера мы сделаем сумму значений по ключу, что должно помочь с производительностью, так как меньшее количество данных будет перетасовываться по сети. Мы предоставляем 3 различных параметра для нашей aggregateByKey
функции. На этот раз мы хотим посчитать, сколько значений мы имеем по ключу, независимо от дубликатов.
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
Для тех, кто работал с hadoop, эта функциональность аналогична использованию комбайнеров.
Полученные результаты
Запуск наших примеров дает следующие результаты:
Aggregate By Key unique Results
bar -> C,D
foo -> B,A
------------------
Aggregate By Key sum Results
bar -> 3
foo -> 5
Вывод
На этом мы завершаем наш краткий обзор aggregateByKey
функции. Хотя использование 3-х функций может быть немного громоздким, это, безусловно, хороший инструмент в вашем распоряжении. В последующих постах мы продолжим освещение методов в PairRDDFunctions
классе Спарка