JavaDriver JavaDriver
首页
  • 基础
  • 并发
  • JVM
  • 设计模式
  • 计算机网络
  • 操作系统
  • 数据结构
  • 算法
  • MYSQL
  • REDIS
  • Netty
  • Kafka
系统设计
非技术
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

YoungAnn

西二旗Java老司机一枚 致力于社会主义添砖Java
首页
  • 基础
  • 并发
  • JVM
  • 设计模式
  • 计算机网络
  • 操作系统
  • 数据结构
  • 算法
  • MYSQL
  • REDIS
  • Netty
  • Kafka
系统设计
非技术
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Netty

  • Kafka

    • Kafka
    • 生产者
    • 消费者
      • 几个概念
      • 配置项
    • 集群下的Kafka要考虑哪些事情
    • 如何保证Kafka的可靠性
  • 中间件
  • Kafka
YoungAnn
2022-05-21
目录

消费者

# 几个概念

消费者组 多个消费实例共享一个GroupId,多个消费实例组织在一起协调消费订阅的主题和分区。 消费实例可以是一个进程也可以是一个线程 GroupId是一个字符串,唯一标识一个消费组 GroupId.consumerInstance:GroupId.partition = 1:N,也就是说同一个消费者实例和partition的对应关系是1:N offset 位移(offset):consumer记录消费消息的位置 kafka的位移机制:

  • consumer实例保存自己的offset,通过checkPoint机制定期持久化
  • 新版本的kafka通过topic定期持久化到broker
  • 老版本的kafka是持久化到zk的,但是zk并不适合频繁的写

rebalance 一个groupId内有多个consumer,其订阅topic有多个partition,那么那个消费者消费那个partition呢?rebalance就是做consumer和partition的匹配。

什么时候rebalance

  • 组成员的变更(consumer的加入或者离开、崩溃)
  • 订阅的partition发生变更
  • 订阅的topic发生变更(按照正则方式订阅时是可能发生的)

如何rebalance consumerClient有两种策略可选,通过partition.assignment.strategy配置;默认是Range策略

  • Range strategy:按照topic维度均分partition,问题是可能出现分配不均的问题
  • RoundRobin:所有partition均分,分配的更均匀

谁来执行rebalance和consumer group管理 每个consumer group都会被分配一个coordinator用于组管理和位移管理。 当新版本consumer group的第一个consumer启动的时候,它会去和kafka server确定谁是它们组的coordinator,之后该group内的所有成员都会和该coordinator进行协调通信。

谁是coordinator consumer group位移信息写入__consumers_offsets的分区为partition,该partition的leader所在的broker就是该组的coordinator。

# 配置项

	//组唯一id
	public static final String GROUP_ID_CONFIG = "group.id";


    //控制单次调用call()返回的记录数量
    public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";

    /** 使用组管理的时候生效 如果consumer两次poll()调用时间超过这个阈值 组管理器会进行rebalance */
    public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";

    /** consumer的心跳时间*/
    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
    /**
     * 自动提交offset 默认是true
     * 可关闭,然后自己控制提交offset,通过auto.commit.interval.ms控制提交频次
     */
    public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";

    /**
     * 自动提交offset的频次
     */
    public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
    /**
     * 自定义分区策略的class全路径
     */
    public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";

    /**
     * 当offset不存在或者失效的情况下,指定消费策略
     * 有两次消费策略earliest、latest
     * 分别表示从最新的消息开始消费还是从最早的开始消费
     */
    public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";

    /**
     * 指定从broker获取的最小的数据量字节数
     * 如果broker的数据量小于该配置那么会等一会再返回consumer
     * 可降低broker和consumer之间的交互次数
     */
    public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
编辑 (opens new window)
上次更新: 2022/05/22, 00:01:01
生产者
集群下的Kafka要考虑哪些事情

← 生产者 集群下的Kafka要考虑哪些事情→

最近更新
01
电商-商品系统设计
12-17
02
关于如何写OKR
12-09
03
对事不对人 vs 对人不对事
12-09
更多文章>
Theme by Vdoing | Copyright © 2022-2023 YoungAnnn | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式