同步操作将从 Java精选/Ebooks 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
交换器(Exchange)
消息代理服务器中用于把消息路由到队列的组件。
队列(Queue)
用来存储消息的数据结构,位于硬盘或内存中。
绑定(Binding)
一套规则,告知交换器消息应该将消息投递给哪个队列。
Kafka分布式的单位是partition,同一个partition用一个write ahead log组织,所以可以保证FIFO的顺序。不同partition之间不能保证顺序。但是绝大多数用户都可以通过message key来定义,因为同一个key的message可以保证只发送到同一个partition。
Kafka中发送1条消息的时候,可以指定(topic,partition,key)3个参数。partiton和key是可选的。如果你指定了partition,那就是所有消息发往同1个partition,就是有序的。并且在消费端,Kafka保证,1个partition只能被1个consumer消费。或者你指定key(比如orderid),具有同1个key的所有消息,会发往同1个partition。
1、生产者丢数据
生产者的消息没有投递到MQ中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
transaction机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。
然而缺点就是吞吐量下降了。因此,按照博主的经验,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。
2、消息队列丢数据
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步
1)将queue的持久化标识durable设置为true,则代表是一个持久的队列
2)发送消息的时候将deliveryMode=2
这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)
3、消费者丢数据
启用手动确认模式可以解决这个问题
1)自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。
2)手动确认模式,如果消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其他消费者;如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常;如果对异常进行了捕获,但是没有在finally里ack,也会一直重复发送消息(重试机制)。
3)不确认模式,acknowledge="none" 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。
kafka使用seek(TopicPartition, long)指定新的消费位置。
用于查找服务器保留的最早和最新的offset 的特殊的方法也可用。seekToBeginning(Collection)和seekToEnd(Collection)
RabbitMQ保证消息的顺序性方式有两种:
1)拆分多个queue,每个queue对应一个consumer(消费者),就是多一些queue。
2)一个queue,对应一个consumer(消费者),然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理。
1)Kafka持久化日志,这些日志可以被重复读取和无限期保留。
2)Kafka是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数据提升容错能力和高可用性。
3)Kafka支持实时的流式处理。
在大多数队列系统中,作为生产者的类无法做到这一点,它的作用是触发并忘记消息。broker将完成剩下的工作,比如使用id进行适当的元数据处理、偏移量等。
作为消息的用户,可以从Kafka broker中获得补偿。如果注意SimpleConsumer类,会发现它会获取包括偏移量作为列表的MultiFetchResponse对象。此外,当对Kafka消息进行迭代时,会拥有包括偏移量和消息发送的MessageAndOffset对象。
Kafka是分布式发布-订阅消息系统,它最初是由LinkedIn公司开发的,之后成为Apache项目的一部分,Kafka是一个分布式,可划分的,冗余备份的持久性的日志服务,它主要用于处理流式数据。
Apach Kafka是一款分布式流处理框架,用于实时构建流处理应用。它有一个核心的功能广为人知,即作为企业级的消息引擎被广泛使用。
ISR是一组与leaders完全同步的消息副本,也就是说ISR中包含了所有提交的消息。
ISR应该总是包含所有的副本,直到出现真正的故障。
如果一个副本从leader中脱离出来,将会从ISR中删除。
使用kafka-consumer-groups.sh命令进行查看。
$ bin/kafka-consumer-groups.sh --bootstrap-server cdh02:9092 --describe --group my-group
## 会显示下面的一些指标信息
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
主题 分区 当前offset LEO 滞后消息数 消费者id 主机 客户端id
一般情况下,如果运行良好,CURRENT-OFFSET的值会和LOG-END-OFFSET的值非常接近。通过这个命令可以查看哪个分区的消费出现了滞后消费的情况。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。