Учебники

16) Учебник Apache Spark

Что такое Apache Spark?

Spark — это решение для обработки больших данных, которое оказалось проще и быстрее, чем Hadoop MapReduce. Spark — это программное обеспечение с открытым исходным кодом, разработанное лабораторией UC Berkeley RAD в 2009 году. С тех пор, как оно было выпущено для публики в 2010 году, популярность Spark возросла и используется в этой отрасли в беспрецедентных масштабах.

В эпоху больших данных практикующим специалистам нужны более быстрые и надежные инструменты для обработки потоковых данных. Более ранние инструменты, такие как MapReduce, были любимыми, но были медленными. Чтобы преодолеть эту проблему, Spark предлагает быстрое и универсальное решение. Основное различие между Spark и MapReduce состоит в том, что Spark выполняет вычисления в памяти во время последующего на жестком диске. Это обеспечивает высокоскоростной доступ и обработку данных, сокращая время с часов до минут.

Что такое Pyspark?

Spark — это имя движка для реализации кластерных вычислений, а PySpark — это библиотека Python для использования Spark.

В этом уроке вы узнаете

Как работает Spark?

Spark основан на вычислительном движке, то есть он отвечает за планирование, распределение и мониторинг приложений. Каждая задача выполняется на различных рабочих машинах, называемых вычислительным кластером. Вычислительный кластер относится к разделению задач. Одна машина выполняет одну задачу, в то время как другие вносят вклад в конечный результат посредством другой задачи. В конце концов, все задачи агрегируются для получения результата. Администратор Spark дает обзор 360 различных рабочих мест Spark.

Spark предназначен для работы с

  • питон
  • Ява
  • Scala
  • SQL

Важной особенностью Spark является огромное количество встроенных библиотек, в том числе MLlib для машинного обучения . Spark также предназначен для работы с кластерами Hadoop и может читать широкий спектр файлов, включая данные Hive, CSV, JSON, Casandra и другие.

Зачем использовать Spark?

As a future data practitioner, you should be familiar with python’s famous libraries: Pandas and scikit-learn. These two libraries are fantastic to explore dataset up to mid-size. Regular machine learning projects are built around the following methodology:

  • Load the data to the disk
  • Import the data into the machine’s memory
  • Process/analyze the data
  • Build the machine learning model
  • Store the prediction back to disk

The problem arises if the data scientist wants to process data that’s too big for one computer. During earlier days of data science, the practitioners would sample the as training on huge data sets was not always needed. The data scientist would find a good statistical sample, perform an additional robustness check and comes up with an excellent model.

However, there are some problems with this:

  • Is the dataset reflecting the real world?
  • Does the data include a specific example?
  • Is the model fit for sampling?

Take users recommendation for instance. Recommenders rely on comparing users with other users in evaluating their preferences. If the data practitioner takes only a subset of the data, there won’t be a cohort of users who are very similar to one another. Recommenders need to run on the full dataset or not at all.

What is the solution?

The solution has been evident for a long time, split the problem up onto multiple computers. Parallel computing comes with multiple problems as well. Developers often have trouble writing parallel code and end up having to solve a bunch of the complex issues around multi-processing itself.

Pyspark gives the data scientist an API that can be used to solve the parallel data proceedin problems. Pyspark handles the complexities of multiprocessing, such as distributing the data, distributing code and collecting output from the workers on a cluster of machines.

Spark can run standalone but most often runs on top of a cluster computing framework such as Hadoop. In test and development, however, a data scientist can efficiently run Spark on their development boxes or laptops without a cluster

• One of the main advantages of Spark is to build an architecture that encompasses data streaming management, seamlessly data queries, machine learning prediction and real-time access to various analysis.

• Spark works closely with SQL language, i.e., structured data. It allows querying the data in real time.

