Статьи

Spark Streaming Testing с примером Scala

Spark Streaming Testing

Как вы создаете и автоматизируете тестирование приложений Spark Streaming? В этом посте мы покажем пример одного пути в Scala. Этот пост посвящен примерам кода и имеет дополнительный бонус использования плагина покрытия кода.

Являются ли тесты в этом руководстве примерами юнит-тестов? Или это интеграционные тесты? Функциональные тесты? Я не знаю, вы говорите мне в комментариях ниже, если у вас есть мнение. Если бы мне пришлось выбирать, я бы сказал, модульные тесты, потому что мы заглушаем потокового провайдера.

Предпосылки

Как я уверен, вы можете догадаться, вам понадобится немного кода Spark Streaming Scala для тестирования. Мы собираемся использовать наш пример Spark Streaming из кода Slack в этом посте. Итак, сначала проверьте это, если вам нужен потоковый код scala для использования. Однако не обязательно использовать этот код. Вы должны быть в состоянии получить представленные концепции и применить к своему собственному коду при желании. Весь код тестирования и пример кода потоковой передачи Spark можно получить из Github в любом случае.

Мы собираемся использовать sbt для создания и запуска тестов и создания отчетов о покрытии. Итак, если вы не используете sbt пожалуйста, переведите ваш инструмент для сборки соответственно.

обзор

Для написания автоматических тестов для Spark Streaming мы собираемся использовать стороннюю библиотеку с именем scalatest. Также мы собираемся добавить плагин sbt под названием «sbt-покрытие». Затем, используя эти инструменты, мы можем написать некоторый тестовый код Scala и создать отчеты о покрытии тестов.

меры

  1. Извлеките пример кода Spark Streaming из github
  2. Опишите обновления для build.sbt
  3. Создать проект / plugins.sbt
  4. Написать код Scala
  5. Выполнить тесты и отчеты о покрытии

Извлечь пример потокового кода Spark из Github

Если вы не хотите копировать и вставлять код, вы можете извлечь его из github. Просто извлеките репозиторий spark-course с https://github.com/tmcgrath/spark-course, и проект, над которым мы работаем, находится в каталоге spark-streaming-tests.

Обновления к предыдущему build.sbt

build.sbt должен быть обновлен, чтобы включить новый псевдоним команды, а также самый масштабный сторонний lib, как показано ниже:

Spark Streaming тестирует обновление build.sbt

01
02
03
04
05
06
07
08
09
10
11
12
13
scalaVersion := "2.11.8"
   
  +addCommandAlias("sanity", ";clean ;compile ;coverage ;test; coverageReport")
   
  resolvers += "jitpack" at "https://jitpack.io"
 @@ -19,5 +21,6 @@ libraryDependencies ++= Seq(
  // comment above line and uncomment the following to run in sbt
  // "org.apache.spark" %% "spark-streaming" % "1.6.1",
    "org.scalaj" %% "scalaj-http" % "2.3.0",
 "org.jfarcand" % "wcs" % "1.5"
 "org.jfarcand" % "wcs" % "1.5",
 "org.scalatest" %% "scalatest" % "2.2.6" % "test"
  )

Обратите внимание, как мы добавляем «test» в конец последовательности libraryDependencies, чтобы указать, что библиотека нужна только для тестов.

Создать проект / plugins.sbt

Добавьте новую строку для плагина sbt-покрытие, как показано здесь:

Плагин покрытия кода SBT

1
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.3.5")

Написать тесты Scala

На самом деле, прежде чем писать реальные тесты, мы собираемся обновить main метод нашего предыдущего SlackStreamingApp, чтобы упростить автоматизированные тесты. Я знаю, я знаю, если бы мы написали SlackStreamingApp с TDD, нам бы не пришлось это делать, верно?

Во всяком случае, это не огромное изменение.

Тесты Spark Streaming Scala

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
object SlackStreamingApp {
 -
 +
    def main(args: Array[String]) {
      val conf = new SparkConf().setMaster(args(0)).setAppName("SlackStreaming")
      val ssc = new StreamingContext(conf, Seconds(5))
      val stream = ssc.receiverStream(new SlackReceiver(args(1)))
      stream.print()
 -    if (args.length > 2) {
 -      stream.saveAsTextFiles(args(2))
 -    }
 +
 +    processStream(args, stream)
 +
      ssc.start()
      ssc.awaitTermination()
    }
 -
 +
 +  def processStream(args: Array[String], stream: DStream[String]): Unit = {
 +    args match {
 +      case Array(_, _, path, _*) => stream.saveAsTextFiles(args(2))
 +      case _ => return
 +    }
 +
 +
 +  }
 +

Как вы можете надеяться, мы увидели, что нам просто нужно было извлечь код, ищущий аргумент командной строки, в новую функцию с именем processStream . Кроме того, нам нужно добавить еще одну строку в верхней части импорта

Импорт Spark Scala DStream

1
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

Далее мы пишем код тестирования. Для начала нам нужно создать новые каталоги для хранения тестового кода. Создайте каталоги src / test / scala / com / supergloo. Затем мы добавим тестовый код в этот каталог, создав следующий файл Scala: src / test / scala / com / supergloo / SlackStreamingTest.scala

Spark Streaming Scalatest

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
package com.supergloo
  
import com.supergloo.SlackStreamingApp._
import org.apache.hadoop.mapred.InvalidInputException
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{ClockWrapper, Seconds, StreamingContext}
import org.scalatest.concurrent.Eventually
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
  
import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.io.Path
import scala.util.Try
  
class SlackStreamingTest extends FlatSpec with Matchers with Eventually with BeforeAndAfter {
  
