Статьи

Акка и Кассандра Активатор

Акка и Кассандра

В этом руководстве я собираюсь использовать Spray-Client, драйвер DataStacks Cassandra и Akka для создания приложения, которое загружает твиты, а затем сохраняет их идентификатор, текст, имя и дату в таблице Cassandra. В ней показано, как создать простое приложение. Приложение Akka с несколькими участниками, как использовать Akka IO для выполнения HTTP-запросов и как хранить данные в Cassandra. Также демонстрируются подходы к тестированию таких приложений, в том числе тесты производительности.

в целом

Узнайте, как создавать  приложения командной строки на основе Akka, как их тестировать (используя  TestKit ) и  Specs2 ; и как использовать  Spray-Client  для выполнения асинхронных HTTP-запросов.

Ядро

Я начну с построения ядра нашей системы. Он содержит три актера, два из которых взаимодействуют с базой данных твитов, а другой загружает твиты. TwitterReadActor Читает из  Cluster, в TweetWriteActor записывает в  Cluster, и  TweetScanActor downloadsthe твиты и передает их на  TweetWriteActor быть написано. Эти зависимости выражены в конструкторах актеров

class TweetReadActor(cluster: Cluster) extends Actor {
...
}




class TweetWriterActor(cluster: Cluster) extends Actor {
...
}




class TweetScanActor(tweetWrite: ActorRef, queryUrl: String => String) 
extends Actor {
...
}

Параметр конструктора   актеров чтения  и  записи — это просто Cluster экземпляр Cassandra  сканирование актер берет  ActorRef на  запись  актера и функцию , которая задана  String запрос может построить URL запроса для загрузки твитов. (Вот как я строю поиск по ключевым словам, например.)

Чтобы построить наше приложение, все, что нам нужно сделать, это создать экземпляры актеров в правильной последовательности

val system = ActorSystem()




def queryUrl(query: String): String = ???
val cluster: Cluster = ???




val reader = system.actorOf(Props(new TweetReaderActor(cluster)))
val writer = system.actorOf(Props(new TweetWriterActor(cluster)))
val scanner = system.actorOf(Props(new TweetScannerActor(writer, queryUrl)))

Я оставлю реализацию  cluster и  queryUrl как  ??? с  изломом в цепилогическая непоследовательность в противном случае совершенной системы , иначе  нижний типа .

Писать Кассандре

Теперь, когда у нас есть структура, мы можем взглянуть на  TwitterWriterActor. Он получает экземпляры  Tweet, которые он записывает в  tweets пространство клавиш в Кассандре.

class TweetWriterActor(cluster: Cluster) extends Actor {
val session = cluster.connect(Keyspaces.akkaCassandra)
val preparedStatement = session.prepare(
"INSERT INTO tweets(key, user_user, text, createdat) VALUES (?, ?, ?, ?);")




def receive: Receive = {
case tweets: List[Tweet] =>
case tweet: Tweet =>
}
}

Чтобы сохранить твиты, нам нужно  подключиться  к правильному пространству клавиш, которое дает нам Кассандра  Session. Потому что мы стараемся быть как можно более эффективными, мы воспользоваться Кассандрой  PreparedStatements и  BoundStatementс. PreparedStatement Является предварительно жевали CQL заявления,  BoundStatement это PreparedStatemnt параметр, значение которого устанавливается.

Итак, это дает нам подсказку о том, что  saveTweet должна делать функция.

class TweetWriterActor(cluster: Cluster) extends Actor {
val session = cluster.connect(Keyspaces.akkaCassandra)
val preparedStatement = session.prepare(
"INSERT INTO tweets(key, user_user, text, createdat) VALUES (?, ?, ?, ?);")




def saveTweet(tweet: Tweet): Unit =
session.executeAsync(preparedStatement.bind(
tweet.id.id, tweet.user.user, tweet.text.text, tweet.createdAt))




def receive: Receive = {
case tweets: List[Tweet] =>
case tweet: Tweet =>
}
}