• Data scientist main’s job is to analyze and build predictive models. In short, a data scientist needs to know how to query data using SQL, produce a statistical report and make use of machine learning to produce predictions. Data scientist spends a significant amount of their time on cleaning, transforming and analyzing the data. Once the dataset or data workflow is ready, the data scientist uses various techniques to discover insights and hidden patterns. The data manipulation should be robust and the same easy to use. Spark is the right tool thanks to its speed and rich APIs.

In this tutorial, you will learn how to build a classifier with Pyspark.

Launch Pyspark with AWS

The Jupyter team build a Docker image to run Spark efficiently. You can follow this step to launch Spark instance in AWS.

См. Наш учебник по AWS и TensorFlow

Шаг 1. Создайте экземпляр

Прежде всего, вам нужно создать экземпляр. Перейдите в свою учетную запись AWS и запустите экземпляр. Вы можете увеличить объем хранилища до 15 г и использовать ту же группу безопасности, что и в руководстве TensorFlow.

Шаг 2: Откройте соединение

Откройте соединение и установите Docker-контейнер. Для получения более подробной информации, обратитесь к учебнику с TensorFlow с Docker. Обратите внимание, что вы должны быть в правильном рабочем каталоге.

Просто запустите эти коды для установки Docker:

sudo yum update -y
sudo yum install -y docker
sudo service docker start
sudo user-mod -a -G docker ec2-user
exit

Шаг 3: снова откройте соединение и установите Spark

После того, как вы снова откроете соединение, вы можете установить образ, содержащий Pyspark.

## Spark
docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook

## Allow preserving Jupyter notebook
sudo chown 1000 ~/work

## Install tree to see our working directory next
sudo yum install -y tree

Шаг 4: Откройте Jupyter

Проверьте контейнер и его название

docker ps			

Запустите докер с журналами докера, за которым следует имя докера. Например, докер регистрирует журнал zealous_goldwasser

Зайдите в ваш браузер и запустите Jupyter. Адрес: http: // localhost: 8888 /. Вставьте пароль, заданный терминалом.

Примечание . Если вы хотите загрузить / загрузить файл на свой компьютер AWS, вы можете использовать программное обеспечение Cyberduck, https://cyberduck.io/ .

Установите Pyspark на Mac / Windows с помощью Conda

Для установки Spark на локальный компьютер рекомендуется создать новую среду conda. Эта новая среда установит Python 3.6, Spark и все зависимости.

Пользователь Mac

cd anaconda3
touch hello-spark.yml
vi hello-spark.yml

Пользователь Windows

cd C:\Users\Admin\Anaconda3
echo.>hello-spark.yml
notepad hello-spark.yml

Вы можете редактировать файл .yml. Будьте осторожны с отступом. Два пробела требуются перед —

name: hello-spark 
    dependencies:
    
    - python=3.6
    - jupyter
    - ipython
    - numpy
    - numpy-base
    - pandas
    - py4j
    - pyspark
    - pytz

Сохраните это и создайте окружающую среду. Это займет некоторое время

conda env create -f hello-spark.yml			

Для получения более подробной информации о местоположении, пожалуйста, ознакомьтесь с руководством Установить TensorFlow

Вы можете проверить всю среду, установленную на вашем компьютере

conda env list			
Activate hello-spark

Пользователь Mac

source activate hello-spark			

Пользователь Windows

activate hello-spark			

Примечание. Вы уже создали специальную среду TensorFlow для запуска учебников по TensorFlow. Удобнее создавать новую среду, отличную от hello-tf. Нет смысла перегружать hello-tf Spark или любыми другими библиотеками машинного обучения.

Представьте, что большая часть вашего проекта связана с TensorFlow, но вам нужно использовать Spark для одного конкретного проекта. Вы можете установить среду TensorFlow для всего вашего проекта и создать отдельную среду для Spark. Вы можете добавить столько библиотек в среду Spark, сколько захотите, не вмешиваясь в среду TensorFlow. Как только вы закончите с проектом Spark, вы можете стереть его, не затрагивая среду TensorFlow.

