Статьи

Использование Apache Spark и MySQL для анализа данных

Apache Spark — это среда кластерных вычислений, похожая на Apache Hadoop. В Википедии есть отличное описание этого:

Apache Spark — это инфраструктура кластерных вычислений с открытым исходным кодом, изначально разработанная в AMPLab в Университете Калифорнии, Беркли, но позже была пожертвована Apache Software Foundation, где она и сегодня существует. В отличие от двухэтапной дисковой парадигмы Hadoop MapReduce, многоступенчатые примитивы Spark в памяти обеспечивают производительность до 100 раз быстрее для определенных приложений. Позволяя пользовательским программам загружать данные в память кластера и запрашивать их повторно, Spark хорошо подходит для алгоритмов машинного обучения.

Apache Spark

Вопреки распространенному мнению, Spark не требует, чтобы все данные помещались в память, но будет использовать кэширование для ускорения операций (подобно MySQL). Spark также может работать в автономном режиме и не требует Hadoop; он также может быть запущен на одном сервере (или даже ноутбуке или настольном компьютере) и использовать все ядра вашего процессора.

Запустить его в распределенном режиме очень просто. Сначала запустите «мастер». Вы можете запустить «ведомый» на том же узле:

root@thor:~/spark# ./sbin/start-master.sh
less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
15/08/25 11:21:21 INFO Master: Running Spark version 1.4.1
15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077

Затем запустите Spark Worker на любых дополнительных узлах (обязательно добавьте имя хоста в / etc / hosts или используйте DNS):

root@d31:~/spark# ./sbin/start-slave.sh spark://thor:7077

Почему Spark, а не MySQL?

Существует ряд задач, в которых MySQL (из коробки) не показывает отличную производительность. Одно из ограничений MySQL: 1 запрос = 1 ядро ​​процессора. Это означает, что даже если у вас есть 48 быстрых ядер и большой набор данных для обработки (т.е. группировка, сортировка и т. Д.), Он не будет использовать всю вычислительную мощность. Spark, напротив, сможет использовать все ядра вашего процессора.

Еще одно отличие между MySQL и Spark:

  • MySQL использует так называемую «схему при записи» — ему потребуется преобразовать данные в MySQL. Если наши данные не находятся внутри MySQL, вы не можете использовать «sql» для запроса.
  • Spark (а также Hadoop / Hive) использует «схему при чтении» — например, он может применить структуру таблицы поверх сжатого текстового файла (или любого другого поддерживаемого формата ввода) и просмотреть его в виде таблицы; тогда мы можем использовать SQL для запроса этой «таблицы».

Другими словами, MySQL — это хранилище + обработка, в то время как работа Spark — только обработка, и он может передавать данные напрямую из / в внешние наборы данных , например, Hadoop, Amazon S3, локальные файлы, JDBC (MySQL / другие базы данных). Spark поддерживает текстовые файлы (сжатые), SequenceFiles и любые другие форматы ввода Hadoop, а также хранилище Parquet Columnar. В этом отношении Spark более гибок по сравнению с Hadoop: Spark, например, может считывать данные непосредственно из MySQL.

Типичный конвейер для загрузки внешних данных в MySQL:

  1. Распаковка (обычно внешние данные находятся в сжатых текстовых файлах)
  2. Загрузите его в промежуточную таблицу MySQL с помощью «LOAD DATA INFILE»
  3. Только тогда мы можем фильтровать / группировать и сохранять результат в другой таблице.

Это может вызвать дополнительные накладные расходы. Во многих случаях нам не нужны «сырые» данные, но мы все равно должны загрузить их в MySQL.

Почему Spark вместе с MySQL

Напротив, результат нашего анализа (т.е. агрегированные данные) должен быть в MySQL. Это не обязательно, но гораздо удобнее хранить результаты вашего анализа в MySQL. Допустим, вы хотите проанализировать большой набор данных (т.е. сравнение продаж из года в год), и вам нужно будет представить его в виде таблицы или графика. Результирующий набор будет значительно меньше, так как он будет агрегирован, и будет намного проще хранить его в MySQL, так как многие стандартные приложения будут работать с этим.

