В предыдущем уроке мы реализовали пример Spring Boot + RabbitMQ, чтобы понять различные типы обмена . В этом руководстве мы будем реализовывать пример Spring Boot + RabbitMQ, чтобы повторить сообщения об исключении. Если исключение все еще существует после максимальных попыток, мы помещаем сообщение в очередь недоставленных сообщений, где оно может быть проанализировано и исправлено позже.
Что такое очередь мертвых писем?
В словаре английского языка письмо-переписка — это недоставленная почта, которая не может быть доставлена адресату. Очередь недоставленных сообщений (DLQ), иногда известная как очередь недоставленных сообщений, является очередью хранения сообщений, которые не могут быть доставлены по назначению из-за чего-либо.
Согласно Википедии — В очереди сообщений очередь недоставленных сообщений является реализацией службы для хранения сообщений, которые соответствуют одному или нескольким из следующих критериев сбоя:
- Сообщение, которое отправляется в несуществующую очередь
- Превышен предел длины очереди
- Превышен предел длины сообщения
- Сообщение отклонено другим обменом очередей
- Сообщение достигает порогового значения счетчика чтения, поскольку оно не используется. Иногда это называют «очередью возврата»
Позже мы можем проанализировать сообщения в DLQ, чтобы узнать причину сбоя сообщений.
Этот учебник объясняется в видео YouTube ниже.
Мы будем реализовывать два модуля:
- Модуль Spring Boot Producer — он создаст сообщение и поместит его в очередь RabbitMQ. Он также будет отвечать за создание необходимых очередей, включая очередь недоставленных сообщений.
- Модуль Spring Boot Consumer — он будет использовать сообщение из очереди RabbitMQ. Мы будем выдавать исключение и затем повторять попытку сообщения. После максимального количества попыток он будет помещен в очередь недоставленных сообщений.
Модуль Spring Boot + RabbitMQ Producer
Проект Maven будет следующим:
Файл pom.xml будет иметь следующие зависимости:
XML
1
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3
<modelVersion>4.0.0</modelVersion>
4
<groupId>com.javainuse</groupId>
5
<artifactId>spring-boot-rabbitmq-producer</artifactId>
6
<version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><parent>
7
<groupId>org.springframework.boot</groupId>
8
<artifactId>spring-boot-starter-parent</artifactId>
9
<version>2.1.1.RELEASE</version>
10
<relativePath />
11
</parent><properties>
12
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
13
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
14
<java.version>1.8</java.version>
15
</properties><dependencies>
16
<dependency>
17
<groupId>org.springframework.boot</groupId>
18
<artifactId>spring-boot-starter-amqp</artifactId>
19
</dependency>
20
<dependency>
21
<groupId>org.springframework.boot</groupId>
22
<artifactId>spring-boot-starter-web</artifactId>
23
</dependency>
24
<dependency>
25
<groupId>org.springframework.boot</groupId>
26
<artifactId>spring-boot-starter-logging</artifactId>
27
</dependency>
28
</dependencies><build>
29
<plugins>
30
<plugin>
31
<groupId>org.springframework.boot</groupId>
32
<artifactId>spring-boot-maven-plugin</artifactId>
33
</plugin>
34
</plugins>
35
</build></project>
Определите класс домена Employee следующим образом:
Джава
xxxxxxxxxx
1
package com.javainuse.model;import com.fasterxml.jackson.annotation.JsonIdentityInfo;
2
import com.fasterxml.jackson.annotation.ObjectIdGenerators; (generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id", scope = Employee.class)
3
public class Employee { private String empName;
4
private String empId;
5
private int salary; public String getEmpName() {
6
return empName;
7
} public void setEmpName(String empName) {
8
this.empName = empName;
9
} public String getEmpId() {
10
return empId;
11
} public void setEmpId(String empId) {
12
this.empId = empId;
13
} public int getSalary() {
14
return salary;
15
} public void setSalary(int salary) {
16
this.salary = salary;
17
}
18
public String toString() {
19
return "Employee [empName=" + empName + ", empId=" + empId + ", salary=" + salary + "]";
20
}
21
}
Затем определите класс конфигурации, где мы:
- Создайте прямые обмены с именами — deadLetterExchange и javainuseExchange.
- Создайте очередь с именами javainuse и dlq. Для очереди javainuse укажите аргумент x-dead-letter-exchange как deadLetterExchange. Это означает, что любое сообщение в очереди javainuse, которое не может быть доставлено, будет отправлено в deadLetterExchange.
- Свяжите очередь javainuse с помощью javainuseExchange и очередь dlq с помощью deadLetterExchange.
Джава
xxxxxxxxxx
1
package com.javainuse.config;
2
import org.springframework.amqp.core.AmqpTemplate;
4
import org.springframework.amqp.core.Binding;
5
import org.springframework.amqp.core.BindingBuilder;
6
import org.springframework.amqp.core.DirectExchange;
7
import org.springframework.amqp.core.Queue;
8
import org.springframework.amqp.core.QueueBuilder;
9
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
10
import org.springframework.amqp.rabbit.core.RabbitTemplate;
11
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
12
import org.springframework.amqp.support.converter.MessageConverter;
13
import org.springframework.context.annotation.Bean;
14
import org.springframework.context.annotation.Configuration;
15
17
public class RabbitMQConfig {
18
20
DirectExchange deadLetterExchange() {
21
return new DirectExchange("deadLetterExchange");
22
}
23
24
25
DirectExchange exchange() {
26
return new DirectExchange("javainuseExchange");
27
}
28
30
Queue dlq() {
31
return QueueBuilder.durable("deadLetter.queue").build();
32
}
33
35
Queue queue() {
36
return QueueBuilder.durable("javainuse.queue").withArgument("x-dead-letter-exchange", "deadLetterExchange")
37
.withArgument("x-dead-letter-routing-key", "deadLetter").build();
38
}
39
41
Binding DLQbinding() {
42
return BindingBuilder.bind(dlq()).to(deadLetterExchange()).with("deadLetter");
43
}
44
46
Binding binding() {
47
return BindingBuilder.bind(queue()).to(exchange()).with("javainuse");
48
}
49
51
public MessageConverter jsonMessageConverter() {
52
return new Jackson2JsonMessageConverter();
53
}
54
public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
56
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
57
rabbitTemplate.setMessageConverter(jsonMessageConverter());
58
return rabbitTemplate;
59
}
60
}
Создайте класс RabbitMQWebController, где мы предоставляем API для отправки сообщения в RabbitMQ Exchange.
Джава
xxxxxxxxxx
1
package com.javainuse.controller;import org.springframework.amqp.core.AmqpTemplate;
2
import org.springframework.beans.factory.annotation.Autowired;
3
import org.springframework.web.bind.annotation.GetMapping;
4
import org.springframework.web.bind.annotation.RequestMapping;
5
import org.springframework.web.bind.annotation.RequestParam;
6
import org.springframework.web.bind.annotation.RestController;
7
value = "/javainuse-rabbitmq/") (
8
public class RabbitMQWebController {
9
private AmqpTemplate amqpTemplate; (value = "/producer")
10
public String producer( ("empName") String empName, ("empId") String empId, ("salary") int salary) {
11
Employee emp=new Employee();
12
emp.setEmpId(empId);
13
emp.setEmpName(empName);
14
emp.setSalary(salary); amqpTemplate.convertAndSend("javainuseExchange", "javainuse", emp);
15
return "Message sent to the RabbitMQ Successfully";
16
}
17
}
Создайте класс начальной загрузки Spring с аннотацией SpringBootApplication.
Джава
xxxxxxxxxx
1
package com.javainuse;import org.springframework.boot.SpringApplication;
2
import org.springframework.boot.autoconfigure.SpringBootApplication;
3
public class SpringBootHelloWorldApplication { public static void main(String[] args) {
4
SpringApplication.run(SpringBootHelloWorldApplication.class, args);
5
}
6
}
Модуль пружинной загрузки
Проект будет следующим:
Определите файл pom.xml следующим образом: Добавьте зависимость spring-boot-starter-amqp .
XML
xxxxxxxxxx
1
2
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4
<modelVersion>4.0.0</modelVersion><groupId>com.javainuse</groupId>
5
<artifactId>spring-boot-rabbitmq-consumer</artifactId>
6
<version>0.0.1-SNAPSHOT</version>
7
<packaging>jar</packaging><name>spring-boot-rabbitmq-consumer</name><parent>
8
<groupId>org.springframework.boot</groupId>
9
<artifactId>spring-boot-starter-parent</artifactId>
10
<version>2.1.1.RELEASE</version>
11
<relativePath /> <!-- lookup parent from repository -->
12
</parent><properties>
13
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
14
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
15
<java.version>1.8</java.version>
16
</properties><dependencies>
17
<dependency>
18
<groupId>org.springframework.boot</groupId>
19
<artifactId>spring-boot-starter-amqp</artifactId>
20
</dependency>
21
<dependency>
22
<groupId>org.springframework.boot</groupId>
23
<artifactId>spring-boot-starter-web</artifactId>
24
</dependency>
25
<dependency>
26
<groupId>org.springframework.boot</groupId>
27
<artifactId>spring-boot-starter-logging</artifactId>
28
</dependency>
29
</dependencies><build>
30
<plugins>
31
<plugin>
32
<groupId>org.springframework.boot</groupId>
33
<artifactId>spring-boot-maven-plugin</artifactId>
34
</plugin>
35
</plugins>
36
</build><description>SpringBootRabbitMQConsumer</description>
37
</project>
Определите класс домена Employee следующим образом:
Джава
xxxxxxxxxx
1
package com.javainuse.model;import com.fasterxml.jackson.annotation.JsonIdentityInfo;
2
import com.fasterxml.jackson.annotation.ObjectIdGenerators; (generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id", scope = Employee.class)
3
public class Employee { private String empName;
4
private String empId;
5
private int salary; public String getEmpName() {
6
return empName;
7
} public void setEmpName(String empName) {
8
this.empName = empName;
9
} public String getEmpId() {
10
return empId;
11
} public void setEmpId(String empId) {
12
this.empId = empId;
13
}
14
public String toString() {
15
return "Employee [empName=" + empName + ", empId=" + empId + ", salary=" + salary + "]";
16
} public int getSalary() {
17
return salary;
18
} public void setSalary(int salary) {
19
this.salary = salary;
20
}
21
}
Определите пользовательское проверенное исключение с именем InvalidSalaryException следующим образом:
Джава
xxxxxxxxxx
1
package com.javainuse.exception;public class InvalidSalaryException extends Exception { private static final long serialVersionUID = -3154618962130084535L;}
Определите класс RabbitMQConsumer, который использует сообщение от RabbitMQ, используя RabbitListener. Слушатель RabbitMQ прослушивает очередь RabbitMQ на предмет входящих сообщений. Для базовой конфигурации мы указываем имя очереди / темы (имя очереди / темы, где должно использоваться сообщение). Также здесь мы будем проверять входящее сообщение для поля зарплаты. Если это поле отрицательное, мы будем выдавать исключение InvalidSalaryException.
Джава
xxxxxxxxxx
1
package com.javainuse.service;import org.slf4j.Logger;
2
import org.slf4j.LoggerFactory;
3
import org.springframework.amqp.rabbit.annotation.RabbitListener;
4
import org.springframework.stereotype.Component;import com.javainuse.exception.InvalidSalaryException;
5
import com.javainuse.model.Employee;
6
public class RabbitMQConsumer { private static final Logger logger = LoggerFactory.getLogger(RabbitMQConsumer.class); (queues = "javainuse.queue")
7
public void recievedMessage(Employee employee) throws InvalidSalaryException {
8
logger.info("Recieved Message From RabbitMQ: " + employee);
9
if (employee.getSalary() < 0) {
10
throw new InvalidSalaryException();
11
}
12
}
13
}
Затем определите следующие свойства в application.yml. Здесь мы включаем механизм повтора Spring Boot RabbitMQ и указываем еще несколько дополнительных параметров:
- Начальный интервал: сообщение следует повторить через 3 секунды.
- Макс. Попыток: сообщение следует повторить максимум 6 раз. После чего он будет отправлен в очередь мертвых писем.
- Макс. Интервал: максимальный интервал между двумя попытками не должен превышать 10 с.
- Множитель: интервал между повторными попытками умножается на 2. Но этот интервал никогда не может превышать максимальный интервал. Таким образом, значения интервала повторения будут 3 с, 6 с, 10 с, 10 с, 10 с. Как 10 секунд максимальный указанный интервал.
Джава
xxxxxxxxxx
1
spring:
2
rabbitmq:
3
listener:
4
simple:
5
retry:
6
enabled: true
7
initial-interval: 3s
8
max-attempts: 6
9
max-interval: 10s
10
multiplier: 2
11
12
server:
13
port: 8081
Наконец, определите Spring Boot Class с помощью аннотации @SpringBootApplication:
Джава
xxxxxxxxxx
1
package com.javainuse;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
2
import org.springframework.boot.SpringApplication;
3
import org.springframework.boot.autoconfigure.SpringBootApplication;
4
import org.springframework.context.annotation.Bean;
5
public class SpringBootConsumerApplication { public static void main(String[] args) {
6
SpringApplication.run(SpringBootConsumerApplication.class, args);
7
}
8
public Jackson2JsonMessageConverter converter() {
9
return new Jackson2JsonMessageConverter();
10
}
11
}
В предыдущем уроке мы показали, как установить RabbitMQ и начать работу.
Запустите приложения Producer и Consumer и перейдите по адресу http: // localhost: 8080 / javainuse-rabbitmq / Manufacturer? EmpName = emp1 & empId = emp001 & salary = -50 . Сообщение будет отправлено в очередь RabbitMQ с именем javainuse.queue и использовано приложением потребителя. Так как зарплата отрицательна, InvalidSalaryException будет выброшено. Это сообщение будет повторено 6 раз, а затем будет помещено в очередь недоставленных сообщений.
Спасибо за прочтение!