同步操作将从 Java精选/Ebooks 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
对于Kafka的Broker而言,Kafka的复制机制和分区的多副本架构是Kafka可靠性保证的核心。把消息写入多个副本可以使Kafka在发生崩溃时仍能保证消息的持久性。
主要包括三个方面:
1、Topic 副本因子个数:replication.factor >= 3
2、同步副本列表(ISR):min.insync.replicas = 2
3、禁用unclean选举:unclean.leader.election.enable=false
逐步分析这三个配置:
副本因子
Kafka的topic是可以分区的,并且可以为分区配置多个副本,该配置可以通过replication.factor参数实现。Kafka中的分区副本包括两种类型:领导者副本(Leader Replica)和追随者副本(Follower Replica),每个分区在创建时都要选举一个副本作为领导者副本,其余的副本自动变为追随者副本。在 Kafka 中,追随者副本是不对外提供服务的,也就是说,任何一个追随者副本都不能响应消费者和生产者的读写请求。所有的请求都必须由领导者副本来处理。换句话说,所有的读写请求都必须发往领导者副本所在的 Broker,由该 Broker 负责处理。追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
一般来说,副本设为3可以满足大部分的使用场景,也有可能是5个副本(比如银行)。如果副本因子为N,那么在N-1个broker 失效的情况下,仍然能够从主题读取数据或向主题写入数据。所以,更高的副本因子会带来更高的可用性、可靠性和更少的故障。另一方面,副本因子N需要至少N个broker ,而且会有N个数据副本,也就是说它们会占用N倍的磁盘空间。实际生产环境中一般会在可用性和存储硬件之间作出权衡。
除此之外,副本的分布同样也会影响可用性。默认情况下,Kafka会确保分区的每个副本分布在不同的Broker上,但是如果这些Broker在同一个机架上,一旦机架的交换机发生故障,分区就会不可用。所以建议把Broker分布在不同的机架上,可以使用broker.rack参数配置Broker所在机架的名称。
同步副本列表
In-sync replica(ISR)称之为同步副本,ISR中的副本都是与Leader进行同步的副本,所以不在该列表的follower会被认为与Leader是不同步的。那么,ISR中存在是什么副本呢?首先可以明确的是:Leader副本总是存在于ISR中。而follower副本是否在ISR中,取决于该follower副本是否与Leader副本保持了“同步”。
Kafka的broker端有一个参数replica.lag.time.max.ms, 该参数表示follower副本滞后与Leader副本的最长时间间隔,默认是10秒。这就意味着,只要follower副本落后于leader副本的时间间隔不超过10秒,就可以认为该follower副本与leader副本是同步的,所以哪怕当前follower副本落后于Leader副本几条消息,只要在10秒之内赶上Leader副本,就不会被踢出出局。
可以看出ISR是一个动态的,所以即便是为分区配置了3个副本,还是会出现同步副本列表中只有一个副本的情况(其他副本由于不能够与leader及时保持同步,被移出ISR列表)。如果这个同步副本变为不可用,我们必须在可用性和一致性之间作出选择(CAP理论)。
根据Kafka 对可靠性保证的定义,消息只有在被写入到所有同步副本之后才被认为是已提交的。但如果这里的“所有副本”只包含一个同步副本,那么在这个副本变为不可用时,数据就会丢失。如果要确保已提交的数据被写入不止一个副本,就需要把最小同步副本数量设置为大一点的值。对于一个包含3 个副本的主题分区,如果min.insync.replicas=2 ,那么至少要存在两个同步副本才能向分区写入数据。
如果进行了上面的配置,此时必须要保证ISR中至少存在两个副本,如果ISR中的副本个数小于2,那么Broker就会停止接受生产者的请求。尝试发送数据的生产者会收到NotEnoughReplicasException异常,消费者仍然可以继续读取已有的数据。
禁用unclean选举
选择一个同步副本列表中的分区作为leader 分区的过程称为clean leader election。注意,这里要与在非同步副本中选一个分区作为leader分区的过程区分开,在非同步副本中选一个分区作为leader的过程称之为unclean leader election。由于ISR是动态调整的,所以会存在ISR列表为空的情况,通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程可以通过Broker 端参数 unclean.leader.election.enable控制是否允许 Unclean 领导者选举。开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean Leader 选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。分布式系统的CAP理论说的就是这种情况。
不幸的是,unclean leader election的选举过程仍可能会造成数据的不一致,因为同步副本并不是完全同步的。由于复制是异步完成的,因此无法保证follower可以获取最新消息。比如Leader分区的最后一条消息的offset是100,此时副本的offset可能不是100,这受到两个参数的影响:
replica.lag.time.max.ms:同步副本滞后与leader副本的时间 zookeeper.session.timeout.ms:与zookeeper会话超时时间
简而言之,如果我们允许不同步的副本成为leader,那么就要承担丢失数据和出现数据不一致的风险。如果不允许它们成为leader,那么就要接受较低的可用性,因为我们必须等待原先的首领恢复到可用状态。
关于unclean选举,不同的场景有不同的配置方式。对数据质量和数据一致性要求较高的系统会禁用这种unclean的leader选举(比如银行)。如果在可用性要求较高的系统里,比如实时点击流分析系统, 一般不会禁用unclean的leader选举。
交换器(Exchange)
消息代理服务器中用于把消息路由到队列的组件。
队列(Queue)
用来存储消息的数据结构,位于硬盘或内存中。
绑定(Binding)
一套规则,告知交换器消息应该将消息投递给哪个队列。
消息队列中间件有很多,使用的场景广泛。主要解决的核心问题是削峰填谷、异步处理、模块解耦。
request.required.acks 有三个值 0 1 -1(all)
0:生产者不会等待 broker 的 ack,这个延迟最低但是存储的保证最弱当 server 挂 掉的时候就会丢数据。
1:服务端会等待 ack 值 leader 副本确认接收到消息后发送 ack 但是如果 leader 挂掉后他不确保是否复制完成新 leader 也会导致数据丢失。
-1(all):服务端会等所有的 follower 的副本受到数据后才会受到 leader 发出的 ack,这样数据不会丢失。
Kafka集群使用时需要注意以下事项:
1、Kafka集群的数量不是越多越好,最好不要超过7个,因为节点越多,消息复制所需要的时间就越长,整个群组的吞吐量就越低;
2、Kafka集群数量最好是单数,因为超过一半故障集群就不能使用了,设置为单数容错率更高。
将auto.commit.offset设为false,然后在处理一批消息后commitSync()或者异步提交commitAsync()即:
ConsumerRecords<> records = consumer.poll();
for(ConsumerRecord<> record : records){
try{
consumer.commitSync();
}
}
ISR是一组与leaders完全同步的消息副本,也就是说ISR中包含了所有提交的消息。
ISR应该总是包含所有的副本,直到出现真正的故障。
如果一个副本从leader中脱离出来,将会从ISR中删除。
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。
目前所有主流的编程语言均有与代理接口通讯的客户端库。
在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id值,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列。
在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,比如支付ID、订单ID、帖子ID等)作为去重和幂等的依据,避免同一条消息被重复消费。
针对此类问题,列举处理消息的业务场景:
1、用户获取到消息执行数据库insert插入操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
2、用户获取到消息执行redis缓存set插入操作,不需要解决重复,因为无论set几次结果都是一样的,set操作本来幂等操作。
3、如果上面两种情况还不行,可以考虑使用记录消费。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以Key-Value的形式写入到redis中。当消费者开始消费消息时,首选获取redis中有没消费过该记录。
Kafka同时设置Broker端参数和Consumer端参数。
Broker端参数:message.max.bytes、max.message.bytes(主题级别)和replica.fetch.max.bytes。
Consumer端参数:fetch.message.max.bytes。
需要注意的是Broker端的最后一个参数比较容易遗漏。必须调整Follower副本能够接收的最大消息的大小,否则,副本同步就会失败。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。