【RabbitMQ】02-RabbitMQ与SpringBoot
RabbitMQ与SpringBoot
RabbitMQ在SpringBoot项目中的应用:
- 连接
- 声明Queue、Exchange、Binding
- 五种工作模式的应用
- 消息可靠性
看了几篇博客主要是这么多内容,我自己把这些试了一下
这个博客讲的是真的细RabbitMQ由浅入深入门全总结(一)、RabbitMQ由浅入深入门全总结(二)
代码传到码云了
https://gitee.com/zsqbigbig/springboot-2.3.9.git
RabbitMQ与SpringBoot连接
项目结构
pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.8</version>
</dependency>application.yml
1
2
3
4
5
6
7
8spring:
rabbitmq:
host: api.rabbitmq.com
port: 5672
username: admin
password: admin
# 开启生产者的消息确认
publisher-confirm-type: correlatedmodel/Article.java
1
2
3
4
5
6
7
8
9
10
11
12
13
public class Article {
private Long id;
private String author;
private String title;
private String subtitle;
private String description;
private String cover;
private String url;
}config/RabbitmqConstant
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79/**
* 声明消息队列的queue, channel, exchange, routing_key变量
* @author zsq
* @date 2021年11月18日 11:44:06
*/
public class RabbitConstant {
private static final String PRE = "zsq";
/**
* 简单模式
*/
public static final String SIMPLE_QUEUE_01 = PRE + "simple_queue_01";
/**
* 路由模式
*/
public static final String DIRECT_QUEUE_01 = PRE + "direct_queue_01";
public static final String DIRECT_EXCHANGE_01 = PRE + "direct_exchange_01";
public static final String DIRECT_ROUTING_KEY_01 = PRE + "direct_routing_key_01";
/**
* 广播模式
*/
public static final String FANOUT_QUEUE_01 = PRE + "fanout_queue_01";
public static final String FANOUT_QUEUE_02 = PRE + "fanout_queue_02";
public static final String FANOUT_QUEUE_03 = PRE + "fanout_queue_03";
public static final String FANOUT_EXCHANGE_01 = PRE + "fanout_exchange_01";
/**
* 主题模式
* * 匹配一个标识符
* # 匹配零个或多个标识符
*/
public static final String TOPIC_QUEUE_01 = PRE + "topic_queue_01";
public static final String TOPIC_QUEUE_02 = PRE + "topic_queue_02";
public static final String TOPIC_QUEUE_03 = PRE + "topic_queue_03";
public static final String TOPIC_EXCHANGE_01 = PRE + "topic_exchange_01";
public static final String TOPIC_ROUTING_KEY_01 = PRE + "topic.routing.key.student";
public static final String TOPIC_ROUTING_KEY_02 = PRE + "topic.routing.key.teacher";
public static final String TOPIC_ROUTING_KEY_03 = PRE + "topic.routing.key.*";
/**
* 过期时间设置
*/
public static final String EXPIRE_QUEUE_01 = PRE + "expire.queue.01";
public static final String EXPIRE_EXCHANGE_01 = PRE + "expire_exchange_01";
public static final String EXPIRE_ROUTING_KEY_01 = PRE + "expire.queue.*";
/**
* 死信队列
* DELAY_:延迟队列
* DEAD_:正常队列
*/
public static final String DELAY_QUEUE_01 = PRE + "delay_queue_01";
public static final String DELAY_EXCHANGE_01 = PRE + "delay_exchange_01";
public static final String DELAY_ROUTING_KEY_01 = PRE + "delay_routing_key_01";
public static final String DEAD_LETTER_QUEUE_01 = PRE + "dead_letter_queue_01";
public static final String DEAD_LETTER_EXCHANGE_01 = PRE + "dead_letter_exchange_01";
public static final String DEAD_LETTER_ROUTING_KEY_01 = PRE + "dead_letter_routing_key_01";
/**
* 消息确认机制-生产者
*/
public static final String PUBLISHER_CONFIRM_QUEUE_01 = PRE + "publisher_confirm_queue_01";
public static final String PUBLISHER_CONFIRM_EXCHANGE_01 = PRE + "publisher_confirm_exchange_01";
public static final String PUBLISHER_CONFIRM_ROUTING_KEY_01 = PRE + "publisher_confirm_routing_key_01";
/**
* 消息确认机制-消费者
*/
public static final String ACKNOWLEDGE_QUEUE_01 = PRE + "acknowledge_queue_01";
public static final String ACKNOWLEDGE_EXCHANGE_01 = PRE + "acknowledge_exchange_01";
public static final String ACKNOWLEDGE_ROUTING_KEY_01 = PRE + "acknowledge_routing_key_01";
public static final String ACKNOWLEDGE_DEAD_QUEUE_01 = PRE + "acknowledge_dead_queue_01";
public static final String ACKNOWLEDGE_DEAD_EXCHANGE_01 = PRE + "acknowledge_dead_exchange_01";
public static final String ACKNOWLEDGE_DEAD_ROUTING_KEY_01 = PRE + "acknowledge_dead_routing_key_01";
}RabbitmqApplication.java
1
2
3
4
5
6
7
8
public class RabbitmqApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqApplication.class);
}
}
RabbitMQ工作模式使用(五种常用)
RabbitMQ在官网上提供了六种工作模式官方文档
- 简单模式(Hello World):类似于点对点传输消息。一个生产者将消息放入队列,一个消费者收到该消息
- 工作模式(Work queues):一个生产者将消息放入队列,多个消费者中的一个收到该消息
- 订阅模式(publish/Subscribe):一个生产者将消息放入队列,多个消费者均收到该消息
- 路由模式(Routing):一个生产者将消息放入队列并传入routing_key,绑定了routing_key的消费者中的一个收到该消息
- 主题模式(Topics):一个生产者将消息放入队列并传入routing_key,绑定了符合routing_key规则的消费者均收到该消息
简单模式
The simplest thing that does something
生产者发送消息到队列,消费者从队列拿到消息消费。
如果队列没有绑定任何交换机,那么队列绑定的一定是默认的交换机。AMQP协议规定,队列必须绑定交换机。
代码
SimpleConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21/**
* 简单模式、工作模式
* 该方式创建的队列均与默认交换机绑定。
* 该队列的消费者争抢同一条消息
* @author zsq
* @date 2021年11月18日 16:25:33
*/
public class SimpleConfig {
/**
* 创建队列
* 当未绑定exchange, routing_key时,默认绑定'DEFAULT_EXCHANGE', 'DEFAULT_ROUTING_KEY'
* @return Queue
*/
public Queue getQueue() {
return new Queue(RabbitConstant.SIMPLE_QUEUE_01, true);
}
}SimpleProducer.java
1
2
3
4
5
6
7
8
9
10
11
public class SimpleProducer {
private RabbitTemplate rabbitTemplate;
public void sendSimple(String msg) {
rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_QUEUE_01, msg);
}
}SimpleConsumer.java
1
2
3
4
5
6
7
8
9
10
public class SimpleConsumer {
public void simpleConsumer01(String msg) {
log.info("【消费者】 - 【Simple】 - 01\n{}", msg);
}
}controller/RabbitmqController.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class RabbitmqController {
private Long getId() {
int min = 0, max = Integer.MAX_VALUE;
return RandomUtil.randomLong(min, max);
}
private Article getArticle() {
return Article.builder().id(getId()).author("张三").title("RabbitMQ").subtitle("RabbitMQ应用").description("消息中间件-RabbitMQ")
.cover("https://mmbiz.qpic.cn/sz_mmbiz_jpg/62VyqMwzt7fibI209vwGxtYSMo9mkNomYckXSf6GhHk2hS1oE8rs6KqBKTn1iaMCxQabfx5ep41ELEno2uO9OxGg/640?wx_fmt=jpeg&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1")
.url("https://mp.weixin.qq.com/s/OKCqRSqbYQKN1kJYkU_ZeA")
.build();
}
private SimpleProducer simpleProducer;
public String simple01() {
for (int i = 0; i < 10; i++) {
JSONObject jsonObject = JSONUtil.parseObj(getArticle());
simpleProducer.sendSimple(JSONUtil.toJsonStr(jsonObject));
}
return "simple01";
}
}
使用
调用http://localhost:12031/simple01
查看日志,生产的十条消息均被唯一的消费者该消费
工作模式
Distributing tasks among workers (the competing consumers pattern)
轮询模式:一个消费者一次只消费一条消息
公平分发:按消费者服务的性能消费消息,性能越高的机器处理的消息越多。
工作模式,就是在简单模式的基础上增加消费者。多个消费者中只有一个消费者能拿到该消息
代码
SimpleConfig.java
,SimpleProducer.java
,RabbitmqController.java
不需要更改。进修改SimpleConsumer.java
即可
SimpleConsumer.java
1 |
|
使用
调用http://localhost:12031/simple01
查看日志,生产的十条消息分别被三个消费者获取
订阅模式
Sending messages to many consumers at once
绑定到交换机的队列均会收到(fanout)交换机中的消息。
代码
FanoutConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45/**
* 广播模式
* Fanout交换机绑定多个队列,当交换机收到消息时,会下发给与之绑定的所有队列
* @author zsq
* @date 2021年11月18日 17:22:58
*/
public class FanoutConfig {
public Queue getQueue01() {
return new Queue(RabbitConstant.FANOUT_QUEUE_01);
}
public Queue getQueue02() {
return new Queue(RabbitConstant.FANOUT_QUEUE_02);
}
public Queue getQueue03() {
return new Queue(RabbitConstant.FANOUT_QUEUE_03);
}
public FanoutExchange getExchange() {
return new FanoutExchange(RabbitConstant.FANOUT_EXCHANGE_01);
}
public Binding getBinding01() {
return BindingBuilder.bind(getQueue01()).to(getExchange());
}
public Binding getBinding02() {
return BindingBuilder.bind(getQueue02()).to(getExchange());
}
public Binding getBinding03() {
return BindingBuilder.bind(getQueue03()).to(getExchange());
}
}FanoutProducer.java
1
2
3
4
5
6
7
8
9
10
11
public class FanoutProducer {
private RabbitTemplate rabbitTemplate;
public void sendFanout(JSONObject msg) {
rabbitTemplate.convertAndSend(RabbitConstant.FANOUT_EXCHANGE_01, null, msg);
}
}FanoutConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class FanoutConsumer {
public void fanoutConsumer01(JSONObject msg) {
Article article = JSONUtil.toBean(msg, Article.class);
log.info("【消费者】 - 【Fanout】 - 01\n{}", article);
}
public void fanoutConsumer02(JSONObject msg) {
Article article = JSONUtil.toBean(msg, Article.class);
log.info("【消费者】 - 【Fanout】 - 02\n{}", article);
}
public void fanoutConsumer03(JSONObject msg) {
Article article = JSONUtil.toBean(msg, Article.class);
log.info("【消费者】 - 【Fanout】 - 03\n{}", article);
}
}RabbitmqController.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class RabbitmqController {
private Long getId() {
int min = 0, max = Integer.MAX_VALUE;
return RandomUtil.randomLong(min, max);
}
private Article getArticle() {
return Article.builder().id(getId()).author("张三").title("RabbitMQ").subtitle("RabbitMQ应用").description("消息中间件-RabbitMQ")
.cover("https://mmbiz.qpic.cn/sz_mmbiz_jpg/62VyqMwzt7fibI209vwGxtYSMo9mkNomYckXSf6GhHk2hS1oE8rs6KqBKTn1iaMCxQabfx5ep41ELEno2uO9OxGg/640?wx_fmt=jpeg&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1")
.url("https://mp.weixin.qq.com/s/OKCqRSqbYQKN1kJYkU_ZeA")
.build();
}
private FanoutProducer fanoutProducer;
public String fanout01() {
for (int i = 0; i < 2; i++) {
Article article = getArticle();
fanoutProducer.sendFanout(JSONUtil.parseObj(article));
}
return "fanout01";
}
}
使用
调用http://localhost:12031/fanout01
查看日志,生产的两条消息被三个消费者都获取到
路由模式
指定路由键,交换机将根据指定的路由键给队列发送消息。消息会发送给所有绑定了指定交换机和路由键的队列中。
代码
DirectConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48/**
* 路由模式
* DirectExchange和Queue必须由RoutingKey进行绑定。
* 使用路由模式发送消息时,必须指定Exchange, RoutingKey进行发送。
* @author zsq
* @date 2021年11月18日 11:52:09
*/
public class DirectConfig {
/**
* 创建队列
* @return Queue
*/
public Queue getQueue01() {
return new Queue(RabbitConstant.DIRECT_QUEUE_01);
}
public Queue getQueue02() {
return new Queue(RabbitConstant.DIRECT_QUEUE_02);
}
/**
* 创建交换机
* @return DirectExchange
*/
public DirectExchange getDirectExchange() {
return new DirectExchange(RabbitConstant.DIRECT_EXCHANGE_01, true, false);
}
/**
* 将交换机和队列绑定
* @return Binding
*/
public Binding getDirectBinding01() {
return BindingBuilder.bind(getQueue01()).to(getDirectExchange()).with(RabbitConstant.DIRECT_ROUTING_KEY_01);
}
public Binding getDirectBinding02() {
return BindingBuilder.bind(getQueue02()).to(getDirectExchange()).with(RabbitConstant.DIRECT_ROUTING_KEY_01);
}
}DirectProducer.java
1
2
3
4
5
6
7
8
9
10
11
12
public class DirectProducer {
private RabbitTemplate rabbitTemplate;
public void sendDirect(JSONObject msg) {
rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_EXCHANGE_01, RabbitConstant.DIRECT_ROUTING_KEY_01, msg);
}
}DirectConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42/**
* 路由模式-消费者
* DIRECT_QUEUE_01消费者*2
* DIRECT_QUEUE_02消费者*1
* @author zsq
* @date 2021年11月18日 11:43:08
*/
public class DirectConsumer {
/**
* DIRECT_QUEUE_01消费者
* @param msg 消息提
*/
public void directConsumer01(JSONObject msg) {
Article article = JSONUtil.toBean(msg, Article.class);
log.info("【消费者】 - 【Direct(01)】 - 01\n{}", article);
}
/**
* DIRECT_QUEUE_01消费者
* @param msg 消息提
*/
public void directConsumer02(JSONObject msg) {
Article article = JSONUtil.toBean(msg, Article.class);
log.info("【消费者】 - 【Direct(01)】 - 02\n{}", article);
}
/**
* DIRECT_QUEUE_02消费者
* @param msg 消息提
*/
public void directConsumer03(JSONObject msg) {
Article article = JSONUtil.toBean(msg, Article.class);
log.info("【消费者】 - 【Direct(02)】 - 01\n{}", article);
}
}controller/RabbitmqController.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class RabbitmqController {
private Long getId() {
int min = 0, max = Integer.MAX_VALUE;
return RandomUtil.randomLong(min, max);
}
private Article getArticle() {
return Article.builder().id(getId()).author("张三").title("RabbitMQ").subtitle("RabbitMQ应用").description("消息中间件-RabbitMQ")
.cover("https://mmbiz.qpic.cn/sz_mmbiz_jpg/62VyqMwzt7fibI209vwGxtYSMo9mkNomYckXSf6GhHk2hS1oE8rs6KqBKTn1iaMCxQabfx5ep41ELEno2uO9OxGg/640?wx_fmt=jpeg&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1")
.url("https://mp.weixin.qq.com/s/OKCqRSqbYQKN1kJYkU_ZeA")
.build();
}
private DirectProducer directProducer;
public String direct01() {
for (int i = 0; i < 5; i++) {
Article article = getArticle();
directProducer.sendDirect(JSONUtil.parseObj(article));
}
return "direct01";
}
}
使用
调用http://localhost:12031/direct01
查看日志,生产五条消息。两个队列通过DIRECT_ROUTING_KEY_01
绑定了DIRECT_EXCHANGE_01
,每个队列都收到了这五条消息。
主题模式
Receiving messages based on a pattern (topics)
可以根据模糊匹配的路由key,交换机根据模糊匹配的routing key给队列发送消息。
#
:代表0个或多个.
隔开的字符串
*
:代表一个.
隔开的字符串
代码
TopicConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class TopicConfig {
public Queue getQueue01() {
return new Queue(RabbitConstant.TOPIC_QUEUE_01);
}
public Queue getQueue02() {
return new Queue(RabbitConstant.TOPIC_QUEUE_02);
}
public Queue getQueue03() {
return new Queue(RabbitConstant.TOPIC_QUEUE_03);
}
public TopicExchange getExchange() {
return new TopicExchange(RabbitConstant.TOPIC_EXCHANGE_01);
}
public Binding getBinding01() {
return BindingBuilder.bind(getQueue01()).to(getExchange()).with(RabbitConstant.TOPIC_ROUTING_KEY_01);
}
public Binding getBinding02() {
return BindingBuilder.bind(getQueue02()).to(getExchange()).with(RabbitConstant.TOPIC_ROUTING_KEY_02);
}
public Binding getBinding03() {
return BindingBuilder.bind(getQueue03()).to(getExchange()).with(RabbitConstant.TOPIC_ROUTING_KEY_03);
}
}TopicProducer.java
1
2
3
4
5
6
7
8
9
10
11
public class TopicProducer {
private RabbitTemplate rabbitTemplate;
public void sendTopic(String routingKey, JSONObject msg) {
rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE_01, routingKey, msg);
}
}TopicConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TopicConsumer {
public void topicConsumer01(JSONObject msg) {
Article article = JSONUtil.toBean(msg, Article.class);
log.info("【消费者】 - 【Topic(student)】 - 01\n{}", article);
}
public void topicConsumer02(JSONObject msg) {
Article article = JSONUtil.toBean(msg, Article.class);
log.info("【消费者】 - 【Topic(teacher)】 - 02\n{}", article);
}
public void topicConsumer03(JSONObject msg) {
Article article = JSONUtil.toBean(msg, Article.class);
log.info("【消费者】 - 【Topic(#)】 - 03\n{}", article);
}
}controller/RabbitmqController.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class RabbitmqController {
private Long getId() {
int min = 0, max = Integer.MAX_VALUE;
return RandomUtil.randomLong(min, max);
}
private Article getArticle() {
return Article.builder().id(getId()).author("张三").title("RabbitMQ").subtitle("RabbitMQ应用").description("消息中间件-RabbitMQ")
.cover("https://mmbiz.qpic.cn/sz_mmbiz_jpg/62VyqMwzt7fibI209vwGxtYSMo9mkNomYckXSf6GhHk2hS1oE8rs6KqBKTn1iaMCxQabfx5ep41ELEno2uO9OxGg/640?wx_fmt=jpeg&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1")
.url("https://mp.weixin.qq.com/s/OKCqRSqbYQKN1kJYkU_ZeA")
.build();
}
private TopicProducer topicProducer;
public String topic01() {
for (int i = 0; i < 5; i++) {
int random = RandomUtil.randomInt(2);
String routingKey = (random == 1) ? RabbitConstant.TOPIC_ROUTING_KEY_01 : RabbitConstant.TOPIC_ROUTING_KEY_02;
String author = (random == 1) ? "student" : "teacher";
Article article = getArticle();
article.setAuthor(author);
topicProducer.sendTopic(routingKey, JSONUtil.parseObj(article));
// log.info("【生产者】 - 【Topic】 - routingKey = {},\n{}", routingKey, article);
}
return "topic01";
}
}
使用
调用http://localhost:12031/topic01
查看日志,生产五条消息。这五条消息有两条推送到通过TOPIC_ROUTING_KEY_01
绑定的队列中,有三条推送到通过TOPIC_ROUTING_KEY_02
绑定队列中。可以看到Topic(#)拿到了所有消息,Topic(student)拿到了两条消息,Topic(teacher)拿到了三条消息。
RabbitMQ进阶使用
除了上面五种常用的工作模式,RabbitMQ还提供了下面四种功能:
- 队列设置过期时间
- 死信队列
- 消息的可靠传递
- 内存及磁盘监控
设置过期时间
当队列中的消息过期时,称这个消息为”dead message“。该消息无法被消费者消费
过期时间有两种设置方法:
- 给队列中的消息Message对象设置过期时间。
- 给队列声明过期时间,队列中所有消息都相同的过期时间。
队列过期遵守规则
- 当Message和交换机都设置了过期时间时,时间短的为真实过期时间。
Queue设置过期时间
使用主题模式进行演示
ExpireConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ExpireConfig {
/**
* 给消息设置过期时间
*/
public Queue getQueue01() {
Map<String, Object> args = CollUtil.newHashMap();
// x-message-ttl:消息过期时间
args.put("x-message-ttl", 5000);
// 设置队列过期时间
args.put("x-expires", 8000);
return new Queue(RabbitConstant.EXPIRE_QUEUE_01, true, false, false, args);
}
public TopicExchange getExchange01() {
return new TopicExchange(RabbitConstant.EXPIRE_EXCHANGE_01);
}
public Binding getBinding01() {
return BindingBuilder.bind(getQueue01()).to(getExchange01()).with(RabbitConstant.EXPIRE_ROUTING_KEY_01);
}
}ExpireProducer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
public class ExpireProducer {
private RabbitTemplate rabbitTemplate;
public void sendExpire01(String msg) {
rabbitTemplate.convertAndSend(RabbitConstant.EXPIRE_EXCHANGE_01, RabbitConstant.EXPIRE_QUEUE_01, message);
}
}ExpireConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
public class ExpireConsumer {
public void expireConsumer01(Message message) {
String msg = new String(message.getBody());
log.info("【消费者】 - 【Expire(01)】:msg = {}", msg);
}
}
当消息发送5000ms后,消息过期,无法被消费。
Message设置过期时间
生产者发送消息时,我们可以为每条消息均设置一个过期时间。我找了三个方法,这三个方法效果是一样的。一个肯定就够用了,应该还有别的方法,但是没必要知道那么多。
1 | /** |
死信队列
死信队列这一完整功能,由两个队列完成。一个延迟队列,一个普通队列,这个普通队列就是死信队列。
延迟队列中的消息过期后,会将消息传入到普通队列(死信队列)中。
- 当仅对消息体设置过期时间expiration时,延迟队列依然遵守先进先出的原则。例如:延迟队列中第一条消息5s过期,第二条消息1s过期,第二条消息要等到第一条消息过期后,才会会进入死信队列。
DeadConfig.java
死信队列配置1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class DeadConfig {
/**
* 延时队列:当队列中的消息过期时,会将消息传递给绑定的死信队列中。
* 通过'x-dead-letter-exchange'和'x-dead-letter-routing-key'进行绑定
* @return Queue
*/
public Queue getDelayQueue() {
Map<String, Object> args = CollUtil.newHashMap();
args.put("x-message-ttl", 5000);
args.put("x-dead-letter-exchange", RabbitConstant.DEAD_LETTER_EXCHANGE_01);
args.put("x-dead-letter-routing-key", RabbitConstant.DEAD_LETTER_ROUTING_KEY_01);
return new Queue(RabbitConstant.DELAY_QUEUE_01, true, false, false, args);
}
/**
* 延时队列交换机
*/
public DirectExchange getDelayExchange() {
return new DirectExchange(RabbitConstant.DELAY_EXCHANGE_01);
}
/**
* 延时队列绑定交换机和路由键
*/
public Binding getDelayBinding() {
return BindingBuilder.bind(getDelayQueue()).to(getDelayExchange()).with(RabbitConstant.DELAY_ROUTING_KEY_01);
}
/**
* 声明死信队列
*/
public Queue getDeadQueue() {
return new Queue(RabbitConstant.DEAD_LETTER_QUEUE_01);
}
/**
* 声明队列交换机
*/
public DirectExchange getDeadExchange() {
return new DirectExchange(RabbitConstant.DEAD_LETTER_EXCHANGE_01);
}
/**
* 死信队列绑定交换机和路由键
*/
public Binding getDeadBinding() {
return BindingBuilder.bind(getDeadQueue()).to(getDeadExchange()).with(RabbitConstant.DEAD_LETTER_ROUTING_KEY_01);
}
}DeadProducer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class DeadProducer {
private RabbitTemplate rabbitTemplate;
public void sendDead(String msg) {
rabbitTemplate.convertAndSend(RabbitConstant.DELAY_EXCHANGE_01, RabbitConstant.DELAY_ROUTING_KEY_01, msg);
}
/**
* 为消息配置单独的过期时间
*/
public void sendDead02(String msg) {
int bound = 5000;
String expiration = String.valueOf(RandomUtil.randomInt(bound));
msg += ", expiration = " + expiration;
MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setExpiration(expiration).setContentEncoding("UTF-8").build();
Message message = MessageBuilder.withBody(msg.getBytes()).andProperties(messageProperties).build();
rabbitTemplate.send(RabbitConstant.DELAY_EXCHANGE_01, RabbitConstant.DELAY_ROUTING_KEY_01, message);
}
}DeadConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
public class DeadConsumer {
/**
* 延时队列过期后,消息传递到该队列中,由该队列进行消费
*/
public void consumerDead01(Message message) throws IOException {
log.info("【消费者】 - 【DeadLetter(01)】:msg = {}", new String(message.getBody()));
}
}RabbitController.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class RabbitmqController {
private DeadProducer deadProducer;
public String dead01() {
log.info("dead01");
for (int i = 0; i < 10; i++) {
String msg = String.valueOf(i);
deadProducer.sendDead01(msg);
}
return "dead01";
}
public String dead02() {
log.info("dead02");
for (int i = 0; i < 10; i++) {
String msg = String.valueOf(i);
deadProducer.sendDead02(msg);
}
return "dead02";
}
}
延迟队列过期时间相同
调用/dead01
接口。
消息在5s过期后,依次进入死信队列。
延迟队列过期时间不同
调用/dead02
接口。
严格遵守先进先出的原则,即使后面的消息过期时间短,也要等到其之前的消息过期后,才会进入到死信队列中。
消息可靠性生产者
RabbitMQ在传递消息的过程中,可能会因为网络等因素导致消息传递失败。如生产者将消息存入队列,消费者获取队列的消息时,可能导致失败。RabbitMQ提供了保证消息可靠传递的机制,通过生产者和消费者两部分来处理。
生产者为消息的发送者。RabbitMQ提供了两种方式来确保生产者的消息可靠性
- confirm机制:确保消息成功发送至交换机中。若在发送至队列时失败,不会执行confirm中的方法
- return机制:确保消息从交换机发送至队列中。
confirm机制
生产者发送消息后,会异步等待接收一个 ack 应答,收到返回的 ack 确认消息后,根据 ack是 true 还是 false,调用 confirmCallback 接口进行处理
application.yml
1
2
3
4
5
6
7
8
9
10
11spring:
rabbitmq:
host: api.rabbitmq.com
port: 5672
username: admin
password: admin
# 开启生产者的消息确认:
# 1. none:禁止发布确认
# 2. correlated:消息成功发布到交换机后会触发回调方法
# 3. simple
publisher-confirm-type: correlatedPublisherConfirmConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class PublisherConfirmConfig implements RabbitTemplate.ConfirmCallback {
/**
* 根据ack去判断消息是否发送成功,并作出相应操作
* @param ack:true-消息发送成功;false-消息发送失败
*/
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("【消息确认】 - confirm:参数:correlationDate = {}, ack = {}, cause = {}", correlationData, ack, cause);
if (ack) {
log.info("【消息确认】 - 【成功】");
} else {
log.info("【消息确认】 - 【失败】");
}
}
}PublisherConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class PublisherConfig {
public Queue getQueue() {
return new Queue(RabbitConstant.PUBLISHER_CONFIRM_QUEUE_01);
}
public DirectExchange getExchange() {
return new DirectExchange(RabbitConstant.PUBLISHER_CONFIRM_EXCHANGE_01);
}
public Binding getBinding() {
return BindingBuilder.bind(getQueue())
.to(getExchange())
.with(RabbitConstant.PUBLISHER_CONFIRM_ROUTING_KEY_01);
}
}PublisherProducer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class PublisherProducer {
private PublisherConfirmConfig publisherConfirmConfig;
private RabbitTemplate rabbitTemplate;
/**
* 消息成功发送
*/
public void sendPublisher01(String msg) {
rabbitTemplate.setConfirmCallback(publisherConfirmConfig);
rabbitTemplate.convertAndSend(RabbitConstant.PUBLISHER_CONFIRM_EXCHANGE_01, RabbitConstant.PUBLISHER_CONFIRM_ROUTING_KEY_01, msg);
}
public void sendPublisher02(String msg) {
rabbitTemplate.setConfirmCallback(publisherConfirmConfig);
rabbitTemplate.convertAndSend("随便写一个exchange", "随便写一个routing_key", msg);
}
}
使用:
正常情况
发送失败-网络连接错误
发送失败-没有交换机
return机制
application.yml
1
2
3
4spring:
rabbitmq:
# 开启消费者的消息确认:return
publisher-returns: truePublisherReturnConfig.java
1
2
3
4
5
6
7
8
9
10
public class PublisherReturnConfig implements RabbitTemplate.ReturnCallback {
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("【消息确认】 - return:\n参数:message = {}, replyCode = {}, replyText = {}, exchange = {}, routingKey = {}", new String(message.getBody()), replyCode, replyText, exchange, routingKey);
}
}PublisherProducer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class PublisherProducer {
private RabbitTemplate rabbitTemplate;
private PublisherReturnConfig publisherReturnConfig;
/**
* 成功发送
*/
public void sendPublisher03(String msg) {
// 设置失败消息的处理方式:true-将失败消息发送给回调函数;false-丢弃失败消息
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(publisherReturnConfig);
rabbitTemplate.convertAndSend(RabbitConstant.PUBLISHER_CONFIRM_EXCHANGE_01, RabbitConstant.PUBLISHER_CONFIRM_ROUTING_KEY_01, msg);
}
/**
* 发送失败
*/
public void sendPublisher04(String msg) {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(publisherReturnConfig);
rabbitTemplate.convertAndSend(RabbitConstant.PUBLISHER_CONFIRM_EXCHANGE_01, "随便写一个routing_key", msg);
}
}
正常情况:不执行returnCallback 方法
发送失败-路由键不存在:
消费者可靠性
消费者消费消息时,可能出现异常,导致消息无法被正常消费。消费者可靠性,就是为了处理这些无法被正常消费的消息。RabbitMQ提供了三种方式确保消费者的消息可靠性
- retry重试:消费者在执行方法时,如果抛出异常,方法终止,会触发RabbitMQ的重试机制。重试次数达到配置次数时如果还有异常,消息将被丢弃。该方式不能手动捕获异常,如果捕获异常的话,重试机制不生效。
- try/catch + 手动ack
retry重试
配置文件application.yml
配置的属性retry,也就是消费者执行消费操作时异常的重试配置。
application.yml
重试次数配置为3次
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21spring:
rabbitmq:
host: api.rabbitmq.com
port: 5672
username: admin
password: admin
listener:
simple:
# 消费者消息确认
# 1. auto:自动确认(默认)
# 2. manual:手动确认
# 3. none:不确认,发送后自动丢弃
acknowledge-mode: auto
retry:
# 开启重试
enabled: true
# 最大重试次数
max-attempts: 3
# 重试间隔时间
initial-interval: 3000ms
max-interval: 5000msAcknowledgeConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class AcknowledgeConfig {
public Queue getQueue() {
return new Queue(RabbitConstant.ACKNOWLEDGE_QUEUE_01);
}
public DirectExchange getExchange() {
return new DirectExchange(RabbitConstant.ACKNOWLEDGE_EXCHANGE_01);
}
public Binding getBinding() {
return BindingBuilder.bind(getQueue())
.to(getExchange())
.with(RabbitConstant.ACKNOWLEDGE_ROUTING_KEY_01);
}
}AcknowledgeProducer.java
1
2
3
4
5
6
7
8
9
10
11
public class AcknowledgeProducer {
private RabbitTemplate rabbitTemplate;
public void sendAck01(String msg) {
rabbitTemplate.convertAndSend(RabbitConstant.ACKNOWLEDGE_EXCHANGE_01, RabbitConstant.ACKNOWLEDGE_ROUTING_KEY_01, msg);
}
}AcknowledgeConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class AcknowledgeConsumer {
/**
* 在方法体中执行异常操作。
* 消费者中不能手动捕获异常,会被识别为方法正常运行,重试配置就不生效了。
*/
public void consumerAck01(String msg) {
log.info("【消费者】 - 【Acknowledge(01)】:msg = {}", msg);
double a = 1 / 0;
System.out.println(a);
}
}
使用:
消费者方法异常时:
同一个消息被消费者重试3次,三次重试后,仍然有异常,将异常抛出,消息被丢弃。
try/catch + 手动ack
手动ack时,需要在代码中规定是否进行重发消息,此时配置文件中所配置的重发数据失效。
application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13spring:
rabbitmq:
host: api.rabbitmq.com
port: 5672
username: admin
password: admin
listener:
simple:
# 消费者消息确认
# 1. auto:自动确认(默认)
# 2. manual:手动确认
# 3. none:不确认,发送后自动丢弃
acknowledge-mode: manualAcknowledgeConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class AcknowledgeConfig {
public Queue getQueue() {
return new Queue(RabbitConstant.ACKNOWLEDGE_QUEUE_01);
}
public DirectExchange getExchange() {
return new DirectExchange(RabbitConstant.ACKNOWLEDGE_EXCHANGE_01);
}
public Binding getBinding() {
return BindingBuilder.bind(getQueue())
.to(getExchange())
.with(RabbitConstant.ACKNOWLEDGE_ROUTING_KEY_01);
}
}AcknowledgeProducer.java
1
2
3
4
5
6
7
8
9
10
11
public class AcknowledgeProducer {
private RabbitTemplate rabbitTemplate;
public void sendAck01(String msg) {
rabbitTemplate.convertAndSend(RabbitConstant.ACKNOWLEDGE_EXCHANGE_01, RabbitConstant.ACKNOWLEDGE_ROUTING_KEY_01, msg);
}
}AcknowledgeConsumer.java
消费者在执行
basicNack
方法时,requeue = true。若消费方法内的异常一直存在,则消息会被一直重试1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class AcknowledgeConsumer {
/**
* 消息无异常,执行channel.basicAck方法进行消息确认
* 消息有异常,执行channel.basicNack方法不进行ack应答
* multiple:true-处理多条;false-进处理被提供tag的消息
* requeue:true-消息重入队列;false-丢弃消息
* @param tag 队列中的唯一id
*/
public void consumerAck02(String msg, Channel channel, Long tag)throws IOException {
// true-处理多条;false-进处理被提供tag的消息
boolean multiple = false;
// true-消息重入队列;false-丢弃消息
boolean requeue = true;
try {
log.info("【消费者】 - 【Acknowledge(02)】:msg = {}, tag = {}", msg, tag);
double a = 1 / 0;
System.out.println(a);
channel.basicAck(tag, multiple);
} catch (Exception e) {
log.warn("【消费者】 - 【Acknowledge(02)】 - 【异常】:msg = {}, tag = {}", msg, tag);
channel.basicNack(tag, multiple, requeue);
}
}
}
使用:
消息处理异常,消息被一直重试
总结:
retry
和try/catch + 手动ack
都可以保证消息的可靠消费。- 若消费者方法中一直存在消费异常,会发生两种情况:1. 消息重复消息;2. 消息最终被丢弃。这两种情况都有弊端。
根据上面两点总结,当消息被异常消费时,我们捕获异常并对其进行其他操作,如:异常信息存入数据库中;发邮箱通知客服等,就避免了重复消费或丢弃消息的缺点。
try/catch + 手动ack + 死信队列
需要将channel.basicNack
中的requeue
属性设置为false
。
application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13spring:
rabbitmq:
host: api.rabbitmq.com
port: 5672
username: admin
password: admin
listener:
simple:
# 消费者消息确认
# 1. auto:自动确认(默认)
# 2. manual:手动确认
# 3. none:不确认,发送后自动丢弃
acknowledge-mode: manualAcknowledgeConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class AcknowledgeConfig {
/**
* 如果该队列已存在,需要先删除掉这个队列,在启动项目。
* 因为这个队列已经存在,而这个项目中的队列需要修改这个队列的配置。当队列已存在,配置无法修改
*/
public Queue getQueue() {
Map<String, Object> args = CollUtil.newHashMap();
args.put("x-dead-letter-exchange", RabbitConstant.ACKNOWLEDGE_DEAD_EXCHANGE_01);
args.put("x-dead-letter-routing-key", RabbitConstant.ACKNOWLEDGE_DEAD_ROUTING_KEY_01);
return new Queue(RabbitConstant.ACKNOWLEDGE_QUEUE_01, true, false, false, args);
}
public DirectExchange getExchange() {
return new DirectExchange(RabbitConstant.ACKNOWLEDGE_EXCHANGE_01);
}
public Binding getBinding() {
return BindingBuilder.bind(getQueue())
.to(getExchange())
.with(RabbitConstant.ACKNOWLEDGE_ROUTING_KEY_01);
}
public Queue getDeadQueue() {
return new Queue(RabbitConstant.ACKNOWLEDGE_DEAD_QUEUE_01);
}
public DirectExchange getDeadExchange() {
return new DirectExchange(RabbitConstant.ACKNOWLEDGE_DEAD_EXCHANGE_01);
}
public Binding getDeadBinding() {
return BindingBuilder.bind(getDeadQueue())
.to(getDeadExchange())
.with(RabbitConstant.ACKNOWLEDGE_DEAD_ROUTING_KEY_01);
}
}AcknowledgeProducer.java
1
2
3
4
5
6
7
8
9
10
11
public class AcknowledgeProducer {
private RabbitTemplate rabbitTemplate;
public void sendAck01(String msg) {
rabbitTemplate.convertAndSend(RabbitConstant.ACKNOWLEDGE_EXCHANGE_01, RabbitConstant.ACKNOWLEDGE_ROUTING_KEY_01, msg);
}
}AcknowledgeConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class AcknowledgeConsumer {
/**
* 消息无异常,执行channel.basicAck方法进行消息确认
* 消息有异常,执行channel.basicNack方法不进行ack应答
* multiple:true-处理多条;false-进处理被提供tag的消息
* requeue:true-消息重入队列;false-丢弃消息
* @param tag 队列中的唯一id
*/
public void consumerAck02(String msg, Channel channel, Long tag)throws IOException {
// true-处理多条;false-进处理被提供tag的消息
boolean multiple = false;
// true-消息重入队列;false-丢弃消息
boolean requeue = false;
try {
log.info("【消费者】 - 【Acknowledge(02)】:msg = {}, tag = {}", msg, tag);
double a = 1 / 0;
System.out.println(a);
channel.basicAck(tag, multiple);
} catch (Exception e) {
log.warn("【消费者】 - 【Acknowledge(02)】 - 【异常】:msg = {}, tag = {}", msg, tag);
channel.basicNack(tag, multiple, requeue);
}
}
public void consumerAckDead(String msg) {
log.info("【消费者】 - 【Acknowledge】 - 【发送邮件通知客服】:msg = {}", msg);
}
}
使用:
消费异常的消息,交给死信队列处理
磁盘内存的监控
内存预警
RabbitMQ的管理界面会显示其内存使用情况,当分配给RabbitMQ的内存被完全使用时,队列会进入阻塞状态,生产者无法将消息写入到队列中。并且RabbitMQ发出警报。
RabbitMQ被分配的默认内存空间是0.4,也就是Linux操作系统所有内存的40%。
内存正常情况:
主界面
连接正常
内存超出所分配空间时:
主界面
连接阻塞
修改内存的方式
内存使用是可以修改的,根据 官网文档 有两种方式:1. 指令;2. 配置文件进行修改。
指令
1 | 百分比控制内存 |
配置文件
1 | # 指令 |
磁盘预警
当磁盘空间小于设定值时,就会出现磁盘预警。