JavaDriver JavaDriver
首页
  • 基础
  • 并发
  • JVM
  • 设计模式
  • 计算机网络
  • 操作系统
  • 数据结构
  • 算法
  • MYSQL
  • REDIS
  • Netty
  • Kafka
系统设计
非技术
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

YoungAnn

西二旗Java老司机一枚 致力于社会主义添砖Java
首页
  • 基础
  • 并发
  • JVM
  • 设计模式
  • 计算机网络
  • 操作系统
  • 数据结构
  • 算法
  • MYSQL
  • REDIS
  • Netty
  • Kafka
系统设计
非技术
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Netty

  • Kafka

    • Kafka
    • 生产者
    • 消费者
    • 集群下的Kafka要考虑哪些事情
    • 如何保证Kafka的可靠性
  • 中间件
  • Kafka
YoungAnn
2022-05-21

生产者

# 创建生产者

创建生产者有三个属性是必选的 bootstrap.servers 指定broker集群的地址,格式为host:port key.serializer kafka broker希望收到消息的键和值都是字节数组,但是为了代码的可读性,允许producer把java对象发送给broker,但是需要指定一个实现了org.apache.kafka.common.serialization.Serializer接口的序列化器。 value.serializer 与key.serializer一样的道理。

		Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.1.1,172.16.1.2");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
1
2
3
4
5

# 发送消息

发送消息有三种方式:

  • 发送并忘记(fire-and-forget)
  • 同步发送
  • 异步发送

发送并忘记 不关心消息是否到达

ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key,value);
producer.send(record);
1
2

同步发送

ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key,value);
try{
	producer.send(record).get();//producer.send(record)返回一个Future对象,调用get()阻塞等待返回结果
} catch (Exception e){
	e.printStackTrace();
}
1
2
3
4
5
6

kafkaProducer会发送两类错误

  • 可重试的:比如连接错误、无主(no leader)错误
  • 不可重试的:比如消息太大 异步发送 异步发送的异常需要记录下拉,producer支持回调。
ProducerRecord<String, String> record = new ProducerRecord<>(topicNameTrade, key,value);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    exception.printStackTrace();
                }
            });
1
2
3
4
5
6
7

# 生产者的配置

acks 指定要多少个分区副本收到消息,生产者才认为消息写入是成功的。其值有0、1、all.

  • 0:不等待任何服务器的相应
  • 1: 只要集群的首领节点收到
  • all: 等待所有副本都收到

buffer.memory 设置生产者内存缓冲区大小。 程序发送消息的速度 > 发送到broker的速度会导致这个缓冲区空间不足。 空间不足时程序阻塞还是抛异常取决于配置max.block.ms compression.type 默认,消息是不会被压缩的。 producer向broker发送消息不是逐条发送的,是一批一批发送的。 发送批次有可选的压缩方式,有三种:

  • snappy
  • gzip
  • lz4

retries 重试次数 默认重试之间会等待100ms retry.backoff.ms可设置这个等待时间 batch.size 有多个消息要发送到一个分区的时候,producer会把他们放到一个批次。 该配置指定是批次大小(字节)。 批次装满的时候会被发送出去,

# 序列化器

# 分区

kafka消息的recode包含topic、key、value。 broker用key分配分区。key相同的一定会分配到同一个分区。 如果key为null,那么将使用默认分区,默认的分区器是使用Round Robin算法。 也可以实现partitioner实现自定义分区器。

编辑 (opens new window)
上次更新: 2022/05/22, 00:01:01
Kafka
消费者

← Kafka 消费者→

最近更新
01
电商-商品系统设计
12-17
02
关于如何写OKR
12-09
03
对事不对人 vs 对人不对事
12-09
更多文章>

Gitalking ...

Theme by Vdoing | Copyright © 2022-2025 YoungAnnn | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式