Существует достаточно доказательств, чтобы доказать важность автоматического тестирования. Проекты в новых областях часто пренебрегают автоматизированным тестированием, поскольку сам домен привлекает внимание разработчиков. Однако отсутствие тестирования подразумевает «смеяться сейчас, плакать позже». Некоторые из инструментов в области больших данных были спроектированы вокруг тестируемости или, по крайней мере, сообщество позаботилось об этом позже. Мы увидим, как Spark , а точнее Spark Streaming , выполняет различные аспекты автоматизированного тестирования.
Что такое потоковая обработка
Потоковая обработка — это парадигма программирования, которая работает с бесконечными и непрерывными потоками данных, применяя к ним параллельные операции. Идея проста, но мощна, и сложность реализации будет варьироваться в зависимости от следующих требований:
- Семантическая доставка: по крайней мере, один раз, максимум один раз или точно один раз.
- Операции с состоянием: локальное или удаленное состояние.
- Задержка: в реальном времени или почти в реальном времени.
- Надежность, высокая доступность и долговечность.
Что такое Spark Streaming
Spark — это революция в пространстве больших данных . Он заменил MapReduce Hadoop в качестве предпочтительной среды пакетной обработки. Основными причинами являются:
- Скорость: запускать программы в 100 раз быстрее, чем Hadoop MapReduce в памяти, или в 10 раз быстрее на диске.
- Удобство использования: MapReduce DSL далеко не прост в написании и чтении. Spark Scala DSL работает как расширение операций Scala Collections, поэтому кривая обучения не крутая.
- Сообщество: вокруг Spark много азарта, и есть множество сопутствующих инструментов, таких как MLib, Spark SQL или Spark Streaming.
Spark Streaming построен поверх Spark. Это означает, что вы можете использовать инфраструктуру Spark и такие концепции, как YARN , HDFS или RDD . Кроме того, у нас будут абстракции, помогающие нам создавать потоковые функции, такие как агрегаты или скользящие окна.
Что такое юнит-тестирование
Это фантастическая серия о разных взглядах на юнит-тестирование. Чтобы сфокусировать внимание на этом посте, мы будем работать со следующими характеристиками:
- Изоляция сети: тестируемый производственный код будет включать код, который живет в одном процессе. Сетевые звонки не разрешены.
- Изоляция фреймворка: мы хотим как можно больше тестировать наш код, а не взаимодействие с базовыми фреймворками.
Искровая испытательная база на помощь
Контроль жизненного цикла Spark может быть громоздким и утомительным. К счастью, проект Spark Testing Base предлагает нам черты Scala, которые обрабатывают эти детали низкого уровня для нас. Потоковая передача имеет дополнительную сложность, поскольку нам необходимо своевременно получать данные для приема. В то же время, внутренние часы Spark должны быть отмечены контролируемым образом, если мы хотим проверить синхронизированные операции как скользящие окна.
Давайте посмотрим, как проверить архетипический пример WordCount:
1
2
3
4
|
def count(lines: DStream[String]): DStream[(String, Int)] = lines.flatMap(_.split( " " )) .map(word => (word, 1 )) .reduceByKey(_ + _) |
Как видите, это чистая функция, без побочных эффектов и доступа к внешнему состоянию. Мы можем рассуждать об этом, посмотрев на сигнатуру функции. DStream — базовая абстракция в Spark Streaming, и Spark Testing Base поможет нам с этим справиться.
1
2
3
4
5
6
7
8
|
class WordCountSpec extends StreamingSuiteBase { test( "count words" ) { val input = List(List( "the word the" )) val expected = List(List(( "the" , 2 ), ( "word" , 1 ))) testOperation[String, (String, Int)](input, count _ , expected, ordered = false ) } } |
Вам не нужно работать напрямую с абстракцией DStream. Входными данными будет последовательность входных коллекций, и каждая коллекция будет потребляться с галочкой внутренних часов Spark Streaming. Вы можете найти больше примеров того, что вы можете сделать с этой библиотекой здесь .
Объединение потоковой и пакетной обработки
Одним из классических сценариев в поточной обработке является объединение потока с базой данных для обогащения, фильтрации или преобразования событий, содержащихся в потоке. Благодаря Spark 2.0 и структурированной потоковой передаче Streaming и Batch выровнены и каким-то образом скрыты в слое абстракции.
Поскольку Spark 2.0 был недавно выпущен, давайте сосредоточимся на примере старого API:
1
2
3
4
5
6
7
8
9
|
def countWithSpecialWords(lines: DStream[String], specialWords: RDD[String]): DStream[(String, Int)] = { val words = lines.flatMap(_.split( " " )) val bonusWords = words.transform(_.intersection(specialWords)) words.union(bonusWords) .map(word => (word, 1 )) .reduceByKey(_ + _) } |
Это запутанный пример, но служит демонстрацией. Наша система хранит список специальных слов во внешней базе данных. Мы хотим дважды посчитать слово в потоке, которое содержится в этой специальной сумке слов. Важно отметить, что наша функция не заботится о том, как получить эти специальные слова. Это делается вне функции, и это дает нам возможность провести модульное тестирование логики.
1
2
3
4
5
6
7
8
|
val lines = ingestEventsFromKafka(ssc, brokers, topic).map(_._2) val specialWords = ssc.sparkContext .cassandraTable(keyspace, specialWordsTable) .map(_.getString( "word" )) countWithSpecialWords(lines, specialWords) .saveToCassandra(keyspace, wordCountTable) |
В настоящее время такая поддержка не поддерживается в Spark Testing Base, но я создал PR , который обеспечит эту функциональность.
1
2
3
4
5
6
7
8
9
|
test( "stream and batch transformation" ) { def intersection(f1: DStream[String], f2: RDD[String]) = f1.transform(_.intersection(f2)) val stream = List(List( "hi" ), List( "holden" ), List( "bye" )) val batch = List( "holden" ) val expected = List(List(), List( "holden" ), List()) testOperationWithRDD[String, String, String](stream, batch, intersection _, expected, ordered = false ) } |
Вывод
Модульное тестирование Spark Streaming довольно просто благодаря Spark Testing Base. Тем не менее, если мы хотим использовать эту библиотеку, нам нужно аккуратно спроектировать наши операции. В следующих статьях мы увидим, как проводить интеграционные тесты с Spark Streaming, Kafka и Cassandra.
Ссылка: | Тестирование Spark Streaming: модульное тестирование от нашего партнера JCG в блоге Crafted Software . |