Статьи

Потоковое API Twitter в Akka и Spray

В этом уроке я собираюсь использовать Spray Client, Akka IO и ядро ​​Akka для создания приложения, которое транслирует твиты, а затем выполняет тривиальный анализ полученных данных. В нем показано, как создать простое приложение Akka с несколькими действующими лицами, как использовать Akka IO для выполнения HTTP-запросов, как подключиться к OAuth и как работать с потоковым вводом. Также демонстрируются подходы к тестированию таких приложений.

Это должно позволить нам отслеживать конкретные темы в Твиттере и анализировать полученные твиты, давая следующий результат:

настроение

Давайте начнем с демонстрации общей структуры кода, который мы создаем.

в общем и целом

Компоненты в синем — это актеры, компоненты в оранжевом — это черты; и мы будем предоставлять некоторые интересные реализации этих черт.

Ядро

Я начну с построения ядра нашей системы. Он содержит двух участников, один из которых связан с HTTP-соединением с Twitter, а другой — с обработкой твитов по мере их поступления. В коде они являются  TweetStreamerActorи  SentimentAnalysisActorTweetStreamerActor Нужен URI для подключения и ссылки на актера , который собирается провести анализ настроений. Таким образом, мы приходим к

object TweetStreamerActor {
val twitterUri = Uri("https://stream.twitter.com/1.1/statuses/filter.json")
}




class TweetStreamerActor(uri: Uri, processor: ActorRef) extends Actor {
...
}




class SentimentAnalysisActor extends Actor {
...
}

Для нашего удобства я включил  TweetStreamerActor объект-компаньон, который содержит URI для потокового API Twitter. Чтобы построить наше приложение, все, что нам нужно сделать, это создать экземпляры актеров в правильной последовательности

object Main extends App {
import Commands._




val system = ActorSystem()
val sentiment = system.actorOf(Props(new SentimentAnalysisActor))
val stream = system.actorOf(Props(
new TweetStreamerActor(TweetStreamerActor.twitterUri, sentiment)))
...
}

Потоковые твиты

Далее, давайте напишем ядро  TweetStreamerActor конструктора HTTP-актора Akka IO, а затем создадим соответствующее,  HttpRequest которое мы отправляем заданному  uri.

class TweetStreamerActor(uri: Uri, processor: ActorRef) extends Actor {
val io = IO(Http)(context.system)




def receive: Receive = {
case query: String =>
val body = HttpEntity(
ContentType(MediaTypes.`application/x-www-form-urlencoded`), 
s"track=$query")
val rq = HttpRequest(HttpMethods.POST, uri = uri, entity = body)
sendTo(io).withResponsesReceivedBy(self)(rq)
case MessageChunk(entity, _) =>
case _ =>
}
}

Рассекая  TweetStreamerActor, мы сначала получаем ссылку (  ActorRef не меньше) на HTTP-менеджер, вызывая  IO(Http)(system.context). Этот актер отвечает за все HTTP-коммуникации; в нашем случае мы будем отправлять  HttpRequests на него.

И это именно то, что мы делаем в  receive функции. Когда  query: String приходит сообщение, мы создаем HTTP-запрос, а затем вызываем  sendTo(io).withResponsesReceivedBy(self)(rq), что, по-человечески говоря, означает  взять субъект IO (sendTo (io)), заставить его отправить все полученные ответы этому субъекту (withResponsesReceivedBy (self)) и применить просьба к нему (rq) . И поэтому, спрей клиент собирается отправить ответ на запрос к этому актеру, который означает , что мы должны обрабатывать  ChunkedResponseStart, MessageChunkи много других сообщений HTTP. Тем не менее, единственное сообщение уровня HTTP, которое нас действительно интересует, это тот  MessageChunk, который  entity содержит JSON, представляющий каждый твит.

Будучи строго типизированными, мы хотели бы иметь дело с нашими собственными типами, а не с JSON  String. Итак, мы должны реализовать unmarshaller, который может превратить сущность в экземпляр  Tweet. Для этого мы будем использовать модуль Spray JSON и определим экземпляр класса  Unmarshaller типов для этого типа  Tweet.

