同步操作将从 Java精选/Ebooks 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
从Kafka的生产者与消费者的角度来看待数据丢失的问题。
Producer
1、retries=Long.MAX_VALUE
设置retries为一个较大的值。这里的retries同样是Producer的参数,对应前面提到的Producer自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries>0的Producer能够自动重试消息发送,避免消息丢失。
2、acks=all
设置acks=all。acks是Producer的一个参数,代表了你对“已提交”消息的定义。如果设置成all,则表明所有副本Broker都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
3、max.in.flight.requests.per.connections=1
该参数指定了生产者在收到服务器晌应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为1可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
4、Producer要使用带有回调通知的API,也就是说不要使用producer.send(msg),而要使用producer.send(msg,callback)。
5、其他错误处理
使用生产者内置的重试机制,可以在不造成消息丢失的情况下轻松地处理大部分错误,不过仍然需要处理其他类型的错误,例如消息大小错误、序列化错误等等。
Consumer
1、禁用自动提交:enable.auto.commit=false
2、消费者处理完消息之后再提交offset
3、配置auto.offset.reset
这个参数指定了在没有偏移量可提交时(比如消费者第l次启动时)或者请求的偏移量在broker上不存在时(比如数据被删了),消费者会做些什么。
这个参数有两种配置。一种是earliest:消费者会从分区的开始位置读取数据,不管偏移量是否有效,这样会导致消费者读取大量的重复数据,但可以保证最少的数据丢失。一种是latest(默认),如果选择了这种配置,消费者会从分区的末尾开始读取数据,这样可以减少重复处理消息,但很有可能会错过一些消息。
ConnectionFactory(连接管理器):应用程序与RabbitMQ之间建立连接的管理器
Channel(信道):消息推送使用的通道
Exchange(交换器):用于接受、分配消息
Queue(队列):用于存储生产者的消息
RoutingKey(路由键):用于把生产者的数据分配到交换器上
BindKey(绑定键):用于把交换器的消息绑定到队列上
Kafka有两种数据保存策略:按照过期时间保留和按照存储的消息大小保留。
AMQP(Advanced Message Queueing Protocol)协议是一个开放的标准的的协议,它定义了系统之间如何传递消息。
AMQP不仅定义了consumer、producer、broker之间如何交互,也定义了消息的格式和命令的交换。
RabbitMQ就是AMQP协议的Erlang的实现(当然RabbitMQ还支持STOMP2、MQTT3等协议)AMQP的模型架构和RabbitMQ的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定。
RabbitMQ中的交换器、交换器类型、队列、绑定、路由键等都是遵循的AMQP协议中相应的概念。
目前RabbitMQ最新版本默认支持的是AMQP0-9-1。
生产者在主题上发布消息:
bin/kafka-console-producer.sh --broker-list 192.168.43.49:9092 --topic Hello-Kafka
注意这里的IP是server.properties中的 listeners的配置。接下来每个新行就是 输入一条新消息。
消费者接受消息:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic Hello-Kafka --from-beginning
如果用户位于与broker不同的数据中心,则可能需要调优套接口缓冲区大小,以对长网络延迟进行摊销。
RabbitMQ接收到消息后可以不消费,在消息确认消费之前,可以做以下两件事:
拒绝消息消费,使用channel.basicReject(消息编号, true) 方法,消息会被分配给其他订阅者;
设置为死信队列,死信队列是用于专门存放被拒绝的消息队列。
Zookeeper是一个开放源码的、高性能的协调服务,它用于Kafka的分布式应用。
Zookeeper主要用于在集群中不同节点之间进行通信。
在Kafka中,它被用于提交偏移量,因此如果节点在任何情况下都失败了,它都可以从之前提交的偏移量中获取。
除此之外,它还执行其他活动,如:leader检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等。
和 MQTT 的事务定义一样都是3种。
1)最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输。
2)最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输。
3)精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的。
使用kafka-consumer-groups.sh命令进行查看。
$ bin/kafka-consumer-groups.sh --bootstrap-server cdh02:9092 --describe --group my-group
## 会显示下面的一些指标信息
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
主题 分区 当前offset LEO 滞后消息数 消费者id 主机 客户端id
一般情况下,如果运行良好,CURRENT-OFFSET的值会和LOG-END-OFFSET的值非常接近。通过这个命令可以查看哪个分区的消费出现了滞后消费的情况。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。