Статьи

Акка клиент, C ++ сервер через RabbitMQ

В течение следующих нескольких недель я расскажу вам обо всех открытиях, которые я сделал во время шторма проекта, который заставил меня пропустить блог в течение трех недель. И сегодня мы начинаем с того, что некоторые клиенты Akka AMQP общаются с RabbitMQ и C ++ на другом конце.

Мотивация

У меня есть некоторый код обработки изображений в статической библиотеке, который мне нужно было использовать в моем приложении Akka. Сначала я схватил JNI. Это оказалось ошибкой, в основном из-за моего плохого кода на C ++, который мне нравился SIG_SEGV. Вы можете иметь столько актерского надзора, сколько захотите, когда JVM мертва, это берет на себя ваших актеров. Итак, с JNI и с AMQP. Это позволило мне использовать приложение C ++ так, как если бы оно было актером. Более того, данные, которые я отправлял в код C ++, были двоичными по своему характеру, поэтому AMQP снова отлично подходил.

Код акка

Код Акка тривиален. Вам просто нужно взять клиент AMQP (мой клон по адресу https://github.com/janm399/amqp-client , собрать его и использовать в своем проекте.

object Main extends Application {
  // boot up Akka
  val actorSystem = ActorSystem()
  // prepare the AMQP connection factory
  val connectionFactory = new ConnectionFactory()
  connectionFactory.setHost("localhost")
  // connect to the AMQP exchange
  val amqpExchange = ExchangeParameters(name = "amq.direct", 
                                        exchangeType = "", passive = true)

  // create a "connection owner" actor, which will try and 
  // reconnect automatically if the connection ins lost
  val connection = actorSystem.actorOf(
                     Props(new ConnectionOwner(connectionFactory)))
  // make a RPC client
  val client = ConnectionOwner.createChildActor(
                     connection, Props(new RpcClient()))

  // mechanics
  implicit val timeout = Timeout(1000, TimeUnit.MILLISECONDS)

  (client ? Request(Publish(...) :: Nil))) onComplete {
    case response => ...
  }
}

Все, что нам нужно заполнить, это сообщение, которое мы отправляем, и то, что мы делаем с ответом. Я оставлю второе на ваше воображение, но давайте пока сосредоточимся на сообщении.

Мы будем отправлять прямое сообщение — мы хотим установить двухточечную связь. (Когда вы просите ответ, ваш клиент получит свои собственные очереди, где сервер поместит ответ.) Мы будем использовать стандартный прямой обмен RabbitMQ, в amq.direct. В дополнение к обмену вам нужен ключ маршрутизации, чтобы RabbitMQ знал, в какую очередь ему нужно разместить запрос. Но какую ценность мы используем?

Давайте создадим очередь и дадим ее назвать cppdemo. Затем нам нужно привязать очередь к некоторому ключу обмена и маршрутизации. В нашем случае сообщение попадет в cppdemoочередь при отправке на amq.directбиржу с cppdemo.basicключом маршрутизации.

Вернемся к нашему коду Scala, затем

...
(client ? Request(Publish("amq.direct", "cppdemo.basic", ...) :: Nil)))
  onComplete {
    case response => ...
  }
...

Единственное, что остается, это байты, составляющие сообщение …

C ++

И вниз по кроличьей норе мы идем. Прежде чем мы перейдем к коду C ++ и к завершению кода Akka, нам нужно выполнить некоторые настройки.

механическая обработка

Мы будем использовать Boost и клиенты RabbitMQ C и C ++ по адресам https://github.com/alanxz/rabbitmq-c и https://github.com/alanxz/SimpleAmqpClient . Вам также понадобится cmake . Мы будем нуждаться в статические библиотеки для обоих RabbitMQ клиентов, поэтому мы строим rabbitmq-cпо

cmake . -DBUILD_STATIC_LIBS=true
cmake --build .
sudo cmake --build . --target install

И далее с C ++ SimpleAmqpClient

cmake . -DBUILD_SHARED_LIBS=false 
cmake --build .
sudo cmake --build . --target install