Тестовый кейс в реальном мире

Один интересный бесплатный набор данных — это количество страниц в Википедии . (> 1 ТБ сжатых, доступно с 2008 года). Эти данные могут быть загружены (в виде сжатых текстовых файлов с разделителями) и также доступны (ограниченный набор данных) в AWS. Данные агрегируются по часам и имеют следующие поля:

  • проект (т. е. «en», «fr» и т. д., обычно это язык)
  • название страницы (uri), в кодировке урлен
  • количество запросов
  • размер возвращаемого контента

(поле даты закодировано в имени файла, 1 файл в час)

Наша цель — найти 10 лучших страниц по количеству запросов в день в английской Википедии, а также поддержать поиск произвольного слова, чтобы мы могли показать, как, например, количество запросов к статье в Википедии о «Myspace». »Будет сравниваться со статьей о« Фейсбуке »(с 2008 по 2015 годы).

Чтобы сделать это в MySQL, нам нужно загрузить его как есть в MySQL. Файлы распространяются с кодированной частью даты. Размер всех файлов без сжатия> 10 ТБ. Вот возможные шаги (согласно нашему типичному конвейеру MySQL):

  1. Распакуйте файл и запустите «LOAD DATA INFILE» во временную (временную) таблицу:
    load data local infile '$file'
    into table wikistats.wikistats_full CHARACTER SET latin1
    FIELDS TERMINATED BY ' '
    (project_name, title, num_requests, content_size)
    set request_date = STR_TO_DATE('$datestr', '%Y%m%d %H%i%S');
  2. Агрегировать с «вставить в» финальный стол
    insert into wikistats.wikistats_by_day
    select date(request_date) as request_day, title, count(*), sum(num_requests)
    from wikistats.wikistats_full
    group by request_day, title;
  3. Каким-то образом url декодирует заголовок (возможно, использует UDF).

Это большие накладные расходы. Мы распаковываем и преобразуем данные в MySQL, чтобы отбросить большинство, если оно есть.

В соответствии с моими расчетами на весь 6-летний сбор данных уйдет> 1 месяц (это время не включает время распаковки и не учитывает амортизацию времени загрузки, поскольку таблица становится все больше и больше, а индексы необходимо обновить). ). Конечно, есть много вещей, которые мы можем сделать здесь, чтобы ускорить его, то есть загрузить в разные экземпляры MySQL, сначала загрузить в таблицу MEMORY, затем сгруппировать в InnoDB и т. Д.

Но одним из самых простых способов будет использование скриптов Apache Spark и Python (pyspark). Pyspark может читать исходные текстовые файлы gziped, запрашивать эти текстовые файлы с помощью SQL, применять любые фильтры, функции, т.е. urldecode, группировать по дням и сохранять набор результатов в MySQL.

Вот скрипт Python для выполнения этих действий:

from pyspark import SparkContext
sc=SparkContext()
# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
import urllib
from datetime import timedelta, date
def load_day(filename, mydate):
    # Load a text file and convert each line to a Row.
    lines = sc.textFile(filename)
    parts = lines.map(lambda l: l.split(" ")).filter(lambda line: line[0]=="en").filter(lambda line: len(line)>3).cache()
    wiki = parts.map(lambda p: Row(project=p[0],  url=urllib.unquote(p[1]).lower(), num_requests=int(p[2]), content_size=int(p[3])))
    #wiki.count()
    # Infer the schema, and register the DataFrame as a table.
    schemaWiki = sqlContext.createDataFrame(wiki)
    schemaWiki.registerTempTable("wikistats")
    group_res = sqlContext.sql("SELECT '"+ mydate + "' as mydate, url, count(*) as cnt, sum(num_requests) as tot_visits FROM wikistats group by url")
    # Save to MySQL
    mysql_url="jdbc:mysql://thor?user=wikistats&password=wikistats"
    group_res.write.jdbc(url=mysql_url, table="wikistats.wikistats_by_day_spark", mode="append")
    # Write to parquet file - if needed
    group_res.saveAsParquetFile("/ssd/wikistats_parquet_bydate/mydate=" + mydate)
