首页 > Python资料 博客日记
RabbitMQ 持久化与不公平分发
2024-10-26 15:00:15Python资料围观58次
RabbitMQ 持久化与不公平分发
1. RabbitMQ 持久化 (Durability)
概念
持久化是指将消息或队列保存在磁盘上,以确保即使 RabbitMQ 服务器宕机或重启,数据也不会丢失。
持久化的三个层面
-
队列持久化:
队列持久化意味着即使 RabbitMQ 重启后,队列依然存在,但它不会保证队列中的消息仍然存在。-
队列持久化声明:
channel.queueDeclare("queue_name", true, false, false, null);
其中,
true
表示队列持久化。
-
-
消息持久化:
消息持久化是在生产者发送消息时指定的,确保消息在服务器重启后依然能够保留在队列中。- 消息持久化声明:
channel.basicPublish("", "queue_name", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
注意:队列持久化和消息持久化是独立的。即使队列是持久化的,消息也需要单独设置为持久化。
- 消息持久化声明:
-
交换机(Exchange)持久化:
交换机也可以设置为持久化,确保 RabbitMQ 重启后,交换机不会丢失。- 交换机持久化声明:
其中,channel.exchangeDeclare("exchange_name", "direct", true);
true
表示交换机持久化。
- 交换机持久化声明:
持久化的注意事项
- 持久化操作会稍微影响性能,因为需要将数据写入磁盘。
- 队列持久化并不会让未持久化的消息在 RabbitMQ 重启后恢复,消息也需要设置为持久化。
2. 不公平分发 (Fair Dispatch)
概念
RabbitMQ 默认采用轮询的方式来分发消息给消费者,即每个消费者会按照平等的顺序接收消息。这种机制在某些情况下会导致某些消费者的任务积压过多,而其他消费者的任务处理过快,导致系统资源浪费。这时,我们可以使用不公平分发(也称为预取机制)来优化分发过程。
不公平分发实现
不公平分发的实现方式是通过设置 Qos(Quality of Service) 预取值,使得每个消费者一次只会接收特定数量的消息,直到这些消息处理完并发送应答后,才会接收新的消息。
步骤
-
设置 QoS:
在消费者端使用basicQos
方法来设置每次接收的消息数量。通常设置为1
,表示消费者一次只会接收一条消息,在处理完并应答之后再接收下一条消息。// 设置为每次处理一条消息 channel.basicQos(1);
-
手动消息应答:
RabbitMQ 的默认行为是自动应答,即消费者接收到消息后立即应答,不论是否处理完成。要实现不公平分发,需要手动确认消息,在消息处理完之后再发送应答。- 手动应答代码示例:
boolean autoAck = false; // 关闭自动应答 channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {}); // 消息处理完成后,手动应答 channel.basicAck(deliveryTag, false);
- 手动应答代码示例:
不公平分发的优点
- 避免消费者压力过大:某些消费者可能任务处理较慢,通过 QoS 设置,可以避免一次性收到大量任务导致压力过大。
- 优化资源使用:确保任务更合理地分配到消费者,提升整体的任务处理效率。
不公平分发的注意事项
basicQos(1)
的设置会稍微降低吞吐量,但可以确保消息公平分配给消费者。- 必须确保消息的手动应答,否则未应答的消息可能会再次发送给其他消费者,导致重复消费。
3. RabbitMQ 持久化与不公平分发结合
在实际应用中,RabbitMQ 持久化与不公平分发可以结合使用,以确保消息的可靠性和消费者的负载平衡。
- 持久化 确保消息和队列在服务重启时不会丢失,保证了系统的可靠性。
- 不公平分发 则通过手动应答和 QoS 设置,确保消费者可以根据自己的处理能力,公平地获取消息并进行处理,防止某个消费者处理任务过于缓慢。
代码示例
// 创建通道并声明持久化队列
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare("task_queue", true, false, false, null);
// 生产者发送持久化消息
String message = "Hello World";
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
// 消费者设置 QoS 和手动应答
channel.basicQos(1);
boolean autoAck = false; // 关闭自动应答
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String receivedMessage = new String(delivery.getBody(), "UTF-8");
System.out.println("Received: " + receivedMessage);
// 模拟任务处理
doWork(receivedMessage);
// 手动应答
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 消费消息
channel.basicConsume("task_queue", autoAck, deliverCallback, consumerTag -> {});
标签:
上一篇:AWD的复现
下一篇:报表工具怎么选?山海鲸VS帆软,哪个更适合你?
相关文章
最新发布
- 光流法结合深度学习神经网络的原理及应用(完整代码都有Python opencv)
- Python 图像处理进阶:特征提取与图像分类
- 大数据可视化分析-基于python的电影数据分析及可视化系统_9532dr50
- 【Python】入门(运算、输出、数据类型)
- 【Python】第一弹---解锁编程新世界:深入理解计算机基础与Python入门指南
- 华为OD机试E卷 --第k个排列 --24年OD统一考试(Java & JS & Python & C & C++)
- Python已安装包在import时报错未找到的解决方法
- 【Python】自动化神器PyAutoGUI —告别手动操作,一键模拟鼠标键盘,玩转微信及各种软件自动化
- Pycharm连接SQL Sever(详细教程)
- Python编程练习题及解析(49题)
点击排行
- 版本匹配指南:Numpy版本和Python版本的对应关系
- 版本匹配指南:PyTorch版本、torchvision 版本和Python版本的对应关系
- Python 可视化 web 神器:streamlit、Gradio、dash、nicegui;低代码 Python Web 框架:PyWebIO
- 相关性分析——Pearson相关系数+热力图(附data和Python完整代码)
- Anaconda版本和Python版本对应关系(持续更新...)
- Python与PyTorch的版本对应
- Windows上安装 Python 环境并配置环境变量 (超详细教程)
- Python pyinstaller打包exe最完整教程