Jupyter

Откройте Jupyter Notebook и попробуйте, если PySpark работает. В новой записной книжке вставьте следующий код:

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Если отображается ошибка, скорее всего, Java не установлена ​​на вашем компьютере. В Mac откройте терминал и напишите java -version, если есть java-версия, убедитесь, что она 1.8. В Windows перейдите в Приложение и проверьте, есть ли папка Java. Если есть папка Java, убедитесь, что установлена ​​Java 1.8. На момент написания статьи PySpark не совместим с Java9 и выше.

Если вам нужно установить Java, вы должны подумать о ссылке и скачать jdk-8u181-windows-x64.exe

Для пользователей Mac рекомендуется использовать «brew».

brew tap caskroom/versions
brew cask install java8

Ознакомьтесь с этим пошаговым руководством по установке Java

Примечание . Используйте команду «Удалить», чтобы полностью стереть среду.

 conda env remove -n hello-spark -y			

Spark Context

SparkContext является внутренним механизмом, который позволяет соединения с кластерами. Если вы хотите запустить операцию, вам нужен SparkContext.

Создать SparkContext

Прежде всего, вам нужно инициировать SparkContext.

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Теперь, когда SparkContext готов, вы можете создать коллекцию данных с именем RDD, Resilient Distributed Dataset. Вычисления в RDD автоматически распараллеливаются по всему кластеру.

nums= sc.parallelize([1,2,3,4])			

Вы можете получить доступ к первой строке с дублем

nums.take(1) 			
[1]			

Вы можете применить преобразование к данным с помощью лямбда-функции. В приведенном ниже примере вы возвращаете квадрат чисел. Это преобразование карты

squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i ' % (num))
1 
4 
9 
16 			

SQLContext

Более удобный способ — использовать DataFrame. SparkContext уже установлен, вы можете использовать его для создания dataFrame. Вам также нужно объявить SQLContext

SQLContext позволяет подключать движок к разным источникам данных. Он используется для запуска функций Spark SQL.

from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

Давайте создадим список кортежей. Каждый кортеж будет содержать имя людей и их возраст. Требуются четыре шага:

Шаг 1) Создайте список кортежей с информацией

[('John',19),('Smith',29),('Adam',35),('Henry',50)]			

Шаг 2) Постройте RDD

rdd = sc.parallelize(list_p)			

Шаг 3) Конвертировать кортежи

rdd.map(lambda x: Row(name=x[0], age=int(x[1])))			

Шаг 4) Создайте контекст DataFrame

sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

Если вы хотите получить доступ к типу каждой функции, вы можете использовать printSchema ()

DF_ppl.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

Машинное обучение с Spark

Теперь, когда у вас есть краткое представление о Spark и SQLContext, вы готовы создать свою первую программу машинного обучения.

Вы будете действовать следующим образом:

  • Шаг 1) Основные операции с PySpark
  • Шаг 2) Предварительная обработка данных
  • Шаг 3) Построить конвейер обработки данных
  • Шаг 4) Построить классификатор
  • Шаг 5) Обучите и оцените модель
  • Шаг 6) Настройте гиперпараметр

В этом уроке мы будем использовать набор данных для взрослых. Цель этого урока — научиться пользоваться Pyspark. Для получения дополнительной информации о наборе данных обратитесь к этому руководству.

Обратите внимание, что набор данных не имеет значения, и вы можете подумать, что вычисление занимает много времени. Spark предназначен для обработки значительного объема данных. Производительность Spark увеличивается по сравнению с другими библиотеками машинного обучения, когда объем обрабатываемого набора данных увеличивается.

Шаг 1) Основные операции с PySpark

Прежде всего, вам нужно инициализировать SQLContext, который еще не запущен.

#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

