消费者
# 几个概念
消费者组 多个消费实例共享一个GroupId,多个消费实例组织在一起协调消费订阅的主题和分区。 消费实例可以是一个进程也可以是一个线程 GroupId是一个字符串,唯一标识一个消费组 GroupId.consumerInstance:GroupId.partition = 1:N,也就是说同一个消费者实例和partition的对应关系是1:N offset 位移(offset):consumer记录消费消息的位置 kafka的位移机制:
- consumer实例保存自己的offset,通过checkPoint机制定期持久化
- 新版本的kafka通过topic定期持久化到broker
- 老版本的kafka是持久化到zk的,但是zk并不适合频繁的写
rebalance 一个groupId内有多个consumer,其订阅topic有多个partition,那么那个消费者消费那个partition呢?rebalance就是做consumer和partition的匹配。
什么时候rebalance
- 组成员的变更(consumer的加入或者离开、崩溃)
- 订阅的partition发生变更
- 订阅的topic发生变更(按照正则方式订阅时是可能发生的)
如何rebalance consumerClient有两种策略可选,通过partition.assignment.strategy配置;默认是Range策略
- Range strategy:按照topic维度均分partition,问题是可能出现分配不均的问题
- RoundRobin:所有partition均分,分配的更均匀
谁来执行rebalance和consumer group管理 每个consumer group都会被分配一个coordinator用于组管理和位移管理。 当新版本consumer group的第一个consumer启动的时候,它会去和kafka server确定谁是它们组的coordinator,之后该group内的所有成员都会和该coordinator进行协调通信。
谁是coordinator consumer group位移信息写入__consumers_offsets的分区为partition,该partition的leader所在的broker就是该组的coordinator。
# 配置项
//组唯一id
public static final String GROUP_ID_CONFIG = "group.id";
//控制单次调用call()返回的记录数量
public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
/** 使用组管理的时候生效 如果consumer两次poll()调用时间超过这个阈值 组管理器会进行rebalance */
public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";
/** consumer的心跳时间*/
public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
/**
* 自动提交offset 默认是true
* 可关闭,然后自己控制提交offset,通过auto.commit.interval.ms控制提交频次
*/
public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
/**
* 自动提交offset的频次
*/
public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
/**
* 自定义分区策略的class全路径
*/
public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
/**
* 当offset不存在或者失效的情况下,指定消费策略
* 有两次消费策略earliest、latest
* 分别表示从最新的消息开始消费还是从最早的开始消费
*/
public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
/**
* 指定从broker获取的最小的数据量字节数
* 如果broker的数据量小于该配置那么会等一会再返回consumer
* 可降低broker和consumer之间的交互次数
*/
public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
编辑 (opens new window)
上次更新: 2022/05/22, 00:01:01