Kafka消息无丢失/重复消费配置

消息丢失/重复消费的场景:
  • 提交消息失败

    • 使用producer.send(msg)提交消息。因为没有回调结果,这时可能消息broker因为网络波动并没有收到,此时消息就丢失了。所以建议使用有回调函数的producer.send(msg,callback)

    • 自动提交offset。可能你使用了多线程处理消息并且是自动提交。如果某个线程处理失败,并且没有显示地通知那么自动提交后就会丢失消息。

  • Broker端丢失消息

    • 使用了unclean leader 选举。
    • offset超前,超过了HighWater(真实已消费的位置)。再次消费会从offset位置开始,中间的消息就丢失了。相反offset落后与HW就导致重复消费。
最佳实践:

1~3 producer端参数
4~8 broker端参数

1.不使用producer.send(msg)提交消息,一定使用带有回调函数方式提交。

2.使用acks = all,意味着在ISR中的所有的副本broker都接收消息才认为提交成功。

3.设置producer端的retries值>0,即设置重试次数。

4.设置unclean.leader.election.enable=false禁止落后太多的副本选举成为leader,unclean leader指的是落后于最新消息的节点,如果它被选作leader那就肯定会丢失数据。有利有弊,这个参数发挥作用的情况只有当ISR(副本集合)中没有副本,才会执行unclean选举,如果不选举那么就会导致整个broker挂掉,失去高可用性。但一般还是要禁止掉,同时增加多个replicas(副本)个数来保证ISR中有正常的副本。

5.设置replication.factor=3副本个数,可以冗余消息到其他broker上。

6.设置min.insync.replicas>1,设置消息要写入多少个副本才算成功。生产环境最好要>1。

7.确保replication.factor > min.insync.replicas 即副本个数大于需成功写入的个数。生产环境设置replication.factor = min.insync.replicas + 1这样可以保证有一个副本挂掉的情况下仍然可以提交数据。

8.设置enable.auto.commit = false 即设置手动提交offset方式,对单consumer多线程的情况很关键。

在提交消息时,也建议同时使用异步提交+同步提交的策略。保证性能和安全性(弱一致性+最终一致性)!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAysnc(); // 使用异步提交规避阻塞
}
} catch(Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}