RabbitMQ Basic

一、为什么使用MQ

    1. 实现异步通信,异步通信客户端不需要等待,可以减少客户端性能消耗,大大提升用户体验
    2. 实现系统解耦,实现了系统之间依赖关系解耦,系统的可扩展性和可维护性得到了提升
      1. 例如一个订单系统,引入MQ后img
    1. 实现流量削峰
      1. 可以把流量承接下来,把消息转发到MQ服务器,业务层根据自己的消费速率去处理这些消息
      2. 控制流量,超过一定阈值可丢弃或做其他处理

二、什么是AMQP

AMQP是一个工作于应用层的协议,本质上进程间传递异步消息的网络协议,他是跨语言和跨平台的,不管什么语言的客户端、什么样的MQ服务器只要遵循AMQP协议,就能进行消息的交互。

三、RabbitMQ

    1. RabbitMQ主要架构:img
    2. broker,RabbitMQ服务器默认端口5672,我们把它叫做broker。MQ服务器帮我们做的事情就是存储、转发消息
    3. Connection,无论是生产者还是消费者,都必须和broker建立一个连接,是一个TCP连接。
    4. Channel
      1. 如果所有的生产者和消费者都使用TCP长连接的话,对broker来说肯定会造成性能损耗,也会浪费时间。
      2. AMQP引入Channel的概念,他是一个虚拟的连接。这样我们可以在保持的TCP长连接里面去创建和释放Channel,大大减少资源消耗。不同的Channel是相互隔离的,每个Channel都有自己的编号
    1. Queue
      1. 在Broker里面用来存储消息的对象叫做Queue。生产者发送的消息到达队列,在队列中存储,消费者从队列消费消息
    1. Consumer,RabbitMQ消费消息有两种模式
      1. 一种是Pull模式,对应的方法是basicGet。消息存放在服务端,只有消费者主动获取才能拿到消息。每隔一段时间获取一次消息,消息的实时性会降低。但是好处是可以根据自己消费能力决定获取频率
      2. 一种是Push模式,对应方法是basicConsume,只要生产者发消息到服务器,就马上推送消费者,实时性很高。如果消费不过来可能会造成消息堆积。
      3. 由于队列有FIFO的特性,只有确认前一条消息被消费接收后,broker才会把这条消息从数据库删除,继续投递下一条消息
    1. Exchange
      1. 消息路由组件,Exchange是不会存储消息的,只做一件事:按规则分发消息
      2. Exchange和队列是多对多的绑定关系,一个Exchange的消息可以路由给多个队列,一个队列也可以接收多个Exchange的消息。
    1. Vhost,虚拟主机。除了可以提高硬件资源的利用率,还可以实现资源的隔离和权限的控制。类似编程语言的namespace和package,不同的Vhost可以有同名的Exchange和Queue
  1. RabbitMQ路由方式

    1. RabbitMQ中一共有4种类型的交换机,Direct、Topic、Fanout、Headers。
    2. Direct直连
      1. 一个队列与Direct类型的交换机绑定,需要指定一个明确的绑定键(binding key)
      2. 生产者发送消息时会携带一个路由键(routing key)
      3. 当消息的路由键和某个队列的绑定键完全匹配时,这条消息才会从交换机路由到这个队列上。多个队列也可以使用相同的绑定键。
    1. Topic主题
      1. 一个队列与主题类型的交换机绑定时,可以在绑定键中使用通配符。支持两个通配符:
        1. #表示0个或者多个单词
        2. *表示一个单词
        3. 单词(word)指的是英文的店“.”隔开的字符。例如a.bc.def 是3个单词
    1. Fanout广播
      1. 广播交换机与队列绑定时,不需要指定绑定键。因此生产者发送消息到广播类型的交换机上,也不需要携带路由键。消息到达交换机时,所有与之绑定的队列,都会收到相同的副本。

四、RabbitMQ 进阶知识

  1. 怎么实现订单延时关闭?

    1. 假设有个场景,超过30分钟未付款的订单自动关闭,这个功能怎么实现?RabbitMQ本身不支持延时投递,总的来说有2种方案。
      1. 用定时任务扫描
        1. 比如每隔1分钟扫描一次,查出30分钟之前未付款的订单,把状态改成关闭即可。缺点就是有延时,而且数据量大的情况服务器压力大。
      1. 利用RabbitMQ的死信队列(Dead Letter Queue)实现
        1. 先了解两个过期属性。RabbitMQ队列有一个消息过期属性,叫x-message-ttl,所有队列中的消息超过时间未被消费时,都会过期。
        2. 消息的过期属性,在发送消息时通过MessageProperties指定消息属性。img
        3. 如果队列msg TTL是6s,消息的TTL是10s过期,同时指定了Queue TTL和Msg TTL,则小的那个时间失效。
        4. 死信队列
          1. 消息过期后,如果没有任何配置,是会直接丢弃的。我们可以通过配置让消息进入死信队列。
          2. 队列在创建的时候,可以指定一个死信交换机DLX(Dead Letter Exchange),死信交换机绑定的队列被称为私信队列DLQ(Dead Letter Queue)。如果消息过期了,队列指定了DLX,就会发送到DLX。如果DLX绑定了DLQ,就会路由到DLQ,最后消费。
          3. 实现流程:略
        1. 使用死信队列实现延时消息的缺点:
          1. 如果统一用队列来设置消息TTL,当梯度多时,比如1分钟,2分钟,3分钟….需要创建很多的交换机和队列
          2. 如果单独设置消息的TTL,则可能会造成队列中消息的阻塞,即前一条消息没有消费,后面的消息无法投递(比如,第一条消息TTL是30分钟,第二条消息TTL是10分钟。10分钟后,即使第二条消息应该投递了,但是由于第一条消息还未出队,所以无法投递)
          3. 可能存在一定的时间误差
        1. 延时队列的其他实现
          1. 在RabbitMQ 3.5.7及以后得版本提供了一个插件来实现延时队列功能。通过声明一个x-delayed-message类型的Exchange来使用
  2. 磁盘满了怎么办?

    1. RabbitMQ的流量控制,一个是服务端,一个是消费端。
    2. 服务端流量控制
      1. 队列长度:x-max-length,设置队列中最大存储消息数,超过这个数量,对头消息会被丢弃;x-max-length-bytes,队列中最大的消息容量,超过这个容量消息会被丢弃
      2. 内存控制:默认占用40%以上的内存时,MQ会主动抛出一个内存警告并阻塞所有连接。默认值0.4;
      3. 磁盘控制:另外一种通过磁盘来控制消息的发布。当磁盘剩余的可用空间低于指定的值(默认50MB)触发流量控制措施。
    1. 消费端限流控制
      1. 因为消费端会在本地缓存消息,如果消息数量过多,可能会导致OOM或者影响其他服务运行。我们希望在一定数量的消息消费完之前,不再推送消息,就用到消费端的流量限制措施。
      2. 设置Consumer或者Channel的prefetch count值,含义为消费端超过这个数值的消息未被确认。RabbitMQ就会停止投递新的消息给该消费者。
  3. RabbitMQ特性总结