затем вы можете прочитать файл cvs с помощью sqlContext.read.csv. Вы используете inferSchema со значением True, чтобы Spark автоматически угадывал тип данных. По умолчанию это ложь.

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)			

Давайте посмотрим на тип данных

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Вы можете увидеть данные с шоу.

df.show(5, truncate = False)			
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|age|workclass       |fnlwgt|education|education_num|marital           |occupation       |relationship |race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|39 |State-gov       |77516 |Bachelors|13           |Never-married     |Adm-clerical     |Not-in-family|White|Male  |2174        |0           |40        |United-States |<=50K|
|50 |Self-emp-not-inc|83311 |Bachelors|13           |Married-civ-spouse|Exec-managerial  |Husband      |White|Male  |0           |0           |13        |United-States |<=50K|
|38 |Private         |215646|HS-grad  |9            |Divorced          |Handlers-cleaners|Not-in-family|White|Male  |0           |0           |40        |United-States |<=50K|
|53 |Private         |234721|11th     |7            |Married-civ-spouse|Handlers-cleaners|Husband      |Black|Male  |0           |0           |40        |United-States |<=50K|
|28 |Private         |338409|Bachelors|13           |Married-civ-spouse|Prof-specialty   |Wife         |Black|Female|0           |0           |40        |Cuba          |<=50K|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
only showing top 5 rows

Если вы не установили inderShema в True, вот что происходит с типом. Есть все в строке.

df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=  False)
df_string.printSchema()
root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Чтобы преобразовать непрерывную переменную в правильный формат, вы можете использовать переделать столбцы. Вы можете использовать withColumn, чтобы сообщить Spark, какой столбец должен выполнять преобразование.

# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()
root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

Выберите столбцы

Вы можете выбрать и показать строки с помощью выбора и имен объектов. Ниже, возраст и fnlwgt выбраны.

df.select('age','fnlwgt').show(5)			
+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows

Подсчет по группе

Если вы хотите посчитать количество вхождений по группам, вы можете связать:

  • группа по()
  • кол-()

все вместе. В приведенном ниже примере вы подсчитываете количество строк по уровню образования.

df.groupBy("education").count().sort("count",ascending=True).show()			
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+

Опишите данные

Чтобы получить сводную статистику данных, вы можете использовать description (). Это вычислит:

  • подсчитывать
  • жадный
  • среднеквадратичное отклонение
  • мин
  • Максимум
df.describe().show()			
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|    education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|    capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|  count|             32561|      32561|             32561|       32561|            32561|   32561|           32561|       32561|             32561| 32561|             32561|           32561|             32561|         32561|32561|
|   mean| 38.58164675532078|       null|189778.36651208502|        null| 10.0806793403151|    null|            null|        null|              null|  null|1077.6488437087312| 87.303829734959|40.437455852092995|          null| null|
| stddev|13.640432553581356|       null|105549.97769702227|        null|2.572720332067397|    null|            null|        null|              null|  null| 7385.292084840354|402.960218649002|12.347428681731838|          null| null|
|    min|                17|          ?|             12285|        10th|                1|Divorced|               ?|     Husband|Amer-Indian-Eskimo|Female|                 0|               0|                 1|             ?|<=50K|
|    max|                90|Without-pay|           1484705|Some-college|               16| Widowed|Transport-moving|        Wife|             White|  Male|             99999|            4356|                99|    Yugoslavia| >50K|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+

Если вы хотите получить сводную статистику только по одному столбцу, добавьте имя столбца внутри description ()

df.describe('capital_gain').show()			
+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|                 0|
|    max|             99999|
+-------+------------------+

Кросс-таблица вычислений

В некоторых случаях может быть интересно увидеть описательную статистику между двумя попарно столбцами. Например, вы можете посчитать количество людей с доходом ниже или выше 50 000 в зависимости от уровня образования. Эта операция называется кросс-таблицей.

df.crosstab('age', 'label').sort("age_label").show()			
+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows

