首页 > Python资料 博客日记
RabbitMQ
2024-09-18 01:00:12Python资料围观157次
1.1 什么是MQ
MQ(Message Queue) : 翻译为 消息队列,通过典型的 生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为 消息中间件。 可以把消息传递的过程想象成 当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上, RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。
2.1 RabbitMQ
核心概念
- server:又称broker,接受客户端连接,实现AMQP实体服务。
- connection:连接和具体broker网络连接。
- channel:网络信道,几乎所有操作都在channel中进行,channel是消息读写的通道。客户端可以建立多个channel,每个channel表示一个会话任务。
- message:消息,服务器和应用程序之间传递的数据,由properties和body组成。properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性;body是消息实体内容。
- Virtual host:虚拟主机,用于逻辑隔离,最上层消息的路由。一个Virtual
host可以若干个Exchange和Queue,同一个Virtual host不能有同名的Exchange或Queue。 - Exchange:交换机,接受消息,根据路由键转发消息到绑定的队列上。
- banding:Exchange和Queue之间的虚拟连接。
- routing key:一个路由规则,虚拟机根据他来确定如何路由一条消息。
- Queue:消息队列,用来存放消息的队列。
RabbitMQ 工作流程
消息生产流程
消息生产者连与RabbitMQ Broker 建立一个连接,建立好了连接之后,开启一个信道Channel
- 声明一个交换机,并设置其相关的属性(交换机类型,持久化等)
- 声明一个队列并设置其相关属性(排他性,持久化自动删除等)
- 通过路由键将交换机和队列绑定起来
- 消息生产者发送消息给 , 消息中包含了路由键,交换机等信息,交换机根据接收的路由键查找匹配对应的队列
- 查找匹配成功,则将消息存储到队列中
- 查找匹配失败,根据生产者配置的属性选择丢弃或者回退给生产者
- 关闭信道 , 关闭连接
消息消费流程
- 消息消费者连与
- 消费者向 请求消费者相应队列中的消息
- 等待 回应并投递相应队列中的消息,消费者接收消息
- 消费者确认(ack) 接收消息, 消除已经确认的消息
- 关闭信道Channel ,关闭连接
4.2第二种模型(Work queues)
Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
图解:
P:生产者:任务的发布者
C1:消费者-1,领取任务并且完成任务,假设完成速度较慢
C2:消费者-2:领取任务并完成任务,假设完成速度快
1. 开发生产者
public static void main(String[] args) throws Exception {
// // 创建连接对象
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
channel.queueDeclare("work", false, false, false, null);
for (int i = 0; i < 20; i++) {
channel.basicPublish("","work",null,(i + "hello word queue").getBytes());
}
//关闭连接
RabbitMQUtil.closeConn(channel,conn);
}
2.开发消费者-1
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
channel.queueDeclare("work",false,false,false,null);
/**
* 参数1:String queue 队列名称
* 参数2:boolean autoAck 开启消息的自动确认机制
* 参数3:Consumer callback 消费时回调接口
*/
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费1:" + new String(body));
}
});
}
3.开发消费者-2
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
channel.queueDeclare("work",false,false,false,null);
/**
* 参数1:String queue 队列名称
* 参数2:boolean autoAck 开启消息的自动确认机制
* 参数3:Consumer callback 消费时回调接口
*/
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费2:" + new String(body));
}
});
}
5.消息自动确认机制
完成一项任务可能需要几秒钟。 你可能想知道,如果其中一个使用者(也就是消费者)开始一项漫长的任务并仅部分完成而死掉(发生异常),会发生什么情况。
使用我们当前的代码,RabbitMQ一旦向消费者传递了一条消息,便立即将其标记为删除。 在这种情况下,我们将丢失正在处理的消息。
我们还将丢失所有发送给该特定工作人员但尚未处理的消息。 但是我们不想丢失任何任务。 如果一个消费者死亡,我们希望将任务交付给另一个消费者。
1.消费者-1
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.createConn();
final Channel channel = conn.createChannel();
channel.queueDeclare("work",false,false,false,null);
//一次只接受一条未确认的消息
channel.basicQos(1);
/**
* 参数1:队列名称
* 参数2:消息自动确认 true 消费者自动向mq确认消息已经消费 false 不会自动确认
*/
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费-1:" + new String(body));
/**
* 参数1:确认队列中那个消息被消费了
* 参数2:是否开启多个消息同时确认 true 开启
*/
//手动确认消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
2.消费者-2
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.createConn();
final Channel channel = conn.createChannel();
channel.queueDeclare("work",false,false,false,null);
//一次只接受一条未确认的消息
channel.basicQos(1);
/**
* 参数1:队列名称
* 参数2:消息自动确认 true 消费者自动向mq确认消息已经消费 false 不会自动确认
*/
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费-2:" + new String(body));
/**
* 参数1:确认队列中那个消息被消费了
* 参数2:是否开启多个消息同时确认 true 开启
*/
//手动确认消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
标签:
上一篇:gin配置swagger文档
下一篇:联邦学习研究方向及论文推荐(二)
相关文章
最新发布
- 光流法结合深度学习神经网络的原理及应用(完整代码都有Python opencv)
- Python 图像处理进阶:特征提取与图像分类
- 大数据可视化分析-基于python的电影数据分析及可视化系统_9532dr50
- 【Python】入门(运算、输出、数据类型)
- 【Python】第一弹---解锁编程新世界:深入理解计算机基础与Python入门指南
- 华为OD机试E卷 --第k个排列 --24年OD统一考试(Java & JS & Python & C & C++)
- Python已安装包在import时报错未找到的解决方法
- 【Python】自动化神器PyAutoGUI —告别手动操作,一键模拟鼠标键盘,玩转微信及各种软件自动化
- Pycharm连接SQL Sever(详细教程)
- Python编程练习题及解析(49题)
点击排行
- 版本匹配指南:Numpy版本和Python版本的对应关系
- 版本匹配指南:PyTorch版本、torchvision 版本和Python版本的对应关系
- Python 可视化 web 神器:streamlit、Gradio、dash、nicegui;低代码 Python Web 框架:PyWebIO
- 相关性分析——Pearson相关系数+热力图(附data和Python完整代码)
- Anaconda版本和Python版本对应关系(持续更新...)
- Python与PyTorch的版本对应
- Windows上安装 Python 环境并配置环境变量 (超详细教程)
- Python pyinstaller打包exe最完整教程