Статьи

Введение в Apache Spark

Автор  Мануэль Галеано для Logentries .

Apache Spark — это быстрая и универсальная кластерная вычислительная система. Последнюю версию можно загрузить с http://spark.apache.org/downloads.html . В этом посте мы попытаемся выполнить некоторые основные манипуляции с данными, используя spark и python.

Использование Spark Python Shell

Первое, что должна сделать программа Spark, это создать объект SparkContext, который сообщает Spark, как получить доступ к кластеру. Когда вы используете оболочку Python, контекстная переменная с именем «sc» будет создана автоматически.

Чтобы получить доступ к оболочке Python Spark, вы можете запустить в своей директории spark следующее:

./bin/pyspark --master local[*]

Using Python version 2.7.3 (default, Mar 13 2014 11:03:55)
SparkContext available as sc.
>>>

Эластичные распределенные наборы данных (RDD)
Эластичные распределенные наборы данных (RDD) — это отказоустойчивые наборы элементов, которые могут работать параллельно. Самый простой способ создать RDD в памяти — вызвать функцию параллелизации следующим образом:

>>> numbers = [x for x in xrange(100)]
>>> dist_numbers = sc.parallelize(numbers)
>>> print dist_numbers
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:364

СДР поддерживают два типа операций:

  • преобразования: они создают новый набор данных из существующего
  • действия: они возвращают значение

Ниже приведены некоторые примеры преобразований с использованием списка чисел, созданных ранее:

 >>> # getting an rdd with odd numbers
>>> odd_rdd = dist_numbers.filter(lambda x: x % 2 != 0)

>>> # calculating factorial for any number on the list
>>> import math
>>> fact_rdd = dist_numbers.map(lambda x: math.factorial(x))

>>> # get first 10 elements of the list
>>> dist_numbers.take(10)

Основным преимуществом вышеизложенного является то, что все эти операции распределяются по вашему кластеру.

Теперь давайте рассмотрим более сложный пример с использованием нескольких операций.
Сначала мы прочитаем локальный файл с именем «ЛИЦЕНЗИЯ», затем посчитаем количество вхождений для каждого слова и, наконец, получим 10 лучших упомянутых слов. Для этого нам нужно выполнить следующие шаги:

  • Прочитайте файл.
  • Разделите каждую строку отдельными словами.
  • Примените flatMap, чтобы объединить все строки в единый список.
  • Применить карту, чтобы сопоставить каждое слово и число 1 (эффективно подсчитывая 1 каждый раз).
  • Уменьшить по ключу для увеличения на единицу для каждого появления слова (сумма).
  • Инвертировать значение ключа, чтобы можно было заказать по количеству появлений.
  • Получить окончательный результат.

Результирующая программа на Python выглядит так:

>>> # loading local file
>>> dist_file = sc.textFile("LICENSE")
>>> dist_file.flatMap(lambda x: x.split()).map(lambda x: (x, 1,)).reduceByKey(lambda x, y: x + y ).map(lambda x: (x[1], x[0],)  ).sortByKey(False).take(10)

[(279, u'#'), (253, u'the'), (142, u'of'), (118, u'or'), (114, u'and'), (111, u'to'), (102, u'OR'), (85, u'OF'), (80, u'in'), (55, u'this')]

Теперь мы понимаем, что первые 10 слов — это не то, что мы искали (т.е. все комментарии или простые слова). Мы можем улучшить наши вычисления, сохранив только слова длиной более 4 символов:

>>> dist_file.flatMap(lambda x: x.split()).filter(lambda x: len(x) > 4).map(lambda x: (x, 1,)).reduceByKey(lambda x, y: x + y ).map(lambda x: (x[1], x[0],)  ).sortByKey(False).take(10)
>>> [(55, u'this'), (43, u'that'), (40, u'License'), (33, u'conditions'), (31, u'with'), (28, u'following'), (27, u'copyright'), (26, u'Python'), (24, u'========================================================================'), (23, u'without')]

Как видите, мы определили набор слов, который является более приемлемым.
Более сложный пример

Мы напишем скрипт на python с именем «weather.py», который проанализирует файл из Национальной службы погоды США. Чтобы получить исходные данные, получите файл с именем «200705hourly.txt», который можно извлечь из следующего сжатого файла http://cdo.ncdc.noaa.gov/qclcd_ascii/QCLCD200705.zip. Этот файл содержит статистику погоды за июль 2005 года в часовом формате. Мы собираемся собрать эти данные за весь месяц, используя RDD.

Программа выглядит следующим образом:

from pyspark import SparkContext, StorageLevel

sc = SparkContext("local", "Spark Demo")

def extract_values(x):
      """
      x[1] Date
      x[2] Time
      x[6] Visibility
      x[10] DryBulbFarenheit
      x[12] DryBulbCelsius
      x[24] WindSpeed

      """
      return ( x[1], (x[2], x[6], x[10], x[12], x[24]), )

def process_files():
      # reading in the input file
      data = sc.textFile('/tmp/200705hourly.txt')
      # getting the column names
      keys = data.map(lambda x: x.split(',')).first()
      # splitting the lines but ignoring the first one
      values_rdd = data.filter(lambda x: x is not None and not x.startswith(keys[0])).map(lambda x: x.split(','))
      # persisting our set to avoid extra computation
      values_rdd.persist(StorageLevel.MEMORY_ONLY)
      mapped_values_rdd = values_rdd.map(extract_values)
      grouped_rdd = mapped_values_rdd.groupByKey()

if __name__ == "__main__":
      process_files()

Мы можем выполнить этот скрипт с помощью команды ниже:

 bin/spark-submit --master local[*]  weather.py

Spark — отличный инструмент, и мы уже видели, с которым довольно легко начать. Он также имеет гораздо больше функций, которые мы не будем рассматривать в этом посте, таких как алгоритмы машинного обучения, развертывание кластеров, потоковая передача и анализ графиков. Все эти функции могут быть доступны программно не только с помощью Python, но также с Java и Scala, если вы более знакомы с ними.