Вы можете видеть, что у людей нет дохода выше 50 тысяч, когда они молоды.

Опустить столбец

Существует два интуитивно понятных API для удаления столбцов:

  • drop (): удалить столбец
  • dropna (): отбросить NA

Ниже вы опускаете столбец education_num

df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

Фильтровать данные

Вы можете использовать filter () для применения описательной статистики в подмножестве данных. Например, вы можете посчитать количество людей старше 40 лет

df.filter(df.age > 40).count()			

13443

Описательная статистика по группам

Наконец, вы можете группировать данные по группам и вычислять статистические операции, такие как среднее значение.

df.groupby('marital').agg({'capital_gain': 'mean'}).show()			
+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 535.5687804878049|
|       Never-married|376.58831788823363|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|             Widowed| 571.0715005035247|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+

Шаг 2) Предварительная обработка данных

Обработка данных является важным шагом в машинном обучении. После того, как вы удалите данные мусора, вы получите некоторые важные идеи. Например, вы знаете, что возраст не является линейной функцией дохода. Когда люди молоды, их доход обычно ниже, чем в среднем возрасте. После выхода на пенсию домохозяйство использует свои сбережения, что означает уменьшение дохода. Чтобы захватить этот шаблон, вы можете добавить квадрат к возрасту

Добавить возрастной квадрат

Чтобы добавить новую функцию, вам необходимо:

  1. Выберите столбец
  2. Примените преобразование и добавьте его в DataFrame.
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)

Вы можете видеть, что age_square был успешно добавлен во фрейм данных. Вы можете изменить порядок переменных с помощью выбора. Ниже вы приводите age_square сразу после возраста.

COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')

Исключить Голанд-Нидерланды

Когда группа в объекте имеет только одно наблюдение, она не приносит информации в модель. Наоборот, это может привести к ошибке во время перекрестной проверки.

Давайте проверим происхождение домашнего хозяйства

df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
|            Honduras|                   13|
|Outlying-US(Guam-...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|               Haiti|                   44|
+--------------------+---------------------+
only showing top 20 rows

Функция native_country имеет только одну семью из Нидерландов. Вы исключаете это.

df_remove = df.filter(df.native_country !=	'Holand-Netherlands')			

Шаг 3) Построить конвейер обработки данных

Подобно scikit-learn, Pyspark имеет конвейерный API. Конвейер очень удобен для поддержания структуры данных. Вы помещаете данные в конвейер. Внутри конвейера выполняются различные операции, выходные данные используются для подачи алгоритма.

Например, одно универсальное преобразование в машинном обучении состоит в преобразовании строки в один горячий кодировщик, то есть один столбец группой. Один горячий кодировщик обычно представляет собой матрицу, полную нулей.

Шаги для преобразования данных очень похожи на scikit-learn. Тебе следует:

  • Индексируйте строку в числовой
  • Создайте один горячий кодировщик
  • Преобразовать данные

Два API делают работу: StringIndexer, OneHotEncoder

  1. Прежде всего, вы выбираете столбец строки для индексации. InputCol — это имя столбца в наборе данных. outputCol — это новое имя, данное преобразованному столбцу.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")		
  1. Подгоните данные и преобразуйте их
