Статьи

Redis Pub / Sub с использованием Spring

Продолжая открывать для себя мощный набор функций Redis , стоит упомянуть о встроенной поддержке паб / суб-сообщений.

Обмен сообщениями Pub / Sub является неотъемлемой частью многих архитектур программного обеспечения. Некоторым программным системам требуется решение для обмена сообщениями для обеспечения высокой производительности, масштабируемости, устойчивости и долговечности очередей, поддержки отработки отказа, транзакций и многих других полезных функций, что в мире Java в большинстве случаев всегда приводит к использованию одной из реализаций JMS. провайдеры. В моих предыдущих проектах я активно использовал Apache ActiveMQ (сейчас движется к Apache ActiveMQ Apollo ). Хотя это отличная реализация, иногда мне просто требовалась простая поддержка очередей, и Apache ActiveMQ просто выглядел слишком сложным для этого.

Альтернативы? Пожалуйста, попробуйте Redis pub / sub! Если вы уже используете Redis в качестве хранилища ключей / значений, несколько дополнительных строк конфигурации приведут к немедленному обмену сообщениями публикации / подписки в вашем приложении.

Проект Spring Data Redis очень хорошо описывает API-интерфейс Redis pub / sub и предоставляет модель, знакомую всем, кто использует возможности Spring для интеграции с JMS.

Как всегда, давайте начнем с файла конфигурации POM. Он довольно маленький и простой, включает необходимые зависимости Spring , Spring Data Redis и Jedis , отличный Java-клиент для Redis .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
 
    <modelversion>4.0.0</modelversion>
    <groupid>com.example.spring</groupid>
    <artifactid>redis</artifactid>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
 
    <properties>
        <project.build.sourceencoding>UTF-8</project.build.sourceencoding>
        <spring.version>3.1.1.RELEASE</spring.version>
    </properties>
 
    <dependencies>
        <dependency>
            <groupid>org.springframework.data</groupid>
            <artifactid>spring-data-redis</artifactid>
            <version>1.0.1.RELEASE</version>
        </dependency>
 
        <dependency>
            <groupid>cglib</groupid>
            <artifactid>cglib-nodep</artifactid>
            <version>2.2</version>
        </dependency>
 
        <dependency>
            <groupid>log4j</groupid>
            <artifactid>log4j</artifactid>
            <version>1.2.16</version>
        </dependency>
 
        <dependency>
            <groupid>redis.clients</groupid>
            <artifactid>jedis</artifactid>
            <version>2.0.0</version>
            <type>jar</type>
        </dependency>
 
        <dependency>
            <groupid>org.springframework</groupid>
            <artifactid>spring-core</artifactid>
            <version>${spring.version}</version>
        </dependency>
 
        <dependency>
            <groupid>org.springframework</groupid>
            <artifactid>spring-context</artifactid>
            <version>${spring.version}</version>
           </dependency>
       </dependencies>
 
       <build>
           <plugins>
               <plugin>
                   <groupid>org.apache.maven.plugins</groupid>
                   <artifactid>maven-compiler-plugin</artifactid>
                   <version>2.3.2</version>
                   <configuration>
                       <source>1.6
                       <target>1.6</target>
                   </configuration>
               </plugin>
           </plugins>
    </build>
</project>

Переходя к настройке контекста Spring , давайте разберемся, что нам нужно для того, чтобы издатель мог публиковать некоторые сообщения, а потребитель мог их использовать. Знание соответствующих абстракций Spring для JMS очень поможет в этом.

  • нам нужна фабрика соединений -> JedisConnectionFactory
  • нам нужен шаблон для публикации сообщений издателем -> RedisTemplate
  • нам нужен приемник сообщений для потребителя, чтобы потреблять сообщения -> RedisMessageListenerContainer

Используя конфигурацию Spring Java, давайте опишем наш контекст:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.example.redis.config;
 
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.annotation.EnableScheduling;
 
import com.example.redis.IRedisPublisher;
import com.example.redis.impl.RedisMessageListener;
import com.example.redis.impl.RedisPublisherImpl;
 
