Статьи

Подключитесь к RabbitMQ (AMQP), используя Scala, Play и Akka

В этой статье мы рассмотрим, как вы можете подключиться из Scala к RabbitMQ, чтобы вы могли поддерживать протокол AMQP из ваших приложений. В этом примере я буду использовать Play Framework 2.0 в качестве контейнера (дополнительную информацию об этом см. В другой моей статье на эту тему) для запуска приложения, поскольку Play значительно упрощает разработку с помощью Scala. В этой статье также будут использоваться актеры Akka для отправки и получения сообщений от RabbitMQ.

Что такое AMQP

Во-первых, краткое введение в AMQP. AMQP расшифровывается как «Advanced Message Queuing Protocol» и является открытым стандартом для обмена сообщениями. Домашняя страница AMQP заявляет о своем видении так: «Стать стандартным протоколом взаимодействия между всеми промежуточными программами обмена сообщениями». AMQP определяет протокол транспортного уровня для обмена сообщениями, который можно использовать для интеграции приложений с различными платформами, языками и технологиями.
Существует ряд инструментов, реализующих этот протокол, но RabbitMQ привлекает все больше и больше внимания. RabbitMQ — это брокер сообщений на основе erlang с открытым исходным кодом, использующий AMQP. Все приложения, которые могут говорить на AMQP, могут подключаться и использовать RabbitMQ. Итак, в этой статье мы покажем, как вы можете подключиться из вашего приложения на основе Play2 / Scala / Akka к RabbitMQ.
В этой статье мы покажем вам, как реализовать два наиболее распространенных сценария:

  • Отправка / получение: мы настроим одного отправителя на отправку сообщения каждые пару секунд и будем использовать два прослушивателя, которые будут читать сообщения в циклическом порядке из очереди.
  • Публикация / подписка: для этого примера мы создадим почти одинаковый сценарий, но на этот раз слушатели получат сообщение одновременно.

Я полагаю, у вас есть установка RabbitMQ. Если нет, следуйте инструкциям с их сайта .

Настройка базового проекта Play 2 / Scala

Для этого примера я создал новый проект Play 2. Делать это очень легко:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
jos@Joss-MacBook-Pro.local:~/Dev/play-2.0-RC2$ ./play new Play2AndRabbitMQ
       _            _
 _ __ | | __ _ _  _| |
| '_ \| |/ _' | || |_|
|  __/|_|\____|\__ (_)
|_|            |__/
  
play! 2.0-RC2, http://www.playframework.org
  
The new application will be created in /Users/jos/Dev/play-2.0/PlayAndRabbitMQ
  
What is the application name?
> PlayAndRabbitMQ
  
Which template do you want to use for this new application?
  
  1 - Create a simple Scala application
  2 - Create a simple Java application
  3 - Create an empty project
  
> 1
  
OK, application PlayAndRabbitMQ is created.
  
Have fun!

Я привык работать в Eclipse с подключением scala-ide, поэтому я выполняю play eclipsify и импортирую проект в Eclipse.
Следующий шаг, который нам нужно сделать, это установить правильные зависимости. Play использует для этого sbt и позволяет вам настроить ваши зависимости из файла build.scala в каталоге вашего проекта. Единственная зависимость, которую мы добавим, — это клиентская библиотека java от RabbitMQ. Несмотря на то, что Lift предоставляет библиотеку AMQP на основе scala, я нахожу, что использовать RabbitMQ напрямую так же просто. После добавления зависимости мой build.scala выглядит так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
import sbt._
import Keys._
import PlayProject._
  
object ApplicationBuild extends Build {
  
    val appName         = "PlayAndRabbitMQ"
    val appVersion      = "1.0-SNAPSHOT"
  
    val appDependencies = Seq(
      "com.rabbitmq" % "amqp-client" % "2.8.1"
    )
  
    val main = PlayProject(appName, appVersion, appDependencies, mainLang = SCALA).settings(
    )
}

Добавьте конфигурацию rabbitMQ в файл конфигурации

Для наших примеров мы можем настроить пару вещей. Очередь, куда отправлять сообщение, используемый обмен и хост, на котором работает RabbitMQ. В реальном сценарии у нас будет больше параметров конфигурации, но для этого случая у нас будет только три. Добавьте следующее в ваш application.conf, чтобы мы могли ссылаться на него из нашего приложения.