model = stringIndexer.fit(df)		
`indexed = model.transform(df)``
  1. Создайте колонки новостей на основе группы. Например, если в объекте 10 групп, в новой матрице будет 10 столбцов, по одному для каждой группы.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
|age|age_square|       workclass|fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
| 39|    1521.0|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|        40| United-States|<=50K|              4.0|(9,[4],[1.0])|
| 50|    2500.0|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|           0|        13| United-States|<=50K|              1.0|(9,[1],[1.0])|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
only showing top 2 rows

Построить трубопровод

Вы создадите конвейер для преобразования всех точных объектов и добавления их в окончательный набор данных. В конвейере будет четыре операции, но вы можете добавить столько операций, сколько захотите.

  1. Кодировать категориальные данные
  2. Индексируйте метку
  3. Добавить непрерывную переменную
  4. Соберите шаги.

Каждый шаг хранится в списке названных этапов. Этот список расскажет VectorAssembler, какую операцию выполнить внутри конвейера.

1. Кодировать категориальные данные

Этот шаг точно такой же, как в приведенном выше примере, за исключением того, что вы перебираете все категориальные функции.

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

2. Индексируйте метку

Spark, как и многие другие библиотеки, не принимает строковые значения для метки. Вы конвертируете функцию метки с помощью StringIndexer и добавляете ее на стадии списка

# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

3. Добавить непрерывную переменную

InputCols объекта VectorAssembler представляет собой список столбцов. Вы можете создать новый список, содержащий все новые столбцы. Код ниже пополняет список закодированными категориальными функциями и непрерывными функциями.

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. Соберите шаги.

Наконец, вы проходите все шаги в VectorAssembler

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]			

Теперь, когда все шаги готовы, вы отправляете данные в конвейер.

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

Если вы проверите новый набор данных, вы увидите, что он содержит все функции, преобразованные и не преобразованные. Вас интересует только новая маркировка и особенности. Особенности включают в себя все преобразованные функции и непрерывные переменные.

model.take(1)

[Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]

Шаг 4) Построить классификатор: логистика

Чтобы ускорить вычисления, вы конвертируете модель в DataFrame. Вам нужно выбрать новую метку и объекты из модели, используя карту.

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

Вы готовы создать данные поезда в виде DataFrame. Вы используете sqlContext

df_train = sqlContext.createDataFrame(input_data, ["label", "features"])			

Проверьте второй ряд

df_train.show(2)			
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows

Создать поезд / тестовый набор

Вы разделяете набор данных 80/20 с помощью randomSplit.

# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

Давайте посчитаем, сколько людей с доходом ниже / выше 50 тыс. Как в тренировочном, так и в тестовом наборе

train_data.groupby('label').agg({'label': 'count'}).show()			
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19698|
|  1.0|        6263|
+-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()			
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+

Построить логистический регрессор

И последнее, но не менее важное: вы можете построить классификатор. Pyspark имеет API, который называется LogisticRegression, для выполнения логистической регрессии.

Вы инициализируете lr, указывая столбец метки и столбцы объектов. Вы устанавливаете максимум 10 итераций и добавляете параметр регуляризации со значением 0,3. Обратите внимание, что в следующем разделе вы будете использовать перекрестную проверку с сеткой параметров для настройки модели.

# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

# Вы можете увидеть коэффициенты из регрессии

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698]
Intercept: -1.9884177974805692

Шаг 5) Обучите и оцените модель

Чтобы сгенерировать прогноз для вашего тестового набора, вы можете использовать linearModel с transform () для test_data

# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

Вы можете распечатать элементы в прогнозах

predictions.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

Вас интересует ярлык, прогноз и вероятность

selected = predictions.select("label", "prediction", "probability")
selected.show(20)			
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.91560704124179...|
|  0.0|       0.0|[0.92812140213994...|
|  0.0|       0.0|[0.92161406774159...|
|  0.0|       0.0|[0.96222760777142...|
|  0.0|       0.0|[0.66363283056957...|
|  0.0|       0.0|[0.65571324475477...|
|  0.0|       0.0|[0.73053376932829...|
|  0.0|       1.0|[0.31265053873570...|
|  0.0|       0.0|[0.80005907577390...|
|  0.0|       0.0|[0.76482251301640...|
|  0.0|       0.0|[0.84447301189069...|
|  0.0|       0.0|[0.75691912026619...|
|  0.0|       0.0|[0.60902504096722...|
|  0.0|       0.0|[0.80799228385509...|
|  0.0|       0.0|[0.87704364852567...|
|  0.0|       0.0|[0.83817652582377...|
|  0.0|       0.0|[0.79655423248500...|
|  0.0|       0.0|[0.82712311232246...|
|  0.0|       0.0|[0.81372823882016...|
|  0.0|       0.0|[0.59687710752201...|
+-----+----------+--------------------+
only showing top 20 rows

Оценить модель

Вам нужно посмотреть на метрику точности, чтобы увидеть, насколько хорошо (или плохо) модель работает. В настоящее время в Spark нет API для вычисления показателя точности. Значением по умолчанию является ROC, кривая рабочих характеристик приемника. Это разные метрики, которые учитывают ложноположительный показатель.

Прежде чем вы посмотрите на ROC, давайте построим меру точности. Вы более знакомы с этой метрикой. Мера точности — это сумма правильного прогноза по общему количеству наблюдений.

Вы создаете DataFrame с меткой и `предсказанием.

