Мы уже познакомили вас с Hadoop и кластерами в нашей предыдущей статье . В этот момент вы можете задаться вопросом, зачем нам нужен многоузловой кластер . Поскольку большинство служб будут работать на главном сервере, почему бы нам просто не создать кластер с одним узлом?
Существует две основные причины использования многоузлового кластера. Во-первых, объем данных, которые должны храниться и обрабатываться, может быть слишком большим для обработки одним узлом. Во-вторых, вычислительная мощность кластера с одним узлом может быть ограничена.
Вам также могут понравиться: Исследование данных и подготовка данных для бизнес-аналитики
Подготовка данных
Как обсуждалось в предыдущих статьях, у нас есть данные, которые мы собираем по какой-то причине, но оставили без изменений. Для бизнеса нет смысла получать данные и сохранять их как есть. Подготовка данных означает подготовку или преобразование необработанных данных в уточненную информацию, которая может эффективно использоваться для различных бизнес-целей и анализа.
Наша конечная цель — превратить данные в информацию, а информацию — в понимание, которое может помочь вам в различных аспектах принятия решений и улучшениях в бизнесе. Обработка или подготовка данных — это не новый термин для рассмотрения, поскольку это было с первых дней, когда обработка выполнялась вручную. Теперь, когда данные стали большими, пришло время выполнять обработку автоматическими средствами, чтобы сэкономить время и добиться большей точности.
Если вы просматриваете пять лучших платформ обработки больших данных, вы увидите следующий список слов:
- Hadoop.
- Спарк.
- Буря.
- Flink.
- Samza.
Первые два из пяти фреймворков хорошо известны и наиболее реализованы среди различных проектов. Они также являются в основном средами пакетной обработки. Кажется, что они похожи, но есть большая разница между этими двумя. Давайте кратко рассмотрим сравнительный анализ.
критерии | искра | Hadoop MapReduce |
обработка | В памяти | Сохраняется на диске после карты и сокращает функции |
Простота использования | Легко благодаря поддержке Scala и Python | Тяжело, так как поддерживается только Java |
скорость | Запускает приложения в 100 раз быстрее | Помедленнее |
Задержка | ниже | выше |
Планирование задач | Графики задач самостоятельно | Требуются внешние планировщики |
Согласно таблице, различные факторы заставили нас перейти от MapReduce к Spark. Другой простой причиной является простота использования, так как он поставляется с удобными API для Scala, Java, Python и Spark SQL . Spark SQL похож на SQL 92, поэтому он прост даже для начинающих. Вот некоторые из ключевых особенностей, которые делают Spark мощным механизмом обработки больших данных:
- Оборудован MLlib.
- Поддерживает несколько языков, таких как Scala, Python и Java.
- Одна библиотека способна выполнять SQL, графическую аналитику и потоковую передачу.
- Хранит данные в оперативной памяти серверов, что упрощает доступ и ускоряет аналитику.
- Обработка в реальном времени.
- Совместим с Hadoop (работает независимо и поверх Hadoop).
Spark Over Storm
Мы сравнили первые два и пришли к решению. Иногда люди могут также предпочесть третий стек — Storm . Оба являются общим стеком для обработки в реальном времени и аналитики. Storm — это чистый потоковый фреймворк, но многие функции, такие как MLlib, недоступны в Storm, так как это меньший фреймворк.
Spark предпочтительнее, чем Storm, для таких деталей, как увеличение или уменьшение сервисов. Лучше знать различия для переключения между различными инструментами в зависимости от требований. В этой статье мы сосредоточимся на Spark , широко используемом инструменте обработки.
Компоненты искры
Apache Spark Ecosystem.
- Spark Core — это база, которая состоит из механизма общего исполнения, используемого для диспетчеризации и планирования.
- Spark SQL — это компонент поверх Spark Core, который представляет новый набор абстракций данных, называемый Schema RDD. Это поддерживает как структурированные, так и полуструктурированные данные.
- Spark Streaming — этот компонент обеспечивает отказоустойчивую обработку потоков данных в реальном времени, что обеспечивает API для управления потоками данных.
- MLlib (Библиотека машинного обучения) — Apache Spark оснащен MLlib, который содержит широкий спектр алгоритмов машинного обучения, фильтров совместной работы и т. Д.
Приложения
Некоторые приложения Apache Spark являются
- Машинное обучение. Как известно, Apache Spark оснащен библиотекой машинного обучения MLlib, которая может выполнять расширенные аналитические функции, такие как кластеризация, классификация и т. Д.
- Обнаружение событий — потоковая передача Spark позволяет организации отслеживать редкие и необычные действия для защиты системы.
Искра Совместимость
- Форматы файлов — Spark поддерживает все форматы файлов, поддерживаемые Hadoop, от неструктурированных, как текст, до структурированных, например, Sequence Files. Но, как обсуждалось ранее, использование соответствующих форматов файлов может привести к повышению производительности.
- Файловые системы — локальные, Amazon s3, HDFS и др.
- Базы данных — Поддерживает многие, такие как Cassandra, Hbase, Elastic search с помощью коннекторов Hadoop и нестандартных искровых коннекторов.
использование
Spark — это мощный инструмент, который предоставляет интерактивную оболочку для интерактивного анализа данных. Точки ниже выделят открытие, использование и закрытие искрового снаряда.
Открытие Spark Shell
Как правило, Spark построен с использованием Scala. Введите следующую команду, чтобы запустить spark-shell.
$ spark-shell
Если оболочка Spark открылась успешно, вы увидите следующий экран. Последняя строка вывода «Spark context available as sc» означает, что spark автоматически создал объекты контекста spark с именем sc. Если этого нет, то перед началом работы создайте объект SparkContext.
Теперь все готово для продолжения работы с программами Scala.
Нажмите «Ctrl + z», чтобы выйти из спарк-оболочки при необходимости.
Spark Context
SparkContext может подключаться к нескольким типам диспетчеров кластеров, которые могут распределять ресурсы между приложениями. Позвольте мне показать два разных сценария с двумя разными языками.
Давайте узнаем количество музеев по штатам по принятым данным, используя Scala, и запишем результат в виде CSV-файла в Hadoop.
Scala
xxxxxxxxxx
1
импорт Java . -й . _ импортировать scala . Массив . _ импортировать scala . -й . _ импортировать Java . -й . BufferedOutputStream импортировать Java . -й . FileOutputStream импортировать Java . -й . InputStream импортировать Java . -й . OutputStream импортировать Java . Util . Календарь импорта java, языки . _ импортировать орг . апач . hadoop . конф . Импорт конфигурации орг . апач . hadoop . конф . Настроил импорт орг . апач . hadoop . фс . Файловая система импорта орг . апач . hadoop . фс . Путь импорта орг . апач . hadoop . И.О. , IOUtils импорт орг . апач . hadoop . Util . Инструмент импорта орг . апач . hadoop . Util . ToolRunner импорт орг . апач . искра . кв . SparkSession // импорт com.databricks.spark.avro._ import java.util.Calendar объект музей {def main (args: Array [String]) {println (Calendar.getInstance () .getTime ()) var cols = "" val spark1 = org.apache.spark.sql.SparkSession.builder.master ("local"). appName ("Spark Avro Reader"). getOrCreate var df1 = spark1.read.format ("com.databricks.spark.csv") .option ( "заголовок", "True") .option ( "бежать", "\" ") .load (" HDFS: //emr-header-1.cluster-95904: 9000 / пользователь / демо / tripadvisor_merged.csv ") .coalesce (1) df1.createOrReplaceTempView (" museum ") val df2 = spark1.sql (" выберите State, count (MuseumName) Museum_Count из музейной группы по состоянию ") var flag = 0 df2.foreachPartition (itr => { val conf = новая конфигурация () conf.set ("fs.defaultFS", "hdfs: // emr-header-1.cluster-95904: 9000 ") val fs = FileSystem.get (conf) val output = fs.create (новый путь (" / user / ogs / etl / процессе / MUSEUM_COUNT_BY_STATE / MUSEUM_COUNT_BY_STATE.csv ")) val pw1 = новый PrintWriter ( вывод) if (flag == 0) {cols = "State" + "," + "Count" + "\ n"; pw1.write (cols); flag = 1} while (itr.hasNext) {val item = itr.next (). toString () val l = item.length cols = item.toString (). substring (1, l-1) cols = cols.concat ("\ n") pw1.write (cols) // println (cols)} pw1.close})hasNext) {val item = itr.next (). toString () val l = item.length cols = item.toString (). substring (1, l-1) cols = cols.concat ("\ n") pw1. write (cols) // println (cols)} pw1.close})hasNext) {val item = itr.next (). toString () val l = item.length cols = item.toString (). substring (1, l-1) cols = cols.concat ("\ n") pw1. write (cols) // println (cols)} pw1.close})
Здесь spark читает этот файл, запоминая его как разделенный запятыми файл. Но столбец с именем Address на этом листе содержит запятые. Поэтому, чтобы не разбивать их на разные столбцы, мы используем здесь «escape». Scala зависит от Java и, следовательно, необходимо импортировать различные библиотеки. Давайте сделаем это коротко, используя «pyspark».
Перед запуском Spark с Python установите необходимые библиотеки. Здесь я устанавливаю панды, которые используются для эффективной обработки файлов.
Теперь запустите оболочку с помощью команды «pyspark».
Давайте узнаем 10 лучших музеев по количеству посетителей. Следующий код использует Spark SQL и соглашения оболочки Pyspark. Вы также можете использовать фрейм данных Panda для чтения и обработки файла. Но использование форматов чтения и записи Spark приводит к большей эффективности.
SQL
xxxxxxxxxx
1
импортировать панд как pd из pyspark .sql import SparkSession df = spark .read.format ( "com.databricks.spark.csv" ) .option ( "header" , "True" ) .option ( "escape" , "\" " ) .load ( "hdfs: //emr-header-1.cluster-95904: 9000 / user / demo / sqoop / tripadvisor_merged.csv" ) df .createOrReplaceTempView ( "family" ) из pyspark .sql.functions import lit df1 = искра .sql ("выберите значение MuseumName, Families_Count Count (выберите значение MuseumName, Families_Count, rank () более (порядок по длине (Families_Count), Families_Count desc) из семейства) где rank <= 30" ) .withColumn ( "Visitor_Type" , lit ( " Families_Count " )) df2 = spark .sql ( " выберите значение MuseumName, Couples_Count Count из (выберите MuseumName, Couples_Count, rank () более (порядок по длине (Couples_Count) desc, Couples_Count desc, ранг из семейства), где rank <= 10 " ) .withColumn ( "Visitor_Type" , горит ( "Couples_Count" )) df3 = spark .sql ("выберите значение MuseumName, Solo_Count Count (выберите значение MuseumName, Solo_Count, rank () более (порядок по длине (Solo_Count), Solo_Count desc) из семейства), где rank <= 10" ) .withColumn ( "Visitor_Type" , lit ( " Solo_Count " )) df4 = spark .sql ( " выберите MuseumName, Business_Count Count из (выберите MuseumName, Business_Count, rank () более (порядок по длине (Business_Count) desc, Business_Count desc) из семейства), где rank <= 10 " ) .withColumn ( "Visitor_Type" , горит ( "Business_Count" )) df5 = spark .sql ("выберите значение MuseumName, Friends_Count Count из (выберите значение MuseumName, Friends_Count, rank () более (порядок по длине (Friends_Count) desc, Friends_Count desc) из семейства) где rank <= 10" ) .withColumn ( "Visitor_Type" , lit ( " Friends_Count " )) df6 = df1 .unionAll (df2) .unionAll (df3) .unionAll (df4) .unionAll (df5) df6 .write.csv ( '/user/demo/spark/top_museums_by_count.csv' )
2
Мы также можем сохранить этот скрипт с расширением .py и отправить заявку, используя spark-submit. У нас были разные подсчеты. Поэтому мы создали отдельные DataFrames и объединили их, используя объединение. Сортировка выполняется по порядку первой цифры, а не по номеру, если вы используете обычный код сортировки. Таким образом, результат будет примерно таким:
В этом случае, включая длину столбца также для точных результатов.
Например,
xxxxxxxxxx
1
df1 = spark .sql ( "выберите MuseumName, Families_Count из семейного заказа по длине (Families_Count) desc, Families_Count desc" )
2
После этого запишите Spark DataFrame в виде файла CSV. Поведение по умолчанию заключается в сохранении вывода в несколько частей - *. CSV- файлов по указанному пути. Давайте запросим папку, в которую мы ответили. Вы можете увидеть «top_museums.csv», который не является CSV-файлом, а каталогом, в котором ваши выходные данные сохранены в нескольких частях. Эта структура ссылок на папки играет важную роль в распределенном хранении и обработке.
Предположим, мне нужно сохранить Dataframe с:
- Путь, который отображается на точное имя файла вместо папки.
- Пишите как один файл вместо нескольких файлов.
Затем объедините DF и сохраните файл.
Преимущества Spark на Alibaba Cloud
Адаптивное Исполнение
Spark SQL от Alibaba Cloud поддерживает адаптивное выполнение. Он используется для автоматической установки количества задач сокращения и самостоятельного решения проблем с данными. Настраивая диапазон номеров разделов в случайном порядке, среда адаптивного выполнения Spark SQL может динамически регулировать количество задач сокращения на разных этапах различных заданий.
Перекос данных
Отклонение данных относится к сценарию, в котором определенные задачи вовлекают в обработку слишком много данных. Spark SQL не выполняет оптимизацию для искаженных данных, что может быть решено с помощью инфраструктуры адаптивного выполнения Spark SQL. Это может автоматически обнаруживать перекос данных и выполнять оптимизацию во время выполнения.
Лучшие практики
- Не собирайте большие RDD и, по возможности, предпочитайте API набора данных по умолчанию RDD.
- Избегайте UDF и замените их функциями Spark SQL.
- При выполнении задания чтения / записи HDFS задайте число одновременных заданий для каждого исполнителя, которое должно быть меньше или равно 5 для чтения и записи.
- Общий совет - искать время выполнения и соответственно увеличивать работу.
Надеюсь, вам понравилось изучать Spark. Нашим следующим шагом будет изучение создания и отправки различных заданий с помощью пользовательского интерфейса Alibaba Cloud, а также выполнение запросов и анализа. В следующей статье мы познакомим вас с основами Hive, включая создание таблиц и другие базовые концепции для приложений с большими данными.
«Цель - превратить данные в информацию, а информацию в понимание», - Карли Фиорина
Дальнейшее чтение
Как подготовить данные для обучения OCR
Исследование данных и подготовка данных для бизнес-аналитики
3 Выводы из Руководства Gartner по рынку 2019 года для подготовки данных