Статьи

Изучение последствий Scala с помощью Spark

Некоторое время назад я написал два сообщения о том, как избежать использования  groupBy функции в Spark. Хотя я не буду повторно хэшировать оба сообщения здесь, суть заключалась в том, чтобы воспользоваться преимуществами функций  combByKey  или  aggreagateByKey  . Хотя обе функции обладают потенциалом для повышения производительности и эффективности в наших заданиях Spark, время от времени создание необходимых аргументов снова и снова для базовых сценариев использования может быть утомительным. Это заставило меня задуматься, есть ли способ обеспечить некоторый уровень абстракции для базовых сценариев использования? Например, группировка значений в список или набор. Одновременно я пытался расширить свою Knowlege в  Scala в  более продвинутых функций , включая  implicits  и  классы типов . Что я придумал, так это  Класс GroupingRDDFunctions , обеспечивающий некоторый синтаксический сахар для базовых вариантов использования  aggregateByKey функции с использованием неявной  функциональности класса Scala  .

Кратко о Scala

Хотя полное объяснение последствий Scala выходит за рамки этого поста, вот краткое описание. Когда компилятор Scala находит переменную или выражение неправильного типа, он ищет  implicit функцию, выражение или класс для предоставления правильного типа. implicitФункция (или класс) должен быть в текущей области видимости для компилятора , чтобы сделать свою работу. Обычно это достигается путем импорта объекта Scala, который содержит неявные определения. В этом случае  GroupingRDDFunctions класс оборачивается в  GroupingRDDUtils объект. Вот объявление класса:

object GroupingRDDUtils {
  implicit class GroupingRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) extends Logging with Serializable {
//... details left out for clarity
   }
}

Чтобы использовать GroupingRDDFunctions, просто используйте следующую инструкцию импорта:

import bbejeck.implicits.GroupingRDDUtils._

Предоставленная функциональность

Методы, определенные на  GroupingRDDFunctions :

  1. groupByKeyToList
  2. groupByKeyUnique
  3. countByKey
  4. sumWithTotal — обеспечивает кортеж суммированием числового значения в виде Double вместе с общим количеством элементов для создания суммы
  5. averageByKey

Вот несколько примеров использования  GroupingRDDFunctions:

//To do a grouping of unique values by key using aggregateByKey
val initialSet = mutable.HashSet.empty[String]
val addToSet = (s: mutable.HashSet[String], v: String) => s += v
val mergePartitionSets = (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2
val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)

//Now can be accomplished doing
val uniqueByKey = kv.groupByKeyUnique()

//Computing Count By Key with aggregateByKey
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val counts = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)

//Now becomes
val counts = kv.countByKey()

Здесь действительно ничего особенного не происходит. Мы просто упаковываем экземпляр RDD и предоставляем возможность использовать методы, перечисленные выше, для этого экземпляра RDD. Внутри  GroupingRDDFunctions класса мы все еще используем  aggregateByKey функцию.

Неявное преобразование параметров

В качестве другого примера неявного использования давайте взглянем на  averageByKey функцию. В приведенном ниже коде мы вычисляем среднее значение по ключу, применяя  avaragingFunction к результатам, возвращаемым  sumWithTotal методом. Но если мы посмотрим внимательнее, наши ключи и значения являются родовыми типами ‘K’ и ‘V’, но все эти функции работают с двойными числами.

implicit def intToDouble(num: V): Double = {
        num match {
          case i: Int => i.toDouble
          case _ => num.asInstanceOf[Double]
        }
}

def sumWithTotal(): RDD[(K, (Int, Double))] = {
      self.aggregateByKey((0, 0.0))(incrementCountSumValue, sumTuples)
}

def averageByKey(): RDD[(K, Double)] = {
    self.sumWithTotal().map(t => averagingFunction(t))

}

private def averagingFunction(t: (K, (Int, Double))): (K, Double) = {
      val (name, (numberScores, totalScore)) = t
      (name, totalScore / numberScores)
}

private def incrementCountSumValue(t: (Int, Double), v: V): (Int, Double) = {
      (t._1 + 1, t._2 + v)
}

private def sumTuples(t: (Int, Double), t2: (Int, Double)): (Int, Double) = {
      val (numScores1, totalScore1) = t
      val (numScores2, totalScore2) = t2
      (numScores1 + numScores2, totalScore1 + totalScore2)
}

Так что же произойдет, если указанные значения будут целыми числами, а не двойными? Также взгляните на  incrementCountSumValue метод, как можно добавить значение типа ‘V’ к двойному значению кортежа? Это хороший пример использования неявной функции. Компилятор будет искать и находить  intToDouble функцию и применять ее к параметру  incrementCountSumValueметода. Если значение является целым числом, его безразличие преобразуется в двойное, иначе мы возвращаем двойное.

Вывод

Хотя использование имплицитов в Scala должно быть разумным, на мой взгляд, приведенный здесь пример представляет собой хороший пример использования. Мы добавляем некоторое полезное поведение в класс, просто добавляя оператор импорта. Плюс очень просто проверить неявный класс, чтобы увидеть, что происходит под прикрытием.

Ссылки