cm = predictions.select("label", "prediction")			

Вы можете проверить номер класса в метке и прогноз

cm.groupby('label').agg({'label': 'count'}).show()			
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()			
+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5982|
|       1.0|              617|
+----------+-----------------+

Например, в тестовом наборе имеется 1578 домашних хозяйств с доходом выше 50 тысяч, а ниже 5021. Классификатор, однако, предсказал 617 домашних хозяйств с доходом выше 50 тысяч.

Вы можете вычислить точность путем вычисления количества, когда метка правильно классифицирована по общему количеству строк.

cm.filter(cm.label == cm.prediction).count() / cm.count()			

0,8237611759357478

Вы можете обернуть все вместе и написать функцию для вычисления точности.

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)
Model accuracy: 82.376%

ROC metrics

The module BinaryClassificationEvaluator includes the ROC measures. The Receiver Operating Characteristic curve is another common tool used with binary classification. It is very similar to the precision/recall curve, but instead of plotting precision versus recall, the ROC curve shows the true positive rate (i.e. recall) against the false positive rate. The false positive rate is the ratio of negative instances that are incorrectly classified as positive. It is equal to one minus the true negative rate. The true negative rate is also called specificity. Hence the ROC curve plots sensitivity (recall) versus 1 — specificity

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8940481662695192areaUnderROC

print(evaluator.evaluate(predictions))			

0.8940481662695192

Step 6) Tune the hyperparameter

Last but not least, you can tune the hyperparameters. Similar to scikit learn you create a parameter grid, and you add the parameters you want to tune. To reduce the time of the computation, you only tune the regularization parameter with only two values.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

Finally, you evaluate the model with using the cross valiation method with 5 folds. It takes around 16 minutes to train.

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Время на тренировку модели: 978,880 секунд.

Лучший гиперпараметр регуляризации — 0,01 с точностью 85,316%.

accuracy_m(model = cvModel)
Model accuracy: 85.316%

Вы можете извлечь рекомендуемый параметр, связав cvModel.bestModel с помощью extractParamMap ()

bestModel = cvModel.bestModel
bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}

Резюме

Spark — это фундаментальный инструмент для исследователя данных. Это позволяет практикующему врачу подключить приложение к различным источникам данных, беспрепятственно выполнять анализ данных или добавлять прогностическую модель.

Чтобы начать с Spark, вам нужно запустить контекст Spark с помощью:

`SparkContext ()` `

и и контекст SQL для подключения к источнику данных:

`SQLContext ()` `

В этом уроке вы научитесь тренировать логистическую регрессию:

  1. Преобразуйте набор данных в Dataframe с помощью:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
sqlContext.createDataFrame(input_data, ["label", "features"])

Обратите внимание, что имя столбца метки — newlabel, и все функции объединены в функции. Измените эти значения, если они отличаются в вашем наборе данных.

  1. Создать поезд / тестовый набор
randomSplit([.8,.2],seed=1234)
  1. Тренируй модель
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
  1. Сделать прогноз
linearModel.transform()