mount = "/data/wikistats/"
d= date(2008, 1, 1)
end_date = date(2008, 2, 1)
delta = timedelta(days=1)
while d < end_date:
    print d.strftime("%Y-%m-%d")
    filename=mount + "wikistats//dumps.wikimedia.org/other/pagecounts-raw/2008/2008-01/pagecounts-200801" + d.strftime("%d") + "-*.gz"
    print(filename)
    load_day(filename, d.strftime("%Y-%m-%d"))
    d += delta

В скрипте я использовал Spark для чтения оригинальных файлов gzip (1 день за раз). Мы можем использовать каталог как «вход» или список файлов. Затем я буду использовать преобразования Resilient Data Set (RDD) ; В Python есть карта и фильтр лямбда-функций, которые позволят разделить «входные файлы» и отфильтровать их.

Следующим шагом будет применение схемы (объявление полей); здесь мы также можем применять любые другие функции; т.е. я использую urllib.unquote для декодирования заголовка (urldecode). Наконец, мы можем зарегистрировать временную таблицу и затем использовать знакомый SQL, чтобы сделать группу.

Скрипт обычно использует все ядра процессора. Кроме того, его очень легко запустить в распределенном режиме даже без Hadoop: просто скопируйте файлы на все машины в кластере Spark или используйте NFS / внешнее хранилище.

Сценарию потребовалось около часа на 3 блоках, чтобы обработать данные за 1 месяц и загрузить агрегированные данные в MySQL (один экземпляр). Мы можем оценить, что загрузка всех 6 лет (агрегированных) в MySQL составляет ~ 3 дня.

Вы можете спросить, почему это теперь значительно быстрее (и у нас все еще есть результат, загруженный в тот же экземпляр MySQL)? Ответ в том, что это другой и более эффективный конвейер. В нашем исходном конвейере MySQL (который, вероятно, займет месяцы) мы загружаем необработанные данные в MySQL. Здесь мы фильтруем и группируем по чтению, и пишем только то, что нам нужно в MySQL.

Здесь также может возникнуть один вопрос: нужен ли нам весь этот «трубопровод»? Можем ли мы просто выполнить наши аналитические запросы поверх «необработанных» данных? Что ж, это возможно, но, вероятно, для эффективной работы потребуется 1000 узлов Spark Cluster, поскольку для этого потребуется просканировать данные объемом 5 ТБ (см. «Подробнее» ниже).

Многопоточная производительность для вставок MySQL

При использовании group_res.write.jdbc (url = mysql_url, table = ”wikistats.wikistats_by_day_spark”, mode = ”append”) Spark будет использовать несколько потоков для вставки в MySQL.

