Статьи

Пример машинного обучения Apache Spark со Scala

В этом примере Apache Spark Machine Learning будет представлен Spark MLlib и рассмотрен исходный код Scala. В этом посте и сопровождающих видео роликах демонстрируется специальное приложение драйвера Spark MLlib Spark. Затем будет исследован исходный код Spark MLLib Scala. Будет показано и объяснено много тем, но сначала давайте опишем несколько концепций машинного обучения.

Ключевые понятия машинного обучения

Что такое машинное обучение?

Машинное обучение — это создание и использование моделей, которые изучаются на основе данных. Вы также можете услышать машинное обучение, называемое прогнозным моделированием или интеллектуальным анализом данных.

Каковы три примера машинного обучения?

  • прогнозирование спама
  • прогноз мошеннических операций с кредитными картами
  • механизм рекомендаций по продукту или рекламе

Существует два типа моделей машинного обучения: контролируемые и неконтролируемые. Контролируемые модели содержат набор данных, помеченных правильными ответами, в то время как неконтролируемые не содержат меток.

Примеры контролируемых моделей машинного обучения

  • k-ближайшие соседи — пример: предсказать, как человек может голосовать, если вы знаете, как голосуют его соседи
  • Наивный байес — пример: определить, является ли входящее письмо спамом
  • линейная регрессия — попытайтесь определить, коррелированы ли две переменные
  • деревья решений — использует структуру для представления ряда возможных путей принятия решений и результатов для каждого пути

Примеры неконтролируемых моделей машинного обучения

  • кластеризация — работает с немечеными данными и пытается «кластеризовать» их. Например, набор данных, показывающий, где живут миллионеры, вероятно, имеет кластеры в таких местах, как Беверли-Хиллз и Манхэттен
  • Латентный анализ Дирихле (LDA) — обработка на естественном языке, обычно используемая для определения общих тем в тексте или наборе документов
  • нейронные сети — примеры: распознавание рукописного ввода и распознавание изображения лица

При построении моделей, используемых для прогнозирования, мы часто обучаем модель на основе существующего набора данных. Модель может быть переобучена много раз, поскольку становится доступным все больше и больше наборов данных для обучения. Например, мы переобучим механизм рекомендаций, основанный на коллаборативной фильтрации, когда узнаем больше о событиях, которые привели к продажам продукта или целевым показателям вовлеченности.

Пример машинного обучения Apache Spark (со Scala)

Давайте покажем демонстрацию программы машинного обучения Apache Spark. В следующей демонстрации мы начинаем с обучения модели кластеризации k-средних, а затем используем эту обученную модель для прогнозирования языка входящего текстового потока из Slack.

Этот пример основан на предыдущем учебном руководстве по Apache Spark Streaming, которое передает данные с сайта команды Slack. Эти и другие видео также доступны на наших курсах обучения Spark. См. Раздел Ресурсы ниже для ссылок.

Но давайте перейдем к демонстрации:

Обзор исходного кода Scala машинного обучения Apache

Хорошо, теперь, когда мы имеем в виду демо, давайте рассмотрим соответствующий код Spark MLLib. Опять же, ссылки на исходный код можно найти в разделе Ресурсы ниже. Давайте начнем со вступления в наш пример Spark Machine Learning и того, что называлось во время развертывания с помощью spark-submit в демоверсии, SlackMLApp:

Пример Spark Machine Learning со Scala

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
object SlackMLApp {
  
  object Config {
    @Parameter(names = Array("-st", "--slackToken"))
    var slackToken: String = null
    @Parameter(names = Array("-nc", "--numClusters"))
    var numClusters: Int = 4
    @Parameter(names = Array("-po", "--predictOutput"))
    var predictOutput: String = null
    @Parameter(names = Array("-td", "--trainData"))
    var trainData: String = null
    @Parameter(names = Array("-ml", "--modelLocation"))
    var modelLocation: String = null
  }
  
  def main(args: Array[String]) {
    new JCommander(Config, args.toArray: _*)
    val conf = new SparkConf().setAppName("SlackStreamingWithML")
    val sparkContext = new SparkContext(conf)
     
    // optain existing or create new model
    val clusters: KMeansModel =
      if (Config.trainData != null) {
        KMeanTrainTask.train(sparkContext, Config.trainData, Config.numClusters, Config.modelLocation)
      } else {
        if (Config.modelLocation != null) {
          new KMeansModel(sparkContext.objectFile[Vector](Config.modelLocation).collect())
        } else {
          throw new IllegalArgumentException("Either modelLocation or trainData should be specified")
        }
      }
  
    if (Config.slackToken != null) {
      SlackStreamingTask.run(sparkContext, Config.slackToken, clusters, Config.predictOutput)
    }
     
  }
  
}

Код выше содержит то, что содержит main метод и то, что вызывается из spark-submit . Как вы можете надеяться, вы можете либо обучить новую модель, либо использовать существующую модель при запуске SlackStreamingTask. Это зависит от входящих аргументов командной строки, таких как trainData , modelLocation и slackToken .