Единственное, что остается сделать — это использовать его в  receive частичной функции.

class TweetWriterActor(cluster: Cluster) extends Actor {
val session = cluster.connect(Keyspaces.akkaCassandra)
val preparedStatement = session.prepare(
"INSERT INTO tweets(key, user_user, text, createdat) VALUES (?, ?, ?, ?);")




def saveTweet(tweet: Tweet): Unit =
session.executeAsync(preparedStatement.bind(
tweet.id.id, tweet.user.user, tweet.text.text, tweet.createdAt))




def receive: Receive = {
case tweets: List[Tweet] => tweets foreach saveTweet
case tweet: Tweet => saveTweet(tweet)
}
}

Итак, у нас есть код, который сохраняет экземпляры  Tweet в пространстве ключей в нашем кластере Cassandra.

Чтение с Кассандры

Чтение данных немного сложнее, мы хотели бы поддержать  подсчет  и  найти все операции. Затем мы должны быть в состоянии построить запросы Кассандры; тогда, имея Кассандру  Row, мы должны быть в состоянии превратить ее в наш  Tweet объект. Естественно, мы также хотим использовать асинхронную природу драйвера Cassandra. К счастью, все не будет так сложно. Позвольте мне начать со структуры  TweetReaderActor.

object TweetReaderActor {
case class FindAll(maximum: Int = 100)
case object CountAll
}




class TweetReaderActor(cluster: Cluster) extends Actor {
val session = cluster.connect(Keyspaces.akkaCassandra)
val countAll = new BoundStatement(session.prepare(
"select count(*) from tweets;"))




def receive: Receive = {
case FindAll(maximum) =>
// reply with List[Tweet]
case CountAll =>
// reply with Long
}
}

В объекте компаньона, я определил  FindAll и  CountAll сообщения , что наш актер будет реагировать на; Я также оставил в коде, который дает нам,  Session а затем использовал  Session для создания, BoundStatement который считает все строки. Далее нам нужно уметь создавать экземпляр  Tweet данного a  Row.

class TweetReaderActor(cluster: Cluster) extends Actor {
...




def buildTweet(r: Row): Tweet = {
val id = r.getString("key")
val user = r.getString("user_user")
val text = r.getString("text")
val createdAt = r.getDate("createdat")
Tweet(id, user, text, createdAt)
}
...
}

Опять же, ничего особенного: мы просто выбираем значения столбцов в строке и используем их для создания экземпляра  Tweet. Теперь давайте подключим магию Кассандры. Мы хотели бы  выполнить  (асинхронно) некоторый запрос ; отобразить  строки,  возвращенные при выполнении этого запроса, чтобы превратить их в  твиты ; и затем  передать  результат  отправителю . (Текст, выделенный курсивом, дает множество подсказок, поэтому давайте просто добавим код.)

class TweetReaderActor(cluster: Cluster) extends Actor {
val session = cluster.connect(Keyspaces.akkaCassandra)
val countAll = new BoundStatement(session.prepare(
"select count(*) from tweets;"))




import scala.collection.JavaConversions._
import cassandra.resultset._
import context.dispatcher
import akka.pattern.pipe




def buildTweet(r: Row): Tweet = {...}




def receive: Receive = {
case FindAll(maximum) =>
val query = QueryBuilder.select().
all().
from(Keyspaces.akkaCassandra, "tweets").
limit(maximum)
session.executeAsync(query).
map(_.all().map(buildTweet).toList) pipeTo sender
case CountAll =>
session.executeAsync(countAll) map(_.one.getLong(0)) pipeTo sender
}
}

Позвольте мне разобрать  FindAll обработчик сообщений. Во-первых, я query строю использование  Кассандры  QueryBuilder. Это обычный код Кассандры.

