为什么重设位移?
开发中有时候会碰到,消息消费端出现故障统计错误,或者消息格式出现问题,需要修改消费者逻辑重新消费的情况。重设消息位移可以使消费者重新读取队列中的消息。
重设位移策略
纬度 | 策略 | 含义 |
---|---|---|
位移纬度 | earliest | 位移到队列未过期的最早消息 |
latest | 位移到队列未过期的最新消息 | |
current | 位移到队列已消费到的最新消息 | |
specified-offset | 位移到指定的offset处 | |
shift-by-N | 位移到当前位移的前/后N处 | |
时间纬度 | datetime | 位移到某个时间点 |
duration | 位移到当前时间前指定间隔处 |
设置方法
通过producer API设置
1
2
3
4
5
6//设置单个分区的位移
void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
//设置多个partition位移
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);通过kafka-consumer-groups.sh脚本设置
1
2#设置
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics -[--to-earliest/--to-latest/--to-current/--to-offset <offset>/--shift-by <offset_N>/--to-datetime <2019-06-20T20:00:00>/--by-duration <PD0H30M0S>] –execute