В течение следующих нескольких недель я расскажу вам обо всех открытиях, которые я сделал во время шторма проекта, который заставил меня пропустить блог в течение трех недель. И сегодня мы начинаем с того, что некоторые клиенты Akka AMQP общаются с RabbitMQ и C ++ на другом конце.
Мотивация
У меня есть некоторый код обработки изображений в статической библиотеке, который мне нужно было использовать в моем приложении Akka. Сначала я схватил JNI. Это оказалось ошибкой, в основном из-за моего плохого кода на C ++, который мне нравился SIG_SEGV
. Вы можете иметь столько актерского надзора, сколько захотите, когда JVM мертва, это берет на себя ваших актеров. Итак, с JNI и с AMQP. Это позволило мне использовать приложение C ++ так, как если бы оно было актером. Более того, данные, которые я отправлял в код C ++, были двоичными по своему характеру, поэтому AMQP снова отлично подходил.
Код акка
Код Акка тривиален. Вам просто нужно взять клиент AMQP (мой клон по адресу https://github.com/janm399/amqp-client , собрать его и использовать в своем проекте.
object Main extends Application { // boot up Akka val actorSystem = ActorSystem() // prepare the AMQP connection factory val connectionFactory = new ConnectionFactory() connectionFactory.setHost("localhost") // connect to the AMQP exchange val amqpExchange = ExchangeParameters(name = "amq.direct", exchangeType = "", passive = true) // create a "connection owner" actor, which will try and // reconnect automatically if the connection ins lost val connection = actorSystem.actorOf( Props(new ConnectionOwner(connectionFactory))) // make a RPC client val client = ConnectionOwner.createChildActor( connection, Props(new RpcClient())) // mechanics implicit val timeout = Timeout(1000, TimeUnit.MILLISECONDS) (client ? Request(Publish(...) :: Nil))) onComplete { case response => ... } }
Все, что нам нужно заполнить, это сообщение, которое мы отправляем, и то, что мы делаем с ответом. Я оставлю второе на ваше воображение, но давайте пока сосредоточимся на сообщении.
Мы будем отправлять прямое сообщение — мы хотим установить двухточечную связь. (Когда вы просите ответ, ваш клиент получит свои собственные очереди, где сервер поместит ответ.) Мы будем использовать стандартный прямой обмен RabbitMQ, в amq.direct
. В дополнение к обмену вам нужен ключ маршрутизации, чтобы RabbitMQ знал, в какую очередь ему нужно разместить запрос. Но какую ценность мы используем?
Давайте создадим очередь и дадим ее назвать cppdemo
. Затем нам нужно привязать очередь к некоторому ключу обмена и маршрутизации. В нашем случае сообщение попадет в cppdemo
очередь при отправке на amq.direct
биржу с cppdemo.basic
ключом маршрутизации.
Вернемся к нашему коду Scala, затем
... (client ? Request(Publish("amq.direct", "cppdemo.basic", ...) :: Nil))) onComplete { case response => ... } ...
Единственное, что остается, это байты, составляющие сообщение …
C ++
И вниз по кроличьей норе мы идем. Прежде чем мы перейдем к коду C ++ и к завершению кода Akka, нам нужно выполнить некоторые настройки.
механическая обработка
Мы будем использовать Boost и клиенты RabbitMQ C и C ++ по адресам https://github.com/alanxz/rabbitmq-c и https://github.com/alanxz/SimpleAmqpClient . Вам также понадобится cmake . Мы будем нуждаться в статические библиотеки для обоих RabbitMQ клиентов, поэтому мы строим rabbitmq-c
по
cmake . -DBUILD_STATIC_LIBS=true cmake --build . sudo cmake --build . --target install
И далее с C ++ SimpleAmqpClient
cmake . -DBUILD_SHARED_LIBS=false cmake --build . sudo cmake --build . --target install
Основной код
Теперь мы готовы написать наш код на C ++. Мы установим соединение с сервером RabbitMQ и настроим сервер RCP. Мы привязываем наш сервер к той же очереди и прослушиваем сообщение, которое соответствует следующей структуре:
const int32_t message_signature = 0x1000acca; // sorry, no k in hex! typedef struct { int32_t signature; int32_t size1; int32_t size2; } message_header_t;
В дополнение к сообщениям мы определяем некоторый тип ошибки, который возвращается из нашей функции обработки.
struct ProcessingError: virtual boost::exception { }; typedef boost::error_info<struct errinfo_message_, std::string const> errinfo_message;
Теперь перейдем к нашему основному кодексу, который я оставлю без тщательного анализа. Это очень легко, и вы должны быть в состоянии следовать за ним.
std::string process(BasicMessage::ptr_t request) { const amqp_bytes_t& bytes = request->getAmqpBody(); if (bytes.len < sizeof(message_header_t)) throw ProcessingError() << errinfo_message("message too small"); const message_header_t* header = static_cast<message_header_t*>(bytes.bytes); if (header->signature != message_signature) throw ProcessingError() << errinfo_message("bad signature"); // we're good. size_t totalSize = sizeof(message_header_t) + header->size1 + header->size2; if (bytes.len != totalSize) throw ProcessingError() << errinfo_message("bad message size"); return "it worked!"; } int main() { try { Channel::ptr_t channel = Channel::Create(); channel->BindQueue("cppdemo", "amq.direct", "cppdemo.basic"); std::string tag; tag = channel->BasicConsume("cppdemo", "", true, true, false, 2); while (true) { // consume the message Envelope::ptr_t env = channel->BasicConsumeMessage(tag); BasicMessage::ptr_t request = env->Message(); try { std::string body = process(request); BasicMessage::ptr_t response = BasicMessage::Create(); channel->BasicPublish("amq.direct", request->ReplyTo(), body); } catch (ProcessingError &e) { const std::string* msg = boost::get_error_info<errinfo_message>(e); std::cerr << (*msg) << std::endl; } } } catch (std::runtime_error &e) { std::cout << "Error " << e.what() << std::endl; } }
Короче говоря, мы ждем сообщений; когда кто-то прибывает, мы проверяем, что его структура в порядке, а затем мы делаем некоторую обработку и отправляем обратно ответ. Гениально то, что мы можем запустить столько отдельных процессов этой программы, сколько захотим; если кто -то умирает (возможно , с моей faviourite SIG_SEGV
, никакие сообщения не будут потеряны , и другие программы все еще в порядке.) Simples!
Вернуться к Скала
Вернувшись в удобный мир Scala, мы просто должны правильно составить сообщение. Помните: у нас есть подпись 0x1000acca
, затем два размера и затем байты, которые складываются в оба размера. И так, мы сделаем именно это:
val os = new ByteArrayOutputStream() os.write(0x1000face) os.write(0x1) os.write(0x1) os.write(0xa) os.write(0xb) val request = Publish("amq.direct", "cppdemo.basic", os.toByteArray) (client ? Request(request :: Nil)) onComplete { case Success(r: Response) => println("*** " + r.deliveries.head.body) }
Как я уже говорил, я оставлю то, как вы обрабатываете ответ. Возможно, вы захотите извлечь тело из доставок, или вы можете сделать какую-то другую интересную обработку для вас!
Резюме
Итак, хорошая новость заключается в том, что необходимость использовать нативный код в приложениях на основе JVM не обязательно должна означать JNI; и Akka особенно хорошо подходит для инфраструктуры обмена сообщениями. Вставьте необходимый клиентский код AMQP, создайте код C ++, и все готово.