Основной код

Теперь мы готовы написать наш код на C ++. Мы установим соединение с сервером RabbitMQ и настроим сервер RCP. Мы привязываем наш сервер к той же очереди и прослушиваем сообщение, которое соответствует следующей структуре:

const int32_t message_signature = 0x1000acca;
// sorry, no k in hex!

typedef struct {
  int32_t signature;
  int32_t size1;
  int32_t size2;
  
} message_header_t;

В дополнение к сообщениям мы определяем некоторый тип ошибки, который возвращается из нашей функции обработки.

struct ProcessingError: virtual boost::exception { };
typedef 
  boost::error_info<struct errinfo_message_, std::string const> errinfo_message;

Теперь перейдем к нашему основному кодексу, который я оставлю без тщательного анализа. Это очень легко, и вы должны быть в состоянии следовать за ним.

std::string process(BasicMessage::ptr_t request) {
  const amqp_bytes_t& bytes = request->getAmqpBody();
  if (bytes.len < sizeof(message_header_t)) 
    throw ProcessingError() << errinfo_message("message too small");
  const message_header_t* header = 
    static_cast<message_header_t*>(bytes.bytes);
  if (header->signature != message_signature) 
    throw ProcessingError() << errinfo_message("bad signature");
  
  // we're good.
  size_t totalSize = sizeof(message_header_t) + 
                     header->size1 + header->size2;
  if (bytes.len != totalSize) 
    throw ProcessingError() << errinfo_message("bad message size");

  return "it worked!";
}

int main() {
  try {
    Channel::ptr_t channel = Channel::Create();
    
    channel->BindQueue("cppdemo", "amq.direct", "cppdemo.basic");
    
    std::string tag;
    tag = channel->BasicConsume("cppdemo", "", true, true, false, 2);
    
    while (true) {
      // consume the message
      Envelope::ptr_t env = channel->BasicConsumeMessage(tag);
      BasicMessage::ptr_t request = env->Message();
      try {
        std::string body = process(request);
        BasicMessage::ptr_t response = BasicMessage::Create();
        channel->BasicPublish("amq.direct", request->ReplyTo(), body);
      } catch (ProcessingError &e) {
        const std::string* msg = 
          boost::get_error_info<errinfo_message>(e);
        std::cerr << (*msg) << std::endl;
      }      
    }
  } catch (std::runtime_error &e) {
    std::cout << "Error " << e.what() << std::endl;
  }
  
}

Короче говоря, мы ждем сообщений; когда кто-то прибывает, мы проверяем, что его структура в порядке, а затем мы делаем некоторую обработку и отправляем обратно ответ. Гениально то, что мы можем запустить столько отдельных процессов этой программы, сколько захотим; если кто -то умирает (возможно , с моей faviourite SIG_SEGV, никакие сообщения не будут потеряны , и другие программы все еще в порядке.) Simples!

Вернуться к Скала

Вернувшись в удобный мир Scala, мы просто должны правильно составить сообщение. Помните: у нас есть подпись 0x1000acca, затем два размера и затем байты, которые складываются в оба размера. И так, мы сделаем именно это:

val os = new ByteArrayOutputStream()
os.write(0x1000face)
os.write(0x1)
os.write(0x1)

os.write(0xa)
os.write(0xb)
val request = Publish("amq.direct", "cppdemo.basic", os.toByteArray)
(client ? Request(request :: Nil)) onComplete {
  case Success(r: Response) => println("*** " + r.deliveries.head.body)
}

Как я уже говорил, я оставлю то, как вы обрабатываете ответ. Возможно, вы захотите извлечь тело из доставок, или вы можете сделать какую-то другую интересную обработку для вас!

Резюме

Итак, хорошая новость заключается в том, что необходимость использовать нативный код в приложениях на основе JVM не обязательно должна означать JNI; и Akka особенно хорошо подходит для инфраструктуры обмена сообщениями. Вставьте необходимый клиентский код AMQP, создайте код C ++, и все готово.