同步操作将从 turnon/blog 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
title: Kafka 可靠传输
date: 2021-04-14 15:05:34
categories:
- 分布式
- 分布式通信
- MQ
- Kafka
tags:
- MQ
- Kafka
permalink: /pages/481bdd/
如何保证消息的可靠性传输,或者说,如何保证消息不丢失?这对于任何 MQ 都是核心问题。
一条消息从生产到消费,可以划分三个阶段:
这三个阶段都可能丢失数据,所以要保证消息丢失,就需要任意一环都保证可靠。
存储阶段指的是 Kafka Server,也就是 Broker 如何保证消息不丢失。
一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。
上面的话可以解读为:
Kafka 的副本机制是 kafka 可靠性保证的核心。
Kafka 的主题被分为多个分区,分区是基本的数据块。每个分区可以有多个副本,有一个是 Leader(主副本),其他是 Follower(从副本)。所有数据都直接发送给 Leader,或者直接从 Leader 读取事件。Follower 只需要与 Leader 保持同步,并及时复制最新的数据。当 Leader 宕机时,从 Follower 中选举一个成为新的 Leader。
Broker 有 3 个配置参数会影响 Kafka 消息存储的可靠性。
replication.factor
的作用是设置每个分区的副本数。replication.factor
是主题级别配置; default.replication.factor
是 broker 级别配置。
副本数越多,数据可靠性越高;但由于副本数增多,也会增加同步副本的开销,可能会降低集群的可用性。一般,建议设为 3,这也是 Kafka 的默认值。
unclean.leader.election.enable
用于控制是否支持不同步的副本参与选举 Leader。unclean.leader.election.enable
是 broker 级别(实际上是集群范围内)配置,默认值为 true。
min.insync.replicas
控制的是消息至少要被写入到多少个副本才算是“已提交”。min.insync.replicas
是主题级别和 broker 级别配置。
尽管可以为一个主题配置 3 个副本,但还是可能会出现只有一个同步副本的情况。如果这个同步副本变为不可用,则必须在可用性和数据一致性之间做出选择。Kafka 中,消息只有被写入到所有的同步副本之后才被认为是已提交的。但如果只有一个同步副本,那么在这个副本不可用时,则数据就会丢失。
如果要确保已经提交的数据被已写入不止一个副本,就需要把最小同步副本的设置为大一点的值。
注意:要确保
replication.factor
>min.insync.replicas
。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成replication.factor = min.insync.replicas + 1
。
在生产消息阶段,消息队列一般通过请求确认机制,来保证消息的可靠传递,Kafka 也不例外。
Kafka 生产者 中提到了,Kafka 有三种发送方式:同步、异步、异步回调。
同步方式能保证消息不丢失,但性能太差;异步方式发送消息,通常会立即返回,但消息可能丢失。
解决生产者丢失消息的方案:
生产者使用异步回调方式 producer.send(msg, callback)
发送消息。callback(回调)能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。
然后,需要基于以下几点来保证 Kafka 生产者的可靠性:
生产者可选的确认模式有三种:acks=0
、acks=1
、acks=all
。
acks=0
、acks=1
都有丢失数据的风险。
acks=all
意味着会等待所有同步副本都收到消息。再结合 min.insync.replicas
,就可以决定在得到确认响应前,至少有多少副本能够收到消息。
这是最保险的做法,但也会降低吞吐量。
如果 broker 返回的错误可以通过重试来解决,生产者会自动处理这些错误。
LEADER_NOT_AVAILABLE
,主副本不可用,可能过一段时间,集群就会选举出新的主副本,重试可以解决问题。INVALID_CONFIG
,即使重试,也无法改变配置选项,重试没有意义。需要注意的是:有时可能因为网络问题导致没有收到确认,但实际上消息已经写入成功。生产者会认为出现临时故障,重试发送消息,这样就会出现重复记录。所以,尽可能在业务上保证幂等性。
设置 retries
为一个较大的值。这里的 retries
同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
开发者需要自行处理的错误:
前文已经提到,消费者只能读取已提交的消息。这就保证了消费者接收到消息时已经具备了数据一致性。
消费者唯一要做的是确保哪些消息是已经读取过的,哪些是没有读取过的(通过提交偏移量给 Broker 来确认)。如果消费者提交了偏移量却未能处理完消息,那么就有可能造成消息丢失,这也是消费者丢失消息的主要原因。
group.id
- 如果希望消费者可以看到主题的所有消息,那么需要为它们设置唯一的 group.id
。auto.offset.reset
- 有两个选项:
earliest
- 消费者会从分区的开始位置读取数据latest
- 消费者会从分区末尾位置读取数据enable.auto.commit
- 消费者自动提交偏移量。如果设为 true,处理流程更简单,但无法保证重复处理消息。auto.commit.interval.ms
- 自动提交的频率,默认为每 5 秒提交一次。如果
enable.auto.commit
设为 true,即自动提交,就无需考虑提交偏移量的问题。
如果选择显示提交偏移量,需要考虑以下问题:
在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:
绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是这样。也就是说,消息队列很难保证消息不重复。
一般解决重复消息的办法是,在消费端,保证消费消息的操作具备幂等性。
常用的实现幂等操作的方法:
关系型数据库可以使用 INSERT IF NOT EXIST
语句防止重复;Redis 可以使用 SETNX
命令来防止重复;其他数据库只要支持类似语义,也是一个道理。
如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。
但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?更加通用的方法是,给数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。
还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯一 ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。
具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
需要注意的是,“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。这一组操作可以通过分布式事务或分布式锁来保证其原子性。
某些场景下,可能会要求按序发送消息。
Kafka 每一个 Partition 只能隶属于消费者群组中的一个 Consumer,换句话说,每个 Partition 只能被一个 Consumer 消费。所以,如果 Topic 是单 Partition,自然是有序的。
方案分析
优点:简单粗暴。开发者什么也不用做。
缺点:Kafka 基于 Partition 实现其高并发能力,如果使用单 Partition,会严重限制 Kafka 的吞吐量。
结论:作为分布式消息引擎,限制并发能力,显然等同于自废武功,所以,这个方案几乎是不可接受的。
(1)生产者端显示指定 key 发往一个指定的 Partition,就可以保证同一个 key 在这个 Partition 中是有序的。
(2)接下来,消费者端为每个 key 设定一个缓存队列,然后让一个独立线程负责消费指定 key 的队列,这就保证了消费消息也是有序的。
先修复消费者,然后停掉当前所有消费者。
新建 Topic,扩大分区,以提高并发处理能力。
创建临时消费者程序,并部署在多节点上,扩大消费处理能力。
最后处理完积压消息后,恢复原先部署架构。
建议从 3 个层面验证系统的可靠性:
error-rate
和 retry-rate
。如果这两个指标上升,说明系统出了问题。consumer-lag
,该指标表明了消费者的处理速度与最近提交到分区里的偏移量之间还有多少差距。生产者
producer.send(msg)
,而要使用 producer.send(msg, callback)
。记住,一定要使用带有回调通知的 send
方法。acks = all
。acks
是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。retries
为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0
的 Producer 能够自动重试消息发送,避免消息丢失。服务器(Kafka Broker)
unclean.leader.election.enable = false。
这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。replication.factor
>= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。min.insync.replicas
> 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。replication.factor
> min.insync.replicas
。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1
。消费者
enable.auto.commit
,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。