+------+-----------+------------+-----------+---------+------+--------+--------------------------------------------------------------------------------------------------------+-----------+---------------+
 | Id   | User      | Host       | db        | Command | Time | State  | Info                                                                                                   | Rows_sent | Rows_examined |
 +------+-----------+------------+-----------+---------+------+--------+--------------------------------------------------------------------------------------------------------+-----------+---------------+
 | 1050 | root      | localhost  | wikistats | Query   |    0 | init   | show processlist                                                                                       |         0 |             0 |
 | 2133 | wikistats | thor:40994 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Colegio+san+ignacio', 1, 1)        |         0 |             0 |
 | 2134 | wikistats | thor:40995 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Miloš_Crnjanski', 2, 3)            |         0 |             0 |
 | 2135 | wikistats | thor:40996 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Robert_Edgar', 6, 7)               |         0 |             0 |
 | 2136 | wikistats | thor:40997 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Eastern_Orange_Tip', 6, 7)         |         0 |             0 |
 | 2137 | wikistats | thor:40998 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Image:Dresden_Augustusbrücke_Al   |         0 |             0 |
 | 2138 | wikistats | thor:40999 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Diamond_and_pearl', 11, 24)        |         0 |             0 |
 | 2139 | wikistats | thor:41000 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Operation_polo', 2, 2)             |         0 |             0 |
 | 2140 | wikistats | thor:41001 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Template_talk:Edit-first-section   |         0 |             0 |
 | 2141 | wikistats | thor:41002 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Bertha_of_Artois', 1, 1)           |         0 |             0 |
 | 2142 | wikistats | thor:41003 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'A Change of Pace', 1, 1)           |         0 |             0 |
 | 2143 | wikistats | thor:41005 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'FAIRCHILD-REPUBLIC A-10 THUNDERB   |         0 |             0 |
 | 2144 | wikistats | thor:41006 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Special:Recentchangeslinked/Wiki   |         0 |             0 |
 | 2145 | wikistats | thor:41007 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Image:Carl-sassenrath-sp-1982.jp   |         0 |             0 |
 | 2146 | wikistats | thor:41008 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'List_of_Fleet_Air_Arm_aircraft_s   |         0 |             0 |
 | 2147 | wikistats | thor:41009 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Systemic_sclerosis', 17, 29)       |         0 |             0 |
 | 2148 | wikistats | thor:41011 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'tataviam', 1, 1)                   |         0 |             0 |
 | 2149 | wikistats | thor:41010 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'The_Devil_Wears_Prada_(film)#_no   |         0 |             0 |
 | 2150 | wikistats | thor:41013 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Seaford_High_School', 5, 7)        |         0 |             0 |
 | 2151 | wikistats | thor:41014 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Talk:Shocker_(hand_gesture)', 3,   |         0 |             0 |
 | 2152 | wikistats | thor:41015 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Paul_Szabo', 14, 23)               |         0 |             0 |
 | 2153 | wikistats | thor:41016 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'ausgereift', 1, 1)                 |         0 |             0 |
 | 2154 | wikistats | thor:41017 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Category:March_2005_news', 1, 2)   |         0 |             0 |
 | 2155 | wikistats | thor:41018 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Foot_Locker_Inc', 10, 10)          |         0 |             0 |
 | 2156 | wikistats | thor:41019 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Abbey_Park,_Nottinghamshire', 3,   |         0 |             0 |
 +------+-----------+------------+-----------+---------+------+--------+--------------------------------------------------------------------------------------------------------+-----------+---------------+
 25 rows in set (0.00 sec)

Мониторинг ваших рабочих мест

Spark предоставляет вам веб-интерфейс для мониторинга и управления вашей работой. Вот пример: я запускаю приложение wikistats.py:

Spark Web InterfaceСнимок экрана 2015-09-22 в 14:32.

Результат: использование формата столбцов паркетных таблиц по сравнению с таблицей Myno InnoDB

Spark поддерживает формат Apache Parquet Columnar , поэтому мы можем сохранить RDD в виде файла паркета (его можно сохранить в каталоге на HDFS):

    group_res.saveAsParquetFile("/ssd/wikistats_parquet_bydate/mydate=" + mydate)

Здесь мы сохраняем результат нашего конвейера (агрегированные данные) в Spark. Я также использую разделение по дням («mydate = 20080101»), и Spark может автоматически обнаруживать разделы в этом формате.

Когда у меня будут результаты, я хочу запросить их. Допустим, я хочу найти топ-10 наиболее часто запрашиваемых вики-страниц в январе 2018 года. Я могу выполнить этот запрос с MySQL (мне нужно отфильтровать главную страницу и страницы поиска):

