Статьи

ScalaTest MapReduce с использованием Akka

Для спешащих людей  есть MapReduce с ScalaTest и кодом Akka, а также шаги

Я пытался выучить  Скалу,  и я хотел убить несколько птиц одним выстрелом. Позвольте мне сказать вам, я не разочарован — мне комфортно работать со  Scala . Если вы выходите из мира Java,  Scala  сравнительно более сложен, но как только вы преодолеете начальное препятствие, он вам понравится. Я хотел учиться

  • Насколько хороша поддержка  SBT  и  Eclipse IDE для Scala?
  • Понять, как мы можем реализовать  TDD,  используя  ScalaTest
  • Нить модель  Акка
  • Синтаксис программирования Scala

Одним из вариантов использования, который я хотел попробовать, был простой  Word Count  MapReduce,  это привет мир MapReduce . MapReduce — это метод программирования функций,  обычно ассоциируемый с параллельными вычислениями Hadoop . Есть  хороший пример MapReduce с использованием Java и Akka . Есть достойный пример MapReduce , и  еще один здесь,  и  еще один . Ниже схема из источника описания потока,

Подсчет слов MapReduce с помощью Akka и Java от Munish Gupta

Подсчет слов MapReduce с помощью Akka и Java от Munish Gupta

В этом примере я использую поддержку  Akka ‘s Actor для разбиения порций задач и параллельной обработки и агрегирования окончательных результатов.

Для начала  SBT  — это инструмент для сборки, похожий на Maven, широко используемый для разработки Scala. См. Project / plugins.sbt, это интегрировано с Eclipse IDE. Как только вы получите код от github, запустите приведенную ниже команду, и вы заметите, что сгенерировано 2 файла .project и .classpath.

sbt eclipse

Теперь импортируйте проект в Eclipse как Import => «Существующий проект в рабочую область». После того, как проект импортирован в Eclipse, мы можем воспользоваться IntelliSense и другими функциями IDE и легко разработать приложение по сравнению с написанием кода Scala в TextPad.

Как всегда, я начну писать тест, самый низкий уровень теста — это тест агрегации, который берет карту слов и количество раз, которое оно произошло, и агрегирует его. Я использовал  WordSpec  для этого  ScalaTest,  как показано ниже,

"Aggregrate actor" must {
"send back Map message" in {
// create the aggregate Actor
val aggregateActor = system.actorOf(Props[AggregateActor]);
var map: Map[String, Int] = Map[String, Int]("ak" -> 1, "kp" -> 2)
aggregateActor ! map
var map1: Map[String, Int] = Map[String, Int]("ak" -> 1, "kp" -> 2)
aggregateActor ! map1
Thread.sleep(1000)
var output = Map("kp" -> 4, "ak" -> 2)
aggregateActor ! "DISPLAY_LIST"
expectMsg(output)
}
}

Теперь я пишу модульный тест Reduce, который берет объект Result, создает объект Map и публикует его в объекте Aggregator для последующей агрегации.

"Reduce actor" must {
"send back Map message" in {
// create the aggregate Actor
val aggregateActor = system.actorOf(Props[AggregateActor]);
// create the list of reduce Actors
val reduceRouter = system.actorOf(Props(new ReduceActor(aggregateActor)))
val list: List[Result] = List[Result](new Result("kp", 1), new Result("ak", 2))
reduceRouter ! list
val list1: List[Result] = List[Result](new Result("ak", 1), new Result("kp", 2))
reduceRouter ! list1
Thread.sleep(1000)
var output = Map("kp" -> 3, "ak" -> 3)
aggregateActor ! "DISPLAY_LIST"
expectMsg(output)
}
}

Напишите модульный тест Map, чтобы взять строку и создать объект Result. Если вы внимательно заметите, карта и объект сокращения реализует  Roundrobin Routers  от Akka, где линия обрабатывается несколькими потоками в циклическом режиме.

"Map actor" must {
"send back Map message" in {
// create the aggregate Actor
val aggregateActor = system.actorOf(Props[AggregateActor]);
// create the list of reduce Actors
val reduceRouter = system.actorOf(Props(new ReduceActor(aggregateActor)).withRouter(RoundRobinRouter(nrOfInstances = 2)))
// create the list of map Actors
val mapRouter = system.actorOf(Props(new MapActor(reduceRouter)).withRouter(RoundRobinRouter(nrOfInstances = 2)))
var line = "Aditya Krishna Kartik Manjula"
mapRouter ! line
Thread.sleep(1000)
var output = Map("Kartik" -> 1, "Krishna" -> 1, "Aditya" -> 1, "Manjula" -> 1)
aggregateActor ! "DISPLAY_LIST"
expectMsg(output)
}
}

Мы пишем listController, который тестирует сквозную интеграцию карты / редуктора / агрегатора и утверждает значения,

"List Reader Controller actor" must {
"send back Map message" in {
// create the aggregate Actor
 val aggregateActor = system.actorOf(Props[AggregateActor]);
// create the list of reduce Actors
 val reduceRouter = system.actorOf(Props(new ReduceActor(aggregateActor)).withRouter(RoundRobinRouter(nrOfInstances = 2)))
// create the list of map Actors
 val mapRouter = system.actorOf(Props(new MapActor(reduceRouter)).withRouter(RoundRobinRouter(nrOfInstances = 2)))
val controller = system.actorOf(Props(new ControllerActor(aggregateActor, mapRouter)))
val lineReadActor = system.actorOf(Props[LineReadActor])
var list = List[String]("Aditya Krishna Kartik Manjula", "Manjula Anand Aditya Kartik", "Anand Vani Phani Aditya", "Kartik Krishna Manjula Aditya", "Vani Phani Anand Manjula")
lineReadActor.tell(list, controller)
Thread.sleep(1000)
var output = Map("Anand" -> 3, "Kartik" -> 3, "EOF" -> 1, "Krishna" -> 2, "Vani" -> 2, "Phani" -> 2, "Aditya" -> 4, "Manjula" -> 4)
 aggregateActor ! "DISPLAY_LIST"
 expectMsg(output)
 }
 }

Наконец, мы напишем fileReaderActor и передадим ему большой файл для выполнения MapReduce.

Теперь, если вы видите реальный код, обратитесь к MapActor.scala, есть ключевое слово  yield. yield помогает вам создать другой тип данных из одной коллекции. Синтаксис как ниже,

def evaluateExpression(line: String): List[Result] = {
var result = for (word <- line split (" ") toList; if !STOP_WORDS.contains(word.toLowerCase))
yield (new Result(word, 1))
return result
}

Ссылаясь на ReduceActor.scala, вы найдете код как «result =>», называемый лямбда-выражением. Лямбда-выражения являются фундаментальными для функционального языка программирования, такого как Scala,

def reduce(list: List[Result]): Map[String, Int] = {
var results: Map[String, Int] = new HashMap[String, Int]

list.foreach(result => {
if (results.contains(result.word)) {
results(result.word) += result.noOfInstances
} else {
results(result.word) = result.noOfInstances
}
})
return results;
}

Вывод

Если вы заметили, в этом примере я использовал и узнал:

  • Аспекты программирования Scala, такие как Collection, выход ключевых слов, лямбда-выражения
  • Akka «s  Roundrobin Маршрутизаторы  для нарезания резьбы и параллельности
  • SBT для интеграции с Eclipse
  • СкалаТест  для  TDD