首页 > Python资料 博客日记

RabbitMQ

2024-09-18 01:00:12Python资料围观157

本篇文章分享RabbitMQ,对你有帮助的话记得收藏一下,看Python资料网收获更多编程知识

1.1 什么是MQ

MQ(Message Queue) : 翻译为 消息队列,通过典型的 生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为 消息中间件。 可以把消息传递的过程想象成 当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上, RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。

2.1 RabbitMQ

核心概念

  • server:又称broker,接受客户端连接,实现AMQP实体服务。
  • connection:连接和具体broker网络连接。
  • channel:网络信道,几乎所有操作都在channel中进行,channel是消息读写的通道。客户端可以建立多个channel,每个channel表示一个会话任务。
  • message:消息,服务器和应用程序之间传递的数据,由properties和body组成。properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性;body是消息实体内容。
  • Virtual host:虚拟主机,用于逻辑隔离,最上层消息的路由。一个Virtual
    host可以若干个Exchange和Queue,同一个Virtual host不能有同名的Exchange或Queue。
  • Exchange:交换机,接受消息,根据路由键转发消息到绑定的队列上。
  • banding:Exchange和Queue之间的虚拟连接。
  • routing key:一个路由规则,虚拟机根据他来确定如何路由一条消息。
  • Queue:消息队列,用来存放消息的队列。

RabbitMQ 工作流程

消息生产流程

消息生产者连与RabbitMQ Broker 建立一个连接,建立好了连接之后,开启一个信道Channel

  • 声明一个交换机,并设置其相关的属性(交换机类型,持久化等)
  • 声明一个队列并设置其相关属性(排他性,持久化自动删除等)
  • 通过路由键将交换机和队列绑定起来
  • 消息生产者发送消息给 , 消息中包含了路由键,交换机等信息,交换机根据接收的路由键查找匹配对应的队列
  • 查找匹配成功,则将消息存储到队列中
  • 查找匹配失败,根据生产者配置的属性选择丢弃或者回退给生产者
  • 关闭信道 , 关闭连接

消息消费流程

  • 消息消费者连与
  • 消费者向 请求消费者相应队列中的消息
  • 等待 回应并投递相应队列中的消息,消费者接收消息
  • 消费者确认(ack) 接收消息, 消除已经确认的消息
  • 关闭信道Channel ,关闭连接

4.2第二种模型(Work queues)

Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

图解:

P:生产者:任务的发布者
C1:消费者-1,领取任务并且完成任务,假设完成速度较慢
C2:消费者-2:领取任务并完成任务,假设完成速度快

1. 开发生产者

public static void main(String[] args) throws Exception {
// // 创建连接对象
        Connection conn = RabbitMQUtil.createConn();

        Channel channel = conn.createChannel();

        channel.queueDeclare("work", false, false, false, null);

        for (int i = 0; i < 20; i++) {
            channel.basicPublish("","work",null,(i + "hello word queue").getBytes());
        }
//关闭连接
        RabbitMQUtil.closeConn(channel,conn);
    }


2.开发消费者-1

 public static void main(String[] args) throws IOException {
        Connection conn = RabbitMQUtil.createConn();
        Channel channel = conn.createChannel();
        channel.queueDeclare("work",false,false,false,null);
      /**
        * 参数1:String queue 队列名称
        * 参数2:boolean autoAck 开启消息的自动确认机制
        * 参数3:Consumer callback  消费时回调接口
        */
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费1:" + new String(body));
            }
        });
    }


3.开发消费者-2

 public static void main(String[] args) throws IOException {
        Connection conn = RabbitMQUtil.createConn();
        Channel channel = conn.createChannel();
        channel.queueDeclare("work",false,false,false,null);
      /**
        * 参数1:String queue 队列名称
        * 参数2:boolean autoAck 开启消息的自动确认机制
        * 参数3:Consumer callback  消费时回调接口
        */
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费2:" + new String(body));
            }
        });
    }


5.消息自动确认机制

完成一项任务可能需要几秒钟。 你可能想知道,如果其中一个使用者(也就是消费者)开始一项漫长的任务并仅部分完成而死掉(发生异常),会发生什么情况。
使用我们当前的代码,RabbitMQ一旦向消费者传递了一条消息,便立即将其标记为删除。 在这种情况下,我们将丢失正在处理的消息。
我们还将丢失所有发送给该特定工作人员但尚未处理的消息。 但是我们不想丢失任何任务。 如果一个消费者死亡,我们希望将任务交付给另一个消费者。

1.消费者-1

public static void main(String[] args) throws IOException {
        Connection conn = RabbitMQUtil.createConn();
        final Channel channel = conn.createChannel();
        channel.queueDeclare("work",false,false,false,null);
        //一次只接受一条未确认的消息
        channel.basicQos(1);
        /**
         * 参数1:队列名称
         * 参数2:消息自动确认 true 消费者自动向mq确认消息已经消费 false 不会自动确认
         */
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费-1:" + new String(body));
                /**
                 * 参数1:确认队列中那个消息被消费了
                 * 参数2:是否开启多个消息同时确认  true 开启
                 */
                //手动确认消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }


2.消费者-2

public static void main(String[] args) throws IOException {
        Connection conn = RabbitMQUtil.createConn();
        final Channel channel = conn.createChannel();
        channel.queueDeclare("work",false,false,false,null);
        //一次只接受一条未确认的消息
        channel.basicQos(1);
        /**
         * 参数1:队列名称
         * 参数2:消息自动确认 true 消费者自动向mq确认消息已经消费 false 不会自动确认
         */
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费-2:" + new String(body));
                /**
                 * 参数1:确认队列中那个消息被消费了
                 * 参数2:是否开启多个消息同时确认  true 开启
                 */
                //手动确认消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }



版权声明:本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