trait TweetMarshaller {




implicit object TweetUnmarshaller extends Unmarshaller[Tweet] {
def apply(entity: HttpEntity): Deserialized[Tweet] = {
???
}
}
}

Я не буду показывать всю скучную гимнастику JSON; вместо этого я просто обрисую основные моменты. Наш экземпляр класса типов должен реализовывать  apply(HttpEntity): Deserialized[Tweet] метод; Deserializedэто псевдоним типа для  Either[DeserializationError, T].

trait TweetMarshaller {




implicit object TweetUnmarshaller extends Unmarshaller[Tweet] {




def mkUser(user: JsObject): Deserialized[User] = ...




def mkPlace(place: JsValue): Deserialized[Option[Place]] = ...




def apply(entity: HttpEntity): Deserialized[Tweet] = {
Try {
val json = JsonParser(entity.asString).asJsObject
(json.fields.get("id_str"), json.fields.get("text"), 
json.fields.get("place"), json.fields.get("user")) match {
case (Some(JsString(id)), Some(JsString(text)), 
Some(place), Some(user: JsObject)) =>
val x = mkUser(user).fold(x => Left(x), { user =>
mkPlace(place).fold(x => Left(x), { place =>
Right(Tweet(id, user, text, place))
})
})
x
case _ => Left(MalformedContent("bad tweet"))
}
}
}.getOrElse(Left(MalformedContent("bad json")))
}
}

Складывая экземпляры десериализованного [_], мы получаем код, который может (безопасно) превратить JSON, который может представлять твит, в экземпляр твита. Теперь давайте добавим TweetMarshaller в ourTweetStreamerActor и используем его при работе с сообщением MessageChunk.

class TweetStreamerActor(uri: Uri, processor: ActorRef) extends Actor 
with TweetMarshaller {




def receive: Receive = {
...
case MessageChunk(entity, _) => 
TweetUnmarshaller(entity).fold(_ => (), processor !)
case _ =>
}
}

Обратите внимание, что я смешал в TweetMarshaller с актером ключевое слово with. Это дает мне доступ к экземпляру класса типов TweetUnmarshaller, и я вызываю его метод apply для полученного объекта. Затем он дает мне десериализованный [Tweet], складывает результат в (), «игнорируя» значения слева и отправляя значения справа процессору. Это краткий способ написания обычного сопоставления с образцом

class TweetStreamerActor(uri: Uri, processor: ActorRef) extends Actor 
with TweetMarshaller {




def receive: Receive = {
...
case MessageChunk(entity, _) =>
TweetUnmarshaller(entity) match {
case Right(tweet) => processor ! tweet
case _ =>
}
}
}

Onwards. Давайте посмотрим, работает ли наш код, как ожидается, написав тест; тест, который создает HTTP-сервер, который обслуживает твит-ответы, такие как настоящий API Twitter, и с помощью TweetStreamerActor проверить, что он может обрабатывать полученные ответы. Тест полностью умещается всего в нескольких строках кода.

class TweetStreamerActorSpec extends TestKit(ActorSystem()) 
with SpecificationLike with ImplicitSender {
sequential




val port = 12345
val tweetStream = TestActorRef(
new TweetStreamerActor(Uri(s"http://localhost:$port/"), testActor))




"Streaming tweets" >> {




"Should unmarshal one tweet" in {
val twitterApi = TwitterApi(port)
tweetStream ! "quux" // our TwitterApi does not care




val tweet = expectMsgType[Tweet]
tweet.text mustEqual "Aggressive Ponytail #freebandnames"
tweet.user.lang mustEqual "en"
tweet.user.id mustEqual "137238150"
tweet.place mustEqual None
twitterApi.stop()
success
}
}




}