1
2
3
4
#rabbit-mq configuration
rabbitmq.host=localhost
rabbitmq.queue=queue1
rabbitmq.exchange=exchange1

Теперь мы можем получить доступ к этим файлам конфигурации с помощью ConfigFactory. Для легкого доступа создайте следующий объект:

1
2
3
4
5
object Config {
  val RABBITMQ_HOST = ConfigFactory.load().getString("rabbitmq.host");
  val RABBITMQ_QUEUE = ConfigFactory.load().getString("rabbitmq.queue");
  val RABBITMQ_EXCHANGEE = ConfigFactory.load().getString("rabbitmq.exchange");
}

Инициализируйте соединение с RabbitMQ

У нас есть еще один объект для определения, прежде чем мы рассмотрим, как мы можем использовать RabbitMQ для отправки и получения сообщений. для работы с RabbitMQ нам требуется соединение. Мы можем получить соединение с сервером, используя ConnectionFactory. Посмотрите на javadocs для получения дополнительной информации о том, как настроить соединение.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
object RabbitMQConnection {
  
  private val connection: Connection = null;
  
  /**
   * Return a connection if one doesn't exist. Else create
   * a new one
   */
  def getConnection(): Connection = {
    connection match {
      case null => {
        val factory = new ConnectionFactory();
        factory.setHost(Config.RABBITMQ_HOST);
        factory.newConnection();
      }
      case _ => connection
    }
  }
}

Запустите слушатели при запуске приложения

