同步操作将从 Java精选/Ebooks 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
1、系统可用性降低
各个系统正常运行,而开发人员增加消息队列插件到项目中,如果此时消息队列服务挂了,那么系统也无法正常运行。因此,系统可用性也就降低了。
2、系统复杂性增加
系统中集成消息队列要考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费、如何保证保证消息可靠传输等问题。因此,需要考虑的各个方面问题更多,增大了系统复杂性。
Kafka中体现零拷贝使用场景的地方有两处:基于mmap的索引和日志文件读写所用的TransportLayer。
基于mmap的索引
索引都是基于MappedByteBuffer的,也就是让用户态和内核态共享内核态 的数据缓冲区,此时,数据不需要复制到用户态空间。不过,mmap 虽然避免了不必要的 拷贝,但不一定就能保证很高的性能。在不同的操作系统下,mmap 的创建和销毁成本可 能是不一样的。很高的创建和销毁开销会抵消 零拷贝 带来的性能优势。由于这种不确 定性,在Kafka中,只有索引应用了mmap,最核心的日志并未使用mmap机制。
日志文件读写所用的TransportLayer
TransportLayer是Kafka传输层的接口。它的某个实现类使用了FileChannel的transferTo方法。该方法底层使用sendfile实现了零拷贝。对Kafka而言,如果I/O通道使用普通的PLAINTEXT,那么,Kafka就可以利用零拷贝特性,直接将页缓存中的数据发送到网卡的 Buffer 中,避免中间的多次拷贝。相反,如果I/O通道启用了SSL,那么,Kafka便无法利用零拷贝特性了。
消息持久化的缺点是很消耗性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量。可使用固态硬盘来提高读写速度,以达到缓解消息持久化的缺点。
和 MQTT 的事务定义一样都是3种。
1)最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输。
2)最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输。
3)精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的。
从Kafka的生产者与消费者的角度来看待数据丢失的问题。
Producer
1、retries=Long.MAX_VALUE
设置retries为一个较大的值。这里的retries同样是Producer的参数,对应前面提到的Producer自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries>0的Producer能够自动重试消息发送,避免消息丢失。
2、acks=all
设置acks=all。acks是Producer的一个参数,代表了你对“已提交”消息的定义。如果设置成all,则表明所有副本Broker都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
3、max.in.flight.requests.per.connections=1
该参数指定了生产者在收到服务器晌应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为1可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
4、Producer要使用带有回调通知的API,也就是说不要使用producer.send(msg),而要使用producer.send(msg,callback)。
5、其他错误处理
使用生产者内置的重试机制,可以在不造成消息丢失的情况下轻松地处理大部分错误,不过仍然需要处理其他类型的错误,例如消息大小错误、序列化错误等等。
Consumer
1、禁用自动提交:enable.auto.commit=false
2、消费者处理完消息之后再提交offset
3、配置auto.offset.reset
这个参数指定了在没有偏移量可提交时(比如消费者第l次启动时)或者请求的偏移量在broker上不存在时(比如数据被删了),消费者会做些什么。
这个参数有两种配置。一种是earliest:消费者会从分区的开始位置读取数据,不管偏移量是否有效,这样会导致消费者读取大量的重复数据,但可以保证最少的数据丢失。一种是latest(默认),如果选择了这种配置,消费者会从分区的末尾开始读取数据,这样可以减少重复处理消息,但很有可能会错过一些消息。
在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id值,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列。
在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,比如支付ID、订单ID、帖子ID等)作为去重和幂等的依据,避免同一条消息被重复消费。
针对此类问题,列举处理消息的业务场景:
1、用户获取到消息执行数据库insert插入操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
2、用户获取到消息执行redis缓存set插入操作,不需要解决重复,因为无论set几次结果都是一样的,set操作本来幂等操作。
3、如果上面两种情况还不行,可以考虑使用记录消费。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以Key-Value的形式写入到redis中。当消费者开始消费消息时,首选获取redis中有没消费过该记录。
将auto.commit.offset设为false,然后在处理一批消息后commitSync()或者异步提交commitAsync()即:
ConsumerRecords<> records = consumer.poll();
for(ConsumerRecord<> record : records){
try{
consumer.commitSync();
}
}
Kafka同时设置Broker端参数和Consumer端参数。
Broker端参数:message.max.bytes、max.message.bytes(主题级别)和replica.fetch.max.bytes。
Consumer端参数:fetch.message.max.bytes。
需要注意的是Broker端的最后一个参数比较容易遗漏。必须调整Follower副本能够接收的最大消息的大小,否则,副本同步就会失败。
RabbitMQ 要实现消息持久化,必须满足以下 4 个条件:
1、投递消息的时候durable设置为true,消息持久化,代码:channel.queueDeclare(x, true, false, false, null),第2个参数设置为true持久化;
2、设置投递模式deliveryMode设置为2(持久),代码:channel.basicPublish(x, x, MessageProperties.PERSISTENTTEXTPLAIN,x),第3个参数设置为存储纯文本到磁盘;
3、消息已经到达持久化交换器上;
4、消息已经到达持久化的队列。
MQ全称为Message Queue 消息队列(MQ)是一种应用程序对应用程序的通信方法。
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。
MQ是消费生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息。
消息生产者只需要把消息发布到MQ中而不用管谁来获取,消息消费者只管从MQ中获取消息而不管是谁发布的消息,这样生产者和消费者双方都不用清楚对方的存在。
目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。