@Configuration
@EnableScheduling
public class AppConfig {
    @Bean
    JedisConnectionFactory jedisConnectionFactory() {
        return new JedisConnectionFactory();
    }
 
    @Bean
    RedisTemplate< String, Object > redisTemplate() {
        final RedisTemplate< String, Object > template =  new RedisTemplate< String, Object >();
        template.setConnectionFactory( jedisConnectionFactory() );
        template.setKeySerializer( new StringRedisSerializer() );
        template.setHashValueSerializer( new GenericToStringSerializer< Object >( Object.class ) );
        template.setValueSerializer( new GenericToStringSerializer< Object >( Object.class ) );
        return template;
    }
 
    @Bean
    MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter( new RedisMessageListener() );
    }
 
    @Bean
    RedisMessageListenerContainer redisContainer() {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
 
        container.setConnectionFactory( jedisConnectionFactory() );
        container.addMessageListener( messageListener(), topic() );
 
        return container;
    }
  
    @Bean
    IRedisPublisher redisPublisher() {
        return new RedisPublisherImpl( redisTemplate(), topic() );
    }
 
    @Bean
    ChannelTopic topic() {
        return new ChannelTopic( 'pubsub:queue' );
    }
}

Очень просто и понятно. Наличие аннотации @EnableScheduling не является обязательным и требуется только для нашей реализации издателя: издатель будет публиковать строковое сообщение каждые 100 мс.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.example.redis.impl;
 
import java.util.concurrent.atomic.AtomicLong;
 
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.scheduling.annotation.Scheduled;
 
import com.example.redis.IRedisPublisher;
 
public class RedisPublisherImpl implements IRedisPublisher {
    private final RedisTemplate< String, Object > template;
    private final ChannelTopic topic;
    private final AtomicLong counter = new AtomicLong( 0 );
 
    public RedisPublisherImpl( final RedisTemplate< String, Object > template,
            final ChannelTopic topic ) {
        this.template = template;
        this.topic = topic;
    }
 
    @Scheduled( fixedDelay = 100 )
    public void publish() {
        template.convertAndSend( topic.getTopic(), 'Message ' + counter.incrementAndGet() +
            ', ' + Thread.currentThread().getName() );
 }
}

И, наконец, наша реализация прослушивателя сообщений (которая просто печатает сообщение на консоли).

01
02
03
04
05
06
07
08
09
10
11
package com.example.redis.impl;
 
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
 
public class RedisMessageListener implements MessageListener {
    @Override
    public void onMessage( final Message message, final byte[] pattern ) {
        System.out.println( 'Message received: ' + message.toString() );
    }
}

Удивительно, всего два небольших класса, одна конфигурация для объединения вещей, и у нас есть полная поддержка обмена сообщениями в пабах / суб-приложениях в нашем приложении! Давайте запустим приложение как автономное …

01
02
03
04
05
06
07
08
09
10
11
12
package com.example.redis;
 
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
 
import com.example.redis.config.AppConfig;
 
public class RedisPubSubStarter {
    public static void main(String[] args) {
        new AnnotationConfigApplicationContext( AppConfig.class );
    }
}

… И посмотрите следующий вывод в консоли:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
...
Message received: Message 1, pool-1-thread-1
Message received: Message 2, pool-1-thread-1
Message received: Message 3, pool-1-thread-1
Message received: Message 4, pool-1-thread-1
Message received: Message 5, pool-1-thread-1
Message received: Message 6, pool-1-thread-1
Message received: Message 7, pool-1-thread-1
Message received: Message 8, pool-1-thread-1
Message received: Message 9, pool-1-thread-1
Message received: Message 10, pool-1-thread-1
Message received: Message 11, pool-1-thread-1
Message received: Message 12, pool-1-thread-1
Message received: Message 13, pool-1-thread-1
Message received: Message 14, pool-1-thread-1
Message received: Message 15, pool-1-thread-1
Message received: Message 16, pool-1-thread-1
...

Большой! Существует гораздо больше того, что вы можете сделать с пабом / подпунктом Redis , отличная документация доступна для вас на официальном сайте Redis .

Ссылка: Redis pub / sub с использованием Spring от нашего партнера по JCG Андрея Редько в блоге Андрея Редько {devmind} .