Обратите внимание на то, как я максимально использую TestKit от Akka и использую testActorRef в качестве параметра процессора TweetStreamerActor. Это позволяет мне изучить полученные ответы, а именно lineexpectMsgType [Tweet]. Прежде чем мы перейдем к настоящему API-интерфейсу Twitter, я покажу реализацию theTwitterApi нашего HTTP-сервера, предназначенного только для тестирования, который имитирует API-интерфейс Twitter.

class TwitterApi private(system: ActorSystem, port: Int, body: String) {




private class Service extends Actor {




def receive: Receive = {
case _: Http.Connected =>
sender ! Http.Register(self)
case HttpRequest(HttpMethods.POST, _, _, _, _) =>
sender ! ChunkedResponseStart(HttpResponse(StatusCodes.OK))
sender ! MessageChunk(body = body)
sender ! ChunkedMessageEnd()
}
}




val service = system.actorOf(Props(new Service))
val io = IO(Http)(system)
io ! Http.Bind(service, "localhost", port = port)




def stop(): Unit = {
io ! Http.Unbind
system.stop(service)
system.stop(io)
}
}




object TwitterApi {




def apply(port: Int)(implicit system: ActorSystem): TwitterApi = {
val body = Source.fromInputStream(
getClass.getResourceAsStream("/tweet.json")).mkString
new TwitterApi(system, port, body)
}




}

Он также использует расширение Http от Akka IO, но на этот раз для привязки  Service к  localhost интерфейсу по заданному  port. Всякий раз, когда клиент подключается (отправляя  Http.Connected сообщение), я регистрирую того же актера для обработки запросов от этого клиента, отвечая с  Http.Register(self). (Это означает, что наш  Service обработчик одноэлементный, все клиенты обрабатываются одним и тем же субъектом.) Поскольку я зарегистрировался  self как обработчик для всех клиентских подключений, я должен реагировать на  HttpRequest сообщения, которые отправляют клиенты. В моем случае я отвечаю на каждый HTTP POST, разбивая содержимое  /tweet.jsonресурса на части.

OAuth

Прежде чем я смогу перейти к   коду больших данных , я должен разобраться с авторизацией, которую требует Twitter Twitter требует OAuth-авторизации всех запросов. Наиболее просто, это означает добавление  Authorizationзаголовка HTTP с правильно сконструированным значением. Иными словами, авторизовать a  HttpRequest — это взять исходный запрос и вернуть новый HttpRequest , содержащий  соответствующий заголовок. Чтобы сделать это в контексте OAuth, мне также нужно знать токен и секретный ключ потребителя, а также ключ и секретный токен. И этого достаточно, чтобы я мог дать набросок в Scala

object OAuth {
  case class Consumer(key: String, secret: String)
  case class Token(value: String, secret: String)

  def oAuthAuthorizer(consumer: Consumer, token: Token): 
    HttpRequest => HttpRequest = {
    // magic
    ???
  }
}

Форма кода выше соответствует тому, что я сказал выше для OAuth-авторизации HttpRequest, мне нужно знать детали  потребителя  и  токен ; и процесс авторизации принимает несанкционированный доступ  HttpRequestи добавляет к нему требуемое разрешение.

Теперь я могу добавить авторизацию OAuth к  TweetStreamerActor. Однако вместо того, чтобы просто «жестко кодировать» его, я определю несколько черт, которые позволят мне контролировать экземпляры TweetStreamerActors. Я определяю

trait TwitterAuthorization {
  def authorize: HttpRequest => HttpRequest
}

А затем требуют, чтобы экземпляры экземпляра  TweetStreamerActor были созданы с соответствующей реализацией  TwitterAuthorization. Другими словами, шаблон Cake с аннотациями собственного типа.

class TweetStreamerActor(uri: Uri, processor: ActorRef) extends Actor 
  with TweetMarshaller {
  this: TwitterAuthorization =>

  ...
}

Я также предоставляю реализацию,  TwitterAuthorization которая использует механизм OAuth и загружает данные о потребителе и токене из файла. (Чтобы вы не включили данные авторизации в свой код.)

