Статьи

Создание управляемого сообщениями компонента с помощью AWS SQS весной

В моем предыдущем посте я показал простой пример использования AWS SQS с Spring Framework для помещения сообщений в очередь и их чтения из очереди. В этом посте я делаю еще один шаг и использую Spring для создания «управляемого сообщениями компонента», поэтому каждое сообщение, помещаемое в очередь, выбирается и обрабатывается «автоматически». AWS называет это асинхронным способом на странице документации . Чтобы сделать это, я определю MessageListener в Spring и настрою его для прослушивания моей очереди, как описано здесь . Чтобы увидеть первоначальную настройку проекта, пожалуйста, посмотрите мой предыдущий пост, так как я больше не буду его здесь показывать.

Контекст приложения Spring будет определять прослушиватель сообщений (и соответствующие объекты) следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
<bean id="amazonMessageListener" class="net.pascalalma.aws.sqs.SpringMessageDrivenBean" />
 
  <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <property name="delegate" ref="amazonMessageListener"/>
    <property name="defaultListenerMethod" value="onMessage"/>
    <property name="messageConverter">
      <null/>
    </property>
  </bean>
 
  <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" ref="queueName" />
    <property name="messageListener" ref="messageListener" />
  </bean>

Сначала я определил свой компонент MDB (MessageDrivenBean) и назвал его «amazonMessageListener». Затем я использую этот MDB в качестве «делегата» для адаптера «messageListener». Этот компонент-адаптер также может позаботиться о преобразовании полезной нагрузки сообщения (здесь игнорируется) и вызове правильного метода в делегированном слушателе.

В бине ‘jmsContainer’ ‘адаптер’ связан с используемой фабрикой соединений JMS и местом назначения для прослушивания.

Все, что осталось, это исходный код самого MDB:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package net.pascalalma.aws.sqs;
 
import org.apache.log4j.Logger;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
 
public class SpringMessageDrivenBean {
 
    final static Logger logger = Logger.getLogger(SpringMessageDrivenBean.class);
 
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                logger.info(String.format("MDB received: %s ", ((TextMessage) message).getText()));
            }
            catch (JMSException ex) {
                throw new RuntimeException(ex);
            }
        }
        else {
            throw new IllegalArgumentException("Message must be of type TextMessage");
        }
    }
}

Я думаю, это довольно просто. Метод ‘onMessage’ вызывается для каждого сообщения, помещенного в очередь, и в этом случае он просто печатает текстовое содержимое сообщения. Чтобы увидеть, как это работает, я использую следующий основной класс:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
package net.pascalalma.aws.sqs;
 
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
public class SpringMdbMain {
 
    public static void main(String[] args) {
        //Build application context by reading spring-config.xml
        ApplicationContext ctx = new ClassPathXmlApplicationContext(new String[]{"application-context.xml"});
 
        //Get an instance of ProviderService class;
        MyMessageProvider prdSvc = (MyMessageProvider) ctx.getBean("myMessageProviderService");
 
        //Call getProduct method of ProductService
        prdSvc.sendMessage("This is a test A");
        prdSvc.sendMessage("This is a test B");
        prdSvc.sendMessage("This is a test C");
        prdSvc.sendMessage("This is a test D");
    }
}

Это приводит к следующему выводу:

01
02
03
04
05
06
07
08
09
10
11
12
2015-04-11 13:17:20 DEBUG net.pascalalma.aws.sqs.MyMessageProvider(23) - Sending message with txt: This is a test A
2015-04-11 13:17:26 DEBUG net.pascalalma.aws.sqs.MyMessageProvider(36) - Message sent
2015-04-11 13:17:26 DEBUG net.pascalalma.aws.sqs.MyMessageProvider(23) - Sending message with txt: This is a test B
2015-04-11 13:17:26 INFO  net.pascalalma.aws.sqs.SpringMessageDrivenBean(16) - MDB received: This is a test A
2015-04-11 13:17:26 DEBUG net.pascalalma.aws.sqs.MyMessageProvider(36) - Message sent
2015-04-11 13:17:26 DEBUG net.pascalalma.aws.sqs.MyMessageProvider(23) - Sending message with txt: This is a test C
2015-04-11 13:17:26 INFO  net.pascalalma.aws.sqs.SpringMessageDrivenBean(16) - MDB received: This is a test B
2015-04-11 13:17:27 DEBUG net.pascalalma.aws.sqs.MyMessageProvider(36) - Message sent
2015-04-11 13:17:27 DEBUG net.pascalalma.aws.sqs.MyMessageProvider(23) - Sending message with txt: This is a test D
2015-04-11 13:17:27 INFO  net.pascalalma.aws.sqs.SpringMessageDrivenBean(16) - MDB received: This is a test C
2015-04-11 13:17:27 DEBUG net.pascalalma.aws.sqs.MyMessageProvider(36) - Message sent
2015-04-11 13:17:27 INFO  net.pascalalma.aws.sqs.SpringMessageDrivenBean(16) - MDB received: This is a test D