То, что следует, намного интереснее. Я вызываю  executeAsync метод  session, который возвращает ResultSetFuture. Используя неявное преобразование в  cassandra.resultset._, я превращаю  ResultSetFutureв Скала  Future[ResultSet]. Это позволяет мне использовать  Future.map метод, чтобы превратить  ResultSetв  List[Tweet].

Вызов  session.executeAsync(query) map ожидает в качестве своего параметра функцию от  ResultSet некоторого типа  B. В нашем случае  B есть  List[Tweet]ResultSet Содержит метод  all(), который возвращает java.util.List[Row]. Для того, чтобы быть в состоянии  на map протяжении многих  java.util.List[Row], нам нужно , чтобы превратить его в Scala  List[Row]. Для этого мы вводим неявные преобразования в  scala.collection.JavaConversions. И теперь мы можем завершить параметр  Future.map функции.

session.executeAsync(query) map(_.all().map(buildTweet).toList) поэтому дает нам Future[List[Tweet]], что мучительно близко к тому, что нам нужно. Мы не хотим блокировать результат, и мы слишком ленивы, чтобы использовать эту  onSuccess функцию, потому что все, что она делает, — передает результат в  sender. Таким образом, вместо этого, мы  труба  успех будущего на  sender этом завершает картину, объясняя всю линию  session.executeAsync(query) map(_.all().map(buildTweet).toList) pipeTo sender.

Подключение к Кассандре

Прежде чем двигаться дальше, я должен объяснить, откуда Cluster берется это  значение. Думая о системе, которую мы пишем, нам могут потребоваться разные значения  Cluster для тестов и для основной системы. Более того, тесту  Cluster , скорее всего, понадобится специальная настройка. Поскольку я пока не могу решить, я бы просто определил, что есть  CassandraCluster черта, которая возвращает  Cluster; и дать реализации, которые правильно делают то, что загружает конфигурацию из  ActorSystemконфигурации, и ту, которая жестко запрограммирована для использования в тестах.

trait CassandraCluster {
def cluster: Cluster
}

Реализация на основе конфигурации и конфигурация теста отличаются только значениями, которые они используют для создания  Cluster экземпляра.

// in src/scala/main
trait ConfigCassandraCluster extends CassandraCluster {
def system: ActorSystem




private def config = system.settings.config




import scala.collection.JavaConversions._
private val cassandraConfig = config.getConfig(
"akka-cassandra.main.db.cassandra")
private val port = cassandraConfig.getInt("port")
private val hosts = cassandraConfig.getStringList("hosts").toList




lazy val cluster: Cluster =
Cluster.builder().
addContactPoints(hosts: _*).
withCompression(ProtocolOptions.Compression.SNAPPY).
withPort(port).
build()
}




// in src/scala/test
trait TestCassandraCluster extends CassandraCluster {
def system: ActorSystem




private def config = system.settings.config




import scala.collection.JavaConversions._
private val cassandraConfig = config.getConfig(
"akka-cassandra.test.db.cassandra")
private val port = cassandraConfig.getInt("port")
private val hosts = cassandraConfig.getStringList("hosts").toList




lazy val cluster: Cluster =
Cluster.builder().
addContactPoints(hosts: _*).
withPort(port).
withCompression(ProtocolOptions.Compression.SNAPPY).
build()




}

Это позволяет мне смешивать подходящие черты и правильно настраивать их  Cluster. Но когда дело доходит до тестов для тестов, есть небольшой поворот: я хочу, чтобы кластер был в известном состоянии. Чтобы решить эту проблему, я создаю  CleanCassandra черту, которая сбрасывает  Cluster некоторые  CassandraCluster.cluster.

