В этом примере Apache Spark Machine Learning будет представлен Spark MLlib и рассмотрен исходный код Scala. В этом посте и сопровождающих видео роликах демонстрируется специальное приложение драйвера Spark MLlib Spark. Затем будет исследован исходный код Spark MLLib Scala. Будет показано и объяснено много тем, но сначала давайте опишем несколько концепций машинного обучения.
Ключевые понятия машинного обучения
Что такое машинное обучение?
Машинное обучение — это создание и использование моделей, которые изучаются на основе данных. Вы также можете услышать машинное обучение, называемое прогнозным моделированием или интеллектуальным анализом данных.
Каковы три примера машинного обучения?
- прогнозирование спама
- прогноз мошеннических операций с кредитными картами
- механизм рекомендаций по продукту или рекламе
Существует два типа моделей машинного обучения: контролируемые и неконтролируемые. Контролируемые модели содержат набор данных, помеченных правильными ответами, в то время как неконтролируемые не содержат меток.
Примеры контролируемых моделей машинного обучения
- k-ближайшие соседи — пример: предсказать, как человек может голосовать, если вы знаете, как голосуют его соседи
- Наивный байес — пример: определить, является ли входящее письмо спамом
- линейная регрессия — попытайтесь определить, коррелированы ли две переменные
- деревья решений — использует структуру для представления ряда возможных путей принятия решений и результатов для каждого пути
Примеры неконтролируемых моделей машинного обучения
- кластеризация — работает с немечеными данными и пытается «кластеризовать» их. Например, набор данных, показывающий, где живут миллионеры, вероятно, имеет кластеры в таких местах, как Беверли-Хиллз и Манхэттен
- Латентный анализ Дирихле (LDA) — обработка на естественном языке, обычно используемая для определения общих тем в тексте или наборе документов
- нейронные сети — примеры: распознавание рукописного ввода и распознавание изображения лица
При построении моделей, используемых для прогнозирования, мы часто обучаем модель на основе существующего набора данных. Модель может быть переобучена много раз, поскольку становится доступным все больше и больше наборов данных для обучения. Например, мы переобучим механизм рекомендаций, основанный на коллаборативной фильтрации, когда узнаем больше о событиях, которые привели к продажам продукта или целевым показателям вовлеченности.
Пример машинного обучения Apache Spark (со Scala)
Давайте покажем демонстрацию программы машинного обучения Apache Spark. В следующей демонстрации мы начинаем с обучения модели кластеризации k-средних, а затем используем эту обученную модель для прогнозирования языка входящего текстового потока из Slack.
Этот пример основан на предыдущем учебном руководстве по Apache Spark Streaming, которое передает данные с сайта команды Slack. Эти и другие видео также доступны на наших курсах обучения Spark. См. Раздел Ресурсы ниже для ссылок.
Но давайте перейдем к демонстрации:
Обзор исходного кода Scala машинного обучения Apache
Хорошо, теперь, когда мы имеем в виду демо, давайте рассмотрим соответствующий код Spark MLLib. Опять же, ссылки на исходный код можно найти в разделе Ресурсы ниже. Давайте начнем со вступления в наш пример Spark Machine Learning и того, что называлось во время развертывания с помощью spark-submit в демоверсии, SlackMLApp:
Пример Spark Machine Learning со Scala
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
object SlackMLApp { object Config { @Parameter(names = Array("-st", "--slackToken")) var slackToken: String = null @Parameter(names = Array("-nc", "--numClusters")) var numClusters: Int = 4 @Parameter(names = Array("-po", "--predictOutput")) var predictOutput: String = null @Parameter(names = Array("-td", "--trainData")) var trainData: String = null @Parameter(names = Array("-ml", "--modelLocation")) var modelLocation: String = null } def main(args: Array[String]) { new JCommander(Config, args.toArray: _*) val conf = new SparkConf().setAppName("SlackStreamingWithML") val sparkContext = new SparkContext(conf) // optain existing or create new model val clusters: KMeansModel = if (Config.trainData != null) { KMeanTrainTask.train(sparkContext, Config.trainData, Config.numClusters, Config.modelLocation) } else { if (Config.modelLocation != null) { new KMeansModel(sparkContext.objectFile[Vector](Config.modelLocation).collect()) } else { throw new IllegalArgumentException("Either modelLocation or trainData should be specified") } } if (Config.slackToken != null) { SlackStreamingTask.run(sparkContext, Config.slackToken, clusters, Config.predictOutput) } } } |
Код выше содержит то, что содержит main метод и то, что вызывается из spark-submit . Как вы можете надеяться, вы можете либо обучить новую модель, либо использовать существующую модель при запуске SlackStreamingTask. Это зависит от входящих аргументов командной строки, таких как trainData , modelLocation и slackToken .
В этом примере анализа и обзора исходного кода машинного обучения Spark мы сосредоточимся на следующем: 1) код, используемый для обучения модели в KMeanTrainTask и 2) использование модели для прогнозирования в SlackStreamingTask.
Во-первых, давайте откроем соответствующую часть KMeanTrainTask
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
def train(sparkContext: SparkContext, trainData: String, numClusters: Int, modelLocation: String): KMeansModel = { if (new File(modelLocation).exists) removePrevious(modelLocation) val trainRdd = sparkContext.textFile(trainData) val parsedData = trainRdd.map(Utils.featurize).cache() // if we had a really large data set to train on, we'd want to call an action to trigger cache. val model = KMeans.train(parsedData, numClusters, numIterations) sparkContext.makeRDD(model.clusterCenters, numClusters).saveAsObjectFile(modelLocation) val example = trainRdd.sample(withReplacement = false, 0.1).map(s => (s, model.predict(Utils.featurize(s)))).collect() println("Prediction examples:") example.foreach(println) model} |
При вызове train мы попытаемся удалить любую ранее сохраненную модель в removePrevious . ( removePrevious не показано, поскольку это не имеет отношения к нашему removePrevious к машинному обучению с помощью Apache Spark.) Итак, давайте trainRdd новый RDD с именем trainRdd . Поскольку textFile принимает аргумент String каталога, он будет читать все файлы, содержащиеся в каталоге, который мы вызвали с помощью «input».
Затем мы должны преобразовать элементы (строки текста) в СДР в формат, подходящий для KMeans. Мы делаем это, вызывая Utils.featurize который выглядит следующим образом:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
object Utils { val NUM_DEMENSIONS: Int = 1000 val tf = new HashingTF(NUM_DEMENSIONS) /** * This uses min hash algorithm https://en.wikipedia.org/wiki/MinHash to transform * string to vector of double, which is required for k-means */ def featurize(s: String): Vector = { tf.transform(s.sliding(2).toSeq) } } |
Теперь, если мы вернемся к нашему объекту задачи KMeansTrain, мы сможем обучить нашу модель, используя функцию parsedData с parsedData и numClusters и numIterations . После этого мы сохраняем модель и отправляем несколько example прогнозов кластеризации на консоль, повторяя пример и отправляя в println .
Теперь, когда у нас есть обученная модель, давайте посмотрим SlackStreamingTask
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
bject SlackStreamingTask { def run(sparkContext: SparkContext, slackToken: String, clusters: KMeansModel, predictOutput: String) { val ssc = new StreamingContext(sparkContext, Seconds(5)) val dStream = ssc.receiverStream(new SlackReceiver(slackToken)) val stream = dStream //create stream of events from the Slack... but filter and marshall to JSON stream data .filter(JSON.parseFull(_).get.asInstanceOf[Map[String, String]]("type") == "message") // get only message events .map(JSON.parseFull(_).get.asInstanceOf[Map[String, String]]("text")) // extract message text from the event val kmeanStream = kMean(stream, clusters) // create K-mean model kmeanStream.print() // print k-mean results. It is pairs (k, m), where k - is a message text, m - is a cluster number to which message relates if (predictOutput != null) { kmeanStream.saveAsTextFiles(predictOutput) // save to results to the file, if file name specified } ssc.start() // run spark streaming application ssc.awaitTermination() // wait the end of the application } /** * transform stream of strings to stream of (string, vector) pairs and set this stream as input data for prediction */ def kMean(dStream: DStream[String], clusters: KMeansModel): DStream[(String, Int)] = { dStream.map(s => (s, Utils.featurize(s))).map(p => (p._1, clusters.predict(p._2))) } } |
Код Spark MLlib, который делает предсказания кластеризации с ранее сохраненной моделью, является clusters.predict . Прежде чем он вызывается, мы наносим на карту DStream и снова используем featurize , чтобы использовать его с featurize . Мы возвращаем DStream с исходным текстом, полученным из Slack, и с предсказанным кластером.
Если бы программа драйвера Spark была вызвана со значениемgnastOutput, выходные данные были бы сохранены как текстовые файлы.
Вот еще один скриншот, в котором я описываю код более подробно.
Ресурсы
Исходный код: https://github.com/tmcgrath/spark-course/tree/master/spark-ml
История машинного обучения: http://www.slideshare.net/ToddMcGrath1/machine-learning-with-apache-spark-62310284
Spark MLlib: http://www.slideshare.net/ToddMcGrath1/machine-learning-with-spark-mllib
Глава «Машинное обучение Spark» из раздела «Изучение Spark»
| Ссылка: | Пример машинного обучения Apache Spark с Scala от нашего партнера по JCG Тодда МакГрата в блоге Supergloo . |