Статьи

Вторичная сортировка в искре

Вторичная сортировка — это метод, который позволяет упорядочивать по значению (значениям) (в дополнение к сортировке по ключу) на этапе сокращения задания Map-Reduce. Например, вы можете захотеть anyalize вход пользователя в ваше приложение. Сортировка результатов по дню и времени, а также идентификатор пользователя (естественный ключ) помогут определить тенденции пользователей. Дополнительный порядок по дню и времени является примером вторичной сортировки. В то время как я писал ранее о  вторичной сортировке в Hadoop , эта статья расскажет, как мы выполняем вторичную сортировку в Spark.

Настроить

Мы будем использовать RDD API для реализации вторичной сортировки. Хотя это может быть легко достигнуто с помощью  DataFrames , мы не будем останавливаться на этом подходе. Данные, которые мы используем, — это данные о своевременной работе авиакомпании в  Бюро транспорта . Доступно несколько точек данных, но мы сосредоточимся на том, какие авиакомпании имеют наиболее поздние прибытия и в каком аэропорту происходят такие поздние прибытия. Из этого утверждения мы можем определить наш порядок сортировки: идентификатор авиакомпании, идентификатор аэропорта и время задержки. Отказ от ответственности:  Этот пример только для демонстрационных целей! Это не предназначено, чтобы вывести или определить фактическую работу авиакомпании.

Создание пар ключ-значение

Данные загружаются в формате CSV и будут преобразованы в формат ключ-значение. Важной частью вторичной сортировки является то, какие значения включить в ключ, чтобы включить дополнительный порядок. «Естественный» ключ — это AirlinesId, а ArriAirportId и birthyDelay — значения, которые мы включим в ключ. Это представлено  FlightKey классом дела:

case class FlightKey(airLineId: String, arrivalAirPortId: Int, arrivalDelay: Double)

Наша первая попытка создания наших пар ключ-значение выглядит следующим образом:

val rawDataArray = sc.textFile(args(0)).map(line => line.split(","))
  //Using keyBy but retains entire array in value
  val keyedByData = rawDataArray.keyBy(arr => createKey(arr))

 //supporting code
  def createKey(data: Array[String]): FlightKey = {
    FlightKey(data(UNIQUE_CARRIER), safeInt(data(DEST_AIRPORT_ID)), safeDouble(data(ARR_DELAY)))
  }

Этот пример — простой пример использования  keyBy функции. В этом случае  keyBy функция принимает функцию, которая возвращает ключ заданного типа  FlightKey . У нас есть одна проблема с этим подходом. Часть данных, которые мы хотим проанализировать, находится в ключе  и  остается в исходном массиве значений. Хотя отдельные значения сами по себе не очень велики, при рассмотрении объема данных, с которыми мы работаем, мы хотим убедиться, что мы не передаем дублированные данные. Кроме того, окончательные данные, которые мы будем анализировать, содержат только 7 полей. В исходных данных 24 поля. Мы также отбросим эти неиспользуемые поля. Вот наш второй шаг при создании наших пар ключ-значение:

val rawDataArray = sc.textFile(args(0)).map(line => line.split(","))
 val airlineData = rawDataArray.map(arr => createKeyValueTuple(arr))

 //supporting code
 def createKeyValueTuple(data: Array[String]) :(FlightKey,List[String]) = {
      (createKey(data),listData(data))
  }

  def createKey(data: Array[String]): FlightKey = {
    FlightKey(data(UNIQUE_CARRIER), safeInt(data(DEST_AIRPORT_ID)), safeDouble(data(ARR_DELAY)))
  }

  def listData(data: Array[String]): List[String] = {
    List(data(FL_DATE), data(ORIGIN_AIRPORT_ID), data(ORIGIN_CITY_MARKET_ID), data(DEST_CITY_MARKET_ID))
  }

В этом примере мы используем  map для преобразования наших строк в пары ключ-значение. Мы используем ту же  createKey функцию, но добавили,  listData что она возвращает список, содержащий только те значения, которые мы будем анализировать. В целом количество полей в нашем значении изменилось с 24 до 4, что должно дать нам некоторые улучшения производительности.

Код разделения и сортировки

Теперь нам нужно рассмотреть, как разделить и отсортировать наши данные. Есть два момента, которые мы должны рассмотреть.

  1. Нам нужно сгруппировать данные по идентификатору авиакомпании, чтобы попасть в один и тот же раздел на этапе сокращения. Но наш ключ — это составной ключ с 3 полями. Простое разделение по ключу не будет работать для нас. Поэтому мы создадим пользовательский разделитель, который знает, какое значение использовать при определении раздела, к которому будут передаваться данные.
  2. Нам также нужно сообщить Spark, как мы хотим, чтобы наши данные сортировались: сначала авиакомпания, затем и ИД прибытия, и, наконец, ИД прибытия. Кроме того, мы хотим, чтобы задержка прибытия была в порядке убывания, поэтому рейсы с наибольшей задержкой указываются первыми.

Код Разделителя

Код разделителя прост. Мы расширяем   класс Partitioner следующим образом:

class AirlineFlightPartitioner(partitions: Int) extends Partitioner {
    require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

