В течение следующих нескольких недель я расскажу вам обо всех открытиях, которые я сделал во время шторма проекта, который заставил меня пропустить блог в течение трех недель. И сегодня мы начинаем с того, что некоторые клиенты Akka AMQP общаются с RabbitMQ и C ++ на другом конце.
Мотивация
У меня есть некоторый код обработки изображений в статической библиотеке, который мне нужно было использовать в моем приложении Akka. Сначала я схватил JNI. Это оказалось ошибкой, в основном из-за моего плохого кода на C ++, который мне нравился SIG_SEGV. Вы можете иметь столько актерского надзора, сколько захотите, когда JVM мертва, это берет на себя ваших актеров. Итак, с JNI и с AMQP. Это позволило мне использовать приложение C ++ так, как если бы оно было актером. Более того, данные, которые я отправлял в код C ++, были двоичными по своему характеру, поэтому AMQP снова отлично подходил.
Код акка
Код Акка тривиален. Вам просто нужно взять клиент AMQP (мой клон по адресу https://github.com/janm399/amqp-client , собрать его и использовать в своем проекте.
object Main extends Application {
// boot up Akka
val actorSystem = ActorSystem()
// prepare the AMQP connection factory
val connectionFactory = new ConnectionFactory()
connectionFactory.setHost("localhost")
// connect to the AMQP exchange
val amqpExchange = ExchangeParameters(name = "amq.direct",
exchangeType = "", passive = true)
// create a "connection owner" actor, which will try and
// reconnect automatically if the connection ins lost
val connection = actorSystem.actorOf(
Props(new ConnectionOwner(connectionFactory)))
// make a RPC client
val client = ConnectionOwner.createChildActor(
connection, Props(new RpcClient()))
// mechanics
implicit val timeout = Timeout(1000, TimeUnit.MILLISECONDS)
(client ? Request(Publish(...) :: Nil))) onComplete {
case response => ...
}
}
Все, что нам нужно заполнить, это сообщение, которое мы отправляем, и то, что мы делаем с ответом. Я оставлю второе на ваше воображение, но давайте пока сосредоточимся на сообщении.
Мы будем отправлять прямое сообщение — мы хотим установить двухточечную связь. (Когда вы просите ответ, ваш клиент получит свои собственные очереди, где сервер поместит ответ.) Мы будем использовать стандартный прямой обмен RabbitMQ, в amq.direct. В дополнение к обмену вам нужен ключ маршрутизации, чтобы RabbitMQ знал, в какую очередь ему нужно разместить запрос. Но какую ценность мы используем?
Давайте создадим очередь и дадим ее назвать cppdemo. Затем нам нужно привязать очередь к некоторому ключу обмена и маршрутизации. В нашем случае сообщение попадет в cppdemoочередь при отправке на amq.directбиржу с cppdemo.basicключом маршрутизации.
Вернемся к нашему коду Scala, затем
...
(client ? Request(Publish("amq.direct", "cppdemo.basic", ...) :: Nil)))
onComplete {
case response => ...
}
...
Единственное, что остается, это байты, составляющие сообщение …
C ++
И вниз по кроличьей норе мы идем. Прежде чем мы перейдем к коду C ++ и к завершению кода Akka, нам нужно выполнить некоторые настройки.
механическая обработка
Мы будем использовать Boost и клиенты RabbitMQ C и C ++ по адресам https://github.com/alanxz/rabbitmq-c и https://github.com/alanxz/SimpleAmqpClient . Вам также понадобится cmake . Мы будем нуждаться в статические библиотеки для обоих RabbitMQ клиентов, поэтому мы строим rabbitmq-cпо
cmake . -DBUILD_STATIC_LIBS=true cmake --build . sudo cmake --build . --target install
И далее с C ++ SimpleAmqpClient
cmake . -DBUILD_SHARED_LIBS=false cmake --build . sudo cmake --build . --target install
Основной код
Теперь мы готовы написать наш код на C ++. Мы установим соединение с сервером RabbitMQ и настроим сервер RCP. Мы привязываем наш сервер к той же очереди и прослушиваем сообщение, которое соответствует следующей структуре:
const int32_t message_signature = 0x1000acca;
// sorry, no k in hex!
typedef struct {
int32_t signature;
int32_t size1;
int32_t size2;
} message_header_t;
В дополнение к сообщениям мы определяем некоторый тип ошибки, который возвращается из нашей функции обработки.
struct ProcessingError: virtual boost::exception { };
typedef
boost::error_info<struct errinfo_message_, std::string const> errinfo_message;
Теперь перейдем к нашему основному кодексу, который я оставлю без тщательного анализа. Это очень легко, и вы должны быть в состоянии следовать за ним.
std::string process(BasicMessage::ptr_t request) {
const amqp_bytes_t& bytes = request->getAmqpBody();
if (bytes.len < sizeof(message_header_t))
throw ProcessingError() << errinfo_message("message too small");
const message_header_t* header =
static_cast<message_header_t*>(bytes.bytes);
if (header->signature != message_signature)
throw ProcessingError() << errinfo_message("bad signature");
// we're good.
size_t totalSize = sizeof(message_header_t) +
header->size1 + header->size2;
if (bytes.len != totalSize)
throw ProcessingError() << errinfo_message("bad message size");
return "it worked!";
}
int main() {
try {
Channel::ptr_t channel = Channel::Create();
channel->BindQueue("cppdemo", "amq.direct", "cppdemo.basic");
std::string tag;
tag = channel->BasicConsume("cppdemo", "", true, true, false, 2);
while (true) {
// consume the message
Envelope::ptr_t env = channel->BasicConsumeMessage(tag);
BasicMessage::ptr_t request = env->Message();
try {
std::string body = process(request);
BasicMessage::ptr_t response = BasicMessage::Create();
channel->BasicPublish("amq.direct", request->ReplyTo(), body);
} catch (ProcessingError &e) {
const std::string* msg =
boost::get_error_info<errinfo_message>(e);
std::cerr << (*msg) << std::endl;
}
}
} catch (std::runtime_error &e) {
std::cout << "Error " << e.what() << std::endl;
}
}
Короче говоря, мы ждем сообщений; когда кто-то прибывает, мы проверяем, что его структура в порядке, а затем мы делаем некоторую обработку и отправляем обратно ответ. Гениально то, что мы можем запустить столько отдельных процессов этой программы, сколько захотим; если кто -то умирает (возможно , с моей faviourite SIG_SEGV, никакие сообщения не будут потеряны , и другие программы все еще в порядке.) Simples!
Вернуться к Скала
Вернувшись в удобный мир Scala, мы просто должны правильно составить сообщение. Помните: у нас есть подпись 0x1000acca, затем два размера и затем байты, которые складываются в оба размера. И так, мы сделаем именно это:
val os = new ByteArrayOutputStream()
os.write(0x1000face)
os.write(0x1)
os.write(0x1)
os.write(0xa)
os.write(0xb)
val request = Publish("amq.direct", "cppdemo.basic", os.toByteArray)
(client ? Request(request :: Nil)) onComplete {
case Success(r: Response) => println("*** " + r.deliveries.head.body)
}
Как я уже говорил, я оставлю то, как вы обрабатываете ответ. Возможно, вы захотите извлечь тело из доставок, или вы можете сделать какую-то другую интересную обработку для вас!
Резюме
Итак, хорошая новость заключается в том, что необходимость использовать нативный код в приложениях на основе JVM не обязательно должна означать JNI; и Akka особенно хорошо подходит для инфраструктуры обмена сообщениями. Вставьте необходимый клиентский код AMQP, создайте код C ++, и все готово.
