Статьи

Таблицы гуавы для искры

В прошлый раз мы освещали вторичную сортировку в Spark . Мы взяли данные об эффективности авиакомпаний и отсортировали результаты по авиакомпаниям, аэропорту назначения и сумме задержек. Мы использовали идентификаторы для всех наших данных. Хотя такой подход хорош для производительности, просмотр результатов в этом формате теряет смысл. К счастью, сайт Бюро транспорта предлагает справочные файлы для скачивания. Эталонные файлы представлены в формате CSV, где каждая строка состоит из пары ключ-значение. Наша цель — хранить данные ссылок в хеш-картах и ​​использовать широковещательные переменные.поэтому все операции на разных разделах будут иметь легкий доступ к одним и тем же данным. У нас есть четыре поля с кодами: авиакомпания, аэропорт города отправления, город отправителя, аэропорт назначения и город назначения. Два из наших полей кода используют один и тот же справочный файл (идентификатор аэропорта), поэтому нам нужно загрузить 3 файла. Но есть ли более простой способ загрузки 3 файлов в хеш-карты и наличия 3 отдельных переменных широковещания? Существует с помощью таблиц гуавы .

Таблицы гуавы вкратце

Хотя полное обсуждение гуавы Tableвыходит за рамки этого поста, будет полезно краткое описание. По сути, это абстракция для «хэш-карты хэш-карт», которая убирает все из добавленных или извлекаемых данных. Например:

Map<String,Map<String,String>> outerMap = new HashMap<>();

Map<String,String> inner = outerMap.get("key");
//getting a value
if(inner == null){
    inner = new HashMap<>();
    outerMap.put("key",inner);
    return null;
}else{
   String value = inner.get("innerKey");
   return value;
}

//adding a value
if(inner == null){
    inner = new HashMap<>();
    outerMap.put("key",inner);
}
 inner.put("innerKey","innerValue");


Table<String,String,String> table = HashBasedTable.create();
//expected behavior if not found - returns null
String innerValue = table.get("key","innerKey");
//if no value exists for "key" hashmap is created.
table.put("key","innerKey","value")

Таблица гуавы против HashMaps

Надеюсь, этого примера достаточно, чтобы показать, почему мы хотели бы использовать таблицы Guava вместо подхода «hashmap of hashmaps».

Загрузка таблицы

У нас есть 3 файла для загрузки в нашу таблицу для поиска. Код для этого прост:

object GuavaTableLoader {

  //custom type for convenience
  type RefTable = Table[String, String, String]

  def load(path: String, filenames: List[String]): RefTable = {
    val lookupTable = HashBasedTable.create[String, String, String]()
    for (filename <- filenames) {
      val lines = Source.fromFile(path + "/" + filename).getLines()
      val baseFilename = filename.substring(0, filename.indexOf('.'))
      loadFileInTable(lines, baseFilename, lookupTable)
    }

    lookupTable
  }

  def load(path: String, filenames: String): RefTable = {
    val fileList = filenames.split(",").toList
    load(path, fileList)
  }

  private def loadFileInTable(lines: Iterator[String], rowKey: String, tb: RefTable): Unit = {
    for (line <- lines) {
      if (!line.trim().isEmpty) {
        val keyValue = line.split("#")
        tb.put(rowKey, keyValue(0), keyValue(1))
      }
    }
  }
}

Загрузка таблиц гуавы

loadМетод принимает базовый-путь , где справочные файлы расположены и список имен файлов (существует еще один loadметод , который принимает разделенный запятыми список имен файлов). Мы перебираем список имен файлов, повторно используя базовое имя в качестве «строки-ключа», а затем перебираем пары ключ-значение, найденные в файле, сохраняя их в таблице. Здесь мы разбиваем строку на символе «#». Значения в справочных данных содержали запятые и были заключены в кавычки. Файлы были очищены путем удаления двойных кавычек и изменения разделителя на «#».

Установка таблиц гуавы в качестве широковещательных переменных

Теперь нам нужно интегрировать объект таблицы в нашу работу Spark в качестве широковещательной переменной. Для этого мы будем повторно использовать SecondarySortобъект из последнего поста:

val dataPath = args(0)
val refDataPath = args(1)
val refDataFiles = args(2)

val sc = context("SecondarySorting")
val rawDataArray = sc.textFile(dataPath).map(line => line.split(","))

val table = GuavaTableLoader.load(refDataPath, refDataFiles)

val bcTable = sc.broadcast(table)

Добавление таблицы в качестве широковещательной переменной

