Акка и Кассандра
В этом руководстве я собираюсь использовать Spray-Client, драйвер DataStacks Cassandra и Akka для создания приложения, которое загружает твиты, а затем сохраняет их идентификатор, текст, имя и дату в таблице Cassandra. В ней показано, как создать простое приложение. Приложение Akka с несколькими участниками, как использовать Akka IO для выполнения HTTP-запросов и как хранить данные в Cassandra. Также демонстрируются подходы к тестированию таких приложений, в том числе тесты производительности.
Узнайте, как создавать приложения командной строки на основе Akka, как их тестировать (используя TestKit ) и Specs2 ; и как использовать Spray-Client для выполнения асинхронных HTTP-запросов.
Ядро
Я начну с построения ядра нашей системы. Он содержит три актера, два из которых взаимодействуют с базой данных твитов, а другой загружает твиты. TwitterReadActor
Читает из Cluster
, в TweetWriteActor
записывает в Cluster
, и TweetScanActor
downloadsthe твиты и передает их на TweetWriteActor
быть написано. Эти зависимости выражены в конструкторах актеров
class TweetReadActor(cluster: Cluster) extends Actor { ... } class TweetWriterActor(cluster: Cluster) extends Actor { ... } class TweetScanActor(tweetWrite: ActorRef, queryUrl: String => String) extends Actor { ... }
Параметр конструктора актеров чтения и записи — это просто Cluster
экземпляр Cassandra ; сканирование актер берет ActorRef
на запись актера и функцию , которая задана String
запрос может построить URL запроса для загрузки твитов. (Вот как я строю поиск по ключевым словам, например.)
Чтобы построить наше приложение, все, что нам нужно сделать, это создать экземпляры актеров в правильной последовательности
val system = ActorSystem() def queryUrl(query: String): String = ??? val cluster: Cluster = ??? val reader = system.actorOf(Props(new TweetReaderActor(cluster))) val writer = system.actorOf(Props(new TweetWriterActor(cluster))) val scanner = system.actorOf(Props(new TweetScannerActor(writer, queryUrl)))
Я оставлю реализацию cluster
и queryUrl
как ???
с изломом в цепи , логическая непоследовательность в противном случае совершенной системы , иначе нижний типа .
Писать Кассандре
Теперь, когда у нас есть структура, мы можем взглянуть на TwitterWriterActor
. Он получает экземпляры Tweet
, которые он записывает в tweets
пространство клавиш в Кассандре.
class TweetWriterActor(cluster: Cluster) extends Actor { val session = cluster.connect(Keyspaces.akkaCassandra) val preparedStatement = session.prepare( "INSERT INTO tweets(key, user_user, text, createdat) VALUES (?, ?, ?, ?);") def receive: Receive = { case tweets: List[Tweet] => case tweet: Tweet => } }
Чтобы сохранить твиты, нам нужно подключиться к правильному пространству клавиш, которое дает нам Кассандра Session
. Потому что мы стараемся быть как можно более эффективными, мы воспользоваться Кассандрой PreparedStatement
s и BoundStatement
с. PreparedStatement
Является предварительно жевали CQL заявления, BoundStatement
это PreparedStatemnt
параметр, значение которого устанавливается.
Итак, это дает нам подсказку о том, что saveTweet
должна делать функция.
class TweetWriterActor(cluster: Cluster) extends Actor { val session = cluster.connect(Keyspaces.akkaCassandra) val preparedStatement = session.prepare( "INSERT INTO tweets(key, user_user, text, createdat) VALUES (?, ?, ?, ?);") def saveTweet(tweet: Tweet): Unit = session.executeAsync(preparedStatement.bind( tweet.id.id, tweet.user.user, tweet.text.text, tweet.createdAt)) def receive: Receive = { case tweets: List[Tweet] => case tweet: Tweet => } }
Единственное, что остается сделать — это использовать его в receive
частичной функции.
class TweetWriterActor(cluster: Cluster) extends Actor { val session = cluster.connect(Keyspaces.akkaCassandra) val preparedStatement = session.prepare( "INSERT INTO tweets(key, user_user, text, createdat) VALUES (?, ?, ?, ?);") def saveTweet(tweet: Tweet): Unit = session.executeAsync(preparedStatement.bind( tweet.id.id, tweet.user.user, tweet.text.text, tweet.createdAt)) def receive: Receive = { case tweets: List[Tweet] => tweets foreach saveTweet case tweet: Tweet => saveTweet(tweet) } }
Итак, у нас есть код, который сохраняет экземпляры Tweet
в пространстве ключей в нашем кластере Cassandra.
Чтение с Кассандры
Чтение данных немного сложнее, мы хотели бы поддержать подсчет и найти все операции. Затем мы должны быть в состоянии построить запросы Кассандры; тогда, имея Кассандру Row
, мы должны быть в состоянии превратить ее в наш Tweet
объект. Естественно, мы также хотим использовать асинхронную природу драйвера Cassandra. К счастью, все не будет так сложно. Позвольте мне начать со структуры TweetReaderActor
.
object TweetReaderActor { case class FindAll(maximum: Int = 100) case object CountAll } class TweetReaderActor(cluster: Cluster) extends Actor { val session = cluster.connect(Keyspaces.akkaCassandra) val countAll = new BoundStatement(session.prepare( "select count(*) from tweets;")) def receive: Receive = { case FindAll(maximum) => // reply with List[Tweet] case CountAll => // reply with Long } }
В объекте компаньона, я определил FindAll
и CountAll
сообщения , что наш актер будет реагировать на; Я также оставил в коде, который дает нам, Session
а затем использовал Session
для создания, BoundStatement
который считает все строки. Далее нам нужно уметь создавать экземпляр Tweet
данного a Row
.
class TweetReaderActor(cluster: Cluster) extends Actor { ... def buildTweet(r: Row): Tweet = { val id = r.getString("key") val user = r.getString("user_user") val text = r.getString("text") val createdAt = r.getDate("createdat") Tweet(id, user, text, createdAt) } ... }
Опять же, ничего особенного: мы просто выбираем значения столбцов в строке и используем их для создания экземпляра Tweet
. Теперь давайте подключим магию Кассандры. Мы хотели бы выполнить (асинхронно) некоторый запрос ; отобразить строки, возвращенные при выполнении этого запроса, чтобы превратить их в твиты ; и затем передать результат отправителю . (Текст, выделенный курсивом, дает множество подсказок, поэтому давайте просто добавим код.)
class TweetReaderActor(cluster: Cluster) extends Actor { val session = cluster.connect(Keyspaces.akkaCassandra) val countAll = new BoundStatement(session.prepare( "select count(*) from tweets;")) import scala.collection.JavaConversions._ import cassandra.resultset._ import context.dispatcher import akka.pattern.pipe def buildTweet(r: Row): Tweet = {...} def receive: Receive = { case FindAll(maximum) => val query = QueryBuilder.select(). all(). from(Keyspaces.akkaCassandra, "tweets"). limit(maximum) session.executeAsync(query). map(_.all().map(buildTweet).toList) pipeTo sender case CountAll => session.executeAsync(countAll) map(_.one.getLong(0)) pipeTo sender } }
Позвольте мне разобрать FindAll
обработчик сообщений. Во-первых, я query
строю использование Кассандры QueryBuilder
. Это обычный код Кассандры.
То, что следует, намного интереснее. Я вызываю executeAsync
метод session
, который возвращает ResultSetFuture
. Используя неявное преобразование в cassandra.resultset._
, я превращаю ResultSetFuture
в Скала Future[ResultSet]
. Это позволяет мне использовать Future.map
метод, чтобы превратить ResultSet
в List[Tweet]
.
Вызов session.executeAsync(query) map
ожидает в качестве своего параметра функцию от ResultSet
некоторого типа B
. В нашем случае B
есть List[Tweet]
. ResultSet
Содержит метод all()
, который возвращает java.util.List[Row]
. Для того, чтобы быть в состоянии на map
протяжении многих java.util.List[Row]
, нам нужно , чтобы превратить его в Scala List[Row]
. Для этого мы вводим неявные преобразования в scala.collection.JavaConversions
. И теперь мы можем завершить параметр Future.map
функции.
session.executeAsync(query) map(_.all().map(buildTweet).toList)
поэтому дает нам Future[List[Tweet]]
, что мучительно близко к тому, что нам нужно. Мы не хотим блокировать результат, и мы слишком ленивы, чтобы использовать эту onSuccess
функцию, потому что все, что она делает, — передает результат в sender
. Таким образом, вместо этого, мы труба успех будущего на sender
этом завершает картину, объясняя всю линию session.executeAsync(query) map(_.all().map(buildTweet).toList) pipeTo sender
.
Подключение к Кассандре
Прежде чем двигаться дальше, я должен объяснить, откуда Cluster
берется это значение. Думая о системе, которую мы пишем, нам могут потребоваться разные значения Cluster
для тестов и для основной системы. Более того, тесту Cluster
, скорее всего, понадобится специальная настройка. Поскольку я пока не могу решить, я бы просто определил, что есть CassandraCluster
черта, которая возвращает Cluster
; и дать реализации, которые правильно делают то, что загружает конфигурацию из ActorSystem
конфигурации, и ту, которая жестко запрограммирована для использования в тестах.
trait CassandraCluster { def cluster: Cluster }
Реализация на основе конфигурации и конфигурация теста отличаются только значениями, которые они используют для создания Cluster
экземпляра.
// in src/scala/main trait ConfigCassandraCluster extends CassandraCluster { def system: ActorSystem private def config = system.settings.config import scala.collection.JavaConversions._ private val cassandraConfig = config.getConfig( "akka-cassandra.main.db.cassandra") private val port = cassandraConfig.getInt("port") private val hosts = cassandraConfig.getStringList("hosts").toList lazy val cluster: Cluster = Cluster.builder(). addContactPoints(hosts: _*). withCompression(ProtocolOptions.Compression.SNAPPY). withPort(port). build() } // in src/scala/test trait TestCassandraCluster extends CassandraCluster { def system: ActorSystem private def config = system.settings.config import scala.collection.JavaConversions._ private val cassandraConfig = config.getConfig( "akka-cassandra.test.db.cassandra") private val port = cassandraConfig.getInt("port") private val hosts = cassandraConfig.getStringList("hosts").toList lazy val cluster: Cluster = Cluster.builder(). addContactPoints(hosts: _*). withPort(port). withCompression(ProtocolOptions.Compression.SNAPPY). build() }
Это позволяет мне смешивать подходящие черты и правильно настраивать их Cluster
. Но когда дело доходит до тестов для тестов, есть небольшой поворот: я хочу, чтобы кластер был в известном состоянии. Чтобы решить эту проблему, я создаю CleanCassandra
черту, которая сбрасывает Cluster
некоторые CassandraCluster.cluster
.
trait CleanCassandra extends SpecificationStructure { this: CassandraCluster => private def runClq(session: Session, file: File): Unit = { val query = Source.fromFile(file).mkString query.split(";").foreach(session.execute) } private def runAllClqs(): Unit = { val session = cluster.connect(Keyspaces.akkaCassandra) val uri = getClass.getResource("/").toURI new File(uri).listFiles().foreach { file => if (file.getName.endsWith(".cql")) runClq(session, file) } session.shutdown() } override def map(fs: => Fragments) = super.map(fs) insert Step(runAllClqs()) }
Когда я смешиваю эту черту в моем тесте, она регистрирует runAllClqs()
шаги, которые должны быть выполнены перед всеми другими шагами в тесте.
тестирование
И так, я могу написать свой первый тест, который проверяет, что TwitterReaderActor
и TwitterWriterActor
действительно работает, как ожидалось. Тело теста довольно длинное, но не так сложно концептуально следить за происходящим.
class TweetActorsSpec extends TestKit(ActorSystem()) with SpecificationLike with TestCassandraCluster with CleanCassandra with ImplicitSender { sequential val writer = TestActorRef(new TweetWriterActor(cluster)) val reader = TestActorRef(new TweetReaderActor(cluster)) "Slow & steady" >> { def write(count: Int): List[Tweet] = { val tweets = (1 to count).map(id => Tweet(id.toString, "@honzam399", "Yay!", new Date)) tweets.foreach(writer !) Thread.sleep(1000) // wait for the tweets to hit the db tweets.toList } "Single tweet" in { val tweet = write(1).head reader ! FindAll(1) val res = expectMsgType[List[Tweet]] res mustEqual List(tweet) } "100 tweets" in { val writtenTweets = write(100) reader ! FindAll(100) val readTweets = expectMsgType[List[Tweet]] readTweets must containTheSameElementsAs(writtenTweets) } } }
Мы смешиваем множество компонентов, чтобы собрать тест. Прежде всего, мы расширяем TestKit
, давая ему ActorSystem()
параметр конструктора; Затем мы смешиваем в Specs2 SpecificationLike
, а затем в нашей тестовой среде Cassandra, дополняя картину ImplicitSender
до, чтобы мы могли изучить ответы.
Фактическое тело "Slow & steady"
спецификации проверяет, что мы можем написать одно чтение и 100 твитов.
Перед запуском теста вы должны убедиться, что у вас запущена Cassandra и что вы создали правильные пространства клавиш. Чтобы сделать вашу жизнь проще, вы можете просто запустить скрипты CQL в src/data
. Вам нужно запустить в последовательности
keyspaces.cql Then, in the correct keyspace: tables.cql words.cql
Сканирование твитов
Onwards Now that we know that we can safely store and retrieve the tweets from Cassandra, we need to write the component that is going to download them. In our system, this is the TweetScannerActor
. It receives a message of type String
, and it performs the HTTP request to download the tweets. (To keep this tutorial simple, I’m using the convenient Twitter proxy at “http//twitter-search-proxy.herokuapp.com/search/tweets. In any case, the task for the scanner actor is to construct the HTTP request, receive the response, turn it into List[Tweet]
and send that list to the ActorRef
of theTweetWriterActor
.
class TweetScannerActor(tweetWrite: ActorRef, queryUrl: String => String) extends Actor with TweetMarshaller { import context.dispatcher import akka.pattern.pipe private val pipeline = sendReceive ~> unmarshal[List[Tweet]] def receive: Receive = { case query: String => pipeline(Get(queryUrl(query))) pipeTo tweetWrite } }
It is actually that simple We use Spray-Client to construct the HTTP pipeline, which makes HTTP request (sendReceive
), and passes the raw HTTP response to be unmarshalled (that is, turned into instance of types in our systems).
The pipeline
starts its job when it is applied to HttpRequest
; in our case, Get(url: String)
represents a mechanism that can construct such HttpRequest
s. When applied to the query
, the functionqueryUrl
returns the actual URL for the pipeline to work on.
Execution of the pipeline
returns Future[List[Tweet]]
, which we can happily pipeTo
the tweetWrite
actor.
The only job that remains is for us to implement the unmarshaller. In Spray-Client’s case unmarshaller is a typeclass and the implementation is an instance of the typeclass. The easiest way to think about typeclasses is to imagine that typeclass is a trait which defines behaviour for some type, and that the typeclass instance is the implementation of that trait for some type.
In Spray-Client’s case, the typeclass is trait Unmarshaller[A]
, whose apply
method takes HttpEntity
and returns Deserialized[A]
. The name apply
should ring some bells–and indeed, Unmarshaller[A]
is in essence an alias for trait Unmarshaller[A] extends (HttpEntity => Deserialized[A])
. (Yes, you can extend (A = B) in Scala, which is syntactic sugar for trait Unmarshaller[A] extends Function1[HttpEntity, Deserialized[A]]
.) Now, the unmarshal
directive we used earlier is defined as
def unmarshal[A : Unmarshaller]: HttpResponse => A
The : Unmarshaller
is a context bound on the type parameter A
, which causes the compiler to expand the function into
def unmarshal[A](implicit ev: Unmarshaller[A]): HttpResponse => A
The unmarshal
function expects an instance of the typeclass Unmarshaller
for some type A
; in our case, we specify the type A
to be List[Tweet]
. We can make a mental substitution of A
for List[Tweet]
and arrive at unmarshal[List[Tweet]](implicit ev: Unmarshaller[List[Tweet]]): ...
. To make the application work, there needs to be a value of type Unmarshaller[List[Tweet]]
in the current implicit scope. When we give such value, we say that we are giving instance of the Unmarshaller
typeclass.
trait TweetMarshaller { type Tweets = List[Tweet] implicit object TweetUnmarshaller extends Unmarshaller[Tweets] { val dateFormat = new SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy") def mkTweet(status: JsValue): Deserialized[Tweet] = { val json = status.asJsObject ... } def apply(entity: HttpEntity): Deserialized[Tweets] = { val json = JsonParser(entity.asString).asJsObject ... } } }
Our typeclass instance is the TweetUnmarshaller
singleton, which extends Unmarshaller[Tweets]
. Notice that I have also defined a type alias type Tweets = List[Tweet]
so that I don’t have to write too many square brackets. By extending Unmarshaller[Tweets]
, we must implement the apply
method, which is applied to the HttpEntity
and should return either deserialized tweets or indicate an error.
We nearly have everything in place. But how do we satisfy ourselves that the TweetScannerActor
indeed works?
Testing the TweetScannerActor
To test the scanner fully, we would like to use a well-known service. But where do we get it? We can’t really use the live service, because the tweets keep changing. It seems that the only way would be for us to implement a mock service and use it in our tests.
class TweetScanActorSpec extends TestKit(ActorSystem()) with SpecificationLike with ImplicitSender { sequential val port = 12345 def testQueryUrl(query: String) = s"http://localhost:$port/q=$query" val tweetScan = TestActorRef(new TweetScannerActor(testActor, testQueryUrl)) "Getting all 'typesafe' tweets" >> { "should return more than 10 last entries" in { val twitterApi = TwitterApi(port) tweetScan ! "typesafe" Thread.sleep(1000) val tweets = expectMsgType[List[Tweet]] tweets.size mustEqual 4 twitterApi.stop() success } } }
When constructing the TweetScannerActor
, we give it the testActor
and a function that returns URLs onlocalhost
on some port
. In the body of the example, we start the mock TwitterApi
on the given port; and use our TweetScannerActor
to make the HTTP request. Because we gave the testActor
as the writerActorRef
, we should now be able to see the List[Tweet]
that would have been sent to theTweetWriterActor
.
Because our mock tweetset contains four tweets, we can make the assertion that the list indeed contains four tweets. (I leave more extensive testing as exercise for the reader.)
Main
I am now satisfied that the components in the system work as expected; I can therefore assemble the App
object, which brings everything together in a command-line interface. I give you the Main
object
object Main extends App with ConfigCassandraCluster { import Commands._ import akka.actor.ActorDSL._ def twitterSearchProxy(query: String) = s"http://twitter-search-proxy.herokuapp.com/search/tweets?q=$query" implicit lazy val system = ActorSystem() val write = system.actorOf(Props(new TweetWriterActor(cluster))) val read = system.actorOf(Props(new TweetReaderActor(cluster))) val scan = system.actorOf(Props( new TweetScannerActor(write, twitterSearchProxy))) // we don't want to bother with the ``ask`` pattern, so // we set up sender that only prints out the responses to // be implicitly available for ``tell`` to pick up. implicit val _ = actor(new Act { become { case x => println(">>> " + x) } }) @tailrec private def commandLoop(): Unit = { Console.readLine() match { case QuitCommand => return case ScanCommand(query) => scan ! query.toString case ListCommand(count) => read ! FindAll(count.toInt) case CountCommand => read ! CountAll case _ => println("WTF??!!") } commandLoop() } // start processing the commands commandLoop() // when done, stop the ActorSystem system.shutdown() }
We have the main commandLoop()
function, which reads the line from standard input, matches it against the commands and sends the appropriate messages to the right actors. It also mixes in the “real” source of Cassandra Cluster
values and specifies the live function that constructs the URL to retrieve the tweets.
For interested readers TwitterApi
The TwitterApi
is the mock version of the real Twitter Proxy API. It makes it easy to write repeatable and independent tests of the TweetScannerActor
. Under the hood, it is implemented using Spray-Can and the HTTP Akka Extension. The intention is that upon construction it binds to the given port and responds to every GET request with the given body. To shutdown the API, you must call the stop()
method. To give me greater control over the construction of the class, I define the constructor as private and give a companion object whose apply
method returns properly constructed and bound TwitterApi
.
class TwitterApi private(system: ActorSystem, port: Int, body: String) { val blackHoleActor = system.actorOf(Props(new Actor { def receive: Receive = Actor.emptyBehavior })) private class Service extends Actor { def receive: Receive = { case _: Http.Connected => sender ! Http.Register(self) case HttpRequest(HttpMethods.GET, _, _, _, _) => sender ! HttpResponse(entity = HttpEntity(body)) case _ => } } private val service = system.actorOf( Props(new Service).withRouter(RoundRobinRouter(nrOfInstances = 50))) private val io = IO(Http)(system) io.tell(Http.Bind(service, "localhost", port = port), blackHoleActor) def stop(): Unit = { io.tell(Http.Unbind, blackHoleActor) system.stop(service) system.stop(io) } } object TwitterApi { def apply(port: Int)(implicit system: ActorSystem): TwitterApi = { val body = Source.fromInputStream( getClass.getResourceAsStream("/tweets.json")).mkString new TwitterApi(system, port, body) } }
Calling TwitterApi(1234)
with an implicit ActorSystem
in scope (for example in a TestKit
test) loads the body from a well-known location on the classpath and then constructs the TwitterApi
instance, passing it the ActorSystem
, port
, and body
. In the body of the TwitterApi
class, I have an Actor
that serves the HTTP requests, which is then used in the Bind
message sent to the io
extension.
The service is bound to the HTTP server until the stop()
method is called. The stop()
method unbinds the service
, and stops it and the io
extension. (You would typically do this at the end of your example.)
For interested readers sentiment.R
Now, let’s complete the picture with some mood analysis in R. I am trying to find if people are happy or unhappy about the tweets. To do so, I use a list of positive and negative words, which I store in my Cassandra positivewords
and negativewords
tables.
Grab the code from https://github.com/eigengo/activator-akka-cassandra