Статьи

Redis pub / sub Используя Spring

Продолжая открывать для себя мощный набор функций Redis , стоит упомянуть о встроенной поддержке паб / суб-сообщений.

Обмен сообщениями Pub / Sub является неотъемлемой частью архитектуры многих программ. Некоторым программным системам требуется решение для обмена сообщениями для обеспечения высокой производительности, масштабируемости, устойчивости и долговечности очередей, поддержки отработки отказа, транзакций и многих других полезных функций, что в мире Java в большинстве случаев всегда приводит к использованию одной из реализаций JMS. провайдеры. В моих предыдущих проектах я активно использовал Apache ActiveMQ (сейчас движется к Apache ActiveMQ Apollo ). Хотя это отличная реализация, иногда мне просто требовалась простая поддержка очередей, и Apache ActiveMQ просто выглядел слишком сложным для этого.

Альтернативы? Пожалуйста, попробуйте Redis pub / sub! Если вы уже используете Redis в качестве хранилища ключей / значений, несколько дополнительных строк конфигурации приведут к немедленному обмену сообщениями публикации / подписки в вашем приложении.

Проект Spring Data Redis очень хорошо описывает API-интерфейс Redis pub / sub и предоставляет модель, знакомую всем, кто использует возможности Spring для интеграции с JMS.

Как всегда, давайте начнем с файла конфигурации POM. Он довольно маленький и простой, включает необходимые зависимости Spring , Spring Data Redis и Jedis , отличный Java-клиент для Redis .

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemalocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelversion>4.0.0</modelversion>
    <groupid>com.example.spring</groupid>
    <artifactid>redis</artifactid>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceencoding>UTF-8</project.build.sourceencoding>
        <spring.version>3.1.1.RELEASE</spring.version>
    </properties>

    <dependencies>
        <dependency>
            <groupid>org.springframework.data</groupid>
            <artifactid>spring-data-redis</artifactid>
            <version>1.0.1.RELEASE</version>
        </dependency>

        <dependency>
            <groupid>cglib</groupid>
            <artifactid>cglib-nodep</artifactid>
            <version>2.2</version>
        </dependency>

        <dependency>
            <groupid>log4j</groupid>
            <artifactid>log4j</artifactid>
            <version>1.2.16</version>
        </dependency>

        <dependency>
            <groupid>redis.clients</groupid>
            <artifactid>jedis</artifactid>
            <version>2.0.0</version>
            <type>jar</type>
        </dependency>

        <dependency>
            <groupid>org.springframework</groupid>
            <artifactid>spring-core</artifactid>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupid>org.springframework</groupid>
            <artifactid>spring-context</artifactid>
            <version>${spring.version}</version>
           </dependency>
       </dependencies>

       <build>
           <plugins>
               <plugin>
                   <groupid>org.apache.maven.plugins</groupid>
                   <artifactid>maven-compiler-plugin</artifactid>
                   <version>2.3.2</version>
                   <configuration>
                       <source>1.6
                       <target>1.6</target>
                   </configuration>
               </plugin>
           </plugins>
    </build>
</project>

Переходя к настройке контекста Spring , давайте разберемся, что нам нужно для того, чтобы издатель мог публиковать некоторые сообщения, а потребитель мог их использовать. Знание соответствующих абстракций Spring для JMS очень поможет в этом.

  • нам нужна фабрика соединений -> JedisConnectionFactory
  • нам нужен шаблон для публикации сообщений издателем -> RedisTemplate
  • нам нужен приемник сообщений для потребителя, чтобы потреблять сообщения -> RedisMessageListenerContainer

Используя
конфигурацию
Spring Java, давайте опишем наш контекст:

package com.example.redis.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.annotation.EnableScheduling;

import com.example.redis.IRedisPublisher;
import com.example.redis.impl.RedisMessageListener;
import com.example.redis.impl.RedisPublisherImpl;