Мы добавили два параметра: базовый путь для справочных файлов и список имен справочных файлов через запятую. После загрузки нашей таблицы мы создаем широковещательную переменную с помощью sc.broadcastвызова метода.

Поиск справочных данных

Теперь все, что нам осталось, это взять отсортированные результаты и преобразовать все идентификаторы в более значимые имена.

val keyedDataSorted = airlineData.repartitionAndSortWithinPartitions(new AirlineFlightPartitioner(5))

val translatedData = keyedDataSorted.map(t => createDelayedFlight(t._1, t._2, bcTable))
//printing out only done locally for demo purposes, usually write out to HDFS
    translatedData.collect().foreach(println)

//supporting code
def createDelayedFlight(key: FlightKey, data: List[String], bcTable: Broadcast[RefTable]): DelayedFlight = {
    val table = bcTable.value
    val airline = table.get(AIRLINE_DATA, key.airLineId)
    val destAirport = table.get(AIRPORT_DATA, key.arrivalAirportId.toString)
    val destCity = table.get(CITY_DATA, data(3))
    val origAirport = table.get(AIRPORT_DATA, data(1))
    val originCity = table.get(CITY_DATA, data(2))

    DelayedFlight(airline, data.head, origAirport, originCity, destAirport, destCity, key.arrivalDelay)
  }

Отображение отсортированных результатов в справочные данные

Здесь мы отображаем отсортированные результаты в DelayedFlightобъекты с помощью createDelayedFlightметода. Здесь следует обратить внимание на две вещи:

  1. Чтобы использовать объект таблицы, нам нужно сначала «развернуть» его от Broadcastобъекта.
  2. Идентификатор аэропорта прибытия необходимо преобразовать в a, так Stringкак это int в FlightKeyклассе, но наша справочная таблица содержит только строки.

Полученные результаты

Теперь результаты выглядят так:

DelayedFlight(American Airlines Inc.,2015-01-01,Dallas/Fort Worth, TX: Dallas/Fort Worth International,Dallas/Fort Worth, TX,Atlanta, GA: Hartsfield-Jackson Atlanta International,Atlanta, GA (Metropolitan Area),-2.0)
DelayedFlight(American Airlines Inc.,2015-01-01,Dallas/Fort Worth, TX: Dallas/Fort Worth International,Dallas/Fort Worth, TX,Washington, DC: Ronald Reagan Washington National,Washington, DC (Metropolitan Area),-2.0)
DelayedFlight(American Airlines Inc.,2015-01-01,Los Angeles, CA: Los Angeles International,Los Angeles, CA (Metropolitan Area),Washington, DC: Ronald Reagan Washington National,Washington, DC (Metropolitan Area),-14.0)
DelayedFlight(American Airlines Inc.,2015-01-01,Chicago, IL: Chicago O'Hare International,Chicago, IL,Denver, CO: Denver International,Denver, CO,24.0)
DelayedFlight(American Airlines Inc.,2015-01-01,Ontario, CA: Ontario International,Los Angeles, CA (Metropolitan Area),Dallas/Fort Worth, TX: Dallas/Fort Worth International,Dallas/Fort Worth, TX,133.0)
DelayedFlight(American Airlines Inc.,2015-01-01,Honolulu, HI: Honolulu International,Honolulu, HI,Dallas/Fort Worth, TX: Dallas/Fort Worth International,Dallas/Fort Worth, TX,109.0)
DelayedFlight(American Airlines Inc.,2015-01-01,Phoenix, AZ: Phoenix Sky Harbor International,Phoenix, AZ,Dallas/Fort Worth, TX: Dallas/Fort Worth International,Dallas/Fort Worth, TX,55.0)
DelayedFlight(American Airlines Inc.,2015-01-01,New York, NY: John F. Kennedy International,New York City, NY (Metropolitan Area),Dallas/Fort Worth, TX: Dallas/Fort Worth International,Dallas/Fort Worth, TX,49.0)
DelayedFlight(American Airlines Inc.,2015-01-01,San Francisco, CA: San Francisco International,San Francisco, CA (Metropolitan Area),Dallas/Fort Worth, TX: Dallas/Fort Worth International,Dallas/Fort Worth, TX,40.0)

Отображенные результаты полета

При быстром взгляде и прокрутке до упора вправо мы видим, что полеты в этот день в Даллас, штат Техас, имели некоторые значительные задержки.

Вывод

Это завершает, как мы могли бы использовать Таблицы Гуавы как переменные широковещания в работе Spark. Надеюсь, читатель увидит преимущества использования такого подхода. Спасибо за ваше время.

Ресурсы