RabbitMQ Concurrency

一、RabbitMQ可靠性投递与实践

1. RabbitMQ发送消息的几个环节img

    1. 消息从生产者发送到Broker
    2. 消息从Exchange到Queue
    3. 消息在Queue中存储
    4. 消费者订阅Queue并消费消息
  1. 上面几个环节的可靠投递机制

    1. 生产者发送消息到Broker,可能会因为网络或者Broker问题(硬盘故障、满了)导致发送失败,生产者不确定Broker有没有正确收到。因此,RabbitMQ提供了两种机制服务端确认机制,服务端会通过某种方式返回一个应答,只要生产者收到这个应答就知道消息发送成功。
      1. Transaction(事务)模式
        1. 创建channel的时候,可以把信道设置成事务模式,如果channel.txCommit()的方法调用成功,就说明事务提交成功,则消息一定到达RabbitMQ中。
        2. 事务提交之前由于RabbitMQ异常或者其他原因抛出异常,可通过捕获使用channel.txRollback()实现事务回滚。
        3. Transaction模式中,只有收到服务端的Commit-OK的指令,才算提交成功
        4. img
        5. 缺点:transaction模式是阻塞的,一条消息没有发送完毕,不能发送下一条消息,会窄干RabbitMQ服务器的性能。
      1. Confirm(确认)模式
        1. 确认模式有三种:
          1. 普通确认模式:生产者通过调用channel.confirmSelect()方法将信道设置为Confirm模式,一旦消息被投递到交换机之后(是否路由到队列没关系),RabbitMQ就会发送一个确认消息(Basic.Ack)给生产者,也就是调用channel.waitForConfirms()返回true,这样生产者就知道消息被服务端接收了。
          2. 批量确认模式:生产者开启Confirm模式后,先发送一批消息,只要channel.waitForConfirmsOrDie()方法没有抛出异常,就代表消息被服务端接收(只要有一个消息未被Broker确认就会IOException);批量确认模式两个问题:批量的数量不好确定,数量少的话,效率提升不上去;数量多的话,比如发送1000条消息才确认一次,如果前面999条消息都确认了,最后1条失败了,那么消息都要重发
          3. 异步确认模式:一边发送一边确认,添加一个ConfirmListener,并且用一个SortedSet来维护批次中没有被确认的消息。
    1. 消息从交换机路由到队列,可能失败的2种可能:routeKey错误或者队列不存在
      1. 处理方式有2种:
        1. 让服务端重发消息给生产者。生产这收到无法路由的消息可以做其他操作
        2. 让交换机路由到另外一个备用交换机。在创建交换机的时候可以指定备份交换机。(注意队列可以指定死信交换机;交换机可以指定备份交换机)
    1. 消息在队列存储,第三个环节是消息在队列存储,如果没有消费者的话,队列一直存储在数据库中。如果RabbitMQ的服务或者硬件发生故障,可能会导致消息丢失,因此我们要把消息和元数据(队列,交换机,绑定)都保存磁盘
      1. 交换机持久化
      2. 队列持久化
      3. 消息持久化
      4. RabbitMQ服务集群
    1. 消息投递到消费者,如果消费收到消息后没来得及处理或者处理过程发生异常,会导致失败。RabbitMQ服务端需要知道消费者对消息的接收情况,并决定是否重新投递。RabbitMQ提供消费者的消息确认机制,自动或者手动发送ACK给服务端
      1. 自动ACK,这个事默认情况,消费者会在收到消息的时候就自动发送ACK。注意不是方法执行完毕时才发送ACK。
      2. 手动ACK,如果需要等待消息消费完毕或者执行完毕才发送ACK,需要把自动ACK设置成手动ACK。其他,特殊情况如果消费者无法处理消息,可以拒绝消息:Basic.Reject()单条拒绝,Basic.Nack()批量拒绝

2. MQ的一些使用特性

    1. 生产者如何知道消费者消费成功?一般有两种方式:
      1. 消费者收到消息处理完毕后,调用生产者的API(破坏了解耦)
      2. 消费者收到消息处理完毕后,发送一条响应消息给生产者
    1. 补偿机制
      1. 生产者的API没有被调用或者没有收到响应消息?需要由服务端定时重新发送,设计定时重发需要设计几个参数:
        1. 间隔多久重发,一般可设置衰减机制
        2. 重发多少次
    1. 消息幂等性
      1. 为了避免消息的重复处理,一般在消费端控制。
      2. 出现消息重复可能原因:
        1. 生产者重复投递
        2. 消费者未发送ACk或者其他原因,导致消息重新消费
        3. 生产者代码或者网络问题
      1. 解决办法
        1. 对于重复发送的消息,对每条消息生成一个唯一的业务ID,通过日志或者消息落库来做重复控制。
    1. 最终一致性
      1. 如果消费者宕机或者出现BUG,尝试多次重发后,消息没有得到处理。
      2. 定时处理或者人工补偿
    1. 消息顺序性
      1. 是指消费者消费消息的顺序跟生产者生产消息的顺序一致的。例如:1、发表微博 2、发表评论 3、删除微博。顺序不能颠倒
      2. 一个队列有多个消费者时,由于消费速度是不一致的,因此顺序无法保证。一个队列仅有一个消费者的情况下才能保证顺序消费

3. 集群与高可用

    1. 为什么要做集群?
      1. 高可用,如果集群中某些MQ服务器不可用,客户端还能连接到其他MQ服务器,不影响业务
      2. 负载均衡,在并发的场景下,单台的MQ处理消息有限,可以分发给多台MQ服务器,减少消息延迟
    1. RabbitMQ如何做集群?
      1. RabbitMQ的节点类型
        1. 磁盘节点:将元数据放在磁盘中。未指定类型的情况下,默认磁盘节点。
        2. 内存节点:将元数据放在内存中,如果是持久化的消息,会同时存放内存和磁盘
      1. 概念解释:
        1. 元数据:包括队列名字属性,交换机类型、名字属性、绑定、vhost定义
        2. 集群中至少需要一个磁盘节点来持久化元数据,否则全部内存节点崩溃时就无法同步元数据。我们一般把应用连接到内存节点,磁盘节点用来备份
      1. RabbitMQ的集群模式
        1. 普通集群模式
          1. img
          2. 普通集群模式下,不同MQ节点之间只会同步元数据(队列名字属性,交换机类型、名字属性、绑定、vhost定义)而不会同步消息
          3. img
          4. 缺点:只要节点1挂了,队列1的所有数据都丢失了,为什么不直接把消息在所有节点都复制一份呢?主要出于浪费存储空间和同步数据的网络开销问题。如果需要保证队列的高可用,就不能用这种集群模式
        1. 镜像集群
          1. img
          2. 镜像队列模式下,消息内容会在镜像节点间同步,可用性更高。不过也有一定的副作用:系统性能会降低,节点过多的情况下同步代价比较大
    1. 高可用
      1. 集群搭建成功后,如果存在多个内存节点,生产者和消费者应该连接到哪个内存节点呢?我们需要一个负载均衡组件(HAProxy、LVS、nginx)来做路由。这时生产者和消费者只要路由到负载组件的IP即可。
      2. 负载均衡分为四层负载七层负载
        1. 四层负载:工作在OSI模型的第四层,即传输层,根据IP和端口进行转发(LVS支持四层负载)
        2. 七层负载:OSI模型的第七层,应用层。可以根据请求资源类型分配到后端服务器(nginx支持七层负载,HAProxy支持四层和七层负载)
        3. 负载软件本身也需要集群,也会面临同样选择哪个负载组件的问题?
        4. img