trait OAuthTwitterAuthorization extends TwitterAuthorization {
  import OAuth._
  val home = System.getProperty("user.home")
  val lines = Source.fromFile(s"$home/.twitter/activator").getLines().toList

  val consumer = Consumer(lines(0), lines(1))
  val token = Token(lines(2), lines(3))

  val authorize = oAuthAuthorizer(consumer, token)
}

В частности, обратите внимание, что у меня  def authorize: HttpRequest => HttpRequest есть поле authorize, значение которого вычисляется путем применения  oAuthAuthorizer к потребителю и токену. Наконец, я должен применить авторизацию к тем, которые  HttpRequestя отправляю в TweetStreamerActor. Нет ничего проще. Я просто добавляю  ~> authorize к  HttpRequest я создаю при обработке  query: String сообщения.

class TweetStreamerActor(uri: Uri, processor: ActorRef) extends Actor 
  with TweetMarshaller {
  this: TwitterAuthorization =>
  ...

  def receive: Receive = {
    case query: String =>
      val body = HttpEntity(
        ContentType(MediaTypes.`application/x-www-form-urlencoded`), 
        s"track=$query")
      val rq = HttpRequest(HttpMethods.POST, uri = uri, entity = body) 
               ~> authorize
      sendTo(io).withResponsesReceivedBy(self)(rq)
    ...
  }
}

Для завершения мне нужно изменить тест и приложение, чтобы они соответствовали аннотации самостоятельного типа. Тест на самом деле не требует авторизации

class TweetStreamerActorSpec extends TestKit(ActorSystem()) 
  with SpecificationLike with ImplicitSender {
  sequential

  val port = 12345
  val tweetStream = TestActorRef(
    new TweetStreamerActor(Uri(s"http://localhost:$port/"), testActor)
    with TwitterAuthorization {
      def authorize = identity
    })

  ...
}

Я должен реализовать  authorize член, который возвращает  HttpRequest => HttpRequest функцию. Но для тестов возвращаемая функция возвращает заданное значение; следовательно, это  identity функция.

В приложении я смешиваю  OAuthTwitterAuthorization черту при построении  TweetStreamerActor.

object Main extends App {
  import Commands._

  val system = ActorSystem()
  val sentiment = system.actorOf(Props(new SentimentAnalysisActor))
  val stream = system.actorOf(Props(
    new TweetStreamerActor(TweetStreamerActor.twitterUri, sentiment)
    with OAuthTwitterAuthorization))
  ...
}

Теперь у меня есть код, который успешно транслирует твиты из потокового API Twitter, у меня есть авторизация OAuth; последний компонент, который мне нужен, это анализ настроений.

Анализ настроений

Анализ настроений получает  Tweet экземпляры и должен анализировать твиты. Для проведения анализа мне понадобятся наборы положительных и отрицательных слов; и способ отображения вывода. Я также хотел бы иметь некоторую гибкость в создании анализатора настроений. И вот, я прихожу к

trait SentimentSets {
  def positiveWords: Set[String]
  def negativeWords: Set[String]
}

trait SentimentOutput {
  type Category = String

  def outputCount(values: List[Iterable[(Category, Int)]]): Unit
}

И требуют, SentimentAnalysisActor чтобы был создан  экземпляр с соответствующими реализациями  SentimentSets и  SentimentOutput.

class SentimentAnalysisActor extends Actor {
  this: SentimentSets with SentimentOutput =>

  ...
}

Когда  SentimentAnalysisActor получает a  Tweet, он узнает, находится ли его текст в  positiveWords или negativeWords, увеличивая количество  положительных  и  отрицательных  твитов, соответственно. Он также отслеживает количество твитов в зависимости от места и языка твита. Без лишних слов, все SentimentAnalysisActor просто

class SentimentAnalysisActor extends Actor {
  this: SentimentSets with SentimentOutput =>
  import collection._

  private val counts = mutable.Map[Category, Int]()
  private val languages = mutable.Map[Category, Int]()
  private val places = mutable.Map[Category, Int]()