В этом примере анализа и обзора исходного кода машинного обучения Spark мы сосредоточимся на следующем: 1) код, используемый для обучения модели в KMeanTrainTask и 2) использование модели для прогнозирования в SlackStreamingTask.

Во-первых, давайте откроем соответствующую часть KMeanTrainTask

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
def train(sparkContext: SparkContext, trainData: String, numClusters: Int, modelLocation: String): KMeansModel = {
   
  if (new File(modelLocation).exists) removePrevious(modelLocation)
 
  val trainRdd = sparkContext.textFile(trainData)
 
  val parsedData = trainRdd.map(Utils.featurize).cache() 
  // if we had a really large data set to train on, we'd want to call an action to trigger cache.
       
  val model = KMeans.train(parsedData, numClusters, numIterations)
 
  sparkContext.makeRDD(model.clusterCenters, numClusters).saveAsObjectFile(modelLocation)
   
  val example = trainRdd.sample(withReplacement = false, 0.1).map(s => (s, model.predict(Utils.featurize(s)))).collect()
  println("Prediction examples:")
  example.foreach(println)
 
  model
}

При вызове train мы попытаемся удалить любую ранее сохраненную модель в removePrevious . ( removePrevious не показано, поскольку это не имеет отношения к нашему removePrevious к машинному обучению с помощью Apache Spark.) Итак, давайте trainRdd новый RDD с именем trainRdd . Поскольку textFile принимает аргумент String каталога, он будет читать все файлы, содержащиеся в каталоге, который мы вызвали с помощью «input».

Затем мы должны преобразовать элементы (строки текста) в СДР в формат, подходящий для KMeans. Мы делаем это, вызывая Utils.featurize который выглядит следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
object Utils {
  
  val NUM_DEMENSIONS: Int = 1000
  
  val tf = new HashingTF(NUM_DEMENSIONS)
  
  /**
   * This uses min hash algorithm https://en.wikipedia.org/wiki/MinHash to transform
   * string to vector of double, which is required for k-means
   */
  def featurize(s: String): Vector = {
    tf.transform(s.sliding(2).toSeq)
  }
  
}

Теперь, если мы вернемся к нашему объекту задачи KMeansTrain, мы сможем обучить нашу модель, используя функцию parsedData с parsedData и numClusters и numIterations . После этого мы сохраняем модель и отправляем несколько example прогнозов кластеризации на консоль, повторяя пример и отправляя в println .

Теперь, когда у нас есть обученная модель, давайте посмотрим SlackStreamingTask

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
bject SlackStreamingTask {
  
  def run(sparkContext: SparkContext, slackToken: String, clusters: KMeansModel, predictOutput: String) {
    val ssc = new StreamingContext(sparkContext, Seconds(5))
    val dStream = ssc.receiverStream(new SlackReceiver(slackToken))
     
    val stream = dStream //create stream of events from the Slack... but filter and marshall to JSON stream data
      .filter(JSON.parseFull(_).get.asInstanceOf[Map[String, String]]("type") == "message") // get only message events
      .map(JSON.parseFull(_).get.asInstanceOf[Map[String, String]]("text")) // extract message text from the event
  
    val kmeanStream = kMean(stream, clusters) // create K-mean model
    kmeanStream.print() // print k-mean results. It is pairs (k, m), where k - is a message text, m - is a cluster number to which message relates
     
    if (predictOutput != null) {
      kmeanStream.saveAsTextFiles(predictOutput) // save to results to the file, if file name specified
    }
  
    ssc.start() // run spark streaming application
    ssc.awaitTermination() // wait the end of the application
  }
  
  /**
  * transform stream of strings to stream of (string, vector) pairs and set this stream as input data for prediction
  */
  def kMean(dStream: DStream[String], clusters: KMeansModel): DStream[(String, Int)] = {
    dStream.map(s => (s, Utils.featurize(s))).map(p => (p._1, clusters.predict(p._2)))
  }
  
}

Код Spark MLlib, который делает предсказания кластеризации с ранее сохраненной моделью, является clusters.predict . Прежде чем он вызывается, мы наносим на карту DStream и снова используем featurize , чтобы использовать его с featurize . Мы возвращаем DStream с исходным текстом, полученным из Slack, и с предсказанным кластером.

Если бы программа драйвера Spark была вызвана со значениемgnastOutput, выходные данные были бы сохранены как текстовые файлы.

Вот еще один скриншот, в котором я описываю код более подробно.

Ресурсы

Исходный код: https://github.com/tmcgrath/spark-course/tree/master/spark-ml

История машинного обучения: http://www.slideshare.net/ToddMcGrath1/machine-learning-with-apache-spark-62310284

Spark MLlib: http://www.slideshare.net/ToddMcGrath1/machine-learning-with-spark-mllib

Глава «Машинное обучение Spark» из раздела «Изучение Spark»

Учебный курс Spark с Scala

Ссылка: Пример машинного обучения Apache Spark с Scala от нашего партнера по JCG Тодда МакГрата в блоге Supergloo .