@Configuration
@EnableScheduling
public class AppConfig {
    @Bean
    JedisConnectionFactory jedisConnectionFactory() {
        return new JedisConnectionFactory();
    }

    @Bean
    RedisTemplate< String, Object > redisTemplate() {
        final RedisTemplate< String, Object > template =  new RedisTemplate< String, Object >();
        template.setConnectionFactory( jedisConnectionFactory() );
        template.setKeySerializer( new StringRedisSerializer() );
        template.setHashValueSerializer( new GenericToStringSerializer< Object >( Object.class ) );
        template.setValueSerializer( new GenericToStringSerializer< Object >( Object.class ) );
        return template;
    }

    @Bean
    MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter( new RedisMessageListener() );
    }

    @Bean
    RedisMessageListenerContainer redisContainer() {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();

        container.setConnectionFactory( jedisConnectionFactory() );
        container.addMessageListener( messageListener(), topic() );

        return container;
    }

    @Bean
    IRedisPublisher redisPublisher() {
        return new RedisPublisherImpl( redisTemplate(), topic() );
    }

    @Bean
    ChannelTopic topic() {
        return new ChannelTopic( "pubsub:queue" );
    }
}

Очень просто и понятно. Наличие аннотации @EnableScheduling не является обязательным и требуется только для нашей реализации издателя: издатель будет публиковать строковое сообщение каждые 100 мс.

package com.example.redis.impl;

import java.util.concurrent.atomic.AtomicLong;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.scheduling.annotation.Scheduled;

import com.example.redis.IRedisPublisher;

public class RedisPublisherImpl implements IRedisPublisher {
    private final RedisTemplate< String, Object > template;
    private final ChannelTopic topic; 
    private final AtomicLong counter = new AtomicLong( 0 );

    public RedisPublisherImpl( final RedisTemplate< String, Object > template, 
            final ChannelTopic topic ) {
        this.template = template;
        this.topic = topic;
    }

    @Scheduled( fixedDelay = 100 )
    public void publish() {
        template.convertAndSend( topic.getTopic(), "Message " + counter.incrementAndGet() + 
            ", " + Thread.currentThread().getName() );
 }
}

И, наконец, наша реализация прослушивателя сообщений (которая просто печатает сообщение на консоли).

package com.example.redis.impl;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

public class RedisMessageListener implements MessageListener {
    @Override
    public void onMessage( final Message message, final byte[] pattern ) {
        System.out.println( "Message received: " + message.toString() );
    }
}

Удивительно, всего два небольших класса, одна конфигурация для объединения вещей, и у нас есть полная поддержка обмена сообщениями в пабах / суб-приложениях в нашем приложении! Давайте запустим приложение как автономное …

package com.example.redis;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import com.example.redis.config.AppConfig;

public class RedisPubSubStarter {
    public static void main(String[] args) {
        new AnnotationConfigApplicationContext( AppConfig.class );
    }
}

… и увидеть следующий вывод в консоли: 

...
Message received: Message 1, pool-1-thread-1
Message received: Message 2, pool-1-thread-1
Message received: Message 3, pool-1-thread-1
Message received: Message 4, pool-1-thread-1
Message received: Message 5, pool-1-thread-1
Message received: Message 6, pool-1-thread-1
Message received: Message 7, pool-1-thread-1
Message received: Message 8, pool-1-thread-1
Message received: Message 9, pool-1-thread-1
Message received: Message 10, pool-1-thread-1
Message received: Message 11, pool-1-thread-1
Message received: Message 12, pool-1-thread-1
Message received: Message 13, pool-1-thread-1
Message received: Message 14, pool-1-thread-1
Message received: Message 15, pool-1-thread-1
Message received: Message 16, pool-1-thread-1
...

Большой! Существует гораздо больше того, что вы можете сделать с пабом / подпунктом Redis , отличная документация доступна для вас на официальном сайте Redis