Статьи

Spark Streaming: модульное тестирование DStreams

Честно говоря, я не думаю, что есть необходимость говорить нам, «разработчикам», что мы должны провести надлежащее тестирование или модульное тестирование, чтобы быть корректными (QAs, не обольщайтесь: P). Модульные тесты — это самый быстрый способ узнать, есть ли что-то не так с нашим кодом.

Модульное тестирование важно, потому что это одна из самых ранних попыток тестирования кода. Чем раньше обнаружены дефекты, тем легче их устранить.

Spark Streaming

Spark Streaming — это API, предоставляемый Spark вместе с Spark-Core API. Он используется для масштабируемой, высокопроизводительной, отказоустойчивой потоковой обработки живых потоков данных и поддерживает множество источников данных, таких как Kafka, HDFS и т. Д.

Для приема требуется непрерывный поток данных, и через Spark Streaming API этот поток преобразуется в пакеты входных данных. Эти партии затем обрабатываются двигателем Spark. Чтобы преобразовать этот поток данных в небольшие пакеты и затем обработать их, в качестве абстракции используется API DStream.

DStream

DStream, или Дискретизированный поток,  является абстракцией, предоставляемой Spark Streaming. DStream обозначает серию RDD (т.е. небольшие неизменяемые пакеты данных), что делает DStream устойчивым. Мы можем легко применить любое преобразование к потоку входных данных, используя DStream (используя map, flatMap и т. Д.). Нам нужно только предоставить нашу бизнес-логику DStream, и мы можем легко достичь требуемого результата.

Например, следующий метод удаляет дубликаты сообщений из потока пар (ключ, значение) с помощью mapWithState:

def distinct(dStream: DStream[(Int, String)]): DStream[(Int, String)] = {
  dStream.mapWithState(StateSpec.function(dedup))
    .flatMap {
      case Some(value) => Seq(value)
      case _ => Seq()
    }
}

val dedup = (key: Int, value: Option[String], state: State[List[Int]]) => {
  (value, state.getOption()) match {
    case (Some(data), Some(keys)) if !keys.contains(key) =>
      state.update(key :: keys)
      Some(key, data)
    case (Some(data), None) =>
      state.update(List(key))
      Some(key, data)
    case _ =>
      None
  }
}

И еще один метод «update», который изменяет значения наших DStreams:

def update(dStream: DStream[(Int, String)]): DStream[(Int, String)] = {
dStream.map{
case (key, value) => (key, s"""{"value":[$value]}""")
}
}

Повестка дня

Итак, пока мы все не знаем, как работает Spark Streaming. Все хорошо. Но как нам на самом деле узнать, хорошо ли работает наша бизнес-логика с Spark Streaming и преобразовывать входной поток так, как мы хотим? Нам нужно протестировать DStream с входными данными и проверить вывод нашего потокового кода.

проблема

Так в чем проблема? Как мы выполняем потоковую логику в тестовой среде?

Как правило, мы можем написать интеграционные тесты и предоставить фактическую среду в интеграционном тесте. Но для модульного тестирования нам нужна среда тестирования, которая не должна зависеть от какого-либо внешнего приложения.

StreamingSuiteBase

StreamingSuiteBase предоставляет среду тестирования для DStream. Он отправляет входные данные в виде пакетов и выполняет предоставленную операцию для этих пакетов и предоставляет в качестве выходных данных. И эти результаты могут быть сопоставлены с ожидаемым результатом.

Импортируйте следующую зависимость в ваш build.sbt:

"com.holdenkarau" %% "spark-testing-base" % "2.1.0_0.8.0" % Test

И добавьте черту StreamingSuiteBase в вашу спецификацию:


class StreamingOperationsSpec extends WordSpec with StreamingSuiteBase {

Черта StreamingSuiteBase предоставляет метод « testOperation » , который принимает входные значения, наш бизнес — логику, ожидаемые значения выходных параметров , как и выдает результат. Тестирование нашего метода отличается и обновляется следующим образом:


"StreamingOperations" should{

"remove duplicates" in{
val inputPair = List(List((1, "value"), (1, "value")))
val outputPair = List(List((1, "value")))

testOperation(inputPair, distinct _ , outputPair, ordered = false)
}

"update stream" in {
val inputPair = List(List((1, """{"name": "Steve"}, {"name": "Tony"}""")))
val outputPair = List(List((1, """{"value":[{"name": "Steve"}, {"name": "Tony"}]}""")))

testOperation(inputPair, update _, outputPair, ordered = false)
}
}

Метод ‘ testOperation ‘ берет вывод операции, выполненной над ‘inputPair’, и проверяет, равна ли она ‘outputPair’, и точно так же мы можем проверить нашу бизнес-логику.

Этот короткий фрагмент позволяет вам проверить свою бизнес-логику, не заставляя вас создавать даже сеанс Spark. Вы можете легко смоделировать всю потоковую среду и легко проверить свою бизнес-логику.

Это был простой пример унарных операций над DStreams. Точно так же мы можем тестировать двоичные операции и оконные операции на DStreams.

Вы можете найти код здесь .