Нам нужно сделать еще одну вещь, прежде чем мы сможем взглянуть на код RabbitMQ. Мы должны убедиться, что наши слушатели сообщений зарегистрированы при запуске приложения и наши отправители начинают отправку. Игра 2 обеспечивает
GlobalSettings объект для этого, который вы можете расширить для выполнения кода при запуске приложения. В нашем примере мы будем использовать следующий объект (помните, это нужно сохранить в пространстве имен по умолчанию:

01
02
03
04
05
06
07
08
09
10
import play.api.mvc._
import play.api._
import rabbitmq.Sender
  
object Global extends GlobalSettings {
  
  override def onStart(app: Application) {
    Sender.startSending
  }
}

Мы рассмотрим эту операцию Sender.startSending, которая инициализирует всех отправителей и получателей в следующих разделах.

Настройка отправки и получения сценария

Давайте посмотрим на код Sender.startSending, который настроит отправителя, который отправляет сообщения в определенную очередь. Для этого мы используем следующий фрагмент кода:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
object Sender {
  
  def startSending = {
    // create the connection
    val connection = RabbitMQConnection.getConnection();
    // create the channel we use to send
    val sendingChannel = connection.createChannel();
    // make sure the queue exists we want to send to
    sendingChannel.queueDeclare(Config.RABBITMQ_QUEUE, false, false, false, null);
  
   Akka.system.scheduler.schedule(2 seconds, 1 seconds
          , Akka.system.actorOf(Props(
               new SendingActor(channel = sendingChannel,
                                          queue = Config.RABBITMQ_QUEUE)))
          , "MSG to Queue");
  }
}
  
class SendingActor(channel: Channel, queue: String) extends Actor {
  
  def receive = {
    case some: String => {
      val msg = (some + " : " + System.currentTimeMillis());
      channel.basicPublish("", queue, null, msg.getBytes());
      Logger.info(msg);
    }
    case _ => {}
  }
}

В этом коде мы предпринимаем следующие шаги:

  1. Используйте фабрику для получения соединения с RabbitMQ
  2. Создайте канал в этом соединении, чтобы использовать его для связи с RabbitMQ
  3. Используйте канал для создания очереди (если она еще не существует)
  4. Расписание Akka, чтобы отправить сообщение актеру каждую секунду.

Это все должно быть довольно просто. Единственной (несколько) сложной частью является часть планирования. Что эта операция по расписанию делает это. Мы говорим Акке запланировать отправку сообщения актеру. Нам нужна задержка в 2 секунды до его запуска, и мы хотим повторять эту работу каждую секунду. Актер, который должен использоваться для этого, является SendingActor, которого вы также можете увидеть в этом списке. Этому субъекту необходим доступ к каналу для отправки сообщения, и этому субъекту также необходимо знать, куда отправить сообщение, которое он получает. Это очередь.
Таким образом, каждую секунду этот актер будет получать сообщение, добавлять метку времени и использовать предоставленный канал для отправки этого сообщения в очередь: channel.basicPublish («», queue, null, msg.getBytes ()) ;. Теперь, когда мы отправляем сообщение каждую секунду, было бы неплохо, чтобы в этой очереди были прослушиватели, способные принимать сообщения. Для получения сообщений мы также создали Actor, который бесконечно слушает определенную очередь.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class ListeningActor(channel: Channel, queue: String, f: (String) => Any) extends Actor {
  
  // called on the initial run
  def receive = {
    case _ => startReceving
  }
  
  def startReceving = {
  
    val consumer = new QueueingConsumer(channel);
    channel.basicConsume(queue, true, consumer);
  
    while (true) {
      // wait for the message
      val delivery = consumer.nextDelivery();
      val msg = new String(delivery.getBody());
  
      // send the message to the provided callback function
      // and execute this in a subactor
      context.actorOf(Props(new Actor {
        def receive = {
          case some: String => f(some);
        }
      })) ! msg
    }
  }
}

Этот актер немного сложнее, чем тот, который мы использовали для отправки. Когда этот субъект получает сообщение (вид сообщения не имеет значения), он начинает прослушивать очередь, в которой он был создан. Это делается путем создания потребителя с использованием предоставленного канала и говорит потребителям начать прослушивание в указанной очереди. Метод consumer.nextDelivery () блокируется до тех пор, пока в ожидающей очереди не появится сообщение. После получения сообщения создается новый субъект, которому отправляется сообщение. Этот новый субъект передает сообщение в предоставленный метод, где вы можете поместить свою бизнес-логику.
Чтобы использовать этот слушатель, нам нужно предоставить следующие аргументы:

  • Канал: разрешает доступ к RabbitMQ
  • Очередь: очередь для прослушивания сообщений
  • f: функция, которую мы выполним при получении сообщения.

Последний шаг для этого первого примера — склеивание всего вместе. Мы делаем это путем добавления нескольких вызовов методов в метод Sender.startSending.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
  def startSending = {
   ...
    val callback1 = (x: String) => Logger.info("Recieved on queue callback 1: " + x);
  
    setupListener(connection.createChannel(),Config.RABBITMQ_QUEUE, callback1);
  
    // create an actor that starts listening on the specified queue and passes the
    // received message to the provided callback
    val callback2 = (x: String) => Logger.info("Recieved on queue callback 2: " + x);
  
    // setup the listener that sends to a specific queue using the SendingActor
    setupListener(connection.createChannel(),Config.RABBITMQ_QUEUE, callback2);
   ...
  }
  
  private def setupListener(receivingChannel: Channel, queue: String, f: (String) => Any) {
    Akka.system.scheduler.scheduleOnce(2 seconds,
        Akka.system.actorOf(Props(new ListeningActor(receivingChannel, queue, f))), "");
  }

В этом коде вы можете видеть, что мы определяем функцию обратного вызова и используем эту функцию обратного вызова вместе с очередью и каналом для создания ListeningActor. Мы используем метод scheduleOnce для запуска этого слушателя в отдельном потоке. Теперь, имея этот код, мы можем запустить приложение (play run), открыть localhost: 9000, чтобы запустить приложение, и мы должны увидеть что-то вроде следующего:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
[info] play - Starting application default Akka system.
[info] play - Application started (Dev)
[info] application - MSG to Exchange : 1334324531424
[info] application - MSG to Queue : 1334324531424
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324531424
[info] application - MSG to Exchange : 1334324532522
[info] application - MSG to Queue : 1334324532522
[info] application - Recieved on queue callback 1: MSG to Queue : 1334324532522
[info] application - MSG to Exchange : 1334324533622
[info] application - MSG to Queue : 1334324533622
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324533622
[info] application - MSG to Exchange : 1334324534722
[info] application - MSG to Queue : 1334324534722
[info] application - Recieved on queue callback 1: MSG to Queue : 1334324534722
[info] application - MSG to Exchange : 1334324535822
[info] application - MSG to Queue : 1334324535822
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324535822

Здесь вы можете четко видеть, как обрабатываются сообщения в циклическом порядке.

Настройка сценария публикации и подписки

Как только мы запустим приведенный выше код, добавление функции публикации / подписки станет очень тривиальным. Вместо SendingActor теперь мы используем PublishingActor:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
class PublishingActor(channel: Channel, exchange: String) extends Actor {
  
  /**
   * When we receive a message we sent it using the configured channel
   */
  def receive = {
    case some: String => {
      val msg = (some + " : " + System.currentTimeMillis());
      channel.basicPublish(exchange, "", null, msg.getBytes());
      Logger.info(msg);
    }
    case _ => {}
  }
}

RabbitMQ использует обмен, чтобы позволить нескольким получателям получать одно и то же сообщение (и целый ряд других расширенных функций). Единственное изменение в коде от другого участника — это то, что на этот раз мы отправляем сообщение в обмен, а не в очередь. Код слушателя точно такой же, единственное, что нам нужно сделать, это подключить очередь к определенному обмену. Так что слушатели в этой очереди получают сообщения, отправленные на биржу. Мы делаем это еще раз из метода установки, который мы использовали ранее.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    ...
    // create a new sending channel on which we declare the exchange
    val sendingChannel2 = connection.createChannel();
    sendingChannel2.exchangeDeclare(Config.RABBITMQ_EXCHANGEE, "fanout");
  
    // define the two callbacks for our listeners
    val callback3 = (x: String) => Logger.info("Recieved on exchange callback 3: " + x);
    val callback4 = (x: String) => Logger.info("Recieved on exchange callback 4: " + x);
  
    // create a channel for the listener and setup the first listener
    val listenChannel1 = connection.createChannel();
    setupListener(listenChannel1,listenChannel1.queueDeclare().getQueue(),
                   Config.RABBITMQ_EXCHANGEE, callback3);
  
    // create another channel for a listener and setup the second listener
    val listenChannel2 = connection.createChannel();
    setupListener(listenChannel2,listenChannel2.queueDeclare().getQueue(),
                   Config.RABBITMQ_EXCHANGEE, callback4);
  
    // create an actor that is invoked every two seconds after a delay of
    // two seconds with the message "msg"
    Akka.system.scheduler.schedule(2 seconds, 1 seconds, Akka.system.actorOf(Props(
               new PublishingActor(channel = sendingChannel2
                    , exchange = Config.RABBITMQ_EXCHANGEE))),
         "MSG to Exchange");
    ...

Мы также создали перегруженный метод для setupListener, который в качестве дополнительного параметра также принимает имя используемого обмена.

1
2
3
4
5
6
  private def setupListener(channel: Channel, queueName : String, exchange: String, f: (String) => Any) {
    channel.queueBind(queueName, exchange, "");
  
    Akka.system.scheduler.scheduleOnce(2 seconds,
        Akka.system.actorOf(Props(new ListeningActor(channel, queueName, f))), "");
  }

В этом небольшом фрагменте кода вы можете видеть, что мы связываем предоставленную очередь (которая является случайным именем в нашем примере) с указанным обменом. После этого мы создаем нового слушателя, как мы видели раньше.
Запуск этого кода сейчас приведет к следующему выводу:

1
2
3
4
5
6
7
8
9
[info] play - Application started (Dev)
[info] application - MSG to Exchange : 1334325448907
[info] application - MSG to Queue : 1334325448907
[info] application - Recieved on exchange callback 3: MSG to Exchange : 1334325448907
[info] application - Recieved on exchange callback 4: MSG to Exchange : 1334325448907
[info] application - MSG to Exchange : 1334325450006
[info] application - MSG to Queue : 1334325450006
[info] application - Recieved on exchange callback 4: MSG to Exchange : 1334325450006
[info] application - Recieved on exchange callback 3: MSG to Exchange : 1334325450006

Как видите, в этом сценарии оба слушателя получают одно и то же сообщение. Это в значительной степени завершает эту статью. Как вы уже видели, использование клиентского API на основе Java для RabbitMQ более чем достаточно и его легко использовать из Scala. Обратите внимание, что этот пример не готов к производству, вы должны позаботиться о том, чтобы закрыть соединения, красиво отключить слушателей и актеров. Весь этот код выключения здесь не показан.

Ссылка: подключитесь к RabbitMQ (AMQP) с помощью Scala, Play и Akka от нашего партнера JCG Йоса Дирксена в блоге Smart Java .