Статьи

Тестирование Spark Streaming: модульное тестирование

Существует достаточно доказательств, чтобы доказать важность автоматического тестирования. Проекты в новых областях часто пренебрегают автоматизированным тестированием, поскольку сам домен привлекает внимание разработчиков. Однако отсутствие тестирования подразумевает «смеяться сейчас, плакать позже». Некоторые из инструментов в области больших данных были спроектированы вокруг тестируемости или, по крайней мере, сообщество позаботилось об этом позже. Мы увидим, как Spark , а точнее Spark Streaming , выполняет различные аспекты автоматизированного тестирования.

Что такое потоковая обработка

Потоковая обработка — это парадигма программирования, которая работает с бесконечными и непрерывными потоками данных, применяя к ним параллельные операции. Идея проста, но мощна, и сложность реализации будет варьироваться в зависимости от следующих требований:

  • Семантическая доставка: по крайней мере, один раз, максимум один раз или точно один раз.
  • Операции с состоянием: локальное или удаленное состояние.
  • Задержка: в реальном времени или почти в реальном времени.
  • Надежность, высокая доступность и долговечность.

Что такое Spark Streaming

Spark — это революция в пространстве больших данных . Он заменил MapReduce Hadoop в качестве предпочтительной среды пакетной обработки. Основными причинами являются:

Spark Streaming построен поверх Spark. Это означает, что вы можете использовать инфраструктуру Spark и такие концепции, как YARN , HDFS или RDD . Кроме того, у нас будут абстракции, помогающие нам создавать потоковые функции, такие как агрегаты или скользящие окна.

Что такое юнит-тестирование

Это фантастическая серия о разных взглядах на юнит-тестирование. Чтобы сфокусировать внимание на этом посте, мы будем работать со следующими характеристиками:

  • Изоляция сети: тестируемый производственный код будет включать код, который живет в одном процессе. Сетевые звонки не разрешены.
  • Изоляция фреймворка: мы хотим как можно больше тестировать наш код, а не взаимодействие с базовыми фреймворками.

Искровая испытательная база на помощь

Контроль жизненного цикла Spark может быть громоздким и утомительным. К счастью, проект Spark Testing Base предлагает нам черты Scala, которые обрабатывают эти детали низкого уровня для нас. Потоковая передача имеет дополнительную сложность, поскольку нам необходимо своевременно получать данные для приема. В то же время, внутренние часы Spark должны быть отмечены контролируемым образом, если мы хотим проверить синхронизированные операции как скользящие окна.

Давайте посмотрим, как проверить архетипический пример WordCount:

1
2
3
4
def count(lines: DStream[String]): DStream[(String, Int)] =
 lines.flatMap(_.split(" "))
   .map(word => (word, 1))
   .reduceByKey(_ + _)

Как видите, это чистая функция, без побочных эффектов и доступа к внешнему состоянию. Мы можем рассуждать об этом, посмотрев на сигнатуру функции. DStream — базовая абстракция в Spark Streaming, и Spark Testing Base поможет нам с этим справиться.

1
2
3
4
5
6
7
8
class WordCountSpec extends StreamingSuiteBase {
 
 test("count words") {
  val input = List(List("the word the"))
  val expected = List(List(("the", 2), ("word", 1)))
  testOperation[String, (String, Int)](input, count _ , expected, ordered = false)
 }
}

Вам не нужно работать напрямую с абстракцией DStream. Входными данными будет последовательность входных коллекций, и каждая коллекция будет потребляться с галочкой внутренних часов Spark Streaming. Вы можете найти больше примеров того, что вы можете сделать с этой библиотекой здесь .

Объединение потоковой и пакетной обработки

Одним из классических сценариев в поточной обработке является объединение потока с базой данных для обогащения, фильтрации или преобразования событий, содержащихся в потоке. Благодаря Spark 2.0 и структурированной потоковой передаче Streaming и Batch выровнены и каким-то образом скрыты в слое абстракции.

Поскольку Spark 2.0 был недавно выпущен, давайте сосредоточимся на примере старого API:

1
2
3
4
5
6
7
8
9
def countWithSpecialWords(lines: DStream[String], specialWords: RDD[String]):
 DStream[(String, Int)] = {
 val words = lines.flatMap(_.split(" "))
 val bonusWords = words.transform(_.intersection(specialWords))
 
 words.union(bonusWords)
    .map(word => (word, 1))
    .reduceByKey(_ + _)
}

Это запутанный пример, но служит демонстрацией. Наша система хранит список специальных слов во внешней базе данных. Мы хотим дважды посчитать слово в потоке, которое содержится в этой специальной сумке слов. Важно отметить, что наша функция не заботится о том, как получить эти специальные слова. Это делается вне функции, и это дает нам возможность провести модульное тестирование логики.

1
2
3
4
5
6
7
8
val lines = ingestEventsFromKafka(ssc, brokers, topic).map(_._2)
 
val specialWords = ssc.sparkContext
  .cassandraTable(keyspace, specialWordsTable)
  .map(_.getString("word"))
 
countWithSpecialWords(lines, specialWords)
  .saveToCassandra(keyspace, wordCountTable)

В настоящее время такая поддержка не поддерживается в Spark Testing Base, но я создал PR , который обеспечит эту функциональность.

1
2
3
4
5
6
7
8
9
test("stream and batch transformation") {
  def intersection(f1: DStream[String], f2: RDD[String]) = f1.transform(_.intersection(f2))
 
  val stream = List(List("hi"), List("holden"), List("bye"))
  val batch = List("holden")
  val expected = List(List(), List("holden"), List())
 
  testOperationWithRDD[String, String, String](stream, batch, intersection _, expected, ordered = false)
}

Вывод

Модульное тестирование Spark Streaming довольно просто благодаря Spark Testing Base. Тем не менее, если мы хотим использовать эту библиотеку, нам нужно аккуратно спроектировать наши операции. В следующих статьях мы увидим, как проводить интеграционные тесты с Spark Streaming, Kafka и Cassandra.