Некоторое время назад я написал два сообщения о том, как избежать использования groupBy
функции в Spark. Хотя я не буду повторно хэшировать оба сообщения здесь, суть заключалась в том, чтобы воспользоваться преимуществами функций combByKey или aggreagateByKey . Хотя обе функции обладают потенциалом для повышения производительности и эффективности в наших заданиях Spark, время от времени создание необходимых аргументов снова и снова для базовых сценариев использования может быть утомительным. Это заставило меня задуматься, есть ли способ обеспечить некоторый уровень абстракции для базовых сценариев использования? Например, группировка значений в список или набор. Одновременно я пытался расширить свою Knowlege в Scala в более продвинутых функций , включая implicits и классы типов . Что я придумал, так это Класс GroupingRDDFunctions , обеспечивающий некоторый синтаксический сахар для базовых вариантов использования aggregateByKey
функции с использованием неявной функциональности класса Scala .
Кратко о Scala
Хотя полное объяснение последствий Scala выходит за рамки этого поста, вот краткое описание. Когда компилятор Scala находит переменную или выражение неправильного типа, он ищет implicit
функцию, выражение или класс для предоставления правильного типа. implicit
Функция (или класс) должен быть в текущей области видимости для компилятора , чтобы сделать свою работу. Обычно это достигается путем импорта объекта Scala, который содержит неявные определения. В этом случае GroupingRDDFunctions
класс оборачивается в GroupingRDDUtils
объект. Вот объявление класса:
object GroupingRDDUtils {
implicit class GroupingRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) extends Logging with Serializable {
//... details left out for clarity
}
}
Чтобы использовать GroupingRDDFunctions, просто используйте следующую инструкцию импорта:
import bbejeck.implicits.GroupingRDDUtils._
Предоставленная функциональность
Методы, определенные на GroupingRDDFunctions
:
- groupByKeyToList
- groupByKeyUnique
- countByKey
- sumWithTotal — обеспечивает кортеж суммированием числового значения в виде Double вместе с общим количеством элементов для создания суммы
- averageByKey
Вот несколько примеров использования GroupingRDDFunctions
:
//To do a grouping of unique values by key using aggregateByKey
val initialSet = mutable.HashSet.empty[String]
val addToSet = (s: mutable.HashSet[String], v: String) => s += v
val mergePartitionSets = (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2
val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)
//Now can be accomplished doing
val uniqueByKey = kv.groupByKeyUnique()
//Computing Count By Key with aggregateByKey
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val counts = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
//Now becomes
val counts = kv.countByKey()
Здесь действительно ничего особенного не происходит. Мы просто упаковываем экземпляр RDD и предоставляем возможность использовать методы, перечисленные выше, для этого экземпляра RDD. Внутри GroupingRDDFunctions
класса мы все еще используем aggregateByKey
функцию.
Неявное преобразование параметров
В качестве другого примера неявного использования давайте взглянем на averageByKey
функцию. В приведенном ниже коде мы вычисляем среднее значение по ключу, применяя avaragingFunction
к результатам, возвращаемым sumWithTotal
методом. Но если мы посмотрим внимательнее, наши ключи и значения являются родовыми типами ‘K’ и ‘V’, но все эти функции работают с двойными числами.
implicit def intToDouble(num: V): Double = {
num match {
case i: Int => i.toDouble
case _ => num.asInstanceOf[Double]
}
}
def sumWithTotal(): RDD[(K, (Int, Double))] = {
self.aggregateByKey((0, 0.0))(incrementCountSumValue, sumTuples)
}
def averageByKey(): RDD[(K, Double)] = {
self.sumWithTotal().map(t => averagingFunction(t))
}
private def averagingFunction(t: (K, (Int, Double))): (K, Double) = {
val (name, (numberScores, totalScore)) = t
(name, totalScore / numberScores)
}
private def incrementCountSumValue(t: (Int, Double), v: V): (Int, Double) = {
(t._1 + 1, t._2 + v)
}
private def sumTuples(t: (Int, Double), t2: (Int, Double)): (Int, Double) = {
val (numScores1, totalScore1) = t
val (numScores2, totalScore2) = t2
(numScores1 + numScores2, totalScore1 + totalScore2)
}
Так что же произойдет, если указанные значения будут целыми числами, а не двойными? Также взгляните на incrementCountSumValue
метод, как можно добавить значение типа ‘V’ к двойному значению кортежа? Это хороший пример использования неявной функции. Компилятор будет искать и находить intToDouble
функцию и применять ее к параметру incrementCountSumValue
метода. Если значение является целым числом, его безразличие преобразуется в двойное, иначе мы возвращаем двойное.
Вывод
Хотя использование имплицитов в Scala должно быть разумным, на мой взгляд, приведенный здесь пример представляет собой хороший пример использования. Мы добавляем некоторое полезное поведение в класс, просто добавляя оператор импорта. Плюс очень просто проверить неявный класс, чтобы увидеть, что происходит под прикрытием.
Ссылки
- Scala Doc Описание неявных классов
- Хорошее объяснение неявного приоритета
- Хорошее объяснение последствий и типов классов
- Руководство неофитов по Scala — имплициты, классы по типу
- tresata / spark-sorted использует импликации для добавления функциональности.
- GroupingRDDFunctions исходный код
- Группировка RDDFunctions юнит-тест