Автор Logentries .
дляApache Spark — это быстрая и универсальная кластерная вычислительная система. Последнюю версию можно загрузить с http://spark.apache.org/downloads.html . В этом посте мы попытаемся выполнить некоторые основные манипуляции с данными, используя spark и python.
Использование Spark Python Shell
Первое, что должна сделать программа Spark, это создать объект SparkContext, который сообщает Spark, как получить доступ к кластеру. Когда вы используете оболочку Python, контекстная переменная с именем «sc» будет создана автоматически.
Чтобы получить доступ к оболочке Python Spark, вы можете запустить в своей директории spark следующее:
./bin/pyspark --master local[*]
Using Python version 2.7.3 (default, Mar 13 2014 11:03:55)
SparkContext available as sc.
>>>
Эластичные распределенные наборы данных (RDD)
Эластичные распределенные наборы данных (RDD) — это отказоустойчивые наборы элементов, которые могут работать параллельно. Самый простой способ создать RDD в памяти — вызвать функцию параллелизации следующим образом:
>>> numbers = [x for x in xrange(100)]
>>> dist_numbers = sc.parallelize(numbers)
>>> print dist_numbers
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:364
СДР поддерживают два типа операций:
- преобразования: они создают новый набор данных из существующего
- действия: они возвращают значение
Ниже приведены некоторые примеры преобразований с использованием списка чисел, созданных ранее:
>>> # getting an rdd with odd numbers
>>> odd_rdd = dist_numbers.filter(lambda x: x % 2 != 0)
>>> # calculating factorial for any number on the list
>>> import math
>>> fact_rdd = dist_numbers.map(lambda x: math.factorial(x))
>>> # get first 10 elements of the list
>>> dist_numbers.take(10)
Основным преимуществом вышеизложенного является то, что все эти операции распределяются по вашему кластеру.
Теперь давайте рассмотрим более сложный пример с использованием нескольких операций.
Сначала мы прочитаем локальный файл с именем «ЛИЦЕНЗИЯ», затем посчитаем количество вхождений для каждого слова и, наконец, получим 10 лучших упомянутых слов. Для этого нам нужно выполнить следующие шаги:
- Прочитайте файл.
- Разделите каждую строку отдельными словами.
- Примените flatMap, чтобы объединить все строки в единый список.
- Применить карту, чтобы сопоставить каждое слово и число 1 (эффективно подсчитывая 1 каждый раз).
- Уменьшить по ключу для увеличения на единицу для каждого появления слова (сумма).
- Инвертировать значение ключа, чтобы можно было заказать по количеству появлений.
- Получить окончательный результат.
Результирующая программа на Python выглядит так:
>>> # loading local file
>>> dist_file = sc.textFile("LICENSE")
>>> dist_file.flatMap(lambda x: x.split()).map(lambda x: (x, 1,)).reduceByKey(lambda x, y: x + y ).map(lambda x: (x[1], x[0],) ).sortByKey(False).take(10)
[(279, u'#'), (253, u'the'), (142, u'of'), (118, u'or'), (114, u'and'), (111, u'to'), (102, u'OR'), (85, u'OF'), (80, u'in'), (55, u'this')]
Теперь мы понимаем, что первые 10 слов — это не то, что мы искали (т.е. все комментарии или простые слова). Мы можем улучшить наши вычисления, сохранив только слова длиной более 4 символов:
>>> dist_file.flatMap(lambda x: x.split()).filter(lambda x: len(x) > 4).map(lambda x: (x, 1,)).reduceByKey(lambda x, y: x + y ).map(lambda x: (x[1], x[0],) ).sortByKey(False).take(10)
>>> [(55, u'this'), (43, u'that'), (40, u'License'), (33, u'conditions'), (31, u'with'), (28, u'following'), (27, u'copyright'), (26, u'Python'), (24, u'========================================================================'), (23, u'without')]
Как видите, мы определили набор слов, который является более приемлемым.
Более сложный пример
Мы напишем скрипт на python с именем «weather.py», который проанализирует файл из Национальной службы погоды США. Чтобы получить исходные данные, получите файл с именем «200705hourly.txt», который можно извлечь из следующего сжатого файла http://cdo.ncdc.noaa.gov/qclcd_ascii/QCLCD200705.zip. Этот файл содержит статистику погоды за июль 2005 года в часовом формате. Мы собираемся собрать эти данные за весь месяц, используя RDD.
Программа выглядит следующим образом:
from pyspark import SparkContext, StorageLevel
sc = SparkContext("local", "Spark Demo")
def extract_values(x):
"""
x[1] Date
x[2] Time
x[6] Visibility
x[10] DryBulbFarenheit
x[12] DryBulbCelsius
x[24] WindSpeed
"""
return ( x[1], (x[2], x[6], x[10], x[12], x[24]), )
def process_files():
# reading in the input file
data = sc.textFile('/tmp/200705hourly.txt')
# getting the column names
keys = data.map(lambda x: x.split(',')).first()
# splitting the lines but ignoring the first one
values_rdd = data.filter(lambda x: x is not None and not x.startswith(keys[0])).map(lambda x: x.split(','))
# persisting our set to avoid extra computation
values_rdd.persist(StorageLevel.MEMORY_ONLY)
mapped_values_rdd = values_rdd.map(extract_values)
grouped_rdd = mapped_values_rdd.groupByKey()
if __name__ == "__main__":
process_files()
Мы можем выполнить этот скрипт с помощью команды ниже:
bin/spark-submit --master local[*] weather.py
Spark — отличный инструмент, и мы уже видели, с которым довольно легко начать. Он также имеет гораздо больше функций, которые мы не будем рассматривать в этом посте, таких как алгоритмы машинного обучения, развертывание кластеров, потоковая передача и анализ графиков. Все эти функции могут быть доступны программно не только с помощью Python, но также с Java и Scala, если вы более знакомы с ними.