Статьи

Более продвинутый материал с JMS и AWS SQS

Как вы, возможно, знаете, SQS в AWS SQS означает «Простая служба очереди». Играя с этим, я недавно нашел одну из причин, почему это можно назвать «простым». В двух предыдущих постах ( здесь и здесь ) я показал использование SQS в качестве поставщика очереди JMS в сочетании с Spring Framework . С этой базовой установкой я решил пойти дальше и начал экспериментировать с шаблоном запрос-ответ в сочетании с JMS (используя свойство JMS ‘JMSReplyTo’ и временные очереди). В этой довольно классической статье хорошо объясняется, как это работает и почему это работает таким образом.

Чтобы показать, как это должно работать, я сначала покажу настройки, которые я использовал с Apache ActiveMQ . Позвольте мне показать bean-компонент, который выбирает сообщение из очереди, выполняет действие над содержимым и отправляет ответ JMSReplyTo в заголовке JMS. Поскольку я использую Spring, это звучит сложнее, чем есть на самом деле. Сначала код Java:

package net.pascalalma.aws.sqs.requestresponse;
 
import org.springframework.stereotype.Service;
 
@Service
public class MyMessageService implements ResponsiveTextMessageDelegate {
 
    public String onMessage(String txt) {
       return String.valueOf(txt.length());
    }
}

Это довольно простой класс, я бы сказал. Он реализует ResponsiveTextMessageDelegate (подробности этого интерфейса описаны здесь ) и просто возвращает длину содержимого входящего сообщения. Все остальное, что нужно сделать, заботится Spring Framework. Конфигурация Spring для этого сервиса выглядит так:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">
 
    <context:component-scan base-package="net.pascalalma.aws.sqs.requestresponse"></context:component-scan>
    <context:annotation-config/>
 
    <!-- ActiveMQ config -->
    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>tcp://localhost:61616</value>
        </property>
    </bean>
     
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
    </bean>
    <bean id="requestQueueName" class="java.lang.String">
        <constructor-arg value="DefaultDemoQueue"/>
    </bean>
 
    <bean id="myMessageService" class="net.pascalalma.aws.sqs.requestresponse.MyMessageService" />
 
    <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="myMessageService"/>
        <property name="defaultListenerMethod" value="onMessage"/>
        <property name="messageConverter" ref="messageConverter" />
    </bean>
     
    <bean id="messageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />
     
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="destinationName" ref="requestQueueName"/>
        <property name="messageListener" ref="messageListener"/>
    </bean>
</beans>

Это в основном такая же конфигурация, как описано в моем предыдущем посте. Единственное отличие состоит в том, что теперь я использую конвертер SimpleMessageConverter, который заботится о преобразовании возвращаемой строки в TextMessage. Если бы мы не определяли этот конвертер, мы получили бы следующую ошибку:

ava.lang.NoSuchMethodException: net.pascalalma.aws.sqs.requestresponse.MyMessageService.onMessage(org.apache.activemq.command.ActiveMQTextMessage

Далее нам нужен клиентский компонент Service, который может «общаться» с нашим сервисом. Это может выглядеть так в Java:

package net.pascalalma.aws.sqs.requestresponse;
 
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.JmsUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
 
import javax.annotation.Resource;
import javax.jms.*;
import java.util.Random;
 
@Component
public class MyMessageServiceClient {
 
    final static Logger logger = Logger.getLogger(MyMessageServiceClient.class);
 
    @Resource
    private JmsTemplate jmsTemplate;
 
    @Autowired
    private String requestQueueName;
 
    public String process(final String txt) {
        //Setup a message producer to send message to the queue the server is consuming from
        Message response = jmsTemplate.sendAndReceive(requestQueueName,
                        new MessageCreator() {
                            public Message createMessage(Session session) throws JMSException {
                                TextMessage message = session.createTextMessage();
                                message.setText(txt);
                                return message;
                            }
                        });
 
        String result = null;
        try {
            result = ((TextMessage) response).getText();
        } catch (JMSException e) {
            logger.error(e);
        }
        return result;
    }
}

Мы видим, что мы используем sendAndReceive jmsTemplate для отправки сообщения, созданного в обратном вызове MessageCreator, и ожидания ответного сообщения. Соответствующая конфигурация Spring для этого класса:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">
 
  <context:component-scan base-package="net.pascalalma.aws.sqs.requestresponse"></context:component-scan>
  <context:annotation-config/>
 
  <!-- ActiveMQ config -->
  <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL">
      <value>tcp://localhost:61616</value>
    </property>
  </bean>
  <!-- End ActiveMQ specific -->
 
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="jmsFactory"/>
  </bean>
  <bean id="requestQueueName" class="java.lang.String">
    <constructor-arg value="DefaultDemoQueue"/>
  </bean>
  <bean id="myMessageServiceClient" class="net.pascalalma.aws.sqs.requestresponse.MyMessageServiceClient"/>
</beans>

Теперь остается некоторый «контейнер», чтобы увидеть эти компоненты в действиях, для которых я создал основной класс для «серверной» части:

package net.pascalalma.aws.sqs.requestresponse;
 
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
public class MessageServiceMain {
 
    public static void main(String[] args) {
        //Build application context by reading spring-config.xml
        ApplicationContext ctx = new ClassPathXmlApplicationContext(new String[]{"requestresponse/application-context.xml"});
    }
}

Запуск этого класса в вашей среде IDE или терминале просто читает конфигурацию SPring и создает экземпляры служебных компонентов. Основной класс клиента имеет немного больше кода:

package net.pascalalma.aws.sqs.requestresponse;
 
import org.apache.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
 
public class MessageServiceClientMain {
 
    final static Logger logger = Logger.getLogger(MessageServiceClientMain.class);
 
    public static void main(String[] args) {
        //Build application context by reading spring-config.xml
        ApplicationContext ctx = new ClassPathXmlApplicationContext(new String[]{"requestresponse/application-context-client.xml"});
 
        //Get an instance of ProductService class;
        MyMessageServiceClient messageServiceClient = (MyMessageServiceClient) ctx.getBean("myMessageServiceClient");
 
        //Call getProduct method of ProductService
        String random = createRandomString();
 
        for (int i=0; i<16; i++) {
            String key = random.substring(i);
            logger.info("Sending to service: " + key);
            logger.info("Sending to service with length: " + key.length());
            String result = messageServiceClient.process(key);
            logger.info("Received from service: " + result);
            logger.info("======================================================");
        }
    }
 
    private static String createRandomString() {
        Random random = new Random(System.currentTimeMillis());
        long randomLong = random.nextLong();
        return Long.toHexString(randomLong);
    }
}

Запуск этого класса сгенерирует сообщения и отправит их службе, а также напечатает результат, полученный от службы, следующим образом:

2015-04-20 20:29:14 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 42fdcd4355cc5314
2015-04-20 20:29:14 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 16
2015-04-20 20:29:15 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 16
2015-04-20 20:29:15 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
2015-04-20 20:29:15 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 2fdcd4355cc5314
2015-04-20 20:29:15 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 15
2015-04-20 20:29:15 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 15
2015-04-20 20:29:15 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================

Все идет нормально. Теперь давайте использовать AWS SQS вместо локального экземпляра Active MQ. Это легко сделать, просто изменив конфигурацию для используемого JmsFactory в обеих наших конфигурациях Spring:

...
  <bean id="credentialsProviderBean" class="com.amazonaws.auth.DefaultAWSCredentialsProviderChain"/>
 
  <bean id="connectionFactoryBuilder" class="com.amazon.sqs.javamessaging.SQSConnectionFactory$Builder">
    <property name="regionName" value="eu-west-1"/>
    <property name="numberOfMessagesToPrefetch" value="5"/>
    <property name="awsCredentialsProvider" ref="credentialsProviderBean"/>
  </bean>
 
  <bean id="jmsFactory" class="com.amazon.sqs.javamessaging.SQSConnectionFactory"
        factory-bean="connectionFactoryBuilder"
        factory-method="build"/>
...

Теперь, если мы запустим приложение «сервер» и приложение «клиент», мы получим следующий вывод:

2015-04-25 20:22:49 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: f1db848691a26c85
2015-04-25 20:22:49 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 16
Exception in thread "main" org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Unsupported Method
    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
    at org.springframework.jms.core.JmsTemplate.executeLocal(JmsTemplate.java:986)
    at org.springframework.jms.core.JmsTemplate.sendAndReceive(JmsTemplate.java:922)
    at net.pascalalma.aws.sqs.requestresponse.MyMessageServiceClient.process(MyMessageServiceClient.java:29)
    at net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain.main(MessageServiceClientMain.java:29)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: javax.jms.JMSException: Unsupported Method
    at com.amazon.sqs.javamessaging.SQSSession.createTemporaryQueue(SQSSession.java:744)
    at org.springframework.jms.core.JmsTemplate.doSendAndReceive(JmsTemplate.java:946)
    at org.springframework.jms.core.JmsTemplate$12.doInJms(JmsTemplate.java:926)
    at org.springframework.jms.core.JmsTemplate$12.doInJms(JmsTemplate.java:922)
    at org.springframework.jms.core.JmsTemplate.executeLocal(JmsTemplate.java:983)
    ... 8 more

Как вы можете видеть, мы получаем трассировку стека, сообщающую нам, что метод JMS ‘createTeoraryQueue’ не поддерживается SQS! Пока что за поддержку JMS. Я предполагаю, что именно поэтому они называют это Simple Queuing Service, поскольку реализованы только некоторые из возможных методов JMS. Я искал больше информации об этом, но без какой-либо удачи. Тем не менее, я столкнулся с этой структурой: Nevado JMS . Они утверждали, что являются JMS-драйвером для AWS SQS / SNS, поэтому я решил попробовать. Сначала я добавил следующую зависимость в pom моего проекта:

<dependency>
  <groupId>org.skyscreamer</groupId>
  <artifactId>nevado-jms</artifactId>
  <version>1.3.1</version>
</dependency>

И затем снова изменил JmsFactory в обоих моих конфигах Spring, на этот раз:

...
  <bean id="sqsConnectorFactory" class="org.skyscreamer.nevado.jms.connector.amazonaws.AmazonAwsSQSConnectorFactory" />
  <bean id="jmsFactory" class="org.skyscreamer.nevado.jms.NevadoConnectionFactory">
    <property name="sqsConnectorFactory" ref="sqsConnectorFactory" />
    <property name="awsAccessKey" value="${aws.accessKey}" />
    <property name="awsSecretKey" value="${aws.secretKey}" />
  </bean>    
...

Теперь, когда я проводил основные занятия, я получил ожидаемый результат:

2015-04-25 20:33:27 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: dad74fbff8e0a2f2
2015-04-25 20:33:27 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 16
2015-04-25 20:33:53 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 16
2015-04-25 20:33:53 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
2015-04-25 20:33:53 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: ad74fbff8e0a2f2
2015-04-25 20:33:53 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 15
2015-04-25 20:34:04 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 15
2015-04-25 20:34:04 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
2015-04-25 20:34:04 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: d74fbff8e0a2f2
2015-04-25 20:34:04 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 14
2015-04-25 20:34:09 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 14
2015-04-25 20:34:09 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
2015-04-25 20:34:09 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 74fbff8e0a2f2
2015-04-25 20:34:09 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 13
2015-04-25 20:34:17 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 13
2015-04-25 20:34:17 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
2015-04-25 20:34:17 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 4fbff8e0a2f2
2015-04-25 20:34:17 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 12
2015-04-25 20:34:21 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 12
2015-04-25 20:34:21 INFO  net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================

Таким образом, это показывает, что более продвинутые вещи все еще возможны с так называемыми «простыми» сервисами, хотя и здесь требуется некоторая помощь со стороны сообщества.