Что такое Apache Spark?
Spark — это решение для обработки больших данных, которое оказалось проще и быстрее, чем Hadoop MapReduce. Spark — это программное обеспечение с открытым исходным кодом, разработанное лабораторией UC Berkeley RAD в 2009 году. С тех пор, как оно было выпущено для публики в 2010 году, популярность Spark возросла и используется в этой отрасли в беспрецедентных масштабах.
В эпоху больших данных практикующим специалистам нужны более быстрые и надежные инструменты для обработки потоковых данных. Более ранние инструменты, такие как MapReduce, были любимыми, но были медленными. Чтобы преодолеть эту проблему, Spark предлагает быстрое и универсальное решение. Основное различие между Spark и MapReduce состоит в том, что Spark выполняет вычисления в памяти во время последующего на жестком диске. Это обеспечивает высокоскоростной доступ и обработку данных, сокращая время с часов до минут.
Что такое Pyspark?
Spark — это имя движка для реализации кластерных вычислений, а PySpark — это библиотека Python для использования Spark.
В этом уроке вы узнаете
- Что такое Apache Spark?
- Как работает Spark?
- Запустите Pyspark с AWS
- Установите Pyspark на Mac / Windows с помощью Conda
- Spark Context
- SQLContext
- Машинное обучение с Spark
- Шаг 1) Основные операции с PySpark
- Шаг 2) Предварительная обработка данных
- Шаг 3) Построить конвейер обработки данных
- Шаг 4) Построить классификатор: логистика
- Шаг 5) Обучите и оцените модель
- Шаг 6) Настройте гиперпараметр
Как работает Spark?
Spark основан на вычислительном движке, то есть он отвечает за планирование, распределение и мониторинг приложений. Каждая задача выполняется на различных рабочих машинах, называемых вычислительным кластером. Вычислительный кластер относится к разделению задач. Одна машина выполняет одну задачу, в то время как другие вносят вклад в конечный результат посредством другой задачи. В конце концов, все задачи агрегируются для получения результата. Администратор Spark дает обзор 360 различных рабочих мест Spark.
Spark предназначен для работы с
- питон
- Ява
- Scala
- SQL
Важной особенностью Spark является огромное количество встроенных библиотек, в том числе MLlib для машинного обучения . Spark также предназначен для работы с кластерами Hadoop и может читать широкий спектр файлов, включая данные Hive, CSV, JSON, Casandra и другие.
Зачем использовать Spark?
As a future data practitioner, you should be familiar with python’s famous libraries: Pandas and scikit-learn. These two libraries are fantastic to explore dataset up to mid-size. Regular machine learning projects are built around the following methodology:
- Load the data to the disk
- Import the data into the machine’s memory
- Process/analyze the data
- Build the machine learning model
- Store the prediction back to disk
The problem arises if the data scientist wants to process data that’s too big for one computer. During earlier days of data science, the practitioners would sample the as training on huge data sets was not always needed. The data scientist would find a good statistical sample, perform an additional robustness check and comes up with an excellent model.
However, there are some problems with this:
- Is the dataset reflecting the real world?
- Does the data include a specific example?
- Is the model fit for sampling?
Take users recommendation for instance. Recommenders rely on comparing users with other users in evaluating their preferences. If the data practitioner takes only a subset of the data, there won’t be a cohort of users who are very similar to one another. Recommenders need to run on the full dataset or not at all.
What is the solution?
The solution has been evident for a long time, split the problem up onto multiple computers. Parallel computing comes with multiple problems as well. Developers often have trouble writing parallel code and end up having to solve a bunch of the complex issues around multi-processing itself.
Pyspark gives the data scientist an API that can be used to solve the parallel data proceedin problems. Pyspark handles the complexities of multiprocessing, such as distributing the data, distributing code and collecting output from the workers on a cluster of machines.
Spark can run standalone but most often runs on top of a cluster computing framework such as Hadoop. In test and development, however, a data scientist can efficiently run Spark on their development boxes or laptops without a cluster
• One of the main advantages of Spark is to build an architecture that encompasses data streaming management, seamlessly data queries, machine learning prediction and real-time access to various analysis.
• Spark works closely with SQL language, i.e., structured data. It allows querying the data in real time.
• Data scientist main’s job is to analyze and build predictive models. In short, a data scientist needs to know how to query data using SQL, produce a statistical report and make use of machine learning to produce predictions. Data scientist spends a significant amount of their time on cleaning, transforming and analyzing the data. Once the dataset or data workflow is ready, the data scientist uses various techniques to discover insights and hidden patterns. The data manipulation should be robust and the same easy to use. Spark is the right tool thanks to its speed and rich APIs.
In this tutorial, you will learn how to build a classifier with Pyspark.
Launch Pyspark with AWS
The Jupyter team build a Docker image to run Spark efficiently. You can follow this step to launch Spark instance in AWS.
См. Наш учебник по AWS и TensorFlow
Шаг 1. Создайте экземпляр
Прежде всего, вам нужно создать экземпляр. Перейдите в свою учетную запись AWS и запустите экземпляр. Вы можете увеличить объем хранилища до 15 г и использовать ту же группу безопасности, что и в руководстве TensorFlow.
Шаг 2: Откройте соединение
Откройте соединение и установите Docker-контейнер. Для получения более подробной информации, обратитесь к учебнику с TensorFlow с Docker. Обратите внимание, что вы должны быть в правильном рабочем каталоге.
Просто запустите эти коды для установки Docker:
sudo yum update -y sudo yum install -y docker sudo service docker start sudo user-mod -a -G docker ec2-user exit
Шаг 3: снова откройте соединение и установите Spark
После того, как вы снова откроете соединение, вы можете установить образ, содержащий Pyspark.
## Spark docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook ## Allow preserving Jupyter notebook sudo chown 1000 ~/work ## Install tree to see our working directory next sudo yum install -y tree
Шаг 4: Откройте Jupyter
Проверьте контейнер и его название
docker ps
Запустите докер с журналами докера, за которым следует имя докера. Например, докер регистрирует журнал zealous_goldwasser
Зайдите в ваш браузер и запустите Jupyter. Адрес: http: // localhost: 8888 /. Вставьте пароль, заданный терминалом.
Примечание . Если вы хотите загрузить / загрузить файл на свой компьютер AWS, вы можете использовать программное обеспечение Cyberduck, https://cyberduck.io/ .
Установите Pyspark на Mac / Windows с помощью Conda
Для установки Spark на локальный компьютер рекомендуется создать новую среду conda. Эта новая среда установит Python 3.6, Spark и все зависимости.
Пользователь Mac
cd anaconda3 touch hello-spark.yml vi hello-spark.yml
Пользователь Windows
cd C:\Users\Admin\Anaconda3 echo.>hello-spark.yml notepad hello-spark.yml
Вы можете редактировать файл .yml. Будьте осторожны с отступом. Два пробела требуются перед —
name: hello-spark dependencies: - python=3.6 - jupyter - ipython - numpy - numpy-base - pandas - py4j - pyspark - pytz
Сохраните это и создайте окружающую среду. Это займет некоторое время
conda env create -f hello-spark.yml
Для получения более подробной информации о местоположении, пожалуйста, ознакомьтесь с руководством Установить TensorFlow
Вы можете проверить всю среду, установленную на вашем компьютере
conda env list
Activate hello-spark
Пользователь Mac
source activate hello-spark
Пользователь Windows
activate hello-spark
Примечание. Вы уже создали специальную среду TensorFlow для запуска учебников по TensorFlow. Удобнее создавать новую среду, отличную от hello-tf. Нет смысла перегружать hello-tf Spark или любыми другими библиотеками машинного обучения.
Представьте, что большая часть вашего проекта связана с TensorFlow, но вам нужно использовать Spark для одного конкретного проекта. Вы можете установить среду TensorFlow для всего вашего проекта и создать отдельную среду для Spark. Вы можете добавить столько библиотек в среду Spark, сколько захотите, не вмешиваясь в среду TensorFlow. Как только вы закончите с проектом Spark, вы можете стереть его, не затрагивая среду TensorFlow.
Jupyter
Откройте Jupyter Notebook и попробуйте, если PySpark работает. В новой записной книжке вставьте следующий код:
import pyspark from pyspark import SparkContext sc =SparkContext()
Если отображается ошибка, скорее всего, Java не установлена на вашем компьютере. В Mac откройте терминал и напишите java -version, если есть java-версия, убедитесь, что она 1.8. В Windows перейдите в Приложение и проверьте, есть ли папка Java. Если есть папка Java, убедитесь, что установлена Java 1.8. На момент написания статьи PySpark не совместим с Java9 и выше.
Если вам нужно установить Java, вы должны подумать о ссылке и скачать jdk-8u181-windows-x64.exe
Для пользователей Mac рекомендуется использовать «brew».
brew tap caskroom/versions brew cask install java8
Ознакомьтесь с этим пошаговым руководством по установке Java
Примечание . Используйте команду «Удалить», чтобы полностью стереть среду.
conda env remove -n hello-spark -y
Spark Context
SparkContext является внутренним механизмом, который позволяет соединения с кластерами. Если вы хотите запустить операцию, вам нужен SparkContext.
Создать SparkContext
Прежде всего, вам нужно инициировать SparkContext.
import pyspark from pyspark import SparkContext sc =SparkContext()
Теперь, когда SparkContext готов, вы можете создать коллекцию данных с именем RDD, Resilient Distributed Dataset. Вычисления в RDD автоматически распараллеливаются по всему кластеру.
nums= sc.parallelize([1,2,3,4])
Вы можете получить доступ к первой строке с дублем
nums.take(1)
[1]
Вы можете применить преобразование к данным с помощью лямбда-функции. В приведенном ниже примере вы возвращаете квадрат чисел. Это преобразование карты
squared = nums.map(lambda x: x*x).collect() for num in squared: print('%i ' % (num))
1 4 9 16
SQLContext
Более удобный способ — использовать DataFrame. SparkContext уже установлен, вы можете использовать его для создания dataFrame. Вам также нужно объявить SQLContext
SQLContext позволяет подключать движок к разным источникам данных. Он используется для запуска функций Spark SQL.
from pyspark.sql import Row from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
Давайте создадим список кортежей. Каждый кортеж будет содержать имя людей и их возраст. Требуются четыре шага:
Шаг 1) Создайте список кортежей с информацией
[('John',19),('Smith',29),('Adam',35),('Henry',50)]
Шаг 2) Постройте RDD
rdd = sc.parallelize(list_p)
Шаг 3) Конвертировать кортежи
rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
Шаг 4) Создайте контекст DataFrame
sqlContext.createDataFrame(ppl) list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)] rdd = sc.parallelize(list_p) ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) DF_ppl = sqlContext.createDataFrame(ppl)
Если вы хотите получить доступ к типу каждой функции, вы можете использовать printSchema ()
DF_ppl.printSchema() root |-- age: long (nullable = true) |-- name: string (nullable = true)
Машинное обучение с Spark
Теперь, когда у вас есть краткое представление о Spark и SQLContext, вы готовы создать свою первую программу машинного обучения.
Вы будете действовать следующим образом:
- Шаг 1) Основные операции с PySpark
- Шаг 2) Предварительная обработка данных
- Шаг 3) Построить конвейер обработки данных
- Шаг 4) Построить классификатор
- Шаг 5) Обучите и оцените модель
- Шаг 6) Настройте гиперпараметр
В этом уроке мы будем использовать набор данных для взрослых. Цель этого урока — научиться пользоваться Pyspark. Для получения дополнительной информации о наборе данных обратитесь к этому руководству.
Обратите внимание, что набор данных не имеет значения, и вы можете подумать, что вычисление занимает много времени. Spark предназначен для обработки значительного объема данных. Производительность Spark увеличивается по сравнению с другими библиотеками машинного обучения, когда объем обрабатываемого набора данных увеличивается.
Шаг 1) Основные операции с PySpark
Прежде всего, вам нужно инициализировать SQLContext, который еще не запущен.
#from pyspark.sql import SQLContext url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv" from pyspark import SparkFiles sc.addFile(url) sqlContext = SQLContext(sc)
затем вы можете прочитать файл cvs с помощью sqlContext.read.csv. Вы используете inferSchema со значением True, чтобы Spark автоматически угадывал тип данных. По умолчанию это ложь.
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
Давайте посмотрим на тип данных
df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
Вы можете увидеть данные с шоу.
df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |age|workclass |fnlwgt|education|education_num|marital |occupation |relationship |race |sex |capital_gain|capital_loss|hours_week|native_country|label| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |39 |State-gov |77516 |Bachelors|13 |Never-married |Adm-clerical |Not-in-family|White|Male |2174 |0 |40 |United-States |<=50K| |50 |Self-emp-not-inc|83311 |Bachelors|13 |Married-civ-spouse|Exec-managerial |Husband |White|Male |0 |0 |13 |United-States |<=50K| |38 |Private |215646|HS-grad |9 |Divorced |Handlers-cleaners|Not-in-family|White|Male |0 |0 |40 |United-States |<=50K| |53 |Private |234721|11th |7 |Married-civ-spouse|Handlers-cleaners|Husband |Black|Male |0 |0 |40 |United-States |<=50K| |28 |Private |338409|Bachelors|13 |Married-civ-spouse|Prof-specialty |Wife |Black|Female|0 |0 |40 |Cuba |<=50K| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ only showing top 5 rows
Если вы не установили inderShema в True, вот что происходит с типом. Есть все в строке.
df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema= False) df_string.printSchema() root |-- age: string (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: string (nullable = true) |-- education: string (nullable = true) |-- education_num: string (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: string (nullable = true) |-- capital_loss: string (nullable = true) |-- hours_week: string (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
Чтобы преобразовать непрерывную переменную в правильный формат, вы можете использовать переделать столбцы. Вы можете использовать withColumn, чтобы сообщить Spark, какой столбец должен выполнять преобразование.
# Import all from `sql.types` from pyspark.sql.types import * # Write a custom function to convert the data type of DataFrame columns def convertColumn(df, names, newType): for name in names: df = df.withColumn(name, df[name].cast(newType)) return df # List of continuous features CONTI_FEATURES = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week'] # Convert the type df_string = convertColumn(df_string, CONTI_FEATURES, FloatType()) # Check the dataset df_string.printSchema() root |-- age: float (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: float (nullable = true) |-- education: string (nullable = true) |-- education_num: float (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: float (nullable = true) |-- capital_loss: float (nullable = true) |-- hours_week: float (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) from pyspark.ml.feature import StringIndexer #stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel") #model = stringIndexer.fit(df) #df = model.transform(df) df.printSchema()
Выберите столбцы
Вы можете выбрать и показать строки с помощью выбора и имен объектов. Ниже, возраст и fnlwgt выбраны.
df.select('age','fnlwgt').show(5)
+---+------+ |age|fnlwgt| +---+------+ | 39| 77516| | 50| 83311| | 38|215646| | 53|234721| | 28|338409| +---+------+ only showing top 5 rows
Подсчет по группе
Если вы хотите посчитать количество вхождений по группам, вы можете связать:
- группа по()
- кол-()
все вместе. В приведенном ниже примере вы подсчитываете количество строк по уровню образования.
df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+ | education|count| +------------+-----+ | Preschool| 51| | 1st-4th| 168| | 5th-6th| 333| | Doctorate| 413| | 12th| 433| | 9th| 514| | Prof-school| 576| | 7th-8th| 646| | 10th| 933| | Assoc-acdm| 1067| | 11th| 1175| | Assoc-voc| 1382| | Masters| 1723| | Bachelors| 5355| |Some-college| 7291| | HS-grad|10501| +------------+-----+
Опишите данные
Чтобы получить сводную статистику данных, вы можете использовать description (). Это вычислит:
- подсчитывать
- жадный
- среднеквадратичное отклонение
- мин
- Максимум
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ |summary| age| workclass| fnlwgt| education| education_num| marital| occupation|relationship| race| sex| capital_gain| capital_loss| hours_week|native_country|label| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ | count| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561|32561| | mean| 38.58164675532078| null|189778.36651208502| null| 10.0806793403151| null| null| null| null| null|1077.6488437087312| 87.303829734959|40.437455852092995| null| null| | stddev|13.640432553581356| null|105549.97769702227| null|2.572720332067397| null| null| null| null| null| 7385.292084840354|402.960218649002|12.347428681731838| null| null| | min| 17| ?| 12285| 10th| 1|Divorced| ?| Husband|Amer-Indian-Eskimo|Female| 0| 0| 1| ?|<=50K| | max| 90|Without-pay| 1484705|Some-college| 16| Widowed|Transport-moving| Wife| White| Male| 99999| 4356| 99| Yugoslavia| >50K| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
Если вы хотите получить сводную статистику только по одному столбцу, добавьте имя столбца внутри description ()
df.describe('capital_gain').show()
+-------+------------------+ |summary| capital_gain| +-------+------------------+ | count| 32561| | mean|1077.6488437087312| | stddev| 7385.292084840354| | min| 0| | max| 99999| +-------+------------------+
Кросс-таблица вычислений
В некоторых случаях может быть интересно увидеть описательную статистику между двумя попарно столбцами. Например, вы можете посчитать количество людей с доходом ниже или выше 50 000 в зависимости от уровня образования. Эта операция называется кросс-таблицей.
df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+ |age_label|<=50K|>50K| +---------+-----+----+ | 17| 395| 0| | 18| 550| 0| | 19| 710| 2| | 20| 753| 0| | 21| 717| 3| | 22| 752| 13| | 23| 865| 12| | 24| 767| 31| | 25| 788| 53| | 26| 722| 63| | 27| 754| 81| | 28| 748| 119| | 29| 679| 134| | 30| 690| 171| | 31| 705| 183| | 32| 639| 189| | 33| 684| 191| | 34| 643| 243| | 35| 659| 217| | 36| 635| 263| +---------+-----+----+ only showing top 20 rows
Вы можете видеть, что у людей нет дохода выше 50 тысяч, когда они молоды.
Опустить столбец
Существует два интуитивно понятных API для удаления столбцов:
- drop (): удалить столбец
- dropna (): отбросить NA
Ниже вы опускаете столбец education_num
df.drop('education_num').columns ['age', 'workclass', 'fnlwgt', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label']
Фильтровать данные
Вы можете использовать filter () для применения описательной статистики в подмножестве данных. Например, вы можете посчитать количество людей старше 40 лет
df.filter(df.age > 40).count()
13443
Описательная статистика по группам
Наконец, вы можете группировать данные по группам и вычислять статистические операции, такие как среднее значение.
df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+ | marital| avg(capital_gain)| +--------------------+------------------+ | Separated| 535.5687804878049| | Never-married|376.58831788823363| |Married-spouse-ab...| 653.9832535885167| | Divorced| 728.4148098131893| | Widowed| 571.0715005035247| | Married-AF-spouse| 432.6521739130435| | Married-civ-spouse|1764.8595085470085| +--------------------+------------------+
Шаг 2) Предварительная обработка данных
Обработка данных является важным шагом в машинном обучении. После того, как вы удалите данные мусора, вы получите некоторые важные идеи. Например, вы знаете, что возраст не является линейной функцией дохода. Когда люди молоды, их доход обычно ниже, чем в среднем возрасте. После выхода на пенсию домохозяйство использует свои сбережения, что означает уменьшение дохода. Чтобы захватить этот шаблон, вы можете добавить квадрат к возрасту
Добавить возрастной квадрат
Чтобы добавить новую функцию, вам необходимо:
- Выберите столбец
- Примените преобразование и добавьте его в DataFrame.
from pyspark.sql.functions import * # 1 Select the column age_square = df.select(col("age")**2) # 2 Apply the transformation and add it to the DataFrame df = df.withColumn("age_square", col("age")**2) df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) |-- age_square: double (nullable = true)
Вы можете видеть, что age_square был успешно добавлен во фрейм данных. Вы можете изменить порядок переменных с помощью выбора. Ниже вы приводите age_square сразу после возраста.
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label'] df = df.select(COLUMNS) df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')
Исключить Голанд-Нидерланды
Когда группа в объекте имеет только одно наблюдение, она не приносит информации в модель. Наоборот, это может привести к ошибке во время перекрестной проверки.
Давайте проверим происхождение домашнего хозяйства
df.filter(df.native_country == 'Holand-Netherlands').count() df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+ | native_country|count(native_country)| +--------------------+---------------------+ | Holand-Netherlands| 1| | Scotland| 12| | Hungary| 13| | Honduras| 13| |Outlying-US(Guam-...| 14| | Yugoslavia| 16| | Thailand| 18| | Laos| 18| | Cambodia| 19| | Trinadad&Tobago| 19| | Hong| 20| | Ireland| 24| | Ecuador| 28| | Greece| 29| | France| 29| | Peru| 31| | Nicaragua| 34| | Portugal| 37| | Iran| 43| | Haiti| 44| +--------------------+---------------------+ only showing top 20 rows
Функция native_country имеет только одну семью из Нидерландов. Вы исключаете это.
df_remove = df.filter(df.native_country != 'Holand-Netherlands')
Шаг 3) Построить конвейер обработки данных
Подобно scikit-learn, Pyspark имеет конвейерный API. Конвейер очень удобен для поддержания структуры данных. Вы помещаете данные в конвейер. Внутри конвейера выполняются различные операции, выходные данные используются для подачи алгоритма.
Например, одно универсальное преобразование в машинном обучении состоит в преобразовании строки в один горячий кодировщик, то есть один столбец группой. Один горячий кодировщик обычно представляет собой матрицу, полную нулей.
Шаги для преобразования данных очень похожи на scikit-learn. Тебе следует:
- Индексируйте строку в числовой
- Создайте один горячий кодировщик
- Преобразовать данные
Два API делают работу: StringIndexer, OneHotEncoder
- Прежде всего, вы выбираете столбец строки для индексации. InputCol — это имя столбца в наборе данных. outputCol — это новое имя, данное преобразованному столбцу.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
- Подгоните данные и преобразуйте их
model = stringIndexer.fit(df) `indexed = model.transform(df)``
- Создайте колонки новостей на основе группы. Например, если в объекте 10 групп, в новой матрице будет 10 столбцов, по одному для каждой группы.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded") model = stringIndexer.fit(df) indexed = model.transform(df) encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec") encoded = encoder.transform(indexed) encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ |age|age_square| workclass|fnlwgt|education|education_num| marital| occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ | 39| 1521.0| State-gov| 77516|Bachelors| 13| Never-married| Adm-clerical|Not-in-family|White|Male| 2174| 0| 40| United-States|<=50K| 4.0|(9,[4],[1.0])| | 50| 2500.0|Self-emp-not-inc| 83311|Bachelors| 13|Married-civ-spouse|Exec-managerial| Husband|White|Male| 0| 0| 13| United-States|<=50K| 1.0|(9,[1],[1.0])| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ only showing top 2 rows
Построить трубопровод
Вы создадите конвейер для преобразования всех точных объектов и добавления их в окончательный набор данных. В конвейере будет четыре операции, но вы можете добавить столько операций, сколько захотите.
- Кодировать категориальные данные
- Индексируйте метку
- Добавить непрерывную переменную
- Соберите шаги.
Каждый шаг хранится в списке названных этапов. Этот список расскажет VectorAssembler, какую операцию выполнить внутри конвейера.
1. Кодировать категориальные данные
Этот шаг точно такой же, как в приведенном выше примере, за исключением того, что вы перебираете все категориальные функции.
from pyspark.ml import Pipeline from pyspark.ml.feature import OneHotEncoderEstimator CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country'] stages = [] # stages in our Pipeline for categoricalCol in CATE_FEATURES: stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index") encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"]) stages += [stringIndexer, encoder]
2. Индексируйте метку
Spark, как и многие другие библиотеки, не принимает строковые значения для метки. Вы конвертируете функцию метки с помощью StringIndexer и добавляете ее на стадии списка
# Convert label into label indices using the StringIndexer label_stringIdx = StringIndexer(inputCol="label", outputCol="newlabel") stages += [label_stringIdx]
3. Добавить непрерывную переменную
InputCols объекта VectorAssembler представляет собой список столбцов. Вы можете создать новый список, содержащий все новые столбцы. Код ниже пополняет список закодированными категориальными функциями и непрерывными функциями.
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES
4. Соберите шаги.
Наконец, вы проходите все шаги в VectorAssembler
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]
Теперь, когда все шаги готовы, вы отправляете данные в конвейер.
# Create a Pipeline. pipeline = Pipeline(stages=stages) pipelineModel = pipeline.fit(df_remove) model = pipelineModel.transform(df_remove)
Если вы проверите новый набор данных, вы увидите, что он содержит все функции, преобразованные и не преобразованные. Вас интересует только новая маркировка и особенности. Особенности включают в себя все преобразованные функции и непрерывные переменные.
model.take(1) [Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]
Шаг 4) Построить классификатор: логистика
Чтобы ускорить вычисления, вы конвертируете модель в DataFrame. Вам нужно выбрать новую метку и объекты из модели, используя карту.
from pyspark.ml.linalg import DenseVector input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
Вы готовы создать данные поезда в виде DataFrame. Вы используете sqlContext
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])
Проверьте второй ряд
df_train.show(2)
+-----+--------------------+ |label| features| +-----+--------------------+ | 0.0|[0.0,0.0,0.0,0.0,...| | 0.0|[0.0,1.0,0.0,0.0,...| +-----+--------------------+ only showing top 2 rows
Создать поезд / тестовый набор
Вы разделяете набор данных 80/20 с помощью randomSplit.
# Split the data into train and test sets train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)
Давайте посчитаем, сколько людей с доходом ниже / выше 50 тыс. Как в тренировочном, так и в тестовом наборе
train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 19698| | 1.0| 6263| +-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
Построить логистический регрессор
И последнее, но не менее важное: вы можете построить классификатор. Pyspark имеет API, который называется LogisticRegression, для выполнения логистической регрессии.
Вы инициализируете lr, указывая столбец метки и столбцы объектов. Вы устанавливаете максимум 10 итераций и добавляете параметр регуляризации со значением 0,3. Обратите внимание, что в следующем разделе вы будете использовать перекрестную проверку с сеткой параметров для настройки модели.
# Import `LinearRegression` from pyspark.ml.classification import LogisticRegression # Initialize `lr` lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3) # Fit the data to the model linearModel = lr.fit(train_data)
# Вы можете увидеть коэффициенты из регрессии
# Print the coefficients and intercept for logistic regression print("Coefficients: " + str(linearModel.coefficients)) print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698] Intercept: -1.9884177974805692
Шаг 5) Обучите и оцените модель
Чтобы сгенерировать прогноз для вашего тестового набора, вы можете использовать linearModel с transform () для test_data
# Make predictions on test data using the transform() method. predictions = linearModel.transform(test_data)
Вы можете распечатать элементы в прогнозах
predictions.printSchema() root |-- label: double (nullable = true) |-- features: vector (nullable = true) |-- rawPrediction: vector (nullable = true) |-- probability: vector (nullable = true) |-- prediction: double (nullable = false)
Вас интересует ярлык, прогноз и вероятность
selected = predictions.select("label", "prediction", "probability") selected.show(20)
+-----+----------+--------------------+ |label|prediction| probability| +-----+----------+--------------------+ | 0.0| 0.0|[0.91560704124179...| | 0.0| 0.0|[0.92812140213994...| | 0.0| 0.0|[0.92161406774159...| | 0.0| 0.0|[0.96222760777142...| | 0.0| 0.0|[0.66363283056957...| | 0.0| 0.0|[0.65571324475477...| | 0.0| 0.0|[0.73053376932829...| | 0.0| 1.0|[0.31265053873570...| | 0.0| 0.0|[0.80005907577390...| | 0.0| 0.0|[0.76482251301640...| | 0.0| 0.0|[0.84447301189069...| | 0.0| 0.0|[0.75691912026619...| | 0.0| 0.0|[0.60902504096722...| | 0.0| 0.0|[0.80799228385509...| | 0.0| 0.0|[0.87704364852567...| | 0.0| 0.0|[0.83817652582377...| | 0.0| 0.0|[0.79655423248500...| | 0.0| 0.0|[0.82712311232246...| | 0.0| 0.0|[0.81372823882016...| | 0.0| 0.0|[0.59687710752201...| +-----+----------+--------------------+ only showing top 20 rows
Оценить модель
Вам нужно посмотреть на метрику точности, чтобы увидеть, насколько хорошо (или плохо) модель работает. В настоящее время в Spark нет API для вычисления показателя точности. Значением по умолчанию является ROC, кривая рабочих характеристик приемника. Это разные метрики, которые учитывают ложноположительный показатель.
Прежде чем вы посмотрите на ROC, давайте построим меру точности. Вы более знакомы с этой метрикой. Мера точности — это сумма правильного прогноза по общему количеству наблюдений.
Вы создаете DataFrame с меткой и `предсказанием.
cm = predictions.select("label", "prediction")
Вы можете проверить номер класса в метке и прогноз
cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+ |prediction|count(prediction)| +----------+-----------------+ | 0.0| 5982| | 1.0| 617| +----------+-----------------+
Например, в тестовом наборе имеется 1578 домашних хозяйств с доходом выше 50 тысяч, а ниже 5021. Классификатор, однако, предсказал 617 домашних хозяйств с доходом выше 50 тысяч.
Вы можете вычислить точность путем вычисления количества, когда метка правильно классифицирована по общему количеству строк.
cm.filter(cm.label == cm.prediction).count() / cm.count()
0,8237611759357478
Вы можете обернуть все вместе и написать функцию для вычисления точности.
def accuracy_m(model): predictions = model.transform(test_data) cm = predictions.select("label", "prediction") acc = cm.filter(cm.label == cm.prediction).count() / cm.count() print("Model accuracy: %.3f%%" % (acc * 100)) accuracy_m(model = linearModel) Model accuracy: 82.376%
ROC metrics
The module BinaryClassificationEvaluator includes the ROC measures. The Receiver Operating Characteristic curve is another common tool used with binary classification. It is very similar to the precision/recall curve, but instead of plotting precision versus recall, the ROC curve shows the true positive rate (i.e. recall) against the false positive rate. The false positive rate is the ratio of negative instances that are incorrectly classified as positive. It is equal to one minus the true negative rate. The true negative rate is also called specificity. Hence the ROC curve plots sensitivity (recall) versus 1 — specificity
### Use ROC from pyspark.ml.evaluation import BinaryClassificationEvaluator # Evaluate model evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction") print(evaluator.evaluate(predictions)) print(evaluator.getMetricName())
0.8940481662695192areaUnderROC
print(evaluator.evaluate(predictions))
0.8940481662695192
Step 6) Tune the hyperparameter
Last but not least, you can tune the hyperparameters. Similar to scikit learn you create a parameter grid, and you add the parameters you want to tune. To reduce the time of the computation, you only tune the regularization parameter with only two values.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # Create ParamGrid for Cross Validation paramGrid = (ParamGridBuilder() .addGrid(lr.regParam, [0.01, 0.5]) .build())
Finally, you evaluate the model with using the cross valiation method with 5 folds. It takes around 16 minutes to train.
from time import * start_time = time() # Create 5-fold CrossValidator cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5) # Run cross validations cvModel = cv.fit(train_data) # likely take a fair amount of time end_time = time() elapsed_time = end_time - start_time print("Time to train model: %.3f seconds" % elapsed_time)
Время на тренировку модели: 978,880 секунд.
Лучший гиперпараметр регуляризации — 0,01 с точностью 85,316%.
accuracy_m(model = cvModel) Model accuracy: 85.316%
Вы можете извлечь рекомендуемый параметр, связав cvModel.bestModel с помощью extractParamMap ()
bestModel = cvModel.bestModel bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}
Резюме
Spark — это фундаментальный инструмент для исследователя данных. Это позволяет практикующему врачу подключить приложение к различным источникам данных, беспрепятственно выполнять анализ данных или добавлять прогностическую модель.
Чтобы начать с Spark, вам нужно запустить контекст Spark с помощью:
`SparkContext ()` `
и и контекст SQL для подключения к источнику данных:
`SQLContext ()` `
В этом уроке вы научитесь тренировать логистическую регрессию:
- Преобразуйте набор данных в Dataframe с помощью:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"]))) sqlContext.createDataFrame(input_data, ["label", "features"])
Обратите внимание, что имя столбца метки — newlabel, и все функции объединены в функции. Измените эти значения, если они отличаются в вашем наборе данных.
- Создать поезд / тестовый набор
randomSplit([.8,.2],seed=1234)
- Тренируй модель
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
- Сделать прогноз
linearModel.transform()