Статьи

Redis Publish Subscribe и длинный опрос с Spring’s DeferredResult


Redis предлагает не только хранилище значений ключей, но и публикацию сообщений о подписке.
В этом посте будет описан простой сценарий, использующий Spring Data Redis, — добавление объекта домена сообщений в хранилище с помощью вызова REST, публикация этого сообщения в канал, подписчики этого канала, получающие это сообщение, которые в результате устанавливают отложенный длительный опрос. результаты с сообщением.

Два ключевых классов в Redis публиковать подписываться механизмом является 
RedisTemplate  класса и 
RedisMessageListenerContainer  класса.
RedisTemplate  содержит 
JedisConnectionFactory  который содержит сведение о соединении Redis и, а также методы , чтобы манипулировать ключевые магазины значения, существует метод , называемый опубликовать
convertAndSend . Этот метод принимает два аргумента. Первый — это название канала, куда должны быть опубликованы сообщения, а второй — объект, подлежащий отправке. 

В этом примере публикация сообщения выполняется после того, как 
сообщение  сохраняется через аспект.
@Aspect
@Component
public class MessageAspect extends AbstractRedisAspect {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(MessageAspect.class);

    @Value("${messaging.redis.channel.messages}")
    private String channelName;

    @After("execution(* com.city81.redisPubSub.repository.MessageDao.save(..))")
    public void interceptMessage(JoinPoint joinPoint) {
            
        Message message = (Message) joinPoint.getArgs()[0];
    
        // this publishes the message
        this.redisTemplate.convertAndSend(channelName, message);

    }

}
RedisMessageListenerContainer , а также проведение 
JedisConnectionFactory , держит карту слушателей сообщений , где ключ является сообщением слушателя экземпляр и значение канала. Ссылки экземпляра приемника сообщений класса , который реализует 
OnMessage  метод
MessageListener  интерфейса.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop-3.2.xsd" >

    <!-- for the redis pub sub aop beans -->
    <aop:aspectj-autoproxy />

    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
          p:host-name="${messaging.redis.hostname}" p:port="${messaging.redis.port}"/>

    <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"
          p:connection-factory-ref="jedisConnectionFactory">
    </bean>

    <bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
        <constructor-arg>
            <ref bean="messageManager"/>
        </constructor-arg>
    </bean>

    <bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
        <property name="connectionFactory" ref="jedisConnectionFactory"/>
        <property name="messageListeners">
            <map>
                <entry key-ref="messageListener">
                    <bean class="org.springframework.data.redis.listener.ChannelTopic">
                        <constructor-arg value="${messaging.redis.channel.messages}" />
                    </bean>
                </entry>
            </map>
        </property>
    </bean>

</beans>

Когда сообщение публикуется, те подписчики, которые прослушивают этот канал, получат опубликованное сообщение с помощью 
 метода
onMessage . Опубликованное сообщение содержит сериализованный объект, который был отправлен в теле сообщения Redis, и его необходимо десериализовать и привести к исходному объекту.
    public void onMessage(
            org.springframework.data.redis.connection.Message redisMessage,
            byte[] pattern) {

        Message message = (Message) SerializationUtils.deserialize(redisMessage.getBody());
        
        // set the deferred results for the user
        for (DeferredResult<Message> deferredResult : this.messageDeferredResultList) {
                deferredResult.setResult(message);
        }

    }

Список 
DeferredResult  заполняется вызовами  метода getNewMessage службы REST 
. Это, в свою очередь, в 
MessageManager создаст 
 объект
DeferredResult , добавит его в список и вернет объект клиенту.

    public DeferredResult<Message> getNewMessage() throws Exception {

        final DeferredResult<Message> deferredResult =
                new DeferredResult<Message>(deferredResultTimeout);

        deferredResult.onCompletion(new Runnable() {
            public void run() {
                messageDeferredResultList.remove(deferredResult);
            }
        });

        deferredResult.onTimeout(new Runnable() {
            public void run() {
                messageDeferredResultList.remove(deferredResult);
            }
        });

        messageDeferredResultList.add(deferredResult);

        return deferredResult;
    }

GitHub 
репо  для этого примера содержит два простых HTML — страниц, один из которых начинается запрос длинного опроса , а другое , который добавляет сообщение. Это вызовет веб-сервис REST ниже.

@Controller
@RequestMapping("/messages")
public class MessageAPIController {

    @Inject
    private MessageManager messageManager;

    //
    // ADD A MESSAGE
    //
    @RequestMapping(value = "/add", method = RequestMethod.POST,
            produces = "application/json")
    @ResponseBody
    public Message addMessage(
            @RequestParam(required = true) String text) throws Exception {
        return messageManager.addMessage(text);
    }
    
    //
    // LONG POLLING
    //
    @RequestMapping(value = "/watch", method = RequestMethod.GET,
            produces = "application/json")
    @ResponseBody
    public DeferredResult<Message> getNewMessage() throws Exception {
        return messageManager.getNewMessage();
    }
    
    
}

Дальнейшее усовершенствование вышеупомянутого, чтобы гарантировать, что сообщения не будут пропущены между длинными запросами на опрос, состояло бы в том, чтобы хранить сообщения в Redis в отсортированном наборе с оценкой, являющейся меткой времени создания сообщения. Затем можно использовать механизм публикации Redis, чтобы сообщить подписчику, что в Redis есть новые сообщения, и затем он может извлечь их на основе времени последнего запроса и вернуть коллекцию сообщений обратно клиенту в объекте DeferredResult.