分布式消息
1. 分布式消息常见问题
互联网技术的产生无非就是系统中遇到了某种问题,而针对这些问题需要得到处理,应运而生就产生了一系列的技术。
因此,我们在研究一种技术的时候,就要先研究在现有系统中到底遇到了什么问题?解决问题的办法就是这种技术的重要关注点。即问题和解决方案。
这就是我们学习技术的方法论。下面的学习内容,也是围绕着“问题+解决方案”这个观点去布局和构思的,以后我会不断重申这种观点,以加强记忆和理解。
- 目前软件系统中遇到了什么困难?
- 引入消息中间件后解决了什么问题?
- 解耦
- 异步
- 削峰
- 引入消息中间件之后同时引入了哪些问题?
- 增加了系统的复杂度
- 如果消息中间件整体宕机?
- 如果消息重复发送?
- 如果消息顺序不对?
- 如果结果不一致?(服务 a 调用 b、c、d,依次处理后才算整个操作成功,但是引入消息中间件之后,发现 b 和 c 成功了,但是 d 没有成功,那整个操作应该算是成功能还是失败呢?怎么解决可能出现的这种情况呢?)
- 消息中间件的选型标准?
- 吞吐量和 topic 数量
- 开源社区活跃度
- 消息的实效性、分布式可用性、消息可靠性(不会丢失消息)
- 功能支持是否齐全
- 结论总结:rabbitmq 和 kafka
- rabbitmq 和 kafka
- Rabbitmq 的高可用实现方式?三种方式的区别?
- Kafka 的高可用原理?
- 生产者消费者模型
- 消息中间件的使用示例
- rabbitmq+SpringBoot 使用案例
- 相关概念
- 部署过程
- 生产者代码
- 消费者代码
- kafka+SpringBoot 使用案例
- 相关概念
- 部署过程
- 生产者代码
- 消费者代码
- rabbitmq+SpringBoot 使用案例
- MQ 中如何保证消息不被重复消费?(如何保证消息的幂等性)
- 问题的描述
生产者
生产了多条消息,导致消息有多条消息中间件
直接断电了,导致 offset 没有来得及提交,没有提交但是已经被消费的消息就可能再次被消费消费者
中间的多台主机可能会取到同一条数据,单个消息被多个消费者消费
- 解决方案
- 视业务而定,如果业务是写 redis,使用 set,天然幂等性
- 如果是常规业务操作,可以在生产消息时使用全局唯一 id,使用这个 id 作为消息的标识,每次消费时,先去 redis 中查一下是否有这个 id,如果有就说明已经被消费过了,直接丢弃,如果没有,就说明没有被消费过
- 问题的描述
- MQ 中如何保证消息的可靠性?(如何保证消息不丢失)
- 问题描述
生产者导致消息丢失
:生产者组装完消息,发送给 MQ,紧接着就去处理其他事物了,发送给 MQ 没有,具体不知道,但是生产者并接着去处理其他事物了,这就造成了到底发送消息了没有这个问题不清楚。这其中可能有网络抖动?生产者处组装完消息正要发送消息时突然宕机?消息中间件导致消息丢失
:- 针对 rabbitmq 来说,可能的原因是消息中间件没有开启持久化机制,或者是开启持久化了,但是在开始持久化时,消息中间件宕机了,导致部分消息没有被消费,也没有持久化到磁盘中,那消息中间件再次启动时,就会导致部分消息丢失;此外,如果开启了消息的过期时间,这也会造成消息没有来得及消费,就被废弃,导致消息丢失
- 针对 kafka 来说,可能的原因是某一个 broker 宕机,重新选举 partition 的 leader 时,其他 follower 还有些数据没有完成同步,此时 leader 挂掉,在某一个 follower 成为 leader 后,就会发现消息丢失
消费者导致消息丢失
:消费者刚取到消息正准备消费时,还没来得及处理消息,突然进程挂掉了或者重启了,这就导致这条消息丢失了(对于 kafka 来说,具体的表现就是取到消息后自动提交了 offset,kafka 认为消费者已经消费过这个消息,但是消费者正要消费时,此时消费者自己挂了,这就会导致这条消息丢失)
- 解决方案
生产者导致消息丢失
- 针对 rabbitmq,可以使用
事务机制
和确认机制
,区别就是事务机制
是同步处理的,这就会导致 rabbitmq 的吞吐量的下降;确认机制
是异步处理的,所以优先选择,确保消息一定会发送给 rabbitmq - 针对 kafka,同样
开启确认机制
,当 leader 收到消息,并把消息同步到所有的 follower 中后,才认为消息写入成功,否则就进行无限重试
- 针对 rabbitmq,可以使用
消息中间件导致消息丢失
- 针对 rabbitmq,需要开
启持久化机
制。除了需要关闭过期时间
外,还需要两步走,第一步开启queue的持久化
,这样可以保证 rabbitmq 持久化 queue 的元数据,当重启时,可以根据持久化文件自动创建 queue;第二步开启消息的持久化
,消息发送到 queue 中之后自动会定期持久化到本地磁盘中。这还需要结合确认机制,即当消息确定持久化到本地硬盘后才进行生产者的确认。 - 针对 kafka,第一步设置 topic 的参数
replication.factor
参数的值大于 1,要求每一个 partition 必须有至少 2 个副本;第二步设置min.insync.replicas
参数,即要求 leader 感知到至少一个 follower 还跟自己保持联系,这样才能确保 leader 挂掉之后还有至少一个 follower;第三步设置acks=all
,即要求每条消息,必须在写入所有的 replicas 后才能认为写入成功;第四步设置retries=MAX
,即一旦写入失败,就进行无限重试(此时可能会造成阻塞);(这也是kafka消息零丢失的实现原理)
- 针对 rabbitmq,需要开
消费者导致消息丢失
- 针对 rabbitmq,就是
采取确认机制
,即消费完消息后调用 rabbitmq 的确认接口,手动确认消息消费完成,rabbitmq 才会把这条消息标记为已消费; - 针对 kafka,就是
关闭自动提交offset的开关
,在处理完消息后手动提交offset
- 针对 rabbitmq,就是
- 问题描述
- 如何保证消息的顺序性?
- 问题描述
- 以消息能可靠传入 MQ 中,并且生产者生产的消息是有序的为前提;又由于存储消息的 queue 本身就具有先进先出的特性;因此问题主要出现在消费者身上,多是因为:多个消费者同时消费同一个 queue,或者是一个消费者多线程消费同一个 queue
- 解决方案
- rabbitmq:一个消费者对应一个 queue
- kafka: 内部单线程消费
- 问题描述
- 如何解决消息积压?
- 问题描述
消息积压
:由于消费者故障或者由于消费者调用的外围组件异常(如某一消息要求消费者更新数据库,但是此时数据库发生宕机事故)导致消费者消费消息特别慢,而生产者那边却是按照正常速度进行生产,这就会导致 MQ 中积压很多的消息;消息丢失
:由于消息积压过多,但是又没有来得及处理,并且消息中间件还设置了过期时间,这还会造成消息丢失的问题;
- 解决方案
- 中心思想是分三步走
- 第一步,要求
快速把生产机器上的空间腾出来
。可以先临时征用 10 倍机器,然后写一个用来分发数据的消费者程序,一边接生产环境,一边接临时机器,消费者程序只做消息换手,即从生产环境接到消息,立马放入临时机器; - 第二步,要求
快速把腾出来的消息消费掉
。可以临时征用其他机器,写一些专门用来消费这些消息的消费者程序,再接上上一步的临时机器进行实打实的消费; - 第三步,
进行补偿
。如果遇到消息丢失的问题,就要写一个补偿程序,在业务量少的时间(如凌晨三四点时)针对白天的业务数据,重新生成消息
- 第一步,要求
- 中心思想是分三步走
- 问题描述
- (开放题)如何设计一个 MQ?
- 考官目的
- 主要考察候选人的架构设计能力
- 回答提示
- 需要考虑 MQ 得支持可伸缩
- 需要考虑 MQ 得支持可持久化
- 需要考虑 MQ 得支持高可用
- 需要考虑 MQ 消息零丢失
- 考官目的
- 总结-方法论
- 考虑问题可能出现的地方时,主要从生产者消费模型的三个角色来考虑:
生产者
、消息中间件
、消费者
- 各个角色可能出现的问题无非就是下面的几种情况的组合:
- 软件非正常中断,比如:程序突然卡死、应用程序重启、超过超时时间
- 网络不稳定,比如:网络抖动
- 硬件设备非正常中断,比如:硬盘突然失效、服务器设备突然断电、网卡设备损坏
- 考虑解决方案时,就可以从以上各角色可能出现的问题分别考虑
- 考虑问题可能出现的地方时,主要从生产者消费模型的三个角色来考虑:
2. 企业级消息队列
2.1. rabbitmq
2.2. rocketmq
2.3. kafka
3. 生产实践——设备云基于 Rabbit MQ 的消息模型
3.1. 消息处理流程
3.2. 消息丢失
- 后端没有发送成功: 开启生产者确认机制,当 MQ 接收到消息后,给后端一个确认信息;
- MQ 丢失消息: 开启队列的持久化机制
- 主站接收到消息后还没来得及处理消息就宕机了,导致丢失消息: 开启消费者确认机制,后端处理接收到消息,完成业务逻辑处理之后,发送确认信息给 MQ,MQ 移除消息;
3.3. 消息重复消费
- 在消费消息时,先去 db 或 redis 中查询一下是否存在这个消息的 id:d
- 如果存在则证明已经消费过了,之后直接抛弃这条消息,并发确认信息给 MQ,让 MQ 移除这条消息;
- 如果不存在,就处理这条消息,之后发送确认信息给 MQ,让 MQ 移除这条消息,再之后把这条消息的 id 记录到 db 或 redis 中;
3.4. 消息积压
- 事前,预估系统消息容量,架设足够服务器资源,尽量避免消息积压情况的发生;
- 事中,紧急扩容后端服务器资源,让后端尽快处理掉积压的消息;
- 事后,反思总结经验教训;
- 定位消息积压问题,查看是否是业务正常逻辑导致;若是,则需要再次评估后端服务器资源与 MQ 服务器资源是否需要扩容;若不是,则需要定位消息积压原因,并进行修复;
3.5. 实时命令时序图
plantuml
@startuml
autonumber
actor "用户" as User
participant "backend" as backend #red
participant "redis" as redis #yellow
participant "db" as db #green
participant "rabbitmq" as rabbitmq #pink
participant "ipc" as ipc #orange
participant "machine" as machine #olive
activate User
User -> backend: 下发抄表命令
activate backend
activate db
backend -> db: 获取设备信息
deactivate db
backend -> backend: 组装命令信息
activate rabbitmq
backend -> rabbitmq: 发送抄表命令消息到队列
deactivate rabbitmq
activate redis
backend -> redis: 生成命令key
activate db
backend -> db: 并记录抄表操作信息到db
deactivate db
note right of redis: 以hash结构,设备id作为值对象的key存入redis
deactivate redis
backend -> User: 发送回调接口,进行重定位
deactivate backend
deactivate User
activate rabbitmq
activate ipc
ipc -> rabbitmq: 监听相关队列
ipc -> ipc: 解析消息并解析请求链路
activate machine
ipc -> machine: 发送抄表命令
machine -> ipc: 接收抄表结果
deactivate machine
ipc -> rabbitmq: 发送抄表结果消息
deactivate ipc
activate backend
backend -> rabbitmq: 监听到抄表结果信息
deactivate rabbitmq
activate redis
backend -> redis: 回查设备信息,并将设备信息与抄表信息一一组装
activate db
backend -> db: 并记录抄表记录结果信息
deactivate db
deactivate redis
deactivate backend
activate User
User -> backend: 轮训回调接口
activate backend
backend -> redis: 查询抄表结果
backend -> User: 发送响应结果
deactivate backend
deactivate User
@enduml
- redis 的存在的目的在于把抄表结果与设备信息对应起来,所以使用 hash 的数据结构更为妥当,让设备 id 作为值对象的 key
- 第 12 步之后,由后端监听程序获取抄表结果,并把抄表结果存入 redis,之后把抄表动作信息记录到 db 中
- 轮训频率为 1s 一次
3.6. 异步抄表时序图
- 这里的响应结果为操作成功,只是代表发送抄表命令的动作是成功的,并不代表抄表结果成功返回;
- 要想查询抄表动作是否成功,需要去操作中心再次调用接口去查询;