首页 > Python资料 博客日记
RabbitMQ
2024-09-18 01:00:12Python资料围观88次
1.1 什么是MQ
MQ(Message Queue) : 翻译为 消息队列,通过典型的 生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为 消息中间件。 可以把消息传递的过程想象成 当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上, RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。
2.1 RabbitMQ
核心概念
- server:又称broker,接受客户端连接,实现AMQP实体服务。
- connection:连接和具体broker网络连接。
- channel:网络信道,几乎所有操作都在channel中进行,channel是消息读写的通道。客户端可以建立多个channel,每个channel表示一个会话任务。
- message:消息,服务器和应用程序之间传递的数据,由properties和body组成。properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性;body是消息实体内容。
- Virtual host:虚拟主机,用于逻辑隔离,最上层消息的路由。一个Virtual
host可以若干个Exchange和Queue,同一个Virtual host不能有同名的Exchange或Queue。 - Exchange:交换机,接受消息,根据路由键转发消息到绑定的队列上。
- banding:Exchange和Queue之间的虚拟连接。
- routing key:一个路由规则,虚拟机根据他来确定如何路由一条消息。
- Queue:消息队列,用来存放消息的队列。
RabbitMQ 工作流程
消息生产流程
消息生产者连与RabbitMQ Broker 建立一个连接,建立好了连接之后,开启一个信道Channel
- 声明一个交换机,并设置其相关的属性(交换机类型,持久化等)
- 声明一个队列并设置其相关属性(排他性,持久化自动删除等)
- 通过路由键将交换机和队列绑定起来
- 消息生产者发送消息给 , 消息中包含了路由键,交换机等信息,交换机根据接收的路由键查找匹配对应的队列
- 查找匹配成功,则将消息存储到队列中
- 查找匹配失败,根据生产者配置的属性选择丢弃或者回退给生产者
- 关闭信道 , 关闭连接
消息消费流程
- 消息消费者连与
- 消费者向 请求消费者相应队列中的消息
- 等待 回应并投递相应队列中的消息,消费者接收消息
- 消费者确认(ack) 接收消息, 消除已经确认的消息
- 关闭信道Channel ,关闭连接
4.2第二种模型(Work queues)
Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
图解:
P:生产者:任务的发布者
C1:消费者-1,领取任务并且完成任务,假设完成速度较慢
C2:消费者-2:领取任务并完成任务,假设完成速度快
1. 开发生产者
public static void main(String[] args) throws Exception {
// // 创建连接对象
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
channel.queueDeclare("work", false, false, false, null);
for (int i = 0; i < 20; i++) {
channel.basicPublish("","work",null,(i + "hello word queue").getBytes());
}
//关闭连接
RabbitMQUtil.closeConn(channel,conn);
}
2.开发消费者-1
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
channel.queueDeclare("work",false,false,false,null);
/**
* 参数1:String queue 队列名称
* 参数2:boolean autoAck 开启消息的自动确认机制
* 参数3:Consumer callback 消费时回调接口
*/
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费1:" + new String(body));
}
});
}
3.开发消费者-2
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
channel.queueDeclare("work",false,false,false,null);
/**
* 参数1:String queue 队列名称
* 参数2:boolean autoAck 开启消息的自动确认机制
* 参数3:Consumer callback 消费时回调接口
*/
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费2:" + new String(body));
}
});
}
5.消息自动确认机制
完成一项任务可能需要几秒钟。 你可能想知道,如果其中一个使用者(也就是消费者)开始一项漫长的任务并仅部分完成而死掉(发生异常),会发生什么情况。
使用我们当前的代码,RabbitMQ一旦向消费者传递了一条消息,便立即将其标记为删除。 在这种情况下,我们将丢失正在处理的消息。
我们还将丢失所有发送给该特定工作人员但尚未处理的消息。 但是我们不想丢失任何任务。 如果一个消费者死亡,我们希望将任务交付给另一个消费者。
1.消费者-1
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.createConn();
final Channel channel = conn.createChannel();
channel.queueDeclare("work",false,false,false,null);
//一次只接受一条未确认的消息
channel.basicQos(1);
/**
* 参数1:队列名称
* 参数2:消息自动确认 true 消费者自动向mq确认消息已经消费 false 不会自动确认
*/
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费-1:" + new String(body));
/**
* 参数1:确认队列中那个消息被消费了
* 参数2:是否开启多个消息同时确认 true 开启
*/
//手动确认消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
2.消费者-2
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.createConn();
final Channel channel = conn.createChannel();
channel.queueDeclare("work",false,false,false,null);
//一次只接受一条未确认的消息
channel.basicQos(1);
/**
* 参数1:队列名称
* 参数2:消息自动确认 true 消费者自动向mq确认消息已经消费 false 不会自动确认
*/
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费-2:" + new String(body));
/**
* 参数1:确认队列中那个消息被消费了
* 参数2:是否开启多个消息同时确认 true 开启
*/
//手动确认消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
标签:
上一篇:gin配置swagger文档
下一篇:联邦学习研究方向及论文推荐(二)
相关文章
最新发布
- 【Python】selenium安装+Microsoft Edge驱动器下载配置流程
- Python 中自动打开网页并点击[自动化脚本],Selenium
- Anaconda基础使用
- 【Python】成功解决 TypeError: ‘<‘ not supported between instances of ‘str’ and ‘int’
- manim边学边做--三维的点和线
- CPython是最常用的Python解释器之一,也是Python官方实现。它是用C语言编写的,旨在提供一个高效且易于使用的Python解释器。
- Anaconda安装配置Jupyter(2024最新版)
- Python中读取Excel最快的几种方法!
- Python某城市美食商家爬虫数据可视化分析和推荐查询系统毕业设计论文开题报告
- 如何使用 Python 批量检测和转换 JSONL 文件编码为 UTF-8
点击排行
- 版本匹配指南:Numpy版本和Python版本的对应关系
- 版本匹配指南:PyTorch版本、torchvision 版本和Python版本的对应关系
- Python 可视化 web 神器:streamlit、Gradio、dash、nicegui;低代码 Python Web 框架:PyWebIO
- 相关性分析——Pearson相关系数+热力图(附data和Python完整代码)
- Python与PyTorch的版本对应
- Anaconda版本和Python版本对应关系(持续更新...)
- Python pyinstaller打包exe最完整教程
- Could not build wheels for llama-cpp-python, which is required to install pyproject.toml-based proj