    override def numPartitions: Int = partitions

    override def getPartition(key: Any): Int = {
      val k = key.asInstanceOf[FlightKey]
      k.airLineId.hashCode() % numPartitions
    }
  }

AirlineFlightPartitioner Класс прост, и мы можем видеть , что перегородки определяются с использованием  airlineId только.

Код сортировки

Теперь нам нужно определить, как данные будут отсортированы, как только мы разместим их в правильных разделах. Для достижения сортировки мы создаем сопутствующий объект  FlightKey и определяем неявный   метод Ordering :

object FlightKey {
    implicit def orderingByIdAirportIdDelay[A <: FlightKey] : Ordering[A] = {
       Ordering.by(fk => (fk.airLineId, fk.arrivalAirPortId, fk.arrivalDelay * -1))
    }
  }

Мы использовали неявный  Ordering вместо того, чтобы  FlightKey класс расширял  Ordered . Это так, потому что  Ordered признак подразумевает сортировку по единственному «естественному» значению и  Ordering предполагает сортировку по множественным значениям. Полное объяснение использования имплицитов в сортировке выходит за рамки этого поста, но любопытные могут обратиться  сюда  для лучшего объяснения. Теперь  FlightKey они будут отсортированы в правильном порядке: сначала авиакомпания, затем секунда прибытия, а затем — по величине задержки (в порядке убывания).

Собираем все вместе

Теперь пришло время привести наши разделы в порядок. Это достигается с помощью  repartitionAndSortWithinPartitions метода   класса OrderedRDDFunctions . Чтобы процитировать документ Scala, он  repartitionAndSortWithPartions делает следующее:

Разделите СДР в соответствии с заданным разделителем и в каждом полученном разделе сортируйте записи по их ключам.

Это более эффективно, чем вызов перераспределения, а затем сортировка внутри каждого раздела, потому что это может толкнуть сортировку вниз в механизм перемешивания.

Разделитель, который мы будем использовать,  AirlineFlightPartitioner описан выше в разделе «Разделитель».

Теперь со всеми частями по порядку мы делаем следующее, чтобы выполнить нашу секодорную сортировку:

object SecondarySort extends SparkJob {

  def runSecondarySortExample(args: Array[String]): Unit = {

    val sc = context("SecondarySorting")
    val rawDataArray = sc.textFile(args(0)).map(line => line.split(","))
    val airlineData = rawDataArray.map(arr => createKeyValueTuple(arr))

    val keyedDataSorted = airlineData.repartitionAndSortWithinPartitions(new AirlineFlightPartitioner(1))

    //only done locally for demo purposes, usually write out to HDFS
    keyedDataSorted.collect().foreach(println)
  }
}

После всего накопления в этом посте мы получаем несколько анти-климатических однострочников! Количество разделов, представленное здесь, установлено в 1, потому что эти примеры выполнялись локально. В реальном кластере вы хотите использовать другое значение.

Полученные результаты

Запуск искрового задания дает следующие результаты:

(FlightKey("AA",10397,-2.0),List(2015-01-01, 11298, 30194, 30397))
(FlightKey("AA",11278,-2.0),List(2015-01-01, 11298, 30194, 30852))
(FlightKey("AA",11278,-14.0),List(2015-01-01, 12892, 32575, 30852))
(FlightKey("AA",11292,24.0),List(2015-01-01, 13930, 30977, 30325))
(FlightKey("AA",11298,133.0),List(2015-01-01, 13891, 32575, 30194))
(FlightKey("AA",11298,109.0),List(2015-01-01, 12173, 32134, 30194))
(FlightKey("AA",11298,55.0),List(2015-01-01, 14107, 30466, 30194))
(FlightKey("AA",11298,49.0),List(2015-01-01, 12478, 31703, 30194))
(FlightKey("AA",11298,40.0),List(2015-01-01, 14771, 32457, 30194))
(FlightKey("AA",11298,35.0),List(2015-01-01, 12094, 34699, 30194))
(FlightKey("AA",11298,24.0),List(2015-01-01, 13830, 33830, 30194))
(FlightKey("AA",11298,23.0),List(2015-01-01, 14869, 34614, 30194))
(FlightKey("AA",11298,19.0),List(2015-01-01, 11433, 31295, 30194))
(FlightKey("AA",11298,17.0),List(2015-01-01, 12264, 30852, 30194))

Мы можем видеть наши результаты, отсортированные по авиакомпаниям, прибытию и задержке. Стоит отметить, что отрицательные числа представляют рейсы, которые прибыли  рано  (кто бы мог подумать, что рейсы могут прибыть рано!). Эти результаты были бы более значимыми, переводя коды в узнаваемые имена. В следующем посте мы продемонстрируем это, используя  Broadcast Variables .

Вывод

Надеемся, мы увидим полезность вторичной сортировки, а также простоту ее реализации в Spark. Фактически, я считаю, что этот пост имеет высокое отношение шума к сигналу, а это означает, что мы много обсуждали, что такое вторичная сортировка и как мы это делаем, но фактический код, который мы приводим, — это всего лишь несколько линий. Спасибо за ваше время.

Ресурсы