trait CleanCassandra extends SpecificationStructure {
this: CassandraCluster =>




private def runClq(session: Session, file: File): Unit = {
val query = Source.fromFile(file).mkString
query.split(";").foreach(session.execute)
}




private def runAllClqs(): Unit = {
val session = cluster.connect(Keyspaces.akkaCassandra)
val uri = getClass.getResource("/").toURI
new File(uri).listFiles().foreach { file =>
if (file.getName.endsWith(".cql")) runClq(session, file)
}
session.shutdown()
}




override def map(fs: => Fragments) = super.map(fs) insert Step(runAllClqs())
}

Когда я смешиваю эту черту в моем тесте, она регистрирует  runAllClqs() шаги, которые должны быть выполнены  перед  всеми другими шагами в тесте.

тестирование

И так, я могу написать свой первый тест, который проверяет, что  TwitterReaderActor и  TwitterWriterActorдействительно работает, как ожидалось. Тело теста довольно длинное, но не так сложно концептуально следить за происходящим.

class TweetActorsSpec extends TestKit(ActorSystem())
with SpecificationLike with TestCassandraCluster 
with CleanCassandra with ImplicitSender {
sequential




val writer = TestActorRef(new TweetWriterActor(cluster))
val reader = TestActorRef(new TweetReaderActor(cluster))




"Slow & steady" >> {
def write(count: Int): List[Tweet] = {
val tweets = (1 to count).map(id => 
Tweet(id.toString, "@honzam399", "Yay!", new Date))
tweets.foreach(writer !)
Thread.sleep(1000) // wait for the tweets to hit the db
tweets.toList
}




"Single tweet" in {
val tweet = write(1).head




reader ! FindAll(1)
val res = expectMsgType[List[Tweet]]
res mustEqual List(tweet)
}




"100 tweets" in {
val writtenTweets = write(100)




reader ! FindAll(100)
val readTweets = expectMsgType[List[Tweet]]
readTweets must containTheSameElementsAs(writtenTweets)
}
}




}

Мы смешиваем множество компонентов, чтобы собрать тест. Прежде всего, мы расширяем  TestKit, давая ему  ActorSystem() параметр конструктора; Затем мы смешиваем в Specs2  SpecificationLike, а затем в нашей тестовой среде Cassandra, дополняя картину  ImplicitSender до, чтобы мы могли изучить ответы.

Фактическое тело  "Slow & steady" спецификации проверяет, что мы можем написать одно чтение и 100 твитов.

Перед запуском теста вы должны убедиться, что у вас запущена Cassandra и что вы создали правильные пространства клавиш. Чтобы сделать вашу жизнь проще, вы можете просто запустить скрипты CQL в  src/data. Вам нужно запустить в последовательности

keyspaces.cql
Then, in the correct keyspace:
tables.cql
words.cql

Сканирование твитов