  private def update(data: mutable.Map[Category, Int])
                    (category: Category, delta: Int): Unit =
    data.put(category, data.getOrElse(category, 0) + delta)

  val updateCounts = update(counts)_
  val updateLanguages = update(languages)_
  val updatePlaces = update(places)_

  def receive: Receive = {
    case tweet: Tweet =>
      val text = tweet.text.toLowerCase
      val positive = if (positiveWords.exists(text contains)) 1 else 0
      val negative = if (negativeWords.exists(text contains)) 1 else 0

      updateCounts("positive", positive)
      updateCounts("negative", negative)
      if (tweet.user.followersCount > 200) {
        updateCounts("positive.gurus", positive)
        updateCounts("negative.gurus", negative)
      }
      updateCounts("all", 1)
      updateLanguages(tweet.user.lang, 1)
      updatePlaces(tweet.place.toString, 1)

      outputCount(List(counts, places, languages))
  }
}

Существует одна реализации из  , который загружает наборы настроения из CSV — файлов на пути к классам; Существует также одна реализация , которая отображает вывод на терминал ANSI.SentimentSetsCSVLoadedSentimentSetsSentimentOutput

«Правильный» экземпляр  SentimentAnalysisActor в приложении становится

object Main extends App {
  import Commands._

  val system = ActorSystem()
  val sentiment = system.actorOf(Props(
    new SentimentAnalysisActor 
    with CSVLoadedSentimentSets with AnsiConsoleSentimentOutput))
  val stream = system.actorOf(Props(
    new TweetStreamerActor(TweetStreamerActor.twitterUri, sentiment) 
    with OAuthTwitterAuthorization))

  ...
}

Приложение

Теперь, когда у нас есть весь необходимый код, мы можем создать приложение, которое вы можете запустить. Он запускает простой командный цикл, который читает стандартный ввод, интерпретирует введенные вами команды и отправляет сообщения в  TweetStreamerActor.

object Main extends App {
  import Commands._

  val system = ActorSystem()
  val sentiment = system.actorOf(Props(
    new SentimentAnalysisActor 
    with CSVLoadedSentimentSets with AnsiConsoleSentimentOutput))
  val stream = system.actorOf(Props(
    new TweetStreamerActor(TweetStreamerActor.twitterUri, sentiment) 
    with OAuthTwitterAuthorization))

  @tailrec
  private def commandLoop(): Unit = {
    Console.readLine() match {
      case QuitCommand         => return
      case TrackCommand(query) => stream ! query
      case _                   => println("WTF??!!")
    }

    commandLoop()
  }

  commandLoop()
  system.shutdown()
}

object Commands {

  val QuitCommand   = "quit"
  val TrackCommand = "track (.*)".r

}

Перед запуском приложения не забудьте создать  ~/.twitter/activator файл, содержащий четыре строки; эти строки представляют ваш потребительский ключ и секрет в твиттере, за которыми следуют значение токена и секрет токена. Чтобы сгенерировать эти значения,  перейдите по адресу https:// dev.twitter.com / apps /, создайте приложение и добавьте соответствующие строки в этот файл. Примером ~ / .twitter / activator является

*************TqOdlxA
****************************Fv9b1ELexCRhI
********-*************************GUjmnWQvZ5GwnBR2
***********************************ybgUNqrZwD

Естественно, вам нужно будет заменить  *s на значения в вашем потребительском токене и секрете; и значение токена и секрет.

Добавив файл выше, вы можете увидеть приложение «в действии», запустив его  sbt run в терминале ANSI. После запуска, введите  track christmastrack daleyили что — нибудь еще , что щекочет ваши фантазии и торжествует в коллективной мудрости человечества.

возбудитель

Код представляет собой активатор Typesafe, перейдите по ссылке http://typesafe.com/activator, чтобы загрузить его, или по адресу: http://github.com/eigengo/activator-spray-twitter, если вы хотите создать приложение самостоятельно, используя Sbt.