В этом уроке я собираюсь использовать Spray Client, Akka IO и ядро Akka для создания приложения, которое транслирует твиты, а затем выполняет тривиальный анализ полученных данных. В нем показано, как создать простое приложение Akka с несколькими действующими лицами, как использовать Akka IO для выполнения HTTP-запросов, как подключиться к OAuth и как работать с потоковым вводом. Также демонстрируются подходы к тестированию таких приложений.
Это должно позволить нам отслеживать конкретные темы в Твиттере и анализировать полученные твиты, давая следующий результат:
Давайте начнем с демонстрации общей структуры кода, который мы создаем.
Компоненты в синем — это актеры, компоненты в оранжевом — это черты; и мы будем предоставлять некоторые интересные реализации этих черт.
Ядро
Я начну с построения ядра нашей системы. Он содержит двух участников, один из которых связан с HTTP-соединением с Twitter, а другой — с обработкой твитов по мере их поступления. В коде они являются TweetStreamerActor
и SentimentAnalysisActor
. TweetStreamerActor
Нужен URI для подключения и ссылки на актера , который собирается провести анализ настроений. Таким образом, мы приходим к
object TweetStreamerActor { val twitterUri = Uri("https://stream.twitter.com/1.1/statuses/filter.json") } class TweetStreamerActor(uri: Uri, processor: ActorRef) extends Actor { ... } class SentimentAnalysisActor extends Actor { ... }
Для нашего удобства я включил TweetStreamerActor
объект-компаньон, который содержит URI для потокового API Twitter. Чтобы построить наше приложение, все, что нам нужно сделать, это создать экземпляры актеров в правильной последовательности
object Main extends App { import Commands._ val system = ActorSystem() val sentiment = system.actorOf(Props(new SentimentAnalysisActor)) val stream = system.actorOf(Props( new TweetStreamerActor(TweetStreamerActor.twitterUri, sentiment))) ... }
Потоковые твиты
Далее, давайте напишем ядро TweetStreamerActor
конструктора HTTP-актора Akka IO, а затем создадим соответствующее, HttpRequest
которое мы отправляем заданному uri
.
class TweetStreamerActor(uri: Uri, processor: ActorRef) extends Actor { val io = IO(Http)(context.system) def receive: Receive = { case query: String => val body = HttpEntity( ContentType(MediaTypes.`application/x-www-form-urlencoded`), s"track=$query") val rq = HttpRequest(HttpMethods.POST, uri = uri, entity = body) sendTo(io).withResponsesReceivedBy(self)(rq) case MessageChunk(entity, _) => case _ => } }
Рассекая TweetStreamerActor
, мы сначала получаем ссылку ( ActorRef
не меньше) на HTTP-менеджер, вызывая IO(Http)(system.context)
. Этот актер отвечает за все HTTP-коммуникации; в нашем случае мы будем отправлять HttpRequest
s на него.
И это именно то, что мы делаем в receive
функции. Когда query: String
приходит сообщение, мы создаем HTTP-запрос, а затем вызываем sendTo(io).withResponsesReceivedBy(self)(rq)
, что, по-человечески говоря, означает взять субъект IO (sendTo (io)), заставить его отправить все полученные ответы этому субъекту (withResponsesReceivedBy (self)) и применить просьба к нему (rq) . И поэтому, спрей клиент собирается отправить ответ на запрос к этому актеру, который означает , что мы должны обрабатывать ChunkedResponseStart
, MessageChunk
и много других сообщений HTTP. Тем не менее, единственное сообщение уровня HTTP, которое нас действительно интересует, это тот MessageChunk
, который entity
содержит JSON, представляющий каждый твит.
Будучи строго типизированными, мы хотели бы иметь дело с нашими собственными типами, а не с JSON String
. Итак, мы должны реализовать unmarshaller, который может превратить сущность в экземпляр Tweet
. Для этого мы будем использовать модуль Spray JSON и определим экземпляр класса Unmarshaller
типов для этого типа Tweet
.
trait TweetMarshaller { implicit object TweetUnmarshaller extends Unmarshaller[Tweet] { def apply(entity: HttpEntity): Deserialized[Tweet] = { ??? } } }
Я не буду показывать всю скучную гимнастику JSON; вместо этого я просто обрисую основные моменты. Наш экземпляр класса типов должен реализовывать apply(HttpEntity): Deserialized[Tweet]
метод; Deserialized
это псевдоним типа для Either[DeserializationError, T]
.
trait TweetMarshaller { implicit object TweetUnmarshaller extends Unmarshaller[Tweet] { def mkUser(user: JsObject): Deserialized[User] = ... def mkPlace(place: JsValue): Deserialized[Option[Place]] = ... def apply(entity: HttpEntity): Deserialized[Tweet] = { Try { val json = JsonParser(entity.asString).asJsObject (json.fields.get("id_str"), json.fields.get("text"), json.fields.get("place"), json.fields.get("user")) match { case (Some(JsString(id)), Some(JsString(text)), Some(place), Some(user: JsObject)) => val x = mkUser(user).fold(x => Left(x), { user => mkPlace(place).fold(x => Left(x), { place => Right(Tweet(id, user, text, place)) }) }) x case _ => Left(MalformedContent("bad tweet")) } } }.getOrElse(Left(MalformedContent("bad json"))) } }
Складывая экземпляры десериализованного [_], мы получаем код, который может (безопасно) превратить JSON, который может представлять твит, в экземпляр твита. Теперь давайте добавим TweetMarshaller в ourTweetStreamerActor и используем его при работе с сообщением MessageChunk.
class TweetStreamerActor(uri: Uri, processor: ActorRef) extends Actor with TweetMarshaller { def receive: Receive = { ... case MessageChunk(entity, _) => TweetUnmarshaller(entity).fold(_ => (), processor !) case _ => } }
Обратите внимание, что я смешал в TweetMarshaller с актером ключевое слово with. Это дает мне доступ к экземпляру класса типов TweetUnmarshaller, и я вызываю его метод apply для полученного объекта. Затем он дает мне десериализованный [Tweet], складывает результат в (), «игнорируя» значения слева и отправляя значения справа процессору. Это краткий способ написания обычного сопоставления с образцом
class TweetStreamerActor(uri: Uri, processor: ActorRef) extends Actor with TweetMarshaller { def receive: Receive = { ... case MessageChunk(entity, _) => TweetUnmarshaller(entity) match { case Right(tweet) => processor ! tweet case _ => } } }
Onwards. Давайте посмотрим, работает ли наш код, как ожидается, написав тест; тест, который создает HTTP-сервер, который обслуживает твит-ответы, такие как настоящий API Twitter, и с помощью TweetStreamerActor проверить, что он может обрабатывать полученные ответы. Тест полностью умещается всего в нескольких строках кода.
class TweetStreamerActorSpec extends TestKit(ActorSystem()) with SpecificationLike with ImplicitSender { sequential val port = 12345 val tweetStream = TestActorRef( new TweetStreamerActor(Uri(s"http://localhost:$port/"), testActor)) "Streaming tweets" >> { "Should unmarshal one tweet" in { val twitterApi = TwitterApi(port) tweetStream ! "quux" // our TwitterApi does not care val tweet = expectMsgType[Tweet] tweet.text mustEqual "Aggressive Ponytail #freebandnames" tweet.user.lang mustEqual "en" tweet.user.id mustEqual "137238150" tweet.place mustEqual None twitterApi.stop() success } } }
Обратите внимание на то, как я максимально использую TestKit от Akka и использую testActorRef в качестве параметра процессора TweetStreamerActor. Это позволяет мне изучить полученные ответы, а именно lineexpectMsgType [Tweet]. Прежде чем мы перейдем к настоящему API-интерфейсу Twitter, я покажу реализацию theTwitterApi нашего HTTP-сервера, предназначенного только для тестирования, который имитирует API-интерфейс Twitter.
class TwitterApi private(system: ActorSystem, port: Int, body: String) { private class Service extends Actor { def receive: Receive = { case _: Http.Connected => sender ! Http.Register(self) case HttpRequest(HttpMethods.POST, _, _, _, _) => sender ! ChunkedResponseStart(HttpResponse(StatusCodes.OK)) sender ! MessageChunk(body = body) sender ! ChunkedMessageEnd() } } val service = system.actorOf(Props(new Service)) val io = IO(Http)(system) io ! Http.Bind(service, "localhost", port = port) def stop(): Unit = { io ! Http.Unbind system.stop(service) system.stop(io) } } object TwitterApi { def apply(port: Int)(implicit system: ActorSystem): TwitterApi = { val body = Source.fromInputStream( getClass.getResourceAsStream("/tweet.json")).mkString new TwitterApi(system, port, body) } }
Он также использует расширение Http от Akka IO, но на этот раз для привязки Service
к localhost
интерфейсу по заданному port
. Всякий раз, когда клиент подключается (отправляя Http.Connected
сообщение), я регистрирую того же актера для обработки запросов от этого клиента, отвечая с Http.Register(self)
. (Это означает, что наш Service
обработчик одноэлементный, все клиенты обрабатываются одним и тем же субъектом.) Поскольку я зарегистрировался self
как обработчик для всех клиентских подключений, я должен реагировать на HttpRequest
сообщения, которые отправляют клиенты. В моем случае я отвечаю на каждый HTTP POST, разбивая содержимое /tweet.json
ресурса на части.
OAuth
Прежде чем я смогу перейти к коду больших данных , я должен разобраться с авторизацией, которую требует Twitter Twitter требует OAuth-авторизации всех запросов. Наиболее просто, это означает добавление Authorization
заголовка HTTP с правильно сконструированным значением. Иными словами, авторизовать a HttpRequest
— это взять исходный запрос и вернуть новый HttpRequest
, содержащий соответствующий заголовок. Чтобы сделать это в контексте OAuth, мне также нужно знать токен и секретный ключ потребителя, а также ключ и секретный токен. И этого достаточно, чтобы я мог дать набросок в Scala
object OAuth { case class Consumer(key: String, secret: String) case class Token(value: String, secret: String) def oAuthAuthorizer(consumer: Consumer, token: Token): HttpRequest => HttpRequest = { // magic ??? } }
Форма кода выше соответствует тому, что я сказал выше для OAuth-авторизации HttpRequest, мне нужно знать детали потребителя и токен ; и процесс авторизации принимает несанкционированный доступ HttpRequest
и добавляет к нему требуемое разрешение.
Теперь я могу добавить авторизацию OAuth к TweetStreamerActor
. Однако вместо того, чтобы просто «жестко кодировать» его, я определю несколько черт, которые позволят мне контролировать экземпляры TweetStreamerActor
s. Я определяю
trait TwitterAuthorization { def authorize: HttpRequest => HttpRequest }
А затем требуют, чтобы экземпляры экземпляра TweetStreamerActor
были созданы с соответствующей реализацией TwitterAuthorization
. Другими словами, шаблон Cake с аннотациями собственного типа.
class TweetStreamerActor(uri: Uri, processor: ActorRef) extends Actor with TweetMarshaller { this: TwitterAuthorization => ... }
Я также предоставляю реализацию, TwitterAuthorization
которая использует механизм OAuth и загружает данные о потребителе и токене из файла. (Чтобы вы не включили данные авторизации в свой код.)
trait OAuthTwitterAuthorization extends TwitterAuthorization { import OAuth._ val home = System.getProperty("user.home") val lines = Source.fromFile(s"$home/.twitter/activator").getLines().toList val consumer = Consumer(lines(0), lines(1)) val token = Token(lines(2), lines(3)) val authorize = oAuthAuthorizer(consumer, token) }
В частности, обратите внимание, что у меня def authorize: HttpRequest => HttpRequest
есть поле authorize
, значение которого вычисляется путем применения oAuthAuthorizer
к потребителю и токену. Наконец, я должен применить авторизацию к тем, которые HttpRequest
я отправляю в TweetStreamerActor
. Нет ничего проще. Я просто добавляю ~> authorize
к HttpRequest
я создаю при обработке query: String
сообщения.
class TweetStreamerActor(uri: Uri, processor: ActorRef) extends Actor with TweetMarshaller { this: TwitterAuthorization => ... def receive: Receive = { case query: String => val body = HttpEntity( ContentType(MediaTypes.`application/x-www-form-urlencoded`), s"track=$query") val rq = HttpRequest(HttpMethods.POST, uri = uri, entity = body) ~> authorize sendTo(io).withResponsesReceivedBy(self)(rq) ... } }
Для завершения мне нужно изменить тест и приложение, чтобы они соответствовали аннотации самостоятельного типа. Тест на самом деле не требует авторизации
class TweetStreamerActorSpec extends TestKit(ActorSystem()) with SpecificationLike with ImplicitSender { sequential val port = 12345 val tweetStream = TestActorRef( new TweetStreamerActor(Uri(s"http://localhost:$port/"), testActor) with TwitterAuthorization { def authorize = identity }) ... }
Я должен реализовать authorize
член, который возвращает HttpRequest => HttpRequest
функцию. Но для тестов возвращаемая функция возвращает заданное значение; следовательно, это identity
функция.
В приложении я смешиваю OAuthTwitterAuthorization
черту при построении TweetStreamerActor
.
object Main extends App { import Commands._ val system = ActorSystem() val sentiment = system.actorOf(Props(new SentimentAnalysisActor)) val stream = system.actorOf(Props( new TweetStreamerActor(TweetStreamerActor.twitterUri, sentiment) with OAuthTwitterAuthorization)) ... }
Теперь у меня есть код, который успешно транслирует твиты из потокового API Twitter, у меня есть авторизация OAuth; последний компонент, который мне нужен, это анализ настроений.
Анализ настроений
Анализ настроений получает Tweet
экземпляры и должен анализировать твиты. Для проведения анализа мне понадобятся наборы положительных и отрицательных слов; и способ отображения вывода. Я также хотел бы иметь некоторую гибкость в создании анализатора настроений. И вот, я прихожу к
trait SentimentSets { def positiveWords: Set[String] def negativeWords: Set[String] } trait SentimentOutput { type Category = String def outputCount(values: List[Iterable[(Category, Int)]]): Unit }
И требуют, SentimentAnalysisActor
чтобы был создан экземпляр с соответствующими реализациями SentimentSets
и SentimentOutput
.
class SentimentAnalysisActor extends Actor { this: SentimentSets with SentimentOutput => ... }
Когда SentimentAnalysisActor
получает a Tweet
, он узнает, находится ли его текст в positiveWords
или negativeWords
, увеличивая количество положительных и отрицательных твитов, соответственно. Он также отслеживает количество твитов в зависимости от места и языка твита. Без лишних слов, все SentimentAnalysisActor
просто
class SentimentAnalysisActor extends Actor { this: SentimentSets with SentimentOutput => import collection._ private val counts = mutable.Map[Category, Int]() private val languages = mutable.Map[Category, Int]() private val places = mutable.Map[Category, Int]() private def update(data: mutable.Map[Category, Int]) (category: Category, delta: Int): Unit = data.put(category, data.getOrElse(category, 0) + delta) val updateCounts = update(counts)_ val updateLanguages = update(languages)_ val updatePlaces = update(places)_ def receive: Receive = { case tweet: Tweet => val text = tweet.text.toLowerCase val positive = if (positiveWords.exists(text contains)) 1 else 0 val negative = if (negativeWords.exists(text contains)) 1 else 0 updateCounts("positive", positive) updateCounts("negative", negative) if (tweet.user.followersCount > 200) { updateCounts("positive.gurus", positive) updateCounts("negative.gurus", negative) } updateCounts("all", 1) updateLanguages(tweet.user.lang, 1) updatePlaces(tweet.place.toString, 1) outputCount(List(counts, places, languages)) } }
Существует одна реализации из , который загружает наборы настроения из CSV — файлов на пути к классам; Существует также одна реализация , которая отображает вывод на терминал ANSI.SentimentSets
CSVLoadedSentimentSets
SentimentOutput
«Правильный» экземпляр SentimentAnalysisActor
в приложении становится
object Main extends App { import Commands._ val system = ActorSystem() val sentiment = system.actorOf(Props( new SentimentAnalysisActor with CSVLoadedSentimentSets with AnsiConsoleSentimentOutput)) val stream = system.actorOf(Props( new TweetStreamerActor(TweetStreamerActor.twitterUri, sentiment) with OAuthTwitterAuthorization)) ... }
Приложение
Теперь, когда у нас есть весь необходимый код, мы можем создать приложение, которое вы можете запустить. Он запускает простой командный цикл, который читает стандартный ввод, интерпретирует введенные вами команды и отправляет сообщения в TweetStreamerActor
.
object Main extends App { import Commands._ val system = ActorSystem() val sentiment = system.actorOf(Props( new SentimentAnalysisActor with CSVLoadedSentimentSets with AnsiConsoleSentimentOutput)) val stream = system.actorOf(Props( new TweetStreamerActor(TweetStreamerActor.twitterUri, sentiment) with OAuthTwitterAuthorization)) @tailrec private def commandLoop(): Unit = { Console.readLine() match { case QuitCommand => return case TrackCommand(query) => stream ! query case _ => println("WTF??!!") } commandLoop() } commandLoop() system.shutdown() } object Commands { val QuitCommand = "quit" val TrackCommand = "track (.*)".r }
Перед запуском приложения не забудьте создать ~/.twitter/activator
файл, содержащий четыре строки; эти строки представляют ваш потребительский ключ и секрет в твиттере, за которыми следуют значение токена и секрет токена. Чтобы сгенерировать эти значения, перейдите по адресу https:// dev.twitter.com / apps /, создайте приложение и добавьте соответствующие строки в этот файл. Примером ~ / .twitter / activator является
*************TqOdlxA ****************************Fv9b1ELexCRhI ********-*************************GUjmnWQvZ5GwnBR2 ***********************************ybgUNqrZwD
Естественно, вам нужно будет заменить *
s на значения в вашем потребительском токене и секрете; и значение токена и секрет.
Добавив файл выше, вы можете увидеть приложение «в действии», запустив его sbt run
в терминале ANSI. После запуска, введите track christmas
, track daley
или что — нибудь еще , что щекочет ваши фантазии и торжествует в коллективной мудрости человечества.
возбудитель
Код представляет собой активатор Typesafe, перейдите по ссылке http://typesafe.com/activator, чтобы загрузить его, или по адресу: http://github.com/eigengo/activator-spray-twitter, если вы хотите создать приложение самостоятельно, используя Sbt.