mysql> SELECT lower(url) as lurl, sum(tot_visits) as max_visits , count(*) FROM wikistats_by_day_spark where lower(url) not like '%special%' and lower(url) not like '%page%' and lower(url) not like '%test%' and lower(url) not like '%wiki%' group by lower(url) order by max_visits desc limit 10;
+--------------------------------------------------------+------------+----------+
| lurl                                                   | max_visits | count(*) |
+--------------------------------------------------------+------------+----------+
| heath_ledger                                           |    4247338 |      131 |
| cloverfield                                            |    3846404 |      131 |
| barack_obama                                           |    2238406 |      153 |
| 1925_in_baseball#negro_league_baseball_final_standings |    1791341 |       11 |
| the_dark_knight_(film)                                 |    1417186 |       64 |
| martin_luther_king,_jr.                                |    1394934 |      136 |
| deaths_in_2008                                         |    1372510 |       67 |
| united_states                                          |    1357253 |      167 |
| scientology                                            |    1349654 |      108 |
| portal:current_events                                  |    1261538 |      125 |
+--------------------------------------------------------+------------+----------+
10 rows in set (1 hour 22 min 10.02 sec)

Обратите внимание, что здесь мы используем нашу уже агрегированную (сводка по данным) таблицу, а не «необработанные» данные.

Как видим, запрос занял 1 час 22 минуты. Я также сохранил те же результаты в Parquet (см. Скрипт), так что теперь я могу использовать его с Spark-SQL:

./bin/spark-sql --master local

Это будет использовать локальную версию spark-sql, используя только 1 хост.

spark-sql> CREATE TEMPORARY TABLE wikistats_parquet
USING org.apache.spark.sql.parquet
OPTIONS (
  path "/ssd/wikistats_parquet_bydate"
);
Time taken: 3.466 seconds
spark-sql> select count(*) from wikistats_parquet;
select count(*) from wikistats_parquet;
227846039
Time taken: 82.185 seconds, Fetched 1 row(s)
spark-sql> SELECT lower(url) as lurl, sum(tot_visits) as max_visits , count(*) FROM wikistats_parquet where lower(url) not like '%special%' and lower(url) not like '%page%' and lower(url) not like '%test%' and lower(url) not like '%wiki%' group by lower(url) order by max_visits desc limit 10;
heath_ledger    4247335 42
cloverfield     3846400 42
barack_obama    2238402 53
1925_in_baseball#negro_league_baseball_final_standings  1791341 11
the_dark_knight_(film)  1417183 36
martin_luther_king,_jr. 1394934 46
deaths_in_2008  1372510 38
united_states   1357251 55
scientology     1349650 44
portal:current_events   1261305 44
Time taken: 1239.014 seconds, Fetched 10 row(s)

Это заняло ~ 20 минут, что намного быстрее.

Вывод

Apache Spark предоставляет отличный и простой способ анализа и агрегирования данных. Что мне нравится в Spark против других больших данных и аналитических сред:

  • С открытым исходным кодом и активно развивается
  • Отсутствие зависимости от инструментов, т. Е. Входные и выходные данные не обязательно должны быть в Hadoop
  • Автономный режим для быстрого запуска, простота развертывания
  • Параллельно, легко добавлять узлы
  • Поддержка разнообразных форматов ввода и вывода; то есть, он может читать / писать в MySQL (против драйвера JDBC) и в формате Parquet Columnar

Тем не менее, есть ряд недостатков:

  • Это все еще ново, поэтому вы можете ожидать некоторые ошибки и недокументированное поведение. Многие из ошибок трудно объяснить.
  • Это требует Java; Spark 1.5 поддерживает только Java 7 и выше. Это также означает, что это потребует дополнительной памяти, что разумно в наши дни.
  • Вам нужно будет запускать задания через «spark-submit»

Я считаю, что Apache Spark — отличный инструмент и может дополнять MySQL для анализа данных и целей бизнес-аналитики.

Первоначально написано Александром Рубином для Percona.