Onwards Now that we know that we can safely store and retrieve the tweets from Cassandra, we need to write the component that is going to download them. In our system, this is the TweetScannerActor. It receives a message of type String, and it performs the HTTP request to download the tweets. (To keep this tutorial simple, I’m using the convenient Twitter proxy at “http//twitter-search-proxy.herokuapp.com/search/tweets. In any case, the task for the scanner actor is to construct the HTTP request, receive the response, turn it into List[Tweet] and send that list to the ActorRef of theTweetWriterActor.

class TweetScannerActor(tweetWrite: ActorRef, queryUrl: String => String)
extends Actor with TweetMarshaller {




import context.dispatcher
import akka.pattern.pipe




private val pipeline = sendReceive ~> unmarshal[List[Tweet]]




def receive: Receive = {
case query: String => pipeline(Get(queryUrl(query))) pipeTo tweetWrite
}
}

It is actually that simple We use Spray-Client to construct the HTTP pipeline, which makes HTTP request (sendReceive), and passes the raw HTTP response to be unmarshalled (that is, turned into instance of types in our systems).

The pipeline starts its job when it is applied to HttpRequest; in our case, Get(url: String)represents a mechanism that can construct such HttpRequests. When applied to the query, the functionqueryUrl returns the actual URL for the pipeline to work on.

Execution of the pipeline returns Future[List[Tweet]], which we can happily pipeTo the tweetWriteactor.

The only job that remains is for us to implement the unmarshaller. In Spray-Client’s case unmarshaller is a typeclass and the implementation is an instance of the typeclass. The easiest way to think about typeclasses is to imagine that typeclass is a trait which defines behaviour for some type, and that the typeclass instance is the implementation of that trait for some type.

In Spray-Client’s case, the typeclass is trait Unmarshaller[A], whose apply method takes HttpEntityand returns Deserialized[A]. The name apply should ring some bells–and indeed, Unmarshaller[A] is in essence an alias for trait Unmarshaller[A] extends (HttpEntity => Deserialized[A]). (Yes, you can extend (A = B) in Scala, which is syntactic sugar for trait Unmarshaller[A] extends Function1[HttpEntity, Deserialized[A]].) Now, the unmarshal directive we used earlier is defined as

def unmarshal[A : Unmarshaller]: HttpResponse => A

The : Unmarshaller is a context bound on the type parameter A, which causes the compiler to expand the function into

def unmarshal[A](implicit ev: Unmarshaller[A]): HttpResponse => A

The unmarshal function expects an instance of the typeclass Unmarshaller for some type A; in our case, we specify the type A to be List[Tweet]. We can make a mental substitution of A for List[Tweet] and arrive at unmarshal[List[Tweet]](implicit ev: Unmarshaller[List[Tweet]]): .... To make the application work, there needs to be a value of type Unmarshaller[List[Tweet]] in the current implicit scope. When we give such value, we say that we are giving instance of the Unmarshaller typeclass.

trait TweetMarshaller {
type Tweets = List[Tweet]




implicit object TweetUnmarshaller extends Unmarshaller[Tweets] {




val dateFormat = new SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy")




def mkTweet(status: JsValue): Deserialized[Tweet] = {
val json = status.asJsObject
...
}




def apply(entity: HttpEntity): Deserialized[Tweets] = {
val json = JsonParser(entity.asString).asJsObject
...
}
}




}

Our typeclass instance is the TweetUnmarshaller singleton, which extends Unmarshaller[Tweets]. Notice that I have also defined a type alias type Tweets = List[Tweet] so that I don’t have to write too many square brackets. By extending Unmarshaller[Tweets], we must implement the apply method, which is applied to the HttpEntity and should return either deserialized tweets or indicate an error.

We nearly have everything in place. But how do we satisfy ourselves that the TweetScannerActor indeed works?

Testing the TweetScannerActor

To test the scanner fully, we would like to use a well-known service. But where do we get it? We can’t really use the live service, because the tweets keep changing. It seems that the only way would be for us to implement a mock service and use it in our tests.

class TweetScanActorSpec extends TestKit(ActorSystem())
with SpecificationLike with ImplicitSender {




sequential




val port = 12345
def testQueryUrl(query: String) = s"http://localhost:$port/q=$query"




val tweetScan = TestActorRef(new TweetScannerActor(testActor, testQueryUrl))




"Getting all 'typesafe' tweets" >> {




"should return more than 10 last entries" in {
val twitterApi = TwitterApi(port)
tweetScan ! "typesafe"
Thread.sleep(1000)
val tweets = expectMsgType[List[Tweet]]
tweets.size mustEqual 4
twitterApi.stop()
success
}
}
}

When constructing the TweetScannerActor, we give it the testActor and a function that returns URLs onlocalhost on some port. In the body of the example, we start the mock TwitterApi on the given port; and use our TweetScannerActor to make the HTTP request. Because we gave the testActor as the writerActorRef, we should now be able to see the List[Tweet] that would have been sent to theTweetWriterActor.

Because our mock tweetset contains four tweets, we can make the assertion that the list indeed contains four tweets. (I leave more extensive testing as exercise for the reader.)

Main

I am now satisfied that the components in the system work as expected; I can therefore assemble the Appobject, which brings everything together in a command-line interface. I give you the Main object

object Main extends App with ConfigCassandraCluster {
import Commands._
import akka.actor.ActorDSL._




def twitterSearchProxy(query: String) = 
s"http://twitter-search-proxy.herokuapp.com/search/tweets?q=$query"




implicit lazy val system = ActorSystem()
val write = system.actorOf(Props(new TweetWriterActor(cluster)))
val read = system.actorOf(Props(new TweetReaderActor(cluster)))
val scan = system.actorOf(Props(
new TweetScannerActor(write, twitterSearchProxy)))




// we don't want to bother with the ``ask`` pattern, so
// we set up sender that only prints out the responses to
// be implicitly available for ``tell`` to pick up.
implicit val _ = actor(new Act {
become {
case x => println(">>> " + x)
}
})




@tailrec
private def commandLoop(): Unit = {
Console.readLine() match {
case QuitCommand => return
case ScanCommand(query) => scan ! query.toString




case ListCommand(count) => read ! FindAll(count.toInt)
case CountCommand => read ! CountAll




case _ => println("WTF??!!")
}




commandLoop()
}




// start processing the commands
commandLoop()




// when done, stop the ActorSystem
system.shutdown()




}

We have the main commandLoop() function, which reads the line from standard input, matches it against the commands and sends the appropriate messages to the right actors. It also mixes in the “real” source of Cassandra Cluster values and specifies the live function that constructs the URL to retrieve the tweets.

For interested readers TwitterApi

The TwitterApi is the mock version of the real Twitter Proxy API. It makes it easy to write repeatable and independent tests of the TweetScannerActor. Under the hood, it is implemented using Spray-Can and the HTTP Akka Extension. The intention is that upon construction it binds to the given port and responds to every GET request with the given body. To shutdown the API, you must call the stop() method. To give me greater control over the construction of the class, I define the constructor as private and give a companion object whose apply method returns properly constructed and bound TwitterApi.

class TwitterApi private(system: ActorSystem, port: Int, body: String) {




val blackHoleActor = system.actorOf(Props(new Actor {
def receive: Receive = Actor.emptyBehavior
}))




private class Service extends Actor {




def receive: Receive = {
case _: Http.Connected =>
sender ! Http.Register(self)
case HttpRequest(HttpMethods.GET, _, _, _, _) =>




sender ! HttpResponse(entity = HttpEntity(body))
case _ =>
}
}




private val service = system.actorOf(
Props(new Service).withRouter(RoundRobinRouter(nrOfInstances = 50)))
private val io = IO(Http)(system)
io.tell(Http.Bind(service, "localhost", port = port), blackHoleActor)




def stop(): Unit = {
io.tell(Http.Unbind, blackHoleActor)
system.stop(service)
system.stop(io)
}
}




object TwitterApi {




def apply(port: Int)(implicit system: ActorSystem): TwitterApi = {
val body = Source.fromInputStream(
getClass.getResourceAsStream("/tweets.json")).mkString
new TwitterApi(system, port, body)
}




}

Calling TwitterApi(1234) with an implicit ActorSystem in scope (for example in a TestKit test) loads the body from a well-known location on the classpath and then constructs the TwitterApi instance, passing it the ActorSystemport, and body. In the body of the TwitterApi class, I have an Actor that serves the HTTP requests, which is then used in the Bind message sent to the io extension.

The service is bound to the HTTP server until the stop() method is called. The stop() method unbinds the service, and stops it and the io extension. (You would typically do this at the end of your example.)

For interested readers sentiment.R

Now, let’s complete the picture with some mood analysis in R. I am trying to find if people are happy or unhappy about the tweets. To do so, I use a list of positive and negative words, which I store in my Cassandra positivewords and negativewords tables.

Grab the code from https://github.com/eigengo/activator-akka-cassandra