Статьи

PubSub с актерами Redis и Akka


Redis (версия на внешней линии) предлагает обмен сообщениями на основе публикации / подписки.
Это довольно большая функция по сравнению с типичными сервисами, ориентированными на структуру данных, которые она предлагала до сих пор. Это также открывает множество возможностей для использования Redis в качестве механизма обмена сообщениями другого типа. Отправитель и получатель сообщений абсолютно отделены друг от друга в том смысле, что отправители не отправляют сообщения конкретным получателям. Издатели публикуют сообщения на определенных каналах. Подписчики, которые подписываются на эти каналы, получают их и могут предпринимать с ними определенные действия. Как отмечает Сальваторе в своих
еженедельных обновленияхв Redis эта особенность возникла из множества пользовательских запросов, которые запрашивали общий механизм уведомлений для отслеживания изменений в пространстве ключей. Redis уже предлагает BLPOP (операция всплывающего списка блокировки) для аналогичных случаев использования. Но все же этого недостаточно для удовлетворения потребностей общей схемы уведомлений. Сальваторе объясняет это более подробно в своем блоге.

Я работал над
клиентом Scala , который я разветвил из
хранилища
Алехандро Кросы . Я реализовал pubsub совсем недавно, а также интегрировал его с актерами Akka. Полная реализация клиента pubsub в Scala находится в моем github-хранилище. И если вам нравится играть с актерской реализацией Akka, взгляните на
хранилище Akka .

Вы определяете своих издателей и подписчиков как актеров и обмениваетесь сообщениями по каналам. Вы можете определить свои собственные обратные вызовы относительно того, что вы хотели бы сделать, когда вы получите конкретное сообщение. Давайте рассмотрим пример реализации на уровне клиента. Я предполагаю, что вы хотите реализовать свое собственное приложение pub / sub поверх подложки pubsub на основе актера Akka, которая использует службу redis.

Реализовать интерфейс издателя легко … вот как вы можете загрузить свой собственный издательский сервис.

object Pub {
println("starting publishing service ..")
val p = new Publisher(new RedisClient("localhost", 6379))
p.start

def publish(channel: String, message: String) = {
p ! Publish(channel, message)
}
}

Метод публикации просто отправляет сообщение публикации издателю. Publisher — это актер, определенный в Akka следующим образом:

class Publisher(client: RedisClient) extends Actor {
def receive = {
case Publish(channel, message) =>
client.publish(channel, message)
reply(true)
}
}

Реализация подписчика немного сложнее, так как вам нужно зарегистрировать свой обратный вызов относительно того, что вы хотели бы делать, когда вы получаете сообщение определенного типа. Это зависит от вашего варианта использования, и вы несете ответственность за обеспечение надлежащей функции обратного вызова.

Вот пример реализации для подписчика. Нам нужно два способа подписаться и отказаться от подписки на каналы. Помните, что в Redis подписчик не может публиковать — следовательно, наш Sub не может делать паб.

object Sub {
println("starting subscription service ..")
val s = new Subscriber(new RedisClient("localhost", 6379))
s.start
s ! Register(callback)

def sub(channels: String*) = {
s ! Subscribe(channels.toArray)
}

def unsub(channels: String*) = {
s ! Unsubscribe(channels.toArray)
}

def callback(pubsub: PubSubMessage) = pubsub match {
//..
}
}

Я еще не указал реализацию обратного вызова. Как это должно выглядеть?

Обратный вызов будет вызван, когда подписчик получит сообщение определенного типа. Согласно спецификации Redis, типы сообщений, которые может получать подписчик:

a. подписаться

б. отписаться

c. сообщение

Обратитесь к документации Redis для деталей этих форматов сообщения. В нашем случае мы моделируем их как case-классы как часть базовой реализации клиента Redis.

sealed trait PubSubMessage
case class S(channel: String, noSubscribed: Int) extends PubSubMessage
case class U(channel: String, noSubscribed: Int) extends PubSubMessage
case class M(origChannel: String, message: String) extends PubSubMessage

Наш обратный вызов должен предпринять соответствующие действия при получении любого из этих типов сообщений. Следующее может быть одной из таких реализаций. Оно настроено для конкретного приложения, которое обрабатывает различные форматы сообщений и дает соответствующую семантику, зависящую от приложения.

def callback(pubsub: PubSubMessage) = pubsub match {
case S(channel, no) => println("subscribed to " + channel + " and count = " + no)
case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no)
case M(channel, msg) =>
msg match {
// exit will unsubscribe from all channels and stop subscription service
case "exit" =>
println("unsubscribe all ..")
r.unsubscribe

// message "+x" will subscribe to channel x
case x if x startsWith "+" =>
val s: Seq[Char] = x
s match {
case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => }
}

// message "-x" will unsubscribe from channel x
case x if x startsWith "-" =>
val s: Seq[Char] = x
s match {
case Seq('-', rest @ _*) => r.unsubscribe(rest.toString)
}

// other message receive
case x =>
println("received message on channel " + channel + " as : " + x)
}
}

Обратите внимание, что в приведенной выше реализации мы специализируем некоторые сообщения на дополнительной семантике. например, если я получу сообщение как «+ t», я буду интерпретировать его как подписку на канал «t». Точно так же «выход» отпишется мне со всех каналов.

Как запустить это приложение?

Я предполагаю, что у вас есть мастер Akka с вами. Также вам нужно иметь версию Redis, которая реализует pubsub. Вы можете запустить службу подписки, используя описанную выше реализацию, а затем использовать любой другой клиент Redis для публикации сообщений. Вот пример рецепта для запуска.

Предварительное условие: Нужно запустить Redis Server (версия, которая поддерживает pubsub)

1. Download redis from http://github.com/antirez/redis
2. build using "make"
3. Run server as ./redis-server


Для запуска этого примера приложения: —

Запуск службы подписки

1. Open up another shell similarly as the above and set AKKA_HOME
2. cd $AKKA_HOME
3. sbt console
4. scala> import sample.pubsub._
5. scala> Pub.publish("a", "hello") // the first shell should get the message
6. scala> Pub.publish("c", "hi") // the first shell should NOT get this message

Еще один издательский клиент, использующий redis-cli

Open up a redis-client from where you installed redis and issue a publish command
./redis-cli publish a "hi there" ## the first shell should get the message

Веселитесь с форматами сообщений

1. Go back to the first shell
2. Sub.unsub("a") // should unsubscribe the first shell from channel "a"
3. Study the callback function defined below. It supports many other message formats.
4. In the second shell window do the following:

scala> Pub.publish("b", "+c") // will subscribe the first window to channel "c"
scala> Pub.publish("b", "+d") // will subscribe the first window to channel "d"
scala> Pub.publish("b", "-c") // will unsubscribe the first window from channel "c"
scala> Pub.publish("b", "exit") // will unsubscribe the first window from all channels

Полная реализация вышеперечисленного приведена в качестве
примера проекта в мастере Akka. И если вы не используете Akka, у меня также есть версия выше, реализованная с использованием акторов Scala в
дистрибутиве
scala-redis .

Повеселись!