Как вы, возможно, знаете, SQS в AWS SQS означает «Простая служба очереди». Играя с этим, я недавно нашел одну из причин, почему это можно назвать «простым». В двух предыдущих постах ( здесь и здесь ) я показал использование SQS в качестве поставщика очереди JMS в сочетании с Spring Framework . С этой базовой установкой я решил пойти дальше и начал экспериментировать с шаблоном запрос-ответ в сочетании с JMS (используя свойство JMS ‘JMSReplyTo’ и временные очереди). В этой довольно классической статье хорошо объясняется, как это работает и почему это работает таким образом.
Чтобы показать, как это должно работать, я сначала покажу настройки, которые я использовал с Apache ActiveMQ . Позвольте мне показать bean-компонент, который выбирает сообщение из очереди, выполняет действие над содержимым и отправляет ответ JMSReplyTo в заголовке JMS. Поскольку я использую Spring, это звучит сложнее, чем есть на самом деле. Сначала код Java:
package net.pascalalma.aws.sqs.requestresponse; import org.springframework.stereotype.Service; @Service public class MyMessageService implements ResponsiveTextMessageDelegate { public String onMessage(String txt) { return String.valueOf(txt.length()); } }
Это довольно простой класс, я бы сказал. Он реализует ResponsiveTextMessageDelegate (подробности этого интерфейса описаны здесь ) и просто возвращает длину содержимого входящего сообщения. Все остальное, что нужно сделать, заботится Spring Framework. Конфигурация Spring для этого сервиса выглядит так:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd"> <context:component-scan base-package="net.pascalalma.aws.sqs.requestresponse"></context:component-scan> <context:annotation-config/> <!-- ActiveMQ config --> <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://localhost:61616</value> </property> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="jmsFactory"/> </bean> <bean id="requestQueueName" class="java.lang.String"> <constructor-arg value="DefaultDemoQueue"/> </bean> <bean id="myMessageService" class="net.pascalalma.aws.sqs.requestresponse.MyMessageService" /> <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <property name="delegate" ref="myMessageService"/> <property name="defaultListenerMethod" value="onMessage"/> <property name="messageConverter" ref="messageConverter" /> </bean> <bean id="messageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" /> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsFactory"/> <property name="destinationName" ref="requestQueueName"/> <property name="messageListener" ref="messageListener"/> </bean> </beans>
Это в основном такая же конфигурация, как описано в моем предыдущем посте. Единственное отличие состоит в том, что теперь я использую конвертер SimpleMessageConverter, который заботится о преобразовании возвращаемой строки в TextMessage. Если бы мы не определяли этот конвертер, мы получили бы следующую ошибку:
ava.lang.NoSuchMethodException: net.pascalalma.aws.sqs.requestresponse.MyMessageService.onMessage(org.apache.activemq.command.ActiveMQTextMessage
Далее нам нужен клиентский компонент Service, который может «общаться» с нашим сервисом. Это может выглядеть так в Java:
package net.pascalalma.aws.sqs.requestresponse; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.jms.core.SessionCallback; import org.springframework.jms.support.JmsUtils; import org.springframework.stereotype.Component; import org.springframework.util.Assert; import javax.annotation.Resource; import javax.jms.*; import java.util.Random; @Component public class MyMessageServiceClient { final static Logger logger = Logger.getLogger(MyMessageServiceClient.class); @Resource private JmsTemplate jmsTemplate; @Autowired private String requestQueueName; public String process(final String txt) { //Setup a message producer to send message to the queue the server is consuming from Message response = jmsTemplate.sendAndReceive(requestQueueName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage message = session.createTextMessage(); message.setText(txt); return message; } }); String result = null; try { result = ((TextMessage) response).getText(); } catch (JMSException e) { logger.error(e); } return result; } }
Мы видим, что мы используем sendAndReceive jmsTemplate для отправки сообщения, созданного в обратном вызове MessageCreator, и ожидания ответного сообщения. Соответствующая конфигурация Spring для этого класса:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd"> <context:component-scan base-package="net.pascalalma.aws.sqs.requestresponse"></context:component-scan> <context:annotation-config/> <!-- ActiveMQ config --> <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://localhost:61616</value> </property> </bean> <!-- End ActiveMQ specific --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="jmsFactory"/> </bean> <bean id="requestQueueName" class="java.lang.String"> <constructor-arg value="DefaultDemoQueue"/> </bean> <bean id="myMessageServiceClient" class="net.pascalalma.aws.sqs.requestresponse.MyMessageServiceClient"/> </beans>
Теперь остается некоторый «контейнер», чтобы увидеть эти компоненты в действиях, для которых я создал основной класс для «серверной» части:
package net.pascalalma.aws.sqs.requestresponse; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class MessageServiceMain { public static void main(String[] args) { //Build application context by reading spring-config.xml ApplicationContext ctx = new ClassPathXmlApplicationContext(new String[]{"requestresponse/application-context.xml"}); } }
Запуск этого класса в вашей среде IDE или терминале просто читает конфигурацию SPring и создает экземпляры служебных компонентов. Основной класс клиента имеет немного больше кода:
package net.pascalalma.aws.sqs.requestresponse; import org.apache.log4j.Logger; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import java.util.HashMap; import java.util.Map; import java.util.Random; public class MessageServiceClientMain { final static Logger logger = Logger.getLogger(MessageServiceClientMain.class); public static void main(String[] args) { //Build application context by reading spring-config.xml ApplicationContext ctx = new ClassPathXmlApplicationContext(new String[]{"requestresponse/application-context-client.xml"}); //Get an instance of ProductService class; MyMessageServiceClient messageServiceClient = (MyMessageServiceClient) ctx.getBean("myMessageServiceClient"); //Call getProduct method of ProductService String random = createRandomString(); for (int i=0; i<16; i++) { String key = random.substring(i); logger.info("Sending to service: " + key); logger.info("Sending to service with length: " + key.length()); String result = messageServiceClient.process(key); logger.info("Received from service: " + result); logger.info("======================================================"); } } private static String createRandomString() { Random random = new Random(System.currentTimeMillis()); long randomLong = random.nextLong(); return Long.toHexString(randomLong); } }
Запуск этого класса сгенерирует сообщения и отправит их службе, а также напечатает результат, полученный от службы, следующим образом:
2015-04-20 20:29:14 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 42fdcd4355cc5314 2015-04-20 20:29:14 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 16 2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 16 2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ====================================================== 2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 2fdcd4355cc5314 2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 15 2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 15 2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
Все идет нормально. Теперь давайте использовать AWS SQS вместо локального экземпляра Active MQ. Это легко сделать, просто изменив конфигурацию для используемого JmsFactory в обеих наших конфигурациях Spring:
... <bean id="credentialsProviderBean" class="com.amazonaws.auth.DefaultAWSCredentialsProviderChain"/> <bean id="connectionFactoryBuilder" class="com.amazon.sqs.javamessaging.SQSConnectionFactory$Builder"> <property name="regionName" value="eu-west-1"/> <property name="numberOfMessagesToPrefetch" value="5"/> <property name="awsCredentialsProvider" ref="credentialsProviderBean"/> </bean> <bean id="jmsFactory" class="com.amazon.sqs.javamessaging.SQSConnectionFactory" factory-bean="connectionFactoryBuilder" factory-method="build"/> ...
Теперь, если мы запустим приложение «сервер» и приложение «клиент», мы получим следующий вывод:
2015-04-25 20:22:49 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: f1db848691a26c85 2015-04-25 20:22:49 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 16 Exception in thread "main" org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Unsupported Method at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316) at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169) at org.springframework.jms.core.JmsTemplate.executeLocal(JmsTemplate.java:986) at org.springframework.jms.core.JmsTemplate.sendAndReceive(JmsTemplate.java:922) at net.pascalalma.aws.sqs.requestresponse.MyMessageServiceClient.process(MyMessageServiceClient.java:29) at net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain.main(MessageServiceClientMain.java:29) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Caused by: javax.jms.JMSException: Unsupported Method at com.amazon.sqs.javamessaging.SQSSession.createTemporaryQueue(SQSSession.java:744) at org.springframework.jms.core.JmsTemplate.doSendAndReceive(JmsTemplate.java:946) at org.springframework.jms.core.JmsTemplate$12.doInJms(JmsTemplate.java:926) at org.springframework.jms.core.JmsTemplate$12.doInJms(JmsTemplate.java:922) at org.springframework.jms.core.JmsTemplate.executeLocal(JmsTemplate.java:983) ... 8 more
Как вы можете видеть, мы получаем трассировку стека, сообщающую нам, что метод JMS ‘createTeoraryQueue’ не поддерживается SQS! Пока что за поддержку JMS. Я предполагаю, что именно поэтому они называют это Simple Queuing Service, поскольку реализованы только некоторые из возможных методов JMS. Я искал больше информации об этом, но без какой-либо удачи. Тем не менее, я столкнулся с этой структурой: Nevado JMS . Они утверждали, что являются JMS-драйвером для AWS SQS / SNS, поэтому я решил попробовать. Сначала я добавил следующую зависимость в pom моего проекта:
<dependency> <groupId>org.skyscreamer</groupId> <artifactId>nevado-jms</artifactId> <version>1.3.1</version> </dependency>
И затем снова изменил JmsFactory в обоих моих конфигах Spring, на этот раз:
... <bean id="sqsConnectorFactory" class="org.skyscreamer.nevado.jms.connector.amazonaws.AmazonAwsSQSConnectorFactory" /> <bean id="jmsFactory" class="org.skyscreamer.nevado.jms.NevadoConnectionFactory"> <property name="sqsConnectorFactory" ref="sqsConnectorFactory" /> <property name="awsAccessKey" value="${aws.accessKey}" /> <property name="awsSecretKey" value="${aws.secretKey}" /> </bean> ...
Теперь, когда я проводил основные занятия, я получил ожидаемый результат:
2015-04-25 20:33:27 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: dad74fbff8e0a2f2 2015-04-25 20:33:27 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 16 2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 16 2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ====================================================== 2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: ad74fbff8e0a2f2 2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 15 2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 15 2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ====================================================== 2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: d74fbff8e0a2f2 2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 14 2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 14 2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ====================================================== 2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 74fbff8e0a2f2 2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 13 2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 13 2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ====================================================== 2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 4fbff8e0a2f2 2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 12 2015-04-25 20:34:21 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 12 2015-04-25 20:34:21 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
Таким образом, это показывает, что более продвинутые вещи все еще возможны с так называемыми «простыми» сервисами, хотя и здесь требуется некоторая помощь со стороны сообщества.