Как вы, возможно, знаете, SQS в AWS SQS означает «Простая служба очереди». Играя с этим, я недавно нашел одну из причин, почему это можно назвать «простым». В двух предыдущих постах ( здесь и здесь ) я показал использование SQS в качестве поставщика очереди JMS в сочетании с Spring Framework . С этой базовой установкой я решил пойти дальше и начал экспериментировать с шаблоном запрос-ответ в сочетании с JMS (используя свойство JMS ‘JMSReplyTo’ и временные очереди). В этой довольно классической статье хорошо объясняется, как это работает и почему это работает таким образом.
Чтобы показать, как это должно работать, я сначала покажу настройки, которые я использовал с Apache ActiveMQ . Позвольте мне показать bean-компонент, который выбирает сообщение из очереди, выполняет действие над содержимым и отправляет ответ JMSReplyTo в заголовке JMS. Поскольку я использую Spring, это звучит сложнее, чем есть на самом деле. Сначала код Java:
package net.pascalalma.aws.sqs.requestresponse;
import org.springframework.stereotype.Service;
@Service
public class MyMessageService implements ResponsiveTextMessageDelegate {
public String onMessage(String txt) {
return String.valueOf(txt.length());
}
}
Это довольно простой класс, я бы сказал. Он реализует ResponsiveTextMessageDelegate (подробности этого интерфейса описаны здесь ) и просто возвращает длину содержимого входящего сообщения. Все остальное, что нужно сделать, заботится Spring Framework. Конфигурация Spring для этого сервиса выглядит так:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">
<context:component-scan base-package="net.pascalalma.aws.sqs.requestresponse"></context:component-scan>
<context:annotation-config/>
<!-- ActiveMQ config -->
<bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"/>
</bean>
<bean id="requestQueueName" class="java.lang.String">
<constructor-arg value="DefaultDemoQueue"/>
</bean>
<bean id="myMessageService" class="net.pascalalma.aws.sqs.requestresponse.MyMessageService" />
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<property name="delegate" ref="myMessageService"/>
<property name="defaultListenerMethod" value="onMessage"/>
<property name="messageConverter" ref="messageConverter" />
</bean>
<bean id="messageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsFactory"/>
<property name="destinationName" ref="requestQueueName"/>
<property name="messageListener" ref="messageListener"/>
</bean>
</beans>
Это в основном такая же конфигурация, как описано в моем предыдущем посте. Единственное отличие состоит в том, что теперь я использую конвертер SimpleMessageConverter, который заботится о преобразовании возвращаемой строки в TextMessage. Если бы мы не определяли этот конвертер, мы получили бы следующую ошибку:
ava.lang.NoSuchMethodException: net.pascalalma.aws.sqs.requestresponse.MyMessageService.onMessage(org.apache.activemq.command.ActiveMQTextMessage
Далее нам нужен клиентский компонент Service, который может «общаться» с нашим сервисом. Это может выглядеть так в Java:
package net.pascalalma.aws.sqs.requestresponse;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.JmsUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import javax.annotation.Resource;
import javax.jms.*;
import java.util.Random;
@Component
public class MyMessageServiceClient {
final static Logger logger = Logger.getLogger(MyMessageServiceClient.class);
@Resource
private JmsTemplate jmsTemplate;
@Autowired
private String requestQueueName;
public String process(final String txt) {
//Setup a message producer to send message to the queue the server is consuming from
Message response = jmsTemplate.sendAndReceive(requestQueueName,
new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage();
message.setText(txt);
return message;
}
});
String result = null;
try {
result = ((TextMessage) response).getText();
} catch (JMSException e) {
logger.error(e);
}
return result;
}
}
Мы видим, что мы используем sendAndReceive jmsTemplate для отправки сообщения, созданного в обратном вызове MessageCreator, и ожидания ответного сообщения. Соответствующая конфигурация Spring для этого класса:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">
<context:component-scan base-package="net.pascalalma.aws.sqs.requestresponse"></context:component-scan>
<context:annotation-config/>
<!-- ActiveMQ config -->
<bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
</bean>
<!-- End ActiveMQ specific -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"/>
</bean>
<bean id="requestQueueName" class="java.lang.String">
<constructor-arg value="DefaultDemoQueue"/>
</bean>
<bean id="myMessageServiceClient" class="net.pascalalma.aws.sqs.requestresponse.MyMessageServiceClient"/>
</beans>
Теперь остается некоторый «контейнер», чтобы увидеть эти компоненты в действиях, для которых я создал основной класс для «серверной» части:
package net.pascalalma.aws.sqs.requestresponse;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class MessageServiceMain {
public static void main(String[] args) {
//Build application context by reading spring-config.xml
ApplicationContext ctx = new ClassPathXmlApplicationContext(new String[]{"requestresponse/application-context.xml"});
}
}
Запуск этого класса в вашей среде IDE или терминале просто читает конфигурацию SPring и создает экземпляры служебных компонентов. Основной класс клиента имеет немного больше кода:
package net.pascalalma.aws.sqs.requestresponse;
import org.apache.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
public class MessageServiceClientMain {
final static Logger logger = Logger.getLogger(MessageServiceClientMain.class);
public static void main(String[] args) {
//Build application context by reading spring-config.xml
ApplicationContext ctx = new ClassPathXmlApplicationContext(new String[]{"requestresponse/application-context-client.xml"});
//Get an instance of ProductService class;
MyMessageServiceClient messageServiceClient = (MyMessageServiceClient) ctx.getBean("myMessageServiceClient");
//Call getProduct method of ProductService
String random = createRandomString();
for (int i=0; i<16; i++) {
String key = random.substring(i);
logger.info("Sending to service: " + key);
logger.info("Sending to service with length: " + key.length());
String result = messageServiceClient.process(key);
logger.info("Received from service: " + result);
logger.info("======================================================");
}
}
private static String createRandomString() {
Random random = new Random(System.currentTimeMillis());
long randomLong = random.nextLong();
return Long.toHexString(randomLong);
}
}
Запуск этого класса сгенерирует сообщения и отправит их службе, а также напечатает результат, полученный от службы, следующим образом:
2015-04-20 20:29:14 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 42fdcd4355cc5314 2015-04-20 20:29:14 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 16 2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 16 2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ====================================================== 2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 2fdcd4355cc5314 2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 15 2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 15 2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
Все идет нормально. Теперь давайте использовать AWS SQS вместо локального экземпляра Active MQ. Это легко сделать, просто изменив конфигурацию для используемого JmsFactory в обеих наших конфигурациях Spring:
...
<bean id="credentialsProviderBean" class="com.amazonaws.auth.DefaultAWSCredentialsProviderChain"/>
<bean id="connectionFactoryBuilder" class="com.amazon.sqs.javamessaging.SQSConnectionFactory$Builder">
<property name="regionName" value="eu-west-1"/>
<property name="numberOfMessagesToPrefetch" value="5"/>
<property name="awsCredentialsProvider" ref="credentialsProviderBean"/>
</bean>
<bean id="jmsFactory" class="com.amazon.sqs.javamessaging.SQSConnectionFactory"
factory-bean="connectionFactoryBuilder"
factory-method="build"/>
...
Теперь, если мы запустим приложение «сервер» и приложение «клиент», мы получим следующий вывод:
2015-04-25 20:22:49 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: f1db848691a26c85
2015-04-25 20:22:49 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 16
Exception in thread "main" org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Unsupported Method
at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
at org.springframework.jms.core.JmsTemplate.executeLocal(JmsTemplate.java:986)
at org.springframework.jms.core.JmsTemplate.sendAndReceive(JmsTemplate.java:922)
at net.pascalalma.aws.sqs.requestresponse.MyMessageServiceClient.process(MyMessageServiceClient.java:29)
at net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain.main(MessageServiceClientMain.java:29)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: javax.jms.JMSException: Unsupported Method
at com.amazon.sqs.javamessaging.SQSSession.createTemporaryQueue(SQSSession.java:744)
at org.springframework.jms.core.JmsTemplate.doSendAndReceive(JmsTemplate.java:946)
at org.springframework.jms.core.JmsTemplate$12.doInJms(JmsTemplate.java:926)
at org.springframework.jms.core.JmsTemplate$12.doInJms(JmsTemplate.java:922)
at org.springframework.jms.core.JmsTemplate.executeLocal(JmsTemplate.java:983)
... 8 more
Как вы можете видеть, мы получаем трассировку стека, сообщающую нам, что метод JMS ‘createTeoraryQueue’ не поддерживается SQS! Пока что за поддержку JMS. Я предполагаю, что именно поэтому они называют это Simple Queuing Service, поскольку реализованы только некоторые из возможных методов JMS. Я искал больше информации об этом, но без какой-либо удачи. Тем не менее, я столкнулся с этой структурой: Nevado JMS . Они утверждали, что являются JMS-драйвером для AWS SQS / SNS, поэтому я решил попробовать. Сначала я добавил следующую зависимость в pom моего проекта:
<dependency> <groupId>org.skyscreamer</groupId> <artifactId>nevado-jms</artifactId> <version>1.3.1</version> </dependency>
И затем снова изменил JmsFactory в обоих моих конфигах Spring, на этот раз:
...
<bean id="sqsConnectorFactory" class="org.skyscreamer.nevado.jms.connector.amazonaws.AmazonAwsSQSConnectorFactory" />
<bean id="jmsFactory" class="org.skyscreamer.nevado.jms.NevadoConnectionFactory">
<property name="sqsConnectorFactory" ref="sqsConnectorFactory" />
<property name="awsAccessKey" value="${aws.accessKey}" />
<property name="awsSecretKey" value="${aws.secretKey}" />
</bean>
...
Теперь, когда я проводил основные занятия, я получил ожидаемый результат:
2015-04-25 20:33:27 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: dad74fbff8e0a2f2 2015-04-25 20:33:27 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 16 2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 16 2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ====================================================== 2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: ad74fbff8e0a2f2 2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 15 2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 15 2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ====================================================== 2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: d74fbff8e0a2f2 2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 14 2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 14 2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ====================================================== 2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 74fbff8e0a2f2 2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 13 2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 13 2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ====================================================== 2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 4fbff8e0a2f2 2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 12 2015-04-25 20:34:21 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 12 2015-04-25 20:34:21 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
Таким образом, это показывает, что более продвинутые вещи все еще возможны с так называемыми «простыми» сервисами, хотя и здесь требуется некоторая помощь со стороны сообщества.