Существует достаточно доказательств, чтобы доказать важность автоматического тестирования. Проекты в новых областях часто пренебрегают автоматизированным тестированием, поскольку сам домен привлекает внимание разработчиков. Однако отсутствие тестирования подразумевает «смеяться сейчас, плакать позже». Некоторые из инструментов в области больших данных были спроектированы вокруг тестируемости или, по крайней мере, сообщество позаботилось об этом позже. Мы увидим, как Spark , а точнее Spark Streaming , выполняет различные аспекты автоматизированного тестирования.
Что такое потоковая обработка
Потоковая обработка — это парадигма программирования, которая работает с бесконечными и непрерывными потоками данных, применяя к ним параллельные операции. Идея проста, но мощна, и сложность реализации будет варьироваться в зависимости от следующих требований:
- Семантическая доставка: по крайней мере, один раз, максимум один раз или точно один раз.
- Операции с состоянием: локальное или удаленное состояние.
- Задержка: в реальном времени или почти в реальном времени.
- Надежность, высокая доступность и долговечность.
Что такое Spark Streaming
Spark — это революция в пространстве больших данных . Он заменил MapReduce Hadoop в качестве предпочтительной среды пакетной обработки. Основными причинами являются:
- Скорость: запускать программы в 100 раз быстрее, чем Hadoop MapReduce в памяти, или в 10 раз быстрее на диске.
- Удобство использования: MapReduce DSL далеко не прост в написании и чтении. Spark Scala DSL работает как расширение операций Scala Collections, поэтому кривая обучения не крутая.
- Сообщество: вокруг Spark много азарта, и есть множество сопутствующих инструментов, таких как MLib, Spark SQL или Spark Streaming.
Spark Streaming построен поверх Spark. Это означает, что вы можете использовать инфраструктуру Spark и такие концепции, как YARN , HDFS или RDD . Кроме того, у нас будут абстракции, помогающие нам создавать потоковые функции, такие как агрегаты или скользящие окна.
Что такое юнит-тестирование
Это фантастическая серия о разных взглядах на юнит-тестирование. Чтобы сфокусировать внимание на этом посте, мы будем работать со следующими характеристиками:
- Изоляция сети: тестируемый производственный код будет включать код, который живет в одном процессе. Сетевые звонки не разрешены.
- Изоляция фреймворка: мы хотим как можно больше тестировать наш код, а не взаимодействие с базовыми фреймворками.
Искровая испытательная база на помощь
Контроль жизненного цикла Spark может быть громоздким и утомительным. К счастью, проект Spark Testing Base предлагает нам черты Scala, которые обрабатывают эти детали низкого уровня для нас. Потоковая передача имеет дополнительную сложность, поскольку нам необходимо своевременно получать данные для приема. В то же время, внутренние часы Spark должны быть отмечены контролируемым образом, если мы хотим проверить синхронизированные операции как скользящие окна.
Давайте посмотрим, как проверить архетипический пример WordCount:
| 1 2 3 4 | def count(lines: DStream[String]): DStream[(String, Int)] = lines.flatMap(_.split(" "))   .map(word => (word, 1))   .reduceByKey(_ + _) | 
Как видите, это чистая функция, без побочных эффектов и доступа к внешнему состоянию. Мы можем рассуждать об этом, посмотрев на сигнатуру функции. DStream — базовая абстракция в Spark Streaming, и Spark Testing Base поможет нам с этим справиться.
| 1 2 3 4 5 6 7 8 | classWordCountSpec extendsStreamingSuiteBase { test("count words") {  val input = List(List("the word the"))  val expected = List(List(("the", 2), ("word", 1)))  testOperation[String, (String, Int)](input, count _ , expected, ordered = false) }} | 
Вам не нужно работать напрямую с абстракцией DStream. Входными данными будет последовательность входных коллекций, и каждая коллекция будет потребляться с галочкой внутренних часов Spark Streaming. Вы можете найти больше примеров того, что вы можете сделать с этой библиотекой здесь .
Объединение потоковой и пакетной обработки
Одним из классических сценариев в поточной обработке является объединение потока с базой данных для обогащения, фильтрации или преобразования событий, содержащихся в потоке. Благодаря Spark 2.0 и структурированной потоковой передаче Streaming и Batch выровнены и каким-то образом скрыты в слое абстракции.
Поскольку Spark 2.0 был недавно выпущен, давайте сосредоточимся на примере старого API:
| 1 2 3 4 5 6 7 8 9 | def countWithSpecialWords(lines: DStream[String], specialWords: RDD[String]): DStream[(String, Int)] = { val words = lines.flatMap(_.split(" ")) val bonusWords = words.transform(_.intersection(specialWords)) words.union(bonusWords)    .map(word => (word, 1))    .reduceByKey(_ + _)} | 
Это запутанный пример, но служит демонстрацией. Наша система хранит список специальных слов во внешней базе данных. Мы хотим дважды посчитать слово в потоке, которое содержится в этой специальной сумке слов. Важно отметить, что наша функция не заботится о том, как получить эти специальные слова. Это делается вне функции, и это дает нам возможность провести модульное тестирование логики.
| 1 2 3 4 5 6 7 8 | val lines = ingestEventsFromKafka(ssc, brokers, topic).map(_._2)val specialWords = ssc.sparkContext  .cassandraTable(keyspace, specialWordsTable)  .map(_.getString("word"))countWithSpecialWords(lines, specialWords)  .saveToCassandra(keyspace, wordCountTable) | 
В настоящее время такая поддержка не поддерживается в Spark Testing Base, но я создал PR , который обеспечит эту функциональность.
| 1 2 3 4 5 6 7 8 9 | test("stream and batch transformation") {  def intersection(f1: DStream[String], f2: RDD[String]) = f1.transform(_.intersection(f2))  val stream = List(List("hi"), List("holden"), List("bye"))  val batch = List("holden")  val expected = List(List(), List("holden"), List())  testOperationWithRDD[String, String, String](stream, batch, intersection _, expected, ordered = false)} | 
Вывод
Модульное тестирование Spark Streaming довольно просто благодаря Spark Testing Base. Тем не менее, если мы хотим использовать эту библиотеку, нам нужно аккуратно спроектировать наши операции. В следующих статьях мы увидим, как проводить интеграционные тесты с Spark Streaming, Kafka и Cassandra.
| Ссылка: | Тестирование Spark Streaming: модульное тестирование от нашего партнера JCG в блоге Crafted Software . |