  private val master = "local[1]"
  private val appName = "spark-streaming-test"
  private val filePath: String = "target/testfile"
  
  private var ssc: StreamingContext = _
  
  private val batchDuration = Seconds(1)
  
  var clock: ClockWrapper = _
  
  before {
    val conf = new SparkConf()
      .setMaster(master).setAppName(appName)
      .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
  
    ssc = new StreamingContext(conf, batchDuration)
    clock = new ClockWrapper(ssc)
  }
  
  after {
    if (ssc != null) {
      ssc.stop()
    }
    Try(Path(filePath + "-1000").deleteRecursively)
  }
  
  "Slack Streaming App " should " store streams into a file" in {
    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)
  
    dstream.print()
    processStream(Array("", "", filePath), dstream)
  
  
    ssc.start()
  
    lines += ssc.sparkContext.makeRDD(Seq("b", "c"))
    clock.advance(1000)
  
    eventually(timeout(2 seconds)){
      val wFile: RDD[String] = ssc.sparkContext.textFile(filePath+ "-1000")
      wFile.count() should be (2)
      wFile.collect().foreach(println)
    }
  
  }
  
  "Slack Streaming App " should " store empty streams if no data received" in {
    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)
  
    dstream.print()
    processStream(Array("", "", filePath), dstream)
  
  
    ssc.start()
  
    clock.advance(1000)
  
    eventually(timeout(1 seconds)){
      val wFile: RDD[String] = ssc.sparkContext.textFile(filePath+ "-1000")
      wFile.count() should be (0)
      wFile.collect().foreach(println)
    }
  
  }
  
  "Slack Streaming App " should " not store streams if argument is not passed" in {
    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)
  
    dstream.print()
    processStream(Array("", ""), dstream)
  
    val wFile: RDD[String] = ssc.sparkContext.textFile(filePath+ "-1000")
  
    ssc.start()
  
    lines += ssc.sparkContext.makeRDD(Seq("b", "c"))
    clock.advance(2000)
  
    eventually(timeout(3 seconds)){
      a [InvalidInputException] should be thrownBy {
        wFile.count() should be (0)
      }
    }
  }
}

Далее нам нужно создать дополнительные каталоги и добавить ClockWrapper.scala в src / test / scala / org / apache / spark / streaming /. Подробнее об этом уроке позже.

Spark Streaming ClockWrapper

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
package org.apache.spark.streaming
  
import org.apache.spark.util.ManualClock
  
/**
  * This class is defined in this package as the ManualClock is
  * private in the "spark" package
  */
class ClockWrapper(ssc: StreamingContext) {
  
  def getTimeMillis(): Long = manualClock().getTimeMillis()
  
  def setTime(timeToSet: Long) = manualClock().setTime(timeToSet)
  
  def advance(timeToAdd: Long) = manualClock().advance(timeToAdd)
  
  def waitTillTime(targetTime: Long): Long = manualClock().waitTillTime(targetTime)
  
  private def manualClock(): ManualClock = {
    ssc.scheduler.clock.asInstanceOf[ManualClock]
  }
}

(Кстати, ClockWrapper взят из подхода, который я видел в модульном тестировании Spark. См. Раздел «Дополнительные ресурсы» ниже).

Хорошо, мы готовы выполнить сейчас.

Выполнить тесты Scala и отчеты о покрытии

В каталоге spark-streaming-tests теперь мы можем sbt sanity команду sbt sanity из командной строки. Вы должны увидеть все три теста:

Spark Streaming Самые потрясающие результаты

01
02
03
04
05
06
07
08
09
10
11
12
[info] SlackStreamingTest:
[info] Slack Streaming App
[info] - should store streams into a file
[info] Slack Streaming App
[info] - should store empty streams if no data received
[info] Slack Streaming App
[info] - should not store streams if argument is not passed
[info] Run completed in 4 seconds, 436 milliseconds.
[info] Total number of tests run: 3
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

Чтобы просмотреть отчеты о покрытии, просто откройте target / scala-2.11 / scoverage-report / index.html в браузере.

Вывод

Надеемся, этот пример модульного тестирования Spark Streaming поможет начать ваш подход к тестированию Spark Streaming. Мы рассмотрели пример кода, как запустить и просмотреть результаты покрытия тестами. Если у вас есть какие-либо вопросы или комментарии, дайте мне знать. Кроме того, подпишитесь на канал Supergloo YouTube для предстоящего скринкаста из этого поста.

Дополнительные ресурсы

Рекомендуемое изображение кредит https://flic.kr/p/dgSbYM

Ссылка: Тестирование Spark Streaming с помощью Scala Example от нашего партнера по JCG Тодда МакГрата в блоге Supergloo .