Kafka重设消息位移

为什么重设位移?
开发中有时候会碰到,消息消费端出现故障统计错误,或者消息格式出现问题,需要修改消费者逻辑重新消费的情况。重设消息位移可以使消费者重新读取队列中的消息。

重设位移策略

纬度 策略 含义
位移纬度 earliest 位移到队列未过期的最早消息
latest 位移到队列未过期的最新消息
current 位移到队列已消费到的最新消息
specified-offset 位移到指定的offset处
shift-by-N 位移到当前位移的前/后N处
时间纬度 datetime 位移到某个时间点
duration 位移到当前时间前指定间隔处

设置方法

  • 通过producer API设置

    1
    2
    3
    4
    5
    6
        //设置单个分区的位移
       void seek(TopicPartition partition, long offset);
    void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
       //设置多个partition位移
       void seekToBeginning(Collection<TopicPartition> partitions);
    void seekToEnd(Collection<TopicPartition> partitions);
  • 通过kafka-consumer-groups.sh脚本设置

    1
    2
    #设置
    bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics -[--to-earliest/--to-latest/--to-current/--to-offset <offset>/--shift-by <offset_N>/--to-datetime <2019-06-20T20:00:00>/--